# F1 Race Report Generator

Two-agent system with memory:
- **Agent 1**: Data Collection (FastF1)
- **Agent 2**: Report Generation (Gemini)
- **Memory**: Store and retrieve race reports

## 1. Setup

In [None]:
%pip install -q google-cloud-aiplatform google-adk fastf1 pandas python-dotenv

## 2. Imports & Configuration

In [2]:
import os
import json
from datetime import datetime
from typing import Dict, List, Optional, Any
import pandas as pd
from dotenv import load_dotenv
import fastf1
import vertexai
from vertexai.generative_models import GenerativeModel

# Configuration: values come from the environment (optionally populated from a
# local .env file) and fall back to the defaults below.
# NOTE(review): the default project ID is specific to one GCP account; other
# users should set GCP_PROJECT_ID instead of relying on it.
load_dotenv()
PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'gen-lang-client-0467867580')
LOCATION = os.environ.get('GCP_LOCATION', 'us-central1')
MODEL_NAME = 'gemini-2.5-flash'

# Point the Vertex AI SDK at the project/region, then enable FastF1's on-disk cache.
vertexai.init(project=PROJECT_ID, location=LOCATION)
fastf1.Cache.enable_cache('f1_cache')

print("‚úÖ Environment configured")


‚úÖ Environment configured


## 3. Agent Engine Setup


In [3]:
# Connect to Vertex AI; the Agent Engine found or created here backs the Memory Bank.
client = vertexai.Client(project=PROJECT_ID, location=LOCATION)

# Reuse the first engine the project already has; provision one only when none exist.
try:
    agent_engines = list(client.agent_engines.list())
    if not agent_engines:
        agent_engine = client.agent_engines.create()
        print(f"‚úÖ Created new Agent Engine: {agent_engine.api_resource.name}")
    else:
        agent_engine = agent_engines[0]
        print(f"‚úÖ Using existing Agent Engine: {agent_engine.api_resource.name}")

    # The engine ID is the last path segment of the resource name.
    agent_engine_id = agent_engine.api_resource.name.rsplit("/", 1)[-1]
    print(f"   Engine ID: {agent_engine_id}")
except Exception as err:
    # Nothing downstream works without an engine, so report and re-raise.
    print(f"‚ùå Error initializing Agent Engine: {err}")
    raise


‚úÖ Using existing Agent Engine: projects/178353823233/locations/us-central1/reasoningEngines/349394008981635072
   Engine ID: 349394008981635072


## 4. Memory Service (Vertex AI Memory Bank)


In [4]:
import asyncio
from google.adk.memory import VertexAiMemoryBankService

