In [None]:
def get_multiclass_features_and_labels(conflict_analyses):
    """Build a feature matrix and integer risk labels from conflict analyses.

    For each entry the feature vector is the first element returned by
    ``prepare_logistic_regression_data(entry)``; the label is the entry's
    ``['metrics']['risk_level']`` string mapped to 0 (LOW), 1 (MEDIUM) or
    2 (HIGH).

    Returns:
        A ``(X, y)`` tuple of NumPy arrays, one row / label per entry.

    Raises:
        KeyError: if an entry's risk level is not LOW, MEDIUM or HIGH.
    """
    label_for_level = {'LOW': 0, 'MEDIUM': 1, 'HIGH': 2}
    feature_rows, labels = [], []

    for analysis in conflict_analyses:
        # The second element of the returned pair is not needed here.
        row, _ = prepare_logistic_regression_data(analysis)
        level_name = analysis['metrics']['risk_level']
        feature_rows.append(row)
        labels.append(label_for_level[level_name])

    X_multi = np.array(feature_rows)
    y_multi = np.array(labels)
    return X_multi, y_multi

In [None]:
def _collect_risk_dataset(intersection_cases, root_path):
    """Return ``(X, y)`` arrays for every scenario that yields an analysis.

    Each row of ``intersection_cases`` is analyzed via
    ``analyze_intersection_scenario``; rows with a falsy result are skipped.
    Labels are the analysis' risk level mapped to 0 (LOW), 1 (MEDIUM), 2 (HIGH).
    """
    import numpy as np

    risk_map = {'LOW': 0, 'MEDIUM': 1, 'HIGH': 2}
    X, y = [], []
    for _, case in intersection_cases.iterrows():
        features = analyze_intersection_scenario(case['log_id'], root_path)
        if not features:
            continue
        # The risk level comes from the conflict analysis of the scenario.
        conflict_analysis = analyze_scenario_conflicts(features)
        risk_level = conflict_analysis['metrics']['risk_level']
        scenario_features, _ = prepare_logistic_regression_data(features)
        X.append(scenario_features)
        y.append(risk_map[risk_level])
    return np.array(X), np.array(y)


def _smote_neighbors(min_class_count, default=5):
    """Return the largest usable SMOTE ``k_neighbors`` (0 means SMOTE cannot run).

    SMOTE needs at least ``k_neighbors + 1`` samples in every class it
    oversamples, so the value is capped at ``min_class_count - 1``.
    """
    return max(0, min(default, min_class_count - 1))


def train_logistic_regression_multiclass(intersection_cases, root_path):
    """
    Trains a multi-class logistic regression model (LOW, MEDIUM, HIGH) with SMOTE for class balancing.

    The data is split 80/20 (stratified). The training part is oversampled
    with SMOTE, standardized, and used to fit the returned model. Test-set
    metrics and a confusion matrix are printed / plotted.

    Cross-validation is run on the *original* training samples with SMOTE and
    scaling inside each fold, so synthetic samples and scaler statistics never
    leak from a fold's training part into its validation part.

    Args:
        intersection_cases: DataFrame whose rows carry a ``'log_id'`` column.
        root_path: passed through to ``analyze_intersection_scenario``.

    Returns:
        ``(model, scaler)``: the fitted ``LogisticRegression`` and the
        ``StandardScaler`` that must be applied to features before predicting.

    Raises:
        ValueError: if no scenario could be analyzed (empty dataset).
    """
    import math

    from sklearn.linear_model import LogisticRegression
    from sklearn.preprocessing import StandardScaler
    from sklearn.model_selection import train_test_split, cross_val_score
    from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
    from imblearn.over_sampling import SMOTE
    from imblearn.pipeline import make_pipeline
    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt

    class_ids = [0, 1, 2]
    labels = ['LOW', 'MEDIUM', 'HIGH']

    X, y = _collect_risk_dataset(intersection_cases, root_path)
    if y.size == 0:
        raise ValueError(
            "No usable scenarios were found; cannot train on an empty dataset."
        )

    print("\nClass distribution in dataset:")
    print(f"LOW (0): {np.sum(y == 0)}")
    print(f"MEDIUM (1): {np.sum(y == 1)}")
    print(f"HIGH (2): {np.sum(y == 2)}")

    if np.sum(y == 0) < 10 or np.sum(y == 1) < 10 or np.sum(y == 2) < 10:
        print("\nWarning: Very imbalanced dataset. Proceeding anyway.")

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    print(f"Number of training data samples: {len(y_train)}")
    print(f"Number of test data samples: {len(y_test)}")

    def new_classifier():
        # `multi_class` is omitted on purpose: it is deprecated in recent
        # scikit-learn, and the default already fits a multinomial model for
        # the lbfgs solver when more than two classes are present.
        return LogisticRegression(random_state=42, max_iter=1000, class_weight='balanced')

    # Size of the smallest class actually present in the training split.
    min_train = int(np.unique(y_train, return_counts=True)[1].min())

    # Shrink k_neighbors for tiny classes instead of letting SMOTE crash.
    k_fit = _smote_neighbors(min_train)
    if k_fit >= 1:
        sm = SMOTE(random_state=42, k_neighbors=k_fit)
        X_train_res, y_train_res = sm.fit_resample(X_train, y_train)
        print("After SMOTE:")
        print(f"LOW (0): {np.sum(y_train_res == 0)}")
        print(f"MEDIUM (1): {np.sum(y_train_res == 1)}")
        print(f"HIGH (2): {np.sum(y_train_res == 2)}")
    else:
        # A single-sample class cannot be interpolated; rely on class_weight.
        X_train_res, y_train_res = X_train, y_train
        print("Skipping SMOTE: smallest training class has fewer than 2 samples.")

    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train_res)
    X_test_scaled = scaler.transform(X_test)

    model = new_classifier()
    model.fit(X_train_scaled, y_train_res)

    # Leak-free cross-validation: resample and scale inside every fold.
    n_splits = min(5, min_train)
    if n_splits >= 2:
        # Smallest class size left in a fold's training part after one
        # stratified fold is held out.
        fold_min = min_train - math.ceil(min_train / n_splits)
        k_cv = _smote_neighbors(fold_min)
        steps = [StandardScaler(), new_classifier()]
        if k_cv >= 1:
            steps.insert(0, SMOTE(random_state=42, k_neighbors=k_cv))
        cv_scores = cross_val_score(make_pipeline(*steps), X_train, y_train, cv=n_splits)
        print("\nCross-validation scores:", cv_scores)
        print(f"Mean CV score: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
    else:
        print("\nSkipping cross-validation: smallest training class has fewer than 2 samples.")

    y_pred = model.predict(X_test_scaled)
    print("\nTest set results:")
    # Explicit `labels` keeps the report valid when a class is absent from the
    # test split (otherwise target_names would not match and it would raise).
    print(classification_report(
        y_test, y_pred, labels=class_ids, target_names=labels, zero_division=0
    ))

    conf_matrix = confusion_matrix(y_test, y_pred, labels=class_ids)
    conf_df = pd.DataFrame(conf_matrix, index=labels, columns=labels)
    print("\nConfusion Matrix with Labels:")
    print(conf_df)

    disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=labels)
    disp.plot(cmap=plt.cm.Blues)
    plt.title("Confusion Matrix")
    plt.show()

    return model, scaler

