In [None]:
import re
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
import statsmodels.formula.api as smf
from scipy.optimize import curve_fit
from sklearn.metrics import r2_score, mean_squared_error

In [None]:
DATA_FOLDER = "data"

In [None]:
def transform_origin_dataset():
    file_path = r"C:\Users\Q672355\OneDrive\MA\Daten\2024-12-22_detailed_query_alternatives_segments.csv"
    df = pd.read_csv(file_path, sep=",", encoding="utf-8")

    # Remove corrupted trips
    df = df[df["Vehicle ID"].notnull()]
    df.dropna(subset=["Vehicle ID", "Trip Segments Location Address"], inplace=True)

    # Transform data
    trips_data = []

    for _, trips in df.groupby("Trip ID"):
        t = trips.iloc[0]
        if pd.isna(t["Trip Segments Timestamp"]):
            continue
        # Extract the general information
        date = t["Trip Segments Timestamp"].split(" ")[0]
        data = {
            "vehicle_id": int(t["Vehicle ID"]),
            "efficient_mode": float(t["Efficient Mode"]),
            "vehicle_type": t["Vehicle Type"] if not pd.isna(t["Vehicle Type"]) else "BE",
            "electric_consumption": float(t["Electric Consumption"]),
            "fuel_consumption": float(t["Fuel Consumption"]),
            "speed": float(t["Speed Average"]),
            "distance": float(t["Distance"]),
            "date": date,
            "weekday": pd.Timestamp(date).strftime("%A"),
            "app_open_count": (
                0 if pd.isna(t["User App Open Count"])
                else int(t["User App Open Count"])
            ),
        }

        # Merge into a single trip
        for trip_alternative_type, trip in trips.groupby("Trip Alternatives Type"):
            # Make sure that there are start and end in one trip
            if len(trip) != 2:
                continue

            # Determine the start and end trips
            start = trip[trip["Trip Segments Type"] == "start"]
            if len(start.index) != 1:
                continue
            else:
                start = start.iloc[0]
            end = trip[trip["Trip Segments Type"] == "end"]
            if len(end.index) != 1:
                continue
            else:
                end = end.iloc[0]
            
            # Determine whether trip is in Netherland
            start_location = start["Trip Segments Location Address"]
            end_location = end["Trip Segments Location Address"]
            if not start_location.endswith("Netherlands") and not end_location.endswith("Netherlands"):
                continue

            data["start_time"] = start["Trip Segments Timestamp"].split(" ")[1][:5]
            data["start_time"] = date + " " + data["start_time"]
            data["end_time"] = end["Trip Segments Timestamp"].split(" ")[1][:5]
            data["end_time"] = date + " " + data["end_time"]
            data["start_location"] = start["Trip Segments Location Address"]
            data["end_location"] = end["Trip Segments Location Address"]
            data["start_lng"] = start["Trip Segments Location Lng"]
            data["start_lat"] = start["Trip Segments Location Lat"]
            data["end_lng"] = end["Trip Segments Location Lng"]
            data["end_lat"] = end["Trip Segments Location Lat"]
            data[f"distance_{trip_alternative_type}"] = float(start["Trip Alternatives Distance"])
            data[f"distance_diff_{trip_alternative_type}"] = float(start["Trip Alternatives Distance Diff"])
            data[f"time_diff_{trip_alternative_type}"] = int(start["Trip Alternatives Time Diff"])
            data[f"is_{trip_alternative_type}_good_alternative"] = (
                0 if pd.isna(start["Trip Alternatives Is Good"])
                else int(start["Trip Alternatives Is Good"])
            )
            data["user_group_name"] = start["User Group Name"]

        if "start_location" not in data:
            continue
        # Add the data to the corresponding group
        if t["User Group Name"] == "MyTravels_Group_A" or t["User Group Name"] == "MyTravels_Group_A_extended":
            data["group"] = 1
        elif t["User Group Name"] == "MyTravels_Group_B" or t["User Group Name"] == "MyTravels_Group_B_extended":
            data["group"] = 0
        trips_data.append(data)

    # only consider trips since 24.07
    trips_data = pd.DataFrame(trips_data)
    trips_data["date"] = pd.to_datetime(trips_data["date"], format="%Y-%m-%d")
    trips_data.dropna(subset=["start_time", "end_time"], inplace=True)
    trips_data["start_time"] = pd.to_datetime(trips_data["start_time"])
    trips_data["end_time"] = pd.to_datetime(trips_data["end_time"])
    trips_data = trips_data[
        (pd.Timestamp("2024-07-24") <= trips_data["date"]) & 
        (pd.Timestamp("2024-12-20") >= trips_data["date"])
    ]
    return trips_data

In [None]:
df_0 = transform_origin_dataset()

In [None]:
df_0.to_csv(f"{DATA_FOLDER}/0.csv", index=False, header=True, sep=",", encoding="utf-8")

In [None]:
# Calculate stop times between trips
def calculate_stop_times(df):
    """
    Calculate the stop times between the end of one trip and the start of the next trip 
    for the same vehicle on the same day.
    """
    df = df.sort_values(by=["vehicle_id", "date", "start_time"])
    df["prev_end_time"] = df.groupby(["vehicle_id", "date"])["end_time"].shift(1)
    # Calculate stop times in minutes
    df["stop_time"] = (df["start_time"] - df["prev_end_time"]).dt.total_seconds() / 60  # Convert to minutes
    df["stop_time"] = df.apply(lambda r: r["stop_time"] if r["stop_time"] > 0 else 0, axis=1)
    # Set stop time to NaN for the first trip of the day
    df["stop_time"] = df.apply(lambda r: 0 if pd.isna(r["stop_time"]) else r["stop_time"], axis=1)
    df = df.drop(columns="prev_end_time")
    return df

# Remove outliers based on quantiles
def remove_outliers(df):
    """
    Remove outliers based on the 1st and 99th percentiles.
    """
    stop_time_col, distance_col = "stop_time", "distance"
    stop_times = np.percentile(df[stop_time_col], [0, 99])
    distances = np.percentile(df[distance_col], [0, 99])
    filtered_df = df[
        (df[stop_time_col] >= stop_times[0]) & 
        (df[stop_time_col] <= stop_times[1]) &
        (df[distance_col] >= distances[0]) &
        (df[distance_col] <= distances[1])
    ]
    return filtered_df

In [None]:
df_1 = df_0.copy()
df_1 = calculate_stop_times(df_0)
df_1 = remove_outliers(df_1)

In [None]:
df_1.to_csv(f"{DATA_FOLDER}/1.csv", index=False, header=True, sep=",", encoding="utf-8")

In [None]:
def clustering_analysis(df, col, n_clusters=3):
    """
    Perform clustering analysis on stop time and sum of distances.
    """
    # Use stop_time and sum_distance for clustering
    data = df[[col]].dropna()
    # Apply K-means clustering
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    data["cluster"] = kmeans.fit_predict(data)
    # Display cluster centers
    for i, center in enumerate(kmeans.cluster_centers_):
        cluster_data = data[data["cluster"] == i]
        # Calculate standard deviation of stop_time and sum_distance for each cluster
        avg = center[0]
        std = cluster_data[col].std()
        print(f"[{col}]: Mean {avg}, Standard Deviation {std}, upper bound = {avg + std}")

In [None]:
stop_time = clustering_analysis(df_1, "stop_time")

In [None]:
def merge_trips(df):
    df["trip_id"] = -1  # -1 indicates not assigned to any group

    # Label which trips should be merged
    for _, group in df.groupby(["vehicle_id", "date"]):
        current_trip_id = 0  # Group identifier within this vehicle_id and date group
        previous_row = None
        for idx, row in group.iterrows():
            if previous_row is None:  # First row starts a new group
                df.at[idx, "trip_id"] = current_trip_id
            else:
                # Check if current row should be in the same group
                if row["stop_time"] <= stop_time:
                    df.at[idx, "trip_id"] = current_trip_id
                else:
                    # Start a new group
                    current_trip_id += 1
                    df.at[idx, "trip_id"] = current_trip_id
            previous_row = row  # Update the previous row reference

    # Merge trips
    merged_trips = df.groupby(["vehicle_id", "date", "trip_id"]).agg({
        "efficient_mode": "mean",
        "vehicle_type": "first",
        "electric_consumption": "sum",
        "fuel_consumption": "sum",
        "speed": "mean",
        "distance": "sum",
        "weekday": "first",
        "app_open_count": "first",
        "start_time": "first",
        "end_time": "last",
        "start_location": "first",
        "end_location": "last",
        "start_lat": "first",
        "start_lng": "first",
        "end_lng": "last",
        "end_lat": "last",
        "distance_bicycle": "sum",
        "distance_diff_bicycle": "sum",
        "time_diff_bicycle": "sum",
        "is_bicycle_good_alternative": "mean",
        "distance_pedestrian": "sum",
        "distance_diff_pedestrian": "sum",
        "time_diff_pedestrian": "sum",
        "is_pedestrian_good_alternative": "mean",
        "distance_publicTransport": "sum",
        "distance_diff_publicTransport": "sum",
        "time_diff_publicTransport": "sum",
        "is_publicTransport_good_alternative": "mean",
        "group": "first",
        "stop_time": "sum",
    }).reset_index()
    merged_trips.drop(columns=["trip_id"], inplace=True)
    merged_trips["alternative_score"] = (
        merged_trips["is_bicycle_good_alternative"] + #bicycle_score
        merged_trips["is_pedestrian_good_alternative"] +
        merged_trips["is_publicTransport_good_alternative"]
    )
    # remove unmergable parking trips
    merged_trips = merged_trips[merged_trips["distance"] >= 0.8]
    return merge_trips

