In [1]:
import requests
import pandas as pd
from datetime import datetime
import time

In [2]:
# Root of the OpenF1 REST API; endpoint names are appended to it.
_OPENF1_BASE_URL = "https://api.openf1.org/v1"
# Seconds to wait for one HTTP response before giving up.
_REQUEST_TIMEOUT_S = 30
# How many times a request is attempted while the API answers 429 (rate limited).
_MAX_ATTEMPTS = 4
# Base pause between rate-limited attempts; multiplied by the attempt number.
_RETRY_BACKOFF_S = 2.0
# Substrings of `circuit_short_name` that classify a track as a street circuit.
_STREET_CIRCUITS = ('Monaco', 'Singapore', 'Baku', 'Jeddah', 'Melbourne', 'Miami', 'Las Vegas')


def _fetch_json(endpoint, **params):
    """Return the list of records served by one OpenF1 endpoint ([] when none match)."""
    url = f"{_OPENF1_BASE_URL}/{endpoint}"
    for attempt in range(1, _MAX_ATTEMPTS + 1):
        response = requests.get(url, params=params, timeout=_REQUEST_TIMEOUT_S)
        if response.status_code != 429 or attempt == _MAX_ATTEMPTS:
            break
        # Rate limited: back off a little longer on every attempt, then retry.
        time.sleep(_RETRY_BACKOFF_S * attempt)

    # NOTE(review): a 404 is treated as "no matching records" so the fallback
    # chains below can run -- confirm how the API reports empty result sets.
    if response.status_code == 404:
        return []
    response.raise_for_status()

    payload = response.json()
    if not isinstance(payload, list):
        # An error object must never be mistaken for data (a dict is truthy
        # and has a length, which would silently corrupt the counts below).
        raise ValueError(f"Unexpected payload from {url}: {payload!r}")
    return payload


def _pick_sessions(sessions):
    """Return (race_session, qualifying_session) for one meeting; either may be None."""
    race_session = None
    qualifying_session = None
    for session in sessions:
        name = session['session_name']
        if name == 'Race':
            race_session = session
        elif 'Qualifying' in name and 'Sprint' not in name:
            # "Sprint Qualifying" sets the sprint grid, not the race grid.
            qualifying_session = session
    return race_session, qualifying_session


def _resolve_grid_positions(session_key, qualifying_session):
    """Map driver_number -> grid position using the first source that has data."""
    # Method 1: Use starting_grid endpoint
    # NOTE(review): the recorded run got nothing here for the race session key;
    # confirm which session key this endpoint expects.
    grid_data = _fetch_json("starting_grid", session_key=session_key)
    if grid_data:
        grid_positions = {g['driver_number']: g['position'] for g in grid_data}
        print(f"  Grid positions found via starting_grid endpoint: {len(grid_positions)} drivers")
        return grid_positions

    # Method 2: If no grid data, try qualifying results
    if qualifying_session:
        qual_results = _fetch_json("session_result", session_key=qualifying_session['session_key'])
        if qual_results:
            grid_positions = {q['driver_number']: q['position'] for q in qual_results}
            print(f"  Grid positions found via qualifying results: {len(grid_positions)} drivers")
            return grid_positions

    # Method 3: If still no grid data, use each driver's earliest recorded race
    # position. The position feed is only downloaded when this fallback is needed.
    position_data = _fetch_json("position", session_key=session_key)
    if position_data:
        earliest = {}
        for pos in position_data:
            driver_num = pos['driver_number']
            if driver_num not in earliest or pos['date'] < earliest[driver_num]['date']:
                earliest[driver_num] = pos
        grid_positions = {d: p['position'] for d, p in earliest.items()}
        print(f"  Grid positions found via initial race positions: {len(grid_positions)} drivers")
        return grid_positions

    return {}


def _mean(values):
    """Arithmetic mean of `values`, or None for an empty list."""
    return sum(values) / len(values) if values else None


def _positive(value):
    """True when `value` is a usable duration (present and greater than zero)."""
    return value is not None and value > 0


