In [None]:
# ROAD REPAIR PREDICTION - ENHANCED SYNTHETIC DATA, HYBRID MODELING, IMPROVED ACCURACY
!pip install -r requirements.txt -q
import pandas as pd
import numpy as np
import random
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import accuracy_score, classification_report
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.neighbors import KNeighborsClassifier
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt
import seaborn as sns

# Seed both the stdlib and NumPy generators so every run produces the same data.
random.seed(42)
np.random.seed(42)

# Dataset size and the vocabularies the categorical columns are sampled from.
n_samples = 5000
materials = ['asphalt', 'concrete', 'gravel']
weather_types = ['hot', 'humid', 'rainy', 'dry', 'cold']
usage_types = ['residential', 'commercial', 'highway']
traffic_levels = ['low', 'medium', 'high']

# Every column is drawn independently and uniformly. The order of the entries
# matters: it fixes the order of the RNG calls and therefore the data.
data = {
    'last_laid_year': np.random.randint(1980, 2020, size=n_samples),
    'last_repair_year': np.random.randint(1985, 2025, size=n_samples),
    'material': np.random.choice(materials, size=n_samples),
    'weather': np.random.choice(weather_types, size=n_samples),
    'usage_type': np.random.choice(usage_types, size=n_samples),
    'traffic_level': np.random.choice(traffic_levels, size=n_samples),
    'accidents_reported': np.random.randint(0, 30, size=n_samples)
}

df = pd.DataFrame(data)

# Derive ages relative to the reference year 2025.
df['road_age'] = 2025 - df['last_laid_year']
df['years_since_repair'] = 2025 - df['last_repair_year']

# The two year columns are sampled independently, so a repair can predate the
# laying year. Force road_age to be at least one year larger than
# years_since_repair. This is an element-wise maximum, computed on whole
# columns instead of a Python-level apply over every row.
# NOTE: last_laid_year itself is not adjusted, so for the corrected rows
# road_age no longer equals 2025 - last_laid_year.
df['road_age'] = np.maximum(df['road_age'], df['years_since_repair'] + 1)

df.to_csv('synthetic_data.csv', index=False)
#df.head()
import pandas as pd
from sklearn.preprocessing import LabelEncoder

# Read back the raw synthetic data that was just written to disk.
df_synthetic = pd.read_csv('synthetic_data.csv')

# String-valued columns that have to become integer codes.
categorical_cols = ['material', 'weather', 'usage_type', 'traffic_level']

# A single encoder is refit on each column in turn. LabelEncoder numbers the
# sorted unique values, so the codes follow the alphabetical order of the
# category names (e.g. weather: cold=0, dry=1, hot=2, humid=3, rainy=4).
encoder = LabelEncoder()
df_synthetic = df_synthetic.assign(
    **{col: encoder.fit_transform(df_synthetic[col]) for col in categorical_cols}
)

# Persist the encoded copy and preview the first rows.
df_synthetic.to_csv('synthetic_data_encoded.csv', index=False)
df_synthetic.head()

In [None]:
import numpy as np
import pandas as pd

# Re-seed so the Gaussian noise drawn below is reproducible.
np.random.seed(42)

# This cell works on the `df` built in the previous cell (raw string categories).

# Recompute road age and years since last repair, keeping road_age strictly
# larger than years_since_repair. Element-wise maximum on whole columns rather
# than a row-by-row apply.
df['road_age'] = 2025 - df['last_laid_year']
df['years_since_repair'] = 2025 - df['last_repair_year']
df['road_age'] = np.maximum(df['road_age'], df['years_since_repair'] + 1)

# --- Introduce Noise and Complex Patterns ---

# 1. Ordinal codes used only inside the score below. They are mapped on the
#    fly, so no helper columns are added to (and later removed from) df.
material_map = {'asphalt': 0, 'concrete': 1, 'gravel': 2}
weather_map = {'hot': 0, 'humid': 1, 'rainy': 2, 'dry': 3, 'cold': 4}
usage_map = {'residential': 0, 'commercial': 1, 'highway': 2}
traffic_map = {'low': 0, 'medium': 1, 'high': 2}

# 2. Weighted score plus Gaussian noise (sigma = 0.2), one draw per row.
complex_factor = (
    0.3 * df['road_age'] +
    0.2 * df['years_since_repair'] +
    0.1 * df['material'].map(material_map) +
    0.15 * df['weather'].map(weather_map) +
    0.1 * df['usage_type'].map(usage_map) +
    0.15 * df['traffic_level'].map(traffic_map) +
    0.2 * (df['accidents_reported'] / 30) +  # Scale accidents to [0, 1)
    np.random.normal(0, 0.2, len(df))
)

