In [None]:
# Reset: recursively delete FileStore/tables and everything stored under it.
# WARNING: this also removes the uploaded source CSVs that Task 1 renames, so the
# three T_Data_C*.csv files must be uploaded again after running this cell.
dbutils.fs.rm("FileStore/tables", recurse=True)

In [None]:
# /---------------------------------------------------------------
# Course            : Big Data Analytics
# Course Code       : CDB3034
# Assignment        : 2
# Group             : 1
# Student Name 1    : Chan Seow Fen / 0207368
# Student Name 2    : Cheah Pin Chee / 0197637
# Student Name 3    : Ong Yi Wen / 0207333
# Student Name 4    : Saw Keat Loon / 0207778
# /---------------------------------------------------------------
# Data source: https://openlearning.uowmkdu.edu.my/courses/pg-cbd-3034n-big-data-analysis-jjoshua/data_a2/?cl=1
# Original Data Files: T_Data_C1.csv, T_Data_C2.csv, T_Data_C3.csv
# /---------------------------------------------------------------
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType, IntegerType

# Task 1: Data Loading and Data Cleaning
# Make sure all the 3 files are loaded into the Spark environment!
# 1.1 Rename the uploaded files to the BDA_T_data_C*.csv names used below.
# dbutils.fs.mv moves the file (the source no longer exists afterwards), so no
# separate rm of the original T_Data_C*.csv files is needed.
for suffix in ("C1", "C2", "C3"):
    dbutils.fs.mv(
        f"/FileStore/tables/T_Data_{suffix}.csv",
        f"/FileStore/tables/BDA_T_data_{suffix}.csv",
    )

# 1.2 List all files in the directory
display(dbutils.fs.ls("/FileStore/tables/"))

In [None]:
# 2.1 Load the data from the files
# All three inputs share one CSV layout: a header row, with column types inferred by Spark.
csv_options = {"header": True, "inferSchema": True}
df_c1 = spark.read.csv("dbfs:/FileStore/tables/BDA_T_data_C1.csv", **csv_options)
df_c2 = spark.read.csv("dbfs:/FileStore/tables/BDA_T_data_C2.csv", **csv_options)
df_c3 = spark.read.csv("dbfs:/FileStore/tables/BDA_T_data_C3.csv", **csv_options)

# 2.2 Display the data from the files: a label, the (rows, columns) size, then the table.
raw_frames = [
    ("BDA_T_data_C1.csv", df_c1),
    ("BDA_T_data_C2.csv", df_c2),
    ("BDA_T_data_C3.csv", df_c3),
]
for file_name, frame in raw_frames:
    print("Data from " + file_name)
    print("Data size of " + file_name + ": ", frame.count(), " rows", " and ", len(frame.columns), " columns")
    display(frame)

In [None]:
# 3.1 Cleaning df_c1
print("Before cleaning for df_c1")
df_c1.display()  # Before cleaning

# 3.1.1 Rename the columns
# BDA_T_data_C1.csv uses positional headers c01..c21. Map them to the names the
# C2/C3 cleaning cells use (note the singular "PhoneService" and "InternetService").
# The union in step 4 is positional and keeps this frame's names, so these names
# become the column names of the final dataset.
C1_COLUMN_NAMES = {
    "c01": "customerID",
    "c02": "gender",
    "c03": "SeniorCitizen",
    "c04": "Partner",
    "c05": "Dependents",
    "c06": "tenure",
    "c07": "PhoneService",
    "c08": "MultipleLines",
    "c09": "InternetService",
    "c10": "OnlineSecurity",
    "c11": "OnlineBackup",
    "c12": "DeviceProtection",
    "c13": "TechSupport",
    "c14": "StreamingTV",
    "c15": "StreamingMovies",
    "c16": "Contract",
    "c17": "PaperlessBilling",
    "c18": "PaymentMethod",
    "c19": "MonthlyCharges",
    "c20": "TotalCharges",
    "c21": "Churn",
}
# DataFrames are immutable: each withColumnRenamed returns a new frame, so df_c1 is untouched.
df_c1_cleaned = df_c1
for old_name, new_name in C1_COLUMN_NAMES.items():
    df_c1_cleaned = df_c1_cleaned.withColumnRenamed(old_name, new_name)

