In [1]:
# Install the Kaggle command-line client and stage its API token.
!pip install kaggle
# Create the per-user Kaggle config directory (no error if it already exists).
!mkdir -p ~/.kaggle
# Copy the API token from the project directory into ~/.kaggle/.
!cp /home/Personal_Projects/Databricks/work/.kaggle/kaggle.json ~/.kaggle/
# Restrict the token file to owner read/write only.
!chmod 600 ~/.kaggle/kaggle.json




In [2]:
#Download the dataset with Kaggle CLI
!kaggle datasets download -d blastchar/telco-customer-churn -p /home/Personal_Projects/Databricks/work/datasets
!unzip -o /home/Personal_Projects/Databricks/work/datasets/telco-customer-churn.zip -d /home/Personal_Projects/Databricks/work/datasets


Dataset URL: https://www.kaggle.com/datasets/blastchar/telco-customer-churn
License(s): copyright-authors
telco-customer-churn.zip: Skipping, found more recently modified local copy (use --force to force download)
Archive:  /home/Personal_Projects/Databricks/work/datasets/telco-customer-churn.zip
  inflating: /home/Personal_Projects/Databricks/work/datasets/WA_Fn-UseC_-Telco-Customer-Churn.csv  


In [1]:
from pyspark.sql import SparkSession
import os

# The S3A settings below are filled from these two environment variables.
# os.getenv() returns None for an unset variable, which would not give Spark
# usable credentials, so stop here with a clear message instead of building a
# session that cannot authenticate.
_missing_aws_vars = [
    name
    for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    if not os.environ.get(name)
]
if _missing_aws_vars:
    raise EnvironmentError(
        "Missing required environment variable(s): " + ", ".join(_missing_aws_vars)
    )

# Local Spark session with Hive support enabled; the SQL warehouse directory
# is an s3a:// location, accessed with static access-key credentials.
spark = (
    SparkSession.builder
    .appName("TelcoChurnLakehouse")
    .master("local[*]")
    .config("spark.sql.catalogImplementation", "hive")
    # Extra jars put on the classpath: hadoop-aws and the AWS SDK bundle.
    .config(
        "spark.jars",
        "/opt/spark-jars/hadoop-aws-3.3.4.jar,/opt/spark-jars/aws-java-sdk-bundle-1.12.340.jar"
    )
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Read straight from the environment (validated above) so the secret is
    # never bound to a notebook variable.
    .config("spark.hadoop.fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.sql.warehouse.dir", "s3a://databricks-one/telco/catalog/")
    .enableHiveSupport()
    .getOrCreate()
)


In [4]:
#installing the boto3
!pip install boto3


Collecting boto3
  Downloading boto3-1.40.34-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore<1.41.0,>=1.40.34 (from boto3)
  Downloading botocore-1.40.34-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.15.0,>=0.14.0 (from boto3)
  Downloading s3transfer-0.14.0-py3-none-any.whl.metadata (1.7 kB)
Downloading boto3-1.40.34-py3-none-any.whl (139 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.3/139.3 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading botocore-1.40.34-py3-none-any.whl (14.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.0/14.0 MB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Downloading s3transfer-0.14.0-py3-none-any.whl (85 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m85.7/85.7 kB[0m [31m4.8 MB/s

In [5]:
# Upload the downloaded CSV to S3, register a database for raw data, and load
# the uploaded file back through Spark.

import boto3

# S3 client built from the access keys found in the environment
aws_credentials = {
    "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"),
    "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"),
}
s3 = boto3.client('s3', **aws_credentials)

# Local file -> s3://databricks-one/raw/telco.csv
datapath = "/home/Personal_Projects/Databricks/work/datasets/WA_Fn-UseC_-Telco-Customer-Churn.csv"
s3.upload_file(Filename=datapath, Bucket="databricks-one", Key="raw/telco.csv")

# Database for raw data.
# NOTE(review): its location is under telco/raw/, while the CSV above is
# uploaded under raw/ — confirm the two prefixes are meant to differ.
create_raw_db_sql = """
CREATE DATABASE IF NOT EXISTS raw_data
LOCATION 's3a://databricks-one/telco/raw/'
"""
spark.sql(create_raw_db_sql)

# Read the uploaded CSV from S3, using the first line as the header and
# letting Spark infer the column types.
csv_s3_path = "s3a://databricks-one/raw/telco.csv"

raw_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(csv_s3_path)
)
raw_df.show(n=10)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|    

In [6]:
# Create one database per layer (bronze, silver, gold), each with its own
# S3 location under s3a://databricks-one/telco/.
for layer in ("bronze", "silver", "gold"):
    spark.sql(f"""
CREATE DATABASE IF NOT EXISTS {layer}
LOCATION 's3a://databricks-one/telco/{layer}/'
""")

# List the databases to confirm the three layers were registered
spark.sql("SHOW DATABASES").show()

+-------------+
|    namespace|
+-------------+
|       bronze|
|      default|
|         gold|
|     raw_data|
|       silver|
|telco_catalog|
+-------------+



# BRONZE LAYER

In [7]:
# Persist the raw data as Parquet files (plain Parquet, not Delta) under the
# bronze prefix on S3, replacing any previous output at that path.
bronze_parquet_path = "s3a://databricks-one/telco/bronze/telco_churn_parquet"

raw_df.write.parquet(bronze_parquet_path, mode="overwrite")



In [8]:
# Expose the bronze Parquet files as an external table in the bronze database.
spark.sql("USE bronze")

bronze_table_ddl = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS bronze.telco_churn_parquet
USING PARQUET
LOCATION '{bronze_parquet_path}'
"""
spark.sql(bronze_table_ddl)

# Sanity checks: sample rows from the new table, then the databases and the
# tables registered in bronze.
for check_sql in (
    "SELECT * FROM bronze.telco_churn_parquet LIMIT 10",
    "SHOW DATABASES",
    "SHOW TABLES IN bronze",
):
    spark.sql(check_sql).show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|    

# SILVER LAYER

In [9]:

from pyspark.sql.functions import col, trim, when

# ==========================
# Step 1: Read from Bronze Hive Table
# ==========================
# (Assuming bronze.telco_churn_parquet already exists as an external table)
bronze_df = spark.table("bronze.telco_churn_parquet")

# ==========================
# Step 2: Silver transformations
# ==========================
# Text columns that get surrounding whitespace stripped. Churn is included so
# that every categorical column is cleaned the same way. TotalCharges is not
# listed because it is converted to double further down.
string_columns = [
    "customerID",
    "gender",
    "Partner",
    "Dependents",
    "PhoneService",
    "MultipleLines",
    "InternetService",
    "OnlineSecurity",
    "OnlineBackup",
    "DeviceProtection",
    "TechSupport",
    "StreamingTV",
    "StreamingMovies",
    "Contract",
    "PaperlessBilling",
    "PaymentMethod",
    "Churn",
]

# Trim string columns (withColumn replaces each column in place, so the
# column order of the bronze table is preserved)
trimmed_df = bronze_df
for column_name in string_columns:
    trimmed_df = trimmed_df.withColumn(column_name, trim(col(column_name)))

silver_df = (
    trimmed_df

    # Convert data types
    .withColumn("SeniorCitizen", col("SeniorCitizen").cast("int"))
    .withColumn("tenure", col("tenure").cast("int"))
    .withColumn("MonthlyCharges", col("MonthlyCharges").cast("double"))
    # Blank TotalCharges values become NULL; everything else is cast to double
    .withColumn("TotalCharges", when(trim(col("TotalCharges")) == "", None)
                .otherwise(col("TotalCharges").cast("double")))

    # Handle missing values: drop rows lacking any of these fields
    .dropna(subset=["customerID", "tenure", "MonthlyCharges", "TotalCharges"])

    # Derived column: boolean flag, True only when SeniorCitizen == 1
    .withColumn("IsSenior", when(col("SeniorCitizen") == 1, True).otherwise(False))

    # Data quality checks
    .filter(col("tenure") >= 0)

    # Deduplicate: keep a single row per customerID
    .dropDuplicates(["customerID"])
)

# ==========================
# Step 3: Save Silver layer in S3 as Parquet + Register in Hive
# ==========================
silver_path = "s3a://databricks-one/telco/silver/telco_churn"

# Save files to S3 (replaces any previous output at this path)
silver_df.write.mode("overwrite").parquet(silver_path)

# Register as Hive external table
spark.sql("USE silver")

spark.sql(f"""
CREATE EXTERNAL TABLE IF NOT EXISTS silver.telco_churn
USING PARQUET
LOCATION '{silver_path}'
""")

# ==========================
# Step 4: Query & Verify
# ==========================
# Refresh first so the table reflects the files that were just rewritten
spark.sql("REFRESH TABLE silver.telco_churn")
spark.sql("SELECT * FROM silver.telco_churn LIMIT 10").show(truncate=False)
silver_df.printSchema()


# Small pandas sample (20 rows) of the silver data for display in the notebook
silver_db=silver_df.limit(20).toPandas()
silver_db.head(10)

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+-------------------------+--------------+------------+-----+--------+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract      |PaperlessBilling|PaymentMethod            |MonthlyCharges|TotalCharges|Churn|IsSenior|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+-------------------------+--------------+------------+-----+--------+
|0002-ORFBO|Female|0            |Yes    |Yes       |9     |Yes         |No           |DSL            |No            |Yes         |No 

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,...,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn,IsSenior
0,0002-ORFBO,Female,0,Yes,Yes,9,Yes,No,DSL,No,...,Yes,Yes,No,One year,Yes,Mailed check,65.6,593.3,No,False
1,0003-MKNFE,Male,0,No,No,9,Yes,Yes,DSL,No,...,No,No,Yes,Month-to-month,No,Mailed check,59.9,542.4,No,False
2,0004-TLHLJ,Male,0,No,No,4,Yes,No,Fiber optic,No,...,No,No,No,Month-to-month,Yes,Electronic check,73.9,280.85,Yes,False
3,0011-IGKFF,Male,1,Yes,No,13,Yes,No,Fiber optic,No,...,No,Yes,Yes,Month-to-month,Yes,Electronic check,98.0,1237.85,Yes,True
4,0013-EXCHZ,Female,1,Yes,No,3,Yes,No,Fiber optic,No,...,Yes,Yes,No,Month-to-month,Yes,Mailed check,83.9,267.4,Yes,True
5,0013-MHZWF,Female,0,No,Yes,9,Yes,No,DSL,No,...,Yes,Yes,Yes,Month-to-month,Yes,Credit card (automatic),69.4,571.45,No,False
6,0013-SMEOE,Female,1,Yes,No,71,Yes,No,Fiber optic,Yes,...,Yes,Yes,Yes,Two year,Yes,Bank transfer (automatic),109.7,7904.25,No,True
7,0014-BMAQU,Male,0,Yes,No,63,Yes,Yes,Fiber optic,Yes,...,Yes,No,No,Two year,Yes,Credit card (automatic),84.65,5377.8,No,False
8,0015-UOCOJ,Female,1,No,No,7,Yes,No,DSL,Yes,...,No,No,No,Month-to-month,Yes,Electronic check,48.2,340.35,No,True
9,0016-QLJIS,Female,0,Yes,Yes,65,Yes,Yes,DSL,Yes,...,Yes,Yes,Yes,Two year,Yes,Mailed check,90.45,5957.9,No,False


# GOLD LAYER

## Star schema: one fact table (`fact_churn`) with customer, contract, services and payment dimensions

In [10]:
# ==========================
# Step 1: Load the Silver table registered in Hive
# ==========================
silver_df = spark.read.table("silver.telco_churn")

# Preview a few rows without truncating wide values
silver_df.show(n=5, truncate=False)


+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+----------------+--------------+------------+-----+--------+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract      |PaperlessBilling|PaymentMethod   |MonthlyCharges|TotalCharges|Churn|IsSenior|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+----------------+--------------+------------+-----+--------+
|0002-ORFBO|Female|0            |Yes    |Yes       |9     |Yes         |No           |DSL            |No            |Yes         |No              |Yes        |Y

In [11]:
# Step 2: Build the dimension DataFrames from the Silver data
# ==========================

# Customer dimension: customer attributes, one row per customerID
customer_columns = [
    "customerID",
    "gender",
    "SeniorCitizen",
    "IsSenior",
    "Partner",
    "Dependents",
    "PhoneService",
    "MultipleLines",
]
dim_customer = silver_df.select(*customer_columns).dropDuplicates(["customerID"])

# Contract dimension: distinct (Contract, PaperlessBilling) combinations
contract_columns = ["Contract", "PaperlessBilling"]
dim_contract = silver_df.select(*contract_columns).distinct()

# Internet & services dimension: distinct combinations of the service columns
service_columns = [
    "InternetService",
    "OnlineSecurity",
    "OnlineBackup",
    "DeviceProtection",
    "TechSupport",
    "StreamingTV",
    "StreamingMovies",
]
dim_services = silver_df.select(*service_columns).distinct()

# Payment dimension: distinct payment methods
dim_payment = silver_df.select("PaymentMethod").distinct()


In [12]:
# ==========================
# Step 3: Build the fact DataFrame
# ==========================
# Keys pointing at the customer, contract, services and payment dimensions
fact_key_columns = ["customerID", "Contract", "InternetService", "PaymentMethod"]
# Numeric measures carried on each fact row
fact_measure_columns = ["tenure", "MonthlyCharges", "TotalCharges"]

# NOTE(review): the Churn column itself is not selected into this fact table —
# confirm that is intended.
fact_churn = silver_df.select(*fact_key_columns, *fact_measure_columns)


In [14]:
from pyspark.sql.functions import col

# ==========================
# Step 4: Save Gold Layer to S3 & Register in Hive
# ==========================
# Each DataFrame is written as Parquet under gold_base_path and then registered
# as an external table with an explicit column list. Primary/foreign keys are
# recorded only as column comments and table properties.
#
# Every CREATE statement uses IF NOT EXISTS, so a table that is already
# registered keeps its existing definition; drop it first if the declared
# columns need to change.
gold_base_path = "s3a://databricks-one/telco/gold/"

# Create Gold database if not exists
spark.sql("CREATE DATABASE IF NOT EXISTS gold")
spark.sql("USE gold")

# ==========================
# 1. Customer Dimension
# ==========================
dim_customer_clean = (
    dim_customer
    .filter(col("customerID").isNotNull())   # NOT NULL
    .dropDuplicates(["customerID"])          # Enforce PK uniqueness
)

dim_customer_clean.write.mode("overwrite").parquet(gold_base_path + "dim_customer")

# IsSenior is produced as a boolean in the silver step, so it is declared
# BOOLEAN here to match the type stored in the Parquet files.
spark.sql(f"""
CREATE EXTERNAL TABLE IF NOT EXISTS gold.dim_customer (
    customerID STRING COMMENT 'PK',
    gender STRING,
    SeniorCitizen INT,
    IsSenior BOOLEAN,
    Partner STRING,
    Dependents STRING,
    PhoneService STRING,
    MultipleLines STRING
)
USING PARQUET
LOCATION '{gold_base_path}dim_customer'
TBLPROPERTIES ('primary_key'='customerID')
""")

# ==========================
# 2. Contract Dimension
# ==========================
# One row per Contract value.
# NOTE(review): dim_contract holds (Contract, PaperlessBilling) combinations,
# so this keeps an arbitrary PaperlessBilling value for each Contract —
# confirm that attribute is meaningful in this dimension.
dim_contract_clean = dim_contract.dropDuplicates(["Contract"])

dim_contract_clean.write.mode("overwrite").parquet(gold_base_path + "dim_contract")

spark.sql(f"""
CREATE EXTERNAL TABLE IF NOT EXISTS gold.dim_contract (
    Contract STRING COMMENT 'PK',
    PaperlessBilling STRING
)
USING PARQUET
LOCATION '{gold_base_path}dim_contract'
TBLPROPERTIES ('primary_key'='Contract')
""")

# ==========================
# 3. Services Dimension
# ==========================
# One row per InternetService value.
# NOTE(review): as with the contract dimension, the remaining service columns
# come from an arbitrary row for each InternetService value — confirm intended.
dim_services_clean = dim_services.dropDuplicates(["InternetService"])

dim_services_clean.write.mode("overwrite").parquet(gold_base_path + "dim_services")

spark.sql(f"""
CREATE EXTERNAL TABLE IF NOT EXISTS gold.dim_services (
    InternetService STRING COMMENT 'PK',
    OnlineSecurity STRING,
    OnlineBackup STRING,
    DeviceProtection STRING,
    TechSupport STRING,
    StreamingTV STRING,
    StreamingMovies STRING
)
USING PARQUET
LOCATION '{gold_base_path}dim_services'
TBLPROPERTIES ('primary_key'='InternetService')
""")

# ==========================
# 4. Payment Dimension
# ==========================
dim_payment_clean = dim_payment.dropDuplicates(["PaymentMethod"])

dim_payment_clean.write.mode("overwrite").parquet(gold_base_path + "dim_payment")

spark.sql(f"""
CREATE EXTERNAL TABLE IF NOT EXISTS gold.dim_payment (
    PaymentMethod STRING COMMENT 'PK'
)
USING PARQUET
LOCATION '{gold_base_path}dim_payment'
TBLPROPERTIES ('primary_key'='PaymentMethod')
""")

# ==========================
# 5. Fact Table (Churn)
# ==========================
# Validate foreign keys before writing: inner joins keep only fact rows whose
# key values are present in the corresponding dimension.
fact_churn_clean = (
    fact_churn
    .join(dim_customer_clean.select("customerID"), on="customerID", how="inner")
    .join(dim_contract_clean.select("Contract"), on="Contract", how="inner")
    .join(dim_services_clean.select("InternetService"), on="InternetService", how="inner")
    .join(dim_payment_clean.select("PaymentMethod"), on="PaymentMethod", how="inner")
)

fact_churn_clean.write.mode("overwrite").parquet(gold_base_path + "fact_churn")

spark.sql(f"""
CREATE EXTERNAL TABLE IF NOT EXISTS gold.fact_churn (
    customerID STRING COMMENT 'FK -> dim_customer.customerID',
    Contract STRING COMMENT 'FK -> dim_contract.Contract',
    InternetService STRING COMMENT 'FK -> dim_services.InternetService',
    PaymentMethod STRING COMMENT 'FK -> dim_payment.PaymentMethod',
    tenure INT,
    MonthlyCharges DOUBLE,
    TotalCharges DOUBLE
)
USING PARQUET
LOCATION '{gold_base_path}fact_churn'
TBLPROPERTIES (
    'fk_customer'='dim_customer.customerID',
    'fk_contract'='dim_contract.Contract',
    'fk_services'='dim_services.InternetService',
    'fk_payment'='dim_payment.PaymentMethod'
)
""")


DataFrame[]

In [24]:
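# # NOTE: disabled earlier variant of the gold save/registration step. It registers
# # the same gold tables without explicit column definitions; kept for reference only.
# # Step 4: Save Gold Layer to S3 & Register in Hive
# # ==========================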
# gold_base_path = "s3a://databricks-one/telco/gold/"

# # Create database if not exists
# spark.sql("USE gold")

# # Write to S3 and register as external Hive tables
# dim_customer.write.mode("overwrite").parquet(gold_base_path + "dim_customer")
# spark.sql(f"""
# CREATE EXTERNAL TABLE IF NOT EXISTS gold.dim_customer
# USING PARQUET
# LOCATION '{gold_base_path}dim_customer'
# """)

# dim_contract.write.mode("overwrite").parquet(gold_base_path + "dim_contract")
# spark.sql(f"""
# CREATE EXTERNAL TABLE IF NOT EXISTS gold.dim_contract
# USING PARQUET
# LOCATION '{gold_base_path}dim_contract'
# """)

# dim_services.write.mode("overwrite").parquet(gold_base_path + "dim_services")
# spark.sql(f"""
# CREATE EXTERNAL TABLE IF NOT EXISTS gold.dim_services
# USING PARQUET
# LOCATION '{gold_base_path}dim_services'
# """)

# dim_payment.write.mode("overwrite").parquet(gold_base_path + "dim_payment")
# spark.sql(f"""
# CREATE EXTERNAL TABLE IF NOT EXISTS gold.dim_payment
# USING PARQUET
# LOCATION '{gold_base_path}dim_payment'
# """)

# fact_churn.write.mode("overwrite").parquet(gold_base_path + "fact_churn")
# spark.sql(f"""
# CREATE EXTERNAL TABLE IF NOT EXISTS gold.fact_churn
# USING PARQUET
# LOCATION '{gold_base_path}fact_churn'
# """)


DataFrame[]

In [15]:
# Step 5: Query & Verify
# ==========================
# Show the first rows of the customer dimension and of the fact table.
for gold_table in ("gold.dim_customer", "gold.fact_churn"):
    spark.sql(f"SELECT * FROM {gold_table} LIMIT 10").show(truncate=False)


+----------+------+-------------+--------+-------+----------+------------+-------------+
|customerID|gender|SeniorCitizen|IsSenior|Partner|Dependents|PhoneService|MultipleLines|
+----------+------+-------------+--------+-------+----------+------------+-------------+
|0002-ORFBO|Female|0            |false   |Yes    |Yes       |Yes         |No           |
|0003-MKNFE|Male  |0            |false   |No     |No        |Yes         |Yes          |
|0004-TLHLJ|Male  |0            |false   |No     |No        |Yes         |No           |
|0011-IGKFF|Male  |1            |true    |Yes    |No        |Yes         |No           |
|0013-EXCHZ|Female|1            |true    |Yes    |No        |Yes         |No           |
|0013-MHZWF|Female|0            |false   |No     |Yes       |Yes         |No           |
|0013-SMEOE|Female|1            |true    |Yes    |No        |Yes         |No           |
|0014-BMAQU|Male  |0            |false   |Yes    |No        |Yes         |Yes          |
|0015-UOCOJ|Female|1 