# 3. Label logic: threshold the score.
# NOTE(review): with the generated data road_age >= 6 and
# years_since_repair >= 1, so the score is at least about 2.0 before noise and
# the 1.5 threshold labels almost every row 1. The column is also dropped
# again when the datasets are loaded for modelling; confirm whether this
# label is still needed.
df['needs_repair'] = (complex_factor > 1.5).astype(int)

#df.to_csv('noisy_test_data.csv', index=False)
import numpy as np
import pandas as pd

# Re-seed NumPy so the noise and the category flips below are reproducible.
# The order of the random calls in this section determines the output, so the
# statements must not be reordered.
np.random.seed(42)

# Number of samples
# NOTE(review): n_samples is assigned here but the code below sizes everything
# with len(df); it appears to be unused in this section.
n_samples = 1000

# No features are generated here: the section mutates the existing `df` in place.

# --- Introduce Noise and Complex Patterns ---

# 1. Numerical Features: Add zero-mean Gaussian noise whose standard deviation
#    is 10% of the column's own standard deviation. The columns become floats;
#    the unbounded noise can push small values below zero and can break the
#    road_age > years_since_repair ordering. 'needs_repair' is not recomputed
#    after this step.
numerical_cols = ['road_age', 'years_since_repair', 'accidents_reported']
for col in numerical_cols:
    noise_scale = 0.1  # Adjust the noise scale (e.g., 0.1 for 10% noise)
    noise = np.random.normal(0, noise_scale * df[col].std(), size=len(df))
    df[col] = df[col] + noise

# 2. Categorical Features: for each column, pick 5% of the rows (without
#    replacement) and replace the value with a different category.
categorical_cols = ['material', 'weather', 'usage_type', 'traffic_level']
flip_probability = 0.05  # Adjust the flip probability (e.g., 0.05 for 5% flips)
for col in categorical_cols:
    flip_indices = np.random.choice(df.index, size=int(flip_probability * len(df)), replace=False)
    # Handle the selected rows one at a time because the candidate list depends
    # on the value currently stored in each row.
    for i in flip_indices:
        # Candidates: every category currently present in the column except
        # the one this row already has, so a flip always changes the value.
        unique_values = [val for val in df[col].unique() if val != df.loc[i, col]]
        # Draw the replacement uniformly from the candidates.
        new_value = np.random.choice(unique_values, size=1)[0]
        df.loc[i, col] = new_value

# The noisy frame stays in memory for the encoding step that follows.

# --- Save the modified data ---
#df.to_csv('noisy_test_data_with_noise.csv', index=False)
import pandas as pd
from sklearn.preprocessing import LabelEncoder

# The noisy frame is encoded straight from memory; the CSV reload stays disabled.
#df = pd.read_csv('noisy_test_data_with_noise.csv')

# String-valued columns to turn into integer codes.
categorical_cols = ['material', 'weather', 'usage_type', 'traffic_level']

# One encoder, refit per column. LabelEncoder assigns codes in sorted
# (alphabetical) order of the category names found in each column.
encoder = LabelEncoder()
df = df.assign(**{col: encoder.fit_transform(df[col]) for col in categorical_cols})

# Write the encoded noisy data and preview it.
df.to_csv('noisy_test_data_encoded.csv', index=False)
df.head()

In [None]:
import pandas as pd

# Read the clean and the noisy encoded datasets.
dataset_1 = pd.read_csv('synthetic_data_encoded.csv')
dataset_2 = pd.read_csv('noisy_test_data_encoded.csv')

# Cast the age columns to integers in both frames (the noisy file stores them
# as floats; the cast truncates toward zero).
dataset_1 = dataset_1.astype({'road_age': int, 'years_since_repair': int})
dataset_2 = dataset_2.astype({'road_age': int, 'years_since_repair': int})

# Only the noisy dataset carries a 'needs_repair' column; remove it so both
# frames have the same columns before they are stacked.
#dataset_1 = dataset_1.drop('needs_repair', axis=1)
dataset_2 = dataset_2.drop(columns='needs_repair')

# Stack the frames, then shuffle the rows reproducibly and renumber them.
combined_dataset = (
    pd.concat([dataset_1, dataset_2], ignore_index=True)
    .sample(frac=1, random_state=42)
    .reset_index(drop=True)
)

combined_dataset.head()

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np

# Assuming 'combined_dataset' is your latest merged DataFrame (without 'needs_repair')

