# Data Ingestion using FastF1


In [None]:
# Install fastf1 if not already installed
!pip install fastf1

# --------------------------------------------------------------
#  FASTF1 – 2018-2025 → ONE CSV PER DATA TYPE PER SEASON
# --------------------------------------------------------------


In [14]:

# --------------------------------------------------------------



#  SIMPLE, CORRECT, FASTF1-NATIVE: 2018–2025 → CSVs

# --------------------------------------------------------------

import fastf1 as ff1

from pathlib import Path

import logging

import time

from typing import Any, Callable, Dict, Optional, Set, Tuple

import pandas as pd

from fastf1.core import DataNotLoadedError

from fastf1.req import RateLimitExceededError

from requests.exceptions import ConnectionError as RequestsConnectionError, Timeout as RequestsTimeout

# --- LOGGING ---
# force=True drops handlers already attached to the root logger, so running
# this cell again reconfigures logging instead of stacking handlers.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)8s | %(message)s",
    datefmt="%H:%M:%S",
    force=True,
)
logger = logging.getLogger("fastf1-export")
logging.getLogger("fastf1").setLevel(logging.INFO)

# --- PATHS ---
CACHE_DIR = Path(r"C:\Users\erikv\Downloads\F1\notebooks\f1_cache")
SAVE_ROOT = Path(r"C:\Users\erikv\Downloads\F1\data\raw\fastf1_2018plus")

# Create both directories up front. FastF1's enable_cache() expects the cache
# directory to exist already, so a first run on a clean machine would fail
# without this.
CACHE_DIR.mkdir(parents=True, exist_ok=True)
SAVE_ROOT.mkdir(parents=True, exist_ok=True)

ff1.Cache.enable_cache(str(CACHE_DIR))
cache_info = ff1.Cache.get_cache_info()
logger.info("FastF1 cache enabled: %s @ %s", cache_info[0], cache_info[1])

# --- CONFIG ---
# Seasons to export (the upper bound of range() is exclusive).
YEARS = range(2018, 2026)

# Session identifiers requested for every event.
SESSION_TYPES = ["R", "Q", "FP1", "FP2", "FP3", "Sprint"]  # Sprint only 2021+

# Dataset kinds; each one is written to its own CSV per season.
DATASETS = ("RESULTS", "LAPS", "TELEMETRY", "WEATHER", "MESSAGES")

# First season for which the MESSAGES dataset is collected.
MESSAGES_START_YEAR = 2021

# Upper bound on session load attempts before a session is given up on.
LOAD_RETRIES = 60

# Base unit (seconds) for the linearly growing retry back-off.
LOAD_RETRY_BASE_SECONDS = 5

# Default attempt budget for call_with_rate_limit_retry().
GENERAL_MAX_RETRIES = 5

# Minimum pause (seconds) after a rate-limit response.
RATE_LIMIT_WAIT_SECONDS = 60

# --- HELPERS ---

def to_dataframe(obj: Any) -> Optional[pd.DataFrame]:
    """Coerce *obj* into a zero-indexed DataFrame, or return ``None``.

    ``None`` is returned when *obj* is ``None``, when it cannot be turned
    into a DataFrame, or when the resulting frame is empty. A copy of the
    input is used whenever the object offers ``copy()``.
    """
    if obj is None:
        return None

    if isinstance(obj, pd.DataFrame):
        frame = obj.copy()
    else:
        try:
            # Prefer the object's own copy(); wrap the result if it is not
            # already a DataFrame.
            duplicate = obj.copy()
            frame = duplicate if isinstance(duplicate, pd.DataFrame) else pd.DataFrame(duplicate)
        except Exception:
            # No usable copy(): let pandas try to build a frame directly.
            try:
                frame = pd.DataFrame(obj)
            except Exception:
                return None

    # Callers treat "nothing to export" and "empty frame" the same way.
    return None if frame.empty else frame.reset_index(drop=True)


def append_dataset(
    store: Dict[str, list],
    key: str,
    df_like: Any,
    *,
    year: int,
    event: str,
    session: str,
    label: str,
    extra: Optional[Dict[str, Any]] = None,
) -> None:
    """Tag *df_like* with session metadata and append it to ``store[key]``.

    Does nothing when *key* is not a bucket of *store*. When *df_like*
    yields no usable rows, only a "no data" log line is emitted. Otherwise
    the frame gains ``Year``, ``Event`` and ``Session`` columns (plus any
    columns from *extra*) before being appended.
    """
    if key not in store:
        return

    frame = to_dataframe(df_like)
    if frame is None:
        logger.info("    %s → %s: no data", label, key)
        return

    # Metadata first, then caller-supplied extras (which may override it).
    tags: Dict[str, Any] = {"Year": year, "Event": event, "Session": session}
    if extra:
        tags.update(extra)
    for column, value in tags.items():
        frame[column] = value

    store[key].append(frame)
    logger.info("    %s → %s: %d rows", label, key, len(frame))