def _summarise_laps(laps):
    """Return (fastest_lap, avg_lap, avg_sector_1, avg_sector_2, avg_sector_3)."""
    valid_laps = [lap for lap in laps if _positive(lap.get('lap_duration'))]
    fastest_lap_time = min((lap['lap_duration'] for lap in valid_laps), default=None)

    # The average only leaves out pit-out laps; no other outlier filtering is done.
    avg_lap_time = _mean([lap['lap_duration'] for lap in valid_laps
                          if not lap.get('is_pit_out_lap', False)])

    sector_averages = [
        _mean([lap[key] for lap in valid_laps if _positive(lap.get(key))])
        for key in ('duration_sector_1', 'duration_sector_2', 'duration_sector_3')
    ]
    return (fastest_lap_time, avg_lap_time, *sector_averages)


def _count_pitstops(session_key, driver_number, laps):
    """Count one driver's pit stops from the first source that has data."""
    # Method 1: Use pit endpoint
    pit_data = _fetch_json("pit", session_key=session_key, driver_number=driver_number)
    if pit_data:
        pitstops = len(pit_data)
        print(f"    Pit stops from pit endpoint: {pitstops}")
        return pitstops

    # Method 2: Use stints data; number of pit stops = number of stints - 1
    stints_data = _fetch_json("stints", session_key=session_key, driver_number=driver_number)
    if stints_data:
        pitstops = max(0, len(stints_data) - 1)
        print(f"    Pit stops calculated from stints: {pitstops}")
        return pitstops

    # Method 3: Count pit out laps
    pitstops = sum(1 for lap in laps if lap.get('is_pit_out_lap', False))
    print(f"    Pit stops from pit out laps: {pitstops}")
    return pitstops


def get_f1_race_data(year):
    """
    Fetch per-driver race data for every Grand Prix of `year` from the OpenF1 API.

    Meetings without a session named 'Race' and drivers without any lap data
    are skipped. Grid positions and pit stop counts each fall back through
    several endpoints when the preferred one returns nothing.

    Args:
        year: Season to download; sent as the `year` filter of the meetings
            endpoint.

    Returns:
        pandas.DataFrame with one row per (race, driver) and the columns
        grandprix, year, driver_name, driver_code, driver_number, track,
        track_type, wet_dry, fastest_lap_time, avg_lap_time,
        avg_sector_1_time, avg_sector_2_time, avg_sector_3_time, pitstops,
        grid_position and finished_position. Values that cannot be determined
        are None. The frame is empty (no columns) when nothing was collected.

    Raises:
        requests.HTTPError: an endpoint answered with an error status
            (including 429 once the retries are used up).
        ValueError: an endpoint answered with something other than a JSON list.
    """
    # Get all meetings (Grand Prix) for the year
    print(f"Fetching meetings for {year}...")
    meetings = _fetch_json("meetings", year=year)

    all_race_data = []

    for meeting in meetings:
        meeting_key = meeting['meeting_key']
        grand_prix = meeting['meeting_name']
        circuit_name = meeting['circuit_short_name']

        print(f"\nProcessing: {grand_prix}")

        all_sessions = _fetch_json("sessions", meeting_key=meeting_key)
        race_session, qualifying_session = _pick_sessions(all_sessions)

        if not race_session:
            print(f"  No race session found for {grand_prix}")
            continue

        session_key = race_session['session_key']

        drivers = _fetch_json("drivers", session_key=session_key)

        # Wet if any weather sample reports rainfall; a missing or null
        # reading counts as no rain.
        weather_data = _fetch_json("weather", session_key=session_key)
        is_wet = any((w.get('rainfall') or 0) > 0 for w in weather_data)
        wet_dry = "wet" if is_wet else "dry"

        # Final race classification gives the finish positions.
        race_results = _fetch_json("session_result", session_key=session_key)
        finish_positions = {r['driver_number']: r['position'] for r in race_results}

        grid_positions = _resolve_grid_positions(session_key, qualifying_session)

        # Track type depends only on the meeting, so compute it once per race.
        is_street = any(street in circuit_name for street in _STREET_CIRCUITS)
        track_type = "street" if is_street else "circuit"

        for driver in drivers:
            driver_number = driver['driver_number']
            driver_name = driver['full_name']
            driver_code = driver['name_acronym']

            print(f"  Processing driver: {driver_code} (#{driver_number})")

            laps = _fetch_json("laps", session_key=session_key, driver_number=driver_number)
            if not laps:
                print("    No lap data found")
                continue

            (fastest_lap_time, avg_lap_time,
             avg_sector_1, avg_sector_2, avg_sector_3) = _summarise_laps(laps)

            pitstops = _count_pitstops(session_key, driver_number, laps)

            all_race_data.append({
                'grandprix': grand_prix,
                'year': year,
                'driver_name': driver_name,
                'driver_code': driver_code,
                'driver_number': driver_number,
                'track': circuit_name,
                'track_type': track_type,
                'wet_dry': wet_dry,
                'fastest_lap_time': fastest_lap_time,
                'avg_lap_time': avg_lap_time,
                'avg_sector_1_time': avg_sector_1,
                'avg_sector_2_time': avg_sector_2,
                'avg_sector_3_time': avg_sector_3,
                'pitstops': pitstops,
                'grid_position': grid_positions.get(driver_number),
                'finished_position': finish_positions.get(driver_number),
            })

        # Small delay between meetings to avoid overwhelming the API
        time.sleep(0.5)

    return pd.DataFrame(all_race_data)