In [None]:
df_2 = df_1.copy()
df_2 = merge_trips(df_2)
df_2["phase"] = df_2.apply(
    lambda r: (
        (
            1 if r["date"] >= pd.Timestamp("2024-09-24") else 0
        ) if r["group"] == 1 else (
            1 if r["date"] >= pd.Timestamp("2024-10-24") else 0
        )
    ),
    axis=1
)

In [None]:
df_2.to_csv(f"{DATA_FOLDER}/2.csv", index=False, header=True, sep=",", encoding="utf-8")

In [None]:
def get_app_open_count():
    def calculate_count(filename):
        df = pd.read_csv(f"C:/Users/Q672355/OneDrive/MA/Daten/{filename}.csv", sep=",", encoding="utf-8")
        df = df.dropna(subset=["Vehicle ID", "Trip Segments Timestamp"])
        df["Trip Segments Timestamp"] = df["Trip Segments Timestamp"].apply(lambda v: v.split(" ")[0])
        df = df.groupby("Vehicle ID").agg(
            min_timestamp=("Trip Segments Timestamp", "min"),
            max_timestamp=("Trip Segments Timestamp", "max"),
            open_count=("User App Open Count", "first"),
        ).reset_index()
        df["open_count"] = df["open_count"].apply(lambda v: 0 if pd.isna(v) else int(v))
        df["Vehicle ID"] = df["Vehicle ID"].apply(lambda v: int(v))
        return df.set_index("Vehicle ID").to_dict(orient="index")

    # set activeness via app open. active -> open at least once a week on avg
    open_count_2110 = calculate_count("2024-10-21_detailed_query_alternatives_segments")
    open_count_2212 = calculate_count("2024-12-22_detailed_query_alternatives_segments")

    active_vehicle_ids = []
    open_count_vids = {}
    for vehicle_id, data_2212 in open_count_2212.items():
        if vehicle_id in open_count_2110:
            data_2110 = open_count_2110[vehicle_id]
            open_count = data_2212["open_count"] - data_2110["open_count"]
            ts1 = pd.Timestamp(data_2110["max_timestamp"])
            ts2 = pd.Timestamp(data_2212["max_timestamp"])
        else:
            open_count = data_2212["open_count"]
            ts1 = pd.Timestamp(data_2212["min_timestamp"])
            ts2 = pd.Timestamp(data_2212["max_timestamp"])
        weeks = ((ts2 - ts1).days)//7
        if open_count >= weeks:
            active_vehicle_ids.append(int(vehicle_id))
        open_count_vids[int(vehicle_id)] = open_count
    return open_count_vids, active_vehicle_ids

In [None]:
open_count_vids, active_vehicle_ids = get_app_open_count()
df_3 = df_2.copy()
df_3["active"] = df_3.apply(lambda r: 1 if r["vehicle_id"] in active_vehicle_ids else 0, axis=1)
df_3["app_open_count"] = df_3.apply(lambda r: open_count_vids[r["vehicle_id"]], axis=1)

In [None]:
df_3.to_csv(f"{DATA_FOLDER}/3.csv", index=False, header=True, sep=",", encoding="utf-8")

In [None]:
# Regex to match 4-digit postcode
def get_postcode(df):
    pattern = r"\b\d{4}\b"
    fake_postcode = -1

    def get_postcode(location):
        try:
            return int(re.findall(pattern, location)[-1])
        except:
            nonlocal fake_postcode
            fake_postcode -= 1
            return fake_postcode

    df["postcode"] = df["start_location"].apply(get_postcode)
    return df


def get_weather_category(df):
    # Define thresholds
    SUNSHINE_THRESHOLD = 11 # hours of sunshine to consider sunny
    PRECIP_THRESHOLD = 1  # precipitation threshold in mm/day

    # Function to assign season based on month
    def assign_season(date):
        month = date.month
        if month in [11, 12]:  # Winter includes November and December
            return "Winter"
        elif month in [9, 10]:  # Autumn includes September and October
            return "Autumn"
        elif month in [7, 8]:  # Summer includes July and August
            return "Summer"

    df["season"] = df["date"].apply(assign_season)

    # Ensure "season" column is properly categorized
    df["season"] = pd.Categorical(df["season"], categories=["Summer", "Autumn", "Winter"], ordered=True)

    # Function to categorize weather with the additional condition for winter
    def categorize_weather(row):
        temp = row["apparent_temperature_mean"]
        precip = row["precipitation_sum"]
        sunshine = row["sunshine_duration"]
        if precip > PRECIP_THRESHOLD: # > 1 mm/day
            return "rainy"
        elif 13 <= temp <= 29: 
            return "sunny"
        #elif 0 <= temp < 13 and sunshine >= SUNSHINE_THRESHOLD: # varing between 1-12h/day
            #return "sunny"
        return "unfavorable"

    # Apply the updated categorization function
    df["weather_category"] = df.apply(categorize_weather, axis=1)
    return df

In [None]:
df_4 = df_3.copy()
df_4 = get_postcode(df_3)
weather_df = pd.read_csv(f"{DATA_FOLDER}/region_weather.csv")
df_4 = pd.merge(df_4, weather_df, on=["postcode", "date"], how="left")
df_4 = get_weather_category(df_4)

In [None]:
df_4.to_csv(f"{DATA_FOLDER}/4.csv", index=False, header=True, sep=",", encoding="utf-8")

In [None]:
def get_unique_driver_types(df):
    df = df[df["date"] < pd.Timestamp("2024-09-24")]
    """Add columns for trip type based on distance."""
    df["short_trip"] = df["distance"].apply(lambda x: 1 if x <= 5 else 0)
    df["medium_trip"] = df["distance"].apply(lambda x: 1 if 5 < x <= 30 else 0)
    df["long_trip"] = df["distance"].apply(lambda x: 1 if x > 30 else 0)
    user_trip_data = df.groupby("vehicle_id")[["short_trip", "medium_trip", "long_trip"]].sum()
    user_trip_data_sum = user_trip_data.sum(axis=1)
    user_trip_data_normalized = user_trip_data.div(user_trip_data_sum, axis=0).fillna(0)

    # Scale the data for clustering
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(user_trip_data_normalized)

    # Perform KMeans clustering
    kmeans = KMeans(n_clusters=3, random_state=42)
    user_trip_data_normalized["cluster"] = kmeans.fit_predict(scaled_data)

    # Map clusters to readable driver types
    cluster_names = {0: "medium_drivers", 1: "medium_drivers", 2: "short_drivers"}
    user_trip_data_normalized["driver_type"] = user_trip_data_normalized["cluster"].map(cluster_names)

    # Reset index for analysis
    user_trip_data_normalized = user_trip_data_normalized.reset_index()

    # Calculate average trip proportions per cluster
    cluster_averages = user_trip_data_normalized.groupby("driver_type")[["short_medium_trip", "medium_trip", "long_trip"]].mean()

    # Normalize cluster averages
    cluster_averages_normalized = cluster_averages.div(cluster_averages.sum(axis=1), axis=0)

    # Count the number of users in each cluster
    cluster_counts = user_trip_data_normalized["driver_type"].value_counts()

    # Plot cluster averages
    cluster_averages_normalized = cluster_averages_normalized.loc[["short_drivers", "medium_drivers", "long_drivers"]]
    cluster_counts = cluster_counts.loc[["short_drivers", "medium_drivers", "long_drivers"]]

    # Save a CSV with unique vehicle_id and driver_type
    unique_driver_types = user_trip_data_normalized[["vehicle_id", "driver_type"]].reset_index(drop=True)
    return unique_driver_types

In [None]:
unique_driver_types = get_unique_driver_types(df_4)
df_5 = df_4.merge(unique_driver_types, on="vehicle_id", how="inner")

In [None]:
df_5.to_csv(f"{DATA_FOLDER}/5.csv", index=False, header=True, sep=",", encoding="utf-8")