class MemoryService:
    """Persistent storage for race reports using Vertex AI Memory Bank + local backup.

    Reports are kept in an in-memory dict (``race_id -> {"data", "timestamp"}``),
    mirrored to a local JSON file on every write, and forwarded to the Memory
    Bank service on a best-effort basis. All read operations (``search_memory``,
    ``get_report``, ``list_all``) are served from the in-memory dict only.
    """

    def __init__(self, project: str, location: str, agent_engine_id: str, backup_file: str = "f1_reports_backup.json"):
        """Create the Memory Bank client, then load the local backup and attempt a sync.

        Args:
            project: GCP project passed to ``VertexAiMemoryBankService``.
            location: GCP region passed to ``VertexAiMemoryBankService``.
            agent_engine_id: Agent Engine ID passed to ``VertexAiMemoryBankService``.
            backup_file: Path of the JSON file used as local backup.
        """
        self._service = VertexAiMemoryBankService(
            project=project,
            location=location,
            agent_engine_id=agent_engine_id
        )
        self._cache = {}  # Local cache for quick access
        self._backup_file = backup_file
        self._pending_tasks = set()  # Strong refs so scheduled tasks are not garbage-collected
        self._own_loop = None  # Private event loop, created lazily when no loop is running
        self._load_from_backup()  # Load from local backup first
        self._sync_cache()  # Then sync with Memory Bank

    def _load_from_backup(self):
        """Load reports from the local JSON backup file into the cache.

        The cache is replaced only when the file holds a JSON object; any
        other content (or a read/parse error) is reported and the cache is
        left untouched.
        """
        try:
            if os.path.exists(self._backup_file):
                with open(self._backup_file, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
                if not isinstance(loaded, dict):
                    # A list/scalar here would break every dict-based lookup later on.
                    raise ValueError("backup file does not contain a JSON object")
                self._cache = loaded
                print(f"Loaded {len(self._cache)} report(s) from local backup")
            else:
                print("‚ÑπNo local backup found, starting fresh")
        except Exception as e:
            print(f"Error loading backup: {e}")

    def _save_to_backup(self):
        """Write the cache to the backup file without risking the existing backup.

        The JSON is written to a sibling temporary file and moved into place
        with ``os.replace`` only after serialization succeeded, so a failure
        (for example a value that is not JSON serializable) leaves the previous
        backup intact. Errors are reported, not raised.
        """
        tmp_path = f"{self._backup_file}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self._cache, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, self._backup_file)
        except Exception as e:
            print(f"Error saving backup: {e}")
            try:
                os.remove(tmp_path)
            except OSError:
                # Best-effort cleanup: the temp file may never have been created.
                pass

    def _run_async(self, coro, failure_message: str) -> None:
        """Run ``coro`` from synchronous code.

        When an event loop is already running (as in a notebook kernel) the
        coroutine is scheduled on it and this method returns immediately; a
        failure of the task is printed with ``failure_message`` instead of
        being dropped. Otherwise the coroutine is run to completion on a
        private loop that is reused across calls, and its exception (if any)
        propagates to the caller.
        """
        try:
            asyncio.get_running_loop()
            loop_running = True
        except RuntimeError:
            loop_running = False

        if not loop_running:
            if self._own_loop is None or self._own_loop.is_closed():
                self._own_loop = asyncio.new_event_loop()
            self._own_loop.run_until_complete(coro)
            return

        def _finished(done_task):
            self._pending_tasks.discard(done_task)
            if done_task.cancelled():
                return
            exc = done_task.exception()
            if exc is not None:
                print(f"{failure_message}: {exc}")

        task = asyncio.ensure_future(coro)
        self._pending_tasks.add(task)
        task.add_done_callback(_finished)

    def _sync_cache(self):
        """Sync local cache with Memory Bank (best effort; errors are printed)."""
        try:
            self._run_async(self._async_sync_cache(), "Cache sync warning")
        except Exception as e:
            print(f"Cache sync warning: {e}")

    async def _async_sync_cache(self):
        """Async cache sync helper - copy sessions from Memory Bank into the cache.

        Only sessions exposing both ``session_id`` and a dict ``metadata`` are
        copied. When the service has no ``get_sessions`` method the sync is
        skipped and the local cache is used as-is.
        """
        get_sessions = getattr(self._service, 'get_sessions', None)
        if get_sessions is None:
            print("Memory Bank sync not available - using local cache only")
            return

        try:
            sessions = await get_sessions(user_id="f1_report_system")

            if sessions:
                synced = 0
                for session in sessions:
                    if hasattr(session, 'session_id') and hasattr(session, 'metadata'):
                        race_id = session.session_id
                        entry = session.metadata
                        if entry and isinstance(entry, dict):
                            self._cache[race_id] = entry
                            synced += 1

                # Count what was actually pulled, not the whole cache.
                print(f"Synced {synced} report(s) from Memory Bank")
        except AttributeError:
            print("Memory Bank sync not available - using local cache only")
        except Exception as e:
            print(f"Async cache sync error: {e}")

    def add_session_to_memory(self, race_id: str, report_data: Dict[str, Any]) -> None:
        """Store a race report in the cache, the local backup and Memory Bank.

        Args:
            race_id: Key under which the report is stored.
            report_data: Report payload; stored under ``"data"`` next to an
                ISO-format ``"timestamp"``.

        Raises:
            Exception: Re-raised after being printed when storing fails
                synchronously. When an event loop is already running, the
                Memory Bank write happens in the background and its failure is
                printed rather than raised.
        """
        try:
            entry = {
                "data": report_data,
                "timestamp": datetime.now().isoformat()
            }

            # Store in local cache
            self._cache[race_id] = entry

            # Save to local backup file immediately
            self._save_to_backup()

            # Store in Memory Bank (async operation wrapped in sync)
            self._run_async(
                self._async_add_session(race_id, entry),
                "Error storing in Memory Bank",
            )
        except Exception as e:
            print(f"Error storing in Memory Bank: {e}")
            raise

    async def _async_add_session(self, race_id: str, entry: Dict[str, Any]):
        """Async helper to add session to Memory Bank.

        NOTE(review): confirm that ``google.adk.sessions.Session`` accepts the
        ``session_id`` / ``metadata`` keyword arguments used here, and that a
        session without events results in stored memories; verify against the
        installed ADK version.
        """
        from google.adk.sessions import Session
        session = Session(
            session_id=race_id,
            user_id="f1_report_system",
            metadata=entry
        )
        await self._service.add_session_to_memory(session)

    @staticmethod
    def _gp_name(entry: Any, default: str) -> Any:
        """Return ``entry["data"]["race_data"]["gp_info"]["name"]`` or ``default``.

        Every level is checked, so entries that lack the expected nesting
        yield ``default`` instead of raising.
        """
        node = entry
        for key in ('data', 'race_data', 'gp_info'):
            if not isinstance(node, dict):
                return default
            node = node.get(key)
        if isinstance(node, dict):
            return node.get('name', default)
        return default

    @staticmethod
    def _timestamp(entry: Any) -> Any:
        """Return the entry's ``"timestamp"`` value, or ``None`` when absent."""
        return entry.get('timestamp') if isinstance(entry, dict) else None

    def search_memory(self, query: str) -> List[Dict[str, Any]]:
        """Search stored reports by race_id or GP name (case-insensitive substring).

        Returns:
            One ``{"race_id", "gp_name", "timestamp"}`` dict per matching entry.
            Entries without a GP name are matched on ``race_id`` only and
            report an empty ``gp_name``; a missing timestamp is ``None``.
        """
        results = []
        query_lower = query.lower()

        for race_id, entry in self._cache.items():
            gp_name = self._gp_name(entry, '') or ''
            if query_lower in str(race_id).lower() or query_lower in str(gp_name).lower():
                results.append({
                    "race_id": race_id,
                    "gp_name": gp_name,
                    "timestamp": self._timestamp(entry)
                })

        return results

    def get_report(self, race_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve the cached entry for ``race_id``, or ``None`` if unknown."""
        return self._cache.get(race_id)

    def list_all(self) -> List[Dict[str, Any]]:
        """List all stored reports as ``{"race_id", "gp_name", "timestamp"}`` dicts.

        ``gp_name`` is ``'Unknown'`` and ``timestamp`` is ``None`` for entries
        that do not carry those values.
        """
        return [{
            "race_id": race_id,
            "gp_name": self._gp_name(entry, 'Unknown'),
            "timestamp": self._timestamp(entry)
        } for race_id, entry in self._cache.items()]

# Build the shared report store: Vertex AI Memory Bank backed by a local JSON backup
# (the backup path uses the class default).
memory = MemoryService(PROJECT_ID, LOCATION, agent_engine_id)
print("Memory service initialized (Vertex AI Memory Bank + local backup)")


Loaded 2 report(s) from local backup
Memory service initialized (Vertex AI Memory Bank + local backup)


Memory Bank sync not available - using local cache only


In [5]:
# F1 2025 calendar, keyed by championship round number.
# The round numbers must match the 2025 season order because the data
# collection agent passes them straight to FastF1 for that season
# (2025 opened in Australia; Bahrain was round 4).
F1_2025_CALENDAR = {
    1: {"name": "Australian Grand Prix", "circuit": "Albert Park Circuit"},
    2: {"name": "Chinese Grand Prix", "circuit": "Shanghai International Circuit"},
    3: {"name": "Japanese Grand Prix", "circuit": "Suzuka International Racing Course"},
    4: {"name": "Bahrain Grand Prix", "circuit": "Bahrain International Circuit"},
    5: {"name": "Saudi Arabian Grand Prix", "circuit": "Jeddah Corniche Circuit"},
    6: {"name": "Miami Grand Prix", "circuit": "Miami International Autodrome"},
    7: {"name": "Emilia Romagna Grand Prix", "circuit": "Autodromo Enzo e Dino Ferrari"},
    8: {"name": "Monaco Grand Prix", "circuit": "Circuit de Monaco"},
    9: {"name": "Spanish Grand Prix", "circuit": "Circuit de Barcelona-Catalunya"},
    10: {"name": "Canadian Grand Prix", "circuit": "Circuit Gilles Villeneuve"},
    11: {"name": "Austrian Grand Prix", "circuit": "Red Bull Ring"},
    12: {"name": "British Grand Prix", "circuit": "Silverstone Circuit"},
    13: {"name": "Belgian Grand Prix", "circuit": "Circuit de Spa-Francorchamps"},
    14: {"name": "Hungarian Grand Prix", "circuit": "Hungaroring"},
    15: {"name": "Dutch Grand Prix", "circuit": "Circuit Zandvoort"},
    16: {"name": "Italian Grand Prix", "circuit": "Autodromo Nazionale di Monza"},
    17: {"name": "Azerbaijan Grand Prix", "circuit": "Baku City Circuit"},
    18: {"name": "Singapore Grand Prix", "circuit": "Marina Bay Street Circuit"},
    19: {"name": "United States Grand Prix", "circuit": "Circuit of the Americas"},
    # Accented names are stored as proper Unicode so user input such as
    # "São Paulo" matches them.
    20: {"name": "Mexico City Grand Prix", "circuit": "Autódromo Hermanos Rodríguez"},
    21: {"name": "São Paulo Grand Prix", "circuit": "Autódromo José Carlos Pace"},
    22: {"name": "Las Vegas Grand Prix", "circuit": "Las Vegas Street Circuit"},
    23: {"name": "Qatar Grand Prix", "circuit": "Lusail International Circuit"},
    24: {"name": "Abu Dhabi Grand Prix", "circuit": "Yas Marina Circuit"}
}

print(f"‚úÖ Calendar loaded: {len(F1_2025_CALENDAR)} races")

‚úÖ Calendar loaded: 24 races


## 5. Agent 1: Data Collection

In [6]:
class DataCollectionAgent:
    """Validates input and collects F1 race data.

    ``calendar`` maps a round number to ``{"name", "circuit"}`` for the season
    given by ``year``; race results are fetched through FastF1.
    """

    def __init__(self, calendar: Dict[int, Dict[str, str]], year: int = 2025):
        self.calendar = calendar
        self.year = year

    def validate_input(self, user_input: str) -> Optional[int]:
        """Validate and convert user input to round number.

        Args:
            user_input: Either a round number or (part of) a GP name; matching
                is case-insensitive and surrounding whitespace is ignored.

        Returns:
            The matching calendar round, or ``None`` when the input is empty,
            the round is not in the calendar, or no GP name contains it.
        """
        user_input = (user_input or '').strip()
        if not user_input:
            # An empty string is a substring of every name and would otherwise
            # silently select the first race.
            return None

        # Try parsing as round number
        try:
            round_num = int(user_input)
        except ValueError:
            round_num = None
        if round_num is not None:
            return round_num if round_num in self.calendar else None

        # Try matching GP name
        user_lower = user_input.lower()
        for round_num, info in self.calendar.items():
            if user_lower in info['name'].lower():
                return round_num

        return None

    @staticmethod
    def _safe_int(value: Any) -> Optional[int]:
        """Convert ``value`` to int; ``None`` for missing, blank or non-numeric values."""
        try:
            if pd.notna(value) and str(value).strip():
                return int(value)
        except (ValueError, TypeError, OverflowError):
            return None
        return None

    @staticmethod
    def _text_or_none(value: Any) -> Optional[str]:
        """Return ``str(value)``, or ``None`` when the value is missing/NaN."""
        return str(value) if pd.notna(value) else None

    def _parse_results(self, results: Any) -> List[Dict[str, Any]]:
        """Turn the session results frame into a list of per-driver dicts.

        Position is taken from ``Position``, then ``ClassifiedPosition``; a
        driver whose ``Status`` is ``'Finished'`` but has neither gets the next
        free rank in row order. If no row has any position at all, row order
        is used for everyone.
        """
        drivers_results = []
        for _, row in results.iterrows():
            position = self._safe_int(row.get('Position'))
            if position is None:
                position = self._safe_int(row.get('ClassifiedPosition'))

            if position is None and 'Status' in row and str(row['Status']) == 'Finished':
                # For finished drivers without position, use order in dataframe (usually sorted)
                position = len([d for d in drivers_results if d['position'] is not None]) + 1

            points = row.get('Points')
            drivers_results.append({
                "position": position,
                "full_name": self._text_or_none(row.get('FullName')),
                "team": self._text_or_none(row.get('TeamName')),
                # GridPosition might be an empty string or NaN
                "grid_position": self._safe_int(row.get('GridPosition')),
                "time": self._text_or_none(row.get('Time')),
                "points": float(points) if pd.notna(points) else 0.0,
            })

        # If no positions were found, assign based on results order (FastF1 usually sorts by finish)
        if all(r['position'] is None for r in drivers_results):
            print("   ‚ÑπNo position data from Ergast, using results order")
            for idx, driver in enumerate(drivers_results):
                driver['position'] = idx + 1

        return drivers_results

    def collect_race_data(self, round_num: int, year: Optional[int] = None) -> Optional[Dict[str, Any]]:
        """Collect comprehensive race data.

        Args:
            round_num: Round number as used in ``self.calendar``.
            year: Season to query; defaults to ``self.year``.

        Returns:
            A dict with ``race_id``, ``year``, ``round``, ``gp_info``,
            ``podium`` and ``final_results``, or ``None`` on failure. If the
            configured season (2024 or later) fails, the previous season is
            tried once.

        For a season other than ``self.year`` the event is looked up by the
        calendar's GP name rather than by round number, because the same round
        number can be a different Grand Prix in another season; ``round`` and
        ``race_id`` then reflect the round reported by the event itself.
        """
        if year is None:
            year = self.year

        try:
            print(f"üîç Collecting data for Round {round_num} ({year})...")

            # FastF1 accepts either a round number or an event name here.
            gp_ref = round_num if year == self.year else self.calendar[round_num]['name']

            # Get event and session
            event = fastf1.get_event(year, gp_ref)
            session = fastf1.get_session(year, gp_ref, "R")
            session.load()

            drivers_results = self._parse_results(session.results)

            # Get podium and key stats
            podium = sorted(
                [r for r in drivers_results if r['position'] in [1, 2, 3]],
                key=lambda x: x['position'],
            )

            # Use the round the event itself reports; fall back to the requested one.
            actual_round = self._safe_int(getattr(event, 'RoundNumber', None)) or round_num

            # Compile data
            race_data = {
                "race_id": f"{year}_R{actual_round}",
                "year": year,
                "round": actual_round,
                "gp_info": {
                    "name": event.EventName,
                    "country": event.Country,
                    "circuit": self.calendar[round_num]['circuit'],
                },
                "podium": podium,
                "final_results": [r for r in drivers_results if r['position'] is not None]
            }

            print(f"‚úÖ Data collected: {event.EventName} ({year})")
            print(f"   Podium finishers: {len(podium)}")
            return race_data

        except Exception as e:
            # Fallback to previous year if current year fails
            if year >= 2024 and year == self.year:
                print(f"‚ö†Ô∏è {year} data unavailable, trying {year-1}...")
                # Surface the cause instead of discarding it.
                print(f"   Reason: {e}")
                return self.collect_race_data(round_num, year=year-1)
            print(f"‚ùå Error: {e}")
            return None

    def run(self, user_input: str) -> Optional[Dict[str, Any]]:
        """Main execution: validate ``user_input`` and collect that race's data."""
        round_num = self.validate_input(user_input)
        if round_num is None:
            print(f"‚ùå Invalid input: '{user_input}'")
            return None
        return self.collect_race_data(round_num)

# Data collection agent bound to the calendar above (season defaults to the class default).
agent1 = DataCollectionAgent(calendar=F1_2025_CALENDAR)
print("‚úÖ Agent 1 initialized")

‚úÖ Agent 1 initialized


## 6. Agent 2: Report Generation

In [7]:
class ReportGenerationAgent:
    """Generates social media reports from race data."""

    def __init__(self, model_name: str = 'gemini-2.5-flash'):
        self.model = GenerativeModel(model_name)

    @staticmethod
    def _podium_line(place: str, driver: Dict[str, Any]) -> str:
        """Format one podium line for the prompt.

        Missing values are spelled out instead of being rendered as the
        literal text ``None`` (for example ``Started PNone``).
        """
        name = driver.get('full_name') or 'Unknown driver'
        team = driver.get('team') or 'Unknown team'
        grid = driver.get('grid_position')
        if grid is None:
            start = "Starting position unknown"
        else:
            start = f"Started P{grid}"
        return f"{place}: {name} ({team}) - {start}"

    def generate_report(self, race_data: Dict[str, Any]) -> Optional[str]:
        """Generate social media post.

        Args:
            race_data: Dict with ``gp_info`` (``name``, ``circuit``), ``year``
                and ``podium`` (at least three driver dicts, winner first).

        Returns:
            The model's text with surrounding whitespace removed, or ``None``
            when the podium is incomplete or any error occurs (the error is
            printed).
        """
        try:
            gp_info = race_data['gp_info']
            podium = race_data['podium']

            # Validate podium data
            if len(podium) < 3:
                print(f"‚ùå Incomplete podium data: only {len(podium)} finisher(s)")
                return None

            print(f"‚úçÔ∏è Generating report for {gp_info['name']}...")

            prompt = "\n".join([
                "Create an Instagram post for this F1 race:",
                "",
                f"RACE: {gp_info['name']} ({race_data['year']})",
                f"CIRCUIT: {gp_info['circuit']}",
                "",
                "PODIUM:",
                self._podium_line("1st", podium[0]),
                self._podium_line("2nd", podium[1]),
                self._podium_line("3rd", podium[2]),
                "",
                "Write an engaging 200-250 word post that tells the race story and highlights the key moments. Don't generate images, just text.",
            ])

            response = self.model.generate_content(
                prompt,
                generation_config={
                    "max_output_tokens": 2048,
                    "temperature": 0.5,
                }
            )

            # Read the text once; any failure while reading it is handled below.
            raw_text = response.text
            print(f"‚úÖ Report generated ({len(raw_text)} chars)")
            return raw_text.strip()

        except Exception as e:
            print(f"‚ùå Error: {e}")
            return None

    def run(self, race_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Main execution.

        Returns:
            ``{"race_id", "race_data", "social_media_post", "timestamp"}``, or
            ``None`` when ``race_data`` is empty or no post could be generated.
        """
        if not race_data:
            return None

        social_media_post = self.generate_report(race_data)
        if not social_media_post:
            return None

        return {
            "race_id": race_data['race_id'],
            "race_data": race_data,
            "social_media_post": social_media_post,
            "timestamp": datetime.now().isoformat()
        }

# Report generation agent using the model chosen in the configuration cell.
agent2 = ReportGenerationAgent(model_name=MODEL_NAME)
print("‚úÖ Agent 2 initialized")

‚úÖ Agent 2 initialized




## 7. Workflow: Generate & Store Reports

In [8]:
def generate_f1_report(race_input: str) -> Optional[Dict[str, Any]]:
    """Run the full pipeline for one race: collect, write up, store, display.

    Args:
        race_input: Round number or GP name, as accepted by ``agent1.run``.

    Returns:
        The report dict produced by ``agent2.run``, or ``None`` when either
        agent yields nothing (in which case nothing is stored or printed here).
    """
    # Steps 1 and 2: the report agent only runs when data collection succeeded.
    collected = agent1.run(race_input)
    full_report = agent2.run(collected) if collected else None
    if not full_report:
        return None

    # Step 3: persist the report under its race_id.
    report_key = full_report['race_id']
    memory.add_session_to_memory(report_key, full_report)

    # Show the post framed by divider lines.
    divider = "=" * 70
    print("\n" + divider)
    print("üì± SOCIAL MEDIA POST")
    print(divider)
    print(f"\n{full_report['social_media_post']}\n")
    print(divider)
    print(f"üíæ Stored as: {report_key}")

    return full_report

print("‚úÖ Workflow ready")

‚úÖ Workflow ready


## 8. Memory Operations

In [9]:
# Search memory (RETRIEVE)
def search_reports(query: str):
    """Print and return the stored reports matching ``query``.

    The lookup itself is delegated to ``memory.search_memory``; an empty
    result prints a "not found" line and is returned as-is.
    """
    matches = memory.search_memory(query)
    if not matches:
        print(f"‚ùå No reports found for '{query}'")
        return matches

    print(f"üîç Found {len(matches)} report(s):")
    for match in matches:
        print(f"  ‚Ä¢ {match['race_id']}: {match['gp_name']} ({match['timestamp']})")
    return matches

# List all reports
def list_reports():
    """Print one line per stored report and return ``memory.list_all()``'s result."""
    stored = memory.list_all()
    if not stored:
        print("üìã No reports stored yet")
        return stored

    print(f"üìã Stored reports ({len(stored)}):")
    for item in stored:
        print(f"  ‚Ä¢ {item['race_id']}: {item['gp_name']}")
    return stored

# Get specific report
def get_report(race_id: str):
    """Retrieve a specific report and print a short summary of it.

    Args:
        race_id: Key of the stored report (as returned in ``race_id`` fields).

    Returns:
        The stored entry, or ``None`` when ``memory.get_report`` has nothing
        for ``race_id``. Entries that lack the usual nesting are still
        returned; their GP name / timestamp are shown as ``Unknown``.
    """
    report = memory.get_report(race_id)
    if not report:
        print(f"‚ùå Report '{race_id}' not found")
        return None

    # Walk the nesting defensively: entries are not guaranteed to carry every level.
    node = report
    for key in ('data', 'race_data', 'gp_info', 'name'):
        node = node.get(key) if isinstance(node, dict) else None
    gp_name = node if node is not None else 'Unknown'
    stored_at = report.get('timestamp', 'Unknown') if isinstance(report, dict) else 'Unknown'

    print(f"‚úÖ Retrieved: {race_id}")
    print(f"   GP: {gp_name}")
    print(f"   Stored: {stored_at}")
    return report

print("‚úÖ Memory operations ready")

‚úÖ Memory operations ready


## 9. Example Usage

In [None]:
# Generate a report (try with different races).
# The argument is matched against the calendar's GP names (case-insensitive
# substring) after first being tried as a round number.
report = generate_f1_report("Bahrain")

# Or use round number
# report = generate_f1_report("1")

## 10. Search & Retrieve from Memory


In [None]:
# List all stored reports
list_reports()

# Search for specific reports (matches race_id or GP name, case-insensitive)
search_reports("Bahrain")

# Retrieve a specific report; race IDs have the form "<year>_R<round>"
report = get_report("2025_R1")


In [None]:
# Interactive mode
# NOTE: input() waits for the user, so this cell pauses an unattended "Run All".
race_input = input("Enter race (round number or GP name): ")
report = generate_f1_report(race_input)


In [10]:
# Show what is stored now; as the last expression, the returned list is also displayed.
list_reports()


üìã Stored reports (2):
  ‚Ä¢ 2025_R1: Australian Grand Prix
  ‚Ä¢ 2025_R8: Monaco Grand Prix


[{'race_id': '2025_R1',
  'gp_name': 'Australian Grand Prix',
  'timestamp': '2025-11-25T09:25:56.412506'},
 {'race_id': '2025_R8',
  'gp_name': 'Monaco Grand Prix',
  'timestamp': '2025-11-25T09:27:51.198281'}]