<a href="https://colab.research.google.com/github/BHARATH077/ETL_Customer_Behavior/blob/main/ETL_Customer_Behavior.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ETL Pipeline for Customer Behavior Analytics

# Data Ingestion

In [60]:
# Install Dependencies
# Install Spark + DuckDB
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install pyspark duckdb pandas seaborn matplotlib




In [61]:
# Verify Spark and DuckBD

import pyspark
from pyspark.sql import SparkSession
import duckdb

# Start Spark session
spark = SparkSession.builder.appName("ETL_Project").getOrCreate()
print("Spark Version:", spark.version)

# Connect to DuckDB (in-memory for now)
con = duckdb.connect(":memory:")
print("DuckDB connected:", con)


Spark Version: 3.5.1
DuckDB connected: <duckdb.duckdb.DuckDBPyConnection object at 0x7bc8ac52d230>


In [62]:
# Prepare Sample Data
# Since we don’t have real APIs/DBs yet, I’ll mock some CSV & JSON files directly in Colab (later we can replace them).

import pandas as pd
import json

# Clickstream CSV
clickstream_data = {
    "session_id": [1,2,3,4,5],
    "customer_id": [101,102,101,103,104],
    "page_viewed": ["Home","Product","Cart","Home","Checkout"],
    "timestamp": ["2024-01-01 10:00","2024-01-01 10:05","2024-01-01 10:10","2024-01-02 12:00","2024-01-02 12:15"]
}
pd.DataFrame(clickstream_data).to_csv("clickstream.csv", index=False)

# Transactions CSV
transactions_data = {
    "transaction_id": [1001,1002,1003],
    "customer_id": [101,102,104],
    "amount": [250.0, 100.0, 75.0],
    "timestamp": ["2024-01-01 11:00","2024-01-01 11:05","2024-01-02 12:30"]
}
pd.DataFrame(transactions_data).to_csv("transactions.csv", index=False)

# CRM JSON
crm_data = [
    {"customer_id":101, "name":"Alice","segment":"Premium"},
    {"customer_id":102, "name":"Bob","segment":"Standard"},
    {"customer_id":103, "name":"Charlie","segment":"Standard"},
    {"customer_id":104, "name":"Diana","segment":"Premium"}
]
with open("crm.json","w") as f:
    json.dump(crm_data,f)

print("Raw data recreated ✅")


Raw data recreated ✅


In [63]:
# Read Raw Data Files
import pandas as pd
import json

# Read Clickstream CSV
clickstream_df = pd.read_csv("clickstream.csv")
print("Clickstream:")
print(clickstream_df.head())

# Read Transactions CSV
transactions_df = pd.read_csv("transactions.csv")
print("\nTransactions:")
print(transactions_df.head())

# Read CRM JSON
with open("crm.json") as f:
    crm_df = pd.json_normalize(json.load(f))
print("\nCRM:")
print(crm_df.head())


Clickstream:
   session_id  customer_id page_viewed         timestamp
0           1          101        Home  2024-01-01 10:00
1           2          102     Product  2024-01-01 10:05
2           3          101        Cart  2024-01-01 10:10
3           4          103        Home  2024-01-02 12:00
4           5          104    Checkout  2024-01-02 12:15

Transactions:
   transaction_id  customer_id  amount         timestamp
0            1001          101   250.0  2024-01-01 11:00
1            1002          102   100.0  2024-01-01 11:05
2            1003          104    75.0  2024-01-02 12:30

CRM:
   customer_id     name   segment
0          101    Alice   Premium
1          102      Bob  Standard
2          103  Charlie  Standard
3          104    Diana   Premium


In [64]:
# Load into DuckDB
import duckdb

# Create DuckDB connection
con = duckdb.connect(database=':memory:')

# Load Pandas DataFrames into DuckDB
con.register("clickstream_df", clickstream_df)
con.register("transactions_df", transactions_df)
con.register("crm_df", crm_df)

# Create DuckDB tables
con.execute("CREATE TABLE clickstream AS SELECT * FROM clickstream_df")
con.execute("CREATE TABLE transactions AS SELECT * FROM transactions_df")
con.execute("CREATE TABLE crm AS SELECT * FROM crm_df")

