In [2]:
import fastf1
import pandas as pd
import os
import time
import logging
from datetime import datetime

In [3]:
# --- Configuration: everything a reader might want to tune lives here ---
CACHE_DIR = 'fastf1_cache'
SAVE_DIR = 'f1_data_csvs'
START_YEAR = 2018          # F1 timing/telemetry data generally available from 2018
END_YEAR = datetime.now().year
SESSIONS_TO_GET = ['FP1', 'FP2', 'FP3', 'Q', 'S', 'SQ', 'R'] # Common sessions (Sprint='S', Sprint Quali='SQ')
DELAY_SECONDS = 1         # Delay between processing sessions to be polite to API

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

try:
    # enable_cache() expects the directory to exist already; create it first so
    # a fresh checkout does not silently fall back to running without a cache.
    os.makedirs(CACHE_DIR, exist_ok=True)
    fastf1.Cache.enable_cache(CACHE_DIR)
    logging.info(f"FastF1 cache enabled at: {CACHE_DIR}")
except Exception as e:
    # Best effort: the download still works without a cache, only slower.
    logging.error(f"Error enabling FastF1 cache: {e}")


os.makedirs(SAVE_DIR, exist_ok=True)
logging.info(f"CSV data will be saved in: {SAVE_DIR}")

2025-05-04 20:21:56,007 - INFO - FastF1 cache enabled at: fastf1_cache
2025-05-04 20:21:56,008 - INFO - CSV data will be saved in: f1_data_csvs


In [4]:
# Color indices 0-7. Per the escape-code convention used below, the foreground
# code is 30 + index and the background code is 40 + index.
(
    BLACK,
    RED,
    GREEN,
    YELLOW,
    BLUE,
    MAGENTA,
    CYAN,
    WHITE,
) = range(8)

# Escape sequences needed to get colored output on a terminal.
BOLD_SEQ = "\033[1m"        # switch bold on
COLOR_SEQ = "\033[1;%dm"    # bold + color; fill in with a numeric color code via %
RESET_SEQ = "\033[0m"       # clear all attributes

def formatter_message(message, use_color = True):
    """Expand the ``$RESET`` and ``$BOLD`` placeholders in a format string.

    Args:
        message: Format string that may contain ``$RESET`` / ``$BOLD``.
        use_color: When truthy, placeholders become the RESET_SEQ / BOLD_SEQ
            escape sequences; otherwise they are removed.

    Returns:
        The message with every placeholder substituted.
    """
    # Substitution order ($RESET first, then $BOLD) mirrors the placeholders' use
    # in format strings; the conditional keeps the escape constants untouched
    # when color is disabled.
    substitutions = (
        ("$RESET", RESET_SEQ if use_color else ""),
        ("$BOLD", BOLD_SEQ if use_color else ""),
    )
    for placeholder, replacement in substitutions:
        message = message.replace(placeholder, replacement)
    return message

# Log level name -> color index used by ColoredFormatter for the level label.
COLORS = dict(
    WARNING=YELLOW,
    INFO=WHITE,
    DEBUG=BLUE,
    CRITICAL=YELLOW,
    ERROR=RED,
)

class ColoredFormatter(logging.Formatter):
    """Formatter that wraps the level name in a color escape sequence.

    Args:
        msg: Format string passed straight to ``logging.Formatter``.
        use_color: When false, records are formatted without any coloring.
    """

    def __init__(self, msg, use_color = True):
        super().__init__(msg)
        self.use_color = use_color

    def format(self, record):
        """Return the formatted record, coloring the level name if enabled.

        The record object is shared by every handler that receives it, so the
        colored level name is only swapped in for the duration of this call and
        the original value is restored afterwards. Without that, other
        handlers (e.g. a file handler) would emit raw escape codes.
        """
        original_levelname = record.levelname
        if self.use_color and original_levelname in COLORS:
            color_code = 30 + COLORS[original_levelname]  # 30 + index = foreground
            record.levelname = COLOR_SEQ % color_code + original_levelname + RESET_SEQ
        try:
            return super().format(record)
        finally:
            record.levelname = original_levelname

