Traffic Management System

Data Understanding 

In [None]:
# -------------------- Initial Imports --------------------
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import os
from datetime import timedelta, datetime

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
import gdown

In [None]:
# Read the raw traffic dataset from disk
read_data = 'C:/Users/Ning Sheng Yong/Desktop/QING APU/futuristic_city_traffic.csv'
data = pd.read_csv(read_data)

# Structural overview: dimensions, per-column dtypes and numeric summary
print("Basic Information:")
for _heading, _value in (
    ("Dataset Shape:", data.shape),
    ("\nData Types:\n", data.dtypes),
    ("\nSummary Statistics:\n", data.describe()),
):
    print(_heading, _value)

In [None]:
# Number of null entries in each column
missing_counts = data.isnull().sum()
print("\nMissing Values:\n", missing_counts)

In [None]:
# List the distinct values held by every object-dtype column
categorical_columns = data.select_dtypes(include=['object']).columns
print("\nUnique Values in Categorical Columns:")
for col in categorical_columns:
    print(f"\nColumn: {col}", data[col].unique(), sep="\n")


Data Visualization (Initial)

In [None]:
# Histogram of traffic density with a KDE overlay
fig, ax = plt.subplots()
sns.histplot(data['Traffic Density'], kde=True, ax=ax)
ax.set_title("Traffic Density Distribution")
plt.show()

In [None]:
# Scatter of speed against traffic density, coloured by vehicle type
fig, ax = plt.subplots()
sns.scatterplot(data=data, x='Speed', y='Traffic Density', hue='Vehicle Type', ax=ax)
ax.set_title("Speed vs Traffic Density by Vehicle Type")
plt.show()

In [None]:
# Spread of energy consumption within each vehicle type
fig, ax = plt.subplots()
sns.boxplot(data=data, x='Vehicle Type', y='Energy Consumption', ax=ax)
ax.set_title("Energy Consumption by Vehicle Type")
plt.show()


In [None]:
# Pairwise correlation of the numeric columns, drawn as an annotated heatmap
corr = data.select_dtypes(include=np.number).corr()
fig, ax = plt.subplots(figsize=(10, 6))
sns.heatmap(corr, annot=True, cmap='coolwarm', ax=ax)
ax.set_title("Correlation Matrix")
plt.show()


In [None]:
# Per-city mean of every numeric column
numeric_data = data.select_dtypes(include=[np.number]).columns
city_groups = data.groupby('City')
grouped_data = city_groups[numeric_data].mean()
print("\nGrouped Analysis by City:\n", grouped_data)

In [None]:
# Pairwise scatter/histogram grid for the three continuous measurements
numericals = ["Speed", "Energy Consumption", "Traffic Density"]
pair_grid = sns.pairplot(data=data, vars=numericals)
plt.show()

In [None]:
# Outlier Detection
# Resolve the numeric columns inside this cell: the original relied on
# `numerical_columns` being defined by a later cell, which raises NameError
# on a fresh kernel (Restart & Run All).
numerical_columns = data.select_dtypes(include=['float64', 'int64']).columns

# One standalone boxplot per numeric column
for col in numerical_columns:
    fig, ax = plt.subplots(figsize=(10, 6))
    sns.boxplot(data=data[col], ax=ax)
    ax.set_title(f'Boxplot of {col} for Outlier Detection')
    plt.show()

In [None]:
numerical_columns = data.select_dtypes(include=['float64', 'int64']).columns