In [None]:
def get_car_usage(df):
    df["trip"] = 1
    df["vshort_trip"] = df.apply(lambda r: 1 if r["distance"] <= 1 else 0, axis=1)
    df["short_trip"] = df.apply(lambda r: 1 if 1 < r["distance"] <= 5 else 0, axis=1)
    df["medium_trip"] = df.apply(lambda r: 1 if 5 < r["distance"] <= 30 else 0, axis=1)
    df["long_trip"] = df.apply(lambda r: 1 if 30 < r["distance"] else 0, axis=1)
    trip_types = ["trip", "vshort_trip", "short_trip", "medium_trip", "long_trip"]
    car_usage = None
    for trip_type in trip_types:
        agg = df.groupby(["vehicle_id", "phase"]).agg(
            num_trips=(trip_type, "sum"),
            period=("date", lambda d: ((max(d) - min(d)).days + 1)/7),
        ).reset_index()
        agg[f"weekly_{trip_type}"] = agg["num_trips"] / agg["period"]
        agg = agg[["vehicle_id", "phase", f"weekly_{trip_type}", "periods"]]
        if car_usage is None:
            car_usage = agg
        else:
            car_usage = car_usage.merge(agg[["vehicle_id", "phase", f"weekly_{trip_type}"]], 
                                        on=["vehicle_id", "phase"], 
                                        how="left")
    car_usage = car_usage[car_usage["period"] >= 1]
    df = df.merge(car_usage, on=["vehicle_id", "phase"], how="left")
    return df

In [None]:
parallel_test_df_2 = df_1.copy()
parallel_test_df_2 = merge_trips(parallel_test_df_2)
parallel_test_df_2["phase"] = parallel_test_df_2.apply(
    lambda r: (
        (
            1 if r["date"] >= pd.Timestamp("2024-09-24") else 0
        ) if r["group"] == 1 else (
            1 if r["date"] >= pd.Timestamp("2024-10-24") else 0
        )
    ),
    axis=1
)

parallel_test_df_3 = parallel_test_df_2.copy()
parallel_test_df_3["active"] = parallel_test_df_3.apply(lambda r: 1 if r["vehicle_id"] in active_vehicle_ids else 0, axis=1)
parallel_test_df_3["app_open_count"] = parallel_test_df_3.apply(lambda r: open_count_vids[r["vehicle_id"]], axis=1)

In [None]:
weathers = ["sunny", "rainy", "unfavorable"]
week_types = [
    ("weekday", ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]), 
    ("weekend", ["Saturday", "Sunday"])
]
df_6_map = {}
for weather in weathers:
    df_6 = df_5.copy()
    df_6 = df_6[(df_6["weather_category"] == weather)]
    df_6_map[weather] = {}
    for week_type, weekdays in week_types:
        df_6 = df_6[df_6["weekday"].isin(weekdays)]
        df_6_map[weather][week_type] = get_car_usage(df_6)

In [None]:
def get_survey_info():    
    df = pd.read_excel(
        r"C:\Users\Q672355\OneDrive\MA\Daten\Umfragedaten_Pre-Survey_Distances.xlsx"
    )
    df = df[[
        "Tabelle1.Vehicle.ID",
        "Tabelle1.User.Group.Name", 
        "What is your age?",
        "What gender do you identify yourself with?",
        "What is the composition of your household?",
        "What ownership characteristic best describes your situation?",
        "What type of BMW do you drive?",
        "What describes your current situation best?",
        "Approximately, what is your yearly mileage?",
        "How often per week do you use the following modes of transportation? - Car",
        "How often per week do you use the following modes of transportation? - Public Transport",
        "How often per week do you use the following modes of transportation? - Bike / E-bike",
        "How often per week do you use the following modes of transportation? -  Scooter / motorbike",
        "Approximately, how many times a week do you use the car for the following trip lengths?  - Shorter than 10 km",
        "Approximately, how many times a week do you use the car for the following trip lengths?  - Between 10-50 km",
        "Approximately, how many times a week do you use the car for the following trip lengths?  - Longer than 50 km"
    ]]
    df.rename(columns={
        "Tabelle1.Vehicle.ID": "vehicle_id",
        "Tabelle1.User.Group.Name": "group_name", 
        "What is your age?": "age", 
        "What gender do you identify yourself with?": "gender",
        "What is the composition of your household?": "family_status",
        "What type of BMW do you drive?": "car_type",
        "What ownership characteristic best describes your situation?": "car_ownership",
        "What describes your current situation best?": "situation",
        "Approximately, what is your yearly mileage?": "yearly_mileage",
        "How often per week do you use the following modes of transportation? - Car": "usage_frequency",
        "How often per week do you use the following modes of transportation? - Public Transport": "public_transport_frequency",
        "How often per week do you use the following modes of transportation? - Bike / E-bike": "e_bike_frequency",
        "How often per week do you use the following modes of transportation? -  Scooter / motorbike": "scooter_motorbike_frequency",
        "Approximately, how many times a week do you use the car for the following trip lengths?  - Shorter than 10 km": "less_than_10km_frequency",
        "Approximately, how many times a week do you use the car for the following trip lengths?  - Between 10-50 km": "btw_10_50_km_frequency",
        "Approximately, how many times a week do you use the car for the following trip lengths?  - Longer than 50 km": "longer_than_50_km_frequency",
    }, inplace=True)

    age_mapping = {
        "18-24": "young",
        "25-34": "young",
        "35-44": "middle",
        "45-54": "middle",
        "45 - 54": "middle", 
        "55 - 64": "old",
        "55-64": "old",  
        "65 of ouder": "old",
        "Older than 65": "old",
    }
    """
    age_mapping = {
        "18-24": 20,
        "25-34": 30,
        "35-44": 40,
        "45-54": 50,
        "45 - 54": 50, 
        "55 - 64": 60,
        "55-64": 60,  
        "65 of ouder": 70,
        "Older than 65": 70,
    }
    """
    car_ownership_mapping = {
        "Private lease": "private_lease",
        "Zakelijke lease": "company_lease",
        "Privé eigenaar": "private_own",
        "Zakelijke eigenaar": "company_own",
    }

    car_type_mapping = {
        "Benzine": "Benzine",
        "Diesel": "Diesel",
        "Plug-in Hybride": "PHEV",
        "Volledig elektrisch": "EL",
    }

    gender_mapping = {
        "Man": "man",
        "Vrouw": "woman",
        "Ik geef liever geen antwoord": "woman",
    }
    family_status_mapping = {
        "Anders, namelijk": "without_children",
        "Eenpersoons huishouden": "single",
        "Andere vormen van samenwonen (kamergenoten, huisgenoten, etc.)": "without_children",
        "Samenwonend zonder thuiswondende kinderen": "without_children",
        "Samenwonend met thuiswonende kinderen": "with_children",
        "Alleenstaande ouder met kinderen": "with_children",
        "Samenwondend met andere familieleden": "extended_family",
    }

    situation_mapping = {
        "Niet aanwezig in de arbeidspopulatie": "unemployed",
        "Student": "unemployed",
        "Werkloos": "unemployed",
        "Anders, namelijk": "unemployed",
        "Werkzaam (full-time of part-time)": "employed",
        "Zelfstandig werknemer": "employed"
    }

    yearly_mileage_mapping = {
        "Minder dan 5.000 km": "Less than 5000 km",
        "Tussen 5.000 km en 10.000 km": "5000-10000 km",
        "Tussen 10.000 km en 25.000 km": "10000-25000 km",
        "Tussen 25.000 km en 50.000 km": "25000-50000 km",
        "Meer dan 50.000 km": "More than 50000 km",
    }

    usage_frequency_mapping = {
        "1-3 dagen": "sometimes",
        "3-5 dagen": "often",
        "Meer dan 5 dagen": "usual",
    }

    public_transport_frequency_mapping = {
        "Nooit": "never",
        "1-3 dagen": "sometimes",
        "3-5 dagen": "often",
        "Meer dan 5 dagen": "usual",
    }

    e_bike_frequency_mapping = {
        "Nooit": "never",
        "1-3 dagen": "sometimes",
        "3-5 dagen": "often",
        "Meer dan 5 dagen": "usual",
    }

    scooter_motorbike_frequency_mapping = {
        "Nooit": "never",
        "1-3 dagen": "sometimes",
        "3-5 dagen": "often",
        "Meer dan 5 dagen": "usual",
    }

    length_less_10_frequency_mapping = {
        "Nooit": "never",
        "1-3 dagen": "sometimes",
        "3-5 dagen": "often",
        "Meer dan 5 dagen": "usual",
    }

    length_10_50_frequency_mapping = {
        "Nooit": "never",
        "1-3 dagen": "sometimes",
        "3-5 dagen": "often",
        "Meer dan 5 dagen": "usual",
    }

    length_than_50_frequency_mapping = {
        "Nooit": "never",
        "1-3 dagen": "sometimes",
        "3-5 dagen": "often",
        "Meer dan 5 dagen": "usual",
    }


    def mapping(row):
        row["age"] = age_mapping[row["age"]]
        row["gender"] = gender_mapping[row["gender"]]
        row["family_status"] = family_status_mapping[row["family_status"]]
        row["car_ownership"] = car_ownership_mapping[row["car_ownership"]]
        row["car_type"] = car_type_mapping[row["car_type"]]
        row["situation"] = situation_mapping[row["situation"]]
        row["yearly_mileage"] = yearly_mileage_mapping[row["yearly_mileage"]]
        row["usage_frequency"] = usage_frequency_mapping[row["usage_frequency"]]
        row["public_transport_frequency"] = public_transport_frequency_mapping[row["public_transport_frequency"]]
        row["e_bike_frequency"] = e_bike_frequency_mapping[row["e_bike_frequency"]]
        row["scooter_motorbike_frequency"] = scooter_motorbike_frequency_mapping[row["scooter_motorbike_frequency"]]
        row["less_than_10km_frequency"] = length_less_10_frequency_mapping[row["less_than_10km_frequency"]]
        row["btw_10_50_km_frequency"] = length_10_50_frequency_mapping[row["btw_10_50_km_frequency"]]
        row["longer_than_50_km_frequency"] = length_than_50_frequency_mapping[row["longer_than_50_km_frequency"]]
        return row
    survey_info = df.apply(mapping, axis=1)
    return survey_info