In [None]:
from IPython.lib.display import FileLink
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
import pandas as pd

# Notebook driver: filter scenarios, split 80/20 in order, train the
# multi-class model on the first part and predict risk levels for the rest.
print("\nSplitting data into training and testing sets...")
available_scenarios = []
for _, case in intersection_cases.iterrows():
    scenario_id = case['log_id']
    filename = get_scenario_filename(scenario_id, root_av)
    # Keep only scenarios for which a filename could be resolved.
    if filename is not None:
        available_scenarios.append(case)

print(f"\nTotal available scenarios: {len(available_scenarios)}")

# Split data into training and testing sets (80% training, 20% testing).
# The split is positional (no shuffling), so it follows the input order.
split_idx = int(0.8 * len(available_scenarios))
train_data = pd.DataFrame(available_scenarios[:split_idx])  # 80% for training
test_data = pd.DataFrame(available_scenarios[split_idx:])   # 20% for testing

print(f"Using {len(train_data)} scenarios for training")
print(f"Using {len(test_data)} scenarios for testing")

# Train the logistic regression model
print("\nTraining logistic regression model...")
model, scaler = train_logistic_regression_multiclass(train_data, root_av)
print("\nModel training complete.")

# Test predictions on 20% of the data
print("\nPredicting test scenarios:")

# Class index -> name; the model's integer labels index into this list.
risk_labels = ['LOW', 'MEDIUM', 'HIGH']

scenario_ids = []
risk_levels = []
confidences = []

for _, case in test_data.iterrows():
    scenario_id = case['log_id']
    test_scenario = analyze_intersection_scenario(scenario_id, root_av)

    # Scenarios that could not be analyzed are skipped.
    if not test_scenario:
        continue

    try:
        initial_features, _ = prepare_logistic_regression_data(test_scenario)
        initial_features_scaled = scaler.transform(initial_features.reshape(1, -1))
        prediction = model.predict(initial_features_scaled)[0]
        probabilities = model.predict_proba(initial_features_scaled)[0]
        # Confidence is the probability of the most likely class.
        max_probability = np.max(probabilities)
        risk_level = risk_labels[prediction]

        scenario_ids.append(scenario_id)
        risk_levels.append(risk_level)
        confidences.append(max_probability)
    except Exception as e:
        # Best-effort fallback, but report why the model path failed instead
        # of swallowing the error silently.
        print(f"Model prediction failed for scenario {scenario_id} ({e!r}); using fallback.")
        # NOTE(review): predict_collision is called without model/scaler;
        # presumably it then uses a non-model fallback -- confirm.
        prediction, probability = predict_collision(None, None, test_scenario)
        if prediction is not None:
            risk_level = risk_labels[prediction]
            scenario_ids.append(scenario_id)
            risk_levels.append(risk_level)
            confidences.append(probability)

# Create DataFrame
results_df = pd.DataFrame({
    'Scenario ID': scenario_ids,
    'Risk Level': risk_levels,
    'Confidence': confidences
})

# Compute the per-level counts once and reuse them for both printouts.
counts = results_df['Risk Level'].value_counts()
print("Counts of each risk level in test predictions:")
print(counts)

for level in risk_labels:
    print(f"{level}: {counts.get(level, 0)}")

results_df.to_csv('all_test_predictions.csv', index=False)
print("\nPrediction Results Table:")
display(FileLink('all_test_predictions.csv'))
# Trailing expression so the notebook renders the table as the cell output.
results_df