In [None]:
import sys
from pandas import value_counts
import pickle
import numpy as np
from imblearn.over_sampling import SMOTE
from numpy import array, ndarray
from pandas import read_csv, DataFrame, Series, concat
from matplotlib.pyplot import figure, savefig, show, subplots
from matplotlib.figure import Figure
from matplotlib.axes import Axes
from scipy.stats import norm, expon, lognorm
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import MinMaxScaler
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler

sys.path.insert(1, '../../../../utils')
from dslabs_functions import get_variable_types, \
  CLASS_EVAL_METRICS, DELTA_IMPROVE, plot_bar_chart, plot_multiline_chart, plot_evaluation_results, \
  plot_horizontal_bar_chart, HEIGHT, plot_line_chart, dummify, run_NB, run_KNN, plot_multibar_chart, \
  encode_cyclic_variables, plot_confusion_matrix, NR_STDEV, determine_outlier_thresholds_for_var
from studies import naive_Bayes_study, knn_study, evaluate_approach, evaluate_and_plot 


### **Outliers** ###

In [None]:
# Shared configuration for the outlier-treatment experiments.
file_tag = "traffic"  # prefix of every CSV written by the outlier cells
lab_folder_out = "lab3_preparation/outliers"  # folder argument handed to evaluate_and_plot
target_name = "crash_type"  # name of the class column
filename = "../../data/prepared/traffic_enc1.csv"
# Alternative input; swap with the line above to use it instead.
# filename = "../../data/prepared/traffic_enc2.csv"
# Empty strings in the CSV are read as missing values.
data: DataFrame = read_csv(filename, na_values="")
# Variable names grouped by type (the "numeric" group drives the outlier cells).
variable_types: dict[str, list] = get_variable_types(data)
# data.shape

### Approach 1 ###

In [None]:
# Tag for approach 1: used in the output CSV name and passed to evaluate_and_plot.
approach_out1 = "drop_outliers"

In [None]:
# Approach 1: drop every row that holds an outlier in at least one numeric variable.
# NOTE(review): n_std is not passed to the threshold helper below — presumably
# NR_STDEV is already that helper's default; confirm in dslabs_functions.
n_std: int = NR_STDEV
numeric_vars: list[str] = get_variable_types(data)["numeric"]
if numeric_vars:
    # Thresholds are derived once from the summary of the ORIGINAL data, so they
    # do not shift while rows are being flagged.
    summary5: DataFrame = data[numeric_vars].describe()

    # Accumulate one boolean flag per row instead of dropping in place inside the
    # loop; NaN compares False on both sides, so rows with missing values are kept.
    outlier_mask: Series = Series(False, index=data.index)
    for var in numeric_vars:
        top_threshold, bottom_threshold = determine_outlier_thresholds_for_var(
            summary5[var]
        )
        outlier_mask |= (data[var] > top_threshold) | (data[var] < bottom_threshold)

    df_out1: DataFrame = data.loc[~outlier_mask].copy()
    df_out1.to_csv(f"../../data/prepared/{file_tag}_{approach_out1}.csv", index=True)
    print(f"Data after dropping outliers: {df_out1.shape}")
else:
    # Nothing to treat: keep an untouched copy so the evaluation cell still runs.
    df_out1: DataFrame = data.copy(deep=True)
    print("There are no numeric variables")

In [None]:
# Evaluate the frame with outlier rows dropped. evaluate_and_plot comes from the
# project's `studies` module (not visible here) — presumably it trains the study
# classifiers and saves plots under lab_folder_out; confirm against its source.
evaluate_and_plot(df_out1, lab_folder_out, file_tag, approach_out1, target_name)

### Approach 2 ###

In [None]:
# Tag for approach 2: used in the output CSV name and passed to evaluate_and_plot.
approach_out2 = "replacing_outliers"

In [None]:
# Approach 2: keep every row, but overwrite outlier values with the column median.
# Relies on `summary5` computed by the drop-outliers cell.
if numeric_vars != []:
    df_out2: DataFrame = data.copy(deep=True)
    for var in numeric_vars:
        top, bottom = determine_outlier_thresholds_for_var(summary5[var])
        # Median is taken before any replacement, i.e. outliers included.
        median: float = df_out2[var].median()

        def _median_if_outlier(x, upper=top, lower=bottom, fill=median):
            """Return `fill` when x lies outside [lower, upper], else x unchanged."""
            if x > upper or x < lower:
                return fill
            return x

        df_out2[var] = df_out2[var].apply(_median_if_outlier)

    df_out2.to_csv(f"../../data/prepared/{file_tag}_{approach_out2}.csv", index=True)
    print("Data after replacing outliers:", df_out2.shape)
    print(df_out2.describe())
else:
    print("There are no numeric variables")