In [None]:
survey_info = get_survey_info()

In [None]:
def mixed_effect_model(df, outcome_var, group_col, phase_col):
    """
    Fit a mixed-effects model with specified fixed and random effects.
    
    Parameters:
    - df: DataFrame containing the data.
    - outcome_var: Dependent variable (e.g., "weekly_trip").
    - group_col: Column indicating treatment group (e.g., "active").
    - phase_col: Column indicating the treatment period (e.g., "phase").
    
    Returns:
    - Fitted model object.
    """
    print(f"-------------------------------[{outcome_var}]-------------------------------")
    
    df[group_col] = df[group_col].astype("category")
    df[phase_col] = df[phase_col].astype("category")
    phase_counts = df.groupby("vehicle_id")["phase"].nunique()
    selected_vids = set(phase_counts[phase_counts == 2].index.tolist())
    df = df[df["vehicle_id"].isin(selected_vids)]
    # Model formula: interaction between phase and group
    formula = f"{outcome_var} ~ C({phase_col}, Treatment(0)) * C({group_col}, Treatment(0))"

    # Fit mixed-effects model
    model = smf.mixedlm(
        formula=formula,
        data=df,
        groups=df["vehicle_id"],  # Random intercept
        re_formula="~C(phase, Treatment(0))"  # Random slope
    ).fit()

    # Compute R² values
    fixed_effects_variance = np.var(model.fittedvalues)
    residual_variance = model.scale
    random_effects_variance = sum(np.var(v) for v in model.random_effects.values()) if model.random_effects else 0

    marginal_r2 = fixed_effects_variance / (fixed_effects_variance + random_effects_variance + residual_variance)
    conditional_r2 = (fixed_effects_variance + random_effects_variance) / (fixed_effects_variance + random_effects_variance + residual_variance)

    print(f"Marginal R² (fixed effects only): {marginal_r2:.4f}")
    print(f"Conditional R² (fixed + random effects): {conditional_r2:.4f}")
    print(model.cov_re)  # Shows variance of random intercept & slope

    # Count number of unique vehicles per group
    count_phase_1 = df[df[group_col] == 1]["vehicle_id"].nunique()
    count_phase_0 = df[df[group_col] == 0]["vehicle_id"].nunique()
    total_count = count_phase_1 + count_phase_0
    percentage_difference = abs(count_phase_1 - count_phase_0) / total_count * 100

    print(f"Number of unique vehicle_id where active == 1 (active): {count_phase_1}")
    print(f"Number of unique vehicle_id where active == 0 (inactive): {count_phase_0}")
    print(f"Percentage difference: {percentage_difference:.3f}%")

    # # Extract DiD effect and p-value
    # interaction_term = f"C({phase_col}, Treatment(0))[T.1]:C({group_col}, Treatment(0))[T.1]"
    # did_effect = model.params.get(interaction_term, 0)
    # p_value = model.pvalues.get(interaction_term, 1)

    # se_did = model.bse.get(interaction_term, 0)
    # ci_lower, ci_upper = model.conf_int().loc[interaction_term]
    return model


def mixed_effect_model_moderating(df, outcome_var, group_col, phase_col, group_cat, moderating_effects=None):
    """
    Fit a mixed-effects model with specified moderating and random effects.
    
    Parameters:
    - df: DataFrame containing the data.
    - outcome_var: Dependent variable (e.g., "outcome").
    - group_col: Column indicating treatment group (e.g., "group").
    - phase_col: Column indicating the treatment period (e.g., "phase").
    - random_effects: Dictionary of random effects in vc_formula format.
    
    Returns:
    - Fitted model object.
    """
    print(f"-------------------------------[{outcome_var}]-------------------------------")
    # Ensure categorical variables are treated as such
    phase_counts = df.groupby("vehicle_id")["phase"].nunique()
    selected_vids = set(phase_counts[phase_counts == 2].index.tolist())
    df = df[df["vehicle_id"].isin(selected_vids)]

    formula = f"{outcome_var} ~ C({phase_col}, Treatment(0)) * C({group_col}, Treatment(0))"
    df.loc[:, group_col] = df[group_col].astype("category")
    df.loc[:, phase_col] = df[phase_col].astype("category")
    if moderating_effects is not None:
        for moderating_effect, reference in moderating_effects:
            if moderating_effect != "app_open_count_type":
                df.loc[:, moderating_effect] = df[moderating_effect].astype("category")
                # formula += f" + C({moderating_effect}, Treatment('{reference}')) * C({phase_col}, Treatment(0))"
                if isinstance(reference, str):
                    formula += f" + C({moderating_effect}, Treatment('{reference}')) * C({phase_col}, Treatment(0))"
                elif isinstance(reference, (int, float)):
                    formula += f" + C({moderating_effect}, Treatment({reference}))  * C({phase_col}, Treatment(0))"
            else:
                formula += f" + C({moderating_effect}, Treatment('{reference}'))  * C({phase_col}, Treatment(0))"
    # moderating effects formula
    # Variance components formula
    re_formula = f" ~C({phase_col}, Treatment(0))" # Random effects model (random intercept for group = "vehicle_id")
 #random_cat: specific factor varing across individuals  (work not value does not change per individual...)
    # Fit the mixed-effects model
    model = smf.mixedlm(
        formula=formula,
        data=df,
        groups=df[group_cat], # Random effects account for variability across groups (in your case, different vehicles). These represent the individual deviations from the moderating effects.
        re_formula=re_formula,  # Random effects for variables
    ).fit()
    # Display the models
    print(f"Covariance matrix of random effects: {model.cov_re}")
    return model

