<a href="https://colab.research.google.com/github/aryannagar29/telecom-churn-elt/blob/main/etl_pipeline_churn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Telecom Churn ELT Pipeline
Simulated ELT pipeline with ingestion, transformation, anonymization, and analytics.
Runs fully in Google Colab using SQLite instead of Postgres.

In [1]:
# Step 1: mount Drive (optional but recommended) and load your CSV
import numpy as np
from pathlib import Path
import pandas as pd
import sys
import os
import hashlib
from sqlalchemy import create_engine
from google.colab import drive

In [2]:
# Step 1: Install gdown (only needed once in Colab or Jupyter)
!pip install gdown

# Step 2: Import libraries
import gdown

# Step 3: Paste your Google Drive share link here
file_url = 'https://drive.google.com/file/d/1GbKfKX8mptE7wjmPGBiAu0u45W-ORsHc/view?usp=drive_link'

# Step 4: Extract the file id and create direct download link
file_id = file_url.split('/d/')[1].split('/')[0]
download_url = f'https://drive.google.com/uc?id={file_id}'

# Step 5: Download the file
gdown.download(download_url, 'my_file.csv', quiet=False)

# Step 6: Read DataFrame
df = pd.read_csv('my_file.csv')

# Step 7: Verify
df.head()



Downloading...
From: https://drive.google.com/uc?id=1GbKfKX8mptE7wjmPGBiAu0u45W-ORsHc
To: /content/my_file.csv
100%|██████████| 58.7k/58.7k [00:00<00:00, 55.2MB/s]


Unnamed: 0,CustomerID,Age,Gender,Tenure,MonthlyCharges,ContractType,InternetService,TotalCharges,TechSupport,Churn
0,1,49,Male,4,88.35,Month-to-Month,Fiber Optic,353.4,Yes,Yes
1,2,43,Male,0,36.67,Month-to-Month,Fiber Optic,0.0,Yes,Yes
2,3,51,Female,2,63.79,Month-to-Month,Fiber Optic,127.58,No,Yes
3,4,60,Female,8,102.34,One-Year,DSL,818.72,Yes,Yes
4,5,42,Male,32,69.01,Month-to-Month,,2208.32,No,Yes


In [3]:
df.head()

Unnamed: 0,CustomerID,Age,Gender,Tenure,MonthlyCharges,ContractType,InternetService,TotalCharges,TechSupport,Churn
0,1,49,Male,4,88.35,Month-to-Month,Fiber Optic,353.4,Yes,Yes
1,2,43,Male,0,36.67,Month-to-Month,Fiber Optic,0.0,Yes,Yes
2,3,51,Female,2,63.79,Month-to-Month,Fiber Optic,127.58,No,Yes
3,4,60,Female,8,102.34,One-Year,DSL,818.72,Yes,Yes
4,5,42,Male,32,69.01,Month-to-Month,,2208.32,No,Yes


In [4]:
df.shape

(1000, 10)

In [5]:
df.columns.tolist()

['CustomerID',
 'Age',
 'Gender',
 'Tenure',
 'MonthlyCharges',
 'ContractType',
 'InternetService',
 'TotalCharges',
 'TechSupport',
 'Churn']

In [6]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 10 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   CustomerID       1000 non-null   int64  
 1   Age              1000 non-null   int64  
 2   Gender           1000 non-null   object 
 3   Tenure           1000 non-null   int64  
 4   MonthlyCharges   1000 non-null   float64
 5   ContractType     1000 non-null   object 
 6   InternetService  703 non-null    object 
 7   TotalCharges     1000 non-null   float64
 8   TechSupport      1000 non-null   object 
 9   Churn            1000 non-null   object 
dtypes: float64(2), int64(3), object(5)
memory usage: 78.3+ KB


In [7]:
# Missing value summary
missing_df = pd.concat(
    [df.isnull().sum(), df.isnull().mean()*100],
    axis=1
)
missing_df.columns = ['missing_count','missing_pct']
missing_df = missing_df.sort_values(by='missing_count', ascending=False)

missing_df.head()

