In [3]:
import sys
import pandas as pd
import os
from tqdm import tqdm

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime
from dotenv import dotenv_values
from sqlalchemy import create_engine
from joblib import Parallel, delayed

In [4]:
token = dotenv_values("../../.env.local")['INFLUXDB_TOKEN']
org = "my-org"
bucket = "meets"
url = "http://localhost:8086"

In [5]:
# Conexão
client = InfluxDBClient(url=url, token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)

In [6]:
def int_or_none(value):
    try:
        if value is None:
            return None
        return int(value)
    except (ValueError, TypeError):
        return None
    
def str_or_none(value):
    try:
        if value is None:
            return None
        return str(value)
    except (ValueError, TypeError):
        return None

def float_or_none(value):
    try:
        if value is None:
            return None
        return float(value)
    except (ValueError, TypeError):
        return None

In [5]:
# Meets
df_meets = pd.read_parquet("../../data/meetings/meetings.parquet")
df_meets["meeting_key"] = df_meets["meeting_key"].astype(str)
df_meets["date_start"] = pd.to_datetime(df_meets["date_start"])

for _, row in tqdm(df_meets.iterrows(), total=len(df_meets)):
    p = (
        Point("meetings")
        .tag("meeting_key", str_or_none(row["meeting_key"]))
        .tag("country_code", str_or_none(row["country_code"]))
        .tag("circuit_short_name", str_or_none(row["circuit_short_name"]))
        .tag("meeting_name", str_or_none(row["meeting_name"]))
        .tag("meeting_official_name", str_or_none(row["meeting_official_name"]))
        .tag("location", str_or_none(row["location"]))
        .tag("country_key", int_or_none(row["country_key"]))
        .tag("country_name", str_or_none(row["country_name"]))
        .field("circuit_key", int_or_none(row["circuit_key"]))
        .field("gmt_offset", str_or_none(row["gmt_offset"]))
        .field("year", int_or_none(row["year"]))
        .time(row["date_start"], WritePrecision.NS)
    )
    write_api.write(bucket=bucket, org=org, record=p)


100%|██████████| 40/40 [00:00<00:00, 139.28it/s]


In [6]:
# Weather Conditions
df_weather = pd.read_parquet("../../data/weather_conditions/weather_conditions.parquet")
df_session = pd.read_parquet("../../data/sessions/sessions.parquet")
df_weather = df_weather.drop(columns=["meeting_key"])
df_weather = df_weather.drop_duplicates(subset=["session_key", "date"])
df_weather = df_weather[df_weather["session_key"].isin(df_session["session_key"].to_list())]
df_weather["rainfall"] = df_weather["rainfall"].astype(bool)

for _, row in tqdm(df_weather.iterrows(), total=len(df_weather)):
    point = (
        Point("weather_conditions")
        .tag("session_key", str_or_none(row["session_key"]))
        .field("track_temperature", (row["track_temperature"]))
        .field("wind_speed", row["wind_speed"])
        .field("rainfall", int(row["rainfall"]))  
        .field("humidity", row["humidity"])
        .field("pressure", row["pressure"])
        .field("air_temperature", row["air_temperature"])
        .field("wind_direction", row["wind_direction"]) 
        .time(row["date"], WritePrecision.NS)
    )
    write_api.write(bucket=bucket, org=org, record=point)

100%|██████████| 8419/8419 [01:11<00:00, 118.43it/s]


In [None]:
df_weather.head()

In [7]:
# Sessions
df_sessions = pd.read_parquet("../../data/sessions/sessions.parquet")

df_sessions["date_start"] = pd.to_datetime(df_sessions["date_start"])
df_sessions["date_end"] = pd.to_datetime(df_sessions["date_end"])

for _, row in tqdm(df_sessions.iterrows(), total=len(df_sessions)):
    point = (
        Point("sessions")
        .tag("session_key", str_or_none(row["session_key"]))
        .tag("meeting_key", str_or_none(row["meeting_key"]))
        .tag("location", str_or_none(row["location"]))
        .tag("circuit_short_name", str_or_none(row["circuit_short_name"]))
        .tag("session_type", str_or_none(row["session_type"]))  
        .tag("session_name", str_or_none(row["session_name"]))
        .field("year", row["year"])
        .time(row["date_start"], WritePrecision.NS)
    )
    write_api.write(bucket=bucket, org=org, record=point)

  0%|          | 0/98 [00:00<?, ?it/s]