In [None]:
def plot_did(df, outcome_var, group_col, phase_col, category):
    """
    Plot the DiD results with error bars and annotations.
    """
    # Run model
    model, did_effect, p_value, se_did, _, _ = mixed_effect_model(df, outcome_var, group_col, phase_col)

    # Generate model-based predictions
    pred_data = df.copy()
    pred_data["prediction"] = model.predict(pred_data)

    # Compute model-based means and standard errors
    summary_model = pred_data.groupby([phase_col, group_col])["prediction"].agg(["mean", "sem"]).reset_index()
    summary_model[phase_col] = summary_model[phase_col].map({0: "Pre Intervention", 1: "Post Intervention"})
    summary_model[group_col] = summary_model[group_col].map({0: "Inactive", 1: "Active"})

    # Extract model-based values
    means = summary_model.pivot(index=phase_col, columns=group_col, values="mean")
    errors = summary_model.pivot(index=phase_col, columns=group_col, values="sem")

    # Find vehicles that exist in both phases
    vehicles_in_both_phases = df.groupby("vehicle_id")[phase_col].nunique()
    valid_vehicles = vehicles_in_both_phases[vehicles_in_both_phases == 2].index

    # Count unique vehicle IDs per group (ensuring they exist in both phases)
    filtered_df = df[df["vehicle_id"].isin(valid_vehicles)]
    counts = filtered_df.groupby(group_col)["vehicle_id"].nunique()

    # Extract adjusted values
    active_before, active_after = means.loc["Pre Intervention", "Active"], means.loc["Post Intervention", "Active"]
    inactive_before, inactive_after = means.loc["Pre Intervention", "Inactive"], means.loc["Post Intervention", "Inactive"]

    # Compute adjusted within-group changes
    change_active = active_after - active_before
    change_inactive = inactive_after - inactive_before


    # Significance marker
    significance = ""
    if p_value < 0.01:
        significance = "(strong evidence)"
    elif p_value < 0.05:
        significance = "(moderate evidence)"
    elif p_value < 0.1:
        significance = "(weak evidence)"

    # Colors
    colors = {"Inactive": "steelblue", "Active": "firebrick"}

    # Plot
    _, ax = plt.subplots(figsize=(8, 5))
    ax.set_xlim(left=-0.5, right=1.3)  # Adjust as needed

    ax.axvline(x=0.5, color="green", linestyle="dashed", linewidth=1.5, label="Intervention")

    # Scatter points with error bars
    # Find vehicles that exist in both phases
    vehicles_in_both_phases = df.groupby("vehicle_id")[phase_col].nunique()
    valid_vehicles = vehicles_in_both_phases[vehicles_in_both_phases == 2].index

    # Filter the dataset to include only these vehicles
    filtered_df = df[df["vehicle_id"].isin(valid_vehicles)]

    # Count unique vehicle IDs per group (ensuring they exist in both phases)
    counts = filtered_df.groupby(group_col)["vehicle_id"].nunique()

    for group in ["Active", "Inactive"]:
        # Convert group label to match df[group_col] values (assuming 1 = Active, 0 = Inactive)
        group_value = 1 if group == "Active" else 0
        ax.errorbar(
            ["Pre Intervention", "Post Intervention"],
            means[group],
            yerr=errors[group],
            fmt="o",
            color=colors[group],
            markersize=6,
            capsize=4,
            label=f"{group} (N={int(counts[group_value])})"
        )

    # Connect points with lines
    ax.plot(["Pre Intervention", "Post Intervention"], [inactive_before, inactive_after], color=colors["Inactive"], linestyle="-", linewidth=2)
    ax.plot(["Pre Intervention", "Post Intervention"], [active_before, active_after], color=colors["Active"], linestyle="-", linewidth=2)

    # Annotations for within-group changes (aligned with vertical brackets)
    text_x_offset = 0.08  # Shift text left
    bracket_x_offset = 0.02 # Moves the dashed bracket slightly right

    # Difference brackets (dashed lines)
    ax.plot([1 + bracket_x_offset, 1 + bracket_x_offset], [active_before, active_after], 
            color=colors["Active"], linestyle="--", linewidth=2)
    ax.annotate(f"{change_active:.3f}", xy=(1, active_after), 
                xytext=(1 + text_x_offset, active_before),
                ha="left", fontsize=12, color=colors["Active"], fontweight="bold")

    ax.plot([1 + bracket_x_offset, 1 + bracket_x_offset], [inactive_before, inactive_after], 
            color=colors["Inactive"], linestyle="--", linewidth=2)
    ax.annotate(f"{change_inactive:.3f}", xy=(1, inactive_after), 
                xytext=(1 + text_x_offset, inactive_before),
                ha="left", fontsize=12, color=colors["Inactive"], fontweight="bold")

    # Compute y-limits
    y_min = min(means.min()) - 1.5
    y_max = max(means.max()) + 1.5
    ax.set_ylim(y_min, y_max)

    # Treatment effect annotation (Δ - Moved Left)
    ci_text = f"Δ = {did_effect:.3f} {significance}" #(95% CI: [{ci_lower:.3f}, {ci_upper:.3f}])"
    ax.annotate(ci_text, 
                xy=(1, active_after), 
                xytext=(0.1, min(means.min()) - 0.5),  
                ha="left", fontsize=10, color="black", fontweight="bold",
                bbox=dict(facecolor="white", edgecolor="black", boxstyle="round,pad=0.3"))

    formatted_outcome_var = outcome_var.replace("_", " ").title()  # Converts "weekly_trips" -> "Weekly Trips"

    if category in ["ICE", "BEV", "PHEV"]: 
        formatted_category = category  # Converts "weekly_trips" -> "Weekly Trips"
    else:
        formatted_category = category.replace("_", " ").title()  # Converts "weekly_trips" -> "Weekly Trips"

    ax.set_xticks([0, 0.5, 1])  # Positions for labels
    ax.set_xticklabels(["Pre Intervention", "Intervention", "Post Intervention"])
    plt.xlabel("Phase", fontsize=12)
    plt.ylabel(f"Average Number of {formatted_outcome_var}s", fontsize=12)  # More explicit
    plt.title(f"Intervention Effect on {formatted_outcome_var}s - {formatted_category}", fontsize=12, fontweight="bold")
    plt.grid(True, linestyle="--", alpha=0.6)
    plt.legend()
    plt.show()
    print(f"DiD effect for {outcome_var}: {did_effect:.3f} SE: {se_did} (p={p_value:.3f}) {significance}")


def estimation_on_family_status(df, outcome_var, group_col, phase_col):
    df = pd.merge(df, survey_info, how="inner", on="vehicle_id")
    
    for category in ["without_children", "with_children"]:  # Renamed to "category" for consistency
        d = df[df["family_status"] == category].copy()
        count_matching = d[d[group_col] == 1]["vehicle_id"].count()
        
        print(f"Number of vehicle_id matching the category '{category}'': {count_matching}")
        
        if len(d) > 2:
            print(f"Within estimation on [family status] {category}")
            
            # Unpack the tuple returned by mixed_effect_model
            model = mixed_effect_model(
                d, outcome_var=outcome_var, group_col=group_col, phase_col=phase_col
            )
            
            # Print model summary correctly
            print(model.summary())  

            # Plot results
            plot_did(d, outcome_var, group_col=group_col, phase_col=phase_col, category=category)

In [None]:
outcome_var = "weekly_trip"
phase_col = "phase"
group_cat = "vehicle_id"
for weather, item in df_6_map.items():
    for week_type, df_6 in item.items():
        for group_col in ["inactive", "active"]:
            print(f"--------------------------------{weather}[{week_type}]--------------------------------")
            model = mixed_effect_model(
                df_6,
                outcome_var=outcome_var,
                group_col=group_col,
                phase_col=phase_col,
            )
            print(model.summary())
            model = mixed_effect_model_moderating(
                df_6,
                outcome_var=outcome_var,
                group_col=group_col,
                phase_col=phase_col,
                group_cat=group_cat,
                moderating_effects=[("alt_score_type", "low")]
            )
            print(model.summary())
            estimation_on_family_status(df_6, outcome_var, group_col, phase_col)

In [None]:
# parameters store
parameters = dict()
vehicle_type_map = {
    "Fuel Vehicles": "ICE",
    "Electric Vehicles": "BEV",
    "Hybrid Fuel-Only Vehicles": "PHEV (Fuel > 0, Electric = 0)",
    "Hybrid Electric-Only Vehicles": "PHEV (Fuel = 0, Electric > 0)",
    "Hybrid Fuel-Dominated Vehicles": "PHEV (Fuel > 0, Electric < 0)",
    "Hybrid Combined Vehicles": "PHEV (Fuel > 0, Electric > 0)"
}

# Exponential Regression Function
def exponential_model(x, a, b, c):
    return a * np.exp(-b * x) + c

def fit_exponential_model(data, speed_col, consumption_col):
    x = data[speed_col].values
    y = data[consumption_col].values
    if len(x) < 2:
        return None  # Not enough data to fit the model
    popt, _ = curve_fit(exponential_model, x, y, maxfev=10000)
    return popt


# Helper function to find the elbow point based on the derivative
def find_elbow_point(params, speed_range):
    derivative = -params[0] * params[1] * np.exp(-params[1] * speed_range)
    elbow_index = np.argmin(np.abs(derivative - np.mean(derivative)))  # Approximation for elbow
    elbow_speed = speed_range[elbow_index]
    elbow_rate = exponential_model(np.array([elbow_speed]), *params)[0]
    return elbow_speed, elbow_rate


