# Customer Segmentation Data Pipeline

## 1.Data Ingestion

##### Read CSV

#### Validate schema (column names, types)

#### Log basic stats

In [26]:
import numpy as np
import pandas as pd
import os
import sqlite3
# from sqlalchemy import create_engine, text

In [27]:
raw_df = pd.read_csv('D:/GitHub/Customer Segmentation Data Pipeline/data//raw/Customer_churn4.csv')

In [28]:
raw_df.head(5)

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,...,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
3,7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
4,9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


In [29]:
raw_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7043 entries, 0 to 7042
Data columns (total 21 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   customerID        7043 non-null   object 
 1   gender            7043 non-null   object 
 2   SeniorCitizen     7043 non-null   int64  
 3   Partner           7043 non-null   object 
 4   Dependents        7043 non-null   object 
 5   tenure            7043 non-null   int64  
 6   PhoneService      7043 non-null   object 
 7   MultipleLines     7043 non-null   object 
 8   InternetService   7043 non-null   object 
 9   OnlineSecurity    7043 non-null   object 
 10  OnlineBackup      7043 non-null   object 
 11  DeviceProtection  7043 non-null   object 
 12  TechSupport       7043 non-null   object 
 13  StreamingTV       7043 non-null   object 
 14  StreamingMovies   7043 non-null   object 
 15  Contract          7043 non-null   object 
 16  PaperlessBilling  7043 non-null   object 


## 2.Data Validation and Profiling

### Check for:

#### Missing values

#### Duplicates

#### Inconsistent types

#### Unique values in categorical fields

In [30]:
raw_df.isnull().sum()

customerID          0
gender              0
SeniorCitizen       0
Partner             0
Dependents          0
tenure              0
PhoneService        0
MultipleLines       0
InternetService     0
OnlineSecurity      0
OnlineBackup        0
DeviceProtection    0
TechSupport         0
StreamingTV         0
StreamingMovies     0
Contract            0
PaperlessBilling    0
PaymentMethod       0
MonthlyCharges      0
TotalCharges        0
Churn               0
dtype: int64

### No missing values

In [31]:
raw_df.duplicated().sum()

np.int64(0)

### No duplicate values

### Value distribution in categorical features

In [32]:
for col in raw_df.select_dtypes(include='object').columns:
    print(f'{col}, unique values: {raw_df[col].nunique()}')

customerID, unique values: 7043
gender, unique values: 2
Partner, unique values: 2
Dependents, unique values: 2
PhoneService, unique values: 2
MultipleLines, unique values: 3
InternetService, unique values: 3
OnlineSecurity, unique values: 3
OnlineBackup, unique values: 3
DeviceProtection, unique values: 3
TechSupport, unique values: 3
StreamingTV, unique values: 3
StreamingMovies, unique values: 3
Contract, unique values: 3
PaperlessBilling, unique values: 2
PaymentMethod, unique values: 4
TotalCharges, unique values: 6531
Churn, unique values: 2


### Total Charges feature supposed to be a float data type instead of object

## 3.Data Cleaning

### Ensure clean, consistent, structured data for downstream users.

#### Handle missing values

#### Convert types

#### Normalize inconsistent string formats

#### Drop or flag invalid rows

In [33]:
clean_df = raw_df.copy()

In [34]:
clean_df['TotalCharges'] = pd.to_numeric(clean_df['TotalCharges'], errors='coerce')

### There are 11 missing values in Total Charges feature

In [35]:
clean_df['TotalCharges'].isnull().sum()

np.int64(11)

### Filling missing values with 0

In [36]:
clean_df['TotalCharges'].fillna(0, inplace=True)

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  clean_df['TotalCharges'].fillna(0, inplace=True)


In [37]:
clean_df['TotalCharges'].isnull().sum()

np.int64(0)

In [38]:
clean_df = clean_df.drop_duplicates()
clean_df.to_csv('D:/GitHub/Customer Segmentation Data Pipeline/data/processed/cleaned_df.csv', index=False)

## 4.Feature Standardization

### Prepare clean, encoded, and standardized features for downstream teams.

#### Encode binary values

#### Map categorical values

#### Normalize column names

In [39]:
processed_df = clean_df.copy()

In [48]:
# Remane columns
processed_df.columns = processed_df.columns.str.strip().str.lower().str.replace(' ', '_')

# Convert binary fields
binary_map = {'Yes': 1, 'No': 0}
processed_df['partner'] = processed_df['partner'].map(binary_map)
processed_df['dependents'] = processed_df['dependents'].map(binary_map)
processed_df['churn'] = processed_df['churn'].map(binary_map)

In [49]:
processed_df.to_csv('D:/GitHub/Customer Segmentation Data Pipeline/data/processed/processed_df.csv', index=False)

## 5.Data Modeling

#### Fact Table: fact_customer_activity
#### Contains transactional or measurable data.

#### Dimension Tables:
#### These store attributes that describe customers or services.

In [56]:
# Fact table
fact_df = processed_df[[
    'customerid', 'tenure', 'monthlycharges', 'totalcharges', 'churn'
]]
fact_df.to_csv('D:/GitHub/Customer Segmentation Data Pipeline/data/fact_dim_tables/fact_df.csv', index=False)

# Customer dimension
dim_customer = processed_df[[
    'customerid', 'gender', 'seniorcitizen', 'partner', 'dependents'
]]
dim_customer.to_csv('D:/GitHub/Customer Segmentation Data Pipeline/data/fact_dim_tables/dim_customer_df.csv', index=False)

# Services dimension
dim_services = processed_df[[
    'customerid', 'phoneservice', 'multiplelines', 'internetservice',
    'onlinesecurity', 'onlinebackup', 'deviceprotection', 'techsupport',
    'streamingtv', 'streamingmovies'
]]
dim_services.to_csv('D:/GitHub/Customer Segmentation Data Pipeline/data/fact_dim_tables/dim_service_df.csv', index=False)

# Subscription dimension
dim_subscription = processed_df[[
    'customerid', 'contract', 'paperlessbilling', 'paymentmethod'
]]
dim_subscription.to_csv('D:/GitHub/Customer Segmentation Data Pipeline/data/fact_dim_tables/dim_subscription_df.csv', index=False)

## 6.Load to SQL DataBase

In [None]:
# Load DataFrames
fact = pd.read_csv('D:/GitHub/Customer Segmentation Data Pipeline/data/fact_dim_tables/fact_df.csv')
dim_customer = pd.read_csv('D:/GitHub/Customer Segmentation Data Pipeline/data/fact_dim_tables/dim_customer_df.csv')
dim_services = pd.read_csv('D:/GitHub/Customer Segmentation Data Pipeline/data/fact_dim_tables/dim_service_df.csv')
dim_subscription = pd.read_csv('D:/GitHub/Customer Segmentation Data Pipeline/data/fact_dim_tables/dim_subscription_df.csv')

In [67]:
# Connect SQLite database
con = sqlite3.connect('D:/GitHub/Customer Segmentation Data Pipeline/database/telco_churn.db')
cur = con.cursor()

In [68]:
# Write tables to SQLite
fact.to_sql('fact_table', con, if_exists='replace', index=False)
dim_customer.to_sql('dim_customer_table', con, if_exists='replace', index=False)
dim_services.to_sql('dim_services_table', con, if_exists='replace', index=False)
dim_subscription.to_sql('dim_subscription_table', con, if_exists='replace', index=False)
con.commit()

print("✅ Data loaded into telco_churn.db successfully.")

✅ Data loaded into telco_churn.db successfully.


### Query

In [69]:
cur.execute('''
SELECT * FROM fact_table LIMIT 5
            ''')
rows = cur.fetchall()
con.close()

for row in rows:
    print(row)

('7590-VHVEG', 1, 29.85, 29.85, 0)
('5575-GNVDE', 34, 56.95, 1889.5, 0)
('3668-QPYBK', 2, 53.85, 108.15, 1)
('7795-CFOCW', 45, 42.3, 1840.75, 0)
('9237-HQITU', 2, 70.7, 151.65, 1)
