In [104]:
import fastf1
from fastf1.core import Laps
import logging
import numpy as np
import uuid
import warnings
import altair as alt
import pandas as pd
import fastf1.plotting as plotting
import json
import scipy
from matplotlib import pyplot as plt

In [4]:
# Enable the FastF1 cache in a directory relative to the notebook's working directory.
fastf1.Cache.enable_cache("../../cache")
logging.basicConfig(level=logging.WARNING)  # set the log level to WARNING
# Also cap the 'fastf1' logger at WARNING so session loading stays quiet.
logger = logging.getLogger('fastf1')
logger.setLevel(logging.WARNING)
plotting.setup_mpl()

### Utils

##### 1. print & export

In [5]:
def print_the_best_driver(session, fastestlap, best_driver):
    """Print a one-line summary of the fastest lap of a session.

    Args:
        session: Object exposing ``weekend.name`` as a string.
        fastestlap: Fastest lap time; rendered with ``str()``.
        best_driver: Driver identifier as a string.
    """
    pieces = [
        session.weekend.name,
        " fastest lap is ",
        best_driver,
        " with time ",
        str(fastestlap),
    ]
    print("".join(pieces))


def export_to_file(content: str, target: str):
    """Write ``content`` to the file at ``target``, replacing any existing file.

    The file is always written as UTF-8 so the exported JSON does not depend
    on the platform's default encoding. The ``with`` block closes the file,
    so no explicit ``close()`` is needed.

    Args:
        content: Text to write.
        target: Path of the output file.
    """
    with open(target, mode='w', encoding='utf-8') as f:
        f.write(content)

##### 2. partition sequence

In [6]:
# Smallest speed change between consecutive samples that is treated as a real
# change of direction; smaller changes never end the current interval.
_SPEED_STEP_THRESHOLD = 3


def _car_telemetry_records(lap):
    """Return the ``Source == "car"`` rows of ``lap.telemetry`` as a list of dicts."""
    df = pd.DataFrame(lap.telemetry)
    df = df[df["Source"] == "car"].reset_index(drop=True)
    return df.to_dict(orient='records')


def _partition_speed_intervals(records, threshold=_SPEED_STEP_THRESHOLD):
    """Split telemetry records into runs of rising and falling ``Speed``.

    A run ends when the speed changes by more than ``threshold`` against the
    current direction. The new run starts with the last sample of the old
    one, so consecutive runs share their turning point.

    The run that is still open when the records end is NOT returned. That is
    the behaviour callers have always seen and it is kept on purpose.

    Returns:
        ``(increasing_intervals, decreasing_intervals)``; both are empty when
        fewer than two records are given.
    """
    increasing_intervals = []
    decreasing_intervals = []
    if len(records) < 2:
        # No direction can be derived from zero or one sample.
        return increasing_intervals, decreasing_intervals

    current_interval = [records[0]]
    rising = (records[1]['Speed'] - records[0]['Speed']) > 0
    for previous, sample in zip(records, records[1:]):
        diff = sample['Speed'] - previous['Speed']
        if abs(diff) > threshold and (diff > 0) != rising:
            # A significant step against the current direction closes the run.
            finished = increasing_intervals if rising else decreasing_intervals
            finished.append(current_interval)
            rising = not rising
            current_interval = [previous, sample]
        else:
            current_interval.append(sample)
    return increasing_intervals, decreasing_intervals


def _interval_peak(interval):
    """Return the maxima point of an increasing interval.

    Walk backwards from the last sample and keep moving while the previous
    sample is strictly faster; this skips a small sub-threshold dip at the end.
    """
    peak = interval[-1]
    for sample in reversed(interval[:-1]):
        if sample['Speed'] > peak['Speed']:
            peak = sample
        else:
            break
    return peak


def partition_lap_telemetry_max(lap):
    """Return the speed maxima of a lap.

    Args:
        lap: Object whose ``telemetry`` converts to a DataFrame with at least
            the ``Source`` and ``Speed`` columns.

    Returns:
        Dict mapping a 1-based maxima index to the telemetry record (a dict)
        at that maxima. Empty when the lap has fewer than two car samples.
    """
    increasing_intervals, _ = _partition_speed_intervals(_car_telemetry_records(lap))
    return {
        index: _interval_peak(interval)
        for index, interval in enumerate(increasing_intervals, start=1)
    }


def partition_lap_telemetry_max_json(lap):
    """Return :func:`partition_lap_telemetry_max` serialized as a JSON string.

    Values that ``json`` cannot encode natively are rendered with ``str()``.
    """
    return json.dumps(partition_lap_telemetry_max(lap), default=str)