Unnamed: 0,missing_count,missing_pct
InternetService,297,29.7
CustomerID,0,0.0
Age,0,0.0
Gender,0,0.0
MonthlyCharges,0,0.0


In [8]:
# Quick check for potential PII columns
possible_pii = ['email','phone','first_name','last_name','name','fullname']
present_pii = [c for c in possible_pii if c in df.columns]
if present_pii:
    print("\nDetected potential PII columns:", present_pii)
    for c in present_pii:
        print(f"-- sample values for {c}:")
        display(df[c].dropna().astype(str).head(10))
else:
    print("\n No standard PII columns detected ")


 No standard PII columns detected 


In [9]:
# === Remove duplicates
before_rows = df.shape[0]
df = df.drop_duplicates()
print(f"Removed {before_rows - df.shape[0]} duplicate rows.")

Removed 0 duplicate rows.


In [10]:
# === Handle missing values & # Fill numeric columns with median
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
    df[col] = df[col].fillna(df[col].median())

In [11]:
# Fill categorical columns with mode
categorical_cols = df.select_dtypes(exclude=[np.number]).columns
for col in categorical_cols:
    if df[col].isnull().any():
        df[col] = df[col].fillna(df[col].mode()[0])

print("Missing values handled.")

Missing values handled.


In [12]:
# === Anonymize PII ===
def mask_email(email):
    if pd.isnull(email):
        return email
    parts = email.split('@')
    return parts[0][0] + "***@" + parts[1]

def mask_phone(phone):
    if pd.isnull(phone):
        return phone
    digits = ''.join(filter(str.isdigit, str(phone)))
    return "***-***-" + digits[-4:] if len(digits) >= 4 else "***"

def mask_name(name):
    if pd.isnull(name):
        return name
    return name[0] + "***"

for col in df.columns:
    if "email" in col.lower():
        df[col] = df[col].apply(mask_email)
    elif "phone" in col.lower():
        df[col] = df[col].apply(mask_phone)
    elif any(x in col.lower() for x in ["name", "firstname", "lastname"]):
        df[col] = df[col].apply(mask_name)

print("PII anonymization done.")

PII anonymization done.


In [13]:
# === Standardize category text ===
for col in categorical_cols:
    df[col] = df[col].astype(str).str.strip().str.lower()

print("Category text standardized.")

Category text standardized.


In [14]:
# === Save cleaned version to Drive ===
output_path = "/content/cleaned_customer_churn.csv"
df.to_csv(output_path, index=False)
print(f"Cleaned dataset saved to {output_path}")

Cleaned dataset saved to /content/cleaned_customer_churn.csv


#####################################################################################################################

In [15]:
df.head()

Unnamed: 0,CustomerID,Age,Gender,Tenure,MonthlyCharges,ContractType,InternetService,TotalCharges,TechSupport,Churn
0,1,49,male,4,88.35,month-to-month,fiber optic,353.4,yes,yes
1,2,43,male,0,36.67,month-to-month,fiber optic,0.0,yes,yes
2,3,51,female,2,63.79,month-to-month,fiber optic,127.58,no,yes
3,4,60,female,8,102.34,one-year,dsl,818.72,yes,yes
4,5,42,male,32,69.01,month-to-month,fiber optic,2208.32,no,yes


In [16]:
import psycopg2
import pandas as pd


# 1️ Clean column names
df.columns = [col.strip() for col in df.columns]   # remove spaces
df = df.loc[:, ~df.columns.duplicated()]           # remove duplicate cols

# 2️ Connect to NeonDB
conn = psycopg2.connect(
    host="ep-restless-tree-aeol0h46-pooler.c-2.us-east-2.aws.neon.tech",
    dbname="neondb",
    user="neondb_owner",
    password="npg_XnV0aD8uEhoY",
    sslmode="require"
)
cur = conn.cursor()

# 3️ Drop old table if it exists
cur.execute("DROP TABLE IF EXISTS customer_churn;")

# 4️ Create table dynamically
create_table_query = """
CREATE TABLE customer_churn (
    {}
);
""".format(", ".join([f'"{col}" TEXT' for col in df.columns]))
cur.execute(create_table_query)