# 3.1.2 Change the data type
# Under Spark's default (non-ANSI) casting, values that cannot be parsed become null
# and are then removed by the dropna() in 3.1.3.
df_c1_cleaned = (
    df_c1_cleaned.withColumn("SeniorCitizen", col("SeniorCitizen").cast(IntegerType()))
    .withColumn("tenure", col("tenure").cast(IntegerType()))
    .withColumn("MonthlyCharges", col("MonthlyCharges").cast(DoubleType()))
    .withColumn("TotalCharges", col("TotalCharges").cast(DoubleType()))
)

# 3.1.3 Remove rows with missing values
df_c1_cleaned = df_c1_cleaned.dropna()

print("After cleaning for df_c1")
df_c1_cleaned.display()  # After cleaning

In [None]:
# 3.2 Cleaning df_c2
print("Before cleaning for df_c2")
df_c2.display()  # Before cleaning

# 3.2.1 Amending Abnormal Data
# Placeholder values in this file and their canonical replacements, per column.
# Matching is exact; any value not listed is left unchanged.
C2_REPLACEMENTS = {
    "Partner": {"0": "No"},                                          # Col 4
    "Dependents": {"0": "No"},                                       # Col 5
    "PhoneService": {"0": "No"},                                     # Col 7
    "MultipleLines": {"0phone service": "No phone service",          # Col 8
                      "0": "No"},
    "InternetService": {"0": "No"},                                  # Col 9
    "OnlineSecurity": {"0": "No", "NAN": "No internet service"},     # Col 10
    "OnlineBackup": {"0": "No", "NAN": "No internet service"},       # Col 11
    "DeviceProtection": {"0": "No", "NAN": "No internet service"},   # Col 12
    "TechSupport": {"0": "No", "NAN": "No internet service"},        # Col 13
    "StreamingTV": {"0": "No", "NAN": "No internet service"},        # Col 14
    "StreamingMovies": {"0": "No", "NAN": "No internet service"},    # Col 15
    "PaperlessBilling": {"0": "No"},                                 # Col 17
    "churn": {"0": "No"},                                            # Col 21
}


def _replace_exact(column_name, mapping):
    """Return a Column equal to `column_name` with each exact key of `mapping` swapped for its value."""
    source = col(column_name)
    expr = None
    for bad_value, good_value in mapping.items():
        # The keys within one column never equal another key's replacement, so a single
        # when-chain gives the same result as applying the replacements one after another.
        expr = when(source == bad_value, good_value) if expr is None else expr.when(source == bad_value, good_value)
    return source if expr is None else expr.otherwise(source)


# DataFrames are immutable: df_c2 itself is not modified by the steps below.
df_c2_cleaned = df_c2
for column_name, mapping in C2_REPLACEMENTS.items():
    df_c2_cleaned = df_c2_cleaned.withColumn(column_name, _replace_exact(column_name, mapping))

# 3.2.2 Change the data type
df_c2_cleaned = (
    df_c2_cleaned.withColumn("SeniorCitizen", col("SeniorCitizen").cast(IntegerType()))
    .withColumn("tenure", col("tenure").cast(IntegerType()))
    .withColumn("MonthlyCharges", col("MonthlyCharges").cast(DoubleType()))
    .withColumn("TotalCharges", col("TotalCharges").cast(DoubleType()))
)

# 3.2.3 Remove rows with missing values
df_c2_cleaned = df_c2_cleaned.dropna()

print("After cleaning for df_c2")
df_c2_cleaned.display()  # After cleaning

In [None]:
# 3.3 Cleaning df_c3
print("Before cleaning for df_c3")
df_c3.display()  # Before cleaning