def partition_lap_telemetry_min(lap):
    """Return the speed minima of a lap.

    Args:
        lap: Object whose ``telemetry`` converts to a DataFrame with at least
            the ``Source`` and ``Speed`` columns.

    Returns:
        Dict mapping a 1-based minima index to the last telemetry record of
        each decreasing interval. Empty when the lap has fewer than two car
        samples.
    """
    _, decreasing_intervals = _partition_speed_intervals(_car_telemetry_records(lap))
    return {
        index: interval[-1]
        for index, interval in enumerate(decreasing_intervals, start=1)
    }


def partition_lap_telemetry_min_json(lap):
    """Return :func:`partition_lap_telemetry_min` serialized as a JSON string.

    Values that ``json`` cannot encode natively are rendered with ``str()``.
    """
    return json.dumps(partition_lap_telemetry_min(lap), default=str)

In [7]:
# test partition
def partition_array(sequence, threshold):
    """Split a numeric sequence into increasing and decreasing intervals.

    An interval ends when the value changes by more than ``threshold``
    against the current direction; smaller changes are absorbed into the
    running interval. A new interval starts with the last value of the
    previous one, so neighbouring intervals share their turning point.

    The interval still open at the end of the sequence is filed under the
    direction being tracked. It used to be filed by the sign of the very last
    step, which put a rising run ending in a small dip into the decreasing
    list.

    Args:
        sequence: Indexable sequence of numbers (list or 1-D array).
        threshold: Minimum absolute step that can end an interval.

    Returns:
        ``(increasing_intervals, decreasing_intervals)``, each a list of
        lists. Both are empty when ``sequence`` has fewer than two values,
        because no direction can be derived.
    """
    increasing_intervals = []
    decreasing_intervals = []
    if len(sequence) < 2:
        return increasing_intervals, decreasing_intervals

    current_interval = [sequence[0]]
    rising = (sequence[1] - sequence[0]) > 0

    for i in range(1, len(sequence)):
        diff = sequence[i] - sequence[i - 1]
        if abs(diff) > threshold and (diff > 0) != rising:
            # A significant step against the current direction closes the run.
            finished = increasing_intervals if rising else decreasing_intervals
            finished.append(current_interval)
            rising = not rising
            current_interval = [sequence[i - 1], sequence[i]]
        else:
            current_interval.append(sequence[i])

    trailing = increasing_intervals if rising else decreasing_intervals
    trailing.append(current_interval)

    return increasing_intervals, decreasing_intervals

# Sample series used to compare SciPy's relative-extrema indices with the
# threshold-based partition produced by partition_array().
data = np.array([1, 6, 13, 23, 21, 41, 51, 6, 53, 54, 67, 52, 20, 3, 2, 8, 27, 32, 51, 41, 33, 20, 1])
maxima = scipy.signal.argrelextrema(data, np.greater)
minima = scipy.signal.argrelextrema(data, np.less)
print(maxima)
print(minima)

threshold = 3  # minimum absolute step that can end an interval
increasing_intervals, decreasing_intervals = partition_array(data, threshold)
print(increasing_intervals)
print(decreasing_intervals)

(array([ 3,  6, 10, 18]),)
(array([ 4,  7, 14]),)
[[1, 6, 13, 23, 21, 41, 51], [6, 53, 54, 67], [2, 8, 27, 32, 51]]
[[51, 6], [67, 52, 20, 3, 2], [51, 41, 33, 20, 1]]


##### 3. object infos

In [25]:
def fastest_laps_of_drivers(laps):
    """Collect every driver's fastest lap as a plain dict.

    Args:
        laps: Laps object supporting ``laps["Driver"]``,
            ``pick_driver(drv)`` and ``pick_fastest()``.

    Returns:
        List with one ``to_dict()`` result per driver, in order of first
        appearance in ``laps["Driver"]``. Drivers for which
        ``pick_fastest()`` yields ``None`` are skipped.
    """
    fastest_by_driver = []
    for drv in pd.unique(laps["Driver"]):
        fastest = laps.pick_driver(drv).pick_fastest()
        if fastest is None:
            # NOTE(review): presumably returned for drivers without a timed
            # lap in some FastF1 releases -- confirm against the installed version.
            continue
        fastest_by_driver.append(fastest.to_dict())
    return fastest_by_driver


