### Load Data using Spark

In [0]:
# Read the cleaned churn CSV; the first row supplies the column names.
# No schema inference is requested, so every column arrives as a string and
# is cast explicitly in later cells.
data = (
    spark.read.format("csv")
    .option("header", True)
    .load("/Volumes/workspace/default/customer_churn/customer_churn_clean.csv")
)
display(data)

In [0]:
from pyspark.sql.functions import col, count, lit, when
from pyspark.sql import Row

# Count the single-space (" ") cells of every column in ONE aggregation job,
# instead of launching a separate filter + count job per column.
empty_counts = data.select(
    [count(when(col(c) == " ", 1)).alias(c) for c in data.columns]
).first()
cols_with_empty = [c for c in data.columns if empty_counts[c] > 0]

# Explicit DDL schema: inferring a schema from an empty Python list fails,
# which would crash this cell whenever no column contains blank strings.
display(
    spark.createDataFrame(
        [(cols_with_empty,)],
        "Columns_with_empty_string array<string>",
    )
)

In [0]:
# Remove embedded spaces from every header (a header like "Total Charges"
# becomes "TotalCharges") so columns can be referenced without backticks.
renamed_columns = [name.replace(" ", "") for name in data.columns]
data = data.toDF(*renamed_columns)
data.columns

In [0]:
from pyspark.sql.functions import col, when

# TotalCharges cells containing a single space are replaced with the string "0"
# so the column can be cast to a number in a later cell.
blank_total = col("TotalCharges") == " "
data = data.withColumn(
    "TotalCharges",
    when(blank_total, "0").otherwise(col("TotalCharges")),
)

# Show every row whose TotalCharges is now "0" (including rows that already held "0").
malformed_rows = data.where(col("TotalCharges") == "0")
display(malformed_rows)

In [0]:
# Re-check the rows whose TotalCharges is "0".
display(data.filter(data.TotalCharges == "0"))

In [0]:
# Convert TotalCharges from string to float now that blank cells hold "0".
data = data.withColumn("TotalCharges", data["TotalCharges"].astype("float"))
display(data)

In [0]:
# Total number of rows after the TotalCharges clean-up.
# (Previously the stray text `data tota`, a SyntaxError that broke Run All.)
data.count()

### Coerce/Fix Data Types


In [0]:
# Senior Citizen & Churn

from pyspark.sql.types import BooleanType, ShortType, IntegerType
from pyspark.sql.functions import col, when

# Cast each binary flag column to BooleanType.
# Each iteration must build on the running result: starting from `data` on every
# pass would discard all casts except the one for the last column in the list.
binary_columns = ["SeniorCitizen", "ChurnValue"]
telco_customer_churn_df = data
for column in binary_columns:
    telco_customer_churn_df = telco_customer_churn_df.withColumn(
        column, col(column).cast(BooleanType())
    )

telco_customer_churn_df.select(*binary_columns).printSchema()

In [0]:
# Force SeniorCitizen into a boolean explicitly with a simple conditional:
# "Yes" maps to True, and every other value (including nulls) maps to False.
is_senior = when(col("SeniorCitizen") == "Yes", True).otherwise(False)
telco_customer_churn_df = telco_customer_churn_df.withColumn("SeniorCitizen", is_senior)

telco_customer_churn_df.select("SeniorCitizen").printSchema()

In [0]:
# Peek at the first row (first() is equivalent to head() with no argument).
telco_customer_churn_df.first()

In [0]:
# List the column names, taken from the schema fields in order.
[field.name for field in telco_customer_churn_df.schema.fields]

In [0]:
# Cast Dependents, Partner, InternetService & PaperlessBilling to boolean using
# spark.sql, and re-order columns so CustomerID comes first and ChurnValue last.
# NOTE(review): an earlier version of this comment mentioned "Phone Service", but
# the query casts InternetService. If InternetService holds values other than
# yes/no-style strings, the boolean cast yields nulls (or errors when ANSI mode
# is on). Confirm which column was intended.

telco_customer_churn_df.createOrReplaceTempView("telco_customer_churn_temp_view")