In [None]:
# Evaluate the frame whose outlier values were replaced by the column median
# (evaluate_and_plot is defined in the project's `studies` module, not shown here).
evaluate_and_plot(df_out2, lab_folder_out, file_tag, approach_out2, target_name)

### Approach 3 ###

In [None]:
# Tag for approach 3: used in the output CSV name and passed to evaluate_and_plot.
approach_out3 = "truncate_outliers"

In [None]:
# Approach 3: keep every row, but clamp outlier values to the nearest threshold.
# Relies on `summary5` computed by the drop-outliers cell.
if numeric_vars != []:
    df_out3: DataFrame = data.copy(deep=True)
    for var in numeric_vars:
        top, bottom = determine_outlier_thresholds_for_var(summary5[var])

        def _clamp(x, upper=top, lower=bottom):
            """Return x limited to [lower, upper]; values inside the range pass through."""
            if x > upper:
                return upper
            if x < lower:
                return lower
            return x

        df_out3[var] = df_out3[var].apply(_clamp)

    df_out3.to_csv(f"../../data/prepared/{file_tag}_{approach_out3}.csv", index=True)
    print("Data after truncating outliers:", df_out3.shape)
    print(df_out3.describe())
else:
    print("There are no numeric variables")

In [None]:
# Evaluate the frame whose outlier values were clamped to the thresholds
# (evaluate_and_plot is defined in the project's `studies` module, not shown here).
evaluate_and_plot(df_out3, lab_folder_out, file_tag, approach_out3, target_name)

### **Scaling** ###

In [None]:
# Folder argument handed to evaluate_and_plot for the scaling experiments.
lab_folder_sca = "lab3_preparation/scaling"

# Leftover alternative inputs, kept for reference only.
# NOTE(review): the outlier cells write "{file_tag}_{approach}.csv" (e.g.
# traffic_drop_outliers.csv), so these "traffic_outliers_*" names do not match
# those files — fix the paths before re-enabling any of them.
# filename = "../../data/prepared/traffic_outliers_drop_outliers.csv"
# filename = "../../data/prepared/traffic_outliers_replacing_outliers.csv"
# filename = "../../data/prepared/traffic_outliers_truncate_outliers.csv"

# Scaling starts from the in-memory truncated-outliers frame.
input_df = df_out3.copy()

In [None]:
# Load the dataset from the data/raw folder to read its variable types.
# NOTE: this rebinds `variable_types`, which was previously computed from the
# encoded dataset in the outlier configuration cell.
filename_raw = "../../data/raw/traffic_accidents.csv"
df_raw: DataFrame = read_csv(filename_raw, na_values="")
variable_types: dict[str, list] = get_variable_types(df_raw)

In [None]:
# Numeric variables as typed in the raw file; the scaling cells use this list to
# select the columns to scale.
# NOTE: this rebinds `numeric_vars` from the outlier section, so re-running the
# replace/truncate outlier cells on their own after this cell would iterate over
# this list instead.
numeric_vars = variable_types['numeric']
df_raw[numeric_vars].head()

In [None]:
# Symbolic variables of the raw file, previewed for inspection.
symbolic_vars = variable_types['symbolic']
df_raw[symbolic_vars].head()

### Approach 1 - Standard Scaler

In [None]:
# Scaling approach 1: z-score standardisation of the columns listed in numeric_vars.
approach_sca1 = "Standard-Scaler"

# Set the class column aside while the numeric columns are scaled; input_df itself
# is left untouched because drop() returns a new frame.
target: Series = input_df[target_name]
data_sca1 = input_df.drop(columns=[target_name])

# Fit and apply the transform in one call; `transf` stays available as the fitted scaler.
numeric_df = data_sca1[numeric_vars].copy()
transf: StandardScaler = StandardScaler(with_mean=True, with_std=True, copy=True)
numeric_df_scaled = DataFrame(
    transf.fit_transform(numeric_df),
    index=data_sca1.index,
)

# Write the scaled values back (column-by-column, in numeric_vars order) and
# re-attach the class column as the last column.
data_sca1[numeric_vars] = numeric_df_scaled
data_sca1[target_name] = target

In [None]:
# Evaluate the standard-scaled frame
# (evaluate_and_plot is defined in the project's `studies` module, not shown here).
evaluate_and_plot(data_sca1, lab_folder_sca, file_tag, approach_sca1, target_name)

### Approach 2 - MinMax Scaler

In [None]:
# Scaling approach 2: rescale the columns listed in numeric_vars to the [0, 1] range.
approach_sca2 = "MinMax-Scaler"

# Set the class column aside while the numeric columns are scaled; input_df itself
# is left untouched because drop() returns a new frame.
target: Series = input_df[target_name]
data_sca2 = input_df.drop(columns=[target_name])