# Plot Consumption Rate Function
def analyze_fuel_consumption(df, speed_col, rate_col, vehicle_type):
    if df.empty:
        print(f"No data available for {vehicle_type}. Skipping...")
        return

    plt.figure(figsize=(12, 7))

    # Split data by efficient mode status
    df_on = df[df["efficient_mode_status"] == "On"]
    df_off = df[df["efficient_mode_status"] == "Off"]

    # Ensure data for both "On" and "Off" are present
    if df_on.empty and df_off.empty:
        print(f"No data for {vehicle_type} in both 'On' and 'Off' modes. Skipping...")
        return

    # Fit models separately for "On" and "Off" modes
    params_on = fit_exponential_model(df_on, speed_col, rate_col) if not df_on.empty else None
    params_off = fit_exponential_model(df_off, speed_col, rate_col) if not df_off.empty else None

    parameters[vehicle_type] = {"On": list(params_on), "Off": list(params_off)}

    # Plot the regression and data points
    def plot_regression(df_on, df_off, params_on, params_off, vehicle_type, speed_col, rate_col):
        # Plot scatter points for "On" and "Off"
        if not df_on.empty:
            plt.scatter(df_on[speed_col], df_on[rate_col], color="#035970", alpha=0.5, label=("Efficient Mode: On / Off"))
        if not df_off.empty:
            plt.scatter(df_off[speed_col], df_off[rate_col], color="#035970", alpha=0.5)

        # Define speed range for plotting regression lines
        speed_min = min(df_on[speed_col].min(), df_off[speed_col].min())
        speed_max = max(df_on[speed_col].max(), df_off[speed_col].max())
        speeds = np.linspace(speed_min, speed_max, 1000)

        # Plot regression and elbow points for "On"
        if params_on is not None:
            predicted_rate_on = exponential_model(speeds, *params_on)
            plt.plot(speeds, predicted_rate_on, color="green", linewidth=2, linestyle="dashed", label=f"Regression (On)")
            r2_on = r2_score(df_on[rate_col], exponential_model(df_on[speed_col].values, *params_on))
            mse_on = np.sqrt(mean_squared_error(df_on[rate_col], exponential_model(df_on[speed_col], *params_on)))

            plt.text(0.05, 0.95, f"On R² = {r2_on:.3f} and MSE = {mse_on:.3f}", transform=plt.gca().transAxes, fontsize=12, color="green",
            verticalalignment="top", bbox=dict(boxstyle="round", facecolor="white", alpha=0.7))

            func_text_on = f"y = {params_on[0]:.3f} * exp(-{params_on[1]:.3f} * x) + {params_on[2]:.3f}"
            plt.text(0.05, 0.90, f"{func_text_on}", transform=plt.gca().transAxes, fontsize=12, color="green",
                    verticalalignment="top", bbox=dict(boxstyle="round", facecolor="white", alpha=0.7))
            """
            # Find and plot the elbow point for "On"
            elbow_on_speed, elbow_on_rate = find_elbow_point(params_on, speeds)
            plt.scatter(elbow_on_speed, elbow_on_rate, color="green", s=100, label="", zorder=5)
            plt.annotate(f"Elbow Point (On): {elbow_on_speed:.1f} km/h\nRate: {elbow_on_rate:.3f}",
                         (elbow_on_speed, elbow_on_rate), textcoords="offset points", xytext=(70, -30),
                         arrowprops=dict(arrowstyle="->", color="green"),  bbox=dict(boxstyle="round", facecolor="white", alpha=0.7), fontsize=10, color="green")
            """
        # Plot regression and elbow points for "Off"
        if params_off is not None:
            predicted_rate_off = exponential_model(speeds, *params_off)
            plt.plot(speeds, predicted_rate_off, color="red", linewidth=2, linestyle="dotted", label=f"Regression (Off)")
            r2_off = r2_score(df_off[rate_col], exponential_model(df_off[speed_col].values, *params_off))
            mse_off = np.sqrt(mean_squared_error(df_off[rate_col], exponential_model(df_off[speed_col], *params_off)))
            plt.text(0.05, 0.85, f"Off R² = {r2_off:.3f} and MSE = {mse_off:.3f}", transform=plt.gca().transAxes, fontsize=12, color="red",
            verticalalignment="top", bbox=dict(boxstyle="round", facecolor="white", alpha=0.7))
            func_text_off = f"y = {params_off[0]:.3f} * exp(-{params_off[1]:.3f} * x) + {params_off[2]:.3f}"
            plt.text(0.05, 0.80, f"{func_text_off}", transform=plt.gca().transAxes, fontsize=12, color="red",
                    verticalalignment="top", bbox=dict(boxstyle="round", facecolor="white", alpha=0.7))


            # Find and plot the elbow point for "Off"
           # elbow_off_speed, elbow_off_rate = find_elbow_point(params_off, speeds)
            #plt.scatter(elbow_off_speed, elbow_off_rate, color="red", s=100, label="", zorder=5)
            #plt.annotate(f"Elbow Point (Off): {elbow_off_speed:.1f} km/h\nRate: {elbow_off_rate:.3f}",
                        # (elbow_off_speed, elbow_off_rate), textcoords="offset points", xytext=(30, -10),
                        # arrowprops=dict(arrowstyle="->", color="red"), bbox=dict(boxstyle="round", facecolor="white", alpha=0.7), fontsize=10, color="red")

        # Add title and labels
        plt.title(f"Energy Consumption vs. Speed - {vehicle_type_map[vehicle_type]}", fontsize=12)
        plt.xlabel("Speed [km/h]", fontsize=14)
        plt.ylabel("Energy Consumption [kWh/100km]", fontsize=14)
        plt.grid(True)
        plt.legend(loc="center right", fontsize=10, frameon=True)
        plt.tight_layout()
        plt.show()

    plot_regression(df_on, df_off, params_on, params_off, vehicle_type, speed_col, rate_col)


# Fit and Plot for Hybrid Cases (Combined Mode)
def analyze_electric_comsumption(data, temp_threshold, speed_col, consumption_col, vehicle_type):
    if data.empty:
        print(f"No data available for {vehicle_type}. Skipping...")
        return

    # Split data based on temperature threshold
    data_below = data[data["temperature_2m_mean"] < temp_threshold]
    data_above = data[data["temperature_2m_mean"] >= temp_threshold]

    # Further split based on efficient mode
    def split_by_mode(df):
        return df[df["efficient_mode_status"] == "On"], df[df["efficient_mode_status"] == "Off"]

    data_below_on, data_below_off = split_by_mode(data_below)
    data_above_on, data_above_off = split_by_mode(data_above)

    # Fit models
    def fit_model(df):
        return fit_exponential_model(df, speed_col, consumption_col) if not df.empty else None

    params_below_on = fit_model(data_below_on)
    params_below_off = fit_model(data_below_off)
    params_above_on = fit_model(data_above_on)
    params_above_off = fit_model(data_above_off)

    parameters[vehicle_type] = {"On": dict(), "Off": dict()}
    parameters[vehicle_type]["On"] = {
        "Below": list(params_below_on),
        "Above": list(params_above_on),
    }
    parameters[vehicle_type]["Off"] = {
        "Below": list(params_below_off),
        "Above": list(params_above_off),
    }

    # Create subplots
    _, axes = plt.subplots(1, 2, figsize=(14, 7))

    def plot_regression(ax, df_on, df_off, params_on, params_off, title):
        if df_on.empty and df_off.empty:
            ax.set_title(f"{title} (No Data)", fontsize=16)
            ax.axis("off")
            return

        # Use the same scatter color for both On and Off data
        ax.scatter(df_on[speed_col], df_on[consumption_col], color="#035970", alpha=0.5, label=("Efficient Mode: On / Off"))
        ax.scatter(df_off[speed_col], df_off[consumption_col], color="#035970", alpha=0.5)

        # Define speed range safely
        all_speeds = np.concatenate([df_on[speed_col].values, df_off[speed_col].values]) if not (df_on.empty and df_off.empty) else []
        if len(all_speeds) > 0:
            speed_min, speed_max = np.min(all_speeds), np.max(all_speeds)
            speeds = np.linspace(speed_min, speed_max, 1000)

            # Plot regression for "On" data
            if params_on is not None:
                ax.plot(speeds, exponential_model(speeds, *params_on), color="green", linestyle="dashed", linewidth=2, label="Regression (On)")
                r2_on = r2_score(df_on[consumption_col], exponential_model(df_on[speed_col], *params_on))
                mse_on = np.sqrt(mean_squared_error(df_on[consumption_col], exponential_model(df_on[speed_col], *params_on)))

                ax.text(0.05, 0.95, f"On R² = {r2_on:.3f} and MSE = {mse_on:.3f}", transform=ax.transAxes, fontsize=12, color="green",
                        verticalalignment="top", bbox=dict(boxstyle="round", facecolor="white", alpha=0.7))
                # Display function text for On
                func_text_on = f"y = {params_on[0]:.3f} * exp(-{params_on[1]:.3f} * x) + {params_on[2]:.3f}"
                ax.text(0.05, 0.90, func_text_on, transform=ax.transAxes, fontsize=12, color="green", 
                        verticalalignment="top", bbox=dict(boxstyle="round", facecolor="white", alpha=0.7))

            # Plot regression for "Off" data
            if params_off is not None:
                ax.plot(speeds, exponential_model(speeds, *params_off), color="red", linestyle="dotted", linewidth=2, label="Regression (Off)")
                r2_off = r2_score(df_off[consumption_col], exponential_model(df_off[speed_col], *params_off))
    
                mse_off = np.sqrt(mean_squared_error(df_off[consumption_col], exponential_model(df_off[speed_col], *params_off)))

                ax.text(0.05, 0.85, f"Off R² = {r2_off:.3f} and MSE = {mse_off:.3f}", transform=ax.transAxes, fontsize=12, color="red",
                        verticalalignment="top", bbox=dict(boxstyle="round", facecolor="white", alpha=0.7))
                # Display function text for Off
                func_text_off = f"y = {params_off[0]:.3f} * exp(-{params_off[1]:.3f} * x) + {params_off[2]:.3f}"
                ax.text(0.05, 0.80, func_text_off, transform=ax.transAxes, fontsize=12, color="red", 
                        verticalalignment="top", bbox=dict(boxstyle="round", facecolor="white", alpha=0.7))
        """
        # Find and mark elbow points for both "On" and "Off" models
        if params_on is not None:
            elbow_on_speed, elbow_on_rate = find_elbow_point(params_on, speeds)
            ax.scatter(elbow_on_speed, elbow_on_rate, color="green", s=100, label="", zorder=5)
            ax.annotate(f"Elbow Point (On): {elbow_on_speed:.1f} km/h\nRate: {elbow_on_rate:.3f}",
                        (elbow_on_speed, elbow_on_rate), textcoords="offset points", xytext=(80, -40),
                        arrowprops=dict(arrowstyle="->", color="green"), bbox=dict(boxstyle="round", facecolor="white", alpha=0.7), fontsize=10, color="green")

        if params_off is not None:
            elbow_off_speed, elbow_off_rate = find_elbow_point(params_off, speeds)
            ax.scatter(elbow_off_speed, elbow_off_rate, color="red", s=100, label="", zorder=5)
            ax.annotate(f"Elbow Point (Off): {elbow_off_speed:.1f} km/h\nRate: {elbow_off_rate:.3f}",
                        (elbow_off_speed, elbow_off_rate), textcoords="offset points", xytext=(30, -10),
                        arrowprops=dict(arrowstyle="->", color="red"), bbox=dict(boxstyle="round", facecolor="white", alpha=0.7), fontsize=10, color="red")
        """
        # Rotate x-axis labels by 45 degrees for both subplots
        
        ax.set_title(title, fontsize=12)
        ax.set_xlabel("Speed [km/h]", fontsize=14)
        ax.set_ylabel("Energy Consumption [kWh/100km] or [L/100 km]", fontsize=14)
        ax.grid(True)
        ax.legend(loc="center right", fontsize=10, frameon=True)

    # Plot for below and above threshold
    plot_regression(axes[0], data_below_on, data_below_off, params_below_on, params_below_off, f"Energy Consumption vs. Speed - {vehicle_type_map[vehicle_type]} (<= {temp_threshold}°C)")
    plot_regression(axes[1], data_above_on, data_above_off, params_above_on, params_above_off, f"Energy Consumption vs. Speed - {vehicle_type_map[vehicle_type]} (> {temp_threshold}°C)")

    plt.tight_layout()
    plt.show()


