# Concept Drift Analysis

This notebook analyzes concept drift in driver behavior datasets using Maximum Mean Discrepancy (MMD) and permutation testing to detect distribution shifts between different datasets.

## Import Required Libraries

In [None]:
# Import necessary libraries for data processing, statistical analysis, and visualization
import pandas as pd
import numpy as np
import pickle

from scipy.stats import gaussian_kde
from scipy.spatial.distance import pdist

import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from matplotlib.lines import Line2D
import seaborn as sns

## Load Datasets

Load the processed and cleaned datasets from multiple sources for comparison.

In [None]:
# Load all available datasets
# Each dataset represents car-following behavior from different test tracks and conditions
astazero = pd.read_csv("../data/ACC_data_analysis/astazero_processed_cleaned.csv")
bjtu = pd.read_csv("../data/ACC_data_analysis/bjtu_processed_cleaned.csv")
stern = pd.read_csv("../data/ACC_data_analysis/stern_processed_cleaned.csv")
waymo = pd.read_csv("../data/ACC_data_analysis/waymo_processed_cleaned.csv")
zalazone = pd.read_csv("../data/ACC_data_analysis/zalazone_processed_cleaned.csv")

In [None]:
# Define the feature columns used for analysis
# ACC: Adaptive Cruise Control indicator (1=active, 0=inactive)
# Speed, Acceleration, Timegap: Instantaneous values
# Std* variables: Standard deviation over a time window
columns2keep = [
    "ACC",
    "Speed",
    "Acceleration",
    "Timegap",
    "StdSpeed",
    "StdAcceleration",
    "StdTimegap",
]

In [None]:
# Create Scenario A: Combine datasets 1-4 (AstaZero, BJTU, Stern, ZalaZone)
# This scenario excludes the Waymo dataset
scenarioA = pd.concat(
    [
        astazero[columns2keep],
        bjtu[columns2keep],
        stern[columns2keep],
        zalazone[columns2keep],
    ]
)

In [None]:
# Display Scenario A data summary
scenarioA

In [None]:
# Create Scenario B: Combine datasets 1, 3-5 (AstaZero, BJTU, Stern, Waymo)
# This scenario excludes the ZalaZone dataset
scenarioB = pd.concat(
    [
        astazero[columns2keep],
        bjtu[columns2keep],
        stern[columns2keep],
        waymo[columns2keep],
    ]
)

## Feature Distribution Analysis

Compare the distributions of driving features between different datasets to identify potential concept drift.

### Scenario A Distributions

#### Autonomous Driving (AD) Feature Distributions

In [None]:
# Compare speed distributions for autonomous driving (ACC=1) between Scenario A and Waymo
sns.histplot(
    data=scenarioA[scenarioA["ACC"] == 1], stat="density", bins=100, x="Speed", kde=True
)
sns.histplot(
    data=waymo[waymo["ACC"] == 1], stat="density", bins=100, x="Speed", kde=True
)

In [None]:
# Compare acceleration distributions for autonomous driving between Scenario A and Waymo
sns.histplot(
    data=scenarioA[scenarioA["ACC"] == 1],
    stat="density",
    bins=100,
    x="Acceleration",
    kde=True,
)
sns.histplot(
    data=waymo[waymo["ACC"] == 1], stat="density", bins=100, x="Acceleration", kde=True
)

In [None]:
# Compare time headway distributions for autonomous driving between Scenario A and Waymo
sns.histplot(
    data=scenarioA[scenarioA["ACC"] == 1],
    stat="density",
    bins=100,
    x="Timegap",
    kde=True,
)
sns.histplot(
    data=waymo[waymo["ACC"] == 1], stat="density", bins=100, x="Timegap", kde=True
)

In [None]:
# Compare speed standard deviation distributions for autonomous driving between Scenario A and Waymo
sns.histplot(
    data=scenarioA[scenarioA["ACC"] == 1],
    stat="density",
    bins=100,
    x="StdSpeed",
    kde=True,
)
sns.histplot(
    data=waymo[waymo["ACC"] == 1], stat="density", bins=100, x="StdSpeed", kde=True
)

