In [27]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Raw CSV locations in the project's GitHub repository
train_url = "https://raw.githubusercontent.com/KoBlades/Customer-Churn-Project/refs/heads/main/Sources/train.csv"
test_url = "https://raw.githubusercontent.com/KoBlades/Customer-Churn-Project/refs/heads/main/Sources/test.csv"

# Download both datasets straight into pandas DataFrames
train_df = pd.read_csv(train_url)
test_df = pd.read_csv(test_url)

# Keep a local copy of each dataset on disk; the pandas index is not written out
train_df.to_csv('/content/train.csv', index=False)
test_df.to_csv('/content/test.csv', index=False)

# Preview the first rows of each dataset: heading first, then the frame on the next line
print("Train DataFrame:", train_df.head(), sep="\n")
print("\nTest DataFrame:", test_df.head(), sep="\n")


Train DataFrame:
   AccountAge  MonthlyCharges  TotalCharges SubscriptionType  \
0          20       11.055215    221.104302          Premium   
1          57        5.175208    294.986882            Basic   
2          73       12.106657    883.785952            Basic   
3          32        7.263743    232.439774            Basic   
4          57       16.953078    966.325422          Premium   

      PaymentMethod PaperlessBilling ContentType MultiDeviceAccess  \
0      Mailed check               No        Both                No   
1       Credit card              Yes      Movies                No   
2      Mailed check              Yes      Movies                No   
3  Electronic check               No    TV Shows                No   
4  Electronic check              Yes    TV Shows                No   

  DeviceRegistered  ViewingHoursPerWeek  ...  ContentDownloadsPerMonth  \
0           Mobile            36.758104  ...                        10   
1           Tablet           

In [28]:
# Set up PySpark in Google Colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

import findspark
findspark.init()

# Initialize Spark session
spark = SparkSession.builder.master("local[*]").appName("Customerchurnproject").getOrCreate()

# Load the local CSV files into PySpark DataFrames
spark_train_df = spark.read.csv('/content/train.csv', header=True, inferSchema=True)
spark_test_df = spark.read.csv('/content/test.csv', header=True, inferSchema=True)

# Show the first few rows
print("Train DataFrame (PySpark):")
spark_train_df.show(5)

print("Test DataFrame (PySpark):")
spark_test_df.show(5)

Train DataFrame (PySpark):
+----------+------------------+-----------------+----------------+----------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+-----------------+----------------------+------+-------------+---------------+----------------+----------+-----+
|AccountAge|    MonthlyCharges|     TotalCharges|SubscriptionType|   PaymentMethod|PaperlessBilling|ContentType|MultiDeviceAccess|DeviceRegistered|ViewingHoursPerWeek|AverageViewingDuration|ContentDownloadsPerMonth|GenrePreference|       UserRating|SupportTicketsPerMonth|Gender|WatchlistSize|ParentalControl|SubtitlesEnabled|CustomerID|Churn|
+----------+------------------+-----------------+----------------+----------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+-----------------+----------------------+------+------

In [29]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType

# Print the schema of each dataset (train first, then test) to check the column types
for spark_frame in (spark_train_df, spark_test_df):
    spark_frame.printSchema()


