# F1 Track Visualization — Data Preparation

This notebook extracts track layouts and driver position data from FastF1,
then saves them as compact JSON files for the FormulaHub backend to serve.

**Run once per set of races you want available.** Output files go into `track_data/`.

### Pipeline
1. Load race session with telemetry from FastF1
2. Extract track outline from fastest lap X/Y coordinates
3. Extract driver positions at a fixed sample rate (`SAMPLE_RATE`, default 1 Hz = 1 sample/second)
4. Normalize coordinates and save as JSON

### Output
One JSON file per race: `track_data/{year}_{round}.json`  
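Each contains: race metadata (`info`), the track outline (`track`), driver details (`drivers`), lap boundary times (`lap_starts`), and the sampled driver positions (`positions`)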

In [None]:
!pip install fastf1 pyarrow --quiet

In [None]:
import fastf1
import pandas as pd
import numpy as np
import json
import os
import warnings
from pathlib import Path

# Suppress every warning so the notebook output stays readable.
warnings.filterwarnings('ignore')

# Working folders: the FastF1 download cache and the JSON output directory.
CACHE_DIR, OUTPUT_DIR = Path('f1_cache'), Path('track_data')
for _directory in (CACHE_DIR, OUTPUT_DIR):
    _directory.mkdir(exist_ok=True)

# Point FastF1 at the on-disk cache so repeated runs reuse downloaded data.
fastf1.Cache.enable_cache(str(CACHE_DIR))
print('Ready')

## Configuration

Choose which races to process. Each race takes ~2-5 minutes to download telemetry.

In [None]:
# ============================================================
#  RACES TO PROCESS
#  Format: (year, round_number_or_event_name)
#  Telemetry is available for 2018+ seasons
# ============================================================

# Round numbers follow the official calendar of the given season, so they
# must be re-checked whenever the year changes.
RACES = [
    (2024, 1),   # Bahrain GP
    (2024, 8),   # Monaco GP (round 6 of 2024 was Miami, not Monaco)
    (2024, 12),  # British GP (Silverstone)
    (2024, 14),  # Belgian GP (Spa)
    (2024, 16),  # Italian GP (Monza)
]

# Position sampling rate (Hz). 1 = one sample per second.
# Higher = smoother but larger files. 1-2 is ideal.
SAMPLE_RATE = 1

# Track outline resolution (number of points)
TRACK_POINTS = 500

print(f'Will process {len(RACES)} races at {SAMPLE_RATE} Hz')

## Team Color Map

In [None]:
# Static map of team name -> hex color used to draw that team's cars.
TEAM_COLORS = {
    'Red Bull Racing': '#3671C6',
    'Ferrari': '#E80020',
    'Mercedes': '#27F4D2',
    'McLaren': '#FF8000',
    'Aston Martin': '#229971',
    'Alpine': '#FF87BC',
    'Williams': '#64C4FF',
    'RB': '#6692FF',
    'AlphaTauri': '#6692FF',
    'Kick Sauber': '#52E252',
    'Alfa Romeo': '#C92D4B',
    'Haas F1 Team': '#B6BABD',
}


def get_team_color(team_name):
    """Return the hex color for a team name from the static ``TEAM_COLORS`` map.

    Lookup order:
      1. exact key match;
      2. case-insensitive substring match in either direction, so both a
         longer name ('Oracle Red Bull Racing') and a shorter one ('Haas')
         resolve to the known team;
      3. white ('#FFFFFF') when nothing matches.

    Args:
        team_name: Team name to look up. Non-string or blank values are
            treated as unknown.

    Returns:
        A '#RRGGBB' color string.
    """
    fallback = '#FFFFFF'
    if not isinstance(team_name, str):
        return fallback

    # Exact match
    if team_name in TEAM_COLORS:
        return TEAM_COLORS[team_name]

    needle = team_name.strip().lower()
    # An empty string is a substring of every key; without this guard a blank
    # name would be coloured as the first team in the map.
    if not needle:
        return fallback

    # Partial match
    for key, color in TEAM_COLORS.items():
        known = key.lower()
        if known in needle or needle in known:
            return color
    return fallback

## Data Extraction Functions

In [None]:
def extract_track_layout(session, num_points=500):
    """
    Extract the track outline from the fastest lap's telemetry.

    The X/Y samples are scaled by the larger of the two axis extents, so the
    aspect ratio is preserved, mapped into [padding, 1 - padding], and the
    shorter axis is centered inside the unit square.

    Args:
        session: Loaded session object; must expose ``laps.pick_fastest()``
            whose result provides ``get_telemetry()`` with 'X' and 'Y' columns.
        num_points: Maximum number of outline points to return (>= 1).

    Returns:
        (x, y, norm_params): two lists of floats rounded to 4 decimals, at
        most ``num_points`` long and always including the first and last
        valid sample, plus the dict of normalization parameters
        (x_min, x_max, y_min, y_max, scale, padding) needed to apply the
        same transform to other coordinates.

    Raises:
        ValueError: if ``num_points`` < 1, no usable position data exists,
            or all positions coincide (zero extent).
    """
    if num_points < 1:
        raise ValueError('num_points must be at least 1')

    fastest = session.laps.pick_fastest()
    # Guard against pick_fastest() yielding no lap at all.
    if fastest is None:
        raise ValueError('No position data in telemetry')
    tel = fastest.get_telemetry()

    if (tel is None or tel.empty
            or 'X' not in tel.columns or 'Y' not in tel.columns):
        raise ValueError('No position data in telemetry')

    x = tel['X'].values.astype(float)
    y = tel['Y'].values.astype(float)

    # Remove NaN
    mask = ~(np.isnan(x) | np.isnan(y))
    x, y = x[mask], y[mask]

    if len(x) == 0:
        raise ValueError('All position data is NaN')

    # Normalize preserving aspect ratio
    x_min, x_max = x.min(), x.max()
    y_min, y_max = y.min(), y.max()
    x_range = x_max - x_min
    y_range = y_max - y_min
    scale = max(x_range, y_range)

    # A zero extent would divide by zero below and silently emit NaN coords.
    if scale <= 0:
        raise ValueError('Track position data has zero extent')

    padding = 0.05
    x_norm = (x - x_min) / scale * (1 - 2 * padding) + padding
    y_norm = (y - y_min) / scale * (1 - 2 * padding) + padding

    # Center the shorter axis
    if x_range < y_range:
        x_norm += (1 - 2 * padding - x_range / scale * (1 - 2 * padding)) / 2
    else:
        y_norm += (1 - 2 * padding - y_range / scale * (1 - 2 * padding)) / 2

    # Downsample to at most num_points evenly spaced samples. Picking indices
    # with linspace (instead of a fixed stride) honours the requested count
    # and keeps the final sample, so the outline ends where the lap ends.
    n_out = min(num_points, len(x_norm))
    idx = np.linspace(0, len(x_norm) - 1, n_out).astype(int)
    x_ds = np.round(x_norm[idx], 4).tolist()
    y_ds = np.round(y_norm[idx], 4).tolist()

    return x_ds, y_ds, {
        'x_min': float(x_min), 'x_max': float(x_max),
        'y_min': float(y_min), 'y_max': float(y_max),
        'scale': float(scale), 'padding': padding,
    }


def normalize_position(val, val_min, scale, padding):
    """Map a raw coordinate into the padded unit range used by the track layout.

    Shifts ``val`` by ``val_min``, divides by ``scale``, stretches the result
    over the usable width ``1 - 2 * padding`` and finally adds ``padding``.
    Works element-wise when ``val`` is a NumPy array.
    """
    shifted = val - val_min
    fraction = shifted / scale
    usable_width = 1 - 2 * padding
    return fraction * usable_width + padding

In [None]:
def extract_driver_positions(session, norm_params, sample_rate=1):
    """
    Extract every driver's position at a fixed sample rate.

    Position samples from all of a driver's laps are concatenated, sorted and
    de-duplicated by timestamp, normalized with the same transform as the
    track layout, and linearly interpolated onto one shared time grid that
    starts at the earliest sample of any driver. Outside a driver's own data
    span ``np.interp`` holds the first/last known position, so e.g. a driver
    whose data ends early stays at the last recorded point.

    Args:
        session: Loaded session object providing ``drivers``, ``laps`` and
            ``results``.
        norm_params: Normalization dict as returned by ``extract_track_layout``
            (keys x_min, x_max, y_min, y_max, scale, padding).
        sample_rate: Samples per second of the output grid; must be > 0.

    Returns:
        drivers_info: dict of driver metadata keyed by abbreviation
        positions: dict of {abbr: {x: [...], y: [...]}}, one entry per grid
            sample, rounded to 4 decimals
        duration_sec: total duration in seconds

    Raises:
        ValueError: if ``sample_rate`` is not positive or no driver has any
            position data.
    """
    if sample_rate <= 0:
        raise ValueError('sample_rate must be positive')

    # Resolve full names once up front instead of querying the results table
    # for every driver. This stays best-effort: on failure the abbreviation
    # is used as the display name.
    full_names = {}
    try:
        results = session.results
        for res_abbr, res_name in zip(results['Abbreviation'], results['FullName']):
            # setdefault keeps the first row for an abbreviation.
            full_names.setdefault(res_abbr, str(res_name))
    except Exception as exc:
        print(f'    Could not read driver names from results: {exc}')

    drivers_info = {}
    raw_positions = {}  # abbr -> DataFrame with Date, X, Y

    # Determine global race time reference
    global_min_time = None

    for drv_num in session.drivers:
        drv_laps = session.laps.pick_drivers(drv_num)
        if drv_laps.empty:
            continue

        abbr = drv_laps.iloc[0]['Driver']
        team = str(drv_laps.iloc[0]['Team'])
        full_name = full_names.get(abbr, f"{abbr}")

        # Collect position data from all laps
        pos_frames = []
        failed_laps = 0
        for _, lap in drv_laps.iterlaps():
            try:
                pos = lap.get_pos_data()
            except Exception:
                # Best effort: a lap whose position data cannot be read is
                # skipped, but counted so the gap is visible in the output.
                failed_laps += 1
                continue
            if (pos is not None and not pos.empty
                    and {'Date', 'X', 'Y'}.issubset(pos.columns)):
                pos_frames.append(pos[['Date', 'X', 'Y']].copy())

        if failed_laps:
            print(f'    {abbr}: skipped {failed_laps} laps with unreadable position data')

        if not pos_frames:
            print(f'    {abbr}: no position data, skipping')
            continue

        combined = pd.concat(pos_frames).sort_values('Date').drop_duplicates(subset='Date')
        combined = combined.dropna(subset=['X', 'Y'])

        if combined.empty:
            continue

        if global_min_time is None:
            global_min_time = combined['Date'].min()
        else:
            global_min_time = min(global_min_time, combined['Date'].min())

        drivers_info[abbr] = {
            'full_name': full_name,
            'number': int(drv_num),
            'team': team,
            'color': get_team_color(team),
        }
        raw_positions[abbr] = combined

    if global_min_time is None:
        raise ValueError('No position data found for any driver')

    # Find global max time
    global_max_time = max(df['Date'].max() for df in raw_positions.values())
    duration_sec = (global_max_time - global_min_time).total_seconds()

    # Create uniform time grid
    sample_interval = 1.0 / sample_rate
    time_grid = np.arange(0, duration_sec, sample_interval)

    positions = {}
    p = norm_params

    # Center offset (same as track). It depends only on the normalization
    # parameters, so compute it once rather than per driver.
    x_range = p['x_max'] - p['x_min']
    y_range = p['y_max'] - p['y_min']
    x_offset = 0.0
    y_offset = 0.0
    if x_range < y_range:
        x_offset = (1 - 2*p['padding'] - x_range/p['scale']*(1 - 2*p['padding'])) / 2
    else:
        y_offset = (1 - 2*p['padding'] - y_range/p['scale']*(1 - 2*p['padding'])) / 2

    for abbr, df in raw_positions.items():
        # Seconds since the earliest sample of any driver.
        time_vals = (df['Date'] - global_min_time).dt.total_seconds().values

        # Normalize coords with same transform as track
        x_raw = df['X'].values.astype(float)
        y_raw = df['Y'].values.astype(float)

        x_norm = normalize_position(x_raw, p['x_min'], p['scale'], p['padding']) + x_offset
        y_norm = normalize_position(y_raw, p['y_min'], p['scale'], p['padding']) + y_offset

        # Interpolate to uniform grid
        x_interp = np.interp(time_grid, time_vals, x_norm)
        y_interp = np.interp(time_grid, time_vals, y_norm)

        positions[abbr] = {
            'x': np.round(x_interp, 4).tolist(),
            'y': np.round(y_interp, 4).tolist(),
        }
        print(f'    {abbr}: {len(time_grid)} samples')

    return drivers_info, positions, float(duration_sec)

In [None]:
def extract_lap_times(session):
    """
    Compute cumulative time at each lap boundary (from the leader).

    The reference driver is the first row of ``session.results`` after sorting
    by 'Position'. That driver's lap durations are accumulated in lap order.
    When a lap has no 'LapTime', its duration is taken from
    'Time' - 'LapStartTime' instead, so one missing lap time does not shift
    every later boundary. Laps with no usable duration are skipped.

    Args:
        session: Loaded session object providing ``results`` and ``laps``.

    Returns:
        List of seconds (rounded to 0.1) starting with 0.0, followed by the
        cumulative time at the end of each counted lap. Entry ``i`` is the
        start of lap ``i + 1``; the final entry is the end of the last lap,
        so the list holds one more element than the number of laps. Returns
        ``[0.0]`` if anything goes wrong (best effort).
    """
    try:
        # Use the race winner's lap times as reference
        results = session.results.sort_values('Position')
        winner_abbr = results.iloc[0]['Abbreviation']
        winner_laps = session.laps.pick_drivers(winner_abbr).sort_values('LapNumber')

        lap_starts = [0.0]
        cumulative = 0.0
        for _, lap in winner_laps.iterrows():
            duration = lap['LapTime']
            if pd.isna(duration):
                # Fall back to the lap's start/end session times when the
                # lap time itself is missing.
                lap_end = lap.get('Time')
                lap_begin = lap.get('LapStartTime')
                if pd.notna(lap_end) and pd.notna(lap_begin):
                    duration = lap_end - lap_begin
            if pd.notna(duration):
                cumulative += duration.total_seconds()
                lap_starts.append(round(cumulative, 1))

        return lap_starts
    except Exception as e:
        print(f'    Could not extract lap times: {e}')
        return [0.0]

## Process Races

In [None]:
import traceback

# Process each configured race independently: a failure in one race is
# reported with its traceback and the loop moves on to the next one.
for year, event in RACES:
    print(f'\n{"=" * 60}')
    print(f'Processing: {year} — Event {event}')
    print('=' * 60)

    try:
        # Load session with telemetry
        session = fastf1.get_session(year, event, 'R')
        session.load(telemetry=True, weather=False, messages=False)

        event_name = session.event['EventName']
        round_num = int(session.event['RoundNumber'])
        circuit_name = session.event.get('CircuitShortName', session.event.get('Location', event_name))
        country = session.event.get('Country', '')
        # Keep only the YYYY-MM-DD part of the stringified event date.
        event_date = str(session.event.get('EventDate', ''))[:10]

        print(f'  Event: {event_name} (Round {round_num})')
        print(f'  Circuit: {circuit_name}, {country}')

        # 1. Track layout
        print('  Extracting track layout...')
        track_x, track_y, norm_params = extract_track_layout(session, TRACK_POINTS)
        print(f'    Track: {len(track_x)} points')

        # 2. Driver positions
        print('  Extracting driver positions...')
        drivers_info, positions, duration_sec = extract_driver_positions(
            session, norm_params, SAMPLE_RATE
        )

        # 3. Lap timing
        print('  Extracting lap times...')
        lap_starts = extract_lap_times(session)
        # lap_starts holds one boundary more than there are laps.
        total_laps = len(lap_starts) - 1
        print(f'    {total_laps} laps')

        # 4. Build output JSON
        output = {
            'info': {
                'year': year,
                'round': round_num,
                'event': event_name,
                'circuit': circuit_name,
                'country': country,
                'date': event_date,
                'total_laps': total_laps,
                'duration_sec': round(duration_sec, 1),
                'sample_rate': SAMPLE_RATE,
            },
            'track': {
                'x': track_x,
                'y': track_y,
            },
            'drivers': drivers_info,
            'lap_starts': lap_starts,
            'positions': positions,
        }

        # 5. Save
        filename = f'{year}_{round_num}.json'
        filepath = OUTPUT_DIR / filename
        # Explicit UTF-8 keeps the file independent of the platform default
        # encoding; the tight separators drop the spaces json.dump would
        # otherwise emit after every ',' and ':' in the large position arrays.
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(output, f, separators=(',', ':'))

        size_mb = filepath.stat().st_size / (1024 * 1024)
        print(f'  Saved: {filepath} ({size_mb:.1f} MB)')
        print(f'  Drivers: {len(drivers_info)}, Duration: {duration_sec/60:.0f} min')

    except Exception as e:
        print(f'  FAILED: {e}')
        traceback.print_exc()

print(f'\n\nDone! Files in {OUTPUT_DIR}:')
for f in sorted(OUTPUT_DIR.glob('*.json')):
    print(f'  {f.name} ({f.stat().st_size / 1024:.0f} KB)')

## Preview Track Layout

In [None]:
import matplotlib.pyplot as plt

# Show all extracted tracks in a grid of up to three plots per row
files = sorted(OUTPUT_DIR.glob('*.json'))
n = len(files)
if n == 0:
    print('No track data files found.')
else:
    cols = min(3, n)
    rows = (n + cols - 1) // cols
    # squeeze=False always returns a 2-D array of Axes, whatever the grid
    # shape; ravel() flattens it into a re-iterable, indexable 1-D array.
    # (A `.flat` iterator would be consumed by zip() below, which made the
    # unused-axes cleanup a no-op.)
    fig, axes_grid = plt.subplots(rows, cols, figsize=(6*cols, 6*rows), squeeze=False)
    axes = axes_grid.ravel()

    for ax, filepath in zip(axes, files):
        with open(filepath, encoding='utf-8') as f:
            data = json.load(f)
        tx = data['track']['x']
        ty = data['track']['y']
        # Two overlaid strokes: a wide dark one as border, a lighter one on top.
        ax.plot(tx, ty, color='#333', linewidth=8, solid_capstyle='round')
        ax.plot(tx, ty, color='#666', linewidth=6, solid_capstyle='round')
        # Start/finish
        ax.plot(tx[0], ty[0], 'rs', markersize=10)
        ax.set_title(f"{data['info']['event']}\n{data['info']['year']}")
        ax.set_aspect('equal')
        ax.invert_yaxis()
        ax.set_facecolor('#111')
        ax.tick_params(colors='#666')

    # Hide unused axes (grid cells beyond the number of files)
    for ax in axes[n:]:
        ax.set_visible(False)

    fig.patch.set_facecolor('#111')
    plt.tight_layout()
    plt.show()

## Download for Backend

In [None]:
# Offer every generated JSON file as a browser download when running in
# Google Colab; elsewhere just report where the files are on disk.
try:
    from google.colab import files
    json_paths = sorted(OUTPUT_DIR.glob('*.json'))
    for json_path in json_paths:
        files.download(str(json_path))
    print('Files downloaded — place them in formula-hub-backend/track_data/')
except ImportError:
    print(f'Not in Colab. Files are at: {OUTPUT_DIR.absolute()}')