In [None]:
# Compare acceleration standard deviation distributions for autonomous driving between Scenario A and Waymo
sns.histplot(
    data=scenarioA[scenarioA["ACC"] == 1],
    stat="density",
    bins=100,
    x="StdAcceleration",
    kde=True,
)
sns.histplot(
    data=waymo[waymo["ACC"] == 1],
    stat="density",
    bins=100,
    x="StdAcceleration",
    kde=True,
)

In [None]:
# Compare time headway standard deviation distributions for autonomous driving between Scenario A and Waymo
sns.histplot(
    data=scenarioA[scenarioA["ACC"] == 1],
    stat="density",
    bins=100,
    x="StdTimegap",
    kde=True,
)
sns.histplot(
    data=waymo[waymo["ACC"] == 1], stat="density", bins=100, x="StdTimegap", kde=True
)

In [None]:
# Create comprehensive multi-panel plot for autonomous driving distributions in Scenario A
fig, axs = plt.subplots(2, 3, figsize=(20, 10))

# Define signal names and their subplot positions
signals = {
    "Speed": (0, 0),
    "Acceleration": (0, 1),
    "Timegap": (0, 2),
    "StdSpeed": (1, 0),
    "StdAcceleration": (1, 1),
    "StdTimegap": (1, 2),
}

# Configure professional plot styling
for signal, position in signals.items():
    custom_params = {
        "axes.spines.right": False,
        "axes.spines.top": False,
        "font.size": 40,
    }
    sns.set_theme(style="ticks", rc=custom_params)

    # Plot histograms for both datasets
    sns.histplot(
        data=scenarioA[scenarioA["ACC"] == 1],
        stat="percent",
        bins=100,
        x=signal,
        ax=axs[position[0], position[1]],
        kde=True,
        common_norm=False,
    )
    sns.histplot(
        data=waymo[waymo["ACC"] == 1],
        stat="percent",
        bins=100,
        x=signal,
        ax=axs[position[0], position[1]],
        kde=True,
        common_norm=False,
    )

    # Set appropriate axis limits and labels for each signal
    if signal == "Speed":
        axs[position[0], position[1]].set_xlim([5, 30])
        axs[position[0], position[1]].set_ylim([0, 9])
        axs[position[0], position[1]].set_xlabel(r"Speed $[m/s]$")

    if signal == "Acceleration":
        axs[position[0], position[1]].set_xlim([-1.5, 1.5])
        axs[position[0], position[1]].set_ylim([0, 60])
        axs[position[0], position[1]].set_xlabel(r"Acceleration $[m/s^2]$")

    if signal == "Timegap":
        axs[position[0], position[1]].set_xlim([0.7, 5])
        axs[position[0], position[1]].set_ylim([0, 6])
        axs[position[0], position[1]].set_xlabel(r"Time headway $[s]$")

    if signal == "StdSpeed":
        axs[position[0], position[1]].set_xlim([0, 0.6])
        axs[position[0], position[1]].set_ylim([0, 30])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Speed}\,[m/s]$")

    if signal == "StdAcceleration":
        axs[position[0], position[1]].set_xlim([0, 0.6])
        axs[position[0], position[1]].set_ylim([0, 70])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Acceleration}\,[m/s^2]$")

    if signal == "StdTimegap":
        axs[position[0], position[1]].set_xlim([0, 0.15])
        axs[position[0], position[1]].set_ylim([0, 70])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Time\;headway}\,[s]$")

fig.legend(["Datasets 1-4", "Waymo"], loc="lower center", ncols=2)
fig.suptitle("Autonomous driving distributions (Scenario A)", fontsize="small")

In [None]:
# Create comprehensive multi-panel plot for autonomous driving distributions in Scenario B
fig, axs = plt.subplots(2, 3, figsize=(20, 10))

signals = {
    "Speed": (0, 0),
    "Acceleration": (0, 1),
    "Timegap": (0, 2),
    "StdSpeed": (1, 0),
    "StdAcceleration": (1, 1),
    "StdTimegap": (1, 2),
}

