In [None]:
# Mount Google Drive inside the Colab runtime; the paths used further down
# (CSV input, saved model) all live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler, PolynomialFeatures
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import matplotlib.pyplot as plt
import joblib
from datetime import datetime, timedelta
from sklearn.pipeline import Pipeline
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.svm import SVR

class EVEnergyPredictor:
    def __init__(self, data_path):
        """Create an untrained predictor tied to a CSV of historical trips.

        Args:
            data_path: Path handed to ``pandas.read_csv`` by
                ``load_and_preprocess``.
        """
        # Label values listed per categorical column.
        self.categorical_mappings = dict(
            Terrain_Type=['hilly', 'non-hilly'],
            Traffic=['no traffic', 'present traffic'],
        )
        # Filled in once a model has been trained or loaded.
        self.feature_columns = None
        self.model = None
        # Min-max scaler applied to the numeric feature columns.
        self.scaler = MinMaxScaler()
        self.data_path = data_path

    def cap_outliers(self, df, column, whisker_factor=1.5):
        """Cap outliers using IQR method."""
        q1 = df[column].quantile(0.25)
        q3 = df[column].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - whisker_factor * iqr
        upper_bound = q3 + whisker_factor * iqr
        df[column] = df[column].clip(lower_bound, upper_bound)
        return df

    def load_and_preprocess(self):
        """Load and preprocess the historical data with outlier capping"""
        df = pd.read_csv(self.data_path)
        df.columns = df.columns.str.strip()

        df = df[['EVS Bus', 'Temperature', 'Distance', 'Passenger Count',
                'speed(km/hr)', 'Energy Consumption',
                'Terrain Description', 'Traffic Description']]
        df.columns = ['Bus_ID', 'Temperature', 'Distance_km', 'Passengers',
                     'Average_Speed', 'Energy_kWh', 'Terrain_Type', 'Traffic']

        df = df[~df['Traffic'].str.contains('light traffic', case=False, na=False)]

        df = df.dropna()
        df = df[df['Energy_kWh'] > 0]

        df['Terrain_Type'] = df['Terrain_Type'].str.strip().str.lower()
        df['Traffic'] = df['Traffic'].str.strip().str.lower()

        df = df[df['Traffic'].isin(['no traffic', 'present traffic'])]

        # Binary encoding categorical features
        df['Terrain_Type'] = df['Terrain_Type'].map({'hilly': 1, 'non-hilly': 0})
        df['Traffic'] = df['Traffic'].map({'present traffic': 1, 'no traffic': 0})

        #feature interactions and nonlinear transformations
        df['Distance_Squared'] = df['Distance_km'] ** 2
        df['Speed_Squared'] = df['Average_Speed'] ** 2
        df['Distance_Speed'] = df['Distance_km'] * df['Average_Speed']
        df['Distance_Terrain'] = df['Distance_km'] * df['Terrain_Type']
        df['Speed_Terrain'] = df['Average_Speed'] * df['Terrain_Type']
        df['Passengers_Distance'] = df['Passengers'] * df['Distance_km']

        # Outlier capping
        base_numeric_cols = ["Distance_km", "Passengers", "Temperature", "Average_Speed", "Energy_kWh"]
        derived_numeric_cols = ["Distance_Squared", "Speed_Squared", "Distance_Speed",
                                "Distance_Terrain", "Speed_Terrain", "Passengers_Distance"]

        for col in base_numeric_cols:
            df = self.cap_outliers(df, col)

        # Store the processed data
        self.historical_data = df
        return df

    def train_model(self, test_size=0.2, random_state=42, model_type='gradient_boosting'):
        """Train non-linear regression model with train-test split and evaluation"""
        if not hasattr(self, 'historical_data'):
            self.load_and_preprocess()

        df = self.historical_data.copy()

        # Prepare features and target
        self.base_numeric_features = ["Distance_km", "Passengers", "Temperature", "Average_Speed"]
        self.derived_numeric_features = ["Distance_Squared", "Speed_Squared", "Distance_Speed",
                                         "Distance_Terrain", "Speed_Terrain", "Passengers_Distance"]
        self.numeric_features = self.base_numeric_features + self.derived_numeric_features
        self.categorical_cols = ["Terrain_Type", "Traffic"]
        self.feature_columns = self.numeric_features + self.categorical_cols

        X = df[self.feature_columns]
        y = df["Energy_kWh"]

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state)

        # Normalize only numeric features
        if len(self.numeric_features) > 0:
            X_train[self.numeric_features] = self.scaler.fit_transform(X_train[self.numeric_features])
            X_test[self.numeric_features] = self.scaler.transform(X_test[self.numeric_features])

        # Initialize model based on type
        if model_type == 'gradient_boosting':
            self.model = GradientBoostingRegressor(
                n_estimators=100,
                learning_rate=0.1,
                max_depth=4,
                min_samples_split=5,
                random_state=random_state
            )
        elif model_type == 'svr':
            self.model = SVR(kernel='rbf', C=100, gamma=0.1, epsilon=0.1)
        elif model_type == 'kernel_ridge':
            self.model = KernelRidge(alpha=1.0, kernel='rbf', gamma=0.1)
        elif model_type == 'random_forest':
            self.model = RandomForestRegressor(
                n_estimators=100,
                max_depth=10,
                min_samples_split=5,
                random_state=random_state
            )
        else:
            raise ValueError(f"Unsupported model type: {model_type}")

        # Training model and making predictions
        self.model.fit(X_train, y_train)

        train_pred = self.model.predict(X_train)
        test_pred = self.model.predict(X_test)

        # Store predictions
        self.train_results = pd.DataFrame({
            'Actual': y_train,
            'Predicted': train_pred
        })
        self.test_results = pd.DataFrame({
            'Actual': y_test,
            'Predicted': test_pred
        })

        # Evaluate model
        self.evaluate_model(y_train, train_pred, y_test, test_pred)

        # Store feature medians for prediction
        self.feature_medians = X.median()

        # Feature importance for tree-based models
        if model_type in ['gradient_boosting', 'random_forest']:
            self.plot_feature_importance()
    # Compute evaluation metrics for the training and test sets.
    def evaluate_model(self, y_train, train_pred, y_test, test_pred):
        """Compute, print and store the train/test evaluation metrics.

        For each split the method computes MAE, RMSE, R², MAPE (in percent)
        and an "accuracy" defined as ``max(0, 100 - MAPE)``. The values are
        printed as an aligned table and stored in ``self.metrics`` under
        ``train_<name>`` / ``test_<name>`` keys.

        Args:
            y_train: Actual target values of the training split.
            train_pred: Predictions for the training split.
            y_test: Actual target values of the test split.
            test_pred: Predictions for the test split.
        """
        def _scores(actual, predicted):
            """Return the metric dict for one split."""
            # MAPE divides by the actual values, so they must be non-zero.
            mape = np.mean(np.abs((actual - predicted) / actual)) * 100
            return {
                'mae': mean_absolute_error(actual, predicted),
                'rmse': np.sqrt(mean_squared_error(actual, predicted)),
                'r2': r2_score(actual, predicted),
                'mape': mape,
                'accuracy': max(0, 100 - mape),
            }

        train = _scores(y_train, train_pred)
        test = _scores(y_test, test_pred)

        # (label, metric key, number format) for each table row.
        table_rows = (
            ('MAE', 'mae', '.4f'),
            ('RMSE', 'rmse', '.4f'),
            ('R²', 'r2', '.4f'),
            ('MAPE (%)', 'mape', '.2f'),
            ('Accuracy (%)', 'accuracy', '.2f'),
        )

        print("\nModel Evaluation Metrics:")
        print(f"{'Metric':<15}{'Training':<15} {'Test':<15}")
        for label, key, spec in table_rows:
            # Pad the training value to the header's column width so the
            # test value lines up under "Test".
            print(f"{label:<15}{format(train[key], spec):<15} {format(test[key], spec)}")

        self.metrics = {f'train_{name}': value for name, value in train.items()}
        self.metrics.update({f'test_{name}': value for name, value in test.items()})

    def plot_actual_vs_predicted(self, num_trips=500, dataset='test'):
        """Plot actual vs predicted energy consumption"""
        if not hasattr(self, 'train_results'):
            raise ValueError("Model not trained yet. Call train_model() first.")

        if dataset == 'train':
            plot_data = self.train_results.head(num_trips).copy()
            title = 'Training Set'
        else:
            plot_data = self.test_results.head(num_trips).copy()
            title = 'Test Set'

        plot_data['Trip_Index'] = range(1, len(plot_data)+1)

        plt.figure(figsize=(15, 6))
        plt.plot(plot_data['Trip_Index'], plot_data['Actual'],
                label='Actual Energy Consumption', color='blue', alpha=0.7)
        plt.plot(plot_data['Trip_Index'], plot_data['Predicted'],
                label='Predicted Energy Consumption', color='red', alpha=0.7, linestyle='--')

        plt.title(f'Actual vs Predicted Energy Consumption ({title})')
        plt.xlabel('Trip Index')
        plt.ylabel('Energy Consumption (kWh)')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

        plt.figure(figsize=(8, 8))
        plt.scatter(plot_data['Actual'], plot_data['Predicted'], alpha=0.5)

        max_val = max(plot_data['Actual'].max(), plot_data['Predicted'].max())
        min_val = min(plot_data['Actual'].min(), plot_data['Predicted'].min())
        plt.plot([min_val, max_val], [min_val, max_val], 'r--')

        plt.title(f'Actual vs Predicted Scatter Plot ({title})')
        plt.xlabel('Actual Energy Consumption (kWh)')
        plt.ylabel('Predicted Energy Consumption (kWh)')
        plt.grid(True, alpha=0.3)
        plt.axis('equal')
        plt.tight_layout()
        plt.show()

    def plot_feature_importance(self):
        """Plot feature importance for tree-based models"""
        if not hasattr(self, 'model') or not hasattr(self.model, 'feature_importances_'):
            return

        importances = self.model.feature_importances_
        indices = np.argsort(importances)[::-1]

        plt.figure(figsize=(12, 8))
        plt.title('Feature Importance')
        plt.bar(range(len(importances)), importances[indices], align='center')
        plt.xticks(range(len(importances)), [self.feature_columns[i] for i in indices], rotation=90)
        plt.tight_layout()
        plt.show()

    def plot_residuals(self, dataset='test'):
        """Plot residuals analysis"""
        if not hasattr(self, 'train_results'):
            raise ValueError("Model not trained yet. Call train_model() first.")

        if dataset == 'train':
            results = self.train_results
            title = 'Training Set'
        else:
            results = self.test_results
            title = 'Test Set'

        residuals = results['Actual'] - results['Predicted']

        # Residuals vs Predicted
        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        plt.scatter(results['Predicted'], residuals, alpha=0.5)
        plt.axhline(y=0, color='r', linestyle='--')
        plt.xlabel('Predicted Values')
        plt.ylabel('Residuals')
        plt.title(f'Residuals vs Predicted ({title})')
        plt.grid(True, alpha=0.3)

        # Residuals distribution
        plt.subplot(1, 2, 2)
        plt.hist(residuals, bins=30, alpha=0.7)
        plt.xlabel('Residual Value')
        plt.ylabel('Frequency')
        plt.title(f'Residuals Distribution ({title})')
        plt.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

    def find_best_model(self, test_size=0.2, random_state=42):
        """Fit several default regressors and keep the one with the best test R².

        Gradient boosting, random forest and SVR (all with default
        hyper-parameters) are fitted on the same train/test split. The model
        with the highest R² on the held-out test set becomes ``self.model``.

        Note: the comparison uses a single hold-out split, not
        cross-validation. Because the test set is also used to pick the
        winner, the stored test metrics are optimistic for that model.

        Args:
            test_size: Fraction of rows held out as the test set.
            random_state: Seed for the split and for the tree ensembles.

        Returns:
            dict: Maps each model name to a dict with the fitted ``model``
            and its ``train_r2``, ``test_r2``, ``test_rmse`` and
            ``test_accuracy``.

        Side effects:
            Sets the feature-name lists, refits ``self.scaler``, and stores
            ``self.model``, ``self.train_results``, ``self.test_results``,
            ``self.metrics`` (through ``evaluate_model``) and
            ``self.feature_medians``.
        """
        # ``load_model`` may have stored None here, so test the value and not
        # just the attribute's existence.
        if getattr(self, 'historical_data', None) is None:
            self.load_and_preprocess()

        df = self.historical_data.copy()

        # Prepare features and target
        self.base_numeric_features = ["Distance_km", "Passengers", "Temperature", "Average_Speed"]
        self.derived_numeric_features = ["Distance_Squared", "Speed_Squared", "Distance_Speed",
                                         "Distance_Terrain", "Speed_Terrain", "Passengers_Distance"]
        self.numeric_features = self.base_numeric_features + self.derived_numeric_features
        self.categorical_cols = ["Terrain_Type", "Traffic"]
        self.feature_columns = self.numeric_features + self.categorical_cols

        # Cast the numeric columns to float up front: the scaler returns
        # floats, and writing those into integer columns is a dtype clash.
        X = df[self.feature_columns].astype({col: float for col in self.numeric_features})
        y = df["Energy_kWh"]

        # Train-test split; copies keep the in-place scaling below local.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state)
        X_train = X_train.copy()
        X_test = X_test.copy()

        # Normalize only numeric features; the scaler is fitted on the
        # training rows only.
        if self.numeric_features:
            X_train[self.numeric_features] = self.scaler.fit_transform(X_train[self.numeric_features])
            X_test[self.numeric_features] = self.scaler.transform(X_test[self.numeric_features])

        # Define models to try
        models = {
            'Gradient Boosting': GradientBoostingRegressor(random_state=random_state),
            'Random Forest': RandomForestRegressor(random_state=random_state),
            'SVR': SVR(),
        }

        results = {}
        # Predictions are kept per model so the winner need not be re-run.
        predictions = {}

        # Train and evaluate each model
        for name, model in models.items():
            print(f"\nTraining {name}...")
            model.fit(X_train, y_train)

            train_pred = model.predict(X_train)
            test_pred = model.predict(X_test)
            predictions[name] = (train_pred, test_pred)

            train_r2 = r2_score(y_train, train_pred)
            test_r2 = r2_score(y_test, test_pred)
            test_rmse = np.sqrt(mean_squared_error(y_test, test_pred))
            test_mape = np.mean(np.abs((y_test - test_pred) / y_test)) * 100
            test_accuracy = max(0, 100 - test_mape)

            print(f"{name} - Train R²: {train_r2:.4f}, Test R²: {test_r2:.4f}, Test RMSE: {test_rmse:.4f}")

            results[name] = {
                'model': model,
                'train_r2': train_r2,
                'test_r2': test_r2,
                'test_rmse': test_rmse,
                'test_accuracy': test_accuracy
            }

        # Find best model based on test R²
        best_model_name = max(results, key=lambda k: results[k]['test_r2'])

        print(f"\nBest model: {best_model_name} with Test R²: {results[best_model_name]['test_r2']:.4f}")

        # Set the best model as the current model
        self.model = results[best_model_name]['model']
        train_pred, test_pred = predictions[best_model_name]

        # Store predictions
        self.train_results = pd.DataFrame({
            'Actual': y_train,
            'Predicted': train_pred
        })
        self.test_results = pd.DataFrame({
            'Actual': y_test,
            'Predicted': test_pred
        })

        # Evaluate best model
        self.evaluate_model(y_train, train_pred, y_test, test_pred)

        # Medians of the unscaled features; ``predict`` uses them to fill in
        # numeric inputs the caller leaves out.
        self.feature_medians = X.median()

        return results

    def predict(self, input_data):
        """Predict energy consumption with non-linear features"""
        if self.model is None:
            raise ValueError("Model not trained yet. Call train_model() first.")

        # Create template with all expected base feature columns
        base_features = {}
        for num_feat in self.base_numeric_features:
            if num_feat in input_data:
                base_features[num_feat] = input_data[num_feat]
            else:
                # Use historical median if not provided
                base_features[num_feat] = self.feature_medians[num_feat]

        # Add categorical features
        if 'Terrain_Type' in input_data:
            base_features['Terrain_Type'] = 1 if str(input_data['Terrain_Type']).lower() in ['hilly', '1'] else 0
        else:
            base_features['Terrain_Type'] = 0

        if 'Traffic' in input_data:
            base_features['Traffic'] = 1 if str(input_data['Traffic']).lower() in ['present traffic', '1'] else 0
        else:
            base_features['Traffic'] = 0

        # Create derived features
        derived_features = {
            'Distance_Squared': base_features['Distance_km'] ** 2,
            'Speed_Squared': base_features['Average_Speed'] ** 2,
            'Distance_Speed': base_features['Distance_km'] * base_features['Average_Speed'],
            'Distance_Terrain': base_features['Distance_km'] * base_features['Terrain_Type'],
            'Speed_Terrain': base_features['Average_Speed'] * base_features['Terrain_Type'],
            'Passengers_Distance': base_features['Passengers'] * base_features['Distance_km']
        }

        # Combine all features
        all_features = {**base_features, **derived_features}

        # Create DataFrame with features in the correct order
        template = pd.DataFrame(columns=self.feature_columns)
        for feature in self.feature_columns:
            template[feature] = [all_features[feature]]

        # Normalize only numeric features
        if len(self.numeric_features) > 0:
            template[self.numeric_features] = self.scaler.transform(template[self.numeric_features])

        # Predict and ensure reasonable values
        prediction = max(self.model.predict(template)[0], 0.1)
        return min(prediction, 100)  # Cap predictions at 100 kWh

    def save_model(self, path):
        """Save the trained model and components"""
        if self.model is None:
            raise ValueError("Model not trained yet. Call train_model() first.")

        joblib.dump({
            'model': self.model,
            'scaler': self.scaler,
            'feature_columns': self.feature_columns,
            'numeric_features': self.numeric_features,
            'categorical_cols': self.categorical_cols,
            'feature_medians': self.feature_medians,
            'historical_data': self.historical_data,
            'train_results': self.train_results,
            'test_results': self.test_results,
            'base_numeric_features': self.base_numeric_features,
            'derived_numeric_features': self.derived_numeric_features,
            'metrics': getattr(self, 'metrics', None)
        }, path)

    @classmethod
    def load_model(cls, path):
        """Rebuild a predictor from a file written by ``save_model``.

        Args:
            path: File to read with ``joblib.load``.

        Returns:
            A new instance (created with ``data_path=None``) carrying the
            stored model, scaler, feature lists and results.

        Raises:
            KeyError: If one of the mandatory entries is missing.
        """
        # NOTE(security): joblib.load unpickles the file and can therefore
        # execute arbitrary code -- only load files from a trusted source.
        components = joblib.load(path)
        predictor = cls(None)

        # Mandatory entries: a missing key raises KeyError.
        for attr in ('model', 'scaler', 'feature_columns', 'numeric_features',
                     'categorical_cols', 'feature_medians'):
            setattr(predictor, attr, components[attr])

        # Optional entries: set to None when the file does not contain them.
        for attr in ('historical_data', 'train_results', 'test_results',
                     'base_numeric_features', 'derived_numeric_features', 'metrics'):
            setattr(predictor, attr, components.get(attr, None))

        return predictor


