### **Deploying a Machine Learning Model as a REST API with Real-Time Streaming**

#### **Introduction**
You are a machine learning engineer at a fintech startup. Your team has developed a fraud detection model that predicts whether a transaction is fraudulent or not. Your task is to deploy this model as a REST API using **FastAPI**, containerize it with **Docker**, and set up real-time streaming for live predictions using **Kafka**. This project will allow your team to integrate the model into your live transaction processing system.

---

### **Step 1: Setting Up the Environment**
**Story:**  
You start by setting up your development environment. You decide to use **FastAPI** for building the REST API because of its speed and ease of use. You also plan to use **Docker** to containerize the application for easy deployment and scalability. Finally, you set up **Kafka** to handle real-time streaming of transaction data for live predictions.

**Mini Tasks:**
1. Install Python and create a virtual environment.
2. Install FastAPI, Uvicorn, and other required libraries (e.g., `pydantic`, `scikit-learn`, `kafka-python`).
3. Set up a local Kafka server using Docker (use the `confluentinc/cp-kafka` image).
4. Verify that Kafka is running by creating a test topic and producing/consuming messages.

---

### **Step 2: Building the Machine Learning Model**
**Story:**  
You already have a trained fraud detection model, but for this project, you decide to train a simple logistic regression model on a sample dataset (e.g., the [Credit Card Fraud Detection dataset](https://www.kaggle.com/mlg-ulb/creditcardfraud)). You save the trained model as a `.pkl` file for later use.

**Mini Tasks:**
1. Load the dataset and preprocess it (e.g., handle missing values, scale features).
2. Train a logistic regression model using scikit-learn.
3. Save the trained model as a `.pkl` file using `joblib` or `pickle`.

---

### **Step 3: Creating the REST API with FastAPI**
**Story:**  
You build a REST API using FastAPI that exposes an endpoint for making predictions. The API takes transaction data as input, loads the trained model, and returns the prediction (fraudulent or not).

**Mini Tasks:**
1. Create a FastAPI application with a `/predict` endpoint.
2. Load the trained model from the `.pkl` file when the API starts.
3. Define a Pydantic model for the input data (e.g., transaction amount, timestamp, features).
4. Implement the prediction logic in the `/predict` endpoint.
5. Test the API locally using Uvicorn and sample transaction data.

---

### **Step 4: Containerizing the Application with Docker**
**Story:**  
To make the API easy to deploy and scale, you containerize it using Docker. You create a `Dockerfile` that sets up the environment, installs dependencies, and runs the FastAPI application.

**Mini Tasks:**
1. Create a `Dockerfile` that:
   - Uses a base Python image (e.g., `python:3.9-slim`).
   - Installs dependencies from a `requirements.txt` file.
   - Copies the FastAPI application and model file into the container.
   - Exposes the API on port 8000.
2. Build the Docker image and run it locally.
3. Test the API inside the Docker container using sample transaction data.

---

### **Step 5: Setting Up Real-Time Streaming with Kafka**
**Story:**  
To enable real-time predictions, you set up Kafka to stream transaction data to the API. The API consumes messages from a Kafka topic, makes predictions, and writes the results to another Kafka topic.

**Mini Tasks:**
1. Create two Kafka topics: `transactions` (for incoming transaction data) and `predictions` (for prediction results).
2. Modify the FastAPI application to include a Kafka consumer and producer:
   - The consumer reads transaction data from the `transactions` topic.
   - The producer writes predictions to the `predictions` topic.
3. Test the real-time streaming setup by producing sample transaction data to the `transactions` topic and consuming predictions from the `predictions` topic.

---

### **Step 6: Deploying the Application**
**Story:**  
You deploy the Dockerized FastAPI application and Kafka setup to a cloud platform (e.g., AWS, Azure, or GCP) or run it locally for testing. You ensure that the API and Kafka are working together seamlessly to provide real-time predictions.

**Mini Tasks:**
1. Push the Docker image to a container registry (e.g., Docker Hub, Azure Container Registry).
2. Deploy the Docker container to a cloud service (e.g., AWS ECS, Azure Container Instances) or run it locally using Docker Compose.
3. Verify that the API and Kafka are working together by streaming live transaction data and checking the predictions.

---

### **Step 7: Testing and Monitoring**
**Story:**  
You test the entire system end-to-end to ensure it works as expected. You also set up basic monitoring to track the performance of the API and Kafka.

**Mini Tasks:**
1. Test the system by streaming a large batch of transaction data and verifying the predictions.
2. Set up logging in the FastAPI application to track incoming requests and predictions.
3. Use a tool like **Prometheus** or **Grafana** to monitor the API's performance and Kafka's message throughput.

---

### **Conclusion**
**Story:**  
You’ve successfully deployed a machine learning model as a REST API using FastAPI, containerized it with Docker, and set up real-time streaming for live predictions using Kafka. This system can now be integrated into your live transaction processing pipeline to detect fraudulent transactions in real time.

**Mini Tasks:**
1. Document the setup process and share it with your team.
2. Terminate any cloud resources (if used) to avoid unnecessary costs.

---

### **Bonus Challenge**
- Add authentication to the FastAPI endpoint using OAuth2 or API keys.
- Use **Kafka Streams** or **KSQL** to perform additional real-time processing on the predictions (e.g., aggregating fraud rates by hour).
- Deploy the system on a Kubernetes cluster for better scalability and management.

---

### **Tools and Technologies Used**
1. **FastAPI**: For building the REST API.
2. **Docker**: For containerizing the application.
3. **Kafka**: For real-time streaming of transaction data.
4. **Scikit-learn**: For training the machine learning model.
5. **Uvicorn**: For running the FastAPI application.
6. **Pydantic**: For validating input data.
7. **Kafka-Python**: For interacting with Kafka in Python.

---

This project provides a hands-on experience with deploying machine learning models, building REST APIs, containerization, and real-time streaming. It’s a great way to learn modern MLOps practices and tools.

# 🚀 3MTT AI/ML Final Project — Fraud Detection REST API with Streaming
**👨🏾‍💻 Submitted by:** Abdulrahman Adisa Amuda  
**📁 Module:** 7 - Machine Learning Deployment  
**🎯 Goal:** Deploy a fraud detection ML model as a REST API using FastAPI, Docker, and Kafka-style streaming simulation.

---

### 📘 Project Overview

As a machine learning engineer at a fintech startup, I was tasked to deploy a fraud detection model with:

- ✅ **FastAPI** to expose predictions via a `/predict` endpoint  
- ✅ **Docker** to containerize the app for deployment  
- ✅ **Kafka** (or simulated Kafka) to stream real-time transaction data  
- ✅ **Monitoring** (described in markdown) for real-world use

Due to Colab limitations, real server-based components (FastAPI/Docker/Kafka) are **written as real code** but **simulated locally** or provided as **standalone files** for external use.

---

### ⚙️ Tools & Technologies Used

- **scikit-learn** for model training  
- **FastAPI** (code written but not executed in Colab)  
- **Kafka-style simulation** using Python loops  
- **Dockerfile** and `requirements.txt` included  
- **Joblib** for saving model  
- **Colab notebook** for submission

---

📎 *This notebook includes both runnable code and production-ready files.*

In [1]:
# ✅ Step 1: Install Dependencies
!pip install -q scikit-learn pandas joblib

## ✅ Step 2: Build and Save the Fraud Detection ML Model

In this step, i simulate the credit card fraud dataset using synthetic data with features like:

- `age`
- `salary`
- `transaction_amount`

I train a logistic regression classifier and save:

- `fraud_model.pkl` → Trained model
- `scaler.pkl` → Feature scaler (StandardScaler)

These files will later be loaded inside the REST API.

In [None]:
# ✅ Step 2: Import Libraries & Build Dataset
import pandas as pd
import numpy as np
import joblib
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

# ✅ Simulate a Mini Dataset (you can also use Kaggle creditcard.csv offline)
data = pd.DataFrame({
    'age': [25, 45, 52, 37, 28, 33, 48],
    'salary': [30000, 50000, 70000, 42000, 25000, 39000, 60000],
    'transaction_amount': [1000, 3000, 5000, 2300, 1200, 1800, 4100],
    'is_fraud': [0, 1, 1, 0, 0, 0, 1]
})

X = data[['age', 'salary', 'transaction_amount']]
y = data['is_fraud']

# ✅ Feature Scaling
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# ✅ Train Logistic Regression
model = LogisticRegression()
model.fit(X_scaled, y)

# ✅ Save Model and Scaler
joblib.dump(model, 'fraud_model.pkl')
joblib.dump(scaler, 'scaler.pkl')

print("✅ Model and Scaler saved as fraud_model.pkl and scaler.pkl")

✅ Model and Scaler saved as fraud_model.pkl and scaler.pkl


✅ Step 3: Simulated FastAPI Prediction Logic
In this step, i simulate the logic behind a FastAPI /predict endpoint by writing a Python function that:

Loads the saved model and scaler
Accepts user inputs (age, salary, transaction_amount)
Returns the predicted result: "Fraudulent" or "Legit"
This prepares us for integration with a real FastAPI application later.

In [None]:
# ✅ Step 3: Define the Simulated /predict Endpoint Logic
def predict_transaction(age, salary, transaction_amount):
    # Load the model and scaler
    model = joblib.load('fraud_model.pkl')
    scaler = joblib.load('scaler.pkl')

    # Create input DataFrame
    input_df = pd.DataFrame([{
        'age': age,
        'salary': salary,
        'transaction_amount': transaction_amount
    }])

    # Scale input and predict
    input_scaled = scaler.transform(input_df)
    prediction = model.predict(input_scaled)[0]

    return "Fraudulent" if prediction == 1 else "Legit"

✅ Step 4: Testing API Logic + Simulated Real-Time Streaming
In this step:

I test the predict_transaction() function with a sample input.
Then simulate a real-time stream of transactions (like Kafka) using a Python loop.
This gives a feel for how real-time fraud prediction would look before implementing Kafka.

In [None]:
# ✅ Test the API with one transaction
test_input = {"age": 45, "salary": 50000, "transaction_amount": 3000}
result = predict_transaction(**test_input)
print(f"🧪 Single Transaction Test → {test_input} → 🧠 Prediction: {result}")

# ✅ Simulated Kafka-style Real-Time Streaming
import time

print("\n🚦 Simulating Real-Time Transaction Stream:\n")
sample_stream = [
    {"age": 28, "salary": 25000, "transaction_amount": 1200},
    {"age": 52, "salary": 70000, "transaction_amount": 5000},
    {"age": 33, "salary": 39000, "transaction_amount": 1800},
]

for txn in sample_stream:
    pred = predict_transaction(**txn)
    print(f"➡️ {txn} → 🧠 Prediction: {pred}")
    time.sleep(1)  # Simulate delay between transactions

print("\n✅ Real-Time Streaming Simulation Complete.")

🧪 Single Transaction Test → {'age': 45, 'salary': 50000, 'transaction_amount': 3000} → 🧠 Prediction: Fraudulent

🚦 Simulating Real-Time Transaction Stream:

➡️ {'age': 28, 'salary': 25000, 'transaction_amount': 1200} → 🧠 Prediction: Legit
➡️ {'age': 52, 'salary': 70000, 'transaction_amount': 5000} → 🧠 Prediction: Fraudulent
➡️ {'age': 33, 'salary': 39000, 'transaction_amount': 1800} → 🧠 Prediction: Legit

✅ Real-Time Streaming Simulation Complete.


In [None]:
%%writefile fraud_api.py
from fastapi import FastAPI
from pydantic import BaseModel
import pandas as pd
import joblib

app = FastAPI()

# Load the model and scaler
model = joblib.load("fraud_model.pkl")
scaler = joblib.load("scaler.pkl")

# Define input schema
class Transaction(BaseModel):
    age: int
    salary: float
    transaction_amount: float

# Predict route
@app.post("/predict")
def predict(data: Transaction):
    input_df = pd.DataFrame([data.dict()])
    input_scaled = scaler.transform(input_df)
    prediction = model.predict(input_scaled)[0]
    result = "Fraudulent" if prediction == 1 else "Legit"
    return {"prediction": result}

Overwriting fraud_api.py


In [None]:
!pip install fastapi uvicorn nest-asyncio pyngrok joblib pandas scikit-learn -q

In [None]:
from pyngrok import conf
conf.get_default().auth_token = "309Gv8gorHu8mS2ztEWyKIJf3vU_3Md13NWpaeLA9gSkTbe6t"

In [None]:
import nest_asyncio
from pyngrok import ngrok
import uvicorn

nest_asyncio.apply()

# Start public tunnel
public_url = ngrok.connect(8000)
print(f"🔗 Public FastAPI URL: {public_url}")

# Run FastAPI app
uvicorn.run("fraud_api:app", host="0.0.0.0", port=8000)

🔗 Public FastAPI URL: NgrokTunnel: "https://a5b0bd4839c6.ngrok-free.app" -> "http://localhost:8000"


INFO:     Started server process [16072]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


INFO:     105.112.178.162:0 - "GET / HTTP/1.1" 404 Not Found
INFO:     105.112.178.162:0 - "GET /favicon.ico HTTP/1.1" 404 Not Found
INFO:     105.112.178.162:0 - "GET /docs HTTP/1.1" 200 OK
INFO:     105.112.178.162:0 - "GET /openapi.json HTTP/1.1" 200 OK
INFO:     105.112.178.162:0 - "POST /predict HTTP/1.1" 200 OK


In [None]:
%%writefile requirements.txt
fastapi
uvicorn
scikit-learn
pandas
joblib

In [None]:
%%writefile Dockerfile
# Use a lightweight Python image
FROM python:3.9-slim

# Set working directory
WORKDIR /app

# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the FastAPI app and model files
COPY fraud_api.py .
COPY fraud_model.pkl .
COPY scaler.pkl .

# Expose the port FastAPI will run on
EXPOSE 8000

# Run the app using uvicorn
CMD ["uvicorn", "fraud_api:app", "--host", "0.0.0.0", "--port", "8000"]

## 🐳 Docker Containerization

To make the FastAPI fraud detection app portable and ready for deployment, we containerized it using Docker.

---

### ✅ Dockerfile

```Dockerfile
# Use a lightweight Python image
FROM python:3.9-slim

# Set working directory
WORKDIR /app

# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the FastAPI app and model files
COPY fraud_api.py .
COPY fraud_model.pkl .
COPY scaler.pkl .

# Expose the port FastAPI will run on
EXPOSE 8000

# Run the app using uvicorn
CMD ["uvicorn", "fraud_api:app", "--host", "0.0.0.0", "--port", "8000"]
```

---

### 🗂️ Project Structure

```
project/
│
├── fraud_api.py               # FastAPI prediction API  
├── fraud_model.pkl            # Trained logistic regression model  
├── scaler.pkl                 # Feature scaler  
├── requirements.txt           # App dependencies  
└── Dockerfile                 # Docker build instructions  
```

---

### 🚀 How to Build and Run the Docker Container

#### 1. Build the Docker image

```bash
docker build -t fraud-api .
```

#### 2. Run the Docker container

```bash
docker run -d -p 8000:8000 fraud-api
```

#### 3. Access the FastAPI documentation

Go to: [http://localhost:8000/docs](http://localhost:8000/docs)

You can now test the `/predict` endpoint using the Swagger UI.

In [2]:
import time
import pandas as pd
import joblib

# Load model and scaler
model = joblib.load("fraud_model.pkl")
scaler = joblib.load("scaler.pkl")

# Sample stream of transactions
transaction_stream = [
    {"age": 25, "salary": 30000, "transaction_amount": 800},
    {"age": 45, "salary": 50000, "transaction_amount": 3000},
    {"age": 38, "salary": 45000, "transaction_amount": 1500},
    {"age": 52, "salary": 70000, "transaction_amount": 5200},
]

print("🚀 Simulating real-time transaction stream...\n")

for txn in transaction_stream:
    input_df = pd.DataFrame([txn])
    input_scaled = scaler.transform(input_df)
    prediction = model.predict(input_scaled)[0]
    result = "Fraudulent" if prediction == 1 else "Legit"
    print(f"➡️ {txn} → 🧠 Prediction: {result}")
    time.sleep(0.5)  # simulate slight delay

print("\n✅ Streaming simulation complete.")

🚀 Simulating real-time transaction stream...

➡️ {'age': 25, 'salary': 30000, 'transaction_amount': 800} → 🧠 Prediction: Legit
➡️ {'age': 45, 'salary': 50000, 'transaction_amount': 3000} → 🧠 Prediction: Fraudulent
➡️ {'age': 38, 'salary': 45000, 'transaction_amount': 1500} → 🧠 Prediction: Legit
➡️ {'age': 52, 'salary': 70000, 'transaction_amount': 5200} → 🧠 Prediction: Fraudulent

✅ Streaming simulation complete.


## 🔄 Real-Time Streaming Simulation

To simulate real-time data streaming like Kafka, we used a Python loop with a time delay to mimic incoming transactions.

Each transaction is passed through the trained model using the same preprocessing pipeline (scaler + logistic regression) and is instantly classified as either **Fraudulent** or **Legit**.

### Sample Code

```python
import time
import pandas as pd
import joblib

model = joblib.load("fraud_model.pkl")
scaler = joblib.load("scaler.pkl")

transaction_stream = [
    {"age": 25, "salary": 30000, "transaction_amount": 800},
    {"age": 45, "salary": 50000, "transaction_amount": 3000},
    {"age": 38, "salary": 45000, "transaction_amount": 1500},
    {"age": 52, "salary": 70000, "transaction_amount": 5200},
]

print("🚀 Simulating real-time transaction stream...\n")

for txn in transaction_stream:
    input_df = pd.DataFrame([txn])
    input_scaled = scaler.transform(input_df)
    prediction = model.predict(input_scaled)[0]
    result = "Fraudulent" if prediction == 1 else "Legit"
    print(f"➡️ {txn} → 🧠 Prediction: {result}")
    time.sleep(0.5)

print("\n✅ Streaming simulation complete.")