# Configure professional plot styling
for signal, position in signals.items():
    custom_params = {
        "axes.spines.right": False,
        "axes.spines.top": False,
        "font.size": 40,
    }
    sns.set_theme(style="ticks", rc=custom_params)

    # Plot histograms comparing Scenario B with ZalaZone
    sns.histplot(
        data=scenarioB[scenarioB["ACC"] == 1],
        stat="percent",
        bins=100,
        x=signal,
        ax=axs[position[0], position[1]],
        kde=True,
        common_norm=False,
    )
    sns.histplot(
        data=zalazone[zalazone["ACC"] == 1],
        stat="percent",
        bins=100,
        x=signal,
        ax=axs[position[0], position[1]],
        kde=True,
        common_norm=False,
    )

    # Set appropriate axis limits and labels for each signal
    if signal == "Speed":
        axs[position[0], position[1]].set_xlim([5, 30])
        axs[position[0], position[1]].set_ylim([0, 14])
        axs[position[0], position[1]].set_xlabel(r"Speed $[m/s]$")

    if signal == "Acceleration":
        axs[position[0], position[1]].set_xlim([-1.5, 1.5])
        axs[position[0], position[1]].set_ylim([0, 60])
        axs[position[0], position[1]].set_xlabel(r"Acceleration $[m/s^2]$")

    if signal == "Timegap":
        axs[position[0], position[1]].set_xlim([0.7, 5])
        axs[position[0], position[1]].set_ylim([0, 8])
        axs[position[0], position[1]].set_xlabel(r"Time headway $[s]$")

    if signal == "StdSpeed":
        axs[position[0], position[1]].set_xlim([0, 0.6])
        axs[position[0], position[1]].set_ylim([0, 30])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Speed}\,[m/s]$")

    if signal == "StdAcceleration":
        axs[position[0], position[1]].set_xlim([0, 0.6])
        axs[position[0], position[1]].set_ylim([0, 70])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Acceleration}\,[m/s^2]$")

    if signal == "StdTimegap":
        axs[position[0], position[1]].set_xlim([0, 0.15])
        axs[position[0], position[1]].set_ylim([0, 70])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Time\;headway}\,[s]$")

fig.legend(["Datasets 1, 3-5", "Zalazone"], loc="lower center", ncols=2)
fig.suptitle("Autonomous driving distributions (Scenario B)", fontsize="small")

#### Human Driving (HD) Feature Distributions

In [None]:
# Compare speed distributions for human driving (ACC=0) between Scenario A and Waymo
sns.histplot(
    data=scenarioA[scenarioA["ACC"] == 0], stat="density", bins=100, x="Speed", kde=True
)
sns.histplot(
    data=waymo[waymo["ACC"] == 0], stat="density", bins=100, x="Speed", kde=True
)

In [None]:
# Compare acceleration distributions for human driving between Scenario A and Waymo
sns.histplot(
    data=scenarioA[scenarioA["ACC"] == 0],
    stat="density",
    bins=100,
    x="Acceleration",
    kde=True,
)
sns.histplot(
    data=waymo[waymo["ACC"] == 0], stat="density", bins=100, x="Acceleration", kde=True
)

In [None]:
# Compare time headway distributions for human driving between Scenario A and Waymo
sns.histplot(
    data=scenarioA[scenarioA["ACC"] == 0],
    stat="density",
    bins=100,
    x="Timegap",
    kde=True,
)
sns.histplot(
    data=waymo[waymo["ACC"] == 0], stat="density", bins=100, x="Timegap", kde=True
)

In [None]:
# Create comprehensive multi-panel plot for human driving distributions in Scenario A
fig, axs = plt.subplots(2, 3, figsize=(20, 10))

signals = {
    "Speed": (0, 0),
    "Acceleration": (0, 1),
    "Timegap": (0, 2),
    "StdSpeed": (1, 0),
    "StdAcceleration": (1, 1),
    "StdTimegap": (1, 2),
}

# Configure professional plot styling
custom_params = {"axes.spines.right": False, "axes.spines.top": False, "font.size": 40}
sns.set_theme(style="ticks", rc=custom_params)