# --- Define more complex logic to generate 'needs_repair' ---
def predict_needs_repair(row):
    """Return 1 if the road described by ``row`` is flagged for repair, else 0.

    ``row`` is any mapping-like record (for example a DataFrame row) exposing
    'road_age', 'years_since_repair', 'material', 'weather', 'usage_type',
    'traffic_level' and 'accidents_reported'.

    The categorical fields are expected to hold the integer codes written by
    the LabelEncoder steps earlier in this notebook. LabelEncoder numbers the
    alphabetically sorted category names, which gives:

        material:      asphalt=0, concrete=1, gravel=2
        weather:       cold=0, dry=1, hot=2, humid=3, rainy=4
        usage_type:    commercial=0, highway=1, residential=2
        traffic_level: high=0, low=1, medium=2

    If the encoding scheme changes, the constants below must change with it.
    A road is flagged as soon as any single rule fires.
    """
    # Codes of the categories the rules refer to (see the table above).
    asphalt, gravel = 0, 2
    hot, rainy = 2, 4
    highway = 1
    high_traffic = 0

    road_age = row['road_age']
    years_since_repair = row['years_since_repair']

    # Base rule: old road or long time since the last repair.
    base_condition = (road_age > 25) or (years_since_repair > 12)

    # Material rules: asphalt ages faster, gravel needs more frequent repair.
    material_condition = False
    if row['material'] == asphalt:
        material_condition = road_age > 20
    elif row['material'] == gravel:
        material_condition = years_since_repair > 8

    # Weather rules: rain shortens the repair interval, heat ages the surface.
    weather_condition = False
    if row['weather'] == rainy:
        weather_condition = years_since_repair > 10
    elif row['weather'] == hot:
        weather_condition = road_age > 22

    # Highways under high traffic degrade faster.
    usage_traffic_condition = False
    if row['usage_type'] == highway and row['traffic_level'] == high_traffic:
        usage_traffic_condition = years_since_repair > 7

    # Many reported accidents are treated as a sign of road damage.
    accident_condition = row['accidents_reported'] > 5

    needs_repair = any((
        base_condition,
        material_condition,
        weather_condition,
        usage_traffic_condition,
        accident_condition,
    ))
    return int(needs_repair)  # Convert to 0/1

# Label every row with the rule-based target.
combined_dataset['needs_repair'] = combined_dataset.apply(predict_needs_repair, axis=1)

# --- Now split the data ---
y = combined_dataset['needs_repair']  # Target variable
X = combined_dataset.drop(columns='needs_repair')  # Features

# Hold out 20% of the rows for testing; the fixed seed keeps the split stable.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Report the size of each partition.
for split_name, split in (
    ("X_train", X_train),
    ("X_test", X_test),
    ("y_train", y_train),
    ("y_test", y_test),
):
    print(f"{split_name} shape:", split.shape)

In [None]:
!pip install tensorflow -q
!pip install xgboost -q
!pip install scikeras -q
!pip install tensorflow_hub -q

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Dense, Dropout, Input, Reshape, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, classification_report
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
#from scikeras.wrappers import KerasClassifier  # For wrapping Keras models in scikit-learn
import numpy as np
from sklearn.metrics import log_loss  # For calculating log loss
import tensorflow_hub as hub

# ... (Your code for loading and preprocessing datasets - same as before) ...

# Read the clean and the noisy encoded datasets again for this cell.
dataset_1 = pd.read_csv('synthetic_data_encoded.csv')
dataset_2 = pd.read_csv('noisy_test_data_encoded.csv')

# Integer ages in both frames (the cast truncates the noisy float values).
dataset_1 = dataset_1.astype({'road_age': int, 'years_since_repair': int})
dataset_2 = dataset_2.astype({'road_age': int, 'years_since_repair': int})

# Only the noisy dataset carries 'needs_repair'; drop it so the columns match.
#dataset_1 = dataset_1.drop('needs_repair', axis=1)
dataset_2 = dataset_2.drop(columns='needs_repair')

# Stack, shuffle reproducibly, renumber, and discard the raw year columns.
# --- Drop 'last_laid_year' and 'last_repair_year' ---
combined_dataset = (
    pd.concat([dataset_1, dataset_2], ignore_index=True)
    .sample(frac=1, random_state=42)
    .reset_index(drop=True)
    .drop(columns=['last_laid_year', 'last_repair_year'])
)

# Features and labels are separated only once 'needs_repair' has been generated.
#X = combined_dataset.drop('needs_repair', axis=1)
#y = combined_dataset['needs_repair']