def fastest_lap(laps):
    """Return the overall fastest lap across all drivers.

    Each driver's fastest lap is picked once. Candidates that are ``None`` or
    have a missing ``LapTime`` are skipped explicitly, instead of being
    swallowed by a bare ``except`` or, for the first driver, accepted
    unconditionally. On equal lap times the later driver wins, as before.

    Args:
        laps: Laps object supporting ``laps["Driver"]``,
            ``pick_driver(drv)`` and ``pick_fastest()``.

    Returns:
        The fastest lap object, or ``None`` when no driver has a timed lap.
    """
    best_lap = None
    for drv in pd.unique(laps["Driver"]):
        candidate = laps.pick_driver(drv).pick_fastest()
        if candidate is None or pd.isna(candidate["LapTime"]):
            continue
        if best_lap is None or candidate["LapTime"] <= best_lap["LapTime"]:
            best_lap = candidate
    return best_lap


def load_lap_telemetry(lap):
    """Return the car-sourced telemetry of a lap as a list of row dicts.

    Only rows whose ``Source`` equals ``"car"`` are kept, and the
    ``DistanceToDriverAhead`` column is removed.

    Args:
        lap: Object whose ``telemetry`` converts to a DataFrame.

    Returns:
        One dict per remaining telemetry row.
    """
    telemetry = pd.DataFrame(lap.telemetry)
    car_rows = telemetry.loc[telemetry["Source"] == "car"].reset_index(drop=True)
    return car_rows.drop(columns='DistanceToDriverAhead').to_dict(orient='records')

def load_lap_telemetry_and_add_driver(lap, driver):
    """Return the car-sourced telemetry of a lap, tagging each row with a driver.

    Only rows whose ``Source`` equals ``"car"`` are kept, the
    ``DistanceToDriverAhead`` column is removed, and a ``Driver`` column
    holding ``driver`` is set on every row.

    Args:
        lap: Object whose ``telemetry`` converts to a DataFrame.
        driver: Value stored in the ``Driver`` column.

    Returns:
        One dict per remaining telemetry row.
    """
    telemetry = pd.DataFrame(lap.telemetry)
    car_rows = telemetry.loc[telemetry["Source"] == "car"].reset_index(drop=True)
    tagged = car_rows.drop(columns='DistanceToDriverAhead').assign(Driver=driver)
    return tagged.to_dict(orient='records')

##### 4. json infos

In [9]:
def fastest_laps_of_drivers_json(laps):
    """Serialize every driver's fastest lap to JSON, keyed by driver number.

    Args:
        laps: Laps object supporting ``laps["Driver"]``,
            ``pick_driver(drv)`` and ``pick_fastest()``.

    Returns:
        JSON object string mapping each fastest lap's ``DriverNumber`` to its
        ``to_dict()`` form. Values ``json`` cannot encode natively are
        rendered with ``str()``. Drivers for which ``pick_fastest()`` yields
        ``None`` are skipped.
    """
    laps_by_number = {}
    for drv in pd.unique(laps["Driver"]):
        fastest = laps.pick_driver(drv).pick_fastest()
        if fastest is None:
            # NOTE(review): presumably returned for drivers without a timed
            # lap in some FastF1 releases -- confirm against the installed version.
            continue
        laps_by_number[fastest['DriverNumber']] = fastest.to_dict()
    return json.dumps(laps_by_number, default=str)


def fastest_lap_json(laps):
    """Serialize the overall fastest lap across all drivers to JSON.

    Each driver's fastest lap is picked once. Candidates that are ``None`` or
    have a missing ``LapTime`` are skipped explicitly, instead of being
    swallowed by a bare ``except`` or, for the first driver, accepted
    unconditionally. On equal lap times the later driver wins, as before.

    Args:
        laps: Laps object supporting ``laps["Driver"]``,
            ``pick_driver(drv)`` and ``pick_fastest()``.

    Returns:
        JSON string of the fastest lap's ``to_dict()`` form; values ``json``
        cannot encode natively are rendered with ``str()``.

    Raises:
        ValueError: If no driver has a timed lap.
    """
    best_lap = None
    for drv in pd.unique(laps["Driver"]):
        candidate = laps.pick_driver(drv).pick_fastest()
        if candidate is None or pd.isna(candidate["LapTime"]):
            continue
        if best_lap is None or candidate["LapTime"] <= best_lap["LapTime"]:
            best_lap = candidate
    if best_lap is None:
        raise ValueError("no driver has a timed lap")
    return json.dumps(best_lap.to_dict(), default=str)


