# Support Vector Machine (SVM)

## Imports

In [None]:
import pandas as pd
import posixpath
from data_mining_project import data, PROJECT_PATH, DATA_PATH, OUTPUT_PATH
import numpy as np
import matplotlib as plt
import plotly.express as px
import time
from sklearn.svm import SVC
import seaborn as sns
import matplotlib.pyplot as plt


## Load Data

In [None]:
# Location of the preprocessed dataset inside the project's output folder.
file_name = "preprocessed_data.csv"
file_path = posixpath.join(OUTPUT_PATH, file_name)

# Columns passed to data.reformat_str_to_list, grouped by the element type
# requested for them (presumably they are stored as strings in the CSV and
# parsed back into lists -- confirm against the helper's implementation).
int_sequence_cols = [
    "events_sequence",
    "seconds_to_incident_sequence",
    "dj_ac_state_sequence",
    "dj_dc_state_sequence",
    "ac_dc_prob_timestamp",
]
float_sequence_cols = ["train_kph_sequence"]

data_df = data.load_data_csv(file_path)
data_df = data.reformat_str_to_list(data_df, cols=int_sequence_cols, col_type=int)
data_df = data.reformat_str_to_list(data_df, cols=float_sequence_cols, col_type=float)
# Disabled step kept for reference:
# data_df.drop(columns=["ac_dc_prob_num", "ac_dc_prob"], inplace=True)
data_df

In [None]:
# Relative frequency of each incident type among hard-braking rows, divided by
# its relative frequency in the whole dataset. A ratio above 1 means the type
# is over-represented when hard braking occurred.
braking = data_df[data_df['hard_braking'] == 1]
brake_counts = braking["incident_type"].value_counts(normalize=True)
data_df_counts = data_df["incident_type"].value_counts(normalize=True)

# `res` maps incident type -> ratio. Distinct names are used for the loop
# variables so that `res` is still this dict after the cell has run and the
# builtin `id` is not shadowed.
res = {
    incident: brake_share / data_df_counts[incident]
    for incident, brake_share in brake_counts.items()
    if incident in data_df_counts
}

# Parallel lists consumed by the plotting cell.
incident_type = list(res)
relative_probability = list(res.values())
for incident, ratio in res.items():
    print(f"Incident type {incident}: Relative probability = {ratio:.2f}")


In [None]:
import matplotlib.pyplot as plt

# Incident types are converted to strings before being used as bar labels
# (presumably so they are drawn as categories rather than numeric positions).
types_incidents_str = list(map(str, incident_type))

# Green bars: ratio above 1; red bars: everything else.
colors = []
for ratio in relative_probability:
    colors.append("green" if ratio > 1 else "red")

plt.figure(figsize=(10, 6))
plt.bar(
    types_incidents_str,
    relative_probability,
    color=colors,
    edgecolor="black",
    alpha=0.7,
)

# Reference line at ratio 1.
plt.axhline(y=1, color='blue', linestyle='--', label="Relative probability = 1")

plt.title("The impact of hard braking on incident types", fontsize=14)
plt.xlabel("Incident type", fontsize=12)
plt.ylabel("Relative probability", fontsize=12)
plt.xticks(fontsize=10)
plt.legend()
plt.tight_layout()

plt.show()


In [None]:
from tensorflow.keras import Input, Model
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from tensorflow.keras.layers import Embedding

from sklearn.metrics import classification_report

In [None]:
# Build the event vocabulary: every distinct event found in any row of
# 'events_sequence' gets one integer index. The events are sorted before
# numbering so the mapping does not depend on set iteration order and is the
# same on every run.
unique_events = sorted({event for sequence in data_df['events_sequence'] for event in sequence})
event_to_index = {event: idx for idx, event in enumerate(unique_events)}
vocab_size = len(event_to_index)

def map_sequence_to_indices(sequence, event_to_index):
    """Translate a sequence of events into their vocabulary indices.

    Events missing from ``event_to_index`` fall back to the index stored under
    the ``'unknown'`` key, or to ``-1`` when that key is absent as well.

    Args:
        sequence: Iterable of events to translate.
        event_to_index: Mapping from event to integer index.

    Returns:
        A list with one index per element of ``sequence``.
    """
    fallback = event_to_index.get('unknown', -1)
    indices = []
    for event in sequence:
        indices.append(event_to_index.get(event, fallback))
    return indices