100%|██████████| 98/98 [00:00<00:00, 121.31it/s]


In [8]:
# Drivers
df_drivers = pd.read_parquet("../../data/drivers/drivers.parquet")
df_drivers = df_drivers.drop(columns=["meeting_key"])

for _, row in tqdm(df_drivers.iterrows(), total=len(df_drivers)):
    point = (
        Point("drivers")
        .tag("session_key", str_or_none(row["session_key"]))
        .field("driver_number", str_or_none(row["driver_number"]))
        .tag("broadcast_name", str_or_none(row["broadcast_name"]))
        .tag("full_name", str_or_none(row["full_name"]))
        .tag("name_acronym", str_or_none(row["name_acronym"]))
        .tag("team_name", str_or_none(row["team_name"]))
        .tag("team_colour", str_or_none(row["team_colour"]))
        .tag("first_name", str_or_none(row["first_name"]))
        .tag("last_name", str_or_none(row["last_name"]))
        .tag("headshot_url", str_or_none(row["headshot_url"]))
        .tag("country_code", str_or_none(row["country_code"]))
        .time(pd.Timestamp.utcnow(), WritePrecision.NS)
    )
    write_api.write(bucket=bucket, org=org, record=point)

  0%|          | 0/5199 [00:00<?, ?it/s]

100%|██████████| 5199/5199 [00:44<00:00, 117.22it/s]


In [11]:
# tyre_stints
df_tyre_strints = pd.read_parquet("../../data/stints/stints.parquet")
df_tyre_strints = df_tyre_strints.drop(columns=["meeting_key"])
df_tyre_strints = df_tyre_strints.drop_duplicates(subset=["session_key", "stint_number", "driver_number"])

df_tyre_strints["lap_start"] = pd.to_datetime(df_tyre_strints["lap_start"])

for _, row in tqdm(df_tyre_strints.iterrows(), total=len(df_tyre_strints)):
    try:
        point = (
            Point("tyre_stints")
            .tag("driver_number", str_or_none(row["driver_number"]))
            .tag("session_key", str_or_none(row["session_key"]))
            .tag("compound", str_or_none(row["compound"]))
            .field("stint_number", int(row["stint_number"]))
            .field("tyre_age_at_start", float(row["tyre_age_at_start"]) if not pd.isna(row["tyre_age_at_start"]) else None)
            .time(df_tyre_strints["lap_start"], WritePrecision.NS)
        )
        write_api.write(bucket=bucket, org=org, record=point)
    except Exception as e:
        pass

  0%|          | 0/7686 [00:00<?, ?it/s]

100%|██████████| 7686/7686 [00:00<00:00, 11153.67it/s]


In [14]:
df_laps = pd.read_parquet("../../data/laps/laps.parquet")
df_laps = df_laps.drop(columns=["meeting_key"])
df_laps = df_laps.drop(columns=["segments_sector_1", "segments_sector_2", "segments_sector_3"])

df_laps["date_start"] = pd.to_datetime(df_laps["date_start"], format='ISO8601', utc=True)

for _, row in tqdm(df_laps.iterrows(), total=len(df_laps)):
    point = (
        Point("laps")
        .tag("session_key", str_or_none(row["session_key"]))
        .tag("driver_number", str_or_none(row["driver_number"]))
        .tag("is_pit_out_lap", str_or_none(row["is_pit_out_lap"]))
        .field("i1_speed", float_or_none(row["i1_speed"]))
        .field("i2_speed", float_or_none(row["i2_speed"]))
        .field("st_speed", float_or_none(row["st_speed"]))
        .field("lap_duration", float_or_none(row["lap_duration"]))
        .field("duration_sector_1", float_or_none(row["duration_sector_1"]))
        .field("duration_sector_2", float_or_none(row["duration_sector_2"]))
        .field("duration_sector_3", float_or_none(row["duration_sector_3"]))
        .field("lap_number", int_or_none(row["lap_number"]))
        .time(row["date_start"], WritePrecision.NS)
    )

100%|██████████| 29572/29572 [00:02<00:00, 11925.84it/s]


In [16]:
df_pits = pd.read_parquet("../../data/pits/pits.parquet")
df_pits = df_pits.drop(columns=["meeting_key"])

df_pits["date"] = pd.to_datetime(df_pits["date"], format='ISO8601', utc=True)