def predict_needs_repair(row):
    """Return 1 if the road described by ``row`` is flagged for repair, else 0.

    ``row`` is any mapping-like record (for example a DataFrame row) exposing
    'road_age', 'years_since_repair', 'material', 'weather', 'usage_type',
    'traffic_level' and 'accidents_reported'.

    The categorical fields are expected to hold the integer codes written by
    the LabelEncoder steps earlier in this notebook. LabelEncoder numbers the
    alphabetically sorted category names, which gives:

        material:      asphalt=0, concrete=1, gravel=2
        weather:       cold=0, dry=1, hot=2, humid=3, rainy=4
        usage_type:    commercial=0, highway=1, residential=2
        traffic_level: high=0, low=1, medium=2

    If the encoding scheme changes, the constants below must change with it.
    A road is flagged as soon as any single rule fires.
    """
    # Codes of the categories the rules refer to (see the table above).
    asphalt, gravel = 0, 2
    hot, rainy = 2, 4
    highway = 1
    high_traffic = 0

    road_age = row['road_age']
    years_since_repair = row['years_since_repair']

    # Base rule: old road or long time since the last repair.
    base_condition = (road_age > 25) or (years_since_repair > 12)

    # Material rules: asphalt ages faster, gravel needs more frequent repair.
    material_condition = False
    if row['material'] == asphalt:
        material_condition = road_age > 20
    elif row['material'] == gravel:
        material_condition = years_since_repair > 8

    # Weather rules: rain shortens the repair interval, heat ages the surface.
    weather_condition = False
    if row['weather'] == rainy:
        weather_condition = years_since_repair > 10
    elif row['weather'] == hot:
        weather_condition = road_age > 22

    # Highways under high traffic degrade faster.
    usage_traffic_condition = False
    if row['usage_type'] == highway and row['traffic_level'] == high_traffic:
        usage_traffic_condition = years_since_repair > 7

    # Many reported accidents are treated as a sign of road damage.
    accident_condition = row['accidents_reported'] > 5

    needs_repair = any((
        base_condition,
        material_condition,
        weather_condition,
        usage_traffic_condition,
        accident_condition,
    ))
    return int(needs_repair)  # Convert to 0/1

# Apply the rule-based logic to generate the 'needs_repair' column.
combined_dataset['needs_repair'] = combined_dataset.apply(predict_needs_repair, axis=1)

# Separate features and labels
X = combined_dataset.drop('needs_repair', axis=1)
y = combined_dataset['needs_repair']

# --- Preprocessing ---
# 1. Identify categorical columns by name. The CSVs already store them as
#    integer label codes, so selecting on dtype 'object' finds nothing and the
#    nominal codes would be standardised as if they were ordered numbers.
categorical_cols = [
    col for col in ['material', 'weather', 'usage_type', 'traffic_level']
    if col in X.columns
]

# 2. One-hot encode categorical features
encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
encoded_data = encoder.fit_transform(X[categorical_cols])
encoded_df = pd.DataFrame(
    encoded_data,
    columns=encoder.get_feature_names_out(categorical_cols),
    index=X.index,  # keep row alignment explicit for the concat below
)
X = X.drop(categorical_cols, axis=1)  # Drop the integer-coded originals
X = pd.concat([X, encoded_df], axis=1)  # Append the indicator columns

# 3. Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 4. Standardize features: fit on the training split only, then apply the
#    same transform to the test split.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# --- Define and Train Models ---

# 1. Standalone Neural Network
# Two ReLU hidden layers (32 then 16 units), each followed by 30% dropout,
# feeding a single sigmoid unit for the binary target.
input_layer = Input(shape=(X_train.shape[1],))
hidden = input_layer
for units in (32, 16):
    hidden = Dense(units, activation='relu')(hidden)
    hidden = Dropout(0.3)(hidden)
output_layer = Dense(1, activation='sigmoid')(hidden)

model = Model(inputs=input_layer, outputs=output_layer)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Stop once validation loss has not improved for 5 epochs and roll back to
# the best weights seen.
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
history = model.fit(
    X_train,
    y_train,
    epochs=100,
    batch_size=64,
    validation_split=0.2,
    callbacks=[early_stopping],
)

# 2. Hybrid KNN+NN
# The network is fed the class labels of each sample's 10 nearest training
# neighbours. Raw neighbour row indices are arbitrary identifiers and carry
# no signal, so they are used only to look the labels up.
knn = KNeighborsClassifier(n_neighbors=10)
knn.fit(X_train, y_train)

# Positional label array: kneighbors returns positions in the fitted training
# set, not pandas index labels.
train_labels = np.asarray(y_train)