# Fit and apply the transform in one call; `transf` stays available as the fitted scaler.
numeric_df = data_sca2[numeric_vars].copy()
transf: MinMaxScaler = MinMaxScaler(feature_range=(0, 1), copy=True)
numeric_df_scaled = DataFrame(
    transf.fit_transform(numeric_df),
    index=data_sca2.index,
)

# Write the scaled values back (column-by-column, in numeric_vars order) and
# re-attach the class column as the last column.
data_sca2[numeric_vars] = numeric_df_scaled
data_sca2[target_name] = target

In [None]:
# Evaluate the min-max-scaled frame
# (evaluate_and_plot is defined in the project's `studies` module, not shown here).
evaluate_and_plot(data_sca2, lab_folder_sca, file_tag, approach_sca2, target_name)

### **Balancing** ###

In [None]:
# Folder argument handed to evaluate_and_plot for the balancing experiments.
lab_folder_bal = "lab3_preparation/balancing"

# Leftover alternative inputs, kept for reference only.
# NOTE(review): the outlier cells write "{file_tag}_{approach}.csv" (e.g.
# traffic_drop_outliers.csv), so these "traffic_outliers_*" names do not match
# those files — fix the paths before re-enabling any of them.
# filename = "../../data/prepared/traffic_outliers_drop_outliers.csv"
# filename = "../../data/prepared/traffic_outliers_replacing_outliers.csv"
# filename = "../../data/prepared/traffic_outliers_truncate_outliers.csv"

# Balancing starts from the MinMax-scaled frame; this rebinds `input_df`.
# Swap the two lines below to start from the standard-scaled frame instead.
#input_df = data_sca1.copy()
input_df = data_sca2.copy()

### Approach 1 - Undersampling

In [None]:
# Balancing approach 1: random undersampling of the majority class down to the
# size of the minority class.
approach_bal1 = "undersampling"

# Check class distribution before balancing
class_counts: Series = input_df[target_name].value_counts()
print("Class distribution before undersampling:")
print(class_counts)

# The procedure below is defined for a binary target only.
if len(class_counts) != 2:
    raise ValueError(
        f"Undersampling expects a binary target, found {len(class_counts)} classes in '{target_name}'"
    )

# value_counts() orders labels by descending frequency, so the first label is the
# majority class and the last one the minority class. Deriving them here (instead
# of assuming 0 = majority, 1 = minority) keeps the cell working whichever label
# is more frequent and whatever the label values are.
majority_class = class_counts.index[0]
minority_class = class_counts.index[-1]
print(f"Ratio: {class_counts.iloc[0] / class_counts.iloc[-1]:.2f}:1")

# Separate majority and minority classes
df_majority = input_df[input_df[target_name] == majority_class]
df_minority = input_df[input_df[target_name] == minority_class]

# Undersample majority class to match minority class size (seeded for reproducibility)
df_majority_undersampled = df_majority.sample(n=len(df_minority), random_state=42)

# Combine minority class with undersampled majority class
data_bal1 = concat([df_majority_undersampled, df_minority])

# Shuffle the dataset so the two classes are interleaved, and rebuild a clean index
data_bal1 = data_bal1.sample(frac=1, random_state=42).reset_index(drop=True)

print("\nClass distribution after undersampling:")
print(data_bal1[target_name].value_counts())
print(f"Dataset size: {len(input_df)} → {len(data_bal1)} ({100*len(data_bal1)/len(input_df):.1f}%)")

In [None]:
# Evaluate the undersampled frame
# (evaluate_and_plot is defined in the project's `studies` module, not shown here).
evaluate_and_plot(data_bal1, lab_folder_bal, file_tag, approach_bal1, target_name)

### Approach 2 - SMOTE (Synthetic Minority Over-sampling Technique)

In [None]:
# Balancing approach 2: oversample with SMOTE on the full input frame.
approach_bal2 = "SMOTE"

# Report the class distribution before balancing
print("Class distribution before SMOTE:")
print(input_df[target_name].value_counts())

# Split the frame into class column and feature columns
y = input_df[target_name]
X = input_df.drop(columns=[target_name])

# Resample with a fixed seed so the synthetic rows are reproducible
smote = SMOTE(random_state=42)
X_resampled, y_resampled = smote.fit_resample(X, y)

# Rebuild a single frame: features first, class column appended last
data_bal2 = DataFrame(X_resampled, columns=X.columns).assign(
    **{target_name: y_resampled}
)

size_before, size_after = len(input_df), len(data_bal2)
print(f"\nClass distribution after SMOTE:")
print(data_bal2[target_name].value_counts())
print(f"Dataset size: {size_before} → {size_after} ({100*size_after/size_before:.1f}%)")

In [None]:
# Evaluate the SMOTE-balanced frame
# (evaluate_and_plot is defined in the project's `studies` module, not shown here).
evaluate_and_plot(data_bal2, lab_folder_bal, file_tag, approach_bal2, target_name)