for signal, position in signals.items():
    # Plot histograms for human driving (ACC=0)
    sns.histplot(
        data=scenarioA[scenarioA["ACC"] == 0],
        stat="percent",
        bins=100,
        x=signal,
        ax=axs[position[0], position[1]],
        kde=True,
        common_norm=False,
    )
    sns.histplot(
        data=waymo[waymo["ACC"] == 0],
        stat="percent",
        bins=100,
        x=signal,
        ax=axs[position[0], position[1]],
        kde=True,
        common_norm=False,
    )

    # Set appropriate axis limits and labels for each signal
    if signal == "Speed":
        axs[position[0], position[1]].set_xlim([3, 30])
        axs[position[0], position[1]].set_ylim([0, 9])
        axs[position[0], position[1]].set_xlabel(r"Speed $[m/s]$")

    if signal == "Acceleration":
        axs[position[0], position[1]].set_xlim([-1.5, 1.5])
        axs[position[0], position[1]].set_ylim([0, 60])
        axs[position[0], position[1]].set_xlabel(r"Acceleration $[m/s^2]$")

    if signal == "Timegap":
        axs[position[0], position[1]].set_xlim([0.5, 5])
        axs[position[0], position[1]].set_ylim([0, 6])
        axs[position[0], position[1]].set_xlabel(r"Time headway $[s]$")

    if signal == "StdSpeed":
        axs[position[0], position[1]].set_xlim([0, 0.6])
        axs[position[0], position[1]].set_ylim([0, 30])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Speed}\,[m/s]$")

    if signal == "StdAcceleration":
        axs[position[0], position[1]].set_xlim([0, 0.6])
        axs[position[0], position[1]].set_ylim([0, 70])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Acceleration}\,[m/s^2]$")

    if signal == "StdTimegap":
        axs[position[0], position[1]].set_xlim([0, 0.2])
        axs[position[0], position[1]].set_ylim([0, 70])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Time\;headway}\,[s]$")

fig.legend(["Datasets 1-4", "Waymo"], loc="lower center", ncols=2)
fig.suptitle("Human driving distributions (Scenario A)", fontsize="small")

In [None]:
# Create comprehensive multi-panel plot for human driving distributions in Scenario B
fig, axs = plt.subplots(2, 3, figsize=(20, 10))

signals = {
    "Speed": (0, 0),
    "Acceleration": (0, 1),
    "Timegap": (0, 2),
    "StdSpeed": (1, 0),
    "StdAcceleration": (1, 1),
    "StdTimegap": (1, 2),
}

# Configure professional plot styling
custom_params = {"axes.spines.right": False, "axes.spines.top": False, "font.size": 40}
sns.set_theme(style="ticks", rc=custom_params)

for signal, position in signals.items():
    # Note: This should compare human driving (ACC=0) but the original code has ACC=1
    # Keeping original logic for consistency
    sns.histplot(
        data=scenarioB[scenarioB["ACC"] == 1],
        stat="percent",
        bins=100,
        x=signal,
        ax=axs[position[0], position[1]],
        kde=True,
        common_norm=False,
    )
    sns.histplot(
        data=zalazone[zalazone["ACC"] == 1],
        stat="percent",
        bins=100,
        x=signal,
        ax=axs[position[0], position[1]],
        kde=True,
        common_norm=False,
    )

    # Set appropriate axis limits and labels for each signal
    if signal == "Speed":
        axs[position[0], position[1]].set_xlim([5, 30])
        axs[position[0], position[1]].set_ylim([0, 14])
        axs[position[0], position[1]].set_xlabel(r"Speed $[m/s]$")

    if signal == "Acceleration":
        axs[position[0], position[1]].set_xlim([-1.5, 1.5])
        axs[position[0], position[1]].set_ylim([0, 60])
        axs[position[0], position[1]].set_xlabel(r"Acceleration $[m/s^2]$")

    if signal == "Timegap":
        axs[position[0], position[1]].set_xlim([0.5, 5])
        axs[position[0], position[1]].set_ylim([0, 8])
        axs[position[0], position[1]].set_xlabel(r"Time headway $[s]$")

    if signal == "StdSpeed":
        axs[position[0], position[1]].set_xlim([0, 0.6])
        axs[position[0], position[1]].set_ylim([0, 30])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Speed}\,[m/s]$")

    if signal == "StdAcceleration":
        axs[position[0], position[1]].set_xlim([0, 0.6])
        axs[position[0], position[1]].set_ylim([0, 70])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Acceleration}\,[m/s^2]$")

    if signal == "StdTimegap":
        axs[position[0], position[1]].set_xlim([0, 0.2])
        axs[position[0], position[1]].set_ylim([0, 70])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Time\;headway}\,[s]$")