# Encode every event sequence as a list of vocabulary indices.
data_df['events_sequence_mapped'] = data_df['events_sequence'].apply(
    map_sequence_to_indices, args=(event_to_index,)
)

# Size of each event vector (the first test used 100).
embedding_dim = 200

# Minimal Keras model: an integer sequence of any length goes in, the output
# of the Embedding layer comes out.
# NOTE(review): no compile/fit call is visible for this model, so the vectors
# appear to be the layer's initial weights -- confirm this is intended.
input_seq = Input(shape=(None,), dtype='int32')
embedding_layer = Embedding(input_dim=vocab_size, output_dim=embedding_dim)(input_seq)
embedding_model = Model(inputs=input_seq, outputs=embedding_layer)

def get_sequence_embeddings(df, embedding_model):
    """Compute one mean-pooled embedding vector per row of ``df``.

    Each row's ``'events_sequence_mapped'`` value is wrapped into a batch of
    one and passed to ``embedding_model.predict``; the prediction is averaged
    over axis 1 and flattened.

    Args:
        df: DataFrame with an ``'events_sequence_mapped'`` column.
        embedding_model: Object exposing ``predict(batch)``.

    Returns:
        ``np.array`` built from the per-row pooled vectors, in row order.
    """
    pooled_vectors = []
    for event_indices in df['events_sequence_mapped']:
        batch = np.array([event_indices])
        per_event_vectors = embedding_model.predict(batch)
        pooled = np.mean(per_event_vectors, axis=1)
        pooled_vectors.append(pooled.flatten())
    return np.array(pooled_vectors)

# One mean-pooled embedding vector per row, computed from the mapped event
# sequences with the embedding model defined above.
X_embeddings = get_sequence_embeddings(data_df, embedding_model)

# Integer version of the hard-braking flag, stored as its own column.
data_df['hard_braking_numeric'] = data_df['hard_braking'].astype(int)

In [None]:
# Target: the incident type of each row.
y = data_df['incident_type']

# Split row positions first so that the scaler below is fitted on training
# rows only; fitting it on the full matrix would leak test-set statistics into
# training. Splitting positions with the same test_size/random_state selects
# the rows by position, exactly like splitting the feature matrix directly.
row_positions = np.arange(len(data_df))
train_idx, test_idx = train_test_split(row_positions, test_size=0.3, random_state=42)

scaler = StandardScaler()
scaler.fit(X_embeddings[train_idx])
X_scaled = scaler.transform(X_embeddings)

# ac_dc_prob is appended as an extra, unscaled feature column.
X_combined = np.hstack([X_scaled, data_df[['ac_dc_prob']].values])

X_train, X_test = X_combined[train_idx], X_combined[test_idx]
y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

# Train an SVM with an RBF kernel.
# Adding parameters (C=10, gamma=0.01, class_weight='balanced') lowered the
# accuracy, so the defaults are kept.
svm = SVC(kernel='rbf', decision_function_shape='ovr')
# svm = SVC(kernel='rbf', random_state=42)
svm.fit(X_train, y_train)

y_pred = svm.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy:.4f}")

# Accuracies noted by the author (recorded before train-only scaling):
#   first test: 0.3435
#   more parameters and 300 (presumably the embedding size): 0.32


In [None]:
# Text report from sklearn's classification_report for the latest y_test / y_pred.
print(classification_report(y_test, y_pred))

In [None]:
"""que les séquences    0.3639
    + hard braking      0.3537 + ac_dc             0.3639
    + ac_dc             0.3639"""

In [None]:
# Number of test rows per true incident type.
pd.Series(y_test).value_counts()

In [None]:
# Number of test rows per predicted incident type (compare with the true counts).
pd.Series(y_pred).value_counts()

In [None]:
# Show the first mapped (index-encoded) event sequences.
data_df['events_sequence_mapped'].head()

In [None]:
# Baseline run of the scale -> combine -> split -> SVM pipeline on random
# placeholder embeddings of several sizes. No real sequence information is
# used, so the accuracies only show what the pipeline yields on noise plus
# the ac_dc_prob feature.
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Placeholder embedding sizes to try
embedding_sizes = [16, 32, 64, 128]
results = []

# Seeded generator so the placeholder data (and the reported accuracies) are
# reproducible, matching the fixed random_state used for the split.
rng = np.random.default_rng(42)