def load_lap_telemetry_json(lap):
    """Serialize the car-sourced telemetry of a lap to a JSON records string.

    Only rows whose ``Source`` equals ``"car"`` are kept, and only a fixed
    set of columns is exported, in the order listed below.

    Args:
        lap: Object whose ``telemetry`` converts to a DataFrame containing
            ``Source`` and every exported column.

    Returns:
        JSON array string with one object per telemetry row.
    """
    exported_columns = [
        "Date", "RPM", "Speed", "nGear", "Throttle", "Brake",
        "DRS", "Distance", "X", "Y", "Z",
    ]
    telemetry = pd.DataFrame(lap.telemetry)
    car_rows = telemetry.loc[telemetry["Source"] == "car"].reset_index(drop=True)
    return car_rows[exported_columns].to_json(orient='records')

##### 5. track info json

In [10]:
def get_track_sample(laps):
    """Return the telemetry of the overall fastest lap as a JSON string.

    The lap is chosen with ``fastest_lap`` and serialized with
    ``load_lap_telemetry_json``.
    """
    return load_lap_telemetry_json(fastest_lap(laps))

In [11]:
def change_initial_position(df):
    """Rotate the rows so the frame starts at the first corner sample.

    Rows located before the first row with ``str_cor == 1`` are moved to the
    end, keeping their relative order. When there is no corner row the order
    is unchanged.

    Rows that wrap around are shifted by ``max(index) + 1``. The previous
    offset of ``max(index)`` gave the first row the same sort key as the last
    row, so their final order was undefined. The input frame is no longer
    modified; no helper ``Index`` column is left on it.

    Args:
        df: DataFrame with a numeric index and a ``str_cor`` column.

    Returns:
        A new DataFrame with the rotated rows and a fresh 0..n-1 index.
    """
    if df.empty:
        return df.reset_index(drop=True)

    labels = df.index.to_numpy()
    corner_mask = (df["str_cor"] == 1).to_numpy()
    first_corner = labels[corner_mask].min() if corner_mask.any() else 0
    # Push everything before the first corner past the end of the lap.
    wrapped = np.where(labels < first_corner, labels + labels.max() + 1, labels)
    new_order = np.argsort(wrapped, kind="stable")
    return df.iloc[new_order].reset_index(drop=True)


def pick_fastest_driver(laps):
    """Find the fastest lap time of the session and who set it.

    Each driver's fastest lap is picked once. Candidates that are ``None`` or
    have a missing ``LapTime`` are skipped explicitly, instead of being
    swallowed by a bare ``except`` or, for the first driver, accepted
    unconditionally. On equal lap times the later driver wins, as before.

    Args:
        laps: Laps object supporting ``laps["Driver"]``,
            ``pick_driver(drv)`` and ``pick_fastest()``.

    Returns:
        Tuple ``(lap_time, driver, compound)`` taken from the ``LapTime``,
        ``Driver`` and ``Compound`` fields of the fastest lap.

    Raises:
        ValueError: If no driver has a timed lap.
    """
    best_lap = None
    for drv in pd.unique(laps["Driver"]):
        candidate = laps.pick_driver(drv).pick_fastest()
        if candidate is None or pd.isna(candidate["LapTime"]):
            continue
        if best_lap is None or candidate["LapTime"] <= best_lap["LapTime"]:
            best_lap = candidate
    if best_lap is None:
        raise ValueError("no driver has a timed lap")
    return best_lap["LapTime"], best_lap["Driver"], best_lap["Compound"]


def flags_strcor_based_on_normal_speed(df):
    """Add a smoothed speed column and a braking flag to ``df`` in place.

    ``normalizedspeed`` is the rolling mean of ``Speed`` over up to four
    samples. ``str_cor`` is 1 where the smoothed speed dropped by more than 3
    compared with the previous sample, else 0. A flagged sample with no
    flagged neighbour on either side is reset to 0.

    Args:
        df: DataFrame with a ``Speed`` column; it is modified in place.

    Returns:
        The same ``df`` object with the two columns set.
    """
    smoothed = df["Speed"].rolling(window=4, min_periods=1).mean()
    df["normalizedspeed"] = smoothed

    speed_drop = smoothed.shift(1) - smoothed
    df["str_cor"] = np.where(speed_drop > 3, 1, 0)

    flagged = df["str_cor"]
    flagged_before = flagged.shift(1) == 1
    flagged_after = flagged.shift(-1) == 1
    isolated = (flagged == 1) & ~flagged_before & ~flagged_after
    df["str_cor"] = np.where(isolated, 0, flagged)
    return df