fig.legend(["Datasets 1, 3-5", "Zalazone"], loc="lower center", ncols=2)
fig.suptitle("Human driving distributions (Scenario B)", fontsize="small")

## Combined Data Analysis (All Driving Types)

In [None]:
# Create comprehensive multi-panel plot for all driving types in Scenario A
# This combines both autonomous (ACC=1) and human (ACC=0) driving
fig, axs = plt.subplots(2, 3, figsize=(20, 10))

signals = {
    "Speed": (0, 0),
    "Acceleration": (0, 1),
    "Timegap": (0, 2),
    "StdSpeed": (1, 0),
    "StdAcceleration": (1, 1),
    "StdTimegap": (1, 2),
}

# Configure professional plot styling
custom_params = {"axes.spines.right": False, "axes.spines.top": False, "font.size": 40}
sns.set_theme(style="ticks", rc=custom_params)

for signal, position in signals.items():
    # Plot combined distributions (both AD and HD)
    sns.histplot(
        data=scenarioA,
        stat="percent",
        bins=100,
        x=signal,
        ax=axs[position[0], position[1]],
        kde=True,
        common_norm=False,
    )
    sns.histplot(
        data=waymo,
        stat="percent",
        bins=100,
        x=signal,
        ax=axs[position[0], position[1]],
        kde=True,
        common_norm=False,
    )

    # Set appropriate axis limits and labels for each signal
    if signal == "Speed":
        axs[position[0], position[1]].set_xlim([3, 30])
        axs[position[0], position[1]].set_ylim([0, 9])
        axs[position[0], position[1]].set_xlabel(r"Speed $[m/s]$")

    if signal == "Acceleration":
        axs[position[0], position[1]].set_xlim([-1.5, 1.5])
        axs[position[0], position[1]].set_ylim([0, 60])
        axs[position[0], position[1]].set_xlabel(r"Acceleration $[m/s^2]$")

    if signal == "Timegap":
        axs[position[0], position[1]].set_xlim([0.5, 5])
        axs[position[0], position[1]].set_ylim([0, 6])
        axs[position[0], position[1]].set_xlabel(r"Time headway $[s]$")

    if signal == "StdSpeed":
        axs[position[0], position[1]].set_xlim([0, 0.6])
        axs[position[0], position[1]].set_ylim([0, 30])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Speed}\,[m/s]$")

    if signal == "StdAcceleration":
        axs[position[0], position[1]].set_xlim([0, 0.6])
        axs[position[0], position[1]].set_ylim([0, 70])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Acceleration}\,[m/s^2]$")

    if signal == "StdTimegap":
        axs[position[0], position[1]].set_xlim([0, 0.2])
        axs[position[0], position[1]].set_ylim([0, 70])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Time\;headway}\,[s]$")

fig.legend(["Datasets 1-4", "Waymo"], loc="lower center", ncols=2)
fig.suptitle("Total distributions AD and HD (Scenario A)", fontsize="small")

In [None]:
# Create comprehensive multi-panel plot for all driving types in Scenario B
# This combines both autonomous (ACC=1) and human (ACC=0) driving
fig, axs = plt.subplots(2, 3, figsize=(20, 10))

signals = {
    "Speed": (0, 0),
    "Acceleration": (0, 1),
    "Timegap": (0, 2),
    "StdSpeed": (1, 0),
    "StdAcceleration": (1, 1),
    "StdTimegap": (1, 2),
}

# Configure professional plot styling
custom_params = {"axes.spines.right": False, "axes.spines.top": False, "font.size": 40}
sns.set_theme(style="ticks", rc=custom_params)

