Step 1: Setting Up the Environment

Story:

You start by setting up your development environment. You decide to use FastAPI for building the REST API because of its speed and ease of use. You also plan to use Docker to containerize the application for easy deployment and scalability. Finally, you set up Kafka to handle real-time streaming of transaction data for live predictions.

Mini Tasks:


Install Python and create a virtual environment.
Install FastAPI, Uvicorn, and other required libraries (e.g., pydantic, scikit-learn, kafka-python).
Set up a local Kafka server using Docker (use the confluentinc/cp-kafka image).
Verify that Kafka is running by creating a test topic and producing/consuming messages.
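
Before creating topics, it can help to confirm that the broker's listener accepts connections at all. The sketch below uses only the standard library. `parse_bootstrap` and `broker_reachable` are hypothetical helper names. A successful TCP connect only shows that the port is open, not that Kafka is fully ready; the `kafka-topics` commands in the cell below are the stronger check.

```python
import socket


def parse_bootstrap(servers: str) -> list[tuple[str, int]]:
    """Split a bootstrap string like 'localhost:9092,kafka:29092' into (host, port) pairs."""
    pairs = []
    for entry in servers.split(","):
        host, _, port = entry.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def broker_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # connection refused, timeout, DNS failure, ...
        return False


if __name__ == "__main__":
    for host, port in parse_bootstrap("localhost:9092"):
        status = "reachable" if broker_reachable(host, port) else "NOT reachable"
        print(f"{host}:{port} is {status}")
```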


In [None]:
# Install Python (3.10+) and create a virtual environment:

  ```bash
  python -m venv venv
  source venv/bin/activate  # or venv\Scripts\activate on Windows
  ```
# Install required Python libraries:

  ```bash
  pip install fastapi "uvicorn[standard]" pydantic scikit-learn imbalanced-learn kafka-python confluent-kafka prometheus-fastapi-instrumentator joblib requests pandas matplotlib seaborn kagglehub
  ```
# Set up Kafka using Docker:

  Kafka and ZooKeeper are defined in docker-compose.yml (see Step 4), using the `confluentinc/cp-zookeeper` and `confluentinc/cp-kafka` images. Start just these two services with:

  ```bash
  docker-compose up -d zookeeper kafka
  ```
# Verify Kafka setup:

  ```bash
  docker-compose exec kafka bash
  kafka-topics --bootstrap-server localhost:9092 --create --topic test-topic --partitions 1 --replication-factor 1
  kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic   # type a few messages, then Ctrl+C
  # In a second shell (opened with the same docker-compose exec command):
  kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning
  ```

Step 2: Building the Machine Learning Model

Story:

You already have a trained fraud detection model, but for this project, you decide to train a simple logistic regression model on a sample dataset (e.g., the Credit Card Fraud Detection dataset). You save the trained model as a .pkl file for later use.

Mini Tasks:


Load the dataset and preprocess it (e.g., handle missing values, scale features).
Train a logistic regression model using scikit-learn.
Save the trained model as a .pkl file using joblib or pickle.
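
One thing to watch when saving the model: in the notebook below, the `PowerTransformer` and `RobustScaler` are fitted outside the pipeline that gets saved, so the API would receive raw features while the saved model expects transformed ones. A minimal sketch of keeping every preprocessing step inside a single scikit-learn `Pipeline` (so one `joblib.dump` saves them together) follows. It uses synthetic data from `make_classification` instead of the real dataset. For simplicity it applies Yeo-Johnson to every column except `Amount` rather than only the highly skewed ones; that column split is an illustrative assumption.

```python
import os
import tempfile

import joblib
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PowerTransformer, RobustScaler

# Synthetic stand-in for creditcard.csv: 30 features, ~2% positives (assumption)
X_arr, y = make_classification(n_samples=2000, n_features=30, weights=[0.98], random_state=42)
columns = ["Time"] + [f"V{i}" for i in range(1, 29)] + ["Amount"]
X = pd.DataFrame(X_arr, columns=columns)

# Yeo-Johnson on all columns except Amount, robust (median/IQR) scaling on Amount
preprocess = ColumnTransformer([
    ("power", PowerTransformer(method="yeo-johnson"), columns[:-1]),
    ("amount", RobustScaler(), ["Amount"]),
])

model = Pipeline([
    ("preprocess", preprocess),
    ("logreg", LogisticRegression(max_iter=1000)),
])
model.fit(X, y)

# The saved artifact now contains the fitted transformers as well as the classifier
path = os.path.join(tempfile.mkdtemp(), "logistic_regression_model.joblib")
joblib.dump(model, path)
loaded = joblib.load(path)

# Raw (untransformed) rows can be passed straight to the loaded pipeline
print(loaded.predict_proba(X.head(3))[:, 1])
```

With this layout, the FastAPI app and the Kafka worker can keep sending raw feature values, because the transforms travel inside the saved file.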

In [None]:

import os

import kagglehub
import pandas as pd

# Download the latest version of the dataset
path = kagglehub.dataset_download("mlg-ulb/creditcardfraud")
print("Path to dataset files:", path)

# Read the CSV from the downloaded folder (works outside Kaggle too)
df = pd.read_csv(os.path.join(path, 'creditcard.csv'))
df.head()

"""## Exploring Data"""

df.shape

df.info()

df.describe()

"""## Checking for Missing Values"""

df.isnull().sum()

"""## Handling duplicates"""

duplicate_values = df.duplicated().sum()
print(f'Number of duplicate rows: {duplicate_values}')

df = df.drop_duplicates()
print(f'Number of rows after dropping duplicates: {df.shape[0]}')

"""## Checking Dataset is Balanced or Imbalanced"""

classes=df['Class'].value_counts()
print(f'normal_trans ={classes[0]}')
print(f'fraud_trans ={classes[1]}')
print(f'percentage_normal_trans ={(classes[0] / df["Class"].count())*100:.2f}%')
print(f'percentage_fraud_trans ={(classes[1] / df["Class"].count())*100:.2f}%')

import matplotlib.pyplot as plt
import seaborn as sns
title=['normal_trans','fraud_trans']
value=[classes[0],classes[1]]
plt.figure(figsize=(5, 5))
bars = plt.bar(title, value, color='lightblue')
for bar in bars:
    yval = bar.get_height()
    plt.text(bar.get_x() + bar.get_width() / 2, yval + 20,
             str(int(yval)),
             ha='center', va='bottom', fontweight='bold')
plt.title('Class Distribution')
plt.xlabel('Transaction Type')
plt.ylabel('Count')
plt.show()

"""## Exploratory Data Analysis"""



"""### Correlation Matrix

Let's look at the correlation matrix to see how the features are related, especially with the 'Class' variable.
"""

import seaborn as sns
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 10))
sns.heatmap(df.corr(), cmap='coolwarm', annot=False)
plt.title('Correlation Matrix')
plt.show()

print(df.columns)

x = df.drop('Class', axis=1)#features
y=df['Class']#target
var=x.columns
fig, axes = plt.subplots(10, 3, figsize=(30, 45))
axes = axes.flatten()

for i, ax in enumerate(axes):
    sns.histplot(x[var[i]], ax=ax)
    ax.set_title(var[i])

plt.tight_layout()
plt.show()

print(x.columns)
print(x.shape)

"""### Skewness
The feature histograms show that many features are far from normally distributed. We measure skewness with `skew()` and apply a Yeo-Johnson `PowerTransformer` to every feature whose absolute skewness exceeds 1; the transform reduces skewness and brings the distributions closer to normal.
"""

skewness = x.skew()
skew_df = pd.DataFrame({'Features': skewness.index, 'Skewness': skewness.values})
print(skew_df)

from sklearn.preprocessing import PowerTransformer
skewed_columns = skew_df.loc[abs(skew_df['Skewness']) > 1, 'Features']
print("Highly skewed columns:", list(skewed_columns))
pt = PowerTransformer(method='yeo-johnson', copy=False)
x[skewed_columns] = pt.fit_transform(x[skewed_columns])

"""### Scatter Plot of Time vs. Amount by Class

Let's create a scatter plot to visualize the relationship between 'Time' and 'Amount' for both normal and fraudulent transactions.
"""

plt.figure(figsize=(10, 6))
sns.scatterplot(data=df, x='Time', y='Amount', hue='Class', alpha=0.6)
plt.title('Scatter Plot of Time vs. Amount by Class')
plt.xlabel('Time')
plt.ylabel('Amount')
plt.show()

"""## Outliers
In this dataset, there may be instances where the card is used for high-value transactions, which is why we cannot simply remove outliers.

## Normalization
"""

from sklearn.preprocessing import RobustScaler
scaler=RobustScaler()
x[['Amount']]=scaler.fit_transform(x[['Amount']])
x.head()

"""## SMOTE
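The dataset is highly imbalanced (fraud is well under 1% of transactions), so we oversample the minority class with SMOTE. With `sampling_strategy=0.1`, SMOTE creates synthetic fraud samples until the minority class is 10% the size of the majority class; this reduces the imbalance but does not fully balance the data.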
"""

from imblearn.over_sampling import SMOTE
from collections import Counter

print("Before SMOTE:", Counter(y))

smote = SMOTE(sampling_strategy=0.1, random_state=42)
X_resampled, y_resampled = smote.fit_resample(x, y)

print("After SMOTE:", Counter(y_resampled))

"""## StratifiedKFold
For imbalanced datasets, StratifiedKFold keeps the class proportions in each fold the same as in the full dataset, so every fold contains some fraud cases.
"""

from sklearn.model_selection import StratifiedKFold

skf = StratifiedKFold(n_splits=5, random_state=None, shuffle=False)

"""### Plotting Classification Metrics and Confusion Matrix

Now we tune a logistic regression pipeline with grid search, evaluate it with cross-validation, save it, and visualize its accuracy, ROC AUC, confusion matrix, and partial dependence.
"""

import numpy as np
import warnings
warnings.filterwarnings('ignore')

from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV, StratifiedKFold, train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, roc_auc_score,
    confusion_matrix, ConfusionMatrixDisplay
)
from sklearn.inspection import PartialDependenceDisplay
import matplotlib.pyplot as plt
import joblib

# Stratified K-Fold
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# ====== PIPELINE + GRID SEARCH (Feature selection + scaling + logistic regression) ====== #
pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("feature_selection", SelectKBest(score_func=f_classif, k='all')),
    ("logreg", LogisticRegression(random_state=42, solver='liblinear'))
])

param_grid = {
    'logreg__C': [0.01, 0.1, 1, 10, 100],
    'logreg__penalty': ['l1', 'l2']
}

grid_search = GridSearchCV(pipeline, param_grid, cv=skf, scoring='roc_auc', n_jobs=-1)
grid_search.fit(X_resampled, y_resampled)

best_model = grid_search.best_estimator_
print("✅ Best Hyperparameters:", grid_search.best_params_)

# ====== EVALUATE ON CROSS-VALIDATION ====== #
print(f"\n🔍 Cross-validating Best Logistic Regression Model")
accuracies, precisions, roc_aucs = [], [], []

from sklearn.base import clone

for train_idx, val_idx in skf.split(X_resampled, y_resampled):
    X_train, X_val = X_resampled.iloc[train_idx], X_resampled.iloc[val_idx]
    y_train, y_val = y_resampled.iloc[train_idx], y_resampled.iloc[val_idx]

    # Fit a fresh copy per fold so best_model (refit on all data by GridSearchCV) is what gets saved
    fold_model = clone(best_model)
    fold_model.fit(X_train, y_train)
    y_pred = fold_model.predict(X_val)
    y_proba = fold_model.predict_proba(X_val)[:, 1]

    accuracies.append(accuracy_score(y_val, y_pred))
    precisions.append(precision_score(y_val, y_pred))
    roc_aucs.append(roc_auc_score(y_val, y_proba))

# Print average metrics
print("===============================================")
print(f"📊 Mean Accuracy:  {np.mean(accuracies):.4f}")
print(f"📊 Mean Precision: {np.mean(precisions):.4f}")
print(f"📊 Mean ROC AUC:   {np.mean(roc_aucs):.4f}")
print("===============================================")

# ====== Save Model ====== #
filename = 'logistic_regression_model.joblib'
joblib.dump(best_model, filename)
print(f"📦 Model saved as: {filename}")

# ========== PLOT: Accuracy & ROC AUC ========== #
# Mean cross-validation scores from the loop above
accuracy_plot = np.mean(accuracies)
roc_auc_plot = np.mean(roc_aucs)

plt.figure(figsize=(8, 5))
metrics_names = ['Accuracy', 'ROC AUC']
metrics_values = [accuracy_plot, roc_auc_plot]
colors = ['skyblue', 'lightgreen']

bars = plt.bar(metrics_names, metrics_values, color=colors)
plt.ylabel('Score')
plt.title('Logistic Regression Model Performance (5-Fold CV Mean)')
plt.ylim(0, 1)

for bar in bars:
    yval = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2, yval + 0.01, f"{yval:.4f}", ha='center', va='bottom')

plt.tight_layout()
plt.show()

# ====== Confusion Matrix on a Hold-out Split (of the SMOTE-resampled data) ====== #
X_train_plot, X_test_plot, y_train_plot, y_test_plot = train_test_split(
    X_resampled, y_resampled, test_size=0.2, stratify=y_resampled, random_state=42
)

best_model.fit(X_train_plot, y_train_plot)
y_pred_test = best_model.predict(X_test_plot)
cm = confusion_matrix(y_test_plot, y_pred_test)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=best_model.classes_)

fig, ax = plt.subplots(figsize=(6, 6))
disp.plot(cmap=plt.cm.Blues, ax=ax)  # draw on our axes instead of opening a second, empty figure
plt.title('Confusion Matrix: Logistic Regression (Test Set)')
plt.show()

# ====== Partial Dependence Plot ====== #
print("📈 Partial Dependence Plot: Logistic Regression")
features_to_plot = [0, 1, 2]  # Adjust these based on your dataset
PartialDependenceDisplay.from_estimator(best_model, X_resampled, features=features_to_plot)
plt.suptitle("Logistic Regression - Partial Dependence", y=1.02)
plt.tight_layout()
plt.show()





**Step 3: Creating the REST API with FastAPI**

**Story:**

You build a REST API using FastAPI that exposes an endpoint for making predictions. The API takes transaction data as input, loads the trained model, and returns the prediction (fraudulent or not).

**Mini Tasks:**


Create a FastAPI application with a /predict endpoint.
Load the trained model from the .pkl file when the API starts.
Define a Pydantic model for the input data (e.g., transaction amount, timestamp, features).
Implement the prediction logic in the /predict endpoint.
Test the API locally using Uvicorn and sample transaction data.

**FastAPI (main.py)**

In [None]:
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import pandas as pd
import joblib
import logging
import os

from prometheus_fastapi_instrumentator import Instrumentator

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(
    title="Fraud Detection API",
    description="API that predicts whether a transaction is fraudulent or legitimate.",
    version="1.0.0"
)

# Instrumentator for Prometheus metrics
Instrumentator().instrument(app).expose(app)

# Define the expected feature names
FEATURE_NAMES = [
    "Time", "V1", "V2", "V3", "V4", "V5", "V6", "V7", "V8", "V9",
    "V10", "V11", "V12", "V13", "V14", "V15", "V16", "V17", "V18", "V19",
    "V20", "V21", "V22", "V23", "V24", "V25", "V26", "V27", "V28", "Amount"
]

# Define the Pydantic model
class TransactionInput(BaseModel):
    Time: float
    V1: float
    V2: float
    V3: float
    V4: float
    V5: float
    V6: float
    V7: float
    V8: float
    V9: float
    V10: float
    V11: float
    V12: float
    V13: float
    V14: float
    V15: float
    V16: float
    V17: float
    V18: float
    V19: float
    V20: float
    V21: float
    V22: float
    V23: float
    V24: float
    V25: float
    V26: float
    V27: float
    V28: float
    Amount: float

# Load model
MODEL_PATH = os.getenv("MODEL_PATH", "app/model/logistic_regression_model.joblib")

if not os.path.exists(MODEL_PATH):
    logger.error(f"Model file not found at {MODEL_PATH}")
    raise FileNotFoundError(f"Model file not found at {MODEL_PATH}")

try:
    model = joblib.load(MODEL_PATH)
    logger.info("Model loaded successfully.")
except Exception as e:
    logger.error(f"Error loading model: {e}")
    raise RuntimeError(f"Could not load model: {e}")

# Root endpoint
@app.get("/")
def read_root():
    return {"message": "Welcome! This API enables real-time fraud detection for financial transactions."}

# Health check endpoint
@app.get("/health")
def health_check():
    return {"status": "ok"}

# Prediction endpoint
@app.post("/predict")
def predict(transaction: TransactionInput, request: Request):
    logger.info(f"Received prediction request from {request.client.host}")

    try:
        # Convert the input data to a DataFrame
        input_data = pd.DataFrame(
            [[getattr(transaction, col) for col in FEATURE_NAMES]],
            columns=FEATURE_NAMES
        )
        # Make prediction
        prediction = model.predict(input_data)[0]
        probability = model.predict_proba(input_data)[0][1]
        result = "Fraudulent" if prediction == 1 else "Legitimate"

        logger.info(f"Prediction: {result}, Probability: {probability:.4f}")

        return {
            "prediction": result,
            "fraud_probability": round(probability, 4)
        }

    except Exception as e:
        logger.error(f"Prediction failed: {e}")
        raise HTTPException(status_code=500, detail="Prediction failed. Check input data.")

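The 30 fields of `TransactionInput` repeat `FEATURE_NAMES` by hand. As a possible simplification (a sketch, not how the app above is written), Pydantic's `create_model` can build the same model from the list, so the request schema and the column order cannot drift apart. This works with both Pydantic v1 and v2; `to_row` is a hypothetical helper name.

```python
from pydantic import ValidationError, create_model

FEATURE_NAMES = ["Time"] + [f"V{i}" for i in range(1, 29)] + ["Amount"]

# Every feature becomes a required float field: (type, ...) means "no default"
TransactionInput = create_model(
    "TransactionInput",
    **{name: (float, ...) for name in FEATURE_NAMES},
)


def to_row(transaction) -> list[float]:
    """Return feature values in the exact column order the model was trained on."""
    return [getattr(transaction, name) for name in FEATURE_NAMES]


if __name__ == "__main__":
    sample = TransactionInput(**{name: 0.0 for name in FEATURE_NAMES})
    print(len(to_row(sample)))  # 30
    try:
        TransactionInput(Time=0.0)  # V1..V28 and Amount are missing
    except ValidationError as exc:
        print("rejected:", len(exc.errors()), "missing fields")
```

FastAPI accepts the generated class as a request body type exactly like a hand-written `BaseModel`, so `def predict(transaction: TransactionInput, request: Request)` keeps working.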

**kafka_worker.py**

In [None]:
from kafka import KafkaConsumer, KafkaProducer
import json
import logging
import pandas as pd
import joblib
import os

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("KafkaWorker")

KAFKA_BROKER = os.getenv("KAFKA_BROKER", "kafka:9092")
TRANSACTIONS_TOPIC = "transactions"
PREDICTIONS_TOPIC = "predictions"

MODEL_PATH = os.getenv("MODEL_PATH", "app/model/logistic_regression_model.joblib")
FEATURE_NAMES = ["Time"] + [f"V{i}" for i in range(1, 29)] + ["Amount"]

try:
    model = joblib.load(MODEL_PATH)
    logger.info("✅ Model loaded.")
except Exception as e:
    logger.error(f"❌ Failed to load model: {e}")
    raise

def predict(features_dict):
    df = pd.DataFrame([[features_dict[col] for col in FEATURE_NAMES]], columns=FEATURE_NAMES)
    prediction = model.predict(df)[0]
    probability = model.predict_proba(df)[0][1]
    result = {
        "prediction": "Fraudulent" if prediction == 1 else "Legitimate",
        "fraud_probability": round(probability, 4)
    }
    return result

consumer = KafkaConsumer(
    TRANSACTIONS_TOPIC,
    bootstrap_servers=KAFKA_BROKER,
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    auto_offset_reset="earliest",
    group_id="fraud-detector"
)

producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda m: json.dumps(m).encode("utf-8")
)

logger.info("🔁 Kafka worker is now listening...")

try:
    for msg in consumer:
        data = msg.value
        features = data.get("features", data)
        logger.info(f"📥 Received: {features}")

        try:
            result = predict(features)
            output = {
                "input": features,
                "prediction": result["prediction"],
                "fraud_probability": result["fraud_probability"]
            }
            producer.send(PREDICTIONS_TOPIC, output)
            logger.info(f"📤 Sent prediction: {output}")
        except Exception as e:
            logger.error(f"❌ Prediction failed: {e}")
except KeyboardInterrupt:
    logger.warning("🛑 Worker stopped manually.")
finally:
    producer.flush()  # deliver any queued predictions before shutting down
    producer.close()
    consumer.close()

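The worker above relies on Kafka's default auto-commit, so an input offset can be committed before the matching prediction has actually been delivered. A minimal at-least-once variant commits only after the prediction is flushed. It assumes the consumer is created with `enable_auto_commit=False` (a real `KafkaConsumer` option) and is passed the same `predict` function; `run_worker` is a hypothetical helper, and the consumer and producer are passed in so the loop can be exercised without a broker.

```python
import logging

logger = logging.getLogger("KafkaWorker")


def run_worker(consumer, producer, predict_fn, out_topic="predictions", max_messages=None):
    """Consume -> predict -> produce -> commit, one message at a time.

    Expects a kafka-python KafkaConsumer created with enable_auto_commit=False
    and a KafkaProducer with a JSON value_serializer (as in kafka_worker.py).
    Returns the number of input messages processed.
    """
    processed = 0
    for msg in consumer:
        features = msg.value.get("features", msg.value)
        try:
            result = predict_fn(features)
        except Exception as exc:
            # Bad payloads are logged and skipped; a dead-letter topic is another option
            logger.error("Prediction failed: %s", exc)
        else:
            producer.send(out_topic, {"input": features, **result})
            producer.flush()  # wait until the prediction has been sent to the broker
        consumer.commit()  # only now mark the input message as processed
        processed += 1
        if max_messages is not None and processed >= max_messages:
            break
    return processed
```

Flushing after every message favors safety over throughput; batching several messages per flush and commit is a common middle ground.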

**Step 4: Containerizing the Application with Docker**

**Story:**

To make the API easy to deploy and scale, you containerize it using Docker. You create a Dockerfile that sets up the environment, installs dependencies, and runs the FastAPI application.

**Mini Tasks:**


Create a Dockerfile that:
Uses a base Python image (e.g., python:3.9-slim).
Installs dependencies from a requirements.txt file.
Copies the FastAPI application and model file into the container.
Exposes the API on port 8000.
Build the Docker image and run it locally.
Test the API inside the Docker container using sample transaction data.

**Dockerfile**

In [None]:
# Use official Python image
FROM python:3.10-slim

# Set work directory
WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy app
COPY . .

# Expose port
EXPOSE 8000

# Default command (for API container)
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]


**docker-compose.yml**

In [None]:
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data
    healthcheck:
      test: ["CMD-SHELL", "echo ruok | nc localhost 2181"]
      interval: 10s
      timeout: 5s
      retries: 5

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,DOCKER://kafka:29092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,DOCKER://0.0.0.0:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      # ✅ Reduce verbosity to WARN level
      KAFKA_LOG4J_ROOT_LOGLEVEL: "WARN"
      KAFKA_LOG4J_LOGGERS: "kafka.controller=WARN,kafka.producer.async.DefaultEventHandler=WARN,state.change.logger=WARN,kafka.request.logger=WARN"
    depends_on:
      zookeeper:
        condition: service_healthy
    volumes:
      - kafka-data:/var/lib/kafka/data
    healthcheck:
      test: ["CMD", "kafka-topics", "--bootstrap-server", "localhost:9092", "--list"]
      interval: 10s
      timeout: 10s
      retries: 10

  topic-init:
    build:
      context: .
      dockerfile: Dockerfile
    command: ["python", "scripts/create_topics.py"]
    depends_on:
      kafka:
        condition: service_healthy
    environment:
      KAFKA_BROKER: kafka:29092
    volumes:
      - .:/app
    restart: "no"

  api:
    build: .
    container_name: fraud_api
    ports:
      - "8000:8000"
    depends_on:
      topic-init:
        condition: service_completed_successfully
    environment:
      KAFKA_BROKER: kafka:29092
      MODEL_PATH: app/model/logistic_regression_model.joblib
    command: ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
    volumes:
      - .:/app
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/docs"]
      interval: 10s
      timeout: 5s
      retries: 5
    labels:
      - "prometheus.scrape=true"
      - "prometheus.port=8000"

  worker:
    build: .
    container_name: fraud_worker
    depends_on:
      topic-init:
        condition: service_completed_successfully
    environment:
      KAFKA_BROKER: kafka:29092
      MODEL_PATH: app/model/logistic_regression_model.joblib
    command: ["python", "app/services/kafka_worker.py"]
    volumes:
      - .:/app
    restart: unless-stopped

  producer:
    build: .
    container_name: kafka_producer
    depends_on:
      kafka:
        condition: service_healthy
    environment:
      KAFKA_BROKER: kafka:29092
    command: ["python", "scripts/kafka_producer.py"]
    volumes:
      - .:/app
    restart: "no"

  consumer:
    build: .
    container_name: kafka_consumer
    depends_on:
      kafka:
        condition: service_healthy
    environment:
      KAFKA_BROKER: kafka:29092
    command: ["python", "scripts/kafka_consumer.py"]
    volumes:
      - .:/app
    restart: "no"

  api-tester:
    build: .
    depends_on:
      api:
        condition: service_healthy
    command: ["python", "scripts/api_stream_test.py"]
    volumes:
      - .:/app
    restart: "no"

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    depends_on:
      - api
    restart: unless-stopped

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    volumes:
      - grafana-data:/var/lib/grafana
    depends_on:
      - prometheus
    restart: unless-stopped

volumes:
  kafka-data:
  zookeeper-data:
  grafana-data:


**Step 5: Setting Up Real-Time Streaming with Kafka**

**Story:**

To enable real-time predictions, you set up Kafka to stream transaction data to the API. The API consumes messages from a Kafka topic, makes predictions, and writes the results to another Kafka topic.

**Mini Tasks:**


Create two Kafka topics: transactions (for incoming transaction data) and predictions (for prediction results).
Modify the FastAPI application to include a Kafka consumer and producer:
The consumer reads transaction data from the transactions topic.
The producer writes predictions to the predictions topic.
Test the real-time streaming setup by producing sample transaction data to the transactions topic and consuming predictions from the predictions topic.

**create_topics.py**

In [None]:
# scripts/create_topics.py

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import KafkaError
import logging
import os
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("KafkaTopicSetup")

KAFKA_BROKER = os.getenv("KAFKA_BROKER", "kafka:29092")
TOPICS = ["transactions", "predictions"]

MAX_RETRIES = 5
RETRY_DELAY = 5  # seconds

def create_topics():
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            admin = KafkaAdminClient(
                bootstrap_servers=KAFKA_BROKER,
                client_id="fraud_detection_admin"
            )
            try:
                existing_topics = admin.list_topics()
                topics_to_create = [
                    NewTopic(name=topic, num_partitions=1, replication_factor=1)
                    for topic in TOPICS if topic not in existing_topics
                ]

                if topics_to_create:
                    admin.create_topics(new_topics=topics_to_create)
                    logger.info(f"[+] Created topics: {[t.name for t in topics_to_create]}")
                else:
                    logger.info("[✓] All required topics already exist.")
            finally:
                admin.close()
            return  # Success

        except KafkaError as e:
            logger.warning(f"[!] Kafka not ready yet (attempt {attempt}/{MAX_RETRIES}): {e}")
            time.sleep(RETRY_DELAY)
        # Any other exception propagates, so the script exits with a non-zero status

    # Fail loudly so docker-compose does not treat topic-init as "completed successfully"
    raise RuntimeError(f"Could not create Kafka topics after {MAX_RETRIES} attempts")

if __name__ == "__main__":
    create_topics()

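`create_topics.py` waits a fixed `RETRY_DELAY` between attempts. If the broker is sometimes slow to start, exponential backoff is a common alternative. The helper below is a generic, hypothetical sketch (the name `retry_with_backoff` and its defaults are assumptions) that could wrap a single topic-creation attempt, for example with `retry_on=(KafkaError,)`.

```python
import logging
import time

logger = logging.getLogger("KafkaTopicSetup")


def retry_with_backoff(fn, retry_on=(Exception,), attempts=5, base_delay=1.0,
                       max_delay=30.0, sleep=time.sleep):
    """Call fn() until it succeeds, doubling the wait after each failure.

    Re-raises the last exception once `attempts` calls have failed, so the
    calling script exits non-zero instead of silently giving up.
    """
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on as exc:
            if attempt == attempts:
                raise
            delay = min(base_delay * 2 ** (attempt - 1), max_delay)
            logger.warning("Attempt %d/%d failed (%s); retrying in %.1fs",
                           attempt, attempts, exc, delay)
            sleep(delay)
```

The `sleep` parameter exists only so the waiting can be replaced in tests; in the script the default `time.sleep` is used.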

**kafka_producer.py**

In [None]:
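from confluent_kafka import Producer
import json
import os

# Inside docker-compose the broker is kafka:29092 (set via KAFKA_BROKER); fall back to localhost for host runs
producer_conf = {
    'bootstrap.servers': os.getenv('KAFKA_BROKER', 'localhost:9092')
}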
producer = Producer(producer_conf)

sample_transaction = {
    "Time": 0.0,
    "V1": -1.36, "V2": -0.07, "V3": 2.53, "V4": 1.37, "V5": -0.33,
    "V6": 0.46, "V7": 0.23, "V8": 0.09, "V9": 0.36,
    "V10": 0.09, "V11": -0.55, "V12": -0.61, "V13": -0.99, "V14": -0.31,
    "V15": 1.46, "V16": -0.47, "V17": 0.20, "V18": 0.02, "V19": 0.40,
    "V20": 0.25, "V21": -0.01, "V22": 0.27, "V23": -0.11, "V24": 0.06,
    "V25": 0.12, "V26": -0.18, "V27": 0.13, "V28": -0.02,
    "Amount": 149.62
}

def delivery_report(err, msg):
    if err:
        print(f"[!] Delivery failed: {err}")
    else:
        print(f"[✓] Message delivered to {msg.topic()} [{msg.partition()}]")

producer.produce(
    topic="transactions",
    key="txn-1",
    value=json.dumps(sample_transaction),
    callback=delivery_report
)

producer.flush()

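The producer above sends a single hard-coded transaction. To test streaming with more data, a sketch like the following could read rows from a CSV (for example the project's `sample_transactions.csv`) and produce one message per row. The `producer` argument is expected to behave like a `confluent_kafka.Producer` (`produce`, `poll`, `flush`); `iter_transactions` and `stream_transactions` are hypothetical names.

```python
import csv
import json


def iter_transactions(lines):
    """Yield each CSV row as a dict of floats, dropping the 'Class' label if present."""
    for row in csv.DictReader(lines):
        row.pop("Class", None)
        yield {name: float(value) for name, value in row.items()}


def stream_transactions(producer, transactions, topic="transactions"):
    """Produce one JSON message per transaction and return how many were queued."""
    sent = 0
    for i, txn in enumerate(transactions):
        producer.produce(topic, key=f"txn-{i}", value=json.dumps(txn))
        producer.poll(0)  # serve delivery callbacks without blocking
        sent += 1
    producer.flush()  # block until every queued message is delivered (or fails)
    return sent
```

Usage with the real client would be `stream_transactions(Producer(producer_conf), iter_transactions(open(path, newline="")))`. For very large files, the client's local queue can fill up and `produce` raises `BufferError`; polling with a small timeout before retrying is the usual remedy.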

**kafka_consumer.py**

In [None]:
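from confluent_kafka import Consumer
import json
import os

# Inside docker-compose the broker is kafka:29092 (set via KAFKA_BROKER); fall back to localhost for host runs
consumer_conf = {
    'bootstrap.servers': os.getenv('KAFKA_BROKER', 'localhost:9092'),
    'group.id': 'prediction-group',
    'auto.offset.reset': 'earliest'
}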
consumer = Consumer(consumer_conf)
consumer.subscribe(["predictions"])

print("[*] Waiting for prediction messages...\n")

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"[!] Error: {msg.error()}")
            continue

        prediction = json.loads(msg.value().decode("utf-8"))
        print("[✓] Prediction received:")
        print(json.dumps(prediction, indent=4))

except KeyboardInterrupt:
    print("\n[!] Stopped by user.")
finally:
    consumer.close()



Step 6: Deploying the Application

Story:

You deploy the Dockerized FastAPI application and Kafka setup to a cloud platform (e.g., AWS, Azure, or GCP) or run it locally for testing. You ensure that the API and Kafka are working together seamlessly to provide real-time predictions.


Mini Tasks:

Push the Docker image to a container registry (e.g., Docker Hub, Azure Container Registry).
Deploy the Docker container to a cloud service (e.g., AWS ECS, Azure Container Instances) or run it locally using Docker Compose.
Verify that the API and Kafka are working together by streaming live transaction data and checking the predictions.

**Build & Run All Services**

### ✅ Option A: Local Deployment with Docker Compose

In [None]:
# 🔧 1. Build & Run All Services

docker-compose up --build


# 🧪 2. Verify the Deployment
# API Health Check:

Visit: http://localhost:8000/health (interactive API docs: http://localhost:8000/docs)

# Grafana Dashboard:

Visit: http://localhost:3000


# Prometheus:

Visit: http://localhost:9090


# Use queries like:

http_requests_total


# Streaming Test:

docker-compose run api-tester
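
The manual checks above can be scripted. The sketch below (standard library only; `smoke_test` is a hypothetical name) calls `/health`, sends one all-zero transaction to `/predict`, and confirms that `/metrics` exposes `http_requests_total`, matching the endpoints defined in `main.py` and by the Prometheus instrumentator. The all-zero payload is only a well-formed placeholder, not a realistic transaction.

```python
import json
import urllib.request

FEATURE_NAMES = ["Time"] + [f"V{i}" for i in range(1, 29)] + ["Amount"]


def _get(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.read().decode("utf-8")


def smoke_test(base_url="http://localhost:8000", timeout=5.0):
    """Return {check name: passed} for the deployed API.

    urlopen raises HTTPError on 4xx/5xx responses and URLError if the API is
    unreachable, so hard failures surface as exceptions.
    """
    results = {}
    health = json.loads(_get(f"{base_url}/health", timeout))
    results["health"] = health.get("status") == "ok"

    payload = json.dumps({name: 0.0 for name in FEATURE_NAMES}).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}/predict",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        body = json.loads(resp.read().decode("utf-8"))
    results["predict"] = body.get("prediction") in {"Fraudulent", "Legitimate"}

    results["metrics"] = "http_requests_total" in _get(f"{base_url}/metrics", timeout)
    return results
```

After `docker-compose up`, running `print(smoke_test())` on the host should report `True` for all three checks.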

### ✅ Option B: Cloud Deployment (AWS)

In [None]:
## ✅ Prerequisites

* AWS account
* AWS CLI installed and configured: `aws configure`
* Docker installed
* Docker Hub account (the images themselves are pushed in step ①)

## ① Push Docker Images to Docker Hub

```bash
docker login

# Every service is built from the same Dockerfile, so build the image once and
# tag it for each service. Compose names the images it builds after the project
# and service (e.g. <project>-api), not after container_name, so tagging
# "fraud_api" directly would fail.
docker build -t yourdockerhub/fraud_api:latest .

for name in fraud_worker kafka_producer kafka_consumer api_tester; do
  docker tag yourdockerhub/fraud_api:latest yourdockerhub/$name:latest
done

# Push API, Worker, Kafka Producer, Kafka Consumer, and API Tester
for name in fraud_api fraud_worker kafka_producer kafka_consumer api_tester; do
  docker push yourdockerhub/$name:latest
done
```

## ② Create an ECS Cluster

```bash
aws ecs create-cluster --cluster-name fraud-detection-cluster
```

## ③ Create IAM Role for ECS Task Execution

1. Create the trust policy JSON file:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "ecs-tasks.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

Save it as `ecs-trust-policy.json`.

2. Create the IAM role and attach permissions:

```bash
aws iam create-role \
  --role-name ecsTaskExecutionRole \
  --assume-role-policy-document file://ecs-trust-policy.json

aws iam attach-role-policy \
  --role-name ecsTaskExecutionRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy
```

## ④ Create Task Definitions for Each Service

Use a similar format for each container. Here's an example for the API:

**`fraud-api-task.json`**

```json
{
  "family": "fraud-api-task",
  "networkMode": "awsvpc",
  "requiresCompatibilities": ["FARGATE"],
  "cpu": "256",
  "memory": "512",
  "executionRoleArn": "arn:aws:iam::<your_account_id>:role/ecsTaskExecutionRole",
  "containerDefinitions": [
    {
      "name": "fraud_api",
      "image": "yourdockerhub/fraud_api:latest",
      "portMappings": [
        {
          "containerPort": 8000,
          "protocol": "tcp"
        }
      ],
      "essential": true,
      "environment": [
        {
          "name": "KAFKA_BROKER",
          "value": "<your-msk-bootstrap-server>:9092"
        },
        {
          "name": "MODEL_PATH",
          "value": "app/model/logistic_regression_model.joblib"
        }
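      ],
      "logConfiguration": {
        "logDriver": "awslogs",
        "options": {
          "awslogs-group": "/ecs/fraud-api-task",
          "awslogs-region": "<your-region>",
          "awslogs-stream-prefix": "ecs"
        }
      }
    }
  ]
}
```

The `logConfiguration` block sends container output to CloudWatch Logs. The task execution role policy allows writing log streams but not creating log groups, so create the group first, then register the task:

```bash
aws logs create-log-group --log-group-name /ecs/fraud-api-task

aws ecs register-task-definition \
  --cli-input-json file://fraud-api-task.json
```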


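Repeat this for the other services, changing the family, container name, and image, dropping `portMappings` for containers that serve no HTTP traffic, and adding a `command` override (for example `["python", "app/services/kafka_worker.py"]` for the worker), because every image's default `CMD` starts the API:

* `fraud-worker-task.json`
* `kafka-producer-task.json`
* `kafka-consumer-task.json`
* `api-tester-task.json`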
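
Rather than hand-writing four more JSON files, the task definitions could be generated from one template. The sketch below assumes every service uses the same Fargate size as the API (256 CPU units, 512 MiB) and differs only in family, container name, image, `command` override, environment, and whether it exposes port 8000. `make_task_definition`, `SERVICES`, and `write_all` are hypothetical names, the commands mirror those in `docker-compose.yml`, and logging settings are omitted (they can be added to `container` the same way).

```python
import json

ACCOUNT_ID = "<your_account_id>"  # placeholder, as in fraud-api-task.json
KAFKA_BROKER = "<your-msk-bootstrap-server>:9092"
MODEL_PATH = "app/model/logistic_regression_model.joblib"


def make_task_definition(family, name, image, command=None, environment=None, port=None):
    """Build a Fargate task definition dict shaped like fraud-api-task.json."""
    container = {
        "name": name,
        "image": image,
        "essential": True,
        "environment": [{"name": k, "value": v} for k, v in (environment or {}).items()],
    }
    if command:
        container["command"] = command  # overrides the image's default uvicorn CMD
    if port:
        container["portMappings"] = [{"containerPort": port, "protocol": "tcp"}]
    return {
        "family": family,
        "networkMode": "awsvpc",
        "requiresCompatibilities": ["FARGATE"],
        "cpu": "256",
        "memory": "512",
        "executionRoleArn": f"arn:aws:iam::{ACCOUNT_ID}:role/ecsTaskExecutionRole",
        "containerDefinitions": [container],
    }


# family -> (container name, command, environment)
SERVICES = {
    "fraud-worker-task": ("fraud_worker", ["python", "app/services/kafka_worker.py"],
                          {"KAFKA_BROKER": KAFKA_BROKER, "MODEL_PATH": MODEL_PATH}),
    "kafka-producer-task": ("kafka_producer", ["python", "scripts/kafka_producer.py"],
                            {"KAFKA_BROKER": KAFKA_BROKER}),
    "kafka-consumer-task": ("kafka_consumer", ["python", "scripts/kafka_consumer.py"],
                            {"KAFKA_BROKER": KAFKA_BROKER}),
    "api-tester-task": ("api_tester", ["python", "scripts/api_stream_test.py"], {}),
}


def write_all(repo="yourdockerhub"):
    """Write <family>.json for every service in SERVICES."""
    for family, (name, command, env) in SERVICES.items():
        task = make_task_definition(family, name, f"{repo}/{name}:latest", command, env)
        with open(f"{family}.json", "w") as f:
            json.dump(task, f, indent=2)
```

Each generated file is then registered with `aws ecs register-task-definition --cli-input-json file://<family>.json`. The API tester additionally needs a network path to the API (for example via the load balancer in step ⑦).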

## ⑤ Set Up Networking (VPC, Subnets, Security Groups)

Use the AWS Console or CLI to create:

* A new VPC with subnets in at least 2 AZs
* An Internet Gateway attached to the VPC
* Route table with access to the internet
* Security group allowing HTTP (port 80/8000) and internal Kafka ports

## ⑥ Deploy Each Task to Fargate

### One-off Tasks (e.g., Producer, API Tester)

```bash
aws ecs run-task \
  --cluster fraud-detection-cluster \
  --launch-type FARGATE \
  --network-configuration 'awsvpcConfiguration={subnets=[subnet-abc123],securityGroups=[sg-abc123],assignPublicIp="ENABLED"}' \
  --task-definition kafka-producer-task
```

### Long-running Services (e.g., API, Worker)

```bash
aws ecs create-service \
  --cluster fraud-detection-cluster \
  --service-name fraud-api-service \
  --task-definition fraud-api-task \
  --desired-count 1 \
  --launch-type FARGATE \
  --network-configuration 'awsvpcConfiguration={subnets=[subnet-abc123],securityGroups=[sg-abc123],assignPublicIp="ENABLED"}'
```

Repeat for the other long-running services, such as the worker (`fraud-worker-task`, which loops forever) and, if you want it always on, the consumer.

## ⑦ Optional: Load Balancer for API

Use an **Application Load Balancer (ALB)** to expose the FastAPI service to the internet:

* Target group: Fraud API ECS service
* Listener: Forward HTTP requests (port 80) to target group
* Add DNS record pointing to the ALB

## ⑧ Monitoring and Logs

Use **CloudWatch** to monitor logs:

```bash
aws logs describe-log-groups
aws logs get-log-events \
  --log-group-name /ecs/fraud-api-task \
  --log-stream-name <log-stream>
```

## ✅ Final Notes

* Use **Amazon MSK** for managed Kafka
* Use **AWS Secrets Manager** for credentials




Step 7: Testing and Monitoring

Story:

You test the entire system end-to-end to ensure it works as expected. You also set up basic monitoring to track the performance of the API and Kafka.

Mini Tasks:


Test the system by streaming a large batch of transaction data and verifying the predictions.
Set up logging in the FastAPI application to track incoming requests and predictions.
Use a tool like Prometheus or Grafana to monitor the API's performance and Kafka's message throughput.

**api_stream_test.py**

In [None]:
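import os
import time

import pandas as pd
import requests

# Load sample transaction data (path relative to the project root, i.e. /app in the container)
data = pd.read_csv("app/data/sample_transactions.csv")

# API endpoint: inside docker-compose the API is reachable by its service name "api";
# set API_URL=http://localhost:8000/predict when running this script on the host
url = os.getenv("API_URL", "http://api:8000/predict")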

print(f"🚀 Starting transaction stream to {url} ...\n")

for idx, row in data.iterrows():
    payload = row.to_dict()

    try:
        response = requests.post(url, json=payload)
        response.raise_for_status()
        result = response.json()
        print(f"✅ Transaction {idx + 1}: Prediction = {result}")
    except requests.exceptions.RequestException as e:
        print(f"❌ Transaction {idx + 1} failed: {e}")

    time.sleep(0.2)  # Simulate delay between incoming transactions

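To turn the stream test into a rough performance check, the loop could record each request's round-trip time (for example `start = time.perf_counter()` before `requests.post` and `(time.perf_counter() - start) * 1000` after it) along with the CSV's `Class` label if present, then summarize the run. A standard-library sketch of the summary step is below. `summarize_run` is a hypothetical name; `statistics.quantiles` needs at least two samples, and with only a few hundred requests the p95 is a noisy estimate.

```python
import statistics


def summarize_run(latencies_ms, labels=None, predictions=None):
    """Summarize a stream-test run: latency percentiles and, optionally, label agreement."""
    cuts = statistics.quantiles(latencies_ms, n=100)  # 99 cut points: cuts[49] is p50, cuts[94] is p95
    summary = {
        "requests": len(latencies_ms),
        "p50_ms": cuts[49],
        "p95_ms": cuts[94],
        "max_ms": max(latencies_ms),
    }
    if labels is not None and predictions is not None:
        flagged = [p == "Fraudulent" for p in predictions]
        pairs = list(zip(labels, flagged))
        summary["true_positives"] = sum(1 for y, f in pairs if y == 1 and f)
        summary["false_positives"] = sum(1 for y, f in pairs if y == 0 and f)
        summary["missed_fraud"] = sum(1 for y, f in pairs if y == 1 and not f)
    return summary
```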

**prometheus.yml**

```yaml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'fraud_api'
    static_configs:
      - targets: ['api:8000']
```


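**Grafana Dashboard Setup**

1. Open http://localhost:3000
2. Log in with admin/admin
3. Add Prometheus as a data source. From inside the Docker Compose network, use the service name, `http://prometheus:9090`. Inside the Grafana container, `http://localhost:9090` points at Grafana itself, so it only works when Grafana runs directly on the host.
4. Create dashboards:
   * API Response Time
   * Prediction Count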
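Grafana panels are driven by PromQL queries, and you can try the same queries against the Prometheus HTTP API (`/api/v1/query`) before putting them in a dashboard. The sketch below builds such a query URL and parses an instant-vector response using only the stdlib. The two panel queries assume the instrumentator's default `http_request_duration_seconds` histogram and `http_requests_total` counter, labeled by `handler`. `PANEL_QUERIES`, `build_query_url` and `parse_vector` are hypothetical names.

```python
import json
import urllib.parse

# Assumed panel queries; metric and label names follow the instrumentator defaults
PANEL_QUERIES = {
    "API Response Time (p95, seconds)": (
        "histogram_quantile(0.95, sum by (le) "
        '(rate(http_request_duration_seconds_bucket{handler="/predict"}[5m])))'
    ),
    "Prediction Count (requests/s)": 'sum(rate(http_requests_total{handler="/predict"}[5m]))',
}


def build_query_url(prometheus_base, promql):
    """Build an instant-query URL for the Prometheus HTTP API."""
    return f"{prometheus_base.rstrip('/')}/api/v1/query?" + urllib.parse.urlencode({"query": promql})


def parse_vector(response_body):
    """Return (labels, value) pairs from an instant-vector query response body."""
    payload = json.loads(response_body)
    if payload.get("status") != "success":
        raise RuntimeError(f"Prometheus query failed: {payload.get('error')}")
    if payload["data"]["resultType"] != "vector":
        raise ValueError("expected an instant vector result")
    return [(item["metric"], float(item["value"][1])) for item in payload["data"]["result"]]
```

From the host, open `build_query_url("http://localhost:9090", PANEL_QUERIES["Prediction Count (requests/s)"])` with `urllib.request.urlopen`. Then pass the response body to `parse_vector`.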

**Conclusion**

**Story:**

You’ve successfully deployed a machine learning model as a REST API using FastAPI, containerized it with Docker, and set up real-time streaming for live predictions using Kafka. This system can now be integrated into your live transaction processing pipeline to detect fraudulent transactions in real time.


**Mini Tasks:**

Document the setup process and share it with your team.
Terminate any cloud resources (if used) to avoid unnecessary costs.

# Real-Time Fraud Detection System

## 🚀 Overview

A machine learning-powered real-time fraud detection system leveraging FastAPI, Kafka, Docker, and monitoring tools (Prometheus & Grafana). This project simulates live transaction streaming, prediction processing, and real-time monitoring.

## 📁 Project Structure

```
├── app/
│   ├── __pycache__/                  # Python bytecode cache
│   ├── data/
│   │   └── sample_transactions.csv   # Example data for testing
│   ├── model/
│   │   └── logistic_regression_model.joblib
│   ├── services/
│   │   ├── kafka_worker.py
│   │   └── main.py                   # FastAPI app
├── scripts/
│   ├── api_stream_test.py           # Script to test API stream
│   ├── create_topics.py             # Kafka topic creation
│   ├── kafka_consumer.py            # Kafka consumer logic
│   └── kafka_producer.py            # Kafka producer logic
├── venv/                            # Python virtual environment
├── prometheus.yml                   # Monitoring config
├── docker-compose.yml              # Multi-container setup
├── Dockerfile                      # App container definition
├── README.md
└── requirements.txt

```

## 📦 Environment Setup

### Required Tools

* Python 3.10+
* Docker & Docker Compose
* Kafka
* Prometheus & Grafana

### Python Dependencies

```bash
pip install fastapi "uvicorn[standard]" scikit-learn imbalanced-learn joblib pandas numpy requests kafka-python confluent-kafka prometheus-fastapi-instrumentator
```

### Docker Compose Setup

Includes services:

* `zookeeper`, `kafka`
* `api`, `worker`, `producer`, `consumer`
* `prometheus`, `grafana`


## 🧠 Model Development

* **Dataset**: [Credit Card Fraud - Kaggle](https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud)
* **Preprocessing**:

  * Handle class imbalance with SMOTE, applied to the training split only so synthetic samples never reach the test set
  * Scale `Amount` with `RobustScaler`
  * Use `PowerTransformer` to reduce skew
* **Model**: Logistic Regression with `StandardScaler`, `SelectKBest`, `GridSearchCV`
* **Evaluation**: Confusion Matrix, ROC AUC, Precision, etc.
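With roughly one fraud case per several hundred transactions, accuracy alone is misleading. The confusion matrix, precision, recall and ROC AUC are more informative. The stdlib sketch below computes them from labels and model scores. It can sanity-check scikit-learn's numbers or score predictions collected from the stream. The O(positives × negatives) AUC loop is only meant for small samples; for the full dataset, prefer `sklearn.metrics.roc_auc_score`. All function names are hypothetical.

```python
def confusion_counts(y_true, y_pred, positive=1):
    """Return (tn, fp, fn, tp), treating `positive` as the fraud class."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    tn = fp = fn = tp = 0
    for truth, pred in zip(y_true, y_pred):
        if pred == positive:
            if truth == positive:
                tp += 1
            else:
                fp += 1
        elif truth == positive:
            fn += 1
        else:
            tn += 1
    return tn, fp, fn, tp


def precision_recall(y_true, y_pred, positive=1):
    """Precision = tp / (tp + fp); recall = tp / (tp + fn); 0.0 when undefined."""
    _, fp, fn, tp = confusion_counts(y_true, y_pred, positive)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall


def roc_auc(y_true, scores, positive=1):
    """Probability that a random fraud case outscores a random legitimate one (ties = 0.5)."""
    pos = [s for t, s in zip(y_true, scores) if t == positive]
    neg = [s for t, s in zip(y_true, scores) if t != positive]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))
```

The `(tn, fp, fn, tp)` order matches what `sklearn.metrics.confusion_matrix(y_true, y_pred).ravel()` returns for binary labels.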

### Save Trained Model

```python
import joblib
joblib.dump(model, "logistic_regression_model.joblib")  # `model` is the fitted pipeline from training
```

## 🌐 REST API (FastAPI)

### Endpoints

* `GET /` → Welcome
* `GET /health` → Health check
* `POST /predict` → Predict fraud

### Features

* Input validated using Pydantic
* Model loaded once at startup
* Returns fraud label + probability
* Prometheus metrics auto-exposed

### Sample Input

```json
{
  "Time": 0.0,
  "V1": -1.36,
  "V2": 0.57,
  ...
  "V28": -0.02,
  "Amount": 149.62
}
```
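The sample above is abbreviated. A full request carries the 30 numeric feature columns of the Kaggle dataset: `Time`, `V1` through `V28`, and `Amount` (the `Class` label is not an input). The stdlib sketch below shows the kind of check the Pydantic model performs on a payload. The project's actual field definitions live in the FastAPI app and may differ. `EXPECTED_FIELDS` and `validate_payload` are hypothetical names.

```python
import math

# Feature columns of the Kaggle Credit Card Fraud dataset, excluding the Class label
EXPECTED_FIELDS = ["Time"] + [f"V{i}" for i in range(1, 29)] + ["Amount"]


def validate_payload(payload):
    """Return a list of problems; an empty list means the payload looks valid."""
    problems = []
    for field in EXPECTED_FIELDS:
        if field not in payload:
            problems.append(f"missing field: {field}")
            continue
        value = payload[field]
        # bool is a subclass of int in Python, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            problems.append(f"{field} must be a number, got {type(value).__name__}")
        elif not math.isfinite(value):
            problems.append(f"{field} must be finite")
    return problems
```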

## 🐳 Docker Setup

### Dockerfile

```dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
```

### docker-compose.yml Highlights

* Health checks
* Volumes for persistence
* Inter-service networking
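The API exposes `GET /health`, so a health check or a startup script can poll it until the service answers. The `python:3.10-slim` base image does not ship `curl`, so a stdlib-only Python check avoids installing extra tools. The sketch below is one way to do it. `wait_until_healthy` is a hypothetical helper, and how it is wired into `docker-compose.yml` (for example, a `healthcheck` that runs it through `python -c`) is an assumption.

```python
import http.client
import time
import urllib.request


def wait_until_healthy(url, timeout_s=30.0, interval_s=1.0):
    """Poll `url` until it returns HTTP 200 or `timeout_s` elapses; return True if healthy."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            with urllib.request.urlopen(url, timeout=interval_s) as response:
                if response.status == 200:
                    return True
        except (OSError, http.client.HTTPException):
            # Not ready yet: connection refused, timeout, or an HTTP error status
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

For example, `wait_until_healthy("http://api:8000/health")` from another container on the Compose network, or `http://localhost:8000/health` from the host.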


## 🔁 Kafka Streaming

### Topics

* `transactions` → Input stream
* `predictions` → Output results

### Kafka Roles

* **Producer**: Streams new transactions
* **Worker**: Subscribes to `transactions`, predicts, publishes to `predictions`
* **Consumer**: Reads `predictions` for logging/UI
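The worker's core job is to decode a `transactions` message, score it, and encode a `predictions` message. Keeping that step separate from the Kafka client makes it testable without a broker. The sketch below assumes JSON-encoded messages, and its field names (`transaction_id`, `is_fraud`, `fraud_probability`) are hypothetical. With kafka-python, you would call it inside `for message in consumer:` on `message.value` and pass the result to `producer.send("predictions", value=...)`.

```python
import json
from typing import Any, Callable, Mapping


def process_transaction(
    raw_value: bytes,
    predict: Callable[[Mapping[str, Any]], tuple[int, float]],
) -> bytes:
    """Decode one transaction message, score it, and encode the prediction message."""
    transaction = json.loads(raw_value.decode("utf-8"))
    label, probability = predict(transaction)  # e.g. wraps the loaded joblib model
    result = {
        "transaction_id": transaction.get("transaction_id"),  # hypothetical ID field
        "is_fraud": int(label),
        "fraud_probability": float(probability),
    }
    return json.dumps(result).encode("utf-8")
```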

### Key Scripts

```bash
python scripts/create_topics.py
python scripts/kafka_producer.py
python app/services/kafka_worker.py
python scripts/kafka_consumer.py
```


## ☁️ Deployment Options

### Option A: Local Deployment

```bash
docker-compose up --build
```

Access:

* FastAPI Swagger: [http://localhost:8000/docs](http://localhost:8000/docs)
* Prometheus: [http://localhost:9090](http://localhost:9090)
* Grafana: [http://localhost:3000](http://localhost:3000) (admin/admin)

### Option B: AWS ECS Fargate (Optional)

1. Push Docker images to Docker Hub
2. Create ECS cluster, VPC, roles
3. Define ECS Task Definitions per service
4. Setup Application Load Balancer
5. Deploy & monitor with CloudWatch


## 🧪 Testing and Monitoring

### Test Transactions

* Use `api_stream_test.py` to stream CSV transactions to `/predict`

### Logging

* Python `logging` captures API requests & predictions
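A minimal sketch of prediction logging with the stdlib `logging` module follows. Each prediction becomes one JSON line, which is easy to search in `docker compose logs` or CloudWatch Logs Insights. The logger name `fraud_api` and the helpers `configure_logging` and `log_prediction` are hypothetical. In the FastAPI app, `started_at` would be captured at the top of the `/predict` handler.

```python
import json
import logging
import time

logger = logging.getLogger("fraud_api")


def configure_logging(level=logging.INFO):
    """Timestamped single-line log format; call once at application startup."""
    logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(name)s %(message)s")


def log_prediction(request_id, is_fraud, probability, started_at):
    """Log one prediction as a JSON line and return the logged fields."""
    fields = {
        "request_id": request_id,
        "is_fraud": int(is_fraud),
        "fraud_probability": round(float(probability), 4),
        "latency_ms": round((time.perf_counter() - started_at) * 1000, 2),
    }
    logger.info("prediction %s", json.dumps(fields))
    return fields
```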

### Prometheus

```yaml
scrape_configs:
  - job_name: 'fraud_api'
    static_configs:
      - targets: ['api:8000']
```

### Grafana

* Add Prometheus as a data source (`http://prometheus:9090` from inside the Compose network)
* Create Dashboard with:

  * Request Count
  * Fraud Rate
  * Response Latency
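The default HTTP metrics count requests but know nothing about fraud labels. A Fraud Rate panel therefore needs the rate computed wherever predictions are seen, for example in the consumer, and then exported, perhaps as a `prometheus_client` Gauge. The sketch below keeps a rolling fraud rate over the most recent predictions. `RollingFraudRate` and its window size are hypothetical choices.

```python
from collections import deque


class RollingFraudRate:
    """Fraction of fraud predictions among the most recent `window` predictions."""

    def __init__(self, window=1000):
        if window <= 0:
            raise ValueError("window must be positive")
        self._recent = deque(maxlen=window)
        self._fraud_count = 0

    def update(self, is_fraud):
        """Record one prediction and return the updated rate."""
        if len(self._recent) == self._recent.maxlen:
            # The oldest entry is evicted by the append below; drop it from the count
            self._fraud_count -= self._recent[0]
        flag = 1 if is_fraud else 0
        self._recent.append(flag)
        self._fraud_count += flag
        return self.rate

    @property
    def rate(self):
        return self._fraud_count / len(self._recent) if self._recent else 0.0
```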


## ✅ Final Notes

* Scalable, real-time architecture for fraud detection
* Plug-and-play with different models
* Full observability with monitoring tools


## 🙏 Credits

* [Kaggle Credit Card Fraud Dataset](https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud)
* FastAPI, Docker, Kafka, Prometheus, Grafana


Happy detecting! 🎯