def plot_consumption_vehicle_stats(hybrid_electric_only, hybrid_fuel_only, hybrid_combined_case1, hybrid_combined_case2):
    # Count Vehicles in Each Hybrid Mode
    hybrid_modes = ["Electric Only", "Fuel Only", "Hybrid Fuel-Dominated", "Hybrid Combined"]
    vehicle_counts = [len(hybrid_electric_only), len(hybrid_fuel_only), len(hybrid_combined_case1), len(hybrid_combined_case2)]

    # Calculate percentages
    total_vehicles = sum(vehicle_counts)
    percentages = [(count / total_vehicles) * 100 for count in vehicle_counts]

    # Create figure
    color_list = ["#035970", "salmon" , "#FFA07A", "#4F8B9B"]
    bars = plt.bar(hybrid_modes, vehicle_counts, color=color_list, alpha=0.7)

    # Add annotations for both counts and percentages
    for i, bar in enumerate(bars):
        plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 500, 
                f"{vehicle_counts[i]}\n({percentages[i]:.1f}%)", 
                ha="center", fontsize=12, fontweight="bold")

    # Labels and title
    plt.xlabel("Hybrid Mode", fontsize=12)
    plt.xticks(rotation=60)
    plt.ylabel("Number of Trips (Absolute & Percentage)", fontsize=12)
    plt.ylim(0, max(vehicle_counts) * 1.2)  # Extend y-limit for better spacing
    plt.title("Distribution of Modes used in PHEV", fontsize=14)
    plt.grid(axis="y", linestyle="--", alpha=0.5)
    plt.show()

In [None]:
with open(f"{DATA_FOLDER}/parameters_exp.json", "w", encoding="utf-8") as fw:
    json.dump(parameters, fw, indent=4)

In [None]:
# Load Data
df_7 = df_6.copy()
df_7 = df_7[df_7["speed"] > 0]
df_7 = df_7.dropna(subset=["speed", "electric_consumption", "fuel_consumption"])
# Apply Temperature Threshold for Electric-relevant Models Only
temp_threshold = 8

df_7["efficient_mode_status"] = np.where(df_7["efficient_mode"] == 0, "Off", "On")

# ICE
fuel_vehicles = df_7[(df_7["vehicle_type"] == "BE") & (df_7["fuel_consumption"] > 0)]

# BEV
electric_vehicles = df_7[(df_7["vehicle_type"] == "EL") & (df_7["electric_consumption"] > 0)]

# HY Cases
# Case 1: Electric Only
hybrid_electric_only = df_7[(df_7["vehicle_type"] == "HY") & (df_7["electric_consumption"] > 0) & (df_7["fuel_consumption"] == 0)]

# Case 2: Fuel Only
hybrid_fuel_only = df_7[(df_7["vehicle_type"] == "HY") & (df_7["fuel_consumption"] > 0) & (df_7["electric_consumption"] == 0)]

# Case 3: Fuel-dominated
hybrid_combined_case1 = df_7[(df_7["vehicle_type"] == "HY") & (df_7["electric_consumption"] < 0) & (df_7["fuel_consumption"] > 0)].copy()
hybrid_combined_case1["combined_consumption_fuel"] = hybrid_combined_case1["fuel_consumption"]

# Case 4: Combined Mode
hybrid_combined_case2 = df_7[(df_7["vehicle_type"] == "HY") & (df_7["electric_consumption"] > 0) & (df_7["fuel_consumption"] > 0)].copy()
hybrid_combined_case2["combined_consumption"] = hybrid_combined_case2["electric_consumption"] + hybrid_combined_case2["fuel_consumption"]

# Plot Electric Consumption for Electric Vehicles
analyze_electric_comsumption(electric_vehicles, temp_threshold, "speed", "electric_consumption", "Electric Vehicles")

# Plot Electric Consumption for Hybrid Electric Only Vehicles
analyze_electric_comsumption(hybrid_electric_only, temp_threshold, "speed", "electric_consumption", "Hybrid Electric-Only Vehicles")

# Plot Combined Consumption for Hybrid Combined Mode Vehicles
analyze_electric_comsumption(hybrid_combined_case2, temp_threshold, "speed", "combined_consumption", "Hybrid Combined Vehicles")

# Plot Fuel Consumption for BE and Hybrid Modes Vehicles
analyze_fuel_consumption(fuel_vehicles, "speed", "fuel_consumption", "Fuel Vehicles")
analyze_fuel_consumption(hybrid_fuel_only, "speed", "fuel_consumption", "Hybrid Fuel-Only Vehicles")
analyze_fuel_consumption(hybrid_combined_case1,"speed", "combined_consumption_fuel", "Hybrid Fuel-Dominated Vehicles")

# Plot Number of Vehicles w.r.t Vehicle Types
plot_consumption_vehicle_stats(hybrid_electric_only, hybrid_fuel_only, hybrid_combined_case1, hybrid_combined_case2)

In [None]:
# Define your calculation functions
def calculate_savings_fuel_vehicles(a, b, c, x, speed, distance, fuel_price):
    fuel_consumption_per_100km = a * np.exp(-b * speed) + c
    fuel_consumption_per_km = fuel_consumption_per_100km / 100
    total_fuel_savings = x * distance * fuel_consumption_per_km
    fuel_cost_savings = total_fuel_savings * fuel_price
    return round(total_fuel_savings, 3), round(fuel_cost_savings, 3)

def calculate_savings_electric_vehicles(a, b, c, x, speed, distance, electric_price):
    electric_consumption_per_100km = a * np.exp(-b * speed) + c
    electric_consumption_per_km = electric_consumption_per_100km / 100
    total_electric_savings = x * distance * electric_consumption_per_km
    electric_cost_savings = total_electric_savings * electric_price
    return round(total_electric_savings, 3), round(electric_cost_savings, 3)


# Define calculation functions for different vehicle types
saving_calculation_funcs = {
    "Fuel Vehicles": calculate_savings_fuel_vehicles,
    "Electric Vehicles": calculate_savings_electric_vehicles,
    "Hybrid Fuel-Only Vehicles": calculate_savings_fuel_vehicles,
    "Hybrid Electric-Only Vehicles": calculate_savings_electric_vehicles,
}