# Explicit aliases keep the original column names regardless of how Spark
# auto-names the output of the BOOLEAN() cast function.
telco_customer_casted_df = spark.sql("""
SELECT
  CustomerID,
  BOOLEAN(Dependents) AS Dependents,
  BOOLEAN(Partner) AS Partner,
  BOOLEAN(InternetService) AS InternetService,
  BOOLEAN(PaperlessBilling) AS PaperlessBilling,
  *
  EXCEPT(CustomerId, Dependents, Partner, InternetService, PaperlessBilling, ChurnValue),
  ChurnValue
FROM
  telco_customer_churn_temp_view
""")

telco_customer_casted_df.select("Dependents", "Partner", "PaperlessBilling", "InternetService").printSchema()
  

In [0]:
# Cast TenureMonths to Long and replace the original column.
# Build on telco_customer_casted_df (the spark.sql result from the previous cell)
# so the boolean casts and column re-ordering done there are preserved; starting
# from telco_customer_churn_df would silently discard them.
telco_customer_casted_df = telco_customer_casted_df.selectExpr(
    "* except(TenureMonths)",
    "cast(TenureMonths as long) as TenureMonths"
)
telco_customer_casted_df.select("TenureMonths").printSchema()

### Handling Outliers

We will see how to handle outliers in a column by identifying and addressing data points that fall far outside the typical range of values in a dataset. Common methods for handling outliers include removing or filtering them, transforming the data, or replacing outliers with more representative values.

Follow these steps for handling outliers:

	•	Create a new silver table named telco_customer_full_silver (the original table name with "silver" appended) and access it using Spark SQL.
	•	Filter out outliers from the TotalCharges column by removing rows whose value is at or below the specified cutoff value (rows with a null value are kept).




In [0]:
# Keep TotalCharges fractional (double). Casting to long truncates the cents
# (e.g. 18.8 -> 18) and turns any value in (0, 1) into 0, which the
# `> TotalCharges_cutoff` filter in the next cell would then drop.
telco_customer_casted_df = telco_customer_casted_df.withColumn("TotalCharges", col("TotalCharges").cast("double"))
telco_customer_casted_df.select("TotalCharges", "TenureMonths").display()

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Even though this dataset is already fairly clean, apply the best practice of
# dropping TotalCharges values at or below a cutoff. Rows whose TotalCharges is
# null are deliberately kept.

TotalCharges_cutoff = 0

above_cutoff = col("TotalCharges") > TotalCharges_cutoff
is_missing = col("TotalCharges").isNull()
telco_no_outliers_df = telco_customer_casted_df.where(above_cutoff | is_missing)
    


### Removing outliers from PaymentMethod

	•	Identify the two lowest occurrence groups in the PaymentMethod column and calculate the total count and average MonthlyCharges for each group.
	•	Remove customers from the identified low occurrence groups in the PaymentMethod column to filter out outliers.
	•	Store the filtered data in the telco_no_outliers_df dataframe.
	•	Compare the record counts of the telco_customer_casted_df and telco_no_outliers_df dataframes to see how many rows removing outliers dropped, then materialize the resulting dataframe as a new table.


In [0]:
from pyspark.sql.functions import col, count, avg

# Find the two least frequent PaymentMethod groups, with each group's row count
# and average MonthlyCharges.
group_var = "PaymentMethod"
group_stats = [
    count("*").alias("Total"),
    avg("MonthlyCharges").alias("MonthlyCharges"),
]
stats_df = (
    telco_no_outliers_df
    .groupBy(group_var)
    .agg(*group_stats)
    .orderBy(col("Total").asc())
    .limit(2)
)

display(stats_df)

Databricks visualization. Run in Databricks to view.

In [0]:
# Collect the names of the (up to) two lowest-occurrence groups that fall under
# BOTH thresholds below. Null group names (and groups with a null average) are
# skipped: they cannot be compared or joined into the message, and null
# PaymentMethod rows are deliberately kept by the next cell anyway.
MAX_GROUP_SHARE = 0.2          # group must hold less than 20% of all rows
MAX_AVG_MONTHLY_CHARGES = 20   # group's average MonthlyCharges must be below $20