# Verify
print(con.execute("SHOW TABLES").fetchdf())


              name
0      clickstream
1   clickstream_df
2              crm
3           crm_df
4     transactions
5  transactions_df


In [65]:
# Run First Warehouse Queries

# Count clickstream sessions
print(con.execute("SELECT COUNT(*) as sessions FROM clickstream").fetchdf())

# Check total transactions
print(con.execute("SELECT COUNT(*) as transactions FROM transactions").fetchdf())

# Join CRM with Transactions (quick test)
query = """
SELECT c.customer_id, c.name, c.segment, SUM(t.amount) as total_spend
FROM crm c
LEFT JOIN transactions t
ON c.customer_id = t.customer_id
GROUP BY c.customer_id, c.name, c.segment
"""
print(con.execute(query).fetchdf())


   sessions
0         5
   transactions
0             3
   customer_id     name   segment  total_spend
0          101    Alice   Premium        250.0
1          104    Diana   Premium         75.0
2          103  Charlie  Standard          NaN
3          102      Bob  Standard        100.0


# Data Cleaning & Transformation with PySpark

In [66]:
# Install and import Spark

!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, upper

# Start Spark session
spark = SparkSession.builder.appName("ETL_Cleaning").getOrCreate()




In [67]:
# Load Data into Spark
# Load CSVs into Spark DataFrames
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, upper

spark = SparkSession.builder.appName("ETL_Cleaning").getOrCreate()

clickstream_spark = spark.read.csv("clickstream.csv", header=True, inferSchema=True)
transactions_spark = spark.read.csv("transactions.csv", header=True, inferSchema=True)
crm_spark = spark.read.json("crm.json")

# Load JSON CRM into Spark
crm_spark = spark.read.json("crm.json")

print("Clickstream:")
clickstream_spark.show()
print("Transactions:")
transactions_spark.show()
print("CRM:")
crm_spark.show()


Clickstream:
+----------+-----------+-----------+-------------------+
|session_id|customer_id|page_viewed|          timestamp|
+----------+-----------+-----------+-------------------+
|         1|        101|       Home|2024-01-01 10:00:00|
|         2|        102|    Product|2024-01-01 10:05:00|
|         3|        101|       Cart|2024-01-01 10:10:00|
|         4|        103|       Home|2024-01-02 12:00:00|
|         5|        104|   Checkout|2024-01-02 12:15:00|
+----------+-----------+-----------+-------------------+

Transactions:
+--------------+-----------+------+-------------------+
|transaction_id|customer_id|amount|          timestamp|
+--------------+-----------+------+-------------------+
|          1001|        101| 250.0|2024-01-01 11:00:00|
|          1002|        102| 100.0|2024-01-01 11:05:00|
|          1003|        104|  75.0|2024-01-02 12:30:00|
+--------------+-----------+------+-------------------+

CRM:
+-----------+-------+--------+
|customer_id|   name| segment|

In [68]:
# Basic Cleaning

# --- Step 2: Cleaning / Transformation ---
clickstream_spark = clickstream_spark.withColumn("timestamp", to_timestamp("timestamp")).na.drop(subset=["customer_id"])
transactions_spark = transactions_spark.withColumn("timestamp", to_timestamp("timestamp")).na.drop(subset=["customer_id"])
crm_spark = crm_spark.withColumn("segment", upper(col("segment"))).na.drop(subset=["customer_id"])

# --- Step 3: Convert Spark DF back to Pandas ---
clickstream_clean = clickstream_spark.toPandas()
transactions_clean = transactions_spark.toPandas()
crm_clean = crm_spark.toPandas()


In [69]:
# Save Cleaned Data
# --- Step 4: Register Pandas DF into DuckDB ---
import duckdb
con = duckdb.connect(":memory:")

con.register("clickstream_clean", clickstream_clean)
con.register("transactions_clean", transactions_clean)
con.register("crm_clean", crm_clean)

print("DuckDB tables registered ✅")
print(con.execute("SHOW TABLES").fetchdf())


DuckDB tables registered ✅
                 name
0   clickstream_clean
1           crm_clean
2  transactions_clean


In [70]:
# Verify Cleaning