# 3.3.1 Amending Abnormal Data
# This file contains a lower-case 'no' in InternetService; normalise it to 'No'.
# DataFrames are immutable, so df_c3 itself is not modified.
df_c3_cleaned = df_c3
# Col 9 Replace 'no' with 'No' in the 'InternetService' column
df_c3_cleaned = df_c3_cleaned.withColumn(
    "InternetService",
    when(col("InternetService") == "no", "No").otherwise(col("InternetService")),
)

# 3.3.2 Change the data type
df_c3_cleaned = (
    df_c3_cleaned.withColumn("SeniorCitizen", col("SeniorCitizen").cast(IntegerType()))
    .withColumn("tenure", col("tenure").cast(IntegerType()))
    .withColumn("MonthlyCharges", col("MonthlyCharges").cast(DoubleType()))
    .withColumn("TotalCharges", col("TotalCharges").cast(DoubleType()))
)

# 3.3.3 Remove rows with missing values
df_c3_cleaned = df_c3_cleaned.dropna()

print("After cleaning for df_c3")
df_c3_cleaned.display()  # After cleaning

In [None]:
# 4 Merging the data
# 4.1 Stack the three cleaned frames, then drop rows that are exact duplicates.
# NOTE: DataFrame.union matches columns by POSITION, not by name, so all three
# frames must list their columns in the same order; the result uses df_c1_cleaned's names.
df_merged = (
    df_c1_cleaned
    .union(df_c2_cleaned)
    .union(df_c3_cleaned)
    .distinct()
)

# Show the size of the merged data
merged_rows = df_merged.count()
merged_cols = len(df_merged.columns)
print("Merged data")
print("Data size of merged data: ", merged_rows, " rows", " and ", merged_cols, " columns")
display(df_merged)

In [None]:
# 5.1 Saving the merged data
# coalesce(1) gathers the merged rows into one partition, so the write produces a
# single part-*.csv file in the Final directory. Adding repartition(1) after it would
# only trigger an extra shuffle of data that is already in one partition.
(
    df_merged.coalesce(1)
    .write.mode("overwrite")
    .format("csv")
    .option("header", "true")
    .save("dbfs:/FileStore/tables/Final")
)

# 5.2 List all files in the directory
display(dbutils.fs.ls("/FileStore/tables/Final/"))

In [None]:
# 6 Move and Load the final data
# Spark generates the part file name (part-00000-tid-...-c000.csv) and it differs on
# every run, so look the file up instead of hard-coding its name.
final_dir = "dbfs:/FileStore/tables/Final"
part_files = [
    info.path
    for info in dbutils.fs.ls(final_dir)
    if info.name.startswith("part-") and info.name.endswith(".csv")
]
# Step 5.1 writes a single partition, so exactly one CSV part file is expected.
if len(part_files) != 1:
    raise RuntimeError(
        f"Expected exactly one part-*.csv file in {final_dir}, found {len(part_files)}: {part_files}"
    )
dbutils.fs.mv(part_files[0], "/FileStore/tables/BDA_T_data_Final.csv")
dbutils.fs.rm(final_dir, True)  # remove the now-unneeded Spark output directory
display(dbutils.fs.ls("/FileStore/tables/"))

# Downloading the file
# Reference for community edition: https://stackoverflow.com/questions/49019706/databricks-download-a-dbfs-filestore-file-to-my-local-machine
# Replace the part after ?o= with the id from the community session
# https://community.cloud.databricks.com/files/tables/BDA_T_data_Final.csv?o=

In [None]:
# 7 Load the final data
# Re-read the single CSV written in step 6 to confirm it round-trips.
final_path = "dbfs:/FileStore/tables/BDA_T_data_Final.csv"
df_final = spark.read.csv(final_path, header=True, inferSchema=True)
# Label the output with the file that was actually loaded (BDA_T_data_Final.csv).
print("Data from BDA_T_data_Final.csv")
print("Data size of BDA_T_data_Final.csv: ", df_final.count(), " rows", " and ", len(df_final.columns), " columns")
display(df_final)