In [5]:
# Logger subclass that ships with its own colored console handler
class ColoredLogger(logging.Logger):
    """Logger created at DEBUG level with a colored ``StreamHandler`` attached.

    ``FORMAT`` is the template with ``$BOLD`` / ``$RESET`` placeholders;
    ``COLOR_FORMAT`` is the same template with the placeholders expanded once,
    at class-definition time.
    """

    FORMAT = "[$BOLD%(name)-20s$RESET][%(levelname)-18s]  %(message)s ($BOLD%(filename)s$RESET:%(lineno)d)"
    COLOR_FORMAT = formatter_message(FORMAT, True)

    def __init__(self, name):
        super().__init__(name, logging.DEBUG)

        # Each instance gets its own console handler using the colored format.
        # NOTE(review): propagation is left at its default, so records may also
        # reach handlers installed on the root logger - confirm this is wanted.
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(ColoredFormatter(self.COLOR_FORMAT))
        self.addHandler(stream_handler)


# Loggers created through logging.getLogger(<name>) from here on use ColoredLogger.
logging.setLoggerClass(ColoredLogger)

In [13]:

# --- Helpers for the main loop ---
def _loaded_frame(session, attr_name):
    """Return ``session.<attr_name>`` if it is loaded and non-empty, else None.

    FastF1 exposes session data through properties that raise
    ``DataNotLoadedError`` when the data was not loaded; ``hasattr`` does not
    catch that, so one missing dataset would abort saving all the others.
    """
    try:
        frame = getattr(session, attr_name, None)
    except fastf1.core.DataNotLoadedError:
        return None
    if frame is None or frame.empty:
        return None
    return frame


def _attach_lap_numbers(telemetry, driver_laps):
    """Return telemetry as a plain DataFrame with a ``LapNumber`` column added.

    Each sample gets the number of the latest lap whose ``LapStartTime`` is at
    or before the sample's ``SessionTime``. Samples recorded before the first
    known lap start get NaN. Laps without a start time are ignored.
    """
    # Plain DataFrames avoid any subclass-specific behaviour during the merge.
    lap_starts = (
        pd.DataFrame(driver_laps[['LapNumber', 'LapStartTime']])
        .dropna(subset=['LapStartTime'])
        .sort_values('LapStartTime')
    )
    samples = pd.DataFrame(telemetry).sort_values('SessionTime')
    merged = pd.merge_asof(
        samples,
        lap_starts,
        left_on='SessionTime',
        right_on='LapStartTime',
        direction='backward',
    )
    return merged.drop(columns='LapStartTime')