if __name__ == "__main__":
    # Training run: load the data, pick the best model, export a sample of
    # the test predictions, show the diagnostic plots and save the model.
    # Everything lives under the guard because every step needs the
    # ``predictor`` created here.

    print("Initializing predictor with non-linear regression capability...")
    predictor = EVEnergyPredictor('/content/drive/MyDrive/extracted_evs_data (1).csv')
    predictor.load_and_preprocess()
    print(f"Loaded {len(predictor.historical_data)} samples after preprocessing")

    # Find the best model
    print("\nFinding the best model...")
    predictor.find_best_model(test_size=0.2, random_state=42)

    # Get the first 100 rows for the test set
    if hasattr(predictor, 'test_results'):
        test_plot_data = predictor.test_results.head(100).copy()
        print("\nTest Set Actual vs Predicted (First 100 trips):")
        print(test_plot_data.to_string())

        test_plot_data.to_csv('test_actual_vs_predicted_100.csv', index=False)
        print("\nSaved 'test_actual_vs_predicted_100.csv'")

        predictor.plot_actual_vs_predicted(num_trips=100, dataset='test')
        predictor.plot_residuals(dataset='test')

        if hasattr(predictor.model, 'feature_importances_'):
            predictor.plot_feature_importance()

        predictor.save_model('/content/drive/MyDrive/ev_energy_predictor_nonlinear.joblib')
        print("Model trained and saved")