# 5️ Insert data
for _, row in df.iterrows():
    cur.execute(
        f'INSERT INTO customer_churn VALUES ({", ".join(["%s"] * len(row))})',
        tuple(row.astype(str))
    )

conn.commit()
print("Data uploaded successfully to NeonDB!")

cur.close()
conn.close()


Data uploaded successfully to NeonDB!


In [17]:
# Step 5: Verify data in NeonDB
import pandas as pd
import psycopg2

# Connect again (reuse your connection details)
conn = psycopg2.connect(
    host="ep-restless-tree-aeol0h46-pooler.c-2.us-east-2.aws.neon.tech",
    dbname="neondb",
    user="neondb_owner",
    password="npg_XnV0aD8uEhoY",
    sslmode="require"
)

# Query first 5 rows
query = "SELECT * FROM customer_churn LIMIT 5;"
df_check = pd.read_sql(query, conn)

conn.close()

# Display results
df_check

  df_check = pd.read_sql(query, conn)


Unnamed: 0,CustomerID,Age,Gender,Tenure,MonthlyCharges,ContractType,InternetService,TotalCharges,TechSupport,Churn
0,1,49,male,4,88.35,month-to-month,fiber optic,353.4,yes,yes
1,2,43,male,0,36.67,month-to-month,fiber optic,0.0,yes,yes
2,3,51,female,2,63.79,month-to-month,fiber optic,127.58,no,yes
3,4,60,female,8,102.34,one-year,dsl,818.72,yes,yes
4,5,42,male,32,69.01,month-to-month,fiber optic,2208.32,no,yes


In [18]:
#6: Transform data (Clean + Bulk Load)
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.types import BigInteger, Integer, Float, Text

# Create SQLAlchemy engine
engine = create_engine(
    "postgresql+psycopg2://neondb_owner:npg_XnV0aD8uEhoY@ep-restless-tree-aeol0h46-pooler.c-2.us-east-2.aws.neon.tech/neondb?sslmode=require"
)

In [19]:
# 1. Load data from staging table
df = pd.read_sql("SELECT * FROM customer_churn;", engine)

# 2. Ensure numeric conversion
df['Tenure'] = pd.to_numeric(df['Tenure'], errors='coerce').fillna(0).astype(int)
df['TotalCharges'] = pd.to_numeric(df['TotalCharges'], errors='coerce').fillna(0)

# 3. Create AvgChargesPerMonth safely
df['AvgChargesPerMonth'] = df.apply(
    lambda x: x['TotalCharges'] / x['Tenure'] if x['Tenure'] > 0 else 0, axis=1
)

# 4. Map Yes/No to 1/0
df['TechSupport'] = df['TechSupport'].map({'Yes': 1, 'No': 0}).fillna(0).astype(int)
df['Churn'] = df['Churn'].map({'Yes': 1, 'No': 0}).fillna(0).astype(int)

# 5. Drop duplicates
df = df.drop_duplicates()

# 6. Save transformed data (replace if exists)
df.to_sql(
    "customer_churn_transformed",
    engine,
    if_exists="replace",
    index=False,
    dtype={
        "CustomerID": BigInteger(),
        "Age": Integer(),
        "Gender": Text(),
        "Tenure": Integer(),
        "MonthlyCharges": Float(),
        "ContractType": Text(),
        "InternetService": Text(),
        "TotalCharges": Float(),
        "TechSupport": Integer(),
        "Churn": Integer(),
        "AvgChargesPerMonth": Float()
    }
)

print("Transformed data saved to NeonDB as 'customer_churn_transformed'.")


Transformed data saved to NeonDB as 'customer_churn_transformed'.


In [20]:
import pandas as pd

df_check = pd.read_sql("SELECT * FROM customer_churn_transformed LIMIT 5", engine)

df_check.head()