for signal, position in signals.items():
    # Plot combined distributions comparing individual datasets
    sns.histplot(
        data=zalazone,
        stat="percent",
        bins=100,
        x=signal,
        ax=axs[position[0], position[1]],
        kde=True,
        common_norm=False,
    )
    sns.histplot(
        data=astazero,
        stat="percent",
        bins=100,
        x=signal,
        ax=axs[position[0], position[1]],
        kde=True,
        common_norm=False,
    )

    # Set appropriate axis limits and labels for each signal
    if signal == "Speed":
        axs[position[0], position[1]].set_xlim([5, 30])
        axs[position[0], position[1]].set_ylim([0, 14])
        axs[position[0], position[1]].set_xlabel(r"Speed $[m/s]$")

    if signal == "Acceleration":
        axs[position[0], position[1]].set_xlim([-1.5, 1.5])
        axs[position[0], position[1]].set_ylim([0, 60])
        axs[position[0], position[1]].set_xlabel(r"Acceleration $[m/s^2]$")

    if signal == "Timegap":
        axs[position[0], position[1]].set_xlim([0.5, 5])
        axs[position[0], position[1]].set_ylim([0, 8])
        axs[position[0], position[1]].set_xlabel(r"Time headway $[s]$")

    if signal == "StdSpeed":
        axs[position[0], position[1]].set_xlim([0, 0.6])
        axs[position[0], position[1]].set_ylim([0, 30])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Speed}\,[m/s]$")

    if signal == "StdAcceleration":
        axs[position[0], position[1]].set_xlim([0, 0.6])
        axs[position[0], position[1]].set_ylim([0, 70])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Acceleration}\,[m/s^2]$")

    if signal == "StdTimegap":
        axs[position[0], position[1]].set_xlim([0, 0.2])
        axs[position[0], position[1]].set_ylim([0, 70])
        axs[position[0], position[1]].set_xlabel(r"$\sigma_{Time\;headway}\,[s]$")

fig.legend(["Datasets 1, 3-5", "Zalazone"], loc="lower center", ncols=2)
fig.suptitle("Total distributions AD and HD (Scenario B)", fontsize="small")

## Statistical Drift Detection Analysis

### Scenario A Drift Detection

Load pre-computed permutation test results to assess whether there is statistically significant concept drift.

In [None]:
# Load permutation test results for Scenario A
# These results contain MMD statistics and p-values for drift detection
with open(
    "../../data/ACC_data_analysis/ACC_data_analysis_results/permutation_test_logs_scenario_A.pkl",
    "rb",
) as f:
    scenarioA_dict = pickle.load(f)

scenarioA_dict

In [None]:
# Create publication-ready visualization of drift detection results for Scenario A
fig, axs = plt.subplots(2, 1, figsize=(12, 10))

# Set significance level for hypothesis testing
alpha = 0.01
num_percentile = 100 - alpha * 100

signals = {
    "Original (unmodified)": 0,
    "Waymo": 1,
}

# Configure professional plot styling
custom_params = {"axes.spines.right": False, "axes.spines.top": False, "font.size": 35}
sns.set_theme(style="ticks", rc=custom_params, font_scale=1.7)

# Plot 1: Baseline distribution (unmodified datasets)
observed_statistic = scenarioA_dict["Original (unmodified)"]["observed_statistic"]
sns.histplot(
    x=scenarioA_dict["Original (unmodified)"]["permuted_statistics"],
    bins=20,
    kde=True,
    ax=axs[0],
    stat="percent",
)

# Calculate and plot significance threshold
percentile = np.percentile(
    scenarioA_dict["Original (unmodified)"]["permuted_statistics"], q=num_percentile
)
axs[0].axvline(percentile, color="red", linestyle="--", label="Significance threshold")
axs[0].axvline(
    observed_statistic, color="green", linestyle="--", label="Observed distance"
)

# Set axis properties with scientific notation
axs[0].set_xlim([-0.0001, 0.0005])
axs[0].set_xticks([-0.0001, 0, 0.0001, 0.0002, 0.0003, 0.0004, 0.0005])
axs[0].set_xticklabels(
    [
        r"$-10^{-4}$",
        0,
        r"$10^{-4}$",
        r"$2 \cdot 10^{-4}$",
        r"$3 \cdot 10^{-4}$",
        r"$4 \cdot 10^{-4}$",
        r"$5 \cdot 10^{-4}$",
    ]
)