# Synthetic integer-encoded sequences, for demonstration only.
sequence_length = 50
# Kept under its own name: assigning to `vocab_size` here would overwrite the
# real vocabulary size that the Embedding layers are built with.
synthetic_vocab_size = 1000
synthetic_sequences = rng.integers(0, synthetic_vocab_size, (data_df.shape[0], sequence_length))

# The loop uses its own names (placeholder_dim, placeholder_embeddings) so the
# real `embedding_dim` and `X_embeddings` are not replaced by placeholder data.
for placeholder_dim in embedding_sizes:
    print(f"Testing embedding dimension: {placeholder_dim}")

    # Random standard-normal vectors standing in for embeddings
    placeholder_embeddings = rng.standard_normal((data_df.shape[0], placeholder_dim))

    # Combine embeddings with the ac_dc_prob feature
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(placeholder_embeddings)
    X_combined = np.hstack([X_scaled, data_df[['ac_dc_prob']].values])
    y = data_df['incident_type']

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X_combined, y, test_size=0.3, random_state=42)

    # Train SVM model
    svm = SVC(kernel='rbf', decision_function_shape='ovr', C=10, gamma=0.01, class_weight='balanced')
    svm.fit(X_train, y_train)

    # Evaluate model
    y_pred = svm.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Accuracy with embedding size {placeholder_dim}: {accuracy:.4f}")
    results.append((placeholder_dim, accuracy))

# Show the results as a table
results_df = pd.DataFrame(results, columns=['Embedding Size', 'Accuracy'])
print(results_df)



In [None]:
# Preview the first rows of the working DataFrame, including the columns added above.
data_df.head()


In [None]:
# Compare several embedding lengths: for each one, build an embedding model,
# pool the sequence embeddings, train an RBF SVM and record accuracy,
# weighted F1 and the confusion matrix.
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report

# List of embedding dimensions to test
embedding_lengths = [100, 125, 150, 175, 200, 300]

# One (embedding length, accuracy, F1) tuple per run
results = []

# Per-run metrics, in the same order as embedding_lengths
accuracy_scores = []
f1_scores = []
confusion_matrices = []

# Pieces that do not change between runs are prepared once.
y = data_df['incident_type']
extra_features = data_df[['ac_dc_prob']].values
# Fixed, sorted label list so every confusion matrix has the same shape and
# row/column order, even if a class is missing from a split or prediction.
class_labels = sorted(y.unique())
# Split row positions once; the scaler is then fitted on training rows only so
# no test-set statistics leak into training.
row_positions = np.arange(len(data_df))
train_idx, test_idx = train_test_split(row_positions, test_size=0.3, random_state=42)
y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

for embedding_dim in embedding_lengths:
    print(f"Testing embedding length: {embedding_dim}")

    # Embedding model for the current length. input_dim comes straight from
    # the vocabulary mapping so it cannot be affected by a reassigned
    # `vocab_size` elsewhere in the notebook.
    input_seq = Input(shape=(None,), dtype='int32')
    embedding_layer = Embedding(input_dim=len(event_to_index), output_dim=embedding_dim)(input_seq)
    embedding_model = Model(inputs=input_seq, outputs=embedding_layer)

    # Mean-pooled sequence embeddings
    X_embeddings = get_sequence_embeddings(data_df, embedding_model)

    # Scale embeddings using statistics from the training rows only
    scaler = StandardScaler()
    scaler.fit(X_embeddings[train_idx])
    X_scaled = scaler.transform(X_embeddings)

    # Combine with the additional (unscaled) ac_dc_prob feature
    X_combined = np.hstack([X_scaled, extra_features])
    X_train, X_test = X_combined[train_idx], X_combined[test_idx]

    # Train SVM model
    svm = SVC(kernel='rbf', decision_function_shape='ovr', C=10, gamma=0.01, class_weight='balanced')
    svm.fit(X_train, y_train)

    # Evaluate the model
    y_pred = svm.predict(X_test)
    report = classification_report(y_test, y_pred, output_dict=True)

    # Store metrics
    accuracy = report['accuracy']
    weighted_f1 = report['weighted avg']['f1-score']
    cm = confusion_matrix(y_test, y_pred, labels=class_labels)

    accuracy_scores.append(accuracy)
    f1_scores.append(weighted_f1)
    confusion_matrices.append(cm)

    print(f"Accuracy with embedding length {embedding_dim}: {accuracy:.4f}")
    print(f"F1 Score with embedding length {embedding_dim}: {weighted_f1:.4f}")
    results.append((embedding_dim, accuracy, weighted_f1))