def dataset_output_path(kind: str, year: int) -> Path:
    """Return the CSV path under ``SAVE_ROOT`` for one dataset kind and year."""
    filename = f"ALL_{kind}_{year}.csv"
    return SAVE_ROOT / filename


def determine_required_datasets(year: int) -> Set[str]:
    """Return the dataset kinds whose CSV for *year* does not exist yet.

    ``MESSAGES`` is never requested for years before
    ``MESSAGES_START_YEAR``.
    """
    collectable = (
        kind
        for kind in DATASETS
        if not (kind == "MESSAGES" and year < MESSAGES_START_YEAR)
    )
    return {
        kind
        for kind in collectable
        if not dataset_output_path(kind, year).exists()
    }


def try_session_attr(sess: Any, attr: str) -> Any:
    """Read ``sess.<attr>`` quietly.

    Returns ``None`` when the attribute is absent or when accessing it
    raises ``DataNotLoadedError``; nothing is logged.
    """
    value = None
    try:
        value = getattr(sess, attr, None)
    except DataNotLoadedError:
        pass
    return value


def detect_missing_data(
    sess: Any,
    required: Set[str],
    load_flags: Dict[str, bool],
    *,
    telemetry_enabled: bool,
    session_label: str,
) -> Set[str]:
    """Report which requested datasets are empty on *sess*.

    Only LAPS, WEATHER and TELEMETRY are inspected, and each only when it
    is both in *required* and switched on in *load_flags* (TELEMETRY also
    needs *telemetry_enabled*). A warning is logged when anything is
    missing.

    Returns:
        The set of dataset kinds found to hold no data.
    """

    def requested(kind: str, flag: str) -> bool:
        return bool(kind in required and load_flags.get(flag, False))

    def attr_is_empty(attr: str) -> bool:
        return to_dataframe(try_session_attr(sess, attr)) is None

    absent: Set[str] = set()

    if requested("LAPS", "laps") and attr_is_empty("laps"):
        absent.add("LAPS")

    if requested("WEATHER", "weather") and attr_is_empty("weather_data"):
        absent.add("WEATHER")

    if requested("TELEMETRY", "telemetry") and telemetry_enabled:
        car_data = try_session_attr(sess, "car_data")
        if isinstance(car_data, dict):
            # One non-empty entry is enough to count telemetry as present.
            found = any(to_dataframe(tel) is not None for tel in car_data.values())
        else:
            found = to_dataframe(car_data) is not None
        if not found:
            absent.add("TELEMETRY")

    if absent:
        logger.warning(
            "    %s → required datasets missing after load: %s",
            session_label,
            ", ".join(sorted(absent)),
        )
    return absent


def safe_session_attr(sess: Any, attr: str, session_label: str) -> Any:
    """Read ``sess.<attr>``, logging a warning if the data was not loaded.

    Returns ``None`` when the attribute is absent or when accessing it
    raises ``DataNotLoadedError``.
    """
    value = None
    try:
        value = getattr(sess, attr, None)
    except DataNotLoadedError:
        logger.warning("    %s → %s not loaded", session_label, attr)
    return value


