
#**Case Study: Housing Price Prediction**

**Agenda**
- Data Preprocessing
- Model Training
- Version Control (Code, Data, and Model)
- Model Evaluation
- Model Deployment
- Monitoring and Logging



An MLOps pipeline for housing price prediction involves several critical steps, including data preprocessing, model training, versioning, and deployment. The guide below walks through the pipeline step by step, with code for each part.

**Case Study Overview:**
- **Problem:** Predict housing prices based on various features (like the number of rooms, location, etc.).
- **Solution:** Build an MLOps pipeline that automates data preprocessing, model training, versioning (with Git, DVC, and MLflow), and deployment (using Docker, Flask, and Airflow for orchestration).

###**Data Preprocessing**


This step prepares raw data for model training by handling missing values and encoding categorical features. Feature scaling is not done here; it is applied later, inside the training pipeline.

**Python Code:**

In [None]:
# src/preprocess.py

import pandas as pd
import logging
import os
import sys

def setup_logging(log_dir='logs', log_file='preprocess.log'):
    """
    Sets up logging configuration.
    Ensures that the log directory exists.
    """
    os.makedirs(log_dir, exist_ok=True)  # Create log directory if it doesn't exist
    log_path = os.path.join(log_dir, log_file)

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s:%(message)s',
        handlers=[
            logging.FileHandler(log_path),
            logging.StreamHandler(sys.stdout)  # Also log to stdout for real-time monitoring
        ]
    )

def ensure_directory_exists(file_path):
    """
    Ensures that the directory for the given file path exists.
    If it doesn't exist, the directory is created.

    Parameters:
    - file_path (str): The file path for which to ensure the directory exists.
    """
    directory = os.path.dirname(file_path)
    if directory:
        os.makedirs(directory, exist_ok=True)
        logging.debug(f"Ensured directory exists: {directory}")

def preprocess_data(input_path, output_path):
    """
    Preprocesses the housing data by handling missing values and encoding categorical variables.

    Parameters:
    - input_path (str): Path to the raw input CSV file.
    - output_path (str): Path to save the preprocessed CSV file.
    """
    try:
        logging.info("Loading data from %s", input_path)
        data = pd.read_csv(input_path)  # Read from the path passed in, not a hard-coded file

        logging.info("Handling missing values")
        data.ffill(inplace=True)  # Forward fill to handle missing values

        logging.info("Encoding categorical variables")
        data = pd.get_dummies(data, drop_first=True)  # One-hot encode categorical variables

        logging.info("Ensuring output directory exists")
        ensure_directory_exists(output_path)

        logging.info("Saving preprocessed data to %s", output_path)
        data.to_csv(output_path, index=False)  # Save preprocessed data
        logging.info("Preprocessing completed successfully.")

    except FileNotFoundError as fnf_error:
        logging.error("File not found: %s", fnf_error)
        sys.exit(1)  # Exit the script with an error code
    except pd.errors.EmptyDataError:
        logging.error("No data: The file is empty.")
        sys.exit(1)
    except Exception as e:
        logging.error("Error in preprocessing data: %s", e)
        sys.exit(1)

if __name__ == "__main__":
    # Setup logging
    setup_logging()

    # Define input and output paths (using the correct uploaded dataset path)
    input_data_path = '/content/Housing.csv'  # Make sure this is the correct file path
    output_data_path = '/mnt/data/preprocessed_housing.csv'  # Output will be saved here

    logging.info("Starting the preprocessing pipeline.")
    preprocess_data(input_path=input_data_path, output_path=output_data_path)
    logging.info("Preprocessing pipeline finished.")


**Explanation:**
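- **Data Loading:** Reads the raw housing dataset (`/content/Housing.csv` in this notebook; `data/raw_housing.csv` in the repository layout used by the DVC commands later on).
- **Missing Value Handling:** Fills missing values using forward-fill (ffill).
- **Categorical Encoding:** Encodes categorical variables using one-hot encoding.
- **Data Export:** Saves the processed dataset (`/mnt/data/preprocessed_housing.csv` in this notebook; `data/preprocessed_housing.csv` in the repository layout).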
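
To make the two transformations concrete, here is a minimal sketch on a made-up three-row frame. The column names (`area`, `bedrooms`, `furnishing`) are illustrative assumptions, not taken from the dataset. It also shows a limitation worth knowing: forward fill copies the last valid value from above, so a missing value in the first row has nothing to copy from and stays missing.

```python
import pandas as pd

# Hypothetical toy data; column names are assumptions for illustration only.
raw = pd.DataFrame({
    "area": [1000.0, None, 1500.0],   # gap in the middle row
    "bedrooms": [None, 3.0, 4.0],     # gap in the first row
    "furnishing": ["furnished", "unfurnished", "furnished"],
})

# Forward fill: each missing value takes the last valid value above it.
filled = raw.ffill()

# One-hot encode the string column; drop_first=True drops one level
# (the first in sorted order) so the indicators are not redundant.
encoded = pd.get_dummies(filled, drop_first=True)

print(filled)
print(encoded.columns.tolist())
```

If leading gaps matter for a given dataset, a second pass (for example a backward fill or a column median) would be needed; which one is appropriate depends on the data.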

###**Model Training**


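In this phase, the housing data is split into training and testing sets, and a RandomForestRegressor is trained to predict housing prices. As run in this notebook, the script reads `/content/Housing.csv` directly and performs encoding and scaling inside a scikit-learn pipeline.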

- **Python Code:**

In [None]:
!pip install mlflow


In [None]:
# src/train.py
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
import joblib
import mlflow
import mlflow.sklearn
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler

def train_model(data_path, model_path):
    # Load data
    data = pd.read_csv(data_path)

    # Identify feature columns and target
    X = data.drop('price', axis=1)  # Features
    y = data['price']  # Target variable

    # Identify categorical and numerical columns
    categorical_cols = X.select_dtypes(include=['object', 'category']).columns.tolist()
    numerical_cols = X.select_dtypes(include=['int64', 'float64']).columns.tolist()

    # Split data into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Define preprocessing for numerical data
    numerical_transformer = Pipeline(steps=[
        ('scaler', StandardScaler())
    ])

    # Define preprocessing for categorical data
    categorical_transformer = Pipeline(steps=[
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    # Combine preprocessing steps
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numerical_transformer, numerical_cols),
            ('cat', categorical_transformer, categorical_cols)
        ]
    )

    # Create a pipeline that first preprocesses the data and then trains the model
    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('regressor', RandomForestRegressor(n_estimators=100, random_state=42))
    ])

    # Start MLflow run for logging
    with mlflow.start_run():
        # Train the model pipeline
        pipeline.fit(X_train, y_train)

        # Evaluate the model (score() on a regressor returns R^2, not accuracy)
        r2 = pipeline.score(X_test, y_test)
        print(f"Model R^2 Score: {r2 * 100:.2f}%")

        # Log metrics and model to MLflow
        mlflow.log_metric("r2_score", r2)
        mlflow.sklearn.log_model(pipeline, "model")

        # Save model using joblib
        joblib.dump(pipeline, model_path)
        print(f"Model saved to {model_path}")

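if __name__ == "__main__":
    # NOTE: model_path reuses the dataset's filename here, so the joblib file
    # overwrites the CSV and a second run cannot reload the data. Giving the
    # model its own path avoids this; the deployment cells must then load the
    # model from that same path.
    train_model(data_path='/content/Housing.csv', model_path='/content/Housing.csv')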


Model R^2 Score: 61.01%




Model saved to /content/Housing.csv


**Explanation:**
- **Libraries Import:** Key libraries such as pandas for data manipulation, sklearn for machine learning, mlflow for experiment tracking, and joblib for model saving are imported.

- **Data Loading:** The dataset is read from a CSV file using pd.read_csv().

- **Feature and Target Separation:** The target variable (price) is separated from the feature variables (X).

- **Categorical and Numerical Columns:** Categorical and numerical columns are identified to apply the appropriate preprocessing techniques.

- **Data Splitting:** The dataset is split into training and testing sets using train_test_split().


**Preprocessing Pipelines:**
- **Numerical Data:** Scaled using StandardScaler.
- **Categorical Data:** One-hot encoded using OneHotEncoder.
- **Combined Preprocessing:** Both numerical and categorical transformers are combined using ColumnTransformer.

- **Pipeline:** A Pipeline is built that applies preprocessing followed by training a RandomForestRegressor.

- **Model Training:** The pipeline is trained using the training data.

- **Model Evaluation:** The model is evaluated on the test data, and the R^2 score is printed.

- **MLflow Logging:** Model performance and the model itself are logged with MLflow.

- **Model Saving:** The trained model is saved using joblib.
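
Because the saved object is the whole pipeline (preprocessing plus regressor), it can be reloaded and given raw feature columns directly, as long as they arrive as a DataFrame with the same column names used in training. The following is a minimal sketch of that round trip on made-up data; the columns `area` and `mainroad`, the target values, and the temporary file name are all assumptions for illustration.

```python
import os
import tempfile

import joblib
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Hypothetical toy data; the real dataset's columns may differ.
train = pd.DataFrame({
    "area": [800, 1200, 1500, 2000, 950, 1700],
    "mainroad": ["yes", "no", "yes", "yes", "no", "yes"],
    "price": [100, 140, 190, 260, 110, 220],
})
X, y = train.drop("price", axis=1), train["price"]

pipeline = Pipeline(steps=[
    ("preprocessor", ColumnTransformer(transformers=[
        ("num", StandardScaler(), ["area"]),
        ("cat", OneHotEncoder(handle_unknown="ignore"), ["mainroad"]),
    ])),
    ("regressor", RandomForestRegressor(n_estimators=10, random_state=42)),
])
pipeline.fit(X, y)

# Save to a dedicated model file and load it back.
with tempfile.TemporaryDirectory() as tmp_dir:
    model_file = os.path.join(tmp_dir, "model.joblib")
    joblib.dump(pipeline, model_file)
    restored = joblib.load(model_file)

# New rows must be a DataFrame with the training column names.
# The unseen category is encoded as all zeros (handle_unknown="ignore").
new_rows = pd.DataFrame({"area": [1300], "mainroad": ["unknown"]})
prediction = restored.predict(new_rows)
print(prediction)
```

Bundling preprocessing into the saved pipeline means the serving code does not need to repeat the scaling and encoding steps by hand.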








###**Version Control**
Versioning is crucial in MLOps pipelines for tracking code, data, and model versions. Here’s how to manage version control for each component.

**a. Code Versioning (Git)**
- Initialize a Git repository for versioning code.

```
# Initialize Git
git init
git add .
git commit -m "Initial commit: Data Preprocessing and Model Training"
```



**b. Data Versioning (DVC)**
- DVC (Data Version Control) is used to track the dataset and the preprocessed data.

```
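# Initialize DVC (inside the Git repository) and commit the files it stages
dvc init
git commit -m "Initialize DVC"

# Add raw data to DVC (DVC writes the ignore rule to data/.gitignore)
dvc add data/raw_housing.csv
git add data/raw_housing.csv.dvc data/.gitignore
git commit -m "Add raw housing data"

# Add preprocessed data to DVC
dvc add data/preprocessed_housing.csv
git add data/preprocessed_housing.csv.dvc data/.gitignore
git commit -m "Add preprocessed housing data"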
```


**c. Model Versioning (MLflow)**
- The trained model is versioned using MLflow, logged during the training process. You can track model versions in MLflow’s UI.

```
mlflow ui  # Launch the MLflow UI to monitor model versions
```




###**Model Evaluation**
After training, the evaluation metric is logged to MLflow so that runs can be compared across model versions.

- **Metric Logged:** R² score on the held-out test set, logged as `r2_score`. For a regressor, `score()` returns R², not classification accuracy.
- **MLflow Tracking:** Stores metrics, parameters, and artifacts for future reference.
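
For a regression task, R² is often read alongside error metrics expressed in the target's own units. The sketch below computes R², MAE, and RMSE with scikit-learn on made-up numbers (these are not results from this notebook).

```python
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Hypothetical true prices and predictions, for illustration only.
y_true = np.array([100.0, 150.0, 200.0, 250.0])
y_pred = np.array([110.0, 140.0, 210.0, 240.0])

metrics = {
    "r2_score": float(r2_score(y_true, y_pred)),
    "mae": float(mean_absolute_error(y_true, y_pred)),
    # RMSE taken as the square root of MSE, so it is in price units.
    "rmse": float(np.sqrt(mean_squared_error(y_true, y_pred))),
}

for name, value in metrics.items():
    print(f"{name}: {value:.3f}")
```

Inside an MLflow run, each entry could be recorded with `mlflow.log_metric(name, value)`, the same call the training script uses.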


###**Model Deployment**


Once the model is trained, it is served as a REST API built with Flask, which can be packaged in a Docker container for deployment.

- **Python Code for Flask API:**

In [None]:
# src/deploy.py
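from flask import Flask, request, jsonify
import joblib
import pandas as pd

app = Flask(__name__)
# Must match the model_path used in train.py (this notebook reuses the CSV filename)
model = joblib.load('/content/Housing.csv')

@app.route('/predict', methods=['POST'])
def predict():
    # The trained pipeline selects columns by name, so it needs a DataFrame
    # rather than a bare NumPy array.
    # Assumption: 'data' is a JSON object mapping feature names to values.
    data = request.json['data']
    prediction = model.predict(pd.DataFrame([data]))
    return jsonify({'prediction': prediction.tolist()})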

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000
 * Running on http://172.28.0.12:5000
INFO:werkzeug:Press CTRL+C to quit


**Explanation:**
- **Flask API:** Exposes a POST endpoint `/predict` that reads the `data` field of the JSON request body and returns the model's prediction as JSON.
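
An endpoint of this shape can be exercised without starting a server by using Flask's test client. The sketch below is self-contained: `StubModel` and the `area` field are hypothetical stand-ins, and a real request must carry whatever features the loaded model expects.

```python
from flask import Flask, jsonify, request


class StubModel:
    """Hypothetical stand-in for the trained pipeline."""

    def predict(self, rows):
        # Pretend the price is 100 per unit of area.
        return [row["area"] * 100 for row in rows]


app = Flask(__name__)
model = StubModel()


@app.route("/predict", methods=["POST"])
def predict():
    data = request.json["data"]
    prediction = model.predict([data])
    return jsonify({"prediction": list(prediction)})


# Call the endpoint in-process; json= sets the body and the content type.
client = app.test_client()
response = client.post("/predict", json={"data": {"area": 1200}})
print(response.status_code, response.get_json())
```

Against a running server, the same JSON body would be sent as a POST request to `/predict` on port 5000.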

###**Monitoring and Logging**


MLflow and Flask logs can be monitored to track model performance, API requests, and errors.

- **MLflow:** Logs model metrics and versioning.
- **Flask:** Logs API requests and model predictions.

**a. Model Performance Monitoring**
- For any deployed machine learning model, it's critical to track how well it performs. For a regression model such as this one, that typically means metrics like R², mean squared error (MSE), and prediction latency. In this case study, MLflow is used to log and monitor these metrics.

**b. Application Monitoring**
- When deploying models through a Flask API, we need to monitor the API performance (e.g., how many requests are being made, response times, errors). For this, tools like Prometheus or Flask’s logging module can be used to track the status of requests and errors.

**c. Infrastructure Monitoring**
- For an automated pipeline using Apache Airflow, monitoring the workflow status (e.g., whether tasks succeeded, failed, or are running) is vital. Airflow comes with built-in logging and monitoring features through its web UI.



####**1. Model Monitoring with MLflow**


**MLflow Monitoring:**
- MLflow is already integrated during the model training step to log metrics. Once the model is deployed, the API can also log each prediction to track how the model performs on real-world data over time.

**MLflow Logging Example:**

In [None]:
# src/deploy.py (extended with logging)
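from flask import Flask, request, jsonify
import joblib
import pandas as pd
import mlflow

app = Flask(__name__)
# Must match the model_path used in train.py (this notebook reuses the CSV filename)
model = joblib.load('/content/Housing.csv')

@app.route('/predict', methods=['POST'])
def predict():
    # The trained pipeline selects columns by name, so it needs a DataFrame.
    # Assumption: 'data' is a JSON object mapping feature names to values.
    data = request.json['data']
    prediction = model.predict(pd.DataFrame([data]))

    # Log prediction to MLflow (note: this starts one run per request)
    with mlflow.start_run():
        mlflow.log_param("input_data", data)
        mlflow.log_metric("prediction", float(prediction[0]))

    return jsonify({'prediction': prediction.tolist()})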

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)




**How it works:**
- **Logging Predictions:** Every time the API is hit, the input data and the prediction result are logged in MLflow.
- **Tracking in MLflow UI:** You can open the MLflow UI by running mlflow ui and track predictions over time.

**Metrics Monitored in MLflow:**
- **Model Quality:** During training, the R² score is logged (other metrics such as MSE can be logged the same way).
- **Prediction:** For each API call, we log the prediction result.
- **Latency:** You could add code to measure how long the prediction takes and log it as a custom metric.

**Additional Metrics to Monitor:**
- **Data Drift:** Compare live data distributions to training data to detect shifts.
- **Request Latency:** Log the time taken by the model to respond to each request.
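
The latency and data drift ideas above can be sketched with the standard library alone. The helper names `timed_call` and `mean_shift`, the sample values, and the threshold of 1.0 are all assumptions for illustration. The drift check is a crude mean-shift heuristic, not a statistical test; a two-sample test such as Kolmogorov-Smirnov is a common alternative.

```python
import statistics
import time


def timed_call(func, *args):
    """Return (result, elapsed_seconds) for a single call."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start


def mean_shift(reference, live):
    """Absolute difference of means, in units of the reference standard deviation."""
    ref_std = statistics.stdev(reference)
    return abs(statistics.mean(live) - statistics.mean(reference)) / ref_std


# Hypothetical values of one feature at training time and in production.
training_area = [1000, 1200, 1400, 1600, 1800]
live_area = [2000, 2200, 2400, 2600, 2800]

# sum() stands in for model.predict() here.
result, latency = timed_call(sum, live_area)
shift = mean_shift(training_area, live_area)
drift_detected = shift > 1.0  # threshold is an arbitrary assumption

print(f"latency: {latency:.6f}s, shift: {shift:.2f}, drift: {drift_detected}")
```

In the API, the measured latency and the shift value could each be recorded with `mlflow.log_metric`, alongside the prediction.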


####**2. Logging in Flask for API Monitoring**


- Python's standard `logging` module, which Flask also uses for `app.logger`, lets you log important information such as API requests, errors, and warnings.

**Example for Flask API Logging:**

In [None]:
# src/deploy.py (with Flask logging)
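import logging
import os
from flask import Flask, request, jsonify
import joblib
import pandas as pd

app = Flask(__name__)

# Configure logging (create the log directory first, or the file handler fails)
os.makedirs('logs', exist_ok=True)
logging.basicConfig(level=logging.INFO, filename='logs/flask_api.log',
                    format='%(asctime)s %(levelname)s:%(message)s')

# Must match the model_path used in train.py (this notebook reuses the CSV filename)
model = joblib.load('/content/Housing.csv')

@app.route('/predict', methods=['POST'])
def predict():
    try:
        # Assumption: 'data' is a JSON object mapping feature names to values.
        data = request.json['data']
        logging.info("Received data for prediction: %s", data)

        # The trained pipeline selects columns by name, so it needs a DataFrame.
        prediction = model.predict(pd.DataFrame([data]))
        logging.info("Prediction result: %s", prediction)

        return jsonify({'prediction': prediction.tolist()})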

    except Exception as e:
        logging.error("Error occurred: %s", e)
        return jsonify({'error': str(e)}), 500

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)




**How it works:**
- **Request Logging:** Every incoming request is logged, including the input data.
- **Prediction Logging:** After each prediction, the result is logged.
- **Error Logging:** Any exceptions are caught and logged with error details.
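
The three logging paths can be observed in-process with a small sketch. Everything here is illustrative: `ListHandler` collects messages in memory instead of writing to a file, the logger name is hypothetical, and `sum(data)` stands in for the model call.

```python
import logging

from flask import Flask, jsonify, request


class ListHandler(logging.Handler):
    """Collects log messages in memory (stands in for the log file)."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(f"{record.levelname}:{record.getMessage()}")


logger = logging.getLogger("predict_api_demo")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False  # keep demo messages out of the root logger
handler = ListHandler()
logger.addHandler(handler)

app = Flask(__name__)


@app.route("/predict", methods=["POST"])
def predict():
    try:
        data = request.json["data"]
        logger.info("Received data for prediction: %s", data)
        prediction = [sum(data)]  # placeholder for model.predict(...)
        logger.info("Prediction result: %s", prediction)
        return jsonify({"prediction": prediction})
    except Exception as e:
        logger.error("Error occurred: %s", e)
        return jsonify({"error": str(e)}), 500


client = app.test_client()
ok = client.post("/predict", json={"data": [1, 2, 3]})
bad = client.post("/predict", json={})  # missing "data" key hits the except branch

for line in handler.messages:
    print(line)
```

A missing key is arguably a client error, so returning 400 for that case may be more appropriate in practice; the 500 here mirrors the handler above.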
