# MLOPS Project Group 09
**Team: Nagendra Jupudy, Vamsi Krishna Pirati, Rajesh Avunoori, Sanju Vikasini Velmurugan, Piyush Borse**
## **Predicting Metro_Interstate_Traffic_Volume**

**Dataset Overview:**
- **Domain:** Transportation / Traffic Analysis  
- **Task:** Regression   
- **Dataset Type:** Multivariate  
- **Number of Instances:** 48,204
- **Number of Features:** 8  
- **Feature Types:** Mixed (categorical, continuous, integer) 

**Objective:**  
The primary objective of this project is to build an effective regression model to forecast traffic volume on the Metro Interstate.

## Stage 1: Import Required Libraries

In [4]:
import os
import sys
import pyarrow as pa
import pandera as pa_schema
import numpy as np
import pandas as pd
import requests
from io import BytesIO
from ydata_profiling import ProfileReport
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.metrics import (mean_absolute_error, mean_squared_error,
                             mean_absolute_percentage_error, r2_score)
from sklearn.model_selection import KFold, cross_val_score
import xgboost as xgb
from xgboost import XGBRegressor
import mlflow
import mlflow.sklearn
import pyarrow.parquet as pq
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
import warnings

warnings.filterwarnings("ignore")

## Stage 2: Environment and Working Directory Info

In [6]:
print("Python executable:", sys.executable)

Python executable: C:\Users\india\anaconda3\envs\mlops\python.exe


In [8]:
!conda info --envs
!where jupyter


# conda environments:
#
base                   C:\Users\india\anaconda3
mlops                * C:\Users\india\anaconda3\envs\mlops
redteam                C:\Users\india\anaconda3\envs\redteam
resai                  C:\Users\india\anaconda3\envs\resai

C:\Users\india\anaconda3\Scripts\jupyter.exe


In [9]:
# Optionally, change directory if required.
# For example, change to the project directory (adjust the path as needed):
project_dir = r'C:\Users\india\Desktop\Jio_Institute\MLOps\Project\Nagendra\MLOPS'
os.chdir(project_dir)

In [10]:
# Display current working directory and list files
print("Current working directory:", os.getcwd())
print("Files in current directory:", os.listdir(os.getcwd()))

Current working directory: C:\Users\india\Desktop\Jio_Institute\MLOps\Project\Nagendra\MLOPS
Files in current directory: ['.git', '.gitignore', '.ipynb_checkpoints', 'Dataset', 'Metro_Interstate_Traffic_Volume_Profile.html', 'mlruns', 'Notebooks', 'README.md', 'requirements.txt']


## Stage 3: Data Loading and Initial Inspection

Load the CSV data and inspect its initial contents.

In [13]:
# Define the path to the original CSV file
csv_file_path = os.path.join("Dataset", "Original", "Metro_Interstate_Traffic_Volume.csv")


In [14]:
dtype_schema = {
    "holiday": "object",
    "temp": "float64",
    "rain_1h": "float64",
    "snow_1h": "float64",
    "clouds_all": "int64",
    "weather_main": "object",
    "weather_description": "object",
    "traffic_volume": "int64"
}

Read the CSV file with the explicit dtypes defined above and parse the 'date_time' column as datetime.

In [16]:
df = pd.read_csv(
    csv_file_path,  # path defined in the earlier cell
    dtype=dtype_schema,
    parse_dates=["date_time"]
)
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 48204 entries, 0 to 48203
Data columns (total 9 columns):
 #   Column               Non-Null Count  Dtype         
---  ------               --------------  -----         
 0   holiday              61 non-null     object        
 1   temp                 48204 non-null  float64       
 2   rain_1h              48204 non-null  float64       
 3   snow_1h              48204 non-null  float64       
 4   clouds_all           48204 non-null  int64         
 5   weather_main         48204 non-null  object        
 6   weather_description  48204 non-null  object        
 7   date_time            48204 non-null  datetime64[ns]
 8   traffic_volume       48204 non-null  int64         
dtypes: datetime64[ns](1), float64(3), int64(2), object(3)
memory usage: 3.3+ MB


## Stage 4: Export DataFrame to Parquet Format

In [18]:
# Define the output directory for Parquet files and create it if it doesn't exist
output_dir = os.path.join("Dataset", "Parquet")
os.makedirs(output_dir, exist_ok=True)

In [19]:
# Convert DataFrame to PyArrow Table
table = pa.Table.from_pandas(df)

# Define the path for the Parquet file and write the table
parquet_file_path = os.path.join(output_dir, "Metro_Interstate_Traffic_Volume.parquet")
pq.write_table(table, parquet_file_path)

## Stage 5: Data Profiling using ydata_profiling

In [21]:
profile = ProfileReport(
    df,
    title="Metro Interstate Traffic Volume Data Profiling Report",
    explorative=True
)

In [22]:
# Save the profiling report
profile_report_path = "Metro_Interstate_Traffic_Volume_Profile.html"
profile.to_file(profile_report_path)

## Stage 6: Read Parquet and Split Data into Train, Test, and Prod Sets

In [24]:
df_parquet = pd.read_parquet(parquet_file_path)

# Sort the DataFrame by 'date_time' for time series splitting
df_parquet = df_parquet.sort_values(by="date_time")

# Compute indices for splitting the dataset: 60% train, 20% test, and 20% production
n_total = len(df_parquet)
train_end = int(0.6 * n_total)
test_end = int(0.8 * n_total)


In [25]:
# Split the DataFrame into three parts
train_df = df_parquet.iloc[:train_end]
test_df = df_parquet.iloc[train_end:test_end]
prod_df = df_parquet.iloc[test_end:]

# Print the number of instances in each split
print("Metro_Interstate_Traffic_Volume_train:", len(train_df))
print("Metro_Interstate_Traffic_Volume_test:", len(test_df))
print("Metro_Interstate_Traffic_Volume_prod:", len(prod_df))

Metro_Interstate_Traffic_Volume_train: 28922
Metro_Interstate_Traffic_Volume_test: 9641
Metro_Interstate_Traffic_Volume_prod: 9641


In [26]:
# Save each split as a separate Parquet file
train_df.to_parquet(os.path.join(output_dir, "Metro_Interstate_Traffic_Volume_train.parquet"), index=False)
test_df.to_parquet(os.path.join(output_dir, "Metro_Interstate_Traffic_Volume_test.parquet"), index=False)
prod_df.to_parquet(os.path.join(output_dir, "Metro_Interstate_Traffic_Volume_prod.parquet"), index=False)
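
The chronological 60/20/20 split above can be sketched on toy data as follows. This is a minimal illustration, not the notebook's own code: the helper name `chronological_split`, its parameters, and the ten-row frame are assumptions made for the example.

```python
import numpy as np
import pandas as pd


def chronological_split(df, time_col, train_end_frac=0.6, test_end_frac=0.8):
    """Sort by time, then cut into train/test/prod blocks without shuffling.

    Hypothetical helper mirroring the index arithmetic used in this stage.
    """
    ordered = df.sort_values(by=time_col).reset_index(drop=True)
    n_total = len(ordered)
    train_end = int(train_end_frac * n_total)
    test_end = int(test_end_frac * n_total)
    return (
        ordered.iloc[:train_end],
        ordered.iloc[train_end:test_end],
        ordered.iloc[test_end:],
    )


# Toy frame: ten hourly timestamps, deliberately shuffled before splitting.
start = pd.Timestamp("2012-10-02 09:00:00")
toy = pd.DataFrame({
    "date_time": start + pd.to_timedelta(np.arange(10), unit="h"),
    "traffic_volume": np.arange(10) * 100,
}).sample(frac=1.0, random_state=0)

toy_train, toy_test, toy_prod = chronological_split(toy, "date_time")
print(len(toy_train), len(toy_test), len(toy_prod))
```

Because the rows are sorted before slicing, every training timestamp precedes every test timestamp, and every test timestamp precedes every production timestamp.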


## Stage 7: Load Data from Remote URLs

In the remaining stages, we build an end-to-end machine learning pipeline that:
- Loads the training, test, and production datasets from GitHub.
- Separates features from the target variable.
- Defines separate preprocessing pipelines for numeric and categorical features.
- Combines these preprocessors using a ColumnTransformer.
- Integrates the preprocessor with a regressor into a single scikit-learn Pipeline.
- Trains each model and evaluates its performance on the test set.

In [29]:
# Define raw URLs for train, test, and production datasets.
TRAIN_URL = "https://raw.githubusercontent.com/Jupudy-Nagendra/MLOPS/main/Dataset/Parquet/Metro_Interstate_Traffic_Volume_train.parquet"
TEST_URL  = "https://raw.githubusercontent.com/Jupudy-Nagendra/MLOPS/main/Dataset/Parquet/Metro_Interstate_Traffic_Volume_test.parquet"
PROD_URL  = "https://raw.githubusercontent.com/Jupudy-Nagendra/MLOPS/main/Dataset/Parquet/Metro_Interstate_Traffic_Volume_prod.parquet"


In [30]:
def load_parquet_from_url(url):
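    """Helper function to load a parquet file from a URL."""
    # A timeout keeps the notebook from hanging if the host does not respond.
    response = requests.get(url, timeout=30)
    # Raise an HTTPError for 4xx/5xx responses instead of parsing an error page.
    response.raise_for_status()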
    return pd.read_parquet(BytesIO(response.content))

In [31]:
# Load datasets.
train_df = load_parquet_from_url(TRAIN_URL)
test_df = load_parquet_from_url(TEST_URL)
prod_df = load_parquet_from_url(PROD_URL)

## Stage 8: Define Custom Transformers

In [33]:
class DateTimeExtractor(BaseEstimator, TransformerMixin):
    """
    Transformer to convert the 'date_time' column into datetime format
    and extract new features: year, month, day, and hour.
    Optionally drops the original 'date_time' column.
    """
    def __init__(self, column="date_time", drop_original=True):
        self.column = column
        self.drop_original = drop_original

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        X[self.column] = pd.to_datetime(X[self.column])
        X["year"] = X[self.column].dt.year
        X["month"] = X[self.column].dt.month
        X["day"] = X[self.column].dt.day
        X["hour"] = X[self.column].dt.hour
        if self.drop_original:
            X = X.drop(columns=[self.column])
        return X
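
To see what the datetime extraction produces, here is a small standalone sketch of the same logic applied to two hand-written rows. The function name `extract_datetime_parts` and the toy values are assumptions for illustration; the transformer above remains the version used in the pipeline.

```python
import pandas as pd


def extract_datetime_parts(df, column="date_time"):
    """Standalone version of the extraction logic, for illustration only."""
    out = df.copy()
    stamps = pd.to_datetime(out[column])
    out["year"] = stamps.dt.year
    out["month"] = stamps.dt.month
    out["day"] = stamps.dt.day
    out["hour"] = stamps.dt.hour
    # Drop the raw timestamp so only numeric parts remain.
    return out.drop(columns=[column])


toy = pd.DataFrame({
    "date_time": ["2012-10-02 09:00:00", "2016-12-25 23:00:00"],
    "temp": [288.28, 270.0],
})
parts = extract_datetime_parts(toy)
print(parts)
```

Each timestamp becomes four integer columns, which is why `year`, `month`, `day`, and `hour` can later be treated as numeric features.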

In [34]:
class HolidayBinaryTransformer(BaseEstimator, TransformerMixin):
    """
    Transformer to convert the 'holiday' column into a binary feature:
      - 0 if missing or equal to "None"
      - 1 otherwise.
    """
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # X is expected to be a DataFrame with column 'holiday'
        binary = ((~pd.isnull(X)) & (X != "None")).astype(int)
        if isinstance(binary, pd.Series):
            binary = binary.to_frame()
        return binary.values

    def get_feature_names_out(self, input_features=None):
        return np.array(["holiday_binary"])
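
The holiday rule can be checked on a few hand-written values. The sketch below re-implements the rule as a plain function (`holiday_to_binary` is a hypothetical name, and the toy values are made up) so that it runs on its own. Note that `df.info()` in Stage 3 reports only 61 non-null `holiday` values, so most rows fall into the "missing" branch.

```python
import numpy as np
import pandas as pd


def holiday_to_binary(holiday):
    """Return 1 where a holiday name is present, 0 for missing or "None"."""
    is_holiday = holiday.notna() & (holiday != "None")
    return is_holiday.astype(int)


# Toy column covering the three cases: missing, the string "None", a real name.
toy = pd.Series([np.nan, "None", "Christmas Day", None],
                name="holiday", dtype=object)
flags = holiday_to_binary(toy)
print(flags.tolist())
```

Both true missing values and the literal string "None" map to 0, so the flag behaves the same regardless of how the CSV reader interpreted the column.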

## Stage 9: Build Preprocessing Pipelines

In [36]:
# After extracting datetime features, expected columns:
# Numeric: 'temp', 'rain_1h', 'snow_1h', 'clouds_all', 'year', 'month', 'day', 'hour'
# Categorical: 'weather_main', 'weather_description'
numeric_cols = ["temp", "rain_1h", "snow_1h", "clouds_all", "year", "month", "day", "hour"]
categorical_cols = ["weather_main", "weather_description"]

In [37]:
# Pipeline for numeric features: imputation and scaling.
numeric_pipeline = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler())
])

In [38]:
# Pipeline for categorical features: imputation and one-hot encoding.
categorical_pipeline = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
    ("onehot", OneHotEncoder(handle_unknown="ignore"))
])

In [39]:
# Preprocessor combines:
# - Numeric processing on numeric_cols.
# - Categorical processing on categorical_cols.
# - Holiday transformation on 'holiday' column.
preprocessor = ColumnTransformer(transformers=[
    ("num", numeric_pipeline, numeric_cols),
    ("cat", categorical_pipeline, categorical_cols),
    ("holiday", HolidayBinaryTransformer(), ["holiday"])
])

In [40]:
# Pipeline to extract datetime features.
datetime_pipeline = Pipeline(steps=[
    ("datetime_extractor", DateTimeExtractor(column="date_time", drop_original=True))
])

In [41]:
# Full pipeline: datetime extraction followed by preprocessing.
full_pipeline = Pipeline(steps=[
    ("datetime", datetime_pipeline),
    ("preprocessor", preprocessor)
])

## Stage 10: Transform Features and Retrieve Feature Names

In [43]:
# Remove target variable ('traffic_volume') for feature preprocessing.
X_train_features = train_df.drop(columns=["traffic_volume"])

# Fit and transform training features.
X_transformed = full_pipeline.fit_transform(X_train_features)

In [44]:
try:
    feature_names = full_pipeline.named_steps["preprocessor"].get_feature_names_out()
    print("Output Feature Names:")
    print(feature_names)
except Exception as e:
    print("Error retrieving feature names:", e)

Output Feature Names:
['num__temp' 'num__rain_1h' 'num__snow_1h' 'num__clouds_all' 'num__year'
 'num__month' 'num__day' 'num__hour' 'cat__weather_main_Clear'
 'cat__weather_main_Clouds' 'cat__weather_main_Drizzle'
 'cat__weather_main_Fog' 'cat__weather_main_Haze' 'cat__weather_main_Mist'
 'cat__weather_main_Rain' 'cat__weather_main_Smoke'
 'cat__weather_main_Snow' 'cat__weather_main_Squall'
 'cat__weather_main_Thunderstorm' 'cat__weather_description_SQUALLS'
 'cat__weather_description_Sky is Clear'
 'cat__weather_description_broken clouds'
 'cat__weather_description_drizzle' 'cat__weather_description_few clouds'
 'cat__weather_description_fog' 'cat__weather_description_freezing rain'
 'cat__weather_description_haze'
 'cat__weather_description_heavy intensity drizzle'
 'cat__weather_description_heavy intensity rain'
 'cat__weather_description_heavy snow'
 'cat__weather_description_light intensity drizzle'
 'cat__weather_description_light intensity shower rain'
 'cat__weather_description

In [45]:
target_col = "traffic_volume"

X_train = train_df.drop(columns=[target_col])
y_train = train_df[target_col]
X_test = test_df.drop(columns=[target_col])
y_test = test_df[target_col]

## Stage 11: Compare Multiple Regression Models using MLflow

In this section, we run multiple ML experiments using different algorithms and hyperparameters. We:
1. Set up an MLflow experiment to group all runs under a common name.
2. Use K-Fold Cross-Validation to evaluate each model on the training set.
3. Evaluate each model on the test set.
4. Log parameters, metrics, and the trained model pipeline to MLflow for versioning and future reference.

In [48]:
# Define a dictionary of regression models to compare.
regression_models = {
    "LinearRegression_default": LinearRegression(),
    "Ridge": Ridge(alpha=1.0),
    "Lasso": Lasso(alpha=0.1),
    "XGBoost": XGBRegressor(n_estimators=1000, learning_rate=0.01, random_state=42),
    "ElasticNet": ElasticNet(alpha=0.1, l1_ratio=0.5),
    "DecisionTree": DecisionTreeRegressor(random_state=42),
    "RandomForest": RandomForestRegressor(n_estimators=10, random_state=42),
    "GradientBoosting": GradientBoostingRegressor(n_estimators=100, random_state=42)
}

In [49]:
mlflow.set_experiment("Metro_Interstate_Traffic_Volume_Group06")

Traceback (most recent call last):
  File "C:\Users\india\anaconda3\envs\mlops\lib\site-packages\mlflow\store\tracking\file_store.py", line 329, in search_experiments
    exp = self._get_experiment(exp_id, view_type)
  File "C:\Users\india\anaconda3\envs\mlops\lib\site-packages\mlflow\store\tracking\file_store.py", line 427, in _get_experiment
    meta = FileStore._read_yaml(experiment_dir, FileStore.META_DATA_FILE_NAME)
  File "C:\Users\india\anaconda3\envs\mlops\lib\site-packages\mlflow\store\tracking\file_store.py", line 1373, in _read_yaml
    return _read_helper(root, file_name, attempts_remaining=retries)
  File "C:\Users\india\anaconda3\envs\mlops\lib\site-packages\mlflow\store\tracking\file_store.py", line 1366, in _read_helper
    result = read_yaml(root, file_name)
  File "C:\Users\india\anaconda3\envs\mlops\lib\site-packages\mlflow\utils\file_utils.py", line 310, in read_yaml
    raise MissingConfigException(f"Yaml file '{file_path}' does not exist.")
mlflow.exceptions.Missi

<Experiment: artifact_location='file:///C:/Users/india/Desktop/Jio_Institute/MLOps/Project/Nagendra/MLOPS/mlruns/676113837923995950', creation_time=1742152402360, experiment_id='676113837923995950', last_update_time=1742152402360, lifecycle_stage='active', name='Metro_Interstate_Traffic_Volume_Group06', tags={}>

In [50]:
print("Current Tracking URI:", mlflow.get_tracking_uri())

Current Tracking URI: file:///C:/Users/india/Desktop/Jio_Institute/MLOps/Project/Nagendra/MLOPS/mlruns


In [51]:
# Evaluate each model using 5-fold cross-validation.
for model_name, reg_model in regression_models.items():
    with mlflow.start_run(run_name=model_name):
        mlflow.log_param("model_name", model_name)
        
        # Build a full pipeline for the current model.
        pipeline = Pipeline(steps=[
            ("datetime", datetime_pipeline),
            ("preprocessor", preprocessor),
            ("regressor", reg_model)
        ])
        
        # 5-fold cross-validation.
        kfold = KFold(n_splits=5, shuffle=True, random_state=42)
        cv_scores = cross_val_score(pipeline, X_train, y_train,
                                    cv=kfold, scoring="neg_mean_absolute_error")
        mean_cv = np.mean(np.abs(cv_scores))
        std_cv = np.std(np.abs(cv_scores))
        mlflow.log_metric("cv_mean_MAE", mean_cv)
        mlflow.log_metric("cv_std_MAE", std_cv)
        
        # Train on the full training data.
        pipeline.fit(X_train, y_train)
        
        # Evaluate on the test set.
        y_pred_model = pipeline.predict(X_test)
        mae_val = mean_absolute_error(y_test, y_pred_model)
        mse_val = mean_squared_error(y_test, y_pred_model)
        rmse_val = np.sqrt(mse_val)
        mape_val = mean_absolute_percentage_error(y_test, y_pred_model)
        r2_val = r2_score(y_test, y_pred_model)
        mlflow.log_metric("test_MAE", mae_val)
        mlflow.log_metric("test_MSE", mse_val)
        mlflow.log_metric("test_RMSE", rmse_val)
        mlflow.log_metric("test_MAPE", mape_val)
        mlflow.log_metric("test_R2", r2_val)
        
        # Log the trained model.
        mlflow.sklearn.log_model(pipeline, "model")
        
        print(f"{model_name} -> CV MAE: {mean_cv:.4f} ± {std_cv:.4f}, "
              f"Test MAE: {mae_val:.4f}, RMSE: {rmse_val:.4f}, R2: {r2_val:.4f}")




LinearRegression_default -> CV MAE: 1640.9151 ± 30.5411, Test MAE: 1606.3297, RMSE: 1836.0634, R2: 0.1430
Ridge -> CV MAE: 1640.9287 ± 30.8286, Test MAE: 1606.2699, RMSE: 1835.7752, R2: 0.1432
Lasso -> CV MAE: 1640.6042 ± 29.8682, Test MAE: 1606.2463, RMSE: 1835.2202, R2: 0.1438
XGBoost -> CV MAE: 580.6407 ± 7.9691, Test MAE: 641.6570, RMSE: 919.9290, R2: 0.7849
ElasticNet -> CV MAE: 1653.4365 ± 34.0695, Test MAE: 1616.5132, RMSE: 1836.4155, R2: 0.1426
DecisionTree -> CV MAE: 604.5429 ± 17.2684, Test MAE: 771.6130, RMSE: 1285.0386, R2: 0.5802
RandomForest -> CV MAE: 538.1640 ± 6.2096, Test MAE: 658.9647, RMSE: 993.8292, R2: 0.7489
GradientBoosting -> CV MAE: 629.2385 ± 8.0669, Test MAE: 634.1179, RMSE: 903.6135, R2: 0.7924
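
The runs above use a shuffled `KFold`, while the train/test/prod split in Stage 6 is chronological. One possible alternative, which was not used for the results above, is scikit-learn's `TimeSeriesSplit`, where each validation fold comes strictly after its training fold. The sketch below shows the idea on synthetic data; `X_toy`, `y_toy`, and the choice of `Ridge` are assumptions made only for this example.

```python
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.model_selection import TimeSeriesSplit, cross_val_score

# Synthetic, time-ordered data: a noisy linear trend (illustrative only).
rng = np.random.default_rng(42)
X_toy = np.arange(200, dtype=float).reshape(-1, 1)
y_toy = 3.0 * X_toy.ravel() + rng.normal(scale=5.0, size=200)

ts_cv = TimeSeriesSplit(n_splits=5)

# Each validation block lies entirely after the rows used for training.
splits = list(ts_cv.split(X_toy))
for fold, (train_idx, val_idx) in enumerate(splits):
    print(f"fold {fold}: train ends at {train_idx.max()}, "
          f"validation covers {val_idx.min()}..{val_idx.max()}")

scores = cross_val_score(Ridge(alpha=1.0), X_toy, y_toy,
                         cv=ts_cv, scoring="neg_mean_absolute_error")
fold_mae = -scores  # the scorer returns negated MAE
print("Per-fold MAE:", np.round(fold_mae, 3))
```

In the loop above, `cv=kfold` could be swapped for a `TimeSeriesSplit` instance, provided the training rows are already in time order. Whether this would change the model ranking here is not something the logged results can tell us.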


In [52]:
pipeline