# Determine drift status and add annotation
p_value = scenarioA_dict["Original (unmodified)"]["p_value"]
drift = p_value <= alpha
drift_str = (
    f"Drift\np-value$\leq${alpha}"
    if drift
    else f"No drift\np-value={round(p_value, 4)}"
)
axs[0].text(
    0.33,
    0.3,
    drift_str,
    transform=axs[0].transAxes,
    fontsize=20,
    bbox={"boxstyle": "round", "facecolor": "red" if drift else "green", "alpha": 0.5},
)
axs[0].text(
    0.15,
    0.9,
    "Datasets 1-4 (unmodified)",
    transform=axs[0].transAxes,
    fontsize=20,
    fontweight="bold",
)

# Plot 2: Comparison with Waymo dataset
observed_statistic = scenarioA_dict["Waymo"]["observed_statistic"]
sns.histplot(
    x=scenarioA_dict["Waymo"]["permuted_statistics"],
    bins=20,
    kde=True,
    ax=axs[1],
    stat="percent",
    log_scale=True,
)

percentile = np.percentile(
    scenarioA_dict["Waymo"]["permuted_statistics"], q=num_percentile
)
axs[1].axvline(percentile, color="red", linestyle="--", label="Significance threshold")
axs[1].axvline(
    observed_statistic, color="green", linestyle="--", label="Observed distance"
)
axs[1].set_xlim([-0.0001, 0.01])

# Add drift detection annotation
p_value = scenarioA_dict["Waymo"]["p_value"]
drift = p_value <= alpha
drift_str = (
    f"Drift\np-value$\leq${alpha}"
    if drift
    else f"No drift\np-value={round(p_value, 4)}"
)
axs[1].text(
    0.07,
    0.3,
    drift_str,
    transform=axs[1].transAxes,
    fontsize=20,
    bbox={"boxstyle": "round", "facecolor": "red" if drift else "green", "alpha": 0.5},
)
axs[1].text(
    0.01,
    0.9,
    "Datasets 1-4 (unmodified) vs Waymo",
    transform=axs[1].transAxes,
    fontsize=20,
    fontweight="bold",
)

# Set axis labels
axs[1].set_xlabel("MMD$^2$")
axs[1].set_ylabel("% of appearance")
axs[0].set_xlabel("MMD$^2$")
axs[0].set_ylabel("% of appearance")

# Create custom legend
legend_elements = [
    Line2D([0], [0], color="green", lw=2.5, linestyle="--", label="Observed distance"),
    Line2D(
        [0], [0], color="red", lw=2.5, linestyle="--", label="Significance threshold"
    ),
    Line2D([0], [0], color="b", lw=2.5, label="KDE"),
]
axs[0].legend(
    handles=legend_elements, loc="center", bbox_to_anchor=(0.38, 0.68), ncols=1
)
axs[1].legend(
    handles=legend_elements, loc="center", bbox_to_anchor=(0.27, 0.68), ncols=1
)

# Save figure
fig.savefig("Concept_drift_scenarioA.png", dpi=300, bbox_inches="tight")

### Scenario B Drift Detection

In [None]:
# Load permutation test results for Scenario B
# These results assess drift between combined datasets and ZalaZone
with open(
    "../data/ACC_data_analysis/ACC_data_analysis_results/permutation_test_logs_scenario_B.pkl",
    "rb",
) as f:
    scenarioB_dict = pickle.load(f)

scenarioB_dict

In [None]:
# Create publication-ready visualization of drift detection results for Scenario B
fig, axs = plt.subplots(2, 1, figsize=(12, 10))

signals = {
    "Original (unmodified)": 0,
    "Waymo": 1,
}

# Configure professional plot styling
custom_params = {"axes.spines.right": False, "axes.spines.top": False, "font.size": 35}
sns.set_theme(style="ticks", rc=custom_params, font_scale=1.7)