def order_data(df):
    """Store the current index of ``df`` in a ``No`` column, in place.

    Returns:
        The same ``df`` object.
    """
    row_numbers = df.index
    df["No"] = row_numbers
    return df


def counter_strcor(df):
    """Number the straights and corners of a lap, in place.

    ``str_cor`` is read as 1 for a corner sample and 0 for a straight sample.
    ``counter_corners`` increases by one on every row that enters a corner,
    and ``counter_straights`` on every row that enters a straight. Both
    columns therefore hold, for each row, how many segments of that kind have
    started so far.

    The first row is identified by position, so the function no longer relies
    on a ``No`` column being 0 there. The counters are computed with
    vectorized operations rather than row-by-row writes.

    Args:
        df: DataFrame with a ``str_cor`` column of 0/1 values; it is modified
            in place.

    Returns:
        The same ``df`` object with float ``counter_straights`` and
        ``counter_corners`` columns.
    """
    flags = df["str_cor"]
    previous = flags.shift(1)  # NaN on the first row, so it always counts as an entry
    enters_corner = (flags == 1) & (previous != 1)
    enters_straight = (flags == 0) & (previous != 0)
    # Kept as float to match the dtype these columns have always had.
    df["counter_straights"] = enters_straight.cumsum().astype(float)
    df["counter_corners"] = enters_corner.cumsum().astype(float)
    return df


# Upper bounds of the speed bins; a segment is labelled with the first bound
# its speed does not exceed.
_SPEED_BIN_EDGES = (50, 100, 150, 200, 250, 300, 350)


def _speed_bin_labels(speeds):
    """Label each speed with its bin's upper bound as a string, or ``"error"``.

    Speeds above the last bound, and missing speeds, get ``"error"``.
    """
    values = np.asarray(speeds)
    conditions = [values <= edge for edge in _SPEED_BIN_EDGES]
    choices = [str(edge) for edge in _SPEED_BIN_EDGES]
    return np.select(conditions, choices, default="error")


def _count_segments_by_speed_bin(df, counter_col, reducer, group_col, prefix):
    """Count segments per speed bin.

    Each distinct value of ``counter_col`` is one segment. Its ``Speed`` is
    reduced with ``reducer`` (``"min"`` or ``"max"``), binned, and the
    segments are counted per bin.

    Returns:
        One-row DataFrame with a ``prefix + bin`` column for every bin that
        occurs; the ``"error"`` bin is dropped.
    """
    # Select the Speed column explicitly. The old `.min("Speed")` form passed
    # the column name as the `numeric_only` flag.
    per_segment = df.groupby(counter_col)["Speed"].agg(reducer).reset_index()
    per_segment[group_col] = _speed_bin_labels(per_segment["Speed"])

    counts = pd.pivot_table(
        per_segment, columns=group_col, values=counter_col, aggfunc="count"
    ).reset_index()
    # Bin labels all contain "0"; this drops the helper "index" column and
    # the "error" bin.
    bin_columns = [x for x in counts.columns if "0" in x]
    counts = counts[bin_columns]
    counts.columns = [prefix + x for x in bin_columns]
    return counts


def corner_counter_speeds(df):
    """Count corners per minimum-speed bin.

    Args:
        df: DataFrame with ``counter_corners`` and ``Speed`` columns.

    Returns:
        One-row DataFrame with columns such as ``cor_50`` or ``cor_100``
        holding the number of corners whose lowest speed falls in that bin.
    """
    return _count_segments_by_speed_bin(
        df, "counter_corners", "min", "grp_corner", "cor_"
    )


def straight_counter_speeds(df):
    """Count straights per maximum-speed bin.

    Args:
        df: DataFrame with ``counter_straights`` and ``Speed`` columns.

    Returns:
        One-row DataFrame with columns such as ``str_200`` or ``str_300``
        holding the number of straights whose top speed falls in that bin.
    """
    return _count_segments_by_speed_bin(
        df, "counter_straights", "max", "grp_straights", "str_"
    )


def dataframe_creation(laps, best_driver):
    """Build the analysis frame for one driver's best lap.

    The lap is loaded with ``load_best_lap`` and then passed through the
    flagging, rotation, ordering and segment-counting helpers, in that order.
    """
    # NOTE(review): load_best_lap is not defined in the visible part of this
    # notebook -- confirm it exists before running this cell.
    frame = load_best_lap(laps, best_driver)
    stages = (
        flags_strcor_based_on_normal_speed,
        change_initial_position,
        order_data,
        counter_strcor,
    )
    for stage in stages:
        frame = stage(frame)
    return frame


