In [2]:
import pandas as pd
import numpy as np

# 1. Загружаем сырые данные
all_data = []
years = [2021, 2022, 2023, 2024, 2025]

for year in years:
    file = f'f1_{year}_raw.csv'
    df = pd.read_csv(file)
    all_data.append(df)

final_df = pd.concat(all_data, ignore_index=True)

final_df.head()

print(f"Исходное количество строк: {len(final_df)}")

# 2. Приводим типы данных (на всякий случай)
# В данных могут быть 'NC' (Not Classified) или строки, превращаем в числа
cols_to_numeric = ['QualiPos', 'GridPosition', 'ClassifiedPosition']

for col in cols_to_numeric:
    # errors='coerce' превратит всё, что не число (например 'NC'), в NaN
    final_df[col] = pd.to_numeric(final_df[col], errors='coerce')

# 3. Фильтрация: Оставляем только тех, кто финишировал
# Обычно статус финиша это 'Finished' или '+1 Lap', '+2 Laps' и т.д.
# Самый простой способ: проверить, что ClassifiedPosition - это число (не NaN)
# И статус не содержит слов 'Collision', 'Accident', 'Spun', 'Engine'
# Но проще всего довериться ClassifiedPosition.

df_clean = final_df.dropna(subset=['ClassifiedPosition', 'QualiPos', 'GridPosition'])

# Дополнительная проверка на статус (опционально, но надежно)
# Оставляем только тех, у кого статус похож на финиш
valid_status = df_clean['Status'].astype(str).str.contains(r'Finished|\+\d', regex=True)
df_clean = df_clean[valid_status]
df_clean.to_csv('f1_all_clean_data.csv', index=False)

print(f"Строк после очистки (только финишировавшие): {len(df_clean)}")

Исходное количество строк: 1738
Строк после очистки (только финишировавшие): 1253


Temp data

In [None]:
import pandas as pd
import numpy as np

# mock_data = pd.DataFrame({
#     'driver': ['VER', 'HAM', 'GAS', 'OCO'],
#     'team': ['Red Bull', 'Mercedes', 'Alpine', 'Alpine'],
#     'tire_degradation': np.random.rand(4),
#     'braking_intensity': np.random.rand(4),
#     'throttle_aggression': np.random.rand(4),
#     'consistency_score': np.random.rand(4),
#     'driver_form': np.random.rand(4),
#     'driver_is_french': [0,0,1,1],
#     'qual_pos': [1,2,5,7],
#     'fp2_pace': np.random.rand(4),
#     'target_win': [0,1,0,0]
# })

# mock_data.head()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 380 entries, 0 to 379
Data columns (total 10 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   Year                380 non-null    int64  
 1   Round               380 non-null    int64  
 2   Circuit             380 non-null    object 
 3   TeamName            380 non-null    object 
 4   Abbreviation        380 non-null    object 
 5   QualiPos            379 non-null    float64
 6   GridPosition        379 non-null    float64
 7   ClassifiedPosition  380 non-null    object 
 8   Status              380 non-null    object 
 9   Time                185 non-null    object 
dtypes: float64(2), int64(2), object(6)
memory usage: 29.8+ KB


Functions   feature engineering

In [11]:
import numpy as np
from sklearn.linear_model import LinearRegression


def compute_consistency(lap_times):
    lap_times = [lt for lt in lap_times if lt > 0]
    if len(lap_times) < 3:
        return None
    return np.std(lap_times)


def compute_driver_form(results_last_n):
    return None if len(results_last_n) == 0 else np.mean(results_last_n)


def compute_tire_degradation(lap_numbers, lap_times):
    if len(lap_numbers) < 3 or len(lap_times) < 3:
        return None

    lap_numbers = np.array(lap_numbers).reshape(-1, 1)
    lap_times = np.array(lap_times)

    model = LinearRegression()
    model.fit(lap_numbers, lap_times)

    return float(model.coef_[0])  


def compute_braking_intensity(brake_data):
    if len(brake_data) == 0:
        return None
    return float(np.mean(brake_data))


def compute_throttle_aggression(throttle_data):
    if len(throttle_data) == 0:
        return None
    return float(np.mean(throttle_data))


def compute_race_pace(lap_times):
    lap_times = [lt for lt in lap_times if lt > 0]
    if len(lap_times) == 0:
        return None
    return float(np.median(lap_times))


Data merging

In [12]:
def compute_my_features(df):
    df["consistency"] = df["lap_times"].apply(compute_consistency)
    df["driver_form"] = df["results_last_n"].apply(compute_driver_form)
    df["tire_degradation"] = df.apply(
        lambda row: compute_tire_degradation(row["lap_numbers"], row["lap_times"]), axis=1)
    df["braking_intensity"] = df["brake_data"].apply(compute_braking_intensity)
    df["throttle_aggression"] = df["throttle_data"].apply(compute_throttle_aggression)
    df["race_pace"] = df["lap_times"].apply(compute_race_pace)

    return df

def save_my_features(df, path="my_features.csv"):
    df = compute_my_features(df)
    df.to_csv(path, index=False)


ML-basis

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

df_fastf1 = pd.read_parquet("cleaned_fastf1.csv")
df_openf1 = pd.read_parquet("cleaned_openf1.csv")
df_my = pd.read_parquet("my_features.csv")

df_all = (
    df_fastf1
    .merge(df_openf1, on="driver", how="inner")
    .merge(df_my, on="driver", how="inner")
)

# 3. Целевая переменная: выиграл гонку или нет
df_all["target_win"] = (df_all["position"] == 1).astype(int)

# 4. Формируем X и y
X = df_all.drop(columns=["driver", "team", "position", "target_win"])
y = df_all["target_win"]

# 5. Делим на train/test
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42
)

# 6. Создаём модель
model = RandomForestClassifier(
    n_estimators=300,
    max_depth=12,
    random_state=42
)

# 7. Обучаем модель
model.fit(X_train, y_train)

# 8. Предсказываем
y_pred = model.predict(X_test)

# 9. Метрики
print("Accuracy:", accuracy_score(y_test, y_pred))
print("F1:", f1_score(y_test, y_pred))
print("ROC-AUC:", roc_auc_score(y_test, model.predict_proba(X_test)[:, 1]))

# 10. Показ предсказаний
print("\nPredictions:", y_pred)


KeyError: 'position'