Unnamed: 0,CustomerID,Age,Gender,Tenure,MonthlyCharges,ContractType,InternetService,TotalCharges,TechSupport,Churn,AvgChargesPerMonth
0,1,49,male,4,88.35,month-to-month,fiber optic,353.4,0,0,88.35
1,2,43,male,0,36.67,month-to-month,fiber optic,0.0,0,0,0.0
2,3,51,female,2,63.79,month-to-month,fiber optic,127.58,0,0,63.79
3,4,60,female,8,102.34,one-year,dsl,818.72,0,0,102.34
4,5,42,male,32,69.01,month-to-month,fiber optic,2208.32,0,0,69.01


In [21]:

# 2: Mount Google Drive

drive.mount('/content/drive')

# 3: Define file path

file_path = '/content/drive/MyDrive/telecom_elt/customer_churn_data.csv'

# Checking if file exists
if not os.path.exists(file_path):
    raise FileNotFoundError(f"File not found: {file_path}\n"
                            "Please check the path or file name in Google Drive.")




Mounted at /content/drive


In [22]:
# 4: taking CSV data
df = pd.read_csv(file_path)
print(f" Loaded dataset with {df.shape[0]} rows and {df.shape[1]} columns.")


# 5: Fill missing values
for col in df.columns:
    if df[col].dtype in ['float64', 'int64']:
        df[col] = df[col].fillna(df[col].mean())  # Fill numeric with mean
    else:
        df[col] = df[col].fillna("Unknown")  # Fill categorical with "Unknown"

# 6: Anonymize PII

def hash_id(x):
    return hashlib.sha256(x.encode()).hexdigest()[:10]  # Short hash

if 'customerID' in df.columns:
    df['customerID'] = df['customerID'].astype(str).apply(hash_id)


# 7: Save to NeonDB

engine = create_engine(
    "postgresql+psycopg2://neondb_owner:npg_XnV0aD8uEhoY@ep-restless-tree-aeol0h46-pooler.c-2.us-east-2.aws.neon.tech/neondb"
)

df.to_sql("customer_churn_reporting", engine, if_exists="replace", index=False)

print(" Data transformation complete and saved to 'customer_churn_reporting' table in NeonDB.")


 Loaded dataset with 1000 rows and 10 columns.
 Data transformation complete and saved to 'customer_churn_reporting' table in NeonDB.


In [23]:
# Run a SELECT query from NeonDB
query = "SELECT * FROM customer_churn_reporting LIMIT 10;"
result_df = pd.read_sql(query, engine)

result_df


Unnamed: 0,CustomerID,Age,Gender,Tenure,MonthlyCharges,ContractType,InternetService,TotalCharges,TechSupport,Churn
0,1,49,Male,4,88.35,Month-to-Month,Fiber Optic,353.4,Yes,Yes
1,2,43,Male,0,36.67,Month-to-Month,Fiber Optic,0.0,Yes,Yes
2,3,51,Female,2,63.79,Month-to-Month,Fiber Optic,127.58,No,Yes
3,4,60,Female,8,102.34,One-Year,DSL,818.72,Yes,Yes
4,5,42,Male,32,69.01,Month-to-Month,Unknown,2208.32,No,Yes
5,6,42,Female,16,119.75,Two-Year,DSL,1916.0,Yes,Yes
6,7,60,Male,14,80.32,One-Year,Unknown,1124.48,No,Yes
7,8,52,Female,6,58.9,One-Year,Unknown,353.4,No,Yes
8,9,40,Female,53,49.81,Two-Year,Fiber Optic,2639.93,Yes,No
9,10,50,Female,10,61.55,Month-to-Month,Fiber Optic,615.5,Yes,Yes


In [24]:
from sqlalchemy import text

def etl_pipeline():
    print(f"Running ETL at {datetime.now()}...")

    # 1️ Extract
    url = "https://raw.githubusercontent.com/ybifoundation/Dataset/main/Telco%20Customer%20Churn.csv"
    df = pd.read_csv(url)

    # 2️ Transform
    df.columns = [c.strip() for c in df.columns]  # Remove whitespace
    df = df.rename(columns={"customerID": "customerID"})  # Keep same name for clarity
    df["TotalCharges"] = pd.to_numeric(df["TotalCharges"], errors="coerce")
    df.dropna(inplace=True)

    # 3️ Load — drop and recreate table
    with engine.connect() as conn:
        conn.execute(text("DROP TABLE IF EXISTS customer_churn_reporting"))

    df.to_sql("customer_churn_reporting", engine, index=False, if_exists="replace")

    print("ETL pipeline completed successfully!")

