In [None]:
# Data Collection

In [None]:
import pandas as pd
import time
import os

from bms import readBatteryVoltage, readBatteryCurrent, updateSOC, estimateSOH
from fuel_system import FuelSystem
from gps_tracker import get_gps_location
from road_info import Roadconditions
from road_features import RoadFeatures
from user_input import load, ac_codn
from weather_info import Weatherconditions
from tire_system import TireSystem 

# Show where the notebook is running; the CSV files in this notebook are
# written to and read from paths relative to this directory.
print("Current working directory:", os.getcwd())

# Shared state for the collection run.
# NOTE(review): fuel_system is not referenced again in the visible cells —
# confirm whether it is still needed.
fuel_system = FuelSystem(load_weight=load)
# Single instance reused by every collect_data() call.
road_conditions = Roadconditions()
# One dict per successful collect_data() call; save_to_csv() turns it into a DataFrame.
data_records = []

def _split_elevation_profile(profile):
    """Unpack an elevation profile into (start, mid, end), or three Nones if it is unusable."""
    try:
        start, mid, end = profile
    except (TypeError, ValueError):
        # Not iterable (e.g. None) or not exactly three points: keep the
        # record and leave the elevation fields empty instead of crashing.
        return None, None, None
    return start, mid, end


def collect_data(tire_pressure=28.5, tire_health=0.75):
    """Take one snapshot of vehicle and environment readings and append it to data_records.

    Reads the GPS position, battery values, weather, road condition, road
    features and tire penalty, prints the resulting record and appends it to
    the module-level ``data_records`` list. Returns ``None``.

    If the GPS position is unavailable a message is printed and nothing is
    recorded.

    Args:
        tire_pressure: Tire pressure passed to ``TireSystem`` and stored in the
            record. Defaults to the previously hard-coded 28.5.
        tire_health: Tire health passed to ``TireSystem`` and stored in the
            record. Defaults to the previously hard-coded 0.75.
    """
    lat, lon = get_gps_location()
    if lat is None or lon is None:
        print("GPS not available.")
        return

    # Battery readings. The return value of updateSOC() is not used.
    updateSOC(1)
    voltage = readBatteryVoltage()
    current = readBatteryCurrent()
    # NOTE(review): SOC is recorded as a copy of the voltage reading, so the
    # "SOC" and "voltage" columns are always identical. This looks unintended;
    # confirm which bms call should supply the state of charge.
    soc = voltage
    soh = estimateSOH()

    # Weather at the current position.
    weather = Weatherconditions(lat, lon)
    weather.fetch_weather()
    weather_penalty = weather.get_penalty()

    # Road surface condition from the shared Roadconditions instance.
    road_conditions.analyze()
    road_data = road_conditions.get_conditions()
    friction = road_data["friction_coefficient"]
    surface_condition = road_data["condition"]

    # Road geometry.
    road_features = RoadFeatures()
    features = road_features.get_features()
    slope = features["slope"]
    curvature = features["curvature"]
    slope_penalty = features["slope_penalty"]
    curvature_penalty = features["curvature_penalty"]
    # The profile is optional; a missing, None or wrong-length profile yields
    # three None values rather than an unpacking error.
    elevation_start, elevation_mid, elevation_end = _split_elevation_profile(
        features.get("elevation_profile")
    )

    tire_system = TireSystem(pressure=tire_pressure, health=tire_health)
    tire_penalty = tire_system.calculate_penalty()

    record = {
        "latitude": lat,
        "longitude": lon,
        "voltage": voltage,
        "current": current,
        "SOC": soc,
        "SOH": soh,
        "load_weight": load,
        "AC_status": ac_codn,
        "weather_main": weather.weather_main,
        "weather_description": weather.weather_description,
        "temperature": weather.temperature,
        "wind_speed": weather.wind_speed,
        "weather_penalty": weather_penalty,
        "road_condition": surface_condition,
        "friction_coefficient": friction,
        "slope": slope,
        "curvature": curvature,
        "slope_penalty": slope_penalty,
        "curvature_penalty": curvature_penalty,
        "elevation_start": elevation_start,
        "elevation_mid": elevation_mid,
        "elevation_end": elevation_end,
        "tire_pressure": tire_pressure,
        "tire_health": tire_health,
        "tire_penalty": tire_penalty,
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
    }

    print(f"Data collected: {record}")
    # Appending mutates the existing list, so no `global` declaration is needed.
    data_records.append(record)

def save_to_csv(filename="ev_drive_data.csv"):
    """Write every collected record in ``data_records`` to ``filename`` as CSV.

    The index column is not written. When no records were collected nothing is
    written, so an earlier data file is not replaced by an empty one that
    ``pd.read_csv`` cannot parse.

    Args:
        filename: Destination path of the CSV file.
    """
    if not data_records:
        print("No data collected; nothing saved.")
        return

    df = pd.DataFrame(data_records)
    df.to_csv(filename, index=False)
    print(f"Data saved to {filename}")