# --- Main Loop ---
for year in range(START_YEAR, END_YEAR + 1):
    logging.debug(f"--- Processing Year: {year} ---")
    try:
        # Testing events are left out: fastf1.get_session() does not support
        # them and would fuzzy-match their name to a different race weekend.
        schedule = fastf1.get_event_schedule(year, include_testing=False)
        # Convert EventDate to datetime objects to filter past events if needed
        # schedule['EventDate'] = pd.to_datetime(schedule['EventDate']).dt.date
        # schedule = schedule[schedule['EventDate'] < datetime.now().date()] # Optional: Only process past events

    except Exception as e:
        logging.error(f"Could not get event schedule for {year}: {e}")
        continue

    for _, event in schedule.iterrows():
        event_name = event['EventName']
        event_round = int(event['RoundNumber'])
        logging.info(f"Processing Event: {year} - Round {event_round} - {event_name}")

        for session_name in SESSIONS_TO_GET:
            logging.debug(f"Attempting Session: {session_name}")
            session_identifier = f"{year}_{event_round:02d}_{event_name}_{session_name}" # Unique ID for logging/paths
            session_save_path = os.path.join(SAVE_DIR, str(year), f"{event_round:02d}_{event_name}", session_name)

            # # Check if all expected CSVs exist for this session, skip if so
            # expected_files = ['laps.csv', 'results.csv', 'weather.csv', 'messages.csv']
            # csvs_exist = all(os.path.exists(os.path.join(session_save_path, fname)) for fname in expected_files)

            # # Check for at least one telemetry file (since driver list may change)
            # telemetry_files_exist = any(
            #     fname.startswith('telemetry_') and fname.endswith('.csv')
            #     for fname in os.listdir(session_save_path) if os.path.isdir(session_save_path)
            # ) if os.path.isdir(session_save_path) else False

            # if csvs_exist and telemetry_files_exist:
            #     logging.info(f"All CSVs already exist for {session_identifier}, skipping session.")
            #     continue
            try:
                session = fastf1.get_session(year, event_name, session_name)

                session.load(laps=True, weather=True, messages=True, telemetry=True)
                logging.info(f"Loaded basic data for {session_identifier}")

                os.makedirs(session_save_path, exist_ok=True)

                laps = _loaded_frame(session, 'laps')
                if laps is not None:
                    laps.to_csv(os.path.join(session_save_path, 'laps.csv'), index=False)
                    logging.debug(f"Saved laps for {session_identifier}")

                    logging.info(f"Loading telemetry for {session_identifier}...")
                    telemetry_saved = False
                    for drv_id in session.drivers: # Iterate through driver numbers
                        try:
                            drv_laps = laps.pick_drivers(drv_id)
                            if drv_laps.empty:
                                continue

                            drv_abbr = drv_laps['Driver'].iloc[0]
                            drv_tel = drv_laps.get_telemetry()
                            if drv_tel.empty:
                                logging.debug(f"No telemetry data returned for driver {drv_abbr} in {session_identifier}")
                                continue

                            # Laps carry no 'SessionTime' column, so lap numbers are
                            # assigned by matching each sample to the lap start times.
                            drv_tel = _attach_lap_numbers(drv_tel, drv_laps)
                            drv_tel.to_csv(os.path.join(session_save_path, f'telemetry_{drv_abbr}.csv'), index=False)
                            logging.debug(f"Saved telemetry for driver {drv_abbr} in {session_identifier}")
                            telemetry_saved = True

                        except Exception as tel_ex:
                            # Best effort per driver: one failure must not stop the others.
                            logging.warning(f"Could not get/save telemetry for driver {drv_id} in {session_identifier}: {tel_ex}")
                    if telemetry_saved:
                        logging.info(f"Finished processing telemetry for {session_identifier}")
                    else:
                        logging.info(f"No telemetry saved for any driver in {session_identifier}")

                results = _loaded_frame(session, 'results')
                if results is not None:
                    results.to_csv(os.path.join(session_save_path, 'results.csv'), index=False)
                    logging.debug(f"Saved results for {session_identifier}")

                weather = _loaded_frame(session, 'weather_data')
                if weather is not None:
                    weather.to_csv(os.path.join(session_save_path, 'weather.csv'), index=False)
                    logging.debug(f"Saved weather for {session_identifier}")

                # Race control messages are exposed as `race_control_messages`
                # on the session object (there is no `messages` attribute).
                messages = _loaded_frame(session, 'race_control_messages')
                if messages is not None:
                    messages.to_csv(os.path.join(session_save_path, 'messages.csv'), index=False)
                    logging.debug(f"Saved messages for {session_identifier}")

                logging.info(f"Successfully processed and saved data for {session_identifier}")

            # except fastf1.core.SessionNotAvailableError:
            #      logging.warning(f"Session {session_name} not available or does not exist for {year} {event_name}. Skipping.")
            except fastf1.core.DataNotLoadedError as e:
                logging.error(f"Data not loaded for {session_identifier}. Might be too recent or unavailable. Error: {e}")
            except ConnectionError as e:
                # NOTE(review): this is the builtin ConnectionError; HTTP-library
                # connection errors may not derive from it and would then land in
                # the generic handler below - confirm.
                logging.error(f"Connection error during {session_identifier}: {e}. Check network.")
                time.sleep(10) # Longer sleep on connection error
            except Exception as e:
                # Catch other potential errors (API issues, unexpected data format, etc.)
                logging.error(f"An unexpected error occurred for {session_identifier}: {e.__class__.__name__} - {e}")

            finally:
                # Add a delay after processing each session regardless of success/failure
                logging.debug(f"Waiting {DELAY_SECONDS} seconds before next session...")
                time.sleep(DELAY_SECONDS)

    logging.info(f"--- Finished Processing Year: {year} ---")

logging.info("--- All Years Processed ---")

2025-05-05 15:58:08,226 - DEBUG - Traceback for failure in FastF1 schedule
Traceback (most recent call last):
  File "/home/levi/f1-dash/.venv/lib/python3.13/site-packages/fastf1/logger.py", line 151, in __wrapped
    return func(*args, **kwargs)
  File "/home/levi/f1-dash/.venv/lib/python3.13/site-packages/fastf1/events.py", line 584, in _get_schedule_ff1
    response = Cache.requests_get(
        _SCHEDULE_BASE_URL + f"schedule_{year}.json",
        headers=_HEADERS
    )
  File "/home/levi/f1-dash/.venv/lib/python3.13/site-packages/fastf1/req.py", line 303, in requests_get
    return cls._cached_request('GET', url, **kwargs)
           ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
  File "/home/levi/f1-dash/.venv/lib/python3.13/site-packages/fastf1/req.py", line 347, in _cached_request
    response = func(url, **kwargs)
  File "/home/levi/f1-dash/.venv/lib/python3.13/site-packages/requests_cache/session.py", line 127, in get
    return self.request('GET', url, params=params, **kwargs)
 