# New predictions: helpers for batch trip predictions (attached to EVEnergyPredictor further down).
def predict_multiple_trips(self, num_trips=14, use_historical_data=True, random_state=None):
    """
    Predict energy consumption for a specific number of trips.

    Parameters:
    -----------
    num_trips : int
        Number of trips to predict for
    use_historical_data : bool
        If True, use randomly sampled rows of the historical data as trip
        inputs (sampled without replacement unless more trips are requested
        than rows exist).
        If False, use the median values from historical data for all trips;
        the historical rows themselves are not needed in that case.
    random_state : int or None
        Seed for the row sampling. None (the default) draws from NumPy's
        global random state, exactly as before this parameter existed.

    Returns:
    --------
    pandas.DataFrame
        One row per trip with ``Trip_ID``, the trip inputs and
        ``Predicted_Energy_kWh``.

    Raises:
    -------
    ValueError
        If no model is available, or if sampling is requested but no
        historical data is loaded.
    """
    if self.model is None:
        raise ValueError("Model not trained yet. Call train_model() first.")

    def _trip_input(distance, passengers, temperature, speed, hilly, congested):
        """Build the input dict expected by ``predict``."""
        return {
            'Distance_km': distance,
            'Passengers': passengers,
            'Temperature': temperature,
            'Average_Speed': speed,
            'Terrain_Type': 'hilly' if hilly else 'non-hilly',
            'Traffic': 'present traffic' if congested else 'no traffic',
        }

    if use_historical_data:
        # A loaded predictor may carry ``historical_data = None``; treat that
        # the same as a missing attribute.
        history = getattr(self, 'historical_data', None)
        if history is None:
            raise ValueError("Historical data not available. Load data first.")

        rng = np.random if random_state is None else np.random.RandomState(random_state)
        sampled_indices = rng.choice(
            len(history),
            size=num_trips,
            replace=num_trips > len(history),
        )

        trips = []
        for idx in sampled_indices:
            row = history.iloc[idx]
            trips.append(_trip_input(
                row['Distance_km'], row['Passengers'], row['Temperature'],
                row['Average_Speed'], row['Terrain_Type'] == 1, row['Traffic'] == 1,
            ))
        predictions = [self.predict(trip) for trip in trips]
    else:
        medians = self.feature_medians
        median_trip = _trip_input(
            medians['Distance_km'], medians['Passengers'], medians['Temperature'],
            medians['Average_Speed'], medians['Terrain_Type'] > 0.5, medians['Traffic'] > 0.5,
        )
        # Every trip has the same input, so one prediction serves all rows.
        trips = [dict(median_trip) for _ in range(num_trips)]
        predictions = [self.predict(median_trip)] * num_trips if num_trips > 0 else []

    results = [
        {'Trip_ID': trip_id, **trip, 'Predicted_Energy_kWh': prediction}
        for trip_id, (trip, prediction) in enumerate(zip(trips, predictions), start=1)
    ]
    return pd.DataFrame(results)