def retreive_kpis(session, fastestlap, df):
    """Assemble the KPI row of a lap.

    A ``_Full_Throttle`` column (1 where ``Throttle >= 95``) is added to
    ``df`` in place. The result combines the corner and straight speed-bin
    counts with the share of full-throttle samples, the largest ``Distance``,
    the largest ``Speed``, an average speed derived from ``Distance`` and
    ``fastestlap``, and the name of the event.

    Args:
        session: Object exposing ``weekend.name``.
        fastestlap: Lap duration that can be divided by ``np.timedelta64``.
        df: Lap DataFrame with ``Throttle``, ``Distance``, ``Speed`` and the
            segment counter columns.

    Returns:
        DataFrame holding the KPI columns.
    """
    df["_Full_Throttle"] = np.where(df["Throttle"] >= 95, 1, 0)
    full_throttle_flags = df["_Full_Throttle"]

    lap_distance = df["Distance"].max()
    lap_hours = float(fastestlap / np.timedelta64(1, "h"))
    mean_speed = lap_distance / 1000 / lap_hours
    top_speed = df["Speed"].max()
    throttle_share = full_throttle_flags.sum() / full_throttle_flags.count()

    kpis = pd.concat(
        [pd.DataFrame(), corner_counter_speeds(df), straight_counter_speeds(df)],
        axis=1,
    )
    scalar_kpis = {
        "perc_full_throttle": throttle_share,
        "distance": lap_distance,
        "max_speed": top_speed,
        "avg_speed": mean_speed,
        "grandprix": session.weekend.name,
    }
    for column, value in scalar_kpis.items():
        kpis[column] = value
    return kpis

### Devs

In [None]:
# testcase predefine
# Qualifying session whose laps feed the export cells below.
# NOTE(review): the qualifying laps are loaded with load_laps() while the
# race session uses load(); confirm both calls exist in the installed
# FastF1 version.
qs = fastf1.get_session(2021, 1, "Q")
laps = qs.load_laps(with_telemetry=True)
drivers = qs.drivers
# Race session requested with the same first two arguments.
r_qs = fastf1.get_session(2021, 1, "R")
r_qs.load()
r_laps = r_qs.laps
r_drivers = r_qs.drivers
print(r_drivers)

In [None]:
# grand prix standing changes
target = '/Users/blank/repo/Pitlane/frontend/src/data/sample_standing.json'
# Build one frame per driver and concatenate once instead of growing the
# frame inside the loop.
driver_frames = [pd.DataFrame(r_laps.pick_driver(drv)) for drv in r_drivers]
# Later drivers come first, matching the row order this export has always had.
df_total = pd.concat(driver_frames[::-1], axis=0)
# Vectorized replacement for the row-wise apply(): lap start plus lap duration.
df_total['LapFinishTime'] = df_total['LapTime'] + df_total['LapStartTime']
# NOTE(review): ascending=False gives rank 1 to the LATEST finisher of each
# lap; confirm the consumer of this file expects that orientation.
df_total['Position'] = df_total.groupby('LapNumber')['LapFinishTime'].rank(ascending=False)
export_to_file(df_total.to_json(orient='records'), target)


In [None]:
# track information
# Export the telemetry of the overall fastest qualifying lap as the track sample.
target = '/Users/blank/repo/Pitlane/frontend/src/data/sample_track.json'
content = get_track_sample(laps)
# NOTE(review): this prints the whole JSON document into the cell output.
print(content)
export_to_file(content, target)

In [177]:
# maxima table data
target = '/Users/blank/repo/Pitlane/frontend/src/data/sample_maxima.json'
drivers = pd.unique(laps["Driver"])
# Speed maxima of every driver's fastest lap, keyed by driver.
dict_with_laps_partition = dict()
for drv in drivers:
    fastest = laps.pick_driver(drv).pick_fastest()
    dict_with_laps_partition[drv] = partition_lap_telemetry_max(fastest)
# Number of maxima that EVERY driver has. Taking the count of the last driver
# only let later lookups fail for drivers with fewer maxima.
maxima_number = min((len(maxima) for maxima in dict_with_laps_partition.values()), default=0)

class MaximaInfo:
    """Telemetry values of one driver at one speed maxima."""

    def __init__(self, Driver, Speed, RPM, Distance):
        self.Driver = Driver
        self.Speed = Speed
        self.RPM = RPM
        self.Distance = Distance

    def to_json(self):
        """Return a JSON-ready dict of the fields plus a fresh random ``uuid``.

        Despite the name this returns a ``dict``, not a JSON string; every
        call generates a new uuid.
        """
        return {
            "Driver": self.Driver,
            "Speed": self.Speed,
            "RPM": self.RPM,
            "Distance": self.Distance,
            "uuid": str(uuid.uuid4())
        }


