# In [None]:  (Jupyter cell marker, kept as a comment so the export parses as Python)
import fastf1
import pandas as pd
import numpy as np
import requests
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error
from sklearn.impute import SimpleImputer
from sklearn.inspection import permutation_importance
import matplotlib.pyplot as plt
import os
import warnings

# Silence every FutureWarning for the whole process; they are noisy in this workflow.
warnings.simplefilter(action='ignore', category=FutureWarning)

# --- Configuration Constants ---
# FastF1 cache directory. Set the FASTF1_CACHE_DIR environment variable to override
# the Windows-specific default (an empty value falls back to the default as well).
CACHE_DIR = os.environ.get("FASTF1_CACHE_DIR") or r"D:\fastf1_cache"
FASTF1_YEAR = 2024
FASTF1_ROUND = 8  # Monaco Grand Prix is typically round 8
FASTF1_SESSION_TYPE = "R"  # 'R' for Race, 'Q' for Qualifying

# SECURITY: supply the key through the OPENWEATHER_API_KEY environment variable.
# The literal fallback only keeps existing setups working; it is a credential committed
# to source and should be rotated and removed.
OPENWEATHER_API_KEY = os.environ.get("OPENWEATHER_API_KEY") or "b3d0fae0fe1583a97653b49954a298cf"
MONACO_LATITUDE = 43.7384
MONACO_LONGITUDE = 7.4246
MONACO_FORECAST_TIME = "2025-05-25 13:00:00"  # Target forecast time, matched against the API's `dt_txt` values

# --- Setup FastF1 Cache ---
# Runs at import time: creates the cache directory if needed and points FastF1 at it.
os.makedirs(CACHE_DIR, exist_ok=True)
fastf1.Cache.enable_cache(CACHE_DIR)

# --- Data Loading and Preprocessing Functions ---

def get_f1_session_lap_data(year: int, round_num: int, session_type: str) -> pd.DataFrame:
    """
    Fetch one FastF1 session and return its fully timed laps with times in seconds.

    Laps that lack a lap time or any of the three sector times are discarded. Each of
    the four timing columns gets a companion float column named "<column> (s)".
    If the session cannot be loaded, the error is printed and an empty DataFrame is
    returned instead of raising.
    """
    timing_columns = ["LapTime", "Sector1Time", "Sector2Time", "Sector3Time"]

    print(f"Loading {year} Round {round_num} {session_type} session data...")
    try:
        session = fastf1.get_session(year, round_num, session_type)
        # Only lap timing is used downstream, so the optional datasets are skipped.
        session.load(telemetry=False, weather=False, messages=False)
    except Exception as e:
        print(f"Error loading FastF1 session data: {e}")
        return pd.DataFrame()

    # Keep the driver plus timing columns, restricted to laps where all four times exist.
    timed_laps = session.laps[["Driver", *timing_columns]].dropna(subset=timing_columns).copy()

    # Append the "<column> (s)" float columns in the same order as the timing columns.
    seconds_columns = {
        f"{name} (s)": timed_laps[name].dt.total_seconds() for name in timing_columns
    }
    return timed_laps.assign(**seconds_columns)