In [3]:
# Main execution: download one season, save it as CSV and print quick
# sanity checks (preview, missing values, per-race coverage).
if __name__ == "__main__":
    year = 2025

    print(f"Starting data collection for F1 {year} season...")
    df = get_f1_race_data(year)

    # Save to CSV
    filename = f"f1_race_data_{year}.csv"
    df.to_csv(filename, index=False)

    print(f"\n{'='*60}")
    print("Data collection complete!")
    print(f"Total records: {len(df)}")
    print(f"File saved as: {filename}")
    print(f"{'='*60}")

    # Display first few rows
    print("\nFirst 5 rows of data:")
    print(df.head())

    # Check for missing data
    print("\nMissing data summary:")
    print(df.isnull().sum())

    # Summary by race: how many drivers have a grid position, and mean pit stops
    print("\nData summary by Grand Prix:")
    if df.empty:
        # With no rows the frame has no 'grandprix' column and groupby would
        # raise KeyError, so report the situation instead.
        print("No race data was collected; nothing to summarise.")
    else:
        summary = df.groupby('grandprix').agg({
            'grid_position': lambda x: f"{x.notna().sum()}/{len(x)}",
            'pitstops': 'mean'
        })
        print(summary)

Starting data collection for F1 2025 season...
Fetching meetings for 2025...

Processing: Pre-Season Testing
  No race session found for Pre-Season Testing

Processing: Australian Grand Prix
  Grid positions found via qualifying results: 20 drivers
  Processing driver: VER (#1)
    Pit stops from pit endpoint: 5
  Processing driver: NOR (#4)
    Pit stops from pit endpoint: 5
  Processing driver: BOR (#5)
    Pit stops from pit endpoint: 5
  Processing driver: HAD (#6)
    Pit stops calculated from stints: 0
  Processing driver: DOO (#7)
    Pit stops calculated from stints: 0
  Processing driver: GAS (#10)
    Pit stops from pit endpoint: 5
  Processing driver: ANT (#12)
    Pit stops from pit endpoint: 5
  Processing driver: ALO (#14)
    Pit stops from pit endpoint: 3
  Processing driver: LEC (#16)
    Pit stops from pit endpoint: 5
  Processing driver: STR (#18)
    Pit stops from pit endpoint: 5
  Processing driver: TSU (#22)
    Pit stops from pit endpoint: 5
  Processing driver: