                                        Project Title: Customer Churn Prediction Pipeline

Import the required libraries.

In [1]:
import datetime
import io
import os
import random

import boto3
import joblib
import nest_asyncio
import pandas as pd
import uvicorn
from botocore.exceptions import NoCredentialsError
from faker import Faker
from fastapi import FastAPI
from pydantic import BaseModel
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

                                            Step 1 - Data Collection (Simulated Data Source)
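We generate synthetic customer data (ID, name, age, signup and last-active dates, subscription tier) using the Faker library. The `is_churned` label is assigned uniformly at random, so it carries no real signal: expect chance-level (~50%) model accuracy in Step 4.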

In [3]:
# Initialize Faker and seed every random source used below (Faker's own
# generator and Python's `random`), so Restart & Run All reproduces the same
# synthetic dataset and therefore the same model accuracy.
# NOTE: date_between(..., end_date='today') is relative to the current date,
# so the generated dates still shift from day to day.
SEED = 42
Faker.seed(SEED)
random.seed(SEED)
fake = Faker()

# Generate synthetic customer data
def generate_customer_data(num_records=1000):
    """Return a DataFrame of ``num_records`` synthetic customer rows.

    Columns: a random UUID ``customer_id``, a fake ``name``, an ``age`` in
    [18, 70], a ``signup_date`` within the last two years, a
    ``subscription_type`` (Basic/Standard/Premium), a ``last_active_date``
    within the last year, and an ``is_churned`` label drawn uniformly from
    {0, 1}, independently of every other column.
    """
    rows = [
        {
            "customer_id": fake.uuid4(),
            "name": fake.name(),
            "age": random.randint(18, 70),
            "signup_date": fake.date_between(start_date='-2y', end_date='today'),
            "subscription_type": random.choice(["Basic", "Standard", "Premium"]),
            "last_active_date": fake.date_between(start_date='-1y', end_date='today'),
            "is_churned": random.choice([0, 1]),
        }
        for _ in range(num_records)
    ]
    return pd.DataFrame(rows)


# Persist the raw dataset locally; Step 2 uploads this file to S3.
customer_data = generate_customer_data()
customer_data.to_csv("customers.csv", index=False)

                                                        Step 2 - Data Ingestion 
Upload the raw CSV into a cloud storage bucket (here, AWS S3).

In [5]:
# AWS configuration.
# Credentials are NOT hard-coded: they are read from the standard environment
# variables. When those are unset, both values are None and boto3 falls back
# to its default credential chain (shared ~/.aws/credentials file, IAM role, ...).
# Any keys that were ever hard-coded in this notebook must be treated as
# leaked and rotated.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
AWS_REGION = os.environ.get("AWS_REGION", "eu-north-1")

RAW_BUCKET = "ng-test-csv"
PROCESSED_BUCKET = "ng-test-csv-processed"

CSV_FILE_PATH = "customers.csv"  # Local CSV file path
S3_OBJECT_NAME = "customers.csv"  # Key of the raw CSV in RAW_BUCKET
FILE_KEY = "transformed/customers.csv"  # Key of the cleaned data in PROCESSED_BUCKET

# Initialize S3 client
s3_client = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)

In [7]:
# Function to Upload File
def upload_to_s3(file_path, bucket_name, object_name):
    """Upload a local file to ``s3://bucket_name/object_name``.

    Args:
        file_path: Path of the local file to upload.
        bucket_name: Destination S3 bucket.
        object_name: Destination key inside the bucket.

    Returns:
        True if the upload succeeded. False if the local file was missing or
        no AWS credentials were available; a message is printed in either
        failure case. Any other boto3 error propagates to the caller.
    """
    try:
        s3_client.upload_file(file_path, bucket_name, object_name)
    except FileNotFoundError:
        print("File not found. Check the file path.")
        return False
    except NoCredentialsError:
        print("AWS credentials not available.")
        return False
    print(f"File uploaded successfully to s3://{bucket_name}/{object_name}")
    return True


# Upload the raw CSV. Every later step reads from S3, so stop here on failure
# instead of letting the ETL cells fail later with a less obvious error.
if not upload_to_s3(CSV_FILE_PATH, RAW_BUCKET, S3_OBJECT_NAME):
    raise RuntimeError(
        f"Upload of {CSV_FILE_PATH} to s3://{RAW_BUCKET}/{S3_OBJECT_NAME} failed"
    )

File uploaded successfully to s3://ng-test-csv/customers.csv


                                                            Step 3 -  ETL Pipeline
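Extract the raw data from the S3 bucket, transform it into clean, analytics-ready data, and load the result into a separate "processed" S3 bucket. In production, the load target could instead be a data warehouse such as Snowflake, BigQuery, or Amazon Redshift.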

In [10]:
# Smoke-test the S3 connection by printing the keys in the raw bucket.
# This is a single list_objects_v2 call, so only the first page of results is shown.
try:
    listing = s3_client.list_objects_v2(Bucket=RAW_BUCKET)
    print("Files in the bucket:")
    for key in (item["Key"] for item in listing.get("Contents", [])):
        print(key)
except Exception as err:
    print(f"Error connecting to S3: {err}")

Files in the bucket:
customers.csv


In [12]:
def extract():
    """Download the raw customer CSV from S3 and cache a local copy.

    Reads ``s3://RAW_BUCKET/S3_OBJECT_NAME`` into a DataFrame, writes it to
    ``tmp/extracted_data.csv`` (relative to the working directory; ``tmp/``
    is created if missing) for the transform step, and returns the DataFrame.
    """
    response = s3_client.get_object(Bucket=RAW_BUCKET, Key=S3_OBJECT_NAME)
    raw_data = pd.read_csv(io.BytesIO(response["Body"].read()))
    # DataFrame.to_csv does not create missing parent directories.
    os.makedirs("tmp", exist_ok=True)
    raw_data.to_csv("tmp/extracted_data.csv", index=False)
    print("Extracted data saved locally at tmp/extracted_data.csv")
    return raw_data


# Extract data from S3
extracted_data = extract()
print(extracted_data.head())

def transform():
    """Add a recency feature to the extracted data and cache the result.

    Reads ``tmp/extracted_data.csv`` (written by ``extract``), adds
    ``days_since_last_active`` (whole days elapsed between now and
    ``last_active_date``), writes the frame to ``tmp/transformed_data.csv``
    and returns it.
    """
    df = pd.read_csv("tmp/extracted_data.csv")
    # .dt.days truncates the timedelta to whole days.
    df["days_since_last_active"] = (
        datetime.datetime.now() - pd.to_datetime(df["last_active_date"])
    ).dt.days
    df.to_csv("tmp/transformed_data.csv", index=False)
    print("Transformed data saved locally at tmp/transformed_data.csv")
    return df

# Transform the extracted data (adds the days_since_last_active column)
transformed_data = transform()
print(transformed_data.head(n=5))

def load():
    """Upload the transformed CSV to the processed S3 bucket.

    Reads ``tmp/transformed_data.csv`` (written by ``transform``) and stores
    it at ``s3://PROCESSED_BUCKET/FILE_KEY``, the same location Step 4 reads from.
    """
    transformed_data = pd.read_csv("tmp/transformed_data.csv")
    csv_buffer = io.StringIO()
    transformed_data.to_csv(csv_buffer, index=False)
    s3_client.put_object(Bucket=PROCESSED_BUCKET, Key=FILE_KEY, Body=csv_buffer.getvalue())
    # Report the location actually written, not a hard-coded bucket name.
    print(f"Transformed data uploaded to S3: s3://{PROCESSED_BUCKET}/{FILE_KEY}")


# Load the transformed data to the processed S3 bucket
load()

Extracted data saved locally at /tmp/extracted_data.csv
                            customer_id              name  age signup_date  \
0  a6c62624-8034-41da-9ba3-90cad8178e2c   Jessica Baldwin   31  2023-03-17   
1  3a11ed03-03c9-4241-a7b2-4c455691669f       Paige Adams   62  2024-05-12   
2  7cce5e08-139e-4e90-8b13-a86a35917be4   Sarah Cervantes   31  2023-08-25   
3  bccf4e18-4b74-4b9a-a437-f5b6aaf04541  Hannah Underwood   57  2024-03-10   
4  e69813d2-01d9-47cc-bf41-1d11dadbb1bf    Anthony Arnold   35  2024-04-10   

  subscription_type last_active_date  is_churned  
0          Standard       2024-08-13           0  
1          Standard       2024-03-24           0  
2          Standard       2024-08-14           1  
3             Basic       2024-11-18           1  
4          Standard       2024-08-01           0  
Transformed data saved locally at /tmp/transformed_data.csv
                            customer_id              name  age signup_date  \
0  a6c62624-8034-41da-9ba3-90ca

                                                    Step 4 - Feature Engineering and Modeling
Prepare features and train a machine learning model for churn prediction.

In [15]:
# Load the cleaned data produced by the ETL step from S3
response = s3_client.get_object(Bucket=PROCESSED_BUCKET, Key=FILE_KEY)
df = pd.read_csv(io.BytesIO(response["Body"].read()))

# Feature selection (the API in Step 5 expects exactly these two columns, in this order)
X = df[["age", "days_since_last_active"]]
y = df["is_churned"]

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train a Random Forest model. Fixing random_state makes the forest's
# bootstrap/feature sampling deterministic, so the reported accuracy is
# reproducible across runs.
clf = RandomForestClassifier(random_state=42)
clf.fit(X_train, y_train)

# Evaluate. is_churned is drawn at random in Step 1, so accuracy near 0.5
# (chance level) is the expected result, not a sign of a modelling bug.
y_pred = clf.predict(X_test)
print(f"Accuracy: {accuracy_score(y_test, y_pred)}")

Accuracy: 0.495


In [17]:
# Persist the trained model to disk; the FastAPI service in Step 5 loads this file.
joblib.dump(value=clf, filename="churn_model.pkl")

['churn_model.pkl']

                                                            Step 5 - Model Deployment
Deploy the churn prediction model using FastAPI.

In [20]:


# nest_asyncio patches the event loop Jupyter is already running so that
# uvicorn.run() can be called from inside this cell.
nest_asyncio.apply()


# Request body for POST /predict: the same two features the model was
# trained on, declared in the same order as the training columns.
class ChurnInput(BaseModel):
    age: int
    days_since_last_active: int


app = FastAPI()

# Model saved by the previous cell. joblib.load unpickles the file, so only
# point it at files you trust.
model = joblib.load("churn_model.pkl")


# Health-check route.
@app.get("/")
def home():
    return {"message": "Churn Prediction API is running!"}


# Single-customer prediction route. model_dump() preserves field order, so
# the one-row frame's columns match the training feature order.
@app.post("/predict")
def predict(data: ChurnInput):
    features = pd.DataFrame([data.model_dump()])
    predicted_label = model.predict(features)[0]
    return {"churn_prediction": int(predicted_label)}


# Start the server (no auto-reload). This call blocks the cell until the
# server is stopped, e.g. by interrupting the kernel.
uvicorn.run(app, host="127.0.0.1", port=8001)


INFO:     Started server process [50380]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8001 (Press CTRL+C to quit)


INFO:     127.0.0.1:59145 - "GET / HTTP/1.1" 200 OK
INFO:     127.0.0.1:59394 - "POST /predict HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [50380]


                                                                Step 6 - Visualization
Build a dashboard with Plotly Dash to display churn insights. The data is loaded from S3 once when the cell runs, so the dashboard does not update in real time.

In [23]:
import dash
from dash import dcc, html

# Use a distinct name so the FastAPI `app` from Step 5 is not overwritten.
dash_app = dash.Dash(__name__)

# Load the cleaned data produced by the ETL step
response = s3_client.get_object(Bucket=PROCESSED_BUCKET, Key=FILE_KEY)
df = pd.read_csv(io.BytesIO(response["Body"].read()))

# One bar per subscription tier: the mean of the 0/1 label is the churn rate.
# Plotting the raw rows instead would draw one 0/1 bar per customer at each
# tier, which does not show a rate.
churn_by_subscription = (
    df.groupby("subscription_type", as_index=False)["is_churned"]
    .mean()
    .rename(columns={"is_churned": "churn_rate"})
)

dash_app.layout = html.Div([
    html.H1("Customer Churn Insights"),
    dcc.Graph(
        figure={
            "data": [
                {
                    "x": churn_by_subscription["subscription_type"],
                    "y": churn_by_subscription["churn_rate"],
                    "type": "bar",
                    "name": "Churn rate",
                }
            ],
            # {"text": ...} titles work across plotly.js versions; the bare
            # string shorthand is deprecated.
            "layout": {
                "title": {"text": "Churn Rate by Subscription Type"},
                "xaxis": {"title": {"text": "Subscription type"}},
                "yaxis": {"title": {"text": "Churn rate"}, "tickformat": ".0%"},
            },
        }
    ),
])

if __name__ == "__main__":
    # Dash.run replaces the deprecated run_server (removed in Dash 3).
    dash_app.run(debug=True)