root
 |-- AccountAge: integer (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- SubscriptionType: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- ContentType: string (nullable = true)
 |-- MultiDeviceAccess: string (nullable = true)
 |-- DeviceRegistered: string (nullable = true)
 |-- ViewingHoursPerWeek: double (nullable = true)
 |-- AverageViewingDuration: double (nullable = true)
 |-- ContentDownloadsPerMonth: integer (nullable = true)
 |-- GenrePreference: string (nullable = true)
 |-- UserRating: double (nullable = true)
 |-- SupportTicketsPerMonth: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- WatchlistSize: integer (nullable = true)
 |-- ParentalControl: string (nullable = true)
 |-- SubtitlesEnabled: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Churn: integer (nullable = true)

root
 |-- Accou

In [30]:
from pyspark.sql import functions as F

# Function to count missing values in each column
def count_missing_values(df):
    """Count the missing entries in every column of a Spark DataFrame.

    A value counts as missing when it is null. In ``float``/``double`` columns
    NaN is counted as well: ``isNull()`` is false for NaN, so a null-only check
    would report a column full of NaN as complete.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        A DataFrame with the same column names as ``df`` in which each cell
        holds the number of missing values found in that column.
    """
    # isnan is only applied to floating-point columns
    floating = {name for name, dtype in df.dtypes if dtype in ("double", "float")}

    def is_missing(name):
        """Build the boolean 'this value is missing' condition for one column."""
        condition = F.col(name).isNull()
        if name in floating:
            condition = condition | F.isnan(F.col(name))
        return condition

    # when() without otherwise() yields null for non-matching rows, and count()
    # skips nulls, so each aggregate counts only the rows flagged as missing.
    return df.select([F.count(F.when(is_missing(c), c)).alias(c) for c in df.columns])

# Build the per-column missing-value counts for both datasets
missing_train = count_missing_values(spark_train_df)
missing_test = count_missing_values(spark_test_df)

# Report them: train first, then test
print("Missing values in Train DataFrame:")
missing_train.show()

print("Missing values in Test DataFrame:")
missing_test.show()


Missing values in Train DataFrame:
+----------+--------------+------------+----------------+-------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+----------+----------------------+------+-------------+---------------+----------------+----------+-----+
|AccountAge|MonthlyCharges|TotalCharges|SubscriptionType|PaymentMethod|PaperlessBilling|ContentType|MultiDeviceAccess|DeviceRegistered|ViewingHoursPerWeek|AverageViewingDuration|ContentDownloadsPerMonth|GenrePreference|UserRating|SupportTicketsPerMonth|Gender|WatchlistSize|ParentalControl|SubtitlesEnabled|CustomerID|Churn|
+----------+--------------+------------+----------------+-------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+----------+----------------------+------+-------------+---------------+----------------+--------

In [31]:
# Duplicate rows = total row count minus the count of distinct rows, per dataset
train_duplicates = spark_train_df.count() - spark_train_df.distinct().count()
test_duplicates = spark_test_df.count() - spark_test_df.distinct().count()

print(f"Number of duplicate rows in Train DataFrame: {train_duplicates}")
print(f"Number of duplicate rows in Test DataFrame: {test_duplicates}")



Number of duplicate rows in Train DataFrame: 0
Number of duplicate rows in Test DataFrame: 0


In [32]:
#standardize column names
def standardize_column_names(df):
    """Return ``df`` with column names lower-cased and spaces replaced by underscores.

    All columns are renamed with a single ``toDF`` call instead of one
    ``withColumnRenamed`` call per column. No word splitting is attempted, so a
    header such as ``AccountAge`` becomes ``accountage``.

    Args:
        df: Spark DataFrame whose columns should be renamed.

    Returns:
        A new DataFrame with the same columns, in the same order, under the
        standardized names.
    """
    # toDF assigns names by position, so the list must follow df.columns order
    new_names = [name.lower().replace(" ", "_") for name in df.columns]
    return df.toDF(*new_names)

# Rename the columns of both datasets; the existing names are rebound to the renamed frames
spark_train_df, spark_test_df = (
    standardize_column_names(spark_train_df),
    standardize_column_names(spark_test_df),
)

# List the resulting column names to verify the renaming
print("Standardized column names in Train DataFrame:", spark_train_df.columns)
print("Standardized column names in Test DataFrame:", spark_test_df.columns)


Standardized column names in Train DataFrame: ['accountage', 'monthlycharges', 'totalcharges', 'subscriptiontype', 'paymentmethod', 'paperlessbilling', 'contenttype', 'multideviceaccess', 'deviceregistered', 'viewinghoursperweek', 'averageviewingduration', 'contentdownloadspermonth', 'genrepreference', 'userrating', 'supportticketspermonth', 'gender', 'watchlistsize', 'parentalcontrol', 'subtitlesenabled', 'customerid', 'churn']
Standardized column names in Test DataFrame: ['accountage', 'monthlycharges', 'totalcharges', 'subscriptiontype', 'paymentmethod', 'paperlessbilling', 'contenttype', 'multideviceaccess', 'deviceregistered', 'viewinghoursperweek', 'averageviewingduration', 'contentdownloadspermonth', 'genrepreference', 'userrating', 'supportticketspermonth', 'gender', 'watchlistsize', 'parentalcontrol', 'subtitlesenabled', 'customerid']


In [33]:
# Summary statistics for each dataset: heading, then the describe() table (train first)
for section_title, spark_frame in (
    ("Summary statistics for Train DataFrame:", spark_train_df),
    ("Summary statistics for Test DataFrame:", spark_test_df),
):
    print(section_title)
    spark_frame.describe().show()


Summary statistics for Train DataFrame:
+-------+------------------+------------------+-----------------+----------------+-------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+------------------+----------------------+------+------------------+---------------+----------------+----------+-------------------+
|summary|        accountage|    monthlycharges|     totalcharges|subscriptiontype|paymentmethod|paperlessbilling|contenttype|multideviceaccess|deviceregistered|viewinghoursperweek|averageviewingduration|contentdownloadspermonth|genrepreference|        userrating|supportticketspermonth|gender|     watchlistsize|parentalcontrol|subtitlesenabled|customerid|              churn|
+-------+------------------+------------------+-----------------+----------------+-------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+---

In [34]:
# Show the first 5 rows of each DataFrame under its heading, train first and then test
for section_title, spark_frame in (
    ("Cleaned Train DataFrame:", spark_train_df),
    ("Cleaned Test DataFrame:", spark_test_df),
):
    print(section_title)
    spark_frame.show(5)


Cleaned Train DataFrame:
+----------+------------------+-----------------+----------------+----------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+-----------------+----------------------+------+-------------+---------------+----------------+----------+-----+
|accountage|    monthlycharges|     totalcharges|subscriptiontype|   paymentmethod|paperlessbilling|contenttype|multideviceaccess|deviceregistered|viewinghoursperweek|averageviewingduration|contentdownloadspermonth|genrepreference|       userrating|supportticketspermonth|gender|watchlistsize|parentalcontrol|subtitlesenabled|customerid|churn|
+----------+------------------+-----------------+----------------+----------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+-----------------+----------------------+------+--------

In [35]:
#numerical columns to 2 decimal places (default; configurable through `decimals`)
def round_numeric_columns(df, decimals=2):
    """Round every floating-point column of a Spark DataFrame.

    Only columns whose Spark type is ``double`` or ``float`` are rewritten;
    every other column is left as it is.

    Args:
        df: Spark DataFrame to process.
        decimals: Number of decimal places to keep. Defaults to 2, which is the
            precision this function always used before the parameter existed.

    Returns:
        A new DataFrame in which each floating-point column has been replaced
        by its rounded values; column names are unchanged.
    """
    float_columns = [name for name, dtype in df.dtypes if dtype in ("double", "float")]
    for name in float_columns:
        # Re-using the existing name makes withColumn replace that column
        df = df.withColumn(name, F.round(F.col(name), decimals))
    return df

# Round the floating-point columns of both datasets; the names are rebound to the results
spark_train_df, spark_test_df = (
    round_numeric_columns(spark_train_df),
    round_numeric_columns(spark_test_df),
)

# Preview the rounded data, five rows per dataset
print("Cleaned and rounded Train DataFrame:")
spark_train_df.show(5)

print("Cleaned and rounded Test DataFrame:")
spark_test_df.show(5)


Cleaned and rounded Train DataFrame:
+----------+--------------+------------+----------------+----------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+----------+----------------------+------+-------------+---------------+----------------+----------+-----+
|accountage|monthlycharges|totalcharges|subscriptiontype|   paymentmethod|paperlessbilling|contenttype|multideviceaccess|deviceregistered|viewinghoursperweek|averageviewingduration|contentdownloadspermonth|genrepreference|userrating|supportticketspermonth|gender|watchlistsize|parentalcontrol|subtitlesenabled|customerid|churn|
+----------+--------------+------------+----------------+----------------+----------------+-----------+-----------------+----------------+-------------------+----------------------+------------------------+---------------+----------+----------------------+------+-------------+---------------+--------------