def call_with_rate_limit_retry(
    func: Callable[[], Any],
    *,
    description: str,
    max_attempts: int = GENERAL_MAX_RETRIES,
) -> Any:
    """Call *func*, retrying on rate limits and transient network errors.

    Rate-limit responses are retried without an upper bound, after waiting
    at least ``RATE_LIMIT_WAIT_SECONDS``. Connection errors and timeouts
    are retried with a linearly growing pause and re-raised once
    *max_attempts* of them have occurred.

    Args:
        func: Zero-argument callable to invoke.
        description: Short label used in log messages.
        max_attempts: Maximum number of connection/timeout failures
            tolerated before the last one is re-raised.

    Returns:
        Whatever *func* returns.

    Raises:
        requests.exceptions.ConnectionError, requests.exceptions.Timeout:
            After *max_attempts* connection failures.
        Exception: Any other exception from *func* propagates immediately.
    """
    attempt = 0
    # Counted separately so that rate-limit waits do not use up the
    # retry budget reserved for genuine connection problems.
    connection_failures = 0

    while True:
        attempt += 1
        try:
            return func()
        except RateLimitExceededError as exc:
            wait_seconds = max(RATE_LIMIT_WAIT_SECONDS, LOAD_RETRY_BASE_SECONDS * attempt)
            logger.warning(
                "%s → rate limit reached (%s); waiting %ds before retry (attempt %d)",
                description,
                exc,
                wait_seconds,
                attempt,
            )
            time.sleep(wait_seconds)
        except (RequestsConnectionError, RequestsTimeout) as exc:
            connection_failures += 1
            if connection_failures >= max_attempts:
                raise
            # Back off before retrying instead of hammering a failing host.
            wait_seconds = LOAD_RETRY_BASE_SECONDS * connection_failures
            logger.warning(
                "%s → connection issue (%s); waiting %ds before retry (attempt %d/%d)",
                description,
                exc,
                wait_seconds,
                connection_failures,
                max_attempts,
            )
            time.sleep(wait_seconds)


def load_session_with_retry(
    sess: Any,
    *,
    session_label: str,
    load_flags: Dict[str, bool],
    required_datasets: Set[str],
) -> Dict[str, Any]:
    """Call ``sess.load()`` until it succeeds or the retry budget runs out.

    Args:
        sess: Session object exposing ``load(laps=, telemetry=, weather=,
            messages=)``.
        session_label: Human-readable label used in log messages.
        load_flags: Which data groups to request; copied, never mutated.
        required_datasets: Dataset kinds the caller intends to export; used
            to decide whether a completed load is missing data.

    Returns:
        A dict with keys ``success`` (bool), ``telemetry_enabled`` (bool,
        ``False`` if telemetry had to be switched off), ``attempts`` (int),
        ``elapsed`` (seconds, float) and ``missing`` (set of dataset kinds
        that stayed empty).
    """
    flags = dict(load_flags)
    telemetry_enabled = flags.get("telemetry", False)
    total_start = time.perf_counter()

    # load() with nothing requested is pointless; fall back to laps.
    if not any(flags.values()):
        flags["laps"] = True

    attempt_counter = 0
    failure_counter = 0

    def outcome(success: bool, missing: Optional[Set[str]] = None) -> Dict[str, Any]:
        # Reads the enclosing counters at call time, so it always reports
        # the current attempt count and telemetry state.
        return {
            "success": success,
            "telemetry_enabled": telemetry_enabled,
            "attempts": attempt_counter,
            "elapsed": time.perf_counter() - total_start,
            "missing": missing if missing is not None else set(),
        }

    while True:
        attempt_counter += 1
        attempt_start = time.perf_counter()

        try:
            sess.load(
                laps=flags.get("laps", False),
                telemetry=telemetry_enabled,
                weather=flags.get("weather", False),
                messages=flags.get("messages", False),
            )
            logger.info(
                "    %s → load() completed in %.1fs (attempt %d; telemetry=%s)",
                session_label,
                time.perf_counter() - attempt_start,
                attempt_counter,
                telemetry_enabled,
            )
            missing = detect_missing_data(
                sess,
                required_datasets,
                flags,
                telemetry_enabled=telemetry_enabled,
                session_label=session_label,
            )
        except Exception as exc:  # noqa: BLE001
            failure_counter += 1
            exc_msg = f"{type(exc).__name__}: {exc}"

            # This specific TypeError comes from telemetry processing;
            # retry without telemetry rather than losing the whole session.
            if (
                telemetry_enabled
                and flags.get("telemetry", False)
                and isinstance(exc, TypeError)
                and "TimedeltaArray" in exc_msg
                and "NoneType" in exc_msg
            ):
                logger.warning(
                    "    %s → telemetry processing failed (%s); disabling telemetry and retrying",
                    session_label,
                    exc_msg,
                )
                telemetry_enabled = False
                flags["telemetry"] = False
                failure_counter = 0
                continue

            exhausted = failure_counter >= LOAD_RETRIES
            is_rate_limit = isinstance(exc, RateLimitExceededError)
            is_transport = (
                isinstance(exc, RequestsConnectionError) or "Failed to load" in exc_msg
            )

            # Rate limits are expected to clear, so they are always waited
            # out. Transport failures are only retried within the budget,
            # so a permanent "Failed to load" cannot loop forever.
            if is_rate_limit or (is_transport and not exhausted):
                wait_seconds = max(
                    RATE_LIMIT_WAIT_SECONDS,
                    LOAD_RETRY_BASE_SECONDS * min(failure_counter, LOAD_RETRIES),
                )
                logger.warning(
                    "    %s → possible rate limit/transport issue (%s); waiting %ds before retry",
                    session_label,
                    exc_msg,
                    wait_seconds,
                )
                time.sleep(wait_seconds)
                continue

            if exhausted:
                logger.exception(
                    "    %s → load() failed after %d attempts (%s)",
                    session_label,
                    failure_counter,
                    exc_msg,
                )
                return outcome(False)

            retry_template = (
                "    %s → load() connection issue (%s); retrying (attempt %d/%d)"
                if isinstance(exc, RequestsTimeout)
                else "    %s → load() failed (%s); retrying (attempt %d/%d)"
            )
            logger.warning(
                retry_template,
                session_label,
                exc_msg,
                failure_counter,
                LOAD_RETRIES,
            )
            continue

        # load() completed without raising.
        if not missing:
            return outcome(True)

        if attempt_counter < LOAD_RETRIES:
            wait_seconds = max(
                RATE_LIMIT_WAIT_SECONDS,
                LOAD_RETRY_BASE_SECONDS * attempt_counter,
            )
            logger.warning(
                "    %s → data unavailable for: %s; waiting %ds before retry",
                session_label,
                ", ".join(sorted(missing)),
                wait_seconds,
            )
            time.sleep(wait_seconds)
            continue

        # Still a success: the caller exports whatever did load and skips
        # the datasets listed in "missing".
        logger.warning(
            "    %s → data unavailable after %d attempts; skipping datasets: %s",
            session_label,
            attempt_counter,
            ", ".join(sorted(missing)),
        )
        return outcome(True, missing)