def custom_serializer(obj):
    """``default`` hook for ``json.dumps``.

    Args:
        obj: Object that ``json`` could not encode on its own.

    Returns:
        The dict form of a ``MaximaInfo``, or the Python scalar behind a
        NumPy scalar.

    Raises:
        TypeError: For any other type. Returning ``None`` here, as the hook
            used to do implicitly, made ``json.dumps`` write ``null``
            silently.
    """
    if isinstance(obj, MaximaInfo):
        # Return the serializable representation of the class instance.
        return obj.to_json()
    if isinstance(obj, np.generic):
        return obj.item()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

# One row per maxima index; each row holds that maxima for every driver.
# Never index past the driver with the fewest maxima.
shared_maxima = min((len(maxima) for maxima in dict_with_laps_partition.values()), default=0)
maxima_rows = []
for i in range(1, min(maxima_number, shared_maxima) + 1):  # i-th maxima point
    result_arr = [
        MaximaInfo(driver_key, maxima[i]['Speed'], maxima[i]['RPM'], maxima[i]['Distance'])
        for driver_key, maxima in dict_with_laps_partition.items()
    ]
    maxima_rows.append(result_arr)

# Serialize everything in one call. The hand-built string dropped the last
# row and ended in ",]", which is not valid JSON.
result_str = json.dumps(maxima_rows, default=custom_serializer)

export_to_file(result_str, target)

In [None]:
# minima table data


In [108]:
# qualify result
target = '/Users/blank/repo/Pitlane/frontend/src/data/sample_result.json'
drivers = pd.unique(laps['Driver'])
print(drivers)
result_arr = []
fastest_laps = list()
for drv in drivers:
    # Deliberately NOT named `fastest_lap`: that name belongs to the helper
    # function defined earlier, and rebinding it here made later calls such
    # as fastest_lap(laps) fail.
    driver_fastest_lap = laps.pick_driver(drv).pick_fastest()
    fastest_laps.append(driver_fastest_lap)
fastest_laps = Laps(fastest_laps).sort_values(by='LapTime').reset_index(drop=True)
pole_lap = fastest_laps.pick_fastest()
# Gap of every driver's best lap to the quickest lap of the session.
fastest_laps['LapTimeDelta'] = fastest_laps['LapTime'] - pole_lap['LapTime']
print(type(fastest_laps))
result = fastest_laps.to_json(orient='records')
export_to_file(result, target)


['VER' 'HAM' 'BOT' 'LEC' 'GAS' 'RIC' 'NOR' 'SAI' 'ALO' 'STR' 'PER' 'GIO'
 'TSU' 'RAI' 'RUS' 'OCO' 'LAT' 'VET' 'MSC' 'MAZ']
<class 'fastf1.core.Laps'>


In [None]:
# qualify laps telemetry
# Export, for every driver, the car telemetry of their fastest qualifying lap.
target = '/Users/blank/repo/Pitlane/frontend/src/data/sample_qualify.json'
drivers = pd.unique(laps["Driver"])
# NOTE(review): result_dict is never read in this cell.
result_dict = dict()
# One entry per driver; each entry is that driver's list of telemetry rows.
result_arr = []
for drv in drivers:
    # print(drv)
    driver_fastest = laps.pick_driver(drv).pick_fastest()
    info_dict = load_lap_telemetry_and_add_driver(driver_fastest, drv)
    result_arr.append(info_dict)
# default=str renders values that json cannot encode natively.
export_to_file(json.dumps(result_arr, default=str), target)

In [83]:
# grand prix laps telemetry
target = '/Users/blank/repo/Pitlane/frontend/src/data/sample_distribution.json'
result_arr = []
# Total number of quick laps over all race drivers.
total_laps_num = 0
for drv in r_drivers:
    driver_laps = r_laps.pick_drivers(drv).pick_quicklaps()
    total_laps_num += len(driver_laps)
# NOTE(review): 20 is hard-coded here (presumably the number of drivers), and
# avg_laps_num is only used by the commented-out variant below.
avg_laps_num = total_laps_num / 20

# Export the quick laps of all race drivers in a single frame.
driver_laps = r_laps.pick_drivers(r_drivers).pick_quicklaps()
df_total = pd.DataFrame(driver_laps)
export_to_file(df_total.to_json(orient='records'), target)