for _, row in tqdm(df_pits.iterrows(), total=len(df_pits)):
    point = (
        Point("pits")
        .tag("session_key", str_or_none(row["session_key"]))
        .tag("driver_number", str_or_none(row["driver_number"]))
        .tag("lap_number", str_or_none(row["lap_number"]))
        .field("pit_duration", float_or_none(row["pit_duration"]))
        .time(row["date"], WritePrecision.NS)
    )
    write_api.write(bucket=bucket, org=org, record=point)

  0%|          | 0/4455 [00:00<?, ?it/s]

100%|██████████| 4455/4455 [01:14<00:00, 60.12it/s]


In [18]:
df_positions = pd.read_parquet("../../data/positions/positions.parquet")
df_positions = df_positions.drop(columns=["meeting_key"])
df_positions = df_positions.drop_duplicates(subset=["session_key", "driver_number", "date"])

df_positions["date"] = pd.to_datetime(df_positions["date"], format='ISO8601', utc=True)

for _, row in tqdm(df_positions.iterrows(), total=len(df_positions)):
    point = (
        Point("positions")
        .tag("session_key", str_or_none(row["session_key"]))
        .tag("driver_number", str_or_none(row["driver_number"]))
        .field("position", int(row["position"]))
        .time(row["date"], WritePrecision.NS)
    )
    write_api.write(bucket=bucket, org=org, record=point)

100%|██████████| 68407/68407 [09:57<00:00, 114.51it/s]


In [9]:
def process_telemetry(file_path):
    try:
        client = InfluxDBClient(url=url, token=token, org=org)
        write_api = client.write_api(write_options=SYNCHRONOUS)

        df_telemetry = pd.read_parquet(file_path)
        print(f"Processing file: {file_path} {df_telemetry.shape}")

        if "meeting_key" in df_telemetry.columns:
            df_telemetry = df_telemetry.drop(columns=["meeting_key"])
        
        df_telemetry = df_telemetry.drop_duplicates(subset=["session_key", "driver_number", "date"])
        
        df_telemetry["date"] = pd.to_datetime(df_telemetry["date"], format='ISO8601', utc=True)

        for _, row in df_telemetry.iterrows():
            point = (
                Point("telemetry")
                .tag("driver_number", str_or_none(row["driver_number"]))
                .tag("session_key", str_or_none(row["session_key"]))
                .field("rpm", int(row["rpm"]))
                .field("speed", int(row["speed"]))
                .field("n_gear", int(row["n_gear"]))
                .field("throttle", int(row["throttle"]))
                .field("brake", int(row["brake"]))
                .field("drs", int(row["drs"]))
                .time(row["date"], WritePrecision.NS)
            )
            write_api.write(bucket=bucket, org=org, record=point)
    except:
        print(f"Error processing file {file_path}")

In [None]:
path_telemetrys = "../../data/telemetrys"
if not os.path.isdir(path_telemetrys):
    raise FileNotFoundError(f"Directory does not exist: {path_telemetrys}")

files = [f for f in os.listdir(path_telemetrys) if f.endswith(".parquet")]

Parallel(n_jobs=-1)(
    delayed(process_telemetry)(os.path.join(path_telemetrys, file))
    for file in files
)

Processing file: ../../data/telemetrys/session_key=9498&driver_number=20.parquet (16965, 10)
Processing file: ../../data/telemetrys/session_key=10007&driver_number=38.parquet (17997, 10)Processing file: ../../data/telemetrys/session_key=9532&driver_number=63.parquet (18021, 10)

Processing file: ../../data/telemetrys/session_key=9687&driver_number=44.parquet (17276, 10)
Processing file: ../../data/telemetrys/session_key=9602&driver_number=14.parquet (20600, 10)
Processing file: ../../data/telemetrys/session_key=9994&driver_number=87.parquet (17013, 10)
Processing file: ../../data/telemetrys/session_key=9278&driver_number=14.parquet (15376, 10)
Processing file: ../../data/telemetrys/session_key=9989&driver_number=6.parquet (13437, 10)
Processing file: ../../data/telemetrys/session_key=9620&driver_number=44.parquet (18578, 10)
Processing file: ../../data/telemetrys/session_key=9534&driver_number=10.parquet (17763, 10)
Processing file: ../../data/telemetrys/session_key=9554&driver_number=

In [None]:
client.close()