# --- MAIN LOOP ---
# For every season: work out which CSVs are still missing, load every session
# of every event, collect the requested datasets in memory, then write one CSV
# per dataset kind for that season.

# First season for which "Sprint" sessions are requested. Kept separate from
# MESSAGES_START_YEAR so the two cut-offs can change independently.
SPRINT_START_YEAR = 2021

# Datasets exported straight from a single session attribute.
SIMPLE_EXPORTS = (
    ("RESULTS", "results"),
    ("LAPS", "laps"),
    ("WEATHER", "weather_data"),
    # Race control messages are exposed on their own attribute after
    # load(messages=True); session_info does not carry them.
    ("MESSAGES", "race_control_messages"),
)

for year in YEARS:
    logger.info("\n%s YEAR %s %s", "=" * 20, year, "=" * 20)
    year_start = time.perf_counter()

    required_datasets = determine_required_datasets(year)
    if not required_datasets:
        logger.info("  All dataset CSVs already exist for %s; skipping year", year)
        continue
    logger.info("  Pending datasets: %s", ", ".join(sorted(required_datasets)))

    try:
        schedule = call_with_rate_limit_retry(
            lambda yr=year: ff1.get_event_schedule(yr, include_testing=False),
            description=f"{year} schedule",
        )
    except Exception:
        logger.exception("Failed to fetch event schedule for %s", year)
        continue

    # The load flags depend only on the season, so build them once per year.
    # Laps are requested whenever lap-derived data (telemetry, weather) is.
    load_flags = {
        "laps": bool({"LAPS", "TELEMETRY", "WEATHER"} & required_datasets),
        "telemetry": "TELEMETRY" in required_datasets,
        "weather": "WEATHER" in required_datasets,
        "messages": (year >= MESSAGES_START_YEAR) and ("MESSAGES" in required_datasets),
    }
    if "RESULTS" in required_datasets and not any(load_flags.values()):
        load_flags["laps"] = True

    data_store = {key: [] for key in required_datasets}
    processed_sessions = 0

    for _, ev in schedule.iterrows():
        ev_name = ev["EventName"]
        logger.info("  → %s", ev_name)

        for sess_type in SESSION_TYPES:
            if sess_type == "Sprint" and year < SPRINT_START_YEAR:
                continue

            session_label = f"{year} {ev_name} {sess_type}"

            try:
                sess = call_with_rate_limit_retry(
                    lambda yr=year, name=ev_name, st=sess_type: ff1.get_session(yr, name, st),
                    description=f"{session_label} get_session",
                )
            except ValueError as exc:
                # Treated as "this event has no such session"; not an error.
                logger.info("    %s → session unavailable (%s); skipping", session_label, exc)
                continue
            except Exception:
                logger.exception("    %s → get_session failed", session_label)
                continue

            load_result = load_session_with_retry(
                sess,
                session_label=session_label,
                load_flags=load_flags,
                required_datasets=required_datasets,
            )
            if not load_result["success"]:
                continue

            telemetry_loaded = load_result["telemetry_enabled"]
            if "TELEMETRY" in required_datasets and not telemetry_loaded:
                logger.warning(
                    "    %s → telemetry disabled; telemetry data will be missing",
                    session_label,
                )

            # NOTE(review): relies on a private session attribute; it simply
            # reports "none" if the attribute is absent.
            loaded_flags = sorted(getattr(sess, "_data_loaded", []))
            if loaded_flags:
                logger.info(
                    "    %s → datasets loaded: %s",
                    session_label,
                    ", ".join(loaded_flags),
                )
            else:
                logger.warning("    %s → datasets loaded: none", session_label)

            missing_datasets = load_result.get("missing", set())
            tags = {"year": year, "event": ev_name, "session": sess_type}

            for kind, attr in SIMPLE_EXPORTS:
                if kind not in data_store:
                    continue
                if kind in missing_datasets:
                    logger.info("    %s → skipping %s export (no data)", session_label, kind)
                    continue
                append_dataset(
                    data_store,
                    kind,
                    safe_session_attr(sess, attr, session_label),
                    label=session_label,
                    **tags,
                )

            if "TELEMETRY" in data_store:
                if "TELEMETRY" in missing_datasets:
                    logger.info("    %s → skipping TELEMETRY export (no data)", session_label)
                else:
                    if telemetry_loaded:
                        car_data = safe_session_attr(sess, "car_data", session_label)
                    else:
                        logger.info("    %s → telemetry not loaded", session_label)
                        car_data = None

                    if isinstance(car_data, dict) and car_data:
                        # One frame per driver, tagged with the driver key.
                        for drv, tel in car_data.items():
                            append_dataset(
                                data_store,
                                "TELEMETRY",
                                tel,
                                label=f"{session_label} [{drv}]",
                                extra={"Driver": drv},
                                **tags,
                            )
                    else:
                        append_dataset(
                            data_store,
                            "TELEMETRY",
                            car_data,
                            label=session_label,
                            **tags,
                        )

            processed_sessions += 1

    # data_store only holds kinds returned by determine_required_datasets(),
    # so MESSAGES never appears here for seasons before MESSAGES_START_YEAR.
    for kind, frames in data_store.items():
        if not frames:
            logger.warning("  [NO DATA] %s for %s", kind, year)
            continue

        combined = pd.concat(frames, ignore_index=True, sort=False)
        # Same helper as the "already exported?" check, so the writer and
        # the resume logic cannot drift apart.
        path = dataset_output_path(kind, year)
        combined.to_csv(path, index=False)
        logger.info("  [SAVED] %s rows → %s", len(combined), path.name)

    year_elapsed = time.perf_counter() - year_start
    logger.info(
        "Completed %s with %d sessions in %.1fs",
        year,
        processed_sessions,
        year_elapsed,
    )

logger.info("\n=== ALL DONE ===")




06:36:02 |     INFO | FastF1 cache enabled: C:\Users\erikv\Downloads\F1\notebooks\f1_cache @ 26895407080
06:36:02 |     INFO | 
06:36:02 |     INFO |   All dataset CSVs already exist for 2018; skipping year
06:36:02 |     INFO | 
06:36:02 |     INFO |   All dataset CSVs already exist for 2019; skipping year
06:36:02 |     INFO | 
06:36:02 |     INFO |   All dataset CSVs already exist for 2020; skipping year
06:36:02 |     INFO | 
06:36:02 |     INFO |   Pending datasets: MESSAGES
06:36:05 |     INFO |   → Bahrain Grand Prix
core           INFO 	Loading data for Bahrain Grand Prix - Race [v3.6.1]
06:36:05 |     INFO | Loading data for Bahrain Grand Prix - Race [v3.6.1]
req            INFO 	Using cached data for session_info
06:36:05 |     INFO | Using cached data for session_info
req            INFO 	Using cached data for driver_info
06:36:05 |     INFO | Using cached data for driver_info
req            INFO 	Using cached data for race_control_messages
06:36:07 |     INFO | Using cached

KeyboardInterrupt: 