def display_predictions(self, predictions_df):
    """
    Print the trip predictions as a table with a summary, then chart them.

    Parameters:
    -----------
    predictions_df : pandas.DataFrame
        DataFrame with predictions from predict_multiple_trips()

    Returns:
    --------
    tuple
        (total, average) of the predicted energy, computed from the values
        after rounding to two decimals.
    """
    separator = "=" * 80

    print("\nPREDICTED ENERGY CONSUMPTION FOR TRIPS")
    print(separator)

    # Work on a copy so the caller's frame keeps its full precision.
    formatted_df = predictions_df.copy()
    decimals_by_column = (
        ('Distance_km', 1),
        ('Temperature', 1),
        ('Average_Speed', 1),
        ('Predicted_Energy_kWh', 2),
    )
    for column, decimals in decimals_by_column:
        formatted_df[column] = formatted_df[column].round(decimals)

    print(formatted_df.to_string(index=False))

    # Summary statistics over the rounded predictions.
    energy = formatted_df['Predicted_Energy_kWh']
    total_energy = energy.sum()
    avg_energy = energy.mean()
    min_energy = energy.min()
    max_energy = energy.max()

    print("\nSUMMARY")
    print(separator)
    print(f"Total predicted energy consumption: {total_energy:.2f} kWh")
    print(f"Average energy per trip: {avg_energy:.2f} kWh")
    print(f"Range: {min_energy:.2f} - {max_energy:.2f} kWh")

    # Bar chart per trip with the average drawn as a dashed line.
    plt.figure(figsize=(10, 6))
    plt.bar(formatted_df['Trip_ID'], energy,
            color='blue', alpha=0.7)
    plt.axhline(y=avg_energy, color='r', linestyle='--',
                label=f'Average: {avg_energy:.2f} kWh')
    plt.title('Predicted Energy Consumption by Trip')
    plt.xlabel('Trip ID')
    plt.ylabel('Energy Consumption (kWh)')
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.tight_layout()
    plt.show()

    return total_energy, avg_energy

# Attach the two module-level functions above to EVEnergyPredictor so they can
# be called as ordinary instance methods.
EVEnergyPredictor.predict_multiple_trips = predict_multiple_trips
EVEnergyPredictor.display_predictions = display_predictions

if __name__ == "__main__":
    # Reload the previously saved predictor from Drive instead of retraining.
    predictor = EVEnergyPredictor.load_model('/content/drive/MyDrive/ev_energy_predictor_nonlinear.joblib')

    # Predict 14 trips; by default the inputs are sampled from the stored
    # historical data.
    trip_predictions = predictor.predict_multiple_trips(num_trips=14)

    # Print the table and summary, show the bar chart, keep the totals.
    total, average = predictor.display_predictions(trip_predictions)