query = """
SELECT c.customer_id, c.name, c.segment, COUNT(s.session_id) as sessions, SUM(t.amount) as total_spend
FROM crm_clean c
LEFT JOIN clickstream_clean s ON c.customer_id = s.customer_id
LEFT JOIN transactions_clean t ON c.customer_id = t.customer_id
GROUP BY c.customer_id, c.name, c.segment
"""
print(con.execute(query).fetchdf())


   customer_id     name   segment  sessions  total_spend
0          104    Diana   PREMIUM         1         75.0
1          103  Charlie  STANDARD         1          NaN
2          101    Alice   PREMIUM         2        500.0
3          102      Bob  STANDARD         1        100.0


# Transformations for Analytics

In [71]:
import duckdb

# Connect to DuckDB in-memory
con = duckdb.connect(database=':memory:')

# Register Pandas DataFrames into DuckDB
con.register("clickstream_clean", clickstream_clean)
con.register("transactions_clean", transactions_clean)
con.register("crm_clean", crm_clean)



<duckdb.duckdb.DuckDBPyConnection at 0x7bc8a77eb1f0>

# Creating Feature tables

In [72]:
# Sessions per Customer (Engagement)

sessions_query = """
SELECT customer_id, COUNT(DISTINCT session_id) AS session_count
FROM clickstream_clean
GROUP BY customer_id
"""
sessions_df = con.execute(sessions_query).fetchdf()
print(sessions_df)


   customer_id  session_count
0          102              1
1          101              2
2          103              1
3          104              1


In [73]:
# Purchases and Spend per Customer

spend_query = """
SELECT customer_id, COUNT(DISTINCT transaction_id) AS purchase_count, SUM(amount) AS total_spend
FROM transactions_clean
GROUP BY customer_id
"""
spend_df = con.execute(spend_query).fetchdf()
print(spend_df)


   customer_id  purchase_count  total_spend
0          102               1        100.0
1          101               1        250.0
2          104               1         75.0


In [74]:
# Most Viewed Pages per Customer

page_query = """
SELECT customer_id, page_viewed, COUNT(*) AS views
FROM clickstream_clean
GROUP BY customer_id, page_viewed
QUALIFY ROW_NUMBER() OVER(PARTITION BY customer_id ORDER BY views DESC) = 1
"""
page_df = con.execute(page_query).fetchdf()
print(page_df)


   customer_id page_viewed  views
0          102     Product      1
1          104    Checkout      1
2          101        Home      1
3          103        Home      1


In [76]:
# Define subqueries as strings
sessions_query = """
SELECT customer_id, COUNT(DISTINCT session_id) AS session_count
FROM clickstream_clean
GROUP BY customer_id
"""

spend_query = """
SELECT customer_id, COUNT(DISTINCT transaction_id) AS purchase_count, SUM(amount) AS total_spend
FROM transactions_clean
GROUP BY customer_id
"""

page_query = """
SELECT customer_id, page_viewed, COUNT(*) AS views
FROM clickstream_clean
GROUP BY customer_id, page_viewed
QUALIFY ROW_NUMBER() OVER(PARTITION BY customer_id ORDER BY views DESC) = 1
"""

# Combine using f-string to inject subqueries
customer360_query = f"""
SELECT c.customer_id, c.name, c.segment,
       COALESCE(s.session_count, 0) AS sessions,
       COALESCE(sp.purchase_count, 0) AS purchases,
       COALESCE(sp.total_spend, 0) AS spend,
       COALESCE(p.page_viewed, 'N/A') AS top_page
FROM crm_clean c
LEFT JOIN ({sessions_query}) s ON c.customer_id = s.customer_id
LEFT JOIN ({spend_query}) sp ON c.customer_id = sp.customer_id
LEFT JOIN ({page_query}) p ON c.customer_id = p.customer_id
"""

# Execute query
customer360 = con.execute(customer360_query).fetchdf()
print(customer360)


   customer_id     name   segment  sessions  purchases  spend  top_page
0          101    Alice   PREMIUM         2          1  250.0      Home
1          102      Bob  STANDARD         1          1  100.0   Product
2          104    Diana   PREMIUM         1          1   75.0  Checkout
3          103  Charlie  STANDARD         1          0    0.0      Home


In [77]:
# Save Outputs

customer360.to_csv("customer360_day4.csv", index=False)