# Calling kneighbors() without X queries the training points themselves and
# leaves each point out of its own neighbour list, so a training row never
# sees its own label as a feature.
knn_features_train = train_labels[knn.kneighbors(return_distance=False)].astype('float32')
knn_features_test = train_labels[knn.kneighbors(X_test, return_distance=False)].astype('float32')

input_layer_hybrid = Input(shape=(knn_features_train.shape[1],))
hidden_layer_hybrid = Dense(32, activation='relu')(input_layer_hybrid)
dropout_layer_hybrid = Dropout(0.5)(hidden_layer_hybrid)
output_layer_hybrid = Dense(1, activation='sigmoid')(dropout_layer_hybrid)

hybrid_model = Model(inputs=input_layer_hybrid, outputs=output_layer_hybrid)
hybrid_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
hybrid_model.fit(knn_features_train, y_train, epochs=100, batch_size=32, validation_split=0.2)


# Classical models: each estimator is built with default hyper-parameters and
# a fixed seed, then fitted on the scaled training split (fit returns the
# estimator itself, so construction and fitting are chained).

# 3. Random Forest
rf_model = RandomForestClassifier(random_state=42).fit(X_train, y_train)

# 4. XGBoost
xgb_model = XGBClassifier(random_state=42).fit(X_train, y_train)

# 5. Gradient Boosting
gb_model = GradientBoostingClassifier(random_state=42).fit(X_train, y_train)

# 6. AdaBoost
ada_model = AdaBoostClassifier(random_state=42).fit(X_train, y_train)

# 7. Support Vector Machine (probability=True enables predict_proba)
svm_model = SVC(probability=True, random_state=42).fit(X_train, y_train)

# 8. Logistic Regression
lr_model = LogisticRegression(random_state=42).fit(X_train, y_train)

# 9. Pre-trained TensorFlow Model (example using EfficientNetB0)
# Disabled example. Both lines of the hub.KerasLayer call must stay commented
# together: a live, indented continuation line after a commented-out opening
# line is an IndentationError that stops the whole cell from running.
# Load pre-trained model
#base_model = hub.KerasLayer("https://tfhub.dev/tensorflow/efficientnet/b0/classification/1",
#                         trainable=False)  # Set trainable=True if you want to fine-tune

# Create a new model on top of the pre-trained model
# (kept inside a string literal so it is not executed)
'''input_tensor = Input(shape=(X_train.shape[1],)) # Assuming your input shape
x = Reshape((1, 1, X_train.shape[1]))(input_tensor) # Reshape to (1, 1, num_features)
efficientnet_output = base_model(x)
efficientnet_output = GlobalAveragePooling2D()(efficientnet_output) # Add GlobalAveragePooling2D
output_tensor = Dense(1, activation='sigmoid')(efficientnet_output)

efficientnet_model = Model(inputs=input_tensor, outputs=output_tensor)

efficientnet_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
efficientnet_model.fit(X_train, y_train, epochs=20, batch_size=32, validation_split=0.2)
'''

# --- Evaluate Models and Calculate Loss ---

models = {
    "Standalone NN": model,
    "Hybrid KNN+NN": hybrid_model,
    "Random Forest": rf_model,
    "XGBoost": xgb_model,
    "Gradient Boosting": gb_model,
    "AdaBoost": ada_model,
    "SVM": svm_model,
    "Logistic Regression": lr_model,
     #"EfficientNetB0": efficientnet_model,  # Uncomment if using
    # "Hybrid KNN+MobileNetV2": hybrid_mobilenet_model  # Uncomment if using
}

# Keras models are scored with .evaluate() on the input each one was trained
# on; any model not listed here is treated as a scikit-learn style estimator.
keras_test_inputs = {
    "Standalone NN": X_test,
    "Hybrid KNN+NN": knn_features_test,
    "EfficientNetB0": X_test,
}

results = {}
# The loop variable is deliberately not called `model`, so the standalone
# network bound to that name is still available after the loop.
for model_name, estimator in models.items():
    if model_name in keras_test_inputs:  # Keras models
        loss, accuracy = estimator.evaluate(keras_test_inputs[model_name], y_test, verbose=0)
    else:  # Scikit-learn models
        predictions = estimator.predict(X_test)
        accuracy = accuracy_score(y_test, predictions)
        # Log loss on the predicted probability of the positive class.
        loss = log_loss(y_test, estimator.predict_proba(X_test)[:, 1])

    results[model_name] = {"Accuracy": accuracy, "Loss": loss}

# --- Print Results ---
for model_name, metrics in results.items():
    print(f"{model_name} - Test Accuracy: {metrics['Accuracy']:.4f}, Test Loss: {metrics['Loss']:.4f}")