## similar to whats in folder 05-AWS, just recreating

In [1]:
import os
import json
import logging
import sys
from io import StringIO 
from pathlib import Path

In [2]:
#create a folder to store all .py files
## Set up folders and paths

source_folder = Path("src")
source_folder.mkdir(exist_ok=True)
(source_folder / "__init__.py").touch()
(source_folder / "components").mkdir(parents=True, exist_ok=True)
(source_folder / "components" / "__init__.py").touch()

sys.path.append(str(source_folder.resolve()))

In [3]:
%%writefile src/logger.py
# Create logger.py
import logging
import os
from datetime import datetime

LOG_FILE = f"{datetime.now().strftime('%m_%d_%Y_%H_%M_%S')}.log"
logs_dir = os.path.join(os.getcwd(), "logs")
os.makedirs(logs_dir, exist_ok=True)

LOG_FILE_PATH = os.path.join(logs_dir, LOG_FILE)

logging.basicConfig(
    filename=LOG_FILE_PATH,
    format="[ %(asctime)s ] %(lineno)d %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

Overwriting src/logger.py


In [4]:
schema_path = "data_schema/schema_yaml"
schema_dir = os.path.dirname(schema_path)

os.makedirs(schema_dir, exist_ok=True)

In [5]:
%%writefile data_schema/schema.yaml

##write schema for the data type to compare later


columns:
- lpep_pickup_datetime: datetime64[us]
- lpep_dropoff_datetime: datetime64[us]
- PULocationID: object 
- DOLocationID: object 
- trip_distance: float64
- passenger_count: float64
- fare_amount: float64
- total_amount: float64

numerical_columns:
- trip_distance: float64 
- passenger_count: float64
- fare_amount: float64
- total_amount: float64


categorical_columns:
- DOLocationID: object 
- PULocationID: object


Overwriting data_schema/schema.yaml


## DATA INGESTION AND VALIDATION

In [6]:
(source_folder / "constants").mkdir(parents=True, exist_ok=True)
(source_folder / "constants" / "__init__.py").touch()

In [7]:
%%writefile src/constants/training_pipeline_names.py

import os
import sys
import numpy as np
import pandas as pd

"""
Data ingestion related constants, such as the data link, and the downloaded folder path
"""

DATA_INGESTION_TRAIN_LINK: str = "./data/green_tripdata_2025-01.parquet"
DATA_INGESTION_TEST_LINK: str = "./data/green_tripdata_2025-02.parquet" 
DATA_INGESTION_INGESTED_DIR: str = "ingested_data"
SCHEMA_FILE_PATH = os.path.join("data_schema", "schema.yaml")

"""
Data transformation features, the columns required
"""
numeric_features = ['trip_distance', 'passenger_count','fare_amount','total_amount']
target_encoder_features = ['PULocationID', 'DOLocationID']
TARGET_COLUMN = ['duration']

"""
Model Trainer related constant start with MODEL TRAINER VAR NAME
"""
MODEL_TRAINER_TRAINED_MODEL_NAME: str = "duration_time_model.pkl"
MODEL_TRAINER_EXPECTED_SCORE: float = 0.7
MODEL_TRAINER_OVER_FITTING_UNDER_FITTING_THRESHOLD: float = 0.05


Overwriting src/constants/training_pipeline_names.py


In [8]:
%%writefile src/constants/config_entity.py


from datetime import datetime
import os
from src.constants import training_pipeline_names
from datetime import datetime
import os
from src.constants import training_pipeline_names

class TrainingPipelineConfig:
    def __init__(self, timestamp=datetime.now()):
        timestamp = timestamp.strftime("%m_%d_%Y_%H_%M_%S")
        self.data_artifact_folder = training_pipeline_names.DATA_INGESTION_INGESTED_DIR
        self.data_folder = os.path.join(self.data_artifact_folder, f"untransformed_data-{timestamp}")  
        self.timestamp: str = timestamp

class DataIngestionConfig:
    def __init__(self, training_pipeline_config: TrainingPipelineConfig):
        self.train_data_source: str = training_pipeline_names.DATA_INGESTION_TRAIN_LINK
        self.test_data_source: str = training_pipeline_names.DATA_INGESTION_TEST_LINK
        self.training_file_path: str = os.path.join(training_pipeline_config.data_folder, "train_data.csv") 
        self.test_file_path: str = os.path.join(training_pipeline_config.data_folder, "test_data.csv")  

class DataTransformationConfig:
    def __init__(self, training_pipeline_config: TrainingPipelineConfig):
        timestamp = datetime.now().strftime("%m_%d_%Y_%H_%M_%S")
        self.timestamp: str = timestamp
        self.transformed_folder = os.path.join(training_pipeline_config.data_artifact_folder, f'transformed_data-{timestamp}')
        self.model_path = os.path.join(training_pipeline_config.data_artifact_folder, f'model-{timestamp}')
        self.transformed_train_file_path: str = os.path.join(self.transformed_folder, "train.npy")
        self.transformed_test_file_path: str = os.path.join(self.transformed_folder, "test.npy")
        self.transformed_object_file_path: str = os.path.join(self.model_path, "preprocessing.pkl")

class ModelTrainerConfig:
    def __init__(self, training_pipeline_config: TrainingPipelineConfig):
        timestamp = datetime.now().strftime("%m_%d_%Y_%H_%M_%S")
        self.timestamp: str = timestamp
        self.model_trainer_dir: str = os.path.join(training_pipeline_config.data_artifact_folder, f'model-{timestamp}')
        self.trained_model_file_path: str = os.path.join(
            self.model_trainer_dir, training_pipeline_names.MODEL_TRAINER_TRAINED_MODEL_NAME
        )

Overwriting src/constants/config_entity.py


In [9]:
%%writefile src/constants/artifact_entity.py

from dataclasses import dataclass

@dataclass
class DataIngestionArtifact:
    train_file_path: str
    test_file_path: str

@dataclass
class DataTransformationArtifact:
    transformed_object_file_path: str
    transformed_train_file_path: str
    transformed_test_file_path: str

@dataclass
class RegressionMetricArtifact:
    mean_absolute_error: float
    root_mean_squared_error: float
    r2_score: float
    
@dataclass
class ModelTrainerArtifact:
    trained_model_file_path: str
    train_metric_artifact: RegressionMetricArtifact
    test_metric_artifact: RegressionMetricArtifact
    model_performance: str
    over_fitting_under_fitting: float


Overwriting src/constants/artifact_entity.py


In [10]:
%%writefile src/components/data_ingestion.py

import os
import sys
import json
import pandas as pd
import numpy as np
import yaml  
from dataclasses import dataclass 
from src.logger import logging
from typing import List
from src.constants.config_entity import DataIngestionConfig, TrainingPipelineConfig
from src.constants.artifact_entity import DataIngestionArtifact
from src.constants import training_pipeline_names  

"""
This class handles:
- assume to read train/test from an external source 
- remove unwanted columns
- Validating column consistency
- saves the data to my machine
"""

class DataIngestion:
    def __init__(self, data_ingestion_config: DataIngestionConfig):
        try:
            self.data_ingestion_config = data_ingestion_config
            self._schema_config = self.read_yaml_file(training_pipeline_names.SCHEMA_FILE_PATH)
        except Exception as e:
            raise Exception(f"Error initializing DataIngestion: {e}") 
            
    @staticmethod
    def read_yaml_file(file_path: str) -> dict:
        """
        Static method to read YAML configuration files
        """
        try:
            # Check if file exists
            if not os.path.exists(file_path):
                raise FileNotFoundError(f"YAML file not found: {file_path}")
                
            with open(file_path, "r", encoding="utf-8") as yaml_file:  
                content = yaml.safe_load(yaml_file)
                if content is None:
                    raise ValueError(f"YAML file is empty or invalid: {file_path}")
                return content
        except yaml.YAMLError as e:
            raise Exception(f"Error parsing YAML file {file_path}: {e}")
        except Exception as e:
            raise Exception(f"Error reading YAML file {file_path}: {e}")
    
    def read_external_dataframe(self):
        """
        Read data from any data source and apply transformations
        """
        try:
            train_data_link = self.data_ingestion_config.train_data_source
            test_data_link = self.data_ingestion_config.test_data_source
            logging.info('Reading train and test data sources')
            
            # Check if files exist before reading
            if not os.path.exists(train_data_link):
                raise FileNotFoundError(f"Train data file not found: {train_data_link}")
            if not os.path.exists(test_data_link):
                raise FileNotFoundError(f"Test data file not found: {test_data_link}")
            
            # Read parquet files
            train_data = pd.read_parquet(train_data_link)
            test_data = pd.read_parquet(test_data_link)
            
            logging.info(f"Loaded {len(train_data)} training and {len(test_data)} test records")
            print(f"Loaded {len(train_data)} training and {len(test_data)} test records from database")
      
            # Define columns to drop (taxi-specific columns)
            columns_to_drop = [
                'VendorID', 'store_and_fwd_flag', 'RatecodeID', 
                'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 
                'ehail_fee', 'improvement_surcharge', 'payment_type', 
                'trip_type', 'congestion_surcharge', 'cbd_congestion_fee'
            ]
            
            # Check if columns exist before dropping
            existing_cols_train = [col for col in columns_to_drop if col in train_data.columns]
            if existing_cols_train:
                train_data = train_data.drop(columns=existing_cols_train, axis=1)
                logging.info(f"Dropped {len(existing_cols_train)} columns from training data")
            
            existing_cols_test = [col for col in columns_to_drop if col in test_data.columns]
            if existing_cols_test:
                test_data = test_data.drop(columns=existing_cols_test, axis=1)
                logging.info(f"Dropped {len(existing_cols_test)} columns from test data")
                
            # Convert all "na" strings to numpy NaN
            train_data.replace({"na": np.nan}, inplace=True)
            test_data.replace({"na": np.nan}, inplace=True)
            
            logging.info("Data transformation completed successfully")
            return train_data, test_data
            
        except Exception as e:
            logging.error(f"Error in reading and transforming dataframe: {e}")
            raise e

    def validate_column(self, train_data: pd.DataFrame, test_data: pd.DataFrame) -> bool:
        """
        Validate that the required columns are present in both datasets
        """
        try:
            required_columns = []  # Fixed variable name
            
            # Extract required columns from schema config
            if "columns" in self._schema_config:
                for column_dict in self._schema_config["columns"]:
                    if isinstance(column_dict, dict):
                        required_columns.extend(column_dict.keys())
                    elif isinstance(column_dict, str):
                        required_columns.append(column_dict)
            else:
                logging.warning("No 'columns' key found in schema config")
                return True  # Skip validation if no schema defined
            
            logging.info(f"Required columns: {required_columns}")
            logging.info(f"Train columns: {list(train_data.columns)}")
            logging.info(f"Test columns: {list(test_data.columns)}")
            
            # Validate train data columns
            train_validation = set(required_columns) == set(train_data.columns)
            if train_validation:
                logging.info("Train Column validation passed.")
            else:
                logging.error("Train Column validation failed.")
                missing_train = set(required_columns) - set(train_data.columns)
                extra_train = set(train_data.columns) - set(required_columns)
                if missing_train:
                    logging.error(f"Missing columns in train data: {missing_train}")
                if extra_train:
                    logging.error(f"Extra columns in train data: {extra_train}")
            
            # Validate test data columns
            test_validation = set(required_columns) == set(test_data.columns)
            if test_validation:
                logging.info("Test Column validation passed.")
            else:
                logging.error("Test Column validation failed.")
                missing_test = set(required_columns) - set(test_data.columns)
                extra_test = set(test_data.columns) - set(required_columns)
                if missing_test:
                    logging.error(f"Missing columns in test data: {missing_test}")
                if extra_test:
                    logging.error(f"Extra columns in test data: {extra_test}")
            
            # Return True only if both validations pass
            return train_validation and test_validation
            
        except Exception as e:
            logging.error(f"Error validating columns: {e}")
            raise e
            
    def save_data_to_machine(self, train_data: pd.DataFrame, test_data: pd.DataFrame):
        """
        Save the processed data to local machine
        """
        try:
            # Create directory if it doesn't exist
            data_path = os.path.dirname(self.data_ingestion_config.training_file_path)
            os.makedirs(data_path, exist_ok=True)
            
            logging.info(f"Created directory: {data_path}")
            
            # Save dataframes as CSV files
            train_data.to_csv(self.data_ingestion_config.training_file_path, index=False, header=True)
            test_data.to_csv(self.data_ingestion_config.test_file_path, index=False, header=True)
            
            logging.info("Dataframes successfully saved to local machine")
            
            return self.data_ingestion_config.training_file_path, self.data_ingestion_config.test_file_path
            
        except Exception as e:
            logging.error(f"Error in saving data to machine: {e}")
            raise e
    
    def start_data_ingestion(self):
        """
        Main method to start the data ingestion process
        """
        try:
            logging.info("Starting data ingestion process")
            
            # Read and transform data
            train_data, test_data = self.read_external_dataframe()

            # Validate columns
            validation_result = self.validate_column(train_data, test_data)
            if not validation_result:
                raise Exception("Column validation failed. Check logs for details.")
            
            # Save data to machine
            train_file_path, test_file_path = self.save_data_to_machine(train_data, test_data)
            
            # Create artifact
            data_ingestion_artifact = DataIngestionArtifact(
                train_file_path=self.data_ingestion_config.training_file_path,
                test_file_path=self.data_ingestion_config.test_file_path
            )
            
            logging.info("Data ingestion completed successfully")
            return data_ingestion_artifact
            
        except Exception as e:
            logging.error(f"Error in data ingestion process: {e}")
            raise e

Overwriting src/components/data_ingestion.py


### DATA TRANSFORMATION

In [11]:
%%writefile src/components/data_transformation.py

# src/components/data_transformation.py
import os
import sys
import pandas as pd
import numpy as np
import joblib
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
from sklearn.impute import SimpleImputer
from category_encoders import TargetEncoder
from src.constants.training_pipeline_names import ( 
    numeric_features,
    target_encoder_features,
    TARGET_COLUMN
)
from src.constants.config_entity import (
    DataIngestionConfig,
    TrainingPipelineConfig,
    DataTransformationConfig
)
from src.constants.artifact_entity import DataIngestionArtifact, DataTransformationArtifact
from src.logger import logging

class DataTransformation:
    def __init__(self, data_ingestion_artifact: DataIngestionArtifact,
                 data_transformation_config: DataTransformationConfig):
        try:
            self.data_ingestion_artifact = data_ingestion_artifact
            self.data_transformation_config = data_transformation_config
            
        except Exception as e:
            raise e
    
    @staticmethod
    def read_data(filepath) -> pd.DataFrame:
        try:
            return pd.read_csv(filepath)
        except Exception as e:
            raise e
    
    @staticmethod
    def save_numpy_data(file_path: str, array: np.array):
        """
        Save numpy array data to file
        file_path: str location of file to save
        array: np.array data to save
        """
        try:
            dir_path = os.path.dirname(file_path)
            os.makedirs(dir_path, exist_ok=True)
            with open(file_path, "wb") as file_obj:
                np.save(file_obj, array)
        except Exception as e:
            raise e
    
    @staticmethod
    def save_object(file_path: str, obj: object) -> None:
        """
        this static method saves the preprocessor pkl
        """
        try:
            logging.info("Entered the save_object method class")
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with open(file_path, "wb") as file_obj:
                joblib.dump(obj, file_obj)
            logging.info("Exited the save_object method class")
        except Exception as e:
            raise e
    
    def create_data_transformer(self):
        """
        This function creates and returns the preprocessing object with imputation
        """
        logging.info("creating encoders and imputers")
        try:
            # Create preprocessing pipelines for each feature type
            numeric_pipeline = Pipeline([
                ('imputer', SimpleImputer(strategy='mean')),
                ('scaler', StandardScaler())
            ])
            
            categorical_pipeline = Pipeline([
                ('imputer', SimpleImputer(strategy='most_frequent')),
                ('target_encoder', TargetEncoder())
            ])
            
            logging.info("Initiating encoders and scaling")
            # Create preprocessor using ColumnTransformer
            preprocessor = ColumnTransformer(
                transformers=[
                    ('numerical_transformer', numeric_pipeline, numeric_features),
                    ('categorical_transformer', categorical_pipeline, target_encoder_features)
                ],
                remainder='drop'
            )
            return preprocessor
        except Exception as e:
            logging.error(f'Could not transform the inputs: {e}')
            raise Exception(f"Error in creating preprocessor: {e}")
    
    def create_processed_data(self, untransformed_data: pd.DataFrame):
        """
        Load and preprocess data from DataFrame
        
        creates the target column
        
        Returns:
        processed dataframe with target column and selected features
        """
        # Calculate trip duration in minutes
        untransformed_data['lpep_pickup_datetime'] = pd.to_datetime(untransformed_data['lpep_pickup_datetime'])
        untransformed_data['lpep_dropoff_datetime'] = pd.to_datetime(untransformed_data['lpep_dropoff_datetime'])
        
        untransformed_data['duration'] = (
            untransformed_data['lpep_dropoff_datetime'] - untransformed_data['lpep_pickup_datetime']
        ).dt.total_seconds() / 60
        
        # Select final columns
        data = untransformed_data[['PULocationID', 'DOLocationID', 'passenger_count','trip_distance','fare_amount','total_amount','duration']]
        	
        
        # Remove outliers - filter duration and distance, duration more than one hour and 20 miles
        data = data[(data['duration'] >= 0) & (data['duration'] <= 60)]
        data = data[(data['trip_distance'] >= 0) & (data['trip_distance'] <= 15)]
        data = data[(data['passenger_count'] >= 0) & (data['passenger_count'] <= 5)]
        
        # Convert location IDs to categorical data
        data[['PULocationID', 'DOLocationID']] = (
            data[['PULocationID', 'DOLocationID']].astype('str')
        )
        return data
        
    def start_data_transformation(self):
        """
        This method initiates the data transformation with imputation
        """
        logging.info("Starting data transformation")
        try:
            # Read training and testing data via the static method function
            train_df = DataTransformation.read_data(self.data_ingestion_artifact.train_file_path)
            test_df = DataTransformation.read_data(self.data_ingestion_artifact.test_file_path)
            logging.info("Read both test and train data successfully")
            
            # Check for missing values before transformation
            logging.info(f"Train data missing values:\n{train_df.isnull().sum()}")
            logging.info(f"Test data missing values:\n{test_df.isnull().sum()}")
            
            train_df = self.create_processed_data(train_df)
            test_df = self.create_processed_data(test_df)
            logging.info("created the duration target column")
            
            # Separate features and target variable
            input_feature_train_df = train_df.drop(columns=TARGET_COLUMN, axis=1)
            target_feature_train_df = train_df[TARGET_COLUMN[0]]  # TARGET_COLUMN is a list
            logging.info("Split the train data into features and target")
            
            input_feature_test_df = test_df.drop(columns=TARGET_COLUMN, axis=1)
            target_feature_test_df = test_df[TARGET_COLUMN[0]]  # TARGET_COLUMN is a list
            logging.info("Split the test data into features and target")
            
            # Get the preprocessor object
            preprocessor = self.create_data_transformer()
            
            # Apply transformations
            logging.info("Applying transformations on training data")
            transformed_input_train_feature = preprocessor.fit_transform(input_feature_train_df, target_feature_train_df)
            logging.info("Applying transformations on test data")
            transformed_input_test_feature = preprocessor.transform(input_feature_test_df)
            
            # Combine transformed features with target variable
            logging.info("concatenate the transformed array")
            train_arr = np.c_[
                transformed_input_train_feature, np.array(target_feature_train_df)
            ]
            test_arr = np.c_[
                transformed_input_test_feature, np.array(target_feature_test_df)
            ]
            
            logging.info("Transformation completed successfully")
            logging.info(f"Transformed train array shape: {train_arr.shape}")
            logging.info(f"Transformed test array shape: {test_arr.shape}")
            
            # Save the train, test and transformation file
            logging.info("saving the train, test and transformer")
            DataTransformation.save_numpy_data(self.data_transformation_config.transformed_train_file_path, array=train_arr)
            DataTransformation.save_numpy_data(self.data_transformation_config.transformed_test_file_path, array=test_arr)
            DataTransformation.save_object(self.data_transformation_config.transformed_object_file_path, preprocessor)
            
            # Prepare data transformation artifact
            data_transformation_artifact = DataTransformationArtifact(
                transformed_object_file_path=self.data_transformation_config.transformed_object_file_path,
                transformed_train_file_path=self.data_transformation_config.transformed_train_file_path,
                transformed_test_file_path=self.data_transformation_config.transformed_test_file_path,
            )
            
            logging.info(f"Data transformation artifact: {data_transformation_artifact}")
            return data_transformation_artifact
            
        except Exception as e:
            logging.error(f"Error in data transformation: {e}")
            raise Exception(f"Data transformation failed: {e}")

Overwriting src/components/data_transformation.py


### Model trainer

In [12]:
%%writefile src/components/ModelTrainer.py
#MLflow setup:
#MLflow setup:
#- tracking server: yes, local server
#- backend store: sqlite database, this houses meta data, metrics, params
#- artifacts store: local filesystem
#To run this example you need to launch the mlflow server locally by running the following command in your terminal:
#lsof -ti :5000 | xargs kill -9 lsof -ti :8000 | xargs kill -9

##run this in terminal but in the folder 

#mlflow server --backend-store-uri sqlite:///mlflow.db --host 127.0.0.1 --port 5000

#export MLFLOW_TRACKING_URI="http://127.0.0.1:5000"



import os
import sys
import numpy as np
import joblib
from src.constants.training_pipeline_names import MODEL_TRAINER_EXPECTED_SCORE, MODEL_TRAINER_OVER_FITTING_UNDER_FITTING_THRESHOLD
from src.constants.config_entity import (
    DataTransformationConfig,
    ModelTrainerConfig
)
from src.constants.artifact_entity import (
    DataTransformationArtifact,
    ModelTrainerArtifact,
    RegressionMetricArtifact
)
from src.logger import logging
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from xgboost import XGBRegressor
from mlflow.models import infer_signature
import mlflow

def load_object(file_path: str):
    """Load object from file"""
    try:
        with open(file_path, "rb") as file_obj:
            return joblib.load(file_obj)
    except Exception as e:
        raise e

def load_numpy_array_data(file_path: str) -> np.array:
    """Load numpy array data from file"""
    try:
        with open(file_path, "rb") as file_obj:
            return np.load(file_obj)
    except Exception as e:
        raise e

def evaluate_model(y_true, y_pred):
    """Evaluate model performance"""
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    r2 = r2_score(y_true, y_pred)
    return mae, rmse, r2

class ModelTrainer:
    def __init__(self, model_trainer_config: ModelTrainerConfig, 
                 data_transformation_artifact: DataTransformationArtifact):
        
        try:
            mlflow.set_tracking_uri("http://127.0.0.1:5000")
            mlflow.set_tag('development','duration model')
            logging.info("Starting ModelTrainer initialization and setting up mlflow")
            print(f"tracking URI: '{mlflow.get_tracking_uri()}'")
            logging.info(f"tracking URI: '{mlflow.get_tracking_uri()}'")
            
            self.model_trainer_config = model_trainer_config
            self.data_transformation_artifact = data_transformation_artifact
        except Exception as e:
            logging.error(f"Error in ModelTrainer initialization: {e}")
            raise e

    def _ensure_mlflow_run_ended(self):
        """Safely end any active MLflow run"""
        try:
            if mlflow.active_run():
                logging.info("Active MLflow run detected, ending it safely")
                mlflow.end_run()
                logging.info("Previous MLflow run ended successfully")
        except Exception as e:
            logging.warning(f"Error ending MLflow run: {e}")
            # Continue execution as this is not critical

    def _save_model_safely(self, model, model_dir_path: str, model_filename: str = 'my_model.ubj'):
        """
        Safely save XGBoost model with proper error handling and path management
        
        Args:
            model: Trained XGBoost model
            model_dir_path: Base directory path from config
            model_filename: Name of the model file (default: 'my_model.ubj')
            
        Returns:
            str: Full path where model was saved
        """
        try:
            # Option 3: Most robust - ensure directory exists and create full path
            model_dir = os.path.dirname(model_dir_path)
            
            # Ensure the directory exists
            os.makedirs(model_dir, exist_ok=True)
            logging.info(f"Model directory created/verified: {model_dir}")
            
            # Create full model path
            model_path = os.path.join(model_dir, model_filename)
            
            # Validate path before saving
            if not os.path.exists(model_dir):
                raise FileNotFoundError(f"Model directory does not exist: {model_dir}")
                
            # Save the model with error handling
            logging.info(f"Attempting to save model to: {model_path}")
            
            try:
                model.save_model(model_path)
                logging.info(f"Model saved successfully to: {model_path}")
                
                # Verify the file was actually created
                if os.path.exists(model_path):
                    file_size = os.path.getsize(model_path)
                    logging.info(f"Model file verified - Size: {file_size} bytes")
                else:
                    raise FileNotFoundError(f"Model file was not created: {model_path}")
                    
                return model_path
                
            except Exception as e:
                logging.error(f"Failed to save model: {e}")
                raise e
                
        except Exception as e:
            logging.error(f"Error in _save_model_safely: {e}")
            raise e

    def train_model(self, X_train, y_train, X_test, y_test):
        
        try:
            # Safely end any active MLflow run
            self._ensure_mlflow_run_ended()
            
            mlflow.set_experiment("new-york-taxi-drive-duration-train-January")
            with mlflow.start_run() as run:
                results = [] 
                logging.info("Starting model training process")
                logging.info(f"Training data shape: X_train={X_train.shape}, y_train={y_train.shape}")
                logging.info(f"Test data shape: X_test={X_test.shape}, y_test={y_test.shape}")
    
                logging.info("Loading the Xgboost model")
                
                # Train model
                model = XGBRegressor(n_estimators=100, max_depth=3, learning_rate=0.1)
                model.fit(X_train, y_train)
                logging.info("Making predictions with the model")
                
                # Make predictions
                y_train_pred = model.predict(X_train)
                y_test_pred = model.predict(X_test)
                
                logging.info("Calculating regression metrics")
                train_mae, train_rmse, train_r2 = evaluate_model(y_train, y_train_pred)
                test_mae, test_rmse, test_r2 = evaluate_model(y_test, y_test_pred)
                
                ##signature for mlflow
                signature = infer_signature(X_test, y_test_pred)
                
                # Create metric artifacts
                regression_train_metric = RegressionMetricArtifact(
                    mean_absolute_error=train_mae,
                    root_mean_squared_error=train_rmse,
                    r2_score=train_r2
                )
                
                regression_test_metric = RegressionMetricArtifact(
                    mean_absolute_error=test_mae,
                    root_mean_squared_error=test_rmse,
                    r2_score=test_r2
                )
                
                # Append results
                results.append({
                    'Model': "XGBOOST",
                    'Train_MAE': train_mae,
                    'Train_RMSE': train_rmse,
                    'Train_R2': train_r2,
                    'Test_MAE': test_mae,
                    'Test_RMSE': test_rmse,
                    'Test_R2': test_r2,
                    'Overfit_Check': train_r2 - test_r2  
                })
    
                logging.info(f"Training metrics: {train_mae, train_rmse, train_r2}")
                logging.info(f"Test metrics: {test_mae, test_rmse, test_r2}")
                logging.info("logging MLflow metrics and params")
                
                params = {
                    "model_name": "XGBOOST", 
                    "training_data_shape": str(X_train.shape),
                    "test_data_shape": str(X_test.shape), 
                    "n_estimators": 100, 
                    "max_depth": 3, 
                    "learning_rate": 0.1
                }
                mlflow.log_params(params)
            
                mlflow.log_metrics({
                    'Train_MAE': train_mae,
                    'Train_RMSE': train_rmse,
                    'Train_R2': train_r2,
                    'Test_MAE': test_mae,
                    'Test_RMSE': test_rmse,
                    'Test_R2': test_r2,
                    'Overfit_Check': train_r2 - test_r2
                })

                # log and save model
                model_name = "nyc-taxi-duration-model-January"
                registered_model_name="XGBoostdurationModel"
                
                
                if test_r2 > MODEL_TRAINER_EXPECTED_SCORE:
                    try:
                        mlflow.xgboost.log_model(
                            xgb_model=model,
                            name=model_name,
                            registered_model_name=registered_model_name,
                            input_example=X_test[:5],
                            signature=signature,
                            model_format="ubj"
                        )
                        logging.info(f"Model registered as: {registered_model_name}")
                        print(f"Model registered as: {registered_model_name}")
                        print(f"Run ID: {run.info.run_id}")
                    except Exception as e:
                        logging.error(f"Failed to register model in MLflow: {e}")
                        # Continue execution, just log the model without registration
                        mlflow.xgboost.log_model(model, artifact_path="nyc-duration-model")
                        logging.info("Model logged without registration due to error")
                else:
                    # Just log without registering if performance is poor
                    #mlflow.xgboost.log_model(model, artifact_path="nyc-duration-model")
                    mlflow.xgboost.log_model(model, name=model_name)                        
                    logging.info("Model not registered due to poor performance")
                    
                # Load preprocessor to save to mlflow as artifact
                try:
                    preprocessor = load_object(file_path=self.data_transformation_artifact.transformed_object_file_path)
                    logging.info("Preprocessor loaded successfully")
                    mlflow.log_artifact(self.data_transformation_artifact.transformed_object_file_path, artifact_path="preprocessor")
                    logging.info("Preprocessor logged to MLflow successfully")
                except Exception as e:
                    logging.error(f"Failed to load or log preprocessor: {e}")
                    # Continue execution as this is not critical for model saving
    
                # Save the trained model using the robust method
                logging.info("Starting model save process")
                try:
                    saved_model_path = self._save_model_safely(
                        model=model,
                        model_dir_path=self.model_trainer_config.trained_model_file_path,
                        model_filename='my_model.ubj'
                    )
                    logging.info(f"Model saved successfully at: {saved_model_path}")
                    
                    # Update the artifact path to the actual saved path
                    actual_model_path = saved_model_path
                    
                except Exception as e:
                    logging.error(f"Critical error: Failed to save model: {e}")
                    raise e
    
                # Model performance evaluation
                if results[-1]['Test_R2'] > MODEL_TRAINER_EXPECTED_SCORE:
                    remark = "good score"
                else:
                    remark = "bad"
    
                # Create Model Trainer Artifact with the actual saved path
                model_trainer_artifact = ModelTrainerArtifact(
                    trained_model_file_path=actual_model_path,
                    train_metric_artifact=regression_train_metric,
                    test_metric_artifact=regression_test_metric,
                    model_performance=remark,
                    over_fitting_under_fitting=results[-1]['Overfit_Check']
                )
                
                logging.info(f"Model trainer artifact created: {model_trainer_artifact}")
                logging.info("Model training process completed successfully")
    
                print("model file path:")
                print(model_trainer_artifact.trained_model_file_path)
                print('---' * 20)
                print("Training Metrics:")
                print(model_trainer_artifact.train_metric_artifact)
                print('---' * 20)
                print("Test Metrics:")
                print(model_trainer_artifact.test_metric_artifact)
                print('---' * 20)
                print("Over_fitting_under_fitting_Metrics:")
                print(model_trainer_artifact.over_fitting_under_fitting)
                print('---' * 20)
                print("Training performance:")
                print(model_trainer_artifact.model_performance)
                
                return model_trainer_artifact
            
        except Exception as e:
            logging.error(f"Error in train_model method: {e}")
            # Ensure MLflow run is ended even if there's an error
            self._ensure_mlflow_run_ended()
            raise e

    def start_model_trainer(self) -> ModelTrainerArtifact:
        
        try:
            logging.info("Starting model trainer pipeline")
            
            train_file_path = self.data_transformation_artifact.transformed_train_file_path
            test_file_path = self.data_transformation_artifact.transformed_test_file_path 
            
            logging.info(f"Loading training data from: {train_file_path}")
            logging.info(f"Loading test data from: {test_file_path}")
            
            # Loading training array and testing array
            train_arr = load_numpy_array_data(train_file_path)
            test_arr = load_numpy_array_data(test_file_path)
            
            logging.info(f"Training array shape: {train_arr.shape}")
            logging.info(f"Test array shape: {test_arr.shape}")
            
            # Split features and target
            x_train, y_train, x_test, y_test = (
                train_arr[:, :-1],
                train_arr[:, -1],
                test_arr[:, :-1],
                test_arr[:, -1],
            )
            
            logging.info("Data split completed - features and target separated")
            logging.info(f"x_train shape: {x_train.shape}, y_train shape: {y_train.shape}")
            logging.info(f"x_test shape: {x_test.shape}, y_test shape: {y_test.shape}")
            
            # Train model with all required parameters
            logging.info("Calling train_model method")
            model_trainer_artifact = self.train_model(x_train, y_train, x_test, y_test)
            
            logging.info("Model trainer pipeline completed successfully")
            return model_trainer_artifact
            
        except Exception as e:
            logging.error(f"Error in start_model_trainer method: {e}")
            # Ensure any active MLflow runs are ended
            self._ensure_mlflow_run_ended()
            raise e

Overwriting src/components/ModelTrainer.py


In [13]:
%%writefile main.py
# main.py

import os
import sys
import json
from pathlib import Path
import pandas as pd
import numpy as np
from dataclasses import dataclass 
from src.logger import logging
from typing import List
from src.constants.config_entity import (
    DataIngestionConfig, 
    TrainingPipelineConfig, 
    DataTransformationConfig,
    ModelTrainerConfig
)
from src.constants.artifact_entity import DataIngestionArtifact, DataTransformationArtifact
from src.components.data_ingestion import DataIngestion
from src.components.data_transformation import DataTransformation
from src.components.ModelTrainer import ModelTrainer


if __name__ == "__main__":
    try:
        # Initialize pipeline configuration
        training_pipeline_config = TrainingPipelineConfig()
        data_ingestion_config = DataIngestionConfig(training_pipeline_config)
        
        # Create data ingestion instance
        data_ingestion = DataIngestion(data_ingestion_config)
        
        logging.info("Initiating data reading and processing")
        
        # Start data ingestion
        data_ingestion_artifact = data_ingestion.start_data_ingestion()
        
        print("Data ingestion completed successfully!")
        print(f"Train file: {data_ingestion_artifact.train_file_path}")
        print(f"Test file: {data_ingestion_artifact.test_file_path}")
        
        logging.info("Transforming data")
        data_transformation_config = DataTransformationConfig(training_pipeline_config)
        
        # Create data transformation instance
        data_transform = DataTransformation(data_ingestion_artifact, data_transformation_config)
        data_transformation_artifact = data_transform.start_data_transformation()
        
        print("Data transformation completed successfully!")
        print(f"Preprocessor file: {data_transformation_artifact.transformed_object_file_path}")
        print(f"Transformed train file: {data_transformation_artifact.transformed_train_file_path}")
        print(f"Transformed test file: {data_transformation_artifact.transformed_test_file_path}")
        logging.info("Transforming ended")
        
        logging.info("model training")
        model_trainer_config = ModelTrainerConfig(training_pipeline_config)
        # Create model training instance
        model_trainer = ModelTrainer(model_trainer_config, data_transformation_artifact)
        model_trainer_artifact = model_trainer.start_model_trainer()
        
        print("Model successfully trained!")
        logging.info("Model training ended")
        
    except Exception as e:
        logging.error(f"Error in main execution: {e}")
        raise e

Overwriting main.py


In [14]:
!python main.py

Loaded 48326 training and 46621 test records from database
Data ingestion completed successfully!
Train file: ingested_data/untransformed_data-09_16_2025_15_42_58/train_data.csv
Test file: ingested_data/untransformed_data-09_16_2025_15_42_58/test_data.csv
Data transformation completed successfully!
Preprocessor file: ingested_data/model-09_16_2025_15_43_00/preprocessing.pkl
Transformed train file: ingested_data/transformed_data-09_16_2025_15_43_00/train.npy
Transformed test file: ingested_data/transformed_data-09_16_2025_15_43_00/test.npy
tracking URI: 'http://127.0.0.1:5000'
🏃 View run thundering-dove-864 at: http://127.0.0.1:5000/#/experiments/0/runs/9ab543cb0dc84976a0fe91a798e51465
🧪 View experiment at: http://127.0.0.1:5000/#/experiments/0
Registered model 'XGBoostdurationModel' already exists. Creating a new version of this model...
2025/09/16 15:43:04 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: XGBoo

### REQUIREMENT.txt file

In [16]:
%%writefile requirements.txt

mlflow==3.3.2
cloudpickle==3.0.0
lz4==4.3.3
numpy==1.26.4
pandas==2.3.1
psutil==7.0.0
scikit-learn==1.7.0
scipy==1.13.0
fastapi==0.115.12
uvicorn[standard]==0.34.2
category-encoders==2.6.3
jinja2==3.1.3
joblib==1.4.2
email-validator==2.1.1
python-multipart==0.0.6
pydantic==2.5.2
xgboost==3.0.1

Overwriting requirements.txt


### CREATE APP FILE

In [17]:
import os
os.makedirs("templates",exist_ok=True )

In [18]:
%%writefile templates/predict_form.html
<!DOCTYPE html>
<html>
<head>
    <title>Trip Duration Predictor</title>
    <meta name="viewport" content="width=device-width, initial-scale=1" />
    <style>
        :root { --primary:#007BFF; }
        body {
            font-family: Arial, sans-serif;
            background: #f4f6f9;
            padding: 20px;
        }
        .container {
            background: #fff;
            padding: 30px;
            border-radius: 12px;
            max-width: 560px;
            margin: auto;
            box-shadow: 0 10px 24px rgba(0,0,0,0.06);
        }
        .grid { display: grid; grid-template-columns: 1fr 1fr; gap: 16px; }
        .grid .full { grid-column: 1 / -1; }
        label { display:block; font-weight:600; margin-bottom:6px; }
        input[type="number"] {
            width: 100%;
            padding: 10px 12px;
            border: 1px solid #d6dae1;
            border-radius: 8px;
            background:#fff;
        }
        button {
            background: var(--primary);
            color: #fff;
            border: none;
            padding: 12px 18px;
            border-radius: 8px;
            cursor: pointer;
            font-weight: 600;
        }
        button:hover { background: #0056b3; }
        .result {
            margin-top: 20px;
            background: #e6f7ff;
            padding: 12px;
            border-left: 4px solid var(--primary);
            border-radius: 6px;
        }
        .subtitle { color:#566; margin-top:-6px; margin-bottom:18px; }
    </style>
</head>
<body>
    <div class="container">
        <h2>Trip Duration Predictor</h2>
        <p class="subtitle">Enter ride details and submit to get the predicted duration (minutes).</p>

        <form method="post">
            <div class="grid">
                <div>
                    <label for="passenger_count">Passenger Count</label>
                    <input type="number" id="passenger_count" name="passenger_count" step="1" min="0" required placeholder="e.g., 1">
                </div>

                <div>
                    <label for="trip_distance">Trip Distance (miles)</label>
                    <input type="number" id="trip_distance" name="trip_distance" step="0.01" min="0" required placeholder="e.g., 4.12">
                </div>

                <div>
                    <label for="fare_amount">Fare Amount ($)</label>
                    <input type="number" id="fare_amount" name="fare_amount" step="0.01" required placeholder="e.g., 21.20">
                </div>

                <div>
                    <label for="total_amount">Total Amount ($)</label>
                    <input type="number" id="total_amount" name="total_amount" step="0.01" required placeholder="e.g., 36.77">
                </div>

                <div>
                    <label for="PULocationID">PU Location ID</label>
                    <input type="number" id="PULocationID" name="PULocationID" step="1" min="0" required placeholder="e.g., 171">
                </div>

                <div>
                    <label for="DOLocationID">DO Location ID</label>
                    <input type="number" id="DOLocationID" name="DOLocationID" step="1" min="0" required placeholder="e.g., 73">
                </div>

                <div class="full" style="display:flex; justify-content:flex-end;">
                    <button type="submit">Predict Duration</button>
                </div>
            </div>
        </form>

        {% if result %}
        <div class="result">
            <strong>Prediction:</strong> {{ result }} minutes
        </div>
        {% endif %}
    </div>
</body>
</html>


Writing templates/predict_form.html


In [19]:
%%writefile app.py


from fastapi import FastAPI, HTTPException, Request, Form
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
import joblib
import pandas as pd
import uvicorn
import logging
import warnings
from typing import List
from xgboost import XGBRegressor

# Suppress category_encoders warning
warnings.filterwarnings('ignore', category=FutureWarning, module='category_encoders')

# Logging config
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(
    title="Trip Duration Prediction API",
    description="API and Web UI for predicting NYC taxi trip duration using XGBoost",
    version="1.0.0"
)

# Templates setup
templates = Jinja2Templates(directory="templates")

# Globals
preprocessor = None
model = None

# Input schema
class RideData(BaseModel):
    passenger_count: float
    trip_distance: float
    fare_amount: float
    total_amount: float
    PULocationID: int
    DOLocationID: int
    

    class Config:
        schema_extra = {
            "example": {
                "passenger_count":1.0,
                "trip_distance": 4.12,
                "fare_amount":21.20,
                "total_amount":36.77,
                "PULocationID": 171,
                "DOLocationID": 73,
            }
        }


# Output schema
class PredictionResponse(BaseModel):
    predicted_duration: float
    status: str
    message: str

# Load preprocessor
def load_preprocessor(path: str):
    try:
        preprocessor = joblib.load(path)
        logger.info("Preprocessor loaded.")
        return preprocessor
    except Exception as e:
        logger.error(f"Error loading preprocessor: {e}")
        raise

# Load model
def load_model(path: str):
    try:
        model = XGBRegressor()
        model.load_model(path) 
        logger.info("Model loaded.")
        return model
    except Exception as e:
        logger.error(f"Error loading model: {e}")
        raise

# Prediction logic
def predict_duration(preprocessor, model, ride_df):
    try:
        X_processed = preprocessor.transform(ride_df)
        prediction = model.predict(X_processed)
        return float(prediction[0])
    except Exception as e:
        logger.error(f"Prediction error: {e}")
        raise

# Load on startup
@app.on_event("startup")
async def startup_event():
    global preprocessor, model
    preprocessor = load_preprocessor("preprocessing.pkl")
    model = load_model("my_model.ubj")

# Health check
@app.get("/")
async def root():
    return {"message": "Trip Duration Prediction API is running"}

@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "preprocessor_loaded": preprocessor is not None,
        "model_loaded": model is not None
    }

# Single prediction
@app.post("/predict", response_model=PredictionResponse)
async def predict(ride_data: RideData):
    if preprocessor is None or model is None:
        raise HTTPException(status_code=500, detail="Models not loaded.")
    try:
        df = pd.DataFrame([ride_data.dict()])
        duration = predict_duration(preprocessor, model, df)
        return PredictionResponse(
            predicted_duration=duration,
            status="success",
            message="Prediction completed successfully"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Prediction failed: {str(e)}")

# Batch prediction
@app.post("/predict_batch")
async def predict_batch(rides: List[RideData]):
    if preprocessor is None or model is None:
        raise HTTPException(status_code=500, detail="Models not loaded.")
    try:
        results = []
        for ride in rides:
            df = pd.DataFrame([ride.dict()])
            duration = predict_duration(preprocessor, model, df)
            results.append(duration)
        return {
            "predictions": results,
            "status": "success",
            "message": f"Predicted durations for {len(rides)} rides"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Batch prediction failed: {str(e)}")

# Web form GET
@app.get("/form", response_class=HTMLResponse)
async def form_get(request: Request):
    return templates.TemplateResponse("predict_form.html", {"request": request, "result": None})

# Web form POST
# Web form POST
@app.post("/form", response_class=HTMLResponse)
async def form_post(
    request: Request,
    passenger_count: float = Form(...),
    trip_distance: float = Form(...),
    fare_amount: float = Form(...),
    total_amount: float = Form(...),
    PULocationID: int = Form(...),
    DOLocationID: int = Form(...),
):
    try:
        if preprocessor is None or model is None:
            raise HTTPException(status_code=500, detail="Model not loaded.")
        # Build the full feature set expected by the preprocessor/model
        df = pd.DataFrame([{
            "passenger_count": float(passenger_count),
            "trip_distance": float(trip_distance),
            "fare_amount": float(fare_amount),
            "total_amount": float(total_amount),
            "PULocationID": int(PULocationID),
            "DOLocationID": int(DOLocationID),
        }])
        result = predict_duration(preprocessor, model, df)
        return templates.TemplateResponse(
            "predict_form.html",
            {"request": request, "result": f"{result:.2f}"}
        )
    except Exception as e:
        logger.error(f"Form prediction error: {e}")
        return templates.TemplateResponse(
            "predict_form.html",
            {"request": request, "result": "Error during prediction"}
        )


# Run server
if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=9696, reload=True)


Overwriting app.py


### DOCKER FILE

In [22]:
%%writefile Dockerfile

# Base image
FROM python:3.12-slim

# Set working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update -y && \
    apt-get install -y --no-install-recommends curl && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements and install dependencies
COPY requirements.txt /app/
RUN pip install --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt

# Copy application files
COPY . /app

# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Azure Web Apps uses dynamic port assignment
EXPOSE $PORT

# Run the app with dynamic port
ENTRYPOINT ["sh", "-c", "uvicorn app:app --host=0.0.0.0 --port=${PORT:-8000}"]

Overwriting Dockerfile
