
# **Implementation of MLOps Pipeline**

Implementing an MLOps pipeline means building a system in which machine learning models can be built, tested, deployed and monitored reliably and repeatably. The steps below assemble such a pipeline from Python tools (Airflow, MLflow, Flask, FastAPI, Prometheus and Grafana) and Docker.
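Because each stage consumes the output of the previous one, the pipeline as a whole forms a directed acyclic graph, the same structure Airflow uses for workflows in Step 1. A minimal standard-library sketch (Python 3.9+ `graphlib`) of deriving a valid execution order; the stage names are hypothetical:

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline stages: each key maps to the set of stages it depends on.
PIPELINE = {
    "preprocess": {"ingest"},
    "train": {"preprocess"},
    "evaluate": {"train"},
    "deploy": {"evaluate"},
    "monitor": {"deploy"},
}


def execution_order(graph):
    """Return one valid run order; raises graphlib.CycleError if the graph has a cycle."""
    return list(TopologicalSorter(graph).static_order())


print(execution_order(PIPELINE))
```

A cycle (for example, `train` depending on `monitor`) would make the graph invalid, which is why orchestrators such as Airflow require acyclic task graphs.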

## **Step 1: Data Ingestion and Preprocessing**

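Apache Airflow orchestrates data pipelines, which it models as DAGs (directed acyclic graphs) of tasks. The example DAG below runs one task daily that reads the raw data, fills missing values with 0, adds an interaction feature and writes the result: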

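```python
from datetime import datetime

import pandas as pd
from airflow import DAG
# Airflow 2.x import path (airflow.operators.python_operator is deprecated)
from airflow.operators.python import PythonOperator


def preprocess_data():
    # Relative paths resolve against the worker's working directory;
    # use absolute paths or shared storage in a real deployment
    df = pd.read_csv('data/raw_data.csv')
    df = df.fillna(0)  # replace missing values with 0
    df['new_feature'] = df['feature_1'] * df['feature_2']  # interaction feature
    df.to_csv('data/preprocessed_data.csv', index=False)


default_args = {
    'owner': 'airflow',
    'retries': 1,
}

with DAG(
    dag_id='data_preprocessing_pipeline',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule='@daily',  # 'schedule' supersedes 'schedule_interval' (Airflow 2.4+)
    catchup=False,      # don't backfill one run for every day since start_date
) as dag:
    preprocess_task = PythonOperator(
        task_id='preprocess_data',
        python_callable=preprocess_data,
    )
```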
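Keeping the transformation in a pure function, separate from the CSV reads and writes, lets it be unit-tested without running Airflow. A sketch that reuses the example's hypothetical column names `feature_1` and `feature_2`:

```python
import numpy as np
import pandas as pd


def preprocess(df: pd.DataFrame) -> pd.DataFrame:
    """Same transformation as preprocess_data(), minus the file I/O."""
    out = df.fillna(0)  # returns a new frame, so the caller's data is untouched
    out["new_feature"] = out["feature_1"] * out["feature_2"]
    return out


raw = pd.DataFrame({
    "feature_1": [1.0, np.nan, 3.0],
    "feature_2": [2.0, 5.0, np.nan],
    "target": [0, 1, 0],
})
result = preprocess(raw)
print(result["new_feature"].tolist())
```

`preprocess_data()` would then only read the CSV, call `preprocess()` and write the result, keeping the Airflow task a thin wrapper.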

## **Step 2: Model Training with MLflow for Experiment Tracking**



MLflow helps track experiments, log parameters and store models. Below is an example of training a model using scikit-learn and logging it with MLflow:

```python
import os

import joblib
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

df = pd.read_csv('data/preprocessed_data.csv')
X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42)

n_estimators = 100

with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)

    # Record the hyperparameter and the held-out accuracy for this run
    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_metric("accuracy", accuracy)

    # Store the fitted model as an artifact of this run
    mlflow.sklearn.log_model(model, "random_forest_model")

# Also save a plain pickle for the Flask API in Step 3
os.makedirs('model', exist_ok=True)
joblib.dump(model, 'model/random_forest_model.pkl')
```
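Testing is one of the pipeline's jobs listed at the top of this guide. A common safeguard is a promotion gate that only releases a model if it clearly beats a trivial baseline on held-out data. A sketch on synthetic data from `make_classification` (standing in for the CSV); the `passes_gate` helper and the 0.05 margin are assumptions, not a standard:

```python
from sklearn.datasets import make_classification
from sklearn.dummy import DummyClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split


def passes_gate(model, baseline, X_test, y_test, min_gain=0.05):
    """Return (ok, model_acc, baseline_acc); ok only if the model beats the baseline by min_gain."""
    model_acc = accuracy_score(y_test, model.predict(X_test))
    base_acc = accuracy_score(y_test, baseline.predict(X_test))
    return model_acc >= base_acc + min_gain, model_acc, base_acc


# Synthetic stand-in for data/preprocessed_data.csv
X, y = make_classification(n_samples=500, n_features=8, n_informative=5, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = RandomForestClassifier(n_estimators=100, random_state=42).fit(X_train, y_train)
# Baseline that always predicts the most common training label
baseline = DummyClassifier(strategy="most_frequent").fit(X_train, y_train)

ok, model_acc, base_acc = passes_gate(model, baseline, X_test, y_test)
print(f"model={model_acc:.3f} baseline={base_acc:.3f} deploy={ok}")
```

Both accuracies could be logged with `mlflow.log_metric` inside the run so the gate decision is recorded alongside the model.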

## **Step 3: Model Deployment**

Once the model is trained, it can be served as a REST API with Flask and containerized with Docker.

Flask API Code (app.py):

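```python
from flask import Flask, request, jsonify
import joblib
import pandas as pd

app = Flask(__name__)

# Assumes the trained model was saved with joblib.dump(model, 'model/random_forest_model.pkl')
model = joblib.load('model/random_forest_model.pkl')
# Column names seen during training (set when the model is fit on a DataFrame)
FEATURES = list(model.feature_names_in_)


@app.route('/predict', methods=['POST'])
def predict():
    data = request.get_json(silent=True)  # None if the body is not valid JSON
    if not isinstance(data, dict):
        return jsonify({'error': 'expected a JSON object of feature values'}), 400
    missing = [f for f in FEATURES if f not in data]
    if missing:
        return jsonify({'error': f'missing features: {missing}'}), 400
    # One-row frame with columns in training order
    df = pd.DataFrame([data])[FEATURES]
    prediction = model.predict(df)
    # int() assumes integer class labels
    return jsonify({'prediction': int(prediction[0])})


if __name__ == '__main__':
    # Listen on all interfaces so the API is reachable from outside the container
    app.run(host='0.0.0.0', port=5000)
```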

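Dockerfile:

```dockerfile
# Base image (a currently supported Python version)
FROM python:3.11-slim

# Set working directory
WORKDIR /app

# Install dependencies first so this layer is cached between code changes;
# requirements.txt should list flask, joblib, pandas and the scikit-learn
# version used for training
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code (app.py and the model/ directory)
COPY . .

# Expose port
EXPOSE 5000

# Run the app (Flask development server; use a WSGI server such as gunicorn in production)
CMD ["python", "app.py"]
```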
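After building and starting the container (for example `docker build -t ml-api .` followed by `docker run -p 5000:5000 ml-api`, where `ml-api` is a hypothetical image name), the endpoint can be smoke-tested from Python. A standard-library sketch; the helper names are hypothetical:

```python
import json
import urllib.request


def build_predict_request(features, url="http://localhost:5000/predict"):
    """Build a POST request whose JSON body carries one row of feature values."""
    body = json.dumps(features).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


def predict(features, url="http://localhost:5000/predict", timeout=5.0):
    """Send the request and return the 'prediction' field of the JSON response."""
    with urllib.request.urlopen(build_predict_request(features, url), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))["prediction"]
```

With the container running, a call such as `predict({"feature_1": 1.5, "feature_2": 0.3, "new_feature": 0.45})` should return a class label; the keys here are hypothetical and must match the columns the model was trained on.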

## **Step 4: Monitoring Model Performance with Prometheus and Grafana**

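Prometheus periodically scrapes (collects) metrics from running services, and Grafana visualizes them. The example below exposes a request counter from a Flask app and has Prometheus scrape it; model-quality metrics such as accuracy can be exported the same way.

Prometheus configuration (prometheus.yml):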

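```yaml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'ml_model'
    static_configs:
      # Prometheus runs in a container (see the Docker Compose file below), where
      # 'localhost' is the container itself; host.docker.internal reaches the host
      # running the metrics server (on Linux, map it with extra_hosts: host-gateway)
      - targets: ['host.docker.internal:8000']
```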

Export Metrics with Flask:

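```python
from flask import Flask
from prometheus_client import start_http_server, Counter

app = Flask(__name__)
# Exposed to Prometheus as http_requests_total
REQUESTS = Counter('http_requests_total', 'Total HTTP Requests')


@app.route('/')
def home():
    REQUESTS.inc()  # count every request to this route
    return "Hello, World!"


if __name__ == '__main__':
    # Serve metrics for Prometheus on port 8000, separately from the app on port 5000
    start_http_server(8000)
    app.run(port=5000)
```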
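The counter above only measures traffic. Accuracy needs ground-truth labels, which usually arrive later (for example, from user feedback). A sketch, assuming each prediction can eventually be paired with its true label, of a rolling accuracy that could be published as a Prometheus gauge; the class name and window size are hypothetical:

```python
from collections import deque


class RollingAccuracy:
    """Accuracy over the most recent `window` labelled predictions."""

    def __init__(self, window=1000):
        self._hits = deque(maxlen=window)  # oldest outcomes drop off automatically

    def record(self, prediction, actual):
        self._hits.append(prediction == actual)

    def value(self):
        """Return the current accuracy, or None before any feedback has arrived."""
        if not self._hits:
            return None
        return sum(self._hits) / len(self._hits)


# With prometheus_client, the value could be exported through a gauge, e.g.
# ACCURACY = Gauge('model_rolling_accuracy', 'Rolling accuracy') and ACCURACY.set(v)
tracker = RollingAccuracy(window=3)
for pred, actual in [(1, 1), (0, 1), (1, 1)]:
    tracker.record(pred, actual)
print(tracker.value())
```

A bounded window makes the metric react to recent drift instead of averaging over the model's whole lifetime.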

Run Prometheus and Grafana locally using Docker Compose:

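```yaml
version: '3'
services:
  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"
    # Lets the container reach the host as host.docker.internal (needed on Linux)
    extra_hosts:
      - "host.docker.internal:host-gateway"
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
```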
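With both containers up, Prometheus listens on http://localhost:9090 and Grafana on http://localhost:3000; inside the Compose network Grafana can reach Prometheus as http://prometheus:9090 when it is added as a data source. Scraped series can also be checked from code through Prometheus' HTTP API. A standard-library sketch using the instant-query endpoint `/api/v1/query`; the helper names are hypothetical:

```python
import json
import urllib.parse
import urllib.request


def query_url(expr, base_url="http://localhost:9090"):
    """URL for Prometheus' instant-query endpoint with a PromQL expression."""
    return f"{base_url}/api/v1/query?" + urllib.parse.urlencode({"query": expr})


def parse_vector(response):
    """Turn an instant-query 'vector' response into (labels, value) pairs."""
    if response.get("status") != "success":
        raise RuntimeError(f"query failed: {response.get('error')}")
    # Each sample's "value" is [unix_timestamp, "value as a string"]
    return [(r["metric"], float(r["value"][1])) for r in response["data"]["result"]]


def query(expr, base_url="http://localhost:9090", timeout=5.0):
    """Run an instant query against a live Prometheus server."""
    with urllib.request.urlopen(query_url(expr, base_url), timeout=timeout) as resp:
        return parse_vector(json.loads(resp.read().decode("utf-8")))
```

For example, `query("rate(http_requests_total[5m])")` returns the per-second request rate for each scraped target.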

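## **Step 5: Feedback and Iteration**

Feedback from production drives the next iteration: candidate models are compared against the current one on live traffic. The FastAPI service below loads two model versions and serves predictions from whichever one the request selects, the basis for an A/B test: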

```python
from typing import Literal

import joblib
import pandas as pd
from fastapi import FastAPI, Query

app = FastAPI()

# Two trained model versions to compare
model_a = joblib.load('model/model_a.pkl')
model_b = joblib.load('model/model_b.pkl')
MODELS = {"A": model_a, "B": model_b}


@app.post("/predict/")
def predict(features: dict, model_version: Literal["A", "B"] = Query(...)):
    # Literal makes FastAPI reject any other version with a 422 validation error.
    # A plain (non-async) def runs in a thread pool, so the blocking
    # predict() call does not stall the event loop.
    df = pd.DataFrame([features])
    prediction = MODELS[model_version].predict(df)
    return {"model_version": model_version, "prediction": int(prediction[0])}
```
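In the endpoint above the caller chooses the model version. In an A/B test the split is usually made server-side so that each user consistently sees one variant. A sketch of deterministic, hash-based assignment; `user_id` is a hypothetical identifier, and the returned label could be passed as `model_version`:

```python
import hashlib


def assign_variant(user_id: str, share_b: float = 0.5) -> str:
    """Deterministically map a user to "A" or "B"; roughly share_b of users get "B"."""
    digest = hashlib.sha256(user_id.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 10_000  # stable bucket in [0, 9999]
    return "B" if bucket < share_b * 10_000 else "A"


ids = [f"user-{i}" for i in range(10_000)]
share = sum(assign_variant(u) == "B" for u in ids) / len(ids)
print(f"fraction routed to B: {share:.3f}")
```

Because the bucket depends only on the user ID, a returning user always hits the same model, and logging `(user_id, variant, prediction)` lets later feedback be attributed to the right version. Raising `share_b` gradually shifts traffic toward the candidate.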