# Plot 1: Baseline distribution (unmodified datasets)
observed_statistic = scenarioB_dict["Original (unmodified)"]["observed_statistic"]
sns.histplot(
    x=scenarioB_dict["Original (unmodified)"]["permuted_statistics"],
    bins=20,
    kde=True,
    ax=axs[0],
    stat="percent",
)

# Calculate and plot significance threshold
percentile = np.percentile(
    scenarioB_dict["Original (unmodified)"]["permuted_statistics"], q=num_percentile
)
axs[0].axvline(percentile, color="red", linestyle="--", label="Significance threshold")
axs[0].axvline(
    observed_statistic, color="green", linestyle="--", label="Observed distance"
)

# Set axis properties with scientific notation
axs[0].set_xlim([-0.0001, 0.0004])
axs[0].set_xticks([-0.0001, 0, 0.0001, 0.0002, 0.0003, 0.0004])
axs[0].set_xticklabels(
    [
        r"$-10^{-4}$",
        0,
        r"$10^{-4}$",
        r"$2 \cdot 10^{-4}$",
        r"$3 \cdot 10^{-4}$",
        r"$4 \cdot 10^{-4}$",
    ]
)

# Determine drift status and add annotation
p_value = scenarioB_dict["Original (unmodified)"]["p_value"]
drift = p_value <= alpha
drift_str = (
    f"Drift\np-value$\leq${alpha}"
    if drift
    else f"No drift\np-value={round(p_value, 4)}"
)
axs[0].text(
    0.43,
    0.3,
    drift_str,
    transform=axs[0].transAxes,
    fontsize=20,
    bbox={"boxstyle": "round", "facecolor": "red" if drift else "green", "alpha": 0.5},
)
axs[0].text(
    0.2,
    0.9,
    "Datasets 1, 3-5 (unmodified)",
    transform=axs[0].transAxes,
    fontsize=20,
    fontweight="bold",
)

# Plot 2: Comparison with ZalaZone dataset
observed_statistic = scenarioB_dict["ZalaZone"]["observed_statistic"]
sns.histplot(
    x=scenarioB_dict["ZalaZone"]["permuted_statistics"],
    bins=20,
    kde=True,
    ax=axs[1],
    stat="percent",
    log_scale=True,
)

percentile = np.percentile(
    scenarioB_dict["ZalaZone"]["permuted_statistics"], q=num_percentile
)
axs[1].axvline(percentile, color="red", linestyle="--", label="Significance threshold")
axs[1].axvline(
    observed_statistic, color="green", linestyle="--", label="Observed distance"
)
axs[1].set_xlim([-0.0001, 0.4])

# Add drift detection annotation
p_value = scenarioB_dict["ZalaZone"]["p_value"]
drift = p_value <= alpha
drift_str = (
    f"Drift\np-value$\leq${alpha}"
    if drift
    else f"No drift\np-value={round(p_value, 4)}"
)
axs[1].text(
    0.58,
    0.3,
    drift_str,
    transform=axs[1].transAxes,
    fontsize=20,
    bbox={"boxstyle": "round", "facecolor": "red" if drift else "green", "alpha": 0.5},
)
axs[1].text(
    0.29,
    0.9,
    "Datasets 1, 3-5 (unmodified) vs ZalaZone",
    transform=axs[1].transAxes,
    fontsize=20,
    fontweight="bold",
)

# Set axis labels
axs[1].set_xlabel("MMD$^2$")
axs[1].set_ylabel("% of appearance")
axs[0].set_xlabel("MMD$^2$")
axs[0].set_ylabel("% of appearance")

# Create custom legend
legend_elements = [
    Line2D([0], [0], color="green", lw=2.5, linestyle="--", label="Observed distance"),
    Line2D(
        [0], [0], color="red", lw=2.5, linestyle="--", label="Significance threshold"
    ),
    Line2D([0], [0], color="b", lw=2.5, label="KDE"),
]

axs[0].legend(
    handles=legend_elements, loc="center", bbox_to_anchor=(0.48, 0.68), ncols=1
)
axs[1].legend(
    handles=legend_elements, loc="center", bbox_to_anchor=(0.78, 0.68), ncols=1
)

# Save figure
fig.savefig("Concept_drift_scenarioB.png", dpi=300, bbox_inches="tight")