if __name__ == "__main__":
    NUM_SAMPLES = 5        # number of readings taken per run
    SAMPLE_INTERVAL_S = 2  # pause between consecutive readings, in seconds

    for sample_idx in range(NUM_SAMPLES):
        collect_data()
        # Pause only between readings; waiting after the last one would just
        # delay the save.
        if sample_idx < NUM_SAMPLES - 1:
            time.sleep(SAMPLE_INTERVAL_S)

    save_to_csv()


Current working directory: c:\Users\shash\OneDrive\Desktop\Currently Doing\Intro_ML_EVRP_GitHub
📍 GPS Location: 28.6519, 77.2315
Found 'Tirupati': 13.6316368, 79.4231711
Found 'Piler': 46.979544, 10.3879129
✅ Data collected: {'latitude': 28.6519, 'longitude': 77.2315, 'voltage': 12.0, 'current': 1.5, 'SOC': 12.0, 'SOH': 83.33333333333334, 'load_weight': 100, 'AC_status': 1, 'weather_main': 'rain', 'weather_description': 'moderate rain', 'temperature': 34.56, 'wind_speed': 5.97, 'weather_penalty': 0, 'road_condition': 'rain', 'friction_coefficient': 0.49, 'slope': 30, 'curvature': 1.0, 'slope_penalty': 1.5, 'curvature_penalty': 1.2, 'elevation_start': None, 'elevation_mid': None, 'elevation_end': None, 'tire_pressure': 28.5, 'tire_health': 0.75, 'tire_penalty': 0.37000000000000005, 'timestamp': '2025-04-11 19:41:20'}
📍 GPS Location: 28.6519, 77.2315
Found 'Tirupati': 13.6316368, 79.4231711
Found 'Piler': 46.979544, 10.3879129
✅ Data collected: {'latitude': 28.6519, 'longitude': 77.2315,

In [None]:
# Correcting the data: add a placeholder vehicle_range target column

In [None]:
import pandas as pd
import math

# Load the drive log; the file name matches the default used by save_to_csv().
df = pd.read_csv("ev_drive_data.csv")

# Haversine function
def haversine(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in kilometres between two points.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in degrees.
        lat2, lon2: Latitude and longitude of the second point, in degrees.

    Returns:
        The distance along the sphere's surface in kilometres, using a
        radius of 6371 km.
    """
    R = 6371  # sphere radius in km
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2 - lon1)

    a = math.sin(delta_phi / 2)**2 + \
        math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2)**2
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, max(0.0, a))

    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

def _placeholder_range(row):
    """Haversine distance from the row's GPS fix to that same fix, plus 5."""
    fix_lat, fix_lon = row["latitude"], row["longitude"]
    return haversine(fix_lat, fix_lon, fix_lat, fix_lon) + 5


# NOTE(review): both end points are the same fix, so the distance term is zero
# for any valid coordinates and every row receives the same placeholder value.
# Replace with a real measured range before trusting any model trained on it.
df["vehicle_range"] = df.apply(_placeholder_range, axis=1)

# Overwrites the drive log in place with the extra column.
df.to_csv("ev_drive_data.csv", index=False)
print("Dummy vehicle_range column added based on distance.")


✅ Dummy vehicle_range column added based on distance.


In [None]:
# Pre-Processing the Data

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer

df = pd.read_csv("ev_drive_data.csv")
print("Columns loaded:", df.columns.tolist())

# Rows with no values at all carry no information.
df = df.dropna(how="all")

# Columns treated as numeric features (imputed with the mean, then standardised).
num_cols = [
    "voltage", "current", "SOC", "SOH", "load_weight", "AC_status",
    "temperature", "wind_speed", "weather_penalty", "friction_coefficient",
    "slope", "curvature", "slope_penalty", "curvature_penalty",
    "elevation_start", "elevation_mid", "elevation_end",
    "tire_pressure", "tire_health", "tire_penalty"
]

# Columns treated as categorical features (imputed with the mode, then dummy-encoded).
cat_cols = ["weather_main", "weather_description", "road_condition"]

# Report every missing column at once instead of failing on the first lookup.
missing_cols = [col for col in num_cols + cat_cols if col not in df.columns]
if missing_cols:
    raise KeyError(f"Expected columns missing from ev_drive_data.csv: {missing_cols}")

# A column with no observed value cannot be mean-imputed or scaled.
num_cols_filtered = [col for col in num_cols if not df[col].isna().all()]
dropped_cols = [col for col in num_cols if col not in num_cols_filtered]
if dropped_cols:
    print(f"Dropping columns with all NaNs: {dropped_cols}")
    # Actually remove them, as the message says; otherwise they are written
    # to the output file as empty columns.
    df = df.drop(columns=dropped_cols)

num_imputer = SimpleImputer(strategy="mean")
df[num_cols_filtered] = num_imputer.fit_transform(df[num_cols_filtered])

cat_imputer = SimpleImputer(strategy="most_frequent")
df[cat_cols] = cat_imputer.fit_transform(df[cat_cols])

df = pd.get_dummies(df, columns=cat_cols)

# NOTE(review): the imputers and scaler are fitted on the full data set before
# the training cell makes its train/test split, so test rows influence the
# statistics. The fitted scaler is also not saved, so data collected later
# cannot be transformed the same way. Consider moving these steps into the
# model pipeline.
scaler = StandardScaler()
df[num_cols_filtered] = scaler.fit_transform(df[num_cols_filtered])

df.to_csv("EV_data_preprocessed.csv", index=False)
print("Data preprocessing complete. Saved as EV_data_preprocessed.csv.")


✅ Columns loaded: ['latitude', 'longitude', 'voltage', 'current', 'SOC', 'SOH', 'load_weight', 'AC_status', 'weather_main', 'weather_description', 'temperature', 'wind_speed', 'weather_penalty', 'road_condition', 'friction_coefficient', 'slope', 'curvature', 'slope_penalty', 'curvature_penalty', 'elevation_start', 'elevation_mid', 'elevation_end', 'tire_pressure', 'tire_health', 'tire_penalty', 'timestamp', 'vehicle_range']
⚠️ Dropping columns with all NaNs: ['elevation_start', 'elevation_mid', 'elevation_end']
✅ Data preprocessing complete. Saved as EV_data_preprocessed.csv.


In [None]:
# Training the ML Model

In [None]:
import pandas as pd
import joblib
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

df = pd.read_csv("EV_data_preprocessed.csv")

if "vehicle_range" not in df.columns:
    raise ValueError("❌ 'vehicle_range' column is missing in the dataset!")
# Rows without a target cannot be used for supervised training.
df = df.dropna(subset=["vehicle_range"])

# The timestamp is not used as a feature; tolerate its absence the same way the
# prediction cell does.
X = df.drop(columns=["vehicle_range", "timestamp"], errors="ignore")
y = df["vehicle_range"]

# NOTE(review): columns whose dtype is neither int64/float64 nor object (for
# example boolean dummy columns, if pandas wrote them as True/False) match
# neither selector below and would be dropped by ColumnTransformer's default
# remainder="drop" — confirm the encoded categoricals reach the model.
num_cols = X.select_dtypes(include=["int64", "float64"]).columns.tolist()
cat_cols = X.select_dtypes(include=["object"]).columns.tolist()

preprocessor = ColumnTransformer(
    transformers=[
        ("num", SimpleImputer(strategy="mean"), num_cols),
        ("cat", Pipeline([
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("encoder", OneHotEncoder(handle_unknown="ignore"))
        ]), cat_cols)
    ]
)

model_pipeline = Pipeline(steps=[
    ("preprocessing", preprocessor),
    ("model", RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1))
])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model_pipeline.fit(X_train, y_train)

y_pred = model_pipeline.predict(X_test)

mae = mean_absolute_error(y_test, y_pred)
mse = mean_squared_error(y_test, y_pred)
# R² is not defined for fewer than two test samples; scikit-learn would return
# nan with a warning, so report that case explicitly instead.
r2 = r2_score(y_test, y_pred) if len(y_test) >= 2 else None

print("📊 Model Performance:")
print(f"MAE: {mae:.2f} km")
print(f"MSE: {mse:.2f}")
if r2 is None:
    print("R² Score: undefined (needs at least 2 test samples)")
else:
    print(f"R² Score: {r2:.4f}")

joblib.dump(model_pipeline, "ev_range_predictor_pipeline.pkl")
print("Model training complete. Pipeline saved as ev_range_predictor_pipeline.pkl.")


📊 Model Performance:
🔹 MAE: 1.04 km
🔹 MSE: 1.07
🔹 R² Score: nan
✅ Model training complete. Pipeline saved as ev_range_predictor_pipeline.pkl.




In [None]:
# Predicting the Range

In [None]:
import pandas as pd
import joblib

# The pickle is produced by the training cell of this notebook. joblib.load can
# execute arbitrary code, so never point it at a file from an untrusted source.
pipeline = joblib.load("ev_range_predictor_pipeline.pkl")

# The pipeline was fitted on EV_data_preprocessed.csv (standardised numeric
# columns, dummy-encoded categoricals). Scoring the raw ev_drive_data.csv would
# feed unscaled readings to a model that learned its splits on standardised
# values, so predict on the same representation the model was trained on.
df = pd.read_csv("EV_data_preprocessed.csv")

df = df.dropna(how="all")

# The target and the timestamp were excluded from the training features too.
df = df.drop(columns=["vehicle_range", "timestamp"], errors="ignore")

if df.shape[0] == 0:
    raise ValueError("No valid rows to predict after dropping empty ones!")

predicted_ranges = pipeline.predict(df)

df["predicted_range_km"] = predicted_ranges

df.to_csv("EV_data_with_predictions.csv", index=False)
print("Predictions saved to EV_data_with_predictions.csv")


✅ Predictions saved to EV_data_with_predictions.csv