def aggregate_sector_times(laps_df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute each driver's mean time per sector and the sum of those three means.

    Expects the "Driver" column and the three "SectorNTime (s)" columns. Returns one
    row per driver with the three means plus "TotalSectorTime (s)".
    """
    sector_columns = ["Sector1Time (s)", "Sector2Time (s)", "Sector3Time (s)"]

    per_driver = laps_df.groupby("Driver")[sector_columns].mean().reset_index()

    first, second, third = (per_driver[name] for name in sector_columns)
    per_driver["TotalSectorTime (s)"] = first + second + third
    return per_driver

def get_weather_forecast_data(lat: float, lon: float, api_key: str, target_time: str) -> tuple[float, float]:
    """
    Fetches weather forecast data for a specific location and time.
    Returns (rain probability, temperature in °C).

    The forecast entry whose `dt_txt` equals `target_time` is used when present.
    OpenWeather's 5 day / 3 hour forecast only publishes slots every three hours,
    so when there is no exact match the slot closest to `target_time` (at most
    90 minutes away) is used instead. If the request fails, the response is not
    valid JSON, or no slot is close enough, the defaults (0.0, 20.0) are returned.
    """
    default_forecast = (0.0, 20.0)

    print(f"Fetching weather forecast for {target_time}...")
    # HTTPS keeps the API key out of plain-text traffic; `params` handles URL encoding.
    weather_url = "https://api.openweathermap.org/data/2.5/forecast"
    query = {"lat": lat, "lon": lon, "appid": api_key, "units": "metric"}
    try:
        response = requests.get(weather_url, params=query, timeout=10)
        response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
        weather_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions where the JSON
        # decode error is not a RequestException subclass.
        print(f"Error fetching weather data: {e}")
        return default_forecast

    entries = weather_data.get("list", []) if isinstance(weather_data, dict) else []
    forecast_data = next((f for f in entries if f.get("dt_txt") == target_time), None)

    if not forecast_data:
        # No exact slot: fall back to the nearest one within 90 minutes (half a 3-hour step).
        target = pd.to_datetime(target_time, errors="coerce")
        if target is not None and not pd.isna(target):
            max_gap = pd.Timedelta(minutes=90)
            best_gap = None
            for entry in entries:
                stamp = pd.to_datetime(entry.get("dt_txt"), errors="coerce")
                if stamp is None or pd.isna(stamp):
                    continue
                try:
                    gap = abs(stamp - target)
                except TypeError:
                    # e.g. mixing timezone-aware and naive timestamps
                    continue
                if gap <= max_gap and (best_gap is None or gap < best_gap):
                    forecast_data, best_gap = entry, gap
            if forecast_data:
                print(f"No exact forecast for {target_time}; using nearest slot {forecast_data.get('dt_txt')}.")

    if forecast_data:
        rain_probability = forecast_data.get("pop", 0.0)
        temperature = forecast_data.get("main", {}).get("temp", 20.0)
        print(f"Weather forecast: Rain probability={rain_probability*100:.1f}%, Temperature={temperature:.1f}°C")
        return rain_probability, temperature

    print(f"No forecast found for {target_time}. Using default values.")
    return default_forecast

# --- Hardcoded Data Functions (to be replaced by API calls post-event) ---

def get_2025_qualifying_data() -> pd.DataFrame:
    """
    Return the hardcoded 2025 Monaco qualifying times, one row per driver.

    Columns are "Driver" (3-letter abbreviation) and "QualifyingTime (s)"; a driver
    without a time is recorded as NaN.

    TODO: once the session is available, replace this table with data loaded through
    FastF1 (e.g. fastf1.get_session(2025, FASTF1_ROUND, "Q")) and convert the lap
    times with `.dt.total_seconds()`.
    """
    print("Loading hardcoded 2025 qualifying data...")

    # (driver, qualifying time in seconds); the trailing comment shows the m:ss.sss form.
    entries = [
        ("VER", 70.669),  # 1:10.669
        ("NOR", 69.954),  # 1:09.954
        ("PIA", 70.129),  # 1:10.129
        ("RUS", np.nan),  # no time (DNF), stored as a missing value
        ("SAI", 71.362),  # 1:11.362
        ("ALB", 71.213),  # 1:11.213
        ("LEC", 70.063),  # 1:10.063
        ("OCO", 70.942),  # 1:10.942
        ("HAM", 70.382),  # 1:10.382
        ("STR", 72.563),  # 1:12.563
        ("GAS", 71.994),  # 1:11.994
        ("ALO", 70.924),  # 1:10.924
        ("HUL", 71.596),  # 1:11.596
        ("LAW", 71.129),  # 1:11.129
        ("HAD", 70.923),  # 1:10.923
        ("TSU", 71.415),  # 1:11.415
    ]
    return pd.DataFrame({
        "Driver": [code for code, _ in entries],
        "QualifyingTime (s)": [seconds for _, seconds in entries],
    })

def get_clean_air_race_pace() -> dict:
    """
    Return the hardcoded clean-air race pace (seconds per lap) keyed by driver code.

    The numbers are placeholders (fictitious for 2025); a real pipeline would derive
    them from 2025 race lap data once it exists.
    """
    print("Loading hardcoded clean air race pace data...")
    # Keys are 3-letter driver abbreviations, matching the qualifying table.
    pace_pairs = [
        ("VER", 93.191067),
        ("HAM", 94.020622),
        ("LEC", 93.418667),
        ("NOR", 93.428600),
        ("ALO", 94.784333),
        ("PIA", 93.232111),
        ("RUS", 93.833378),
        ("SAI", 94.497444),
        ("STR", 95.318250),
        ("HUL", 95.345455),
        ("OCO", 95.682128),
        ("ALB", 95.500000),
        ("GAS", 95.100000),
        ("LAW", 96.000000),
        ("HAD", 95.800000),
        ("TSU", 94.500000),
    ]
    return dict(pace_pairs)

def get_constructor_data() -> tuple[dict, dict]:
    """
    Loads hardcoded team points and driver-to-team mapping.

    Returns:
        (team_performance_score, driver_to_team) where the score is each team's
        points divided by the highest team total (so the leader scores 1.0), and
        driver_to_team maps 3-letter driver codes to the team names used as keys
        of the score dict.

    In a real scenario, team points could come from an F1 standings API.
    Driver-to-team could come from fastf1 session data or an external source.
    """
    print("Loading hardcoded constructor and driver-to-team data...")
    team_points = {
        "McLaren": 279, "Mercedes": 147, "Red Bull": 131, "Williams": 51, "Ferrari": 114,
        "Haas": 20, "Aston Martin": 14, "Kick Sauber": 6, "Racing Bulls": 10, "Alpine": 7
    }
    # Normalize team points to a performance score between 0 and 1
    max_points = max(team_points.values())
    team_performance_score = {team: points / max_points for team, points in team_points.items()}

    # 2025 line-up, keyed by 3-letter driver abbreviation. HAM (Ferrari) and OCO (Haas)
    # previously carried their 2024 teams, which attached the wrong team score to them.
    driver_to_team = {
        "VER": "Red Bull", "NOR": "McLaren", "PIA": "McLaren", "LEC": "Ferrari", "RUS": "Mercedes",
        "HAM": "Ferrari", "GAS": "Alpine", "ALO": "Aston Martin", "TSU": "Red Bull",
        "SAI": "Williams", "HUL": "Kick Sauber",
        "OCO": "Haas", "STR": "Aston Martin", "ALB": "Williams",
        "LAW": "Racing Bulls", "HAD": "Racing Bulls"
    }
    return team_performance_score, driver_to_team

def get_average_monaco_position_change() -> dict:
    """
    Return the hardcoded average position change at Monaco, keyed by driver code.

    A real pipeline would derive these values from historical results. Drivers that
    are absent from this table end up with NaN for the feature when it is mapped
    onto a driver column.
    """
    print("Loading hardcoded average Monaco position change data...")
    # Keys are 3-letter driver abbreviations, matching the qualifying table.
    position_change_pairs = [
        ("VER", -1.0),
        ("NOR", 1.0),
        ("PIA", 0.2),
        ("RUS", 0.5),
        ("SAI", -0.3),
        ("ALB", 0.8),
        ("LEC", -1.5),
        ("OCO", -0.2),
        ("HAM", 0.3),
        ("STR", 1.1),
        ("GAS", -0.4),
        ("ALO", -0.6),
        ("HUL", 0.0),
        ("LAW", 0),
        ("HAD", 0),
        ("TSU", -0.2),
    ]
    return dict(position_change_pairs)

# --- Main Data Preparation Function ---

def prepare_modeling_data(
    qualifying_df: pd.DataFrame,
    sector_times_df: pd.DataFrame,
    rain_prob: float,
    temp: float,
    team_scores: dict,
    driver_team_map: dict,
    avg_pos_change: dict,
    laps_2024_df: pd.DataFrame, # To ensure drivers are valid for y target
    clean_air_pace: "dict | None" = None
) -> tuple[pd.DataFrame, pd.Series, pd.DataFrame, list]:
    """
    Merges all input data into a single DataFrame for modeling and defines features (X) and target (y).

    Args:
        qualifying_df: Needs "Driver" and "QualifyingTime (s)". It is not modified.
        sector_times_df: Needs "Driver" and "TotalSectorTime (s)"; left-joined on "Driver".
        rain_prob: Value written to every row of "RainProbability".
        temp: Value written to every row of "Temperature".
        team_scores: Team name -> performance score.
        driver_team_map: Driver code -> team name.
        avg_pos_change: Driver code -> average position change.
        laps_2024_df: Needs "Driver" and "LapTime (s)"; the per-driver mean lap time is the target.
        clean_air_pace: Driver code -> clean-air pace. Defaults to get_clean_air_race_pace().

    Returns:
        (X, y, modeling_data, features). X and modeling_data share the same rows and
        index; y holds the mean 2024 lap time in the same row order, indexed by driver
        code. Drivers without 2024 laps, or whose mean lap time is NaN, are excluded.
    """
    print("Preparing data for modeling...")

    if clean_air_pace is None:
        clean_air_pace = get_clean_air_race_pace()

    # Work on a copy so the caller's qualifying frame does not gain columns as a side effect.
    qualifying_df = qualifying_df.copy()

    # Add Clean Air Race Pace, Team Performance Score, and Average Position Change
    qualifying_df["CleanAirRacePace (s)"] = qualifying_df["Driver"].map(clean_air_pace)
    qualifying_df["Team"] = qualifying_df["Driver"].map(driver_team_map)
    qualifying_df["TeamPerformanceScore"] = qualifying_df["Team"].map(team_scores)
    qualifying_df["AveragePositionChange"] = qualifying_df["Driver"].map(avg_pos_change)

    # Merge qualifying and sector times data
    merged_data = qualifying_df.merge(
        sector_times_df[["Driver", "TotalSectorTime (s)"]], on="Driver", how="left"
    )

    # Add weather data (the same value for every driver)
    merged_data["RainProbability"] = rain_prob
    merged_data["Temperature"] = temp

    # NOTE: an earlier version scaled the qualifying time by a per-driver
    # "WetPerformanceFactor" when rain probability was >= 0.75, but that factor was
    # never defined. Until it is, the qualifying time is used unadjusted.
    merged_data["QualifyingTime"] = merged_data["QualifyingTime (s)"]

    # Keep only drivers present in the 2024 lap data (our target source).
    # This relies on both frames using the same driver codes.
    valid_drivers = merged_data["Driver"].isin(laps_2024_df["Driver"].unique())
    modeling_data = merged_data[valid_drivers].copy()

    # Target: mean 2024 lap time per driver, in the same row order as modeling_data.
    y = laps_2024_df.groupby("Driver")["LapTime (s)"].mean().reindex(modeling_data["Driver"])

    # Drop rows whose target is NaN. A positional mask is used because y is indexed by
    # driver code while modeling_data keeps its integer index, so the two cannot be
    # aligned by label.
    missing_target = y.isna().to_numpy()
    if missing_target.any():
        nan_y_drivers = y.index[missing_target].tolist()
        print(f"Warning: Dropping drivers with no 2024 lap data for target (y): {nan_y_drivers}")
        keep = ~missing_target
        modeling_data = modeling_data[keep].copy()
        y = y[keep]

    # Define features (X)
    features = [
        "QualifyingTime", "RainProbability", "Temperature", "TeamPerformanceScore",
        "CleanAirRacePace (s)", "AveragePositionChange"
    ]
    X = modeling_data[features]

    return X, y, modeling_data, features

# --- Model Training and Prediction ---

def train_and_predict_model(X: pd.DataFrame, y: pd.Series) -> tuple[GradientBoostingRegressor, pd.DataFrame, pd.Series, np.ndarray, np.ndarray]:
    """
    Fit a Gradient Boosting Regressor on median-imputed features and score it.

    Steps: fill missing feature values with the column median, hold out 30% of the
    rows as a test set (seed 37), fit the regressor on the rest, predict for every
    row, and print the mean absolute error on the held-out rows.

    Returns: model, X_imputed_df, y_test (Series), y_pred_test (ndarray), predicted_race_times (ndarray).
    With fewer than two rows nothing is trained: the model slot is None and the last
    three slots are empty.
    """
    print("Training and predicting with Gradient Boosting Regressor...")

    # Median imputation, wrapped back into a DataFrame so column names and index survive.
    X_imputed_df = pd.DataFrame(
        SimpleImputer(strategy="median").fit_transform(X),
        columns=X.columns,
        index=X.index,
    )

    # train_test_split cannot produce both partitions from fewer than two rows.
    if len(X_imputed_df) < 2:
        print("Error: Not enough samples for train-test split after imputation.")
        return None, X_imputed_df, pd.Series(), np.array([]), np.array([])

    X_train, X_test, y_train, y_test = train_test_split(
        X_imputed_df, y, test_size=0.3, random_state=37
    )

    model = GradientBoostingRegressor(
        n_estimators=100, learning_rate=0.7, max_depth=3, random_state=37
    ).fit(X_train, y_train)

    # Predictions for every row (used for the final ranking) ...
    predicted_race_times = model.predict(X_imputed_df)

    # ... and for the held-out rows only (used for the error estimate).
    y_pred_test = model.predict(X_test)
    print(f"Model Error (MAE): {mean_absolute_error(y_test, y_pred_test):.2f} seconds")

    return model, X_imputed_df, y_test, y_pred_test, predicted_race_times

# --- Visualization Functions ---

def plot_lap_time_distribution(laps_df: pd.DataFrame):
    """
    Show a 30-bin histogram of the "LapTime (s)" column; does nothing for an empty frame.
    """
    if laps_df.empty:
        return

    _, ax = plt.subplots(figsize=(10, 6))
    ax.hist(laps_df["LapTime (s)"], bins=30, edgecolor='black', alpha=0.7, color='skyblue')
    ax.set_xlabel("Lap Time (s)")
    ax.set_ylabel("Frequency")
    ax.set_title("Distribution of 2024 Monaco Race Lap Times")
    ax.grid(axis='y', alpha=0.75)
    plt.show()

def plot_quali_vs_race_pace(qualifying_df: pd.DataFrame):
    """
    Scatter qualifying time against clean-air race pace, labelling each point by driver.

    Rows missing either value are left out; if none remain a message is printed and
    nothing is drawn.
    """
    x_col, y_col = "QualifyingTime (s)", "CleanAirRacePace (s)"

    plottable = qualifying_df.dropna(subset=[x_col, y_col])
    if plottable.empty:
        print("Not enough data to plot Qualifying Time vs. Clean Air Race Pace.")
        return

    fig, ax = plt.subplots(figsize=(12, 8))
    ax.scatter(plottable[x_col], plottable[y_col], color='teal', alpha=0.7)

    # Label each point with the driver code, nudged slightly up and to the right.
    for code, quali_time, race_pace in zip(plottable["Driver"], plottable[x_col], plottable[y_col]):
        ax.annotate(code, (quali_time, race_pace),
                    xytext=(5, 5), textcoords='offset points', fontsize=9)

    ax.set_xlabel("2025 Qualifying Time (s)")
    ax.set_ylabel("Clean Air Race Pace (s) (Derived)")
    ax.set_title("Qualifying Time vs. Clean Air Race Pace by Driver")
    ax.grid(True, linestyle='--', alpha=0.6)
    fig.tight_layout()
    plt.show()

def plot_team_performance(team_scores: dict):
    """
    Draw a horizontal bar chart of team scores, best team on top; no-op for an empty dict.
    """
    if not team_scores:
        return

    # Highest score first; ties keep the dict's insertion order.
    teams = sorted(team_scores, key=team_scores.get, reverse=True)
    scores = [team_scores[team] for team in teams]

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.barh(teams, scores, color='lightcoral')
    ax.set_xlabel("Normalized Performance Score")
    ax.set_title("Normalized Team Performance Score")
    # barh puts the first entry at the bottom; flipping the axis moves it to the top.
    ax.invert_yaxis()
    ax.grid(axis='x', alpha=0.75)
    fig.tight_layout()
    plt.show()

def plot_average_position_change(avg_pos_change: dict):
    """
    Draw a horizontal bar chart of the per-driver average position change.

    Bars are ordered from the most negative value (top) to the most positive
    (bottom); non-negative values are sky blue, negative ones light green.
    Does nothing for an empty dict.
    """
    if not avg_pos_change:
        return

    # Ascending by value; ties keep the dict's insertion order.
    drivers = sorted(avg_pos_change, key=avg_pos_change.get)
    changes = [avg_pos_change[code] for code in drivers]
    colors = ['lightgreen' if value < 0 else 'skyblue' for value in changes]

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.barh(drivers, changes, color=colors)
    ax.set_xlabel("Average Position Change (Qualifying Pos - Finish Pos)")
    ax.set_title("Historical Average Position Change at Monaco")
    # barh puts the first (most negative) entry at the bottom; flipping moves it to the top.
    ax.invert_yaxis()
    ax.axvline(0, color='grey', linestyle='--', linewidth=0.8)
    ax.grid(axis='x', alpha=0.75)
    fig.tight_layout()
    plt.show()

def plot_predicted_vs_actual(y_test: pd.Series, y_pred_test: np.ndarray):
    """
    Scatter held-out targets against their predictions with a y = x reference line.

    Prints a message and draws nothing when either input has no elements.
    """
    if 0 in (y_test.size, y_pred_test.size):
        print("Not enough test data to plot Predicted vs. Actual.")
        return

    # The reference diagonal spans the combined range of both inputs.
    lower = min(y_test.min(), y_pred_test.min())
    upper = max(y_test.max(), y_pred_test.max())
    diagonal = [lower, upper]

    fig, ax = plt.subplots(figsize=(8, 8))
    ax.scatter(y_test, y_pred_test, alpha=0.7, color='purple')
    ax.plot(diagonal, diagonal,
            color='red', linestyle='--', label='Perfect Prediction (y=x)')
    ax.set_xlabel("Actual Race Time (s) [2024 Data]")
    ax.set_ylabel("Predicted Race Time (s) [Model on 2024 Test Set]")
    ax.set_title("Model Performance: Predicted vs. Actual Race Times (Test Set)")
    ax.grid(True, linestyle='--', alpha=0.6)
    ax.legend()
    fig.tight_layout()
    plt.show()

def plot_permutation_importance(model: GradientBoostingRegressor, X_imputed: pd.DataFrame, y: pd.Series, features: list):
    """
    Calculates and plots permutation importance for model features.
    This is more robust for correlated features than tree-based feature importance.
    """
    if X_imputed.empty or y.empty or len(X_imputed) < 2: # Need at least 2 samples for permutation_importance
        print("Not enough data to calculate Permutation Importance.")
        return

    print("Calculating Permutation Importance...")
    try:
        # y needs to be an array for permutation_importance
        result = permutation_importance(model, X_imputed, y.to_numpy(), n_repeats=10, random_state=37, n_jobs=-1)
        sorted_idx = result.importances_mean.argsort()

        plt.figure(figsize=(10, 7))
        plt.boxplot(result.importances[sorted_idx].T,
                    vert=False, labels=np.array(features)[sorted_idx])
        plt.xlabel("Permutation Importance")
        plt.title("Permutation Importance of Features")
        plt.grid(axis='x', alpha=0.75)
        plt.tight_layout()
        plt.show()
    except ValueError as e:
        print(f"Error calculating Permutation Importance: {e}")
        print("This often happens if there's only one class/value in y or insufficient data.")


def plot_predicted_results(final_results_subset: pd.DataFrame, num_racers: int = 10):
    """
    Plots the top N predicted racers and their times, with the winner at the top.

    `final_results_subset` is expected to have 'Driver' and 'PredictedRaceTime (s)'
    columns and to be already sorted by 'PredictedRaceTime (s)' ascending. Its index
    may be anything: bar labels are positioned by row order, not by index label.
    Does nothing when the frame is empty.
    """
    if final_results_subset.empty:
        return

    top_racers = final_results_subset.head(num_racers)

    plt.figure(figsize=(10, 7))
    plt.barh(top_racers["Driver"], top_racers["PredictedRaceTime (s)"], color='skyblue')
    plt.xlabel("Predicted Race Time (s)")
    plt.ylabel("Driver")
    plt.title(f"Predicted Top {num_racers} Racers for 2025 Monaco GP")

    # barh draws the first row at the bottom; inverting the axis puts the fastest driver on top.
    plt.gca().invert_yaxis()
    plt.grid(axis='x', alpha=0.75)

    # Add predicted times as labels at the end of each bar. Categorical bars sit at
    # y = 0, 1, 2, ... in row order, so enumerate supplies the y position; using the
    # DataFrame index here misplaces labels whenever the index is not 0..n-1.
    for position, predicted_time in enumerate(top_racers["PredictedRaceTime (s)"]):
        plt.text(predicted_time, position, f'{predicted_time:.2f}s',
                 va='center', ha='left', fontsize=9, color='black')

    plt.tight_layout()
    plt.show()

# --- Main Execution Flow ---
def main():
    """
    Run the whole pipeline: load data, build features, train, print the ranking, plot.

    Stops early (after printing a message) when the 2024 laps or the 2025 qualifying
    data are empty, when no rows survive feature preparation, or when training
    could not be performed.
    """
    print("--- F1 Race Prediction Project Start ---")

    # Step 1: 2024 race laps (source of the training target) and their sector means.
    laps_2024 = get_f1_session_lap_data(FASTF1_YEAR, FASTF1_ROUND, FASTF1_SESSION_TYPE)
    if laps_2024.empty:
        print("Could not load 2024 lap data. Exiting.")
        return
    sector_times_2024 = aggregate_sector_times(laps_2024)

    plot_lap_time_distribution(laps_2024)

    # Step 2: 2025 qualifying times (hardcoded for now).
    qualifying_2025 = get_2025_qualifying_data()
    if qualifying_2025.empty:
        print("Could not load 2025 qualifying data. Exiting.")
        return

    # Step 3: weather forecast for the configured location and time.
    rain_probability, temperature = get_weather_forecast_data(
        MONACO_LATITUDE, MONACO_LONGITUDE, OPENWEATHER_API_KEY, MONACO_FORECAST_TIME
    )

    # Step 4: team scores and driver-to-team mapping (hardcoded).
    team_performance_score, driver_to_team = get_constructor_data()

    # Step 5: average Monaco position change per driver (hardcoded).
    average_position_change_monaco = get_average_monaco_position_change()

    # Plots of the raw inputs. The pace column is joined on here because the
    # qualifying frame does not carry it yet at this point.
    clean_air_pace_df = pd.DataFrame(
        get_clean_air_race_pace().items(), columns=['Driver', 'CleanAirRacePace (s)']
    )
    plot_quali_vs_race_pace(qualifying_2025.merge(clean_air_pace_df, on='Driver', how='left'))
    plot_team_performance(team_performance_score)
    plot_average_position_change(average_position_change_monaco)

    # Step 6: assemble the feature matrix and target.
    X, y, modeling_data, features = prepare_modeling_data(
        qualifying_2025,
        sector_times_2024,
        rain_probability,
        temperature,
        team_performance_score,
        driver_to_team,
        average_position_change_monaco,
        laps_2024,  # used to keep only drivers that have a 2024 target
    )
    if X.empty or y.empty:
        print("Insufficient data after preparation for modeling. Exiting.")
        return

    # Step 7: train and predict.
    model, X_imputed_df, y_test, y_pred_test, predicted_race_times = train_and_predict_model(X, y)
    if model is None or X_imputed_df.empty or y_test.size == 0:
        print("Model training or prediction failed due to insufficient data. Exiting.")
        return

    # Attach predictions; selecting by the feature index keeps rows aligned with the
    # order the predictions were produced in.
    modeling_data = modeling_data.loc[X_imputed_df.index].copy()
    modeling_data["PredictedRaceTime (s)"] = predicted_race_times

    # Step 8: report, fastest predicted time first.
    final_results = modeling_data.sort_values("PredictedRaceTime (s)").reset_index(drop=True)

    print("\n🏁 Predicted 2025 Monaco GP Winner 🏁\n")
    print(final_results[["Driver", "PredictedRaceTime (s)"]])

    # Currently disabled outputs (top-10 bar chart and test-set scatter):
    # print("\n🏆 Predicted Top 10 Racers 🏆")
    # plot_predicted_results(final_results[["Driver", "PredictedRaceTime (s)"]], num_racers=10)
    # plot_predicted_vs_actual(y_test, y_pred_test)

    # Feature importance via permutation importance, computed on all modeled rows.
    plot_permutation_importance(model, X_imputed_df, y, features)

    print("\n--- F1 Race Prediction Project End ---")

# Run the prediction pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()