# Arrange the boxplots on a two-column grid
n_cols = 2
n_rows = -(-len(numerical_columns) // n_cols)  # integer division rounded up
fig, axes = plt.subplots(n_rows, n_cols, figsize=(12, n_rows * 4))

# Work with a 1-D sequence of axes so each column maps to one position
axes = axes.flatten()

for idx, col in enumerate(numerical_columns):
    panel = axes[idx]
    sns.boxplot(data=data[col], ax=panel)
    panel.set_title(f'Boxplot of {col}')

# Drop grid positions that did not receive a plot
for spare_axis in axes[idx + 1:]:
    fig.delaxes(spare_axis)

plt.tight_layout()
plt.show()


Data Pre-processing (Initial - from IR)

Handle Outliers

In [None]:
# Handle Outliers
# Using IQR to detect outliers
numerical_features = data.select_dtypes(include=[np.number]).columns

# Skip flag-like columns (at most two distinct values). For a 0/1 column where
# one value covers more than 75% of the rows, Q1 == Q3 and the IQR is 0, so the
# 1.5*IQR rule would discard every row holding the minority value.
continuous_features = [
    column for column in numerical_features
    if data[column].nunique(dropna=True) > 2
]

# Compute every column's bounds on the same, unfiltered data and combine the
# results into one mask, so the outcome does not depend on column order.
rows_to_keep = pd.Series(True, index=data.index)
for column in continuous_features:
    Q1 = data[column].quantile(0.25)
    Q3 = data[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    # Count outliers
    is_outlier = (data[column] < lower_bound) | (data[column] > upper_bound)
    print(f"Outliers detected in {column}: {int(is_outlier.sum())}")

    # Keep rows inside [lower_bound, upper_bound]; as with the original >= / <=
    # comparison, rows with a missing value in this column are not kept.
    rows_to_keep &= data[column].between(lower_bound, upper_bound)

# Remove outliers (copy so later column assignments act on an independent frame)
data = data[rows_to_keep].copy()


Feature Engineering

In [None]:
# Synchronize 'Day Of Week' with an actual calendar

# Map Day Of Week strings to numerical values (Monday=0, ..., Sunday=6)
day_of_week_mapping = {
    "Monday": 0,
    "Tuesday": 1,
    "Wednesday": 2,
    "Thursday": 3,
    "Friday": 4,
    "Saturday": 5,
    "Sunday": 6,
}

# Convert 'Day Of Week' to numerical values using the mapping.
# Guarded so that re-running this cell does not map the already-numeric
# column a second time (which would turn every value into NaN).
if not pd.api.types.is_numeric_dtype(data["Day Of Week"]):
    data["Day Of Week"] = data["Day Of Week"].map(day_of_week_mapping)

# Choose an arbitrary starting Monday for syncing the calendar
start_date = datetime(2024, 1, 1)  # Example: Start from the first Monday of 2024

# Generate synthetic calendar dates based on 'Day Of Week'.
# Vectorised offset: an unmapped (NaN) day becomes NaT instead of raising
# inside timedelta(). Every date falls in the week starting at start_date.
data["Date"] = pd.Timestamp(start_date) + pd.to_timedelta(data["Day Of Week"], unit="D")

# Validate and display the updated dataset
print(data[["Day Of Week", "Date"]].head())


In [None]:
# -------------------- Public Holiday Feature --------------------
# Hand-maintained holiday calendar, written as (year, month, day)
manual_holidays = [
    datetime(year, month, day)
    for year, month, day in (
        (2024, 1, 1), (2024, 2, 10), (2024, 5, 1),
        (2024, 8, 31), (2024, 12, 25),
        (2025, 1, 1), (2025, 1, 29),
    )
]
holiday_dates = [d.date() for d in manual_holidays]

# Flag rows whose calendar date is in the holiday list (missing dates -> 0).
# NOTE(review): 'Date' is synthesized above as 2024-01-01 plus 0-6 days, so
# only the 2024-01-01 entry can ever match and this flag mirrors "is Monday".
data['Is_Public_Holiday'] = data['Date'].apply(
    lambda stamp: int(pd.notnull(stamp) and stamp.date() in holiday_dates)
)

In [None]:
# Feature Engineering
# Make sure 'Date' is a datetime column
data['Date'] = pd.to_datetime(data['Date'])
# Saturday (5) and Sunday (6) count as weekend; missing days compare False -> 0
data['Is_Weekend'] = (data['Day Of Week'] >= 5).astype('int64')

def categorize_hour(hour):
    """Return the part of day for an hour value.

    [5, 12) -> "Morning", [12, 17) -> "Afternoon", [17, 21) -> "Evening";
    anything else (including values that fail every comparison) -> "Night".
    """
    day_parts = (
        (5, 12, "Morning"),
        (12, 17, "Afternoon"),
        (17, 21, "Evening"),
    )
    for start, end, label in day_parts:
        if start <= hour < end:
            return label
    return "Night"

# Bucket each hour into Morning / Afternoon / Evening / Night
data["Time of Day"] = data["Hour Of Day"].apply(categorize_hour)

# Collapse the free-text weather description into four coarse categories.
# The .str accessor lower-cases once per row and propagates missing values,
# so a NaN 'Weather' entry becomes 'Other' instead of raising AttributeError
# on x.lower(). np.select takes the first matching condition, preserving the
# original clear -> rain -> snow priority.
weather_lower = data['Weather'].str.lower()
data['Weather_Category'] = np.select(
    [
        weather_lower.str.contains('clear', regex=False, na=False),
        weather_lower.str.contains('rain', regex=False, na=False),
        weather_lower.str.contains('snow', regex=False, na=False),
    ],
    ['Clear', 'Rainy', 'Snowy'],
    default='Other',
)

In [None]:
# Bin a traffic density value into one of five ordered labels
def categorize_density(density):
    """Return the density label for a numeric traffic density.

    Thresholds are inclusive lower bounds, checked from highest to lowest:
    >= 0.75 'very high', >= 0.35 'high', >= 0.20 'medium', >= 0.05 'low',
    otherwise 'very low'.
    """
    lower_bounds = (
        (0.75, 'very high'),
        (0.35, 'high'),
        (0.20, 'medium'),
        (0.05, 'low'),
    )
    for threshold, label in lower_bounds:
        if density >= threshold:
            return label
    return 'very low'


# Apply the categorization function
data['Traffic Density Category'] = data['Traffic Density'].apply(categorize_density)

# Show a few rows of the source value next to its new label rather than
# printing the entire DataFrame
print(data[['Traffic Density', 'Traffic Density Category']].head())

In [None]:
# -------------------- Speed-Traffic Impact --------------------
def determine_speed_impact(row):
    """Classify a row by combining its 'Speed' and 'Traffic Density' values.

    Returns 'Free-flowing' (speed >= 70 and density <= 0.2),
    'Highly Congested' (speed <= 30 and density >= 0.7),
    'Moderate' (30 < speed < 70 and 0.2 < density < 0.7),
    or 'Irregular' for every other combination.
    """
    speed, density = row['Speed'], row['Traffic Density']

    fast_and_sparse = speed >= 70 and density <= 0.2
    slow_and_dense = speed <= 30 and density >= 0.7
    mid_range = 30 < speed < 70 and 0.2 < density < 0.7

    if fast_and_sparse:
        return 'Free-flowing'
    if slow_and_dense:
        return 'Highly Congested'
    if mid_range:
        return 'Moderate'
    return 'Irregular'

# Label every row, then integer-encode the labels
data['Speed_Traffic_Impact'] = data.apply(determine_speed_impact, axis=1)
impact_encoder = LabelEncoder().fit(data['Speed_Traffic_Impact'])
data['Speed_Traffic_Impact_Label'] = impact_encoder.transform(data['Speed_Traffic_Impact'])

# Row count per impact label
impact_counts = data['Speed_Traffic_Impact'].value_counts()
print("\nSpeed-Traffic Impact Distribution:", impact_counts, sep="\n")

Train-test Split

In [None]:
# Select features by name instead of by position (`iloc[:, 0:15]`), which
# silently changes meaning whenever a column is added or reordered.
# NOTE(review): with the columns created above, a positional slice can also
# include the raw 'Traffic Density' that the target is binned from - confirm
# against the CSV's column order.
target_derived_columns = [
    'Traffic Density',             # raw value behind the target category
    'Traffic Density Category',    # the target itself
    'Speed_Traffic_Impact',        # computed from 'Traffic Density'
    'Speed_Traffic_Impact_Label',  # encoded form of the column above
]
X = data.drop(columns=target_derived_columns, errors='ignore')
y = data['Traffic Density Category']

# A bare expression is only echoed when it is the last line of a cell, so
# display both previews explicitly.
display(X.head(), y.head())

In [None]:
# Fixed seed for a reproducible split; stratify so every density class keeps
# its share in both partitions (matches the split used later in the notebook).
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

In [None]:
# Row count for each label in 'Traffic Density Category', most frequent first
target_column = data['Traffic Density Category']
target_class_distribution = target_column.value_counts()

# Show the class balance
print(target_class_distribution)

In [None]:
# Verify the Preprocessed Data
# The original printed this header with nothing under it; show the shape and
# a few rows so the header describes real output.
print("\nDataset after Preprocessing:")
print(data.shape)
print(data.head())

# Save the cleaned and preprocessed dataset
cleaned_file_path = 'C:/Users/Ning Sheng Yong/Desktop/QING APU/cleaned_urban_traffic_density.csv'
data.to_csv(cleaned_file_path, index=False)
print(f"Cleaned dataset saved to {cleaned_file_path}.")

Data Preprocessing (Further preprocessing)

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import joblib

# ------------------ STEP 0: Load Dataset ------------------
# Re-read the cleaned CSV written by the previous section
read_data = 'C:/Users/Ning Sheng Yong/Desktop/QING APU/cleaned_urban_traffic_density.csv'
data = pd.read_csv(read_data)

# Quick look at the reloaded frame: leading rows, then column dtypes
print(
    "First few rows of the dataset:",
    data.head(),
    "\nColumn names and data types in the dataset:",
    data.dtypes,
    sep="\n",
)

In [None]:

# ------------------ STEP 1: Drop Unnecessary Columns ------------------
columns_to_drop = [
    'Traffic Density',                # Not needed (regression target)
    'Weather',                        # Redundant (Weather_Category kept)
    'Date',                           # Already encoded via Day/Hour/Weekend
    'Speed_Traffic_Impact'            # Object column; encoded label version exists
]
# errors='ignore' keeps the cell re-runnable: a second run would otherwise
# raise KeyError because the columns are already gone.
data = data.drop(columns=columns_to_drop, errors='ignore')

# ------------------ STEP 2: Select Features ------------------
# Define final feature columns.
# 'Speed_Traffic_Impact_Label' is deliberately excluded: it is computed from
# 'Traffic Density', the same column 'Traffic Density Category' (the target)
# is binned from, so using it as a feature leaks the target into the model.
selected_features = [
    'City', 'Vehicle Type', 'Weather_Category', 'Economic Condition',
    'Day Of Week', 'Hour Of Day', 'Speed', 'Is Peak Hour',
    'Random Event Occurred', 'Energy Consumption',
    'Is_Public_Holiday', 'Is_Weekend', 'Time of Day',
]

X = data[selected_features]
y = data['Traffic Density Category']

# Encode the target labels to integers (e.g. 'low' -> 1)
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

# ------------------ STEP 3: Preprocessing Pipeline ------------------

# Split columns into numeric vs. everything else. A fixed dtype whitelist
# would leave e.g. bool columns in neither list, and ColumnTransformer drops
# unlisted columns by default.
numeric_cols = X.select_dtypes(include='number').columns.tolist()
categorical_cols = X.select_dtypes(exclude='number').columns.tolist()

# Column transformer: scale numeric columns, one-hot encode the rest
preprocessor = ColumnTransformer(transformers=[
    ('num', StandardScaler(), numeric_cols),
    ('cat', OneHotEncoder(drop='first', handle_unknown='ignore'), categorical_cols)
])

In [None]:
# ------------------ STEP 4: Train-Test-Val Split ------------------
# First carve off 20% as the test set, then 20% of the remainder as validation;
# both splits are stratified on the encoded target and seeded.
X_train, X_test, y_train, y_test = train_test_split(
    X, y_encoded,
    test_size=0.2, stratify=y_encoded, random_state=42,
)
X_train_final, X_val, y_train_final, y_val = train_test_split(
    X_train, y_train,
    test_size=0.2, stratify=y_train, random_state=42,
)

# ------------------ STEP 5: Fit and Transform ------------------
# Fit on the final training split only; validation and test are transformed
# with the already-fitted preprocessor.
X_train_final_processed = preprocessor.fit_transform(X_train_final)
X_val_processed, X_test_processed = (
    preprocessor.transform(split) for split in (X_val, X_test)
)

# ✅ Save the fitted preprocessor
joblib.dump(preprocessor, "C:/Users/Ning Sheng Yong/Desktop/QING APU/traffic_preprocessor.pkl")

# Wrap a dense or sparse matrix in a DataFrame so it can be written to CSV
def to_dataframe(matrix):
    """Return `matrix` as a DataFrame, densifying it first when it has `.toarray()`."""
    if hasattr(matrix, 'toarray'):
        dense = matrix.toarray()
    else:
        dense = matrix
    return pd.DataFrame(dense)

# DataFrame versions of the three processed splits
X_train_df, X_val_df, X_test_df = map(
    to_dataframe,
    (X_train_final_processed, X_val_processed, X_test_processed),
)


In [None]:
# ------------------ STEP 6: Save to CSV ------------------
output_path = "C:/Users/Ning Sheng Yong/Desktop/QING APU/"

# (file name, frame) pairs, written in train / val / test order
for _file_name, _frame in (
    ("X_train.csv", X_train_df),
    ("y_train.csv", pd.DataFrame(y_train_final)),
    ("X_val.csv", X_val_df),
    ("y_val.csv", pd.DataFrame(y_val)),
    ("X_test.csv", X_test_df),
    ("y_test.csv", pd.DataFrame(y_test)),
):
    _frame.to_csv(output_path + _file_name, index=False)

# ------------------ STATUS ------------------
for _status_line in (
    "\n✅ Datasets saved as CSV:",
    "- X_train.csv, y_train.csv",
    "- X_val.csv, y_val.csv",
    "- X_test.csv, y_test.csv",
):
    print(_status_line)

In [None]:
# Summarise the three splits that were saved above.
# - `.info()` prints its report itself and returns None, so wrapping it in
#   print() only added a stray "None" line.
# - The training split written to disk is X_train_final / y_train_final;
#   X_train / y_train still contain the validation rows.
for _features, _labels in (
    (X_train_final, y_train_final),
    (X_val, y_val),
    (X_test, y_test),
):
    _features.info()
    pd.Series(_labels).info()


Data Modelling

In [None]:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter

from sklearn.model_selection import train_test_split, learning_curve
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, ConfusionMatrixDisplay

from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import GradientBoostingClassifier
from xgboost import XGBClassifier
from imblearn.over_sampling import SMOTE
from catboost import CatBoostClassifier
from lightgbm import LGBMClassifier
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, StratifiedKFold
import joblib



In [None]:
# -------------------- Load Data --------------------
# Feature matrices are kept as DataFrames
X_train, X_val, X_test = (
    pd.read_csv(csv_path)
    for csv_path in (
        "C:/Users/Ning Sheng Yong/Desktop/QING APU/X_train.csv",
        "C:/Users/Ning Sheng Yong/Desktop/QING APU/X_val.csv",
        "C:/Users/Ning Sheng Yong/Desktop/QING APU/X_test.csv",
    )
)

# Label files are flattened to 1-D arrays
y_train, y_val, y_test = (
    pd.read_csv(csv_path).values.ravel()
    for csv_path in (
        "C:/Users/Ning Sheng Yong/Desktop/QING APU/y_train.csv",
        "C:/Users/Ning Sheng Yong/Desktop/QING APU/y_val.csv",
        "C:/Users/Ning Sheng Yong/Desktop/QING APU/y_test.csv",
    )
)


In [None]:
# Print the per-class row counts of the target for each split
for _title, _labels in (
    ("Training Set Class Distribution:", y_train),
    ("Validation Set Class Distribution:", y_val),
    ("Test Set Class Distribution:", y_test),
):
    print(_title)
    print(pd.Series(_labels).value_counts(), "\n")

Imbalance Data Handling

In [None]:
# Oversample the training split with SMOTE; validation and test stay untouched
smote = SMOTE(random_state=42)
X_train_smote, y_train_smote = smote.fit_resample(X_train, y_train)

# Class counts after resampling
resampled_counts = pd.Series(y_train_smote).value_counts()
print("Class distribution after SMOTE:\n", resampled_counts)


Model Evaluation Function

In [None]:
# ------------------ Model Evaluation Function ------------------
def _plot_confusion_matrix(cm, model_name):
    """Draw `cm` as an annotated heatmap titled with `model_name`."""
    plt.figure(figsize=(6, 4))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False)
    plt.title(f'{model_name} - Confusion Matrix')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.show()


def _plot_learning_curve(model, model_name, X_train, y_train, cv):
    """Plot mean +/- std cross-validated accuracy against training-set size."""
    train_sizes, train_scores, val_scores = learning_curve(
        estimator=model,
        X=X_train,
        y=y_train,
        cv=cv,
        scoring='accuracy',
        n_jobs=-1,
        train_sizes=np.linspace(0.1, 1.0, 10),
        shuffle=True,
        random_state=42
    )

    # Aggregate across folds once and reuse for both the line and the band
    train_mean, train_std = train_scores.mean(axis=1), train_scores.std(axis=1)
    val_mean, val_std = val_scores.mean(axis=1), val_scores.std(axis=1)

    plt.figure(figsize=(8, 6))
    plt.plot(train_sizes, train_mean, 'o-', label='Training Accuracy')
    plt.plot(train_sizes, val_mean, 'o-', label='Validation Accuracy')

    plt.fill_between(train_sizes, train_mean - train_std, train_mean + train_std, alpha=0.1)
    plt.fill_between(train_sizes, val_mean - val_std, val_mean + val_std, alpha=0.1)

    plt.title(f'Learning Curve - {model_name}')
    plt.xlabel('Training Set Size')
    plt.ylabel('Accuracy')
    plt.legend(loc='best')
    plt.grid(True)
    plt.show()


def evaluate_model(model, model_name, X_train, y_train, X_val, y_val, cv=5,
                   plot_learning_curve=True):
    """Score an already-fitted model on the validation split and plot diagnostics.

    Parameters
    ----------
    model : fitted estimator exposing ``predict``.
    model_name : str used in the printed header and plot titles.
    X_train, y_train : data passed to ``learning_curve`` (the estimator is
        re-fitted by ``learning_curve`` on subsets of this data).
    X_val, y_val : held-out data used for accuracy, report and confusion matrix.
    cv : cross-validation setting forwarded to ``learning_curve``.
    plot_learning_curve : set to False to skip the (expensive) learning curve;
        the default keeps the original behaviour.

    Returns
    -------
    (acc, report, cm) : validation accuracy, the classification report as a
        dict, and the confusion matrix.

    NOTE(review): every caller in this notebook passes the SMOTE-resampled
    training data, so the learning curve's validation folds contain synthetic
    samples - treat those scores as optimistic.
    """
    y_pred = model.predict(X_val)
    acc = accuracy_score(y_val, y_pred)
    print(f"\n==== {model_name} ====")
    print(f"Accuracy: {acc:.4f}\n")

    # Classification report, shown as a rounded table
    report = classification_report(y_val, y_pred, output_dict=True)
    report_df = pd.DataFrame(report).transpose()
    display(report_df.round(2))

    # Confusion matrix
    cm = confusion_matrix(y_val, y_pred)
    _plot_confusion_matrix(cm, model_name)

    # Learning curve
    if plot_learning_curve:
        _plot_learning_curve(model, model_name, X_train, y_train, cv)

    return acc, report, cm

Model 1: Logistic Regression

In [None]:
# Fit logistic regression on the SMOTE-resampled training data
lr_model = LogisticRegression(
    max_iter=5000,
    solver='lbfgs',
    class_weight='balanced',
    random_state=42,
).fit(X_train_smote, y_train_smote)

# Evaluate on the validation split
lr_acc, lr_report, lr_cm = evaluate_model(
    model=lr_model,
    model_name="Logistic Regression",
    X_train=X_train_smote,
    y_train=y_train_smote,
    X_val=X_val,
    y_val=y_val,
)

Model 2: Random Forest

In [None]:
# Fit a 100-tree random forest on the SMOTE-resampled training data
rf_model = RandomForestClassifier(
    n_estimators=100,
    random_state=42,
).fit(X_train_smote, y_train_smote)

# Evaluate on the validation split
rf_acc, rf_report, rf_cm = evaluate_model(
    model=rf_model,
    model_name="Random Forest",
    X_train=X_train_smote,
    y_train=y_train_smote,
    X_val=X_val,
    y_val=y_val,
)


Model 3: XGBoost

In [None]:
# Baseline XGBoost classifier on the SMOTE-resampled training data.
# `use_label_encoder` is omitted: it is deprecated and current XGBoost
# releases ignore it with a warning. The labels loaded above are already
# integer-encoded.
xgb_model = XGBClassifier(eval_metric='mlogloss', random_state=42)
xgb_model.fit(X_train_smote, y_train_smote)

# Evaluate on the validation split
xgb_acc, xgb_report, xgb_cm = evaluate_model(
    xgb_model,
    "XGBoost",
    X_train_smote, y_train_smote,
    X_val, y_val
)


Model 4: CatBoost

In [None]:
# Fit CatBoost with its default hyper-parameters on the SMOTE-resampled data
cat_model = CatBoostClassifier(
    verbose=0,
    random_state=42,
).fit(X_train_smote, y_train_smote)

# Evaluate on the validation split
cat_acc, cat_report, cat_cm = evaluate_model(
    model=cat_model,
    model_name="CatBoost",
    X_train=X_train_smote,
    y_train=y_train_smote,
    X_val=X_val,
    y_val=y_val,
)

# Export the CatBoost model fitted in this cell (the default-parameter one)
joblib.dump(cat_model, 'C:/Users/Ning Sheng Yong/Desktop/QING APU/catmodel_traffic_model.pkl')
print("CatBoost model exported successfully as 'catmodel_traffic_model.pkl'.")


Model 5: LightGBM

In [None]:
# Fit LightGBM with its default hyper-parameters on the SMOTE-resampled data
lgbm_model = LGBMClassifier(random_state=42).fit(X_train_smote, y_train_smote)

# Evaluate on the validation split
lgbm_acc, lgbm_report, lgbm_cm = evaluate_model(
    model=lgbm_model,
    model_name="LightGBM",
    X_train=X_train_smote,
    y_train=y_train_smote,
    X_val=X_val,
    y_val=y_val,
)

# Persist the fitted model
joblib.dump(lgbm_model, 'C:/Users/Ning Sheng Yong/Desktop/QING APU/lgbmodel_traffic_model.pkl')
print("Model exported successfully as 'lgbmodel_traffic_model.pkl'.")

Model 6: Multi-layer Perceptron

In [None]:
# Fit an MLP with two hidden layers of 64 units on the SMOTE-resampled data
mlp_model = MLPClassifier(
    hidden_layer_sizes=(64, 64),
    max_iter=300,
    random_state=42,
).fit(X_train_smote, y_train_smote)

# Evaluate on the validation split
mlp_acc, mlp_report, mlp_cm = evaluate_model(
    model=mlp_model,
    model_name="MLP Classifier",
    X_train=X_train_smote,
    y_train=y_train_smote,
    X_val=X_val,
    y_val=y_val,
)

Model 7: K-Nearest Neighbours

In [None]:
# Fit a 5-nearest-neighbours classifier on the SMOTE-resampled data
knn_model = KNeighborsClassifier(n_neighbors=5).fit(X_train_smote, y_train_smote)

# Evaluate on the validation split
knn_acc, knn_report, knn_cm = evaluate_model(
    model=knn_model,
    model_name="K-Nearest Neighbors",
    X_train=X_train_smote,
    y_train=y_train_smote,
    X_val=X_val,
    y_val=y_val,
)


Model 8: Gradient Boosting

In [None]:
# Fit gradient boosting (100 stages, learning rate 0.1) on the SMOTE-resampled data
gb_model = GradientBoostingClassifier(
    n_estimators=100,
    learning_rate=0.1,
    random_state=42,
).fit(X_train_smote, y_train_smote)

# Evaluate on the validation split
gb_acc, gb_report, gb_cm = evaluate_model(
    model=gb_model,
    model_name="Gradient Boosting",
    X_train=X_train_smote,
    y_train=y_train_smote,
    X_val=X_val,
    y_val=y_val,
)


Model Tuning 

Tuned Model 1: XGBoost

In [None]:
# Hyper-parameter space sampled by the randomized search
xgb_params = {
    'learning_rate': [0.01, 0.05, 0.1, 0.2],
    'max_depth': [3, 5, 7, 9],
    'subsample': [0.7, 0.8, 0.9, 1.0],
    'colsample_bytree': [0.7, 0.8, 0.9, 1.0],
    'n_estimators': [100, 300, 500]
}

# random_state makes the 50 sampled candidates reproducible between runs.
# `use_label_encoder` is omitted because it is deprecated and ignored by
# current XGBoost.
xgb_random = RandomizedSearchCV(
    XGBClassifier(eval_metric='mlogloss', random_state=42),
    param_distributions=xgb_params,
    n_iter=50, cv=5, scoring='accuracy', n_jobs=-1, verbose=1,
    random_state=42
)

# No eval_set: without early stopping it does not change the fitted models and
# only adds a validation-set evaluation to every boosting round of every fit.
xgb_random.fit(X_train_smote, y_train_smote)

# Capture the results so the cell does not echo the returned tuple
xgb_tuned_acc, xgb_tuned_report, xgb_tuned_cm = evaluate_model(
    xgb_random.best_estimator_, "XGBoost (Tuned)",
    X_train_smote, y_train_smote, X_val, y_val
)

Tuned Model 2: CatBoost

In [None]:
from catboost import CatBoostClassifier


# NOTE: this rebinds `cat_model`, replacing the baseline CatBoost model
# fitted earlier in the notebook.
cat_model = CatBoostClassifier(
    iterations=1000,           # Upper bound on boosting rounds; early stopping may end sooner
    learning_rate=0.05,        # Lower learning rate for more gradual updates
    depth=6,
    l2_leaf_reg=3,
    bagging_temperature=1,
    random_state=42,
    verbose=0
)

# Use eval_set and early_stopping_rounds for CatBoost tuning.
# The original passed only eval_set, so the early stopping described in the
# comments never took effect.
# NOTE(review): the stopping point is chosen on (X_val, y_val), so the
# validation scores reported below are optimistic for this model.
cat_model.fit(
    X_train_smote, y_train_smote,
    eval_set=(X_val, y_val),
    early_stopping_rounds=50,
    verbose=False
)

# Capture the results so the cell does not echo the returned tuple
cat_tuned_acc, cat_tuned_report, cat_tuned_cm = evaluate_model(
    cat_model, "Tuned CatBoost", X_train_smote, y_train_smote, X_val, y_val
)

Tuned Model 3: LightGBM

In [None]:
# Hyper-parameter space sampled by the randomized search
lgb_params = {
    'n_estimators': [100, 200, 300, 500],
    'max_depth': [10, 20, -1],
    'learning_rate': [0.01, 0.05, 0.1],
    'num_leaves': [31, 50, 100],
}

lgb_random = RandomizedSearchCV(
    LGBMClassifier(random_state=42),
    param_distributions=lgb_params,
    n_iter=50,
    cv=5,  # 5-fold cross-validation for each sampled candidate
    scoring='accuracy',
    n_jobs=-1,
    verbose=1,
    random_state=42
)

# No eval_set: without early stopping it does not change the fitted models and
# is re-evaluated on every one of the 50 x 5 cross-validation fits.
lgb_random.fit(X_train_smote, y_train_smote)

# Capture the results so the cell does not echo the returned tuple
lgb_tuned_acc, lgb_tuned_report, lgb_tuned_cm = evaluate_model(
    lgb_random.best_estimator_, "LightGBM (Tuned)",
    X_train_smote, y_train_smote, X_val, y_val
)
