### 🚀 **Big Data & Cloud ML: Topics & Subtopics**  

This outline covers **Big Data processing**, **scalable machine learning**, and **cloud-based AI services** for handling large-scale data and deploying ML models effectively.  

---

## **🔹 1. Big Data Processing Frameworks**  
These tools help handle large datasets efficiently.  

### ✅ **Apache Spark** (Distributed Data Processing)  
- Spark Architecture (RDD, DAG, Lazy Evaluation)  
- Spark DataFrames & SQL  
- Spark MLlib (Machine Learning on Spark)  
- Optimizations (Partitioning, Caching, Broadcasting)  

### ✅ **Dask** (Parallel Computing in Python)  
- Dask vs. Spark  
- Dask DataFrames & Arrays  
- Scaling Pandas Workflows  
- Parallel ML Training with Dask-ML  

### ✅ **Other Big Data Tools**  
- **Apache Flink** – Real-time stream processing  
- **Apache Kafka** – Message streaming for Big Data  
- **Hadoop & HDFS** – Distributed storage & processing  

🛠 **Hands-on:**  
- Process large datasets with **Spark & Dask**  
- Build a **real-time stream processing pipeline with Kafka**  

---

## **🔹 2. Scalable Machine Learning (ML) on Big Data**  
How to train & deploy ML models efficiently on large datasets.  

### ✅ **Machine Learning with Spark MLlib**  
- Feature Engineering at Scale  
- Training ML Models (Regression, Classification)  
- Hyperparameter Tuning in Spark  
- Deploying Spark ML Models  

### ✅ **Dask-ML for Scalable ML**  
- Parallel Hyperparameter Tuning  
- Scaling Scikit-Learn with Dask  
- Model Training on Large Datasets  

### ✅ **Online & Incremental Learning**  
- Streaming ML with Spark Streaming  
- Incremental Learning (River, Vowpal Wabbit)  
- Federated Learning (Google’s FedAvg, PySyft)  

🛠 **Hands-on:**  
- Train a **large-scale ML model using Spark MLlib**  
- Implement **incremental learning for streaming data**  

---

## **🔹 3. Cloud ML: AWS, GCP, Azure**  
Deploying ML models in the cloud using **managed AI services**.  

### ✅ **AWS Machine Learning**  
- **SageMaker** – Train & deploy ML models  
- **AWS Lambda & API Gateway** – Serverless ML  
- **EMR (Elastic MapReduce)** – Run Spark on AWS  

### ✅ **Google Cloud AI**  
- **Vertex AI** – End-to-end ML platform  
- **BigQuery ML** – Train ML models inside BigQuery  
- **Dataflow & Dataproc** – Managed Apache Beam, Spark & Hadoop  

### ✅ **Microsoft Azure AI**  
- **Azure Machine Learning Studio**  
- **Azure Databricks** (Cloud-based Spark)  
- **ML Pipelines & MLOps in Azure**  

🛠 **Hands-on:**  
- Deploy an **ML model using AWS SageMaker**  
- Train an **ML model in BigQuery ML**  

---

## **🔹 4. MLOps & Model Deployment at Scale**  
How to deploy and manage ML models in production.  

### ✅ **MLOps for Scalable ML**  
- **CI/CD for ML** – Automate ML pipelines  
- **Model Versioning & Monitoring** (MLflow, Kubeflow)  
- **A/B Testing & Model Drift Detection**  

### ✅ **Deploying ML Models at Scale**  
- **Serverless Deployment** (FastAPI, AWS Lambda)  
- **Kubernetes for ML (Kubeflow)**  
- **Inference Optimization** (ONNX, TensorRT)  

🛠 **Hands-on:**  
- Deploy a **serverless ML model on AWS Lambda**  
- Set up **MLOps with MLflow**  

---


Let's break down **Big Data Processing Frameworks** in detail, covering **Apache Spark, Dask, and other tools** like **Kafka, Flink, and Hadoop**, along with **Python code snippets** to demonstrate their usage.  

---

# **🚀 Big Data Processing Frameworks**

## **🔹 1. Apache Spark (Distributed Data Processing)**
Apache Spark is an open-source, distributed computing system that processes large datasets in parallel across multiple nodes.  

### ✅ **Spark Architecture**
Spark operates using a **Resilient Distributed Dataset (RDD)** and a **DAG (Directed Acyclic Graph)** execution model.

- **RDD (Resilient Distributed Dataset)** → Immutable, distributed collection of data processed in parallel.  
- **DAG (Directed Acyclic Graph)** → Logical execution plan Spark follows for transformations.  
- **Lazy Evaluation** → Computation is not executed until an action (like `.collect()`) is called.  
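
The lazy-evaluation idea can be pictured without a cluster. The sketch below is a toy stand-in written for these notes, not Spark's implementation: the `LazyDataset` class and its methods are hypothetical names that only record transformations and replay them when `collect()` is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: transformations are recorded, not executed."""

    def __init__(self, source, ops=()):
        self._source = source      # underlying data (any iterable)
        self._ops = tuple(ops)     # recorded lineage of transformations

    def map(self, fn):
        # Returns a new dataset; nothing is computed yet.
        return LazyDataset(self._source, self._ops + (("map", fn),))

    def filter(self, pred):
        # Same idea: only the plan grows.
        return LazyDataset(self._source, self._ops + (("filter", pred),))

    def collect(self):
        # The "action": replay the recorded plan over the source data.
        items = iter(self._source)
        for kind, fn in self._ops:
            items = map(fn, items) if kind == "map" else filter(fn, items)
        return list(items)


calls = []  # records every row the predicate actually sees


def is_older_than_28(row):
    calls.append(row)
    return row[1] > 28


people = LazyDataset([("Alice", 30), ("Bob", 25), ("Charlie", 35)])
plan = people.filter(is_older_than_28).map(lambda row: row[0])

calls_before_action = len(calls)   # still 0: building the plan did no work
result = plan.collect()            # the action triggers execution
print(calls_before_action, result)
```

Building `plan` touches no data; only `collect()` runs the predicate, which mirrors how Spark defers work until an action is called.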

**🔹 Example: Creating an RDD in PySpark**


In [None]:

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("BigDataProcessing").getOrCreate()

# Create an RDD
data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
rdd = spark.sparkContext.parallelize(data)

# Perform transformation
filtered_rdd = rdd.filter(lambda x: x[1] > 28)

# Trigger execution with an action
print(filtered_rdd.collect())




---

### ✅ **Spark DataFrames & SQL**
Spark DataFrames are distributed tables with named columns, similar in spirit to pandas DataFrames, and their queries are planned by Spark's Catalyst optimizer. Spark SQL lets you query them with SQL syntax.

**🔹 Example: Working with DataFrames & SQL in Spark**


In [None]:

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()

# Create DataFrame
data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# SQL Queries
df.createOrReplaceTempView("people")
result = spark.sql("SELECT * FROM people WHERE Age > 28")
result.show()




---

### ✅ **Spark MLlib (Machine Learning on Spark)**
MLlib is Spark’s built-in machine learning library that supports large-scale ML.

**🔹 Example: Linear Regression using Spark MLlib**


In [None]:

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Sample data
data = [(1, 2.0), (2, 2.5), (3, 3.0), (4, 3.5), (5, 4.0)]
df = spark.createDataFrame(data, ["Feature", "Label"])

# Convert features into vector
assembler = VectorAssembler(inputCols=["Feature"], outputCol="Features")
df = assembler.transform(df)

# Train Linear Regression model
lr = LinearRegression(featuresCol="Features", labelCol="Label")
model = lr.fit(df)

# Predict
predictions = model.transform(df)
predictions.show()




---

### ✅ **Optimizations in Spark (Partitioning, Caching, Broadcasting)**
- **Partitioning** → Distributes data across nodes to balance load.  
- **Caching** → Stores data in memory for faster access.  
- **Broadcasting** → Distributes small datasets efficiently across nodes.
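
As a rough illustration of partitioning and broadcasting, here is a plain-Python sketch. It assumes hash partitioning by key and a small lookup table that is copied to every worker; the function names and sample data are made up for the example and are not Spark APIs.

```python
import zlib


def partition_by_key(records, num_partitions):
    """Hash-partition (key, value) records into num_partitions buckets."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        # crc32 is deterministic across runs, unlike Python's salted hash() for str
        index = zlib.crc32(key.encode("utf-8")) % num_partitions
        partitions[index].append((key, value))
    return partitions


def broadcast_join(partitions, small_table):
    """Join every partition against a small table that each worker holds in full."""
    joined = []
    for part in partitions:  # on a cluster, each partition would run on its own worker
        for key, value in part:
            if key in small_table:
                joined.append((key, value, small_table[key]))
    return joined


orders = [("alice", 30), ("bob", 25), ("alice", 12), ("carol", 7)]
countries = {"alice": "DE", "bob": "US"}  # small table, cheap to copy everywhere

parts = partition_by_key(orders, num_partitions=3)
rows = broadcast_join(parts, countries)
print(sorted(rows))
```

Because the small table is available locally in every partition, the large dataset never has to be shuffled for the join, which is the point of broadcasting.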

**🔹 Example: Caching a DataFrame**


In [None]:

df.cache()
df.show()




---

## **🔹 2. Dask (Parallel Computing in Python)**
Dask is a parallel computing framework that scales Pandas, NumPy, and Scikit-Learn workflows.

### ✅ **Dask vs. Spark**
| Feature | Apache Spark | Dask |
|---------|-------------|------|
| Language | Scala, Java, Python | Python |
| Parallelism | Distributed Cluster | Single/Multi-core & Cluster |
| Data Structures | RDD, DataFrame | Dask DataFrame, Array |
| Use Case | Big Data Processing | Scaling Python workflows |

---

### ✅ **Dask DataFrames & Arrays**
Dask DataFrames are parallel versions of Pandas DataFrames.
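
One way to picture the partitioned model is a computation that only ever sees one chunk at a time and combines partial results at the end. The sketch below uses plain Python lists in place of real partitions; `read_in_chunks` and `chunked_mean` are illustrative names, not Dask functions.

```python
def read_in_chunks(values, chunk_size):
    """Yield successive slices, standing in for the partitions of a large file."""
    for start in range(0, len(values), chunk_size):
        yield values[start:start + chunk_size]


def chunked_mean(chunks):
    """Combine per-chunk partial sums and counts into a global mean."""
    total, count = 0.0, 0
    for chunk in chunks:
        total += sum(chunk)   # partial result for this chunk only
        count += len(chunk)
    return total / count


data = list(range(1, 101))
mean = chunked_mean(read_in_chunks(data, chunk_size=8))
print(mean)
```

Only one chunk needs to be in memory at a time, and the per-chunk work could run in parallel because the partial sums are independent.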

**🔹 Example: Dask DataFrame**


In [None]:

import dask.dataframe as dd

# Load large dataset
df = dd.read_csv("large_dataset.csv")

# Perform operations
df_filtered = df[df["column_name"] > 100]
df_filtered.compute()



**🔹 Example: Dask Arrays (Scaling NumPy)**


In [None]:

import dask.array as da

# Create large array
arr = da.random.random((10000, 10000), chunks=(1000, 1000))

# Compute mean
print(arr.mean().compute())




---

### ✅ **Parallel ML Training with Dask-ML**
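Dask-ML scales Scikit-Learn workflows across multiple cores or a cluster. It does not ship its own random forest, so this example trains scikit-learn's `RandomForestClassifier` with Dask as the joblib backend.

**🔹 Example: Parallel Random Forest with Dask**


In [None]:

import dask.dataframe as dd
import joblib
from dask.distributed import Client
from sklearn.ensemble import RandomForestClassifier

client = Client()  # local cluster; pass a scheduler address to use a remote one

# Load dataset (the forest needs the training data in memory)
df = dd.read_csv("large_data.csv").compute()

# Train model: trees are built in parallel on the Dask workers
clf = RandomForestClassifier(n_jobs=-1)
with joblib.parallel_backend("dask"):
    clf.fit(df.iloc[:, :-1], df.iloc[:, -1])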




---

## **🔹 3. Other Big Data Tools**

### ✅ **Apache Flink – Real-time stream processing**
Flink is a distributed stream processing framework.

### ✅ **Apache Kafka – Message Streaming for Big Data**
Kafka is a message broker for real-time data pipelines.
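
Conceptually, a Kafka topic behaves like an append-only log in which each consumer keeps its own read position (offset). The toy classes below sketch that idea in memory; `Topic` and `Consumer` are hypothetical names for this illustration and leave out partitions, consumer groups, and persistence.

```python
class Topic:
    """Toy in-memory append-only log."""

    def __init__(self):
        self._log = []

    def append(self, message):
        self._log.append(message)
        return len(self._log) - 1  # offset of the new message

    def read(self, offset, max_messages=10):
        # Reading never removes messages from the log.
        return self._log[offset:offset + max_messages]


class Consumer:
    """Tracks its own offset, independently of other consumers."""

    def __init__(self, topic, start_offset=0):
        self._topic = topic
        self.offset = start_offset

    def poll(self):
        batch = self._topic.read(self.offset)
        self.offset += len(batch)
        return batch


topic = Topic()
for text in (b"signup", b"click", b"purchase"):
    topic.append(text)

analytics = Consumer(topic)
billing = Consumer(topic)

first = analytics.poll()     # everything published so far
topic.append(b"refund")
second = analytics.poll()    # only the new message
everything = billing.poll()  # a second consumer still sees the full log
print(first, second, everything)
```

Since reads do not delete anything, several independent consumers can process the same stream at their own pace.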

**🔹 Example: Producing and Consuming Kafka Messages**


In [None]:

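from kafka import KafkaProducer, KafkaConsumer

# Producer (Sending Data)
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('test_topic', b'Hello Kafka!')
producer.flush()  # send() is asynchronous; flush() blocks until delivery

# Consumer (Receiving Data)
consumer = KafkaConsumer(
    'test_topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',  # also read messages sent before the consumer started
    consumer_timeout_ms=10000,     # stop iterating after 10 s without new messages
)
for msg in consumer:
    print(msg.value)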




---

### ✅ **Hadoop & HDFS – Distributed Storage & Processing**
Hadoop provides a distributed file system (HDFS) for storing large datasets.

**🔹 Example: Accessing HDFS from Python**


In [None]:

from hdfs import InsecureClient

# WebHDFS endpoint of the NameNode: port 50070 on Hadoop 2.x, 9870 on Hadoop 3.x
client = InsecureClient('http://localhost:50070', user='hdfs')
print(client.list('/'))



---

# **🛠 Hands-on Projects**
1️⃣ **Process large datasets using Spark & Dask**  
2️⃣ **Build a real-time data pipeline using Kafka**  
3️⃣ **Train a scalable ML model using Spark MLlib**  

---


# **🚀 Scalable Machine Learning (ML) on Big Data**  
When dealing with **large-scale ML**, single-machine tools like **Scikit-Learn** can run out of memory because they typically expect the whole dataset to fit in RAM. This is where **Spark MLlib, Dask-ML, and Streaming ML** come in, enabling **parallel, distributed, and incremental learning** on large datasets.

---

# **1️⃣ Machine Learning with Spark MLlib**
**Spark MLlib** is a **distributed machine learning library** built on Apache Spark, allowing **large-scale ML training** across clusters.

### **✅ Feature Engineering at Scale**
Spark MLlib provides **feature transformations, scaling, and vectorization** for large datasets.
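
To make the scaling step concrete, here is a small plain-Python sketch of what standardizing one feature column means. It assumes the behaviour of PySpark's `StandardScaler` defaults (`withStd=True`, `withMean=False`, sample standard deviation); the `standard_scale` helper is a hypothetical name used only here.

```python
import statistics


def standard_scale(column, with_mean=False, with_std=True):
    """Scale one feature column; by default only divide by the sample std."""
    mean = statistics.fmean(column) if with_mean else 0.0
    std = statistics.stdev(column) if with_std else 1.0  # sample std (n - 1)
    if std == 0.0:
        # A constant column carries no information; map it to zeros.
        return [0.0 for _ in column]
    return [(value - mean) / std for value in column]


feature1 = [2.0, 5.0, 10.0]
scaled = standard_scale(feature1)                    # unit variance, not centered
centered = standard_scale(feature1, with_mean=True)  # zero mean and unit variance
print(scaled)
print(centered)
```

With the defaults the values are rescaled but not centered, so zero entries stay zero, which keeps sparse vectors sparse.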

**🔹 Example: Feature Engineering in Spark**


In [None]:

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Start Spark Session
spark = SparkSession.builder.appName("FeatureEngineering").getOrCreate()

# Sample Data
data = [(1, 2.0, 3.0), (2, 5.0, 6.0), (3, 10.0, 15.0)]
columns = ["ID", "Feature1", "Feature2"]
df = spark.createDataFrame(data, columns)

# Combine features into a single vector
assembler = VectorAssembler(inputCols=["Feature1", "Feature2"], outputCol="features")
df_vectorized = assembler.transform(df)

# Standardize the features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(df_vectorized)
df_scaled = scaler_model.transform(df_vectorized)

df_scaled.select("scaled_features").show(truncate=False)




---

### **✅ Training ML Models (Regression, Classification)**
Spark MLlib supports **linear regression, decision trees, random forests, and gradient boosting**.

**🔹 Example: Train a Logistic Regression Model**


In [None]:

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Training Data (features must be ML vectors, not plain Python lists)
training_data = spark.createDataFrame([(0.0, Vectors.dense([0.0, 1.1, 0.1])),
                                       (1.0, Vectors.dense([2.0, 1.0, -1.0])),
                                       (0.0, Vectors.dense([2.0, 1.3, 1.0])),
                                       (1.0, Vectors.dense([0.0, 1.2, -0.5]))],
                                      ["label", "features"])

# Train Model
lr = LogisticRegression(maxIter=10, regParam=0.01)
model = lr.fit(training_data)

# Predict
predictions = model.transform(training_data)
predictions.show()




---

### **✅ Hyperparameter Tuning in Spark**
**Grid Search** and **Cross-validation** can be done efficiently using Spark ML’s `ParamGridBuilder` and `CrossValidator`.
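
The cost of a grid search with cross-validation is easy to underestimate, so here is a plain-Python sketch of the bookkeeping: every parameter combination is trained once per fold. The helper names are hypothetical, and the folds are contiguous for simplicity, whereas Spark's `CrossValidator` assigns rows to folds randomly.

```python
from itertools import product


def expand_grid(grid):
    """Turn {"param": [values...]} into a list of concrete parameter dicts."""
    names = sorted(grid)
    return [dict(zip(names, values)) for values in product(*(grid[n] for n in names))]


def k_fold_indices(n_samples, k):
    """Yield (train_indices, validation_indices) for k contiguous folds."""
    fold_sizes = [n_samples // k + (1 if i < n_samples % k else 0) for i in range(k)]
    start = 0
    for size in fold_sizes:
        val = list(range(start, start + size))
        train = [i for i in range(n_samples) if i < start or i >= start + size]
        yield train, val
        start += size


grid = {"regParam": [0.01, 0.1, 1.0], "maxIter": [10, 50]}
candidates = expand_grid(grid)
folds = list(k_fold_indices(n_samples=10, k=3))

total_fits = len(candidates) * len(folds)  # one model fit per (candidate, fold) pair
print(len(candidates), len(folds), total_fits)
```

The number of fits grows multiplicatively with each added hyperparameter, which is why distributing the fits across a cluster pays off.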

**🔹 Example: Hyperparameter Tuning in Spark**


In [None]:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Define hyperparameter grid
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .build()

# Cross-validation
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=param_grid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=3)

cv_model = cv.fit(training_data)




---

### **✅ Deploying Spark ML Models**
Spark ML models can be **saved** and **deployed** for predictions at scale.

**🔹 Example: Save & Load Spark Model**


In [None]:

# Save model (overwrite() avoids an error if the path already exists)
model.write().overwrite().save("logistic_regression_model")

# Load model
from pyspark.ml.classification import LogisticRegressionModel
loaded_model = LogisticRegressionModel.load("logistic_regression_model")




---

# **2️⃣ Scalable ML with Dask-ML**
Dask-ML scales **Scikit-Learn models** to large datasets by distributing computations.

### **✅ Parallel Hyperparameter Tuning**
Dask enables **parallel search** for hyperparameter tuning.

**🔹 Example: Parallel Grid Search with Dask**


In [None]:

from dask_ml.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
import dask.array as da

# Generate large dataset
X = da.random.random((10000, 20), chunks=(1000, 20))
y = da.random.randint(0, 2, size=10000, chunks=1000)

# Define Model & Hyperparameters
clf = RandomForestClassifier()
param_grid = {"n_estimators": [10, 50, 100], "max_depth": [5, 10, None]}

# Perform Grid Search
grid_search = GridSearchCV(clf, param_grid, cv=3)
grid_search.fit(X, y)




---

### **✅ Scaling Scikit-Learn with Dask**
Dask extends Scikit-Learn to large datasets without loading everything into memory.

**🔹 Example: Large Dataset ML Training with Dask**


In [None]:

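import dask.dataframe as dd
from dask_ml.linear_model import LogisticRegression

# Load large dataset
df = dd.read_csv("large_dataset.csv")

# Dask-ML linear models expect arrays with known chunk sizes
X = df.iloc[:, :-1].to_dask_array(lengths=True)
y = df.iloc[:, -1].to_dask_array(lengths=True)

# Train model
lr = LogisticRegression()
lr.fit(X, y)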




---

# **3️⃣ Online & Incremental Learning**
Instead of training models on **static data**, **incremental learning** allows updating models **dynamically**.

## **✅ Streaming ML with Spark Streaming**
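Spark's Structured Streaming can score incoming data in **real time** with a model trained on static data. Spark ML estimators cannot call `fit()` on a streaming DataFrame, so the model is trained on a batch first.

**🔹 Example: Streaming ML with Spark**


In [None]:

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler

# Train on a static (batch) DataFrame; "training_data/" and the f1/f2 columns are placeholders
static_df = spark.read.csv("training_data/", header=True, inferSchema=True)
assembler = VectorAssembler(inputCols=["f1", "f2"], outputCol="features")
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
model = Pipeline(stages=[assembler, dt]).fit(static_df)

# Streaming DataFrame (file streams need an explicit schema)
df_stream = (spark.readStream.format("csv").option("header", "true")
             .schema(static_df.schema).load("streaming_data/"))

# Predict on Stream
predictions = model.transform(df_stream)
predictions.writeStream.outputMode("append").format("console").start().awaitTermination()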




---

## **✅ Incremental Learning (River, Vowpal Wabbit)**
River is a Python library for **online learning** (updating ML models incrementally).
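
To show what "updating a model incrementally" means mechanically, here is a dependency-free sketch of online logistic regression trained one example at a time with stochastic gradient descent. The class borrows River-style method names (`learn_one`, `predict_proba_one`) but is a hypothetical illustration, not River's implementation; unlike River, `predict_proba_one` returns a single float here.

```python
import math


class OnlineLogisticRegression:
    """Logistic regression updated one (features, label) pair at a time."""

    def __init__(self, learning_rate=0.1):
        self.learning_rate = learning_rate
        self.weights = {}  # feature name -> weight, grown as features appear
        self.bias = 0.0

    def predict_proba_one(self, x):
        z = self.bias + sum(self.weights.get(name, 0.0) * value for name, value in x.items())
        return 1.0 / (1.0 + math.exp(-z))

    def learn_one(self, x, y):
        # Gradient of the log loss for a single example is (p - y) * x.
        error = self.predict_proba_one(x) - y
        for name, value in x.items():
            self.weights[name] = self.weights.get(name, 0.0) - self.learning_rate * error * value
        self.bias -= self.learning_rate * error
        return self


# A tiny synthetic stream: x1 signals the positive class, x2 the negative class.
stream = [({"x1": 1.0, "x2": 0.0}, 1), ({"x1": 0.0, "x2": 1.0}, 0)] * 200

model = OnlineLogisticRegression()
for x, y in stream:
    model.learn_one(x, y)  # the model never sees the whole dataset at once

p_pos = model.predict_proba_one({"x1": 1.0, "x2": 0.0})
p_neg = model.predict_proba_one({"x1": 0.0, "x2": 1.0})
print(round(p_pos, 3), round(p_neg, 3))
```

Memory use is constant in the number of examples, because each example is discarded after its update.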

**🔹 Example: Online Learning with River**


In [None]:

from river import datasets, linear_model, preprocessing

# Initialize model
model = preprocessing.StandardScaler() | linear_model.LogisticRegression()

# Stream new data and update model
stream = datasets.Phishing()  # small built-in dataset standing in for incoming data
for x, y in stream:
    model.learn_one(x, y)




---

## **✅ Federated Learning (Google’s FedAvg, PySyft)**
Federated Learning trains models **across multiple devices** without sending the raw data to a central server; only model updates are shared and aggregated.
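
The aggregation step of federated averaging (FedAvg) can be sketched in a few lines: the server combines client weights, weighting each client by how many samples it trained on. The function name and the example numbers below are made up for illustration, and local training on each client is left out.

```python
def federated_average(client_updates):
    """Average client weight vectors, weighted by each client's sample count.

    client_updates: list of (weights, num_samples) pairs, one per client.
    """
    total_samples = sum(n for _, n in client_updates)
    dim = len(client_updates[0][0])
    averaged = [0.0] * dim
    for weights, n in client_updates:
        for i, w in enumerate(weights):
            averaged[i] += w * n / total_samples
    return averaged


# Two clients: the second one trained on three times as much data.
updates = [([1.0, 2.0], 100), ([3.0, 4.0], 300)]
global_weights = federated_average(updates)
print(global_weights)
```

Only the weight vectors and sample counts leave the clients; the training data itself stays on each device.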

**🔹 Example: Federated Learning with PySyft**


In [None]:

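import torch
import syft as sy  # legacy PySyft 0.2.x API; later releases changed this interface

hook = sy.TorchHook(torch)

# Create virtual workers
alice = sy.VirtualWorker(hook, id="alice")
bob = sy.VirtualWorker(hook, id="bob")

# Illustrative model and data; the data lives on alice's worker
model = torch.nn.Linear(2, 1)
input_data = torch.tensor([[0.0, 1.0], [1.0, 0.0]]).send(alice)

# Send the model to the data, run it remotely, then retrieve it
model.send(alice)
output = model(input_data)
model.get()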



---

# **🛠 Hands-on Projects**
✅ **Train a large-scale ML model using Spark MLlib**  
✅ **Parallel hyperparameter tuning with Dask-ML**  
✅ **Implement incremental learning for streaming data**  

---


# **🚀 Cloud Machine Learning: AWS, GCP, Azure**  
Cloud platforms provide **scalable, managed ML services** that allow **training, deployment, and monitoring of ML models** without managing infrastructure.  

---


# **1️⃣ AWS Machine Learning**  

## ✅ **Amazon SageMaker** – Train & Deploy ML Models  
Amazon SageMaker is a **fully managed** ML platform for training, tuning, and deploying ML models.  

### **🔹 Features of SageMaker**  
- **Managed Jupyter Notebooks** – Develop models interactively.  
- **Built-in Algorithms** – Prebuilt ML models (XGBoost, Linear Regression, etc.).  
- **AutoML** – Automatic model tuning with SageMaker Autopilot.  
- **Hyperparameter Optimization (HPO)** – Optimize models using Bayesian search.  
- **Model Deployment** – Deploy models as REST APIs with **SageMaker Endpoints**.  

### **🔹 Example: Training a Model in SageMaker**


In [None]:

import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.inputs import TrainingInput

# Initialize SageMaker session
sagemaker_session = sagemaker.Session()
role = get_execution_role()

# Define training parameters
container = sagemaker.image_uris.retrieve("xgboost", boto3.Session().region_name, "latest")

# Create an estimator
xgb = sagemaker.estimator.Estimator(container,
                                    role,
                                    instance_count=1,
                                    instance_type="ml.m5.large",
                                    output_path="s3://your-bucket/output",
                                    sagemaker_session=sagemaker_session)

# Set hyperparameters
xgb.set_hyperparameters(objective="binary:logistic", num_round=100)

# Train the model (CSV input needs an explicit content type: label in the first column, no header)
xgb.fit({"train": TrainingInput("s3://your-bucket/train.csv", content_type="text/csv")})




---

## ✅ **AWS Lambda & API Gateway** – Serverless ML  
AWS Lambda lets you **run ML inference serverlessly** without managing servers.  

### **🔹 Deploy ML Model as Serverless API**
1. Train an ML model in **SageMaker** or **Scikit-Learn**.  
2. Save the model to **Amazon S3**.  
3. Deploy as **Lambda Function** with **API Gateway**.  
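
The request/response contract in step 3 can be exercised locally before anything is deployed. The sketch below assumes an API Gateway proxy-style event whose `body` is a JSON string; `StubModel` is a hypothetical stand-in for the real unpickled model, and the 400 response is an added safeguard rather than part of the steps above.

```python
import json


class StubModel:
    """Hypothetical stand-in for the real model: predicts the sum of the features."""

    def predict(self, rows):
        return [sum(row) for row in rows]


model = StubModel()


def lambda_handler(event, context):
    # Validate the request body before calling the model.
    try:
        data = json.loads(event["body"])
        features = data["features"]
    except (KeyError, TypeError, json.JSONDecodeError):
        return {"statusCode": 400,
                "body": json.dumps({"error": "expected a JSON body with a 'features' list"})}
    prediction = model.predict([features])
    return {"statusCode": 200, "body": json.dumps({"prediction": list(prediction)})}


# Simulate the event that API Gateway would pass to the function.
event = {"body": json.dumps({"features": [1.0, 2.0, 3.5]})}
response = lambda_handler(event, context=None)
bad_response = lambda_handler({"body": "not json"}, context=None)
print(response)
print(bad_response["statusCode"])
```

Calling the handler directly like this makes it easy to unit-test the parsing and response format without touching AWS.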

### **🔹 Example: Deploy ML Model with Lambda**


In [None]:

import json
import boto3
import pickle

# Load model from S3
s3 = boto3.client('s3')
bucket = "your-bucket"
model_key = "model.pkl"
response = s3.get_object(Bucket=bucket, Key=model_key)
model = pickle.loads(response['Body'].read())

# Define Lambda function
def lambda_handler(event, context):
    data = json.loads(event['body'])
    prediction = model.predict([data['features']])
    return {"statusCode": 200, "body": json.dumps({"prediction": prediction.tolist()})}




---

## ✅ **AWS EMR (Elastic MapReduce)** – Running Spark on AWS  
**EMR** allows running **Apache Spark, Hadoop, and Presto** on AWS.  

### **🔹 Example: Running a Spark Job on EMR**


In [None]:

from pyspark.sql import SparkSession

# Initialize Spark on EMR
spark = SparkSession.builder.appName("AWS_EMR_Spark").getOrCreate()

# Load Data
df = spark.read.csv("s3://your-bucket/data.csv", header=True, inferSchema=True)

# Perform Transformations
df.groupBy("category").count().show()




---

# **2️⃣ Google Cloud AI**  

## ✅ **Vertex AI** – End-to-End ML Platform  
**Vertex AI** provides **AutoML, custom training, model deployment, and monitoring** in a single platform.

### **🔹 Example: Training a Model in Vertex AI**


In [None]:

from google.cloud import aiplatform

# Initialize Vertex AI client (script-based jobs need a staging bucket; the bucket name is a placeholder)
aiplatform.init(project="your-project-id", location="us-central1",
                staging_bucket="gs://your-bucket")

# Define the training job
job = aiplatform.CustomTrainingJob(
    display_name="my-model",
    script_path="train.py",
    container_uri="gcr.io/cloud-aiplatform/training/tf-cpu.2-2",
    model_serving_container_image_uri="gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2"
)

# Run training; returns the resulting Model
model = job.run(replica_count=1, machine_type="n1-standard-4")




---

## ✅ **BigQuery ML** – Train ML Models Inside BigQuery  
**BigQuery ML** enables SQL-based ML model training **directly inside BigQuery** without exporting data.

### **🔹 Example: Train a Regression Model in BigQuery**
```sql
CREATE OR REPLACE MODEL my_project.my_dataset.model_name  
OPTIONS(model_type='linear_reg') AS  
SELECT feature1, feature2, target FROM my_project.my_dataset.train_data;
```

### **🔹 Example: Make Predictions with BigQuery ML**
```sql
SELECT *
FROM ML.PREDICT(MODEL my_project.my_dataset.model_name,
                (SELECT feature1, feature2 FROM my_project.my_dataset.test_data));
```



---

## ✅ **Dataflow & Dataproc** – Managed Beam, Spark & Hadoop  
- **Dataflow**: Managed **Apache Beam** for real-time and batch processing.  
- **Dataproc**: Fully managed **Apache Spark & Hadoop** on Google Cloud.  

### **🔹 Example: Running a Data Pipeline on Dataflow**


In [None]:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

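# Default options run locally; pass --runner=DataflowRunner plus --project, --region
# and --temp_location to execute on Dataflow
options = PipelineOptions()

# The context manager runs the pipeline and waits for it to finish
with beam.Pipeline(options=options) as p:
    (p | "Read Data" >> beam.io.ReadFromText("gs://your-bucket/data.csv")
       | "Transform Data" >> beam.Map(lambda x: x.upper())
       | "Write Output" >> beam.io.WriteToText("gs://your-bucket/output.txt"))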




---

# **3️⃣ Microsoft Azure AI**  

## ✅ **Azure Machine Learning Studio**  
Azure ML Studio provides a **drag-and-drop interface** and **Python SDK** for ML training and deployment.

### **🔹 Example: Train Model in Azure ML**


In [None]:

from azureml.core import Workspace, Experiment, ScriptRunConfig

# Connect to Azure ML Workspace
ws = Workspace.from_config()

# Define experiment
experiment = Experiment(ws, name="my_experiment")
config = ScriptRunConfig(source_directory=".", script="train.py")

# Run experiment
run = experiment.submit(config)
run.wait_for_completion(show_output=True)



---

## ✅ **Azure Databricks** – Cloud-Based Spark  
Azure Databricks is a **managed Spark environment** on Azure for **Big Data & ML**.

### **🔹 Example: Train a Spark ML Model in Azure Databricks**


In [None]:

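from pyspark.sql import SparkSession
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName("Azure_Databricks").getOrCreate()
df = spark.read.csv("dbfs:/mnt/data.csv", header=True, inferSchema=True)

# A CSV cannot store a vector column, so assemble the numeric columns into "features"
# (assumes a "label" column and numeric features)
feature_cols = [c for c in df.columns if c != "label"]
df = VectorAssembler(inputCols=feature_cols, outputCol="features").transform(df)

# Train Decision Tree
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
model = dt.fit(df)

# Predict
predictions = model.transform(df)
predictions.show()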




---

## ✅ **ML Pipelines & MLOps in Azure**  
Azure provides **MLOps capabilities** using **Azure DevOps, ML Pipelines, and Model Monitoring**.

---

# **🛠 Hands-on Projects**  
✅ **Deploy an ML model using AWS SageMaker**  
✅ **Train an ML model in BigQuery ML**  
✅ **Build an MLOps pipeline using Azure ML**  

---


Let's break down **MLOps & Model Deployment at Scale** into detailed sections with explanations and code snippets.

---

## 🔹 **1. MLOps for Scalable ML**
### ✅ **1.1 CI/CD for ML (Automating ML Pipelines)**
MLOps integrates DevOps practices into ML workflows to automate and streamline model training, validation, deployment, and monitoring.

📌 **Key Steps in ML CI/CD Pipeline**  
1. **Data Versioning** – Store datasets with DVC or MLflow.  
2. **Model Training & Validation** – Train models using automated pipelines.  
3. **Model Packaging** – Convert models to deployable formats (Pickle, ONNX).  
4. **Model Deployment** – Deploy as APIs or in production environments.  
5. **Continuous Monitoring** – Track model drift and retrain models when needed.  

🛠 **Example: CI/CD with GitHub Actions & MLflow**  
```yaml
name: ML Pipeline

on: [push]

jobs:
  train:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout Repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.8'
      
      - name: Install Dependencies
        run: pip install -r requirements.txt
      
      - name: Train Model
        run: python train.py

      - name: Log Model with MLflow
        run: python log_model.py
```

---

### ✅ **1.2 Model Versioning & Monitoring (MLflow, Kubeflow)**
#### 📌 **Model Versioning with MLflow**
MLflow helps track experiments, store models, and deploy them.

🛠 **Example: Logging a Model in MLflow**


In [None]:

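import mlflow
import mlflow.sklearn
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier

mlflow.set_experiment("RandomForest_Experiment")
X, y = load_iris(return_X_y=True)  # small sample dataset for illustration

with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X, y)  # fit before logging so the stored model can predict
    mlflow.log_param("n_estimators", 100)
    mlflow.log_metric("train_accuracy", model.score(X, y))
    mlflow.sklearn.log_model(model, "random_forest_model")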




#### 📌 **Model Monitoring & Drift Detection**
- **MLflow Tracking** logs metrics, parameters, and artifacts.  
- **Prometheus + Grafana** can track model latency and performance.  
- **Evidently AI** detects model drift.  

🛠 **Example: Monitor Data Drift with Evidently**


In [None]:

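# TestSuite API from older Evidently releases; ref_df and curr_df are
# pandas DataFrames with the same columns (reference vs. current data)
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset

suite = TestSuite(tests=[DataDriftTestPreset()])
suite.run(reference_data=ref_df, current_data=curr_df)
suite.show()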




---

### ✅ **1.3 A/B Testing & Model Drift Detection**
- **A/B Testing** helps compare model versions in production.
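- **Model Drift** is a loss of model quality over time, typically because the input data distribution changes (data drift) or the relationship between inputs and target changes (concept drift).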
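
One common way to put a number on a distribution change is the Population Stability Index (PSI), which compares the share of values per bin in a reference sample and a current sample. The sketch below is a minimal stdlib version under assumed bin edges; the function names are hypothetical, and this is not necessarily the statistic a given monitoring tool uses.

```python
import bisect
import math


def bin_proportions(values, bin_edges, eps=1e-6):
    """Share of values falling in each bin; values outside the edges are ignored."""
    counts = [0] * (len(bin_edges) - 1)
    for v in values:
        i = min(bisect.bisect_right(bin_edges, v) - 1, len(counts) - 1)
        if i >= 0 and v <= bin_edges[-1]:
            counts[i] += 1
    # eps keeps empty bins from producing log(0) or division by zero
    return [max(c / len(values), eps) for c in counts]


def population_stability_index(reference, current, bin_edges):
    ref = bin_proportions(reference, bin_edges)
    cur = bin_proportions(current, bin_edges)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref, cur))


edges = [0, 25, 50, 75, 100]
reference = [float(v) for v in range(100)]   # spread evenly over all bins
shifted = [float(v) for v in range(50, 100)] # only the upper half of the range

stable = population_stability_index(reference, reference, edges)
drifted = population_stability_index(reference, shifted, edges)
print(stable, round(drifted, 3))
```

Identical samples give a PSI of zero, and the value grows as the current distribution moves away from the reference.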

🛠 **Example: A/B Testing API with FastAPI**


In [None]:

from fastapi import FastAPI
import random

app = FastAPI()

@app.get("/predict")
def predict():
    model_version = random.choice(["v1", "v2"])
    return {"model_version": model_version, "prediction": 0.85}




---

## 🔹 **2. Deploying ML Models at Scale**
### ✅ **2.1 Serverless Deployment (FastAPI, AWS Lambda)**
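FastAPI wraps a model in an HTTP API that can run in a container or on a serverless platform, while AWS Lambda runs the inference code on demand without any servers to manage.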

🛠 **Example: Deploying a FastAPI Model**



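In [None]:

from fastapi import FastAPI
import pickle

app = FastAPI()

# Load the model once at startup
with open("model.pkl", "rb") as f:
    model = pickle.load(f)

@app.get("/predict")
def predict(x: float):
    # .item() converts the NumPy scalar to a plain Python value for JSON
    return {"prediction": model.predict([[x]])[0].item()}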



🛠 **Example: Deploy Model on AWS Lambda**


In [None]:

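import json
import joblib

_model = None  # cached across warm invocations of the same container

def lambda_handler(event, context):
    global _model
    if _model is None:
        # /tmp starts empty on a cold start; the file must be copied there first (e.g. from S3)
        _model = joblib.load("/tmp/model.pkl")
    input_data = json.loads(event['body'])
    prediction = _model.predict([input_data['features']])
    return {"statusCode": 200, "body": json.dumps({"prediction": prediction.tolist()})}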




---

### ✅ **2.2 Kubernetes for ML (Kubeflow)**
Kubernetes (K8s) orchestrates containers, so model servers can be replicated and autoscaled; Kubeflow adds ML-specific tooling on top.

📌 **Kubeflow Features**
- **Pipeline Orchestration** – Automate workflows.  
- **Model Serving** – Deploy models using KServe (formerly KFServing).  
- **Hyperparameter Tuning** – Use Katib for tuning.  

🛠 **Example: Deploy Model using KServe**
```yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: my-model
spec:
  predictor:
    tensorflow:
      storageUri: "gs://my-bucket/my-model/"
```

---

### ✅ **2.3 Inference Optimization (ONNX, TensorRT)**
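Inference optimization reduces latency and hardware cost, typically through graph-level optimizations, operator fusion, and reduced numerical precision.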

#### 📌 **ONNX for Model Optimization**
ONNX enables cross-platform inference with optimized performance.

🛠 **Convert a PyTorch Model to ONNX**


In [None]:

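import torch
import torchvision.models as models

# `weights=` replaces the deprecated `pretrained=True` (torchvision >= 0.13)
model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
model.eval()  # export in inference mode
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(model, dummy_input, "resnet18.onnx")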




#### 📌 **Optimize Inference with TensorRT**
NVIDIA TensorRT accelerates deep learning inference.

🛠 **Convert Model to TensorRT**


In [None]:

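import tensorrt as trt
# Sketch assuming TensorRT 8.x: parse resnet18.onnx and build a serialized engine
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
builder = trt.Builder(TRT_LOGGER)
network = builder.create_network(1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
parser = trt.OnnxParser(network, TRT_LOGGER)
parser.parse(open("resnet18.onnx", "rb").read())
engine_bytes = builder.build_serialized_network(network, builder.create_builder_config())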




---