def calculate_savings(parameters, reduced_trips, speed_ranges, fuel_price, electric_price):
    # Define distance for each speed range
    speed_range_distances = {sr: 100 for sr in speed_ranges}  # 100 km for each range
    savings_data = {
        "Vehicle Type": [],
        "Efficient Mode": [],
        "Speed Range in km/h": [],
        "Distance (km)": [],
        "Savings Type": [],
        "Savings Value": []
    }
    # Calculate savings for each vehicle type and efficient mode
    for vehicle_type, modes in parameters.items():
        if vehicle_type not in saving_calculation_funcs:
            continue
        
        calculate_savings = saving_calculation_funcs[vehicle_type]
        savings_label = "Fuel Consumption Savings (L/100 km)" if vehicle_type in ["Fuel Vehicles", "Hybrid Fuel-Only Vehicles"] else "Electric Consumption Savings (kWh/100 km)"

        for efficient_mode, param in modes.items():
            if "Below" in param and "Above" in param:
                a1, b1, c1 = param["Below"]
                a2, b2, c2 = param["Above"]
                
                for min_speed, max_speed in speed_ranges:
                    distance = speed_range_distances[(min_speed, max_speed)]
                    speed_for_calculation = max_speed

                    savings_low = calculate_savings(a1, b1, c1, reduced_trips, speed_for_calculation, distance, fuel_price if vehicle_type in ["Fuel Vehicles", "Hybrid Fuel-Only Vehicles"] else electric_price)
                    savings_high = calculate_savings(a2, b2, c2, reduced_trips, speed_for_calculation, distance, fuel_price if vehicle_type in ["Fuel Vehicles", "Hybrid Fuel-Only Vehicles"] else electric_price)

                    savings_data["Vehicle Type"].extend([vehicle_type] * 2)
                    savings_data["Efficient Mode"].extend([f"{efficient_mode} | Low Temperature", f"{efficient_mode} | High Temperature"])
                    #savings_data["Speed Range in km/h"].extend([f"{min_speed}-{max_speed}"] * 2)
                    savings_data["Speed Range in km/h"].extend([f"{max_speed}"] * 2)
                    savings_data["Distance (km)"].extend([distance] * 2)
                    savings_data["Savings Type"].extend([savings_label] * 2)
                    savings_data["Savings Value"].extend([savings_low[0], savings_high[0]])
            else:
                a, b, c = param
                for speed_range in speed_ranges:
                    min_speed, max_speed = speed_range
                    speed_for_calculation = max_speed
                    distance = speed_range_distances[speed_range]
                    speed_for_calculation = max_speed
                    savings = calculate_savings(a, b, c, reduced_trips, speed_for_calculation, distance, fuel_price if vehicle_type in ["Fuel Vehicles", "Hybrid Fuel-Only Vehicles"] else electric_price)
                    savings_data["Vehicle Type"].append(vehicle_type)
                    savings_data["Efficient Mode"].append(efficient_mode)
                    #savings_data["Speed Range in km/h"].append(f"{min_speed}-{max_speed}")
                    savings_data["Speed Range in km/h"].extend([f"{max_speed}"])
                    savings_data["Distance (km)"].append(distance)
                    savings_data["Savings Type"].append(savings_label)
                    savings_data["Savings Value"].append(savings[0])
                    # Use the cost savings values here, not the consumption savings
                    if vehicle_type in ["Fuel Vehicles", "Hybrid Fuel-Only Vehicles"]:
                        savings_data["Fuel Cost Savings Value"] = savings_data["Savings Value"] * fuel_price
                    else:
                        savings_data["Electric Cost Savings Value"] = savings_data["Savings Value"] * electric_price

    # Convert to DataFrame
    df_savings = pd.DataFrame(savings_data)
    return df_savings


# Define color palette for low and high temperature modes (default for fuel vehicles)
def plot_savings(df_savings):
    colors = {
        "Low Temperature": ("#58D68D", "#F5B041"),  # Light and dark blue
        "High Temperature": ("#2E8B57", "#C0392B"),   # Light and dark warm tones
        "Fuel Vehicles": ("#2E8B57", "#C0392B"),  # Default color for fuel vehicles
        "Hybrid Fuel-Only Vehicles": ("#2E8B57", "#C0392B")  # Default color for hybrid fuel-only vehicles,
    }

    vehicle_type_map = {
        "Fuel Vehicles": "ICE",
        "Electric Vehicles": "BEV",
        "Hybrid Fuel-Only Vehicles": "PHEV (Fuel Only)",
        "Hybrid Electric-Only Vehicles": "PHEV (Electric-Only)"
    }

    vehicle_types = df_savings["Vehicle Type"].unique()

    # Plot savings for each vehicle type
    for vehicle_type in vehicle_types:
        df_vehicle_savings = df_savings[df_savings["Vehicle Type"] == vehicle_type]
        
        # Calculate cost savings
        savings_label = "Fuel Consumption Savings (L/100 km)" if vehicle_type in ["Fuel Vehicles", "Hybrid Fuel-Only Vehicles"] else "Electric Consumption Savings (kWh/100 km)"
        cost_label = "Fuel Cost Savings (€/100 km)" if vehicle_type in ["Fuel Vehicles", "Hybrid Fuel-Only Vehicles"] else "Electric Cost Savings (€/100 km)"
        
        # Create subplots for consumption and cost savings
        _, axes = plt.subplots(2, 1, figsize=(10, 7), sharex=True)
        
        # First subplot - Consumption Savings
        ax1 = axes[0]
        if vehicle_type in ["Fuel Vehicles", "Hybrid Fuel-Only Vehicles"]:
            sns.barplot(
                data=df_vehicle_savings,
                x="Speed Range in km/h", y="Savings Value", hue="Efficient Mode",
                palette=[colors[vehicle_type][0], colors[vehicle_type][1]], ax=ax1
            )
        else:
            for mode, (light_color, dark_color) in colors.items():
                sns.barplot(
                    data=df_vehicle_savings[df_vehicle_savings["Efficient Mode"].str.contains(mode)],
                    x="Speed Range in km/h", y="Savings Value", hue="Efficient Mode",
                    palette=[light_color, dark_color], ax=ax1
                )
        for p in ax1.patches:
            value = p.get_height()
            if value > 0.000:  # Only annotate if value is larger than 0.000
                ax1.annotate(f"{value:.0f}", (p.get_x() + p.get_width() / 2., value), 
                            ha="center", va="center", fontsize=8, fontweight = "bold", color="black", 
                            xytext=(0, 5), textcoords="offset points")
        
        ax1.set_title(f"{vehicle_type_map[vehicle_type]} - {savings_label}", fontsize=8)
        ax1.set_ylabel(savings_label)
        ax1.set_xlabel("Speed Range in km/h")
        ax1.legend(title="Efficient Mode", loc="lower left")
        
        # Second subplot - Cost Savings
        ax2 = axes[1]
        if vehicle_type in ["Fuel Vehicles", "Hybrid Fuel-Only Vehicles"]:
            sns.barplot(
                data=df_vehicle_savings,
                x="Speed Range in km/h", y="Fuel Cost Savings Value", hue="Efficient Mode",
                palette=[colors[vehicle_type][0], colors[vehicle_type][1]], ax=ax2
            )
        else:
            for mode, (light_color, dark_color) in colors.items():
                sns.barplot(
                    data=df_vehicle_savings[df_vehicle_savings["Efficient Mode"].str.contains(mode)],
                    x="Speed Range in km/h", y="Electric Cost Savings Value", hue="Efficient Mode",
                    palette=[light_color, dark_color], ax=ax2
                )
        for p in ax2.patches:
            value = p.get_height()
            if value > 0:
                ax2.annotate(f"€{value:.2f}", (p.get_x() + p.get_width() / 2., value), 
                            ha="center", va="center", fontsize=8, fontweight = "bold",
                            xytext=(0, 5), textcoords="offset points")
        ax2.set_title(f"{vehicle_type_map[vehicle_type]} - {savings_label}", fontsize=8)
        ax2.set_ylabel(cost_label)
        ax2.tick_params(axis="x", labelsize=8)
        ax2.set_xlabel("Speed Range in km/h", fontsize=8, labelpad=0)
        ax2.legend(title="Efficient Mode", loc="lower left")

        plt.xticks(rotation=0, ha="center")
        plt.subplots_adjust(bottom=0.06, top=0.97, hspace=0.07)  # Increase value if needed
        plt.show()

In [None]:
# Define constants
reduced_trips = 1  # Reduced trips
speed_ranges = [(0, 30), (30, 55), (55, 80), (80, 120)]  # Updated last range
fuel_price = 1.3  # Price € per liter for fuel
electric_price = 0.20  # Price € per kWh for electricity

df_savings = calculate_savings(parameters, reduced_trips, speed_ranges, fuel_price, electric_price)
plot_savings(df_savings)