# Visualization: grouped bars, so the accuracy bar is not hidden behind the
# F1 bar drawn at the same position.
positions = np.arange(len(embedding_lengths))
bar_width = 0.4
plt.figure(figsize=(10, 5))
plt.bar(positions - bar_width / 2, accuracy_scores, width=bar_width, label='Accuracy')
plt.bar(positions + bar_width / 2, f1_scores, width=bar_width, alpha=0.7, label='F1 Score')
plt.xticks(positions, [str(dim) for dim in embedding_lengths])
plt.xlabel('Embedding Length')
plt.ylabel('Score')
plt.title('Accuracy and F1 Score by Embedding Length')
plt.legend()
plt.show()


# Display results as a DataFrame
results_df = pd.DataFrame(results, columns=['Embedding Length', 'Accuracy', 'F1 Score'])
print(results_df)


In [None]:
# Highest accuracy / F1 score (first occurrence on ties) and the embedding
# length that produced each of them.
best_accuracy = max(accuracy_scores)
best_accuracy_index = accuracy_scores.index(best_accuracy)
best_accuracy_dim = embedding_lengths[best_accuracy_index]

best_f1_score = max(f1_scores)
best_f1_index = f1_scores.index(best_f1_score)
best_f1_dim = embedding_lengths[best_f1_index]

plt.figure(figsize=(10, 6))

# One line per metric across the tested embedding lengths.
for scores, line_color, line_label in (
    (accuracy_scores, 'green', 'Accuracy'),
    (f1_scores, 'blue', 'F1-Score'),
):
    plt.plot(embedding_lengths, scores, marker='o', linestyle='-', color=line_color, label=line_label)

# Mark and annotate the best point of each metric.
highlights = (
    (best_accuracy_dim, best_accuracy, 'red',
     f"Best Accuracy (t={best_accuracy_dim}, Acc={best_accuracy:.3f})"),
    (best_f1_dim, best_f1_score, 'orange',
     f"Best F1-Score (t={best_f1_dim}, F1={best_f1_score:.3f})"),
)
for best_dim, best_score, marker_color, legend_entry in highlights:
    plt.scatter([best_dim], [best_score], color=marker_color, label=legend_entry)
    plt.text(best_dim, best_score, f"({best_dim}, {best_score:.3f})", fontsize=10)

# Axis labels, title, legend and grid.
plt.xlabel('Embedding Length')
plt.ylabel('Score')
plt.title('Accuracy and F1-Score Trends by Embedding Length')
plt.legend()
plt.grid(True)
plt.show()


In [None]:
# Draw one row-normalised confusion-matrix heatmap per tested embedding length.

# Sorted distinct incident types, used as axis tick labels.
unique_incident_types = data_df['incident_type'].unique()
sorted_incident_types = sorted(unique_incident_types)
incident_type_labels = sorted_incident_types

for i, embedding_dim in enumerate(embedding_lengths):
    cm = confusion_matrices[i]

    # Normalise each row to fractions of that row's total. Rows whose total is
    # zero (no true samples of that class) are divided by 1 instead, so they
    # stay all-zero rather than turning into NaN.
    row_totals = cm.sum(axis=1, keepdims=True)
    safe_totals = np.where(row_totals == 0, 1, row_totals)
    normalized_cm = cm.astype('float') / safe_totals

    # Only apply the incident-type labels when there is exactly one per
    # row/column; otherwise let seaborn number the axes instead of attaching
    # labels to the wrong classes.
    if len(incident_type_labels) == cm.shape[0]:
        tick_labels = incident_type_labels
    else:
        tick_labels = 'auto'

    plt.figure(figsize=(10, 8))
    sns.heatmap(
        normalized_cm,
        annot=True,
        fmt='.2f',
        cmap='Blues',
        xticklabels=tick_labels,
        yticklabels=tick_labels,
        cbar=True
    )
    plt.title(f'Confusion Matrix for Embedding Length {embedding_dim}')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.xticks(rotation=0, ha='right')
    plt.yticks(rotation=0)
    plt.tight_layout()
    plt.show()