# Earlier per-driver variant, kept for reference. It skipped drivers with no
# quick laps or with more than twice the average number of quick laps.
# for drv in r_drivers:
#     driver_laps = r_laps.pick_drivers(drv).pick_quicklaps()
#     print(f'{drv} + " " + {len(driver_laps)}')
#     if len(driver_laps) == 0 or len(driver_laps) > 2 * avg_laps_num:
#         continue
#     # print(type(driver_laps))
#     # print(driver_laps.to_dict())
#     df = pd.DataFrame(driver_laps)
#     print(df.head())
#     df_total = pd.concat([df, df_total], axis=0)
#     # result_arr.append(driver_laps.to_dict())
# export_to_file(df_total.to_json(orient='records'), target)

### Test

In [None]:
# testcase predefine
# NOTE(review): this rebinds the notebook-wide `qs` and `laps` to a different
# session than the one loaded in the Devs section.
qs = fastf1.get_session(2021, 5, "Q")
laps = qs.load_laps(with_telemetry=True)

##### test telemetry partition

In [179]:
# Load the telemetry of the overall fastest lap and inspect it.
fastest = fastest_lap(laps)
# NOTE(review): target1 and target are assigned but not used by any active
# line in this cell.
target1 = '/Users/blank/repo/Pitlane/frontend/src/data'
target = '/Users/blank/repo/Pitlane/backend-ff/data/telemetry.json'
# export_to_file(load_lap_telemetry(fastest), target)

info_dict = load_lap_telemetry(fastest)
print(info_dict[0]['Speed'])
# NOTE(review): this prints every telemetry row into the cell output.
print(info_dict)
# num = partition_lap_telemetry_max_json(fastest)
# print(num)

297
[{'Date': Timestamp('2021-03-27 15:59:07.635000'), 'SessionTime': Timedelta('0 days 01:14:07.250000'), 'DriverAhead': '', 'DistanceToDriverAhead': 451.0469444444444, 'Time': Timedelta('0 days 00:00:00.169000'), 'RPM': 10549, 'Speed': 297, 'nGear': 8, 'Throttle': 100, 'Brake': False, 'DRS': 12, 'Source': 'car', 'Distance': 14.035000000000002, 'RelativeDistance': 0.002592432964935505, 'Status': 'OnTrack', 'X': -376, 'Y': 1338, 'Z': -159}, {'Date': Timestamp('2021-03-27 15:59:07.875000'), 'SessionTime': Timedelta('0 days 01:14:07.490000'), 'DriverAhead': '10', 'DistanceToDriverAhead': 451.0469444444444, 'Time': Timedelta('0 days 00:00:00.409000'), 'RPM': 10721, 'Speed': 298, 'nGear': 8, 'Throttle': 100, 'Brake': False, 'DRS': 12, 'Source': 'car', 'Distance': 33.901666666666664, 'RelativeDistance': 0.0062620447618754425, 'Status': 'OnTrack', 'X': -367, 'Y': 1536, 'Z': -158}, {'Date': Timestamp('2021-03-27 15:59:08.115000'), 'SessionTime': Timedelta('0 days 01:14:07.730000'), 'DriverAhe

##### test json serialize

In [None]:
### fastestlap json serialization
# Serialize a single lap: lap object -> dict -> JSON string.
fastestlap = laps.pick_driver('33').pick_fastest()
print(fastestlap)
lap_dict = fastestlap.to_dict()
print(lap_dict)
# default=str renders values that json cannot encode natively.
lap_json = json.dumps(lap_dict, default=str)
print(lap_json)

### fastestlaps
# Serialize several laps, once as a list and once keyed by DriverNumber.
fastestlaps = [laps.pick_driver('33').pick_fastest(), laps.pick_driver('44').pick_fastest()]
print(fastestlaps)
info_arr = []
info_dict = dict()
for fastestlap in fastestlaps:
    info_arr.append(fastestlap.to_dict())
    info_dict[fastestlap['DriverNumber']] = fastestlap.to_dict()
laps_json1 = json.dumps(info_arr, default=str)
laps_json2 = json.dumps(info_dict, default=str)
print(laps_json1)
print(laps_json2)

##### test session

In [None]:
# Reload the first qualifying session and show its per-driver fastest laps as JSON.
qs = fastf1.get_session(2021, 1, "Q")
laps = qs.load_laps(with_telemetry=True)
fastest_laps_of_drivers_json(laps)