N = telco_no_outliers_df.count()  # total count
lower_groups = [
    elem[group_var]
    for elem in stats_df.head(2)
    if elem[group_var] is not None
    and elem["MonthlyCharges"] is not None
    and elem["Total"] / N < MAX_GROUP_SHARE
    and elem["MonthlyCharges"] < MAX_AVG_MONTHLY_CHARGES
]
print(f"Removing groups: {', '.join(lower_groups) if lower_groups else '(none)'}")

In [0]:
# Drop rows that belong to the low-occurrence groups, but keep rows whose group is null.
in_low_group = col(group_var).isin(lower_groups)
telco_no_outliers_df = telco_no_outliers_df.where(
    col(group_var).isNull() | ~in_low_group
)

In [0]:
# Compare the row counts before and after removing outliers.
before_count = telco_customer_casted_df.count()
after_count = telco_no_outliers_df.count()
print(f"Count - Before: {before_count} / After: {after_count}")

In [0]:
# Optional (instructor only): persist the outlier-free data as the silver table,
# replacing any previous version of it.
(
    telco_no_outliers_df.write
    .saveAsTable("telco_customer_full_silver", mode="overwrite")
)

### Handling Missing Values

To handle missing values in a dataset, we first identify columns with a high percentage of missing data and drop them. Then we remove rows with missing values. Finally, we impute numeric columns with 0 and string columns with ‘N/A’. Together, these steps form a comprehensive approach to handling missing values in the dataset.

⸻

Delete Columns

	•	Create a DataFrame called missing_df to count the missing values per column in the telco_no_outliers_df dataset.
	•	The missing_df DataFrame is then transposed with Spark SQL's stack() expression into one row per column (Column, Number_of_Missing_Values), which makes the missing values easier to analyze.

In [0]:
from pyspark.sql.functions import col, when, count

def calculate_missing(input_df, show=True):
    """
    Count missing values in every column of a Spark DataFrame.

    A value counts as missing when it is null. For string columns, the text
    placeholders ' ', '', 'NULL' and 'None' also count as missing. The
    placeholder comparisons are applied only to string columns, because comparing
    numeric/boolean columns with string literals can fail to cast when Spark's
    ANSI mode is enabled.

    Args:
        input_df: Spark DataFrame to inspect.
        show: when True, display the result sorted by missing count, descending.

    Returns:
        A long-format DataFrame with columns ``Column`` (the column name) and
        ``Number_of_Missing_Values`` (cast to long).
    """
    string_placeholders = [' ', '', 'NULL', 'None']
    column_types = dict(input_df.dtypes)

    def _is_missing(c):
        # Null check applies to every column; placeholder text only to string columns.
        condition = col(c).isNull()
        if column_types.get(c) == "string":
            for placeholder in string_placeholders:
                condition = condition | (col(c) == placeholder)
        return condition

    missing_df = input_df.select([
        count(when(_is_missing(c), c)).alias(c)
        for c in input_df.columns
    ])

    def _quote_literal(name):
        # Escape a column name for use inside a single-quoted SQL string literal.
        return "'" + name.replace("\\", "\\\\").replace("'", "\\'") + "'"

    def _quote_identifier(name):
        # Escape a column name for use as a backtick-quoted SQL identifier.
        return "`" + name.replace("`", "``") + "`"

    # stack() turns the single wide row of counts into one (Column, count) row per column.
    missing_df_out = missing_df.selectExpr(
        "stack({0}, {1}) as (Column, Number_of_Missing_Values)".format(
            len(missing_df.columns),
            ', '.join(f"{_quote_literal(c)}, {_quote_identifier(c)}" for c in missing_df.columns)
        )
    ).withColumn("Number_of_Missing_Values", col("Number_of_Missing_Values").cast("long"))

    if show:
        display(missing_df_out.orderBy("Number_of_Missing_Values", ascending=False))

    return missing_df_out

# Compute (and display) the per-column missing-value counts for the filtered data.
missing_df = calculate_missing(input_df=telco_no_outliers_df, show=True)