In [25]:
!pip install schedule

Collecting schedule
  Downloading schedule-1.2.2-py3-none-any.whl.metadata (3.8 kB)
Downloading schedule-1.2.2-py3-none-any.whl (12 kB)
Installing collected packages: schedule
Successfully installed schedule-1.2.2


In [31]:
import pandas as pd
from datetime import datetime
from sqlalchemy import create_engine, text


engine = create_engine("sqlite:///churn.db")

def etl_pipeline():
    try:
        print(f"\n[INFO] Running ETL at {datetime.now()}...")

        TABLE_NAME = "customer_churn_reporting"

        # 1: Read CSV
        print("[INFO] Reading CSV from GitHub...")
        url = "https://raw.githubusercontent.com/IBM/telco-customer-churn-on-icp4d/master/data/Telco-Customer-Churn.csv"
        df = pd.read_csv(url)
        print(f"[INFO] Loaded {len(df)} rows from CSV.")

        # Clean column names
        df.columns = [col.strip().replace(" ", "_").lower() for col in df.columns]
        df["totalcharges"] = pd.to_numeric(df["totalcharges"], errors="coerce")

        with engine.connect() as conn:
            # 2: Create table if not exists
            print(f"[INFO] Ensuring table '{TABLE_NAME}' exists...")
            conn.execute(text(f"""
                CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
                    customerid TEXT PRIMARY KEY,
                    gender TEXT,
                    seniorcitizen INT,
                    partner TEXT,
                    dependents TEXT,
                    tenure INT,
                    phoneservice TEXT,
                    multiplelines TEXT,
                    internetservice TEXT,
                    onlinesecurity TEXT,
                    onlinebackup TEXT,
                    deviceprotection TEXT,
                    techsupport TEXT,
                    streamingtv TEXT,
                    streamingmovies TEXT,
                    contract TEXT,
                    paperlessbilling TEXT,
                    paymentmethod TEXT,
                    monthlycharges FLOAT,
                    totalcharges FLOAT,
                    churn TEXT
                )
            """))

            # 3: Get existing IDs
            print("[INFO] Fetching existing customer IDs...")
            existing_ids = pd.read_sql(text(f"SELECT customerid FROM {TABLE_NAME}"), conn)["customerid"].tolist()

            # 4: Find new rows
            new_rows = df[~df["customerid"].isin(existing_ids)]
            print(f"[INFO] Found {len(new_rows)} new rows to insert.")

            # 5: Insert new rows
            if not new_rows.empty:
                new_rows.to_sql(TABLE_NAME, engine, if_exists="append", index=False)
                print(f"[SUCCESS] Inserted {len(new_rows)} new rows.")
            else:
                print("[INFO] No new rows to insert.")

    except Exception as e:
        print(f"[ERROR] {str(e)}")

# Running function
etl_pipeline()



[INFO] Running ETL at 2025-08-13 13:02:03.401362...
[INFO] Reading CSV from GitHub...
[INFO] Loaded 7043 rows from CSV.
[INFO] Ensuring table 'customer_churn_reporting' exists...
[INFO] Fetching existing customer IDs...
[INFO] Found 7043 new rows to insert.
[SUCCESS] Inserted 7043 new rows.


In [33]:
# Handle schema drift automatically
TABLE_NAME = "customer_churn_reporting"
db_columns = pd.read_sql(f'SELECT * FROM {TABLE_NAME} LIMIT 0', engine).columns

# Filter DataFrame to match DB schema
df = df[[col for col in df.columns if col in db_columns]]

In [34]:
# Add logging
import logging

logging.basicConfig(
    filename="etl.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
def etl_pipeline():
    try:
        logging.info("ETL started")
        # your existing ETL code here...

        logging.info(f"Inserted {len(new_rows)} new rows.")
    except Exception as e:
        logging.error(f"ETL failed: {e}", exc_info=True)