In [1]:
from pyspark.ml.feature import MinMaxScaler, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql.functions import col


In [2]:
# Create the SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.debug.maxToStringFields", 100)

In [3]:
# Define the schema
schema = StructType([
    StructField("state", StringType(), True),
    StructField("account_length", LongType(), True),
    StructField("area_code", LongType(), True),
    StructField("international_plan", StringType(), True),
    StructField("voice_mail_plan", StringType(), True),
    StructField("number_vmail_messages", LongType(), True),
    StructField("total_day_minutes", DoubleType(), True),
    StructField("total_day_calls", LongType(), True),
    StructField("total_day_charge", DoubleType(), True),
    StructField("total_eve_minutes", DoubleType(), True),
    StructField("total_eve_calls", LongType(), True),
    StructField("total_eve_charge", DoubleType(), True),
    StructField("total_night_minutes", DoubleType(), True),
    StructField("total_night_calls", LongType(), True),
    StructField("total_night_charge", DoubleType(), True),
    StructField("total_intl_minutes", DoubleType(), True),
    StructField("total_intl_calls", LongType(), True),
    StructField("total_intl_charge", DoubleType(), True),
    StructField("customer_service_calls", LongType(), True),
    StructField("churn", StringType(), True)
])

In [4]:
# Load the data
data_path = "churn-bigml-80.csv"
df = spark.read.format('csv').option('header', True).schema(schema).load(data_path)

In [5]:
# Handle missing values
null_counts = df.select([col(c).isNull().cast('int').alias(c) for c in df.columns]) \
                .groupBy().sum().collect()[0]

# Display the counts of null values per column
for col_name, null_count in zip(df.columns, null_counts):
    print(f"{col_name}: {null_count}")

state: 0
account_length: 0
area_code: 0
international_plan: 0
voice_mail_plan: 0
number_vmail_messages: 0
total_day_minutes: 0
total_day_calls: 0
total_day_charge: 0
total_eve_minutes: 0
total_eve_calls: 0
total_eve_charge: 0
total_night_minutes: 0
total_night_calls: 0
total_night_charge: 0
total_intl_minutes: 0
total_intl_calls: 0
total_intl_charge: 0
customer_service_calls: 0
churn: 0


In [6]:
# Drop rows with missing values
df = df.dropna()

# Handle duplicate values
duplicate_count = df.groupBy(df.columns).count().where('count > 1')
duplicate_count.show()

# Drop duplicates if there are any
df = df.dropDuplicates()

+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------+-----+-----+
|state|account_length|area_code|international_plan|voice_mail_plan|number_vmail_messages|total_day_minutes|total_day_calls|total_day_charge|total_eve_minutes|total_eve_calls|total_eve_charge|total_night_minutes|total_night_calls|total_night_charge|total_intl_minutes|total_intl_calls|total_intl_charge|customer_service_calls|churn|count|
+-----+--------------+---------+------------------+---------------+---------------------+-----------------+---------------+----------------+-----------------+---------------+----------------+-------------------+-----------------+------------------+------------------+----------------+-----------------+----------------------

In [7]:
# Rename columns to avoid spaces and convert to lower case
df = df.toDF(*(c.lower().replace(' ', '_') for c in df.columns))

In [8]:
# Split the data into training and test sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Define transformers
# StringIndexer is used to convert categorical string columns into numerical indices.
indexer = StringIndexer(inputCols=['state', 'international_plan', 'voice_mail_plan', 'churn'],
                        outputCols=['state_indexed', 'international_plan_indexed', 'voice_mail_plan_indexed', 'churn_indexed'])

# VectorAssembler is used to combine multiple feature columns into a single vector column.
inputs = ['account_length', 'area_code', 'number_vmail_messages', 'total_day_minutes', 'total_day_calls', 'total_day_charge', 'total_eve_minutes', 'total_eve_calls', 'total_eve_charge', 'total_night_minutes', 'total_night_calls', 'total_night_charge', 'total_intl_minutes', 'total_intl_calls', 'total_intl_charge', 'customer_service_calls']
assembler1 = VectorAssembler(inputCols=inputs, outputCol="features_temp")

# MinMaxScaler is used to scale the features to a range (typically [0, 1]).
scaler = MinMaxScaler(inputCol="features_temp", outputCol="features_scaled")

# Combine indexed categorical features and scaled numerical features into a final feature vector.
assembler2 = VectorAssembler(inputCols=['state_indexed', 'international_plan_indexed', 'voice_mail_plan_indexed', 'features_scaled'], outputCol="features")

In [9]:
# Apply transformers to training data
train_data = indexer.fit(train_data).transform(train_data)
train_data = assembler1.transform(train_data)
train_data = scaler.fit(train_data).transform(train_data)
train_data = assembler2.transform(train_data)

# Apply transformers to test data
test_data = indexer.fit(test_data).transform(test_data)
test_data = assembler1.transform(test_data)
test_data = scaler.fit(test_data).transform(test_data)
test_data = assembler2.transform(test_data)

# Select features and label
train_selected = train_data.select("features", "churn_indexed")
test_selected = test_data.select("features", "churn_indexed")
train_selected.show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
|features                                                                                                                                                                                                                                                                                                            |churn_indexed|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
|[42.0,0.0,0.0,0.17857142

In [10]:
# Create the RandomForestClassifier
rf = RandomForestClassifier(featuresCol="features", labelCol="churn_indexed", maxBins=64)

# Fit the RandomForestClassifier
rf_model = rf.fit(train_selected)

# Make predictions on the test set
predictions = rf_model.transform(test_selected)
predictions.select('churn_indexed', 'prediction').show()

+-------------+----------+
|churn_indexed|prediction|
+-------------+----------+
|          0.0|       0.0|
|          0.0|       0.0|
|          0.0|       0.0|
|          0.0|       0.0|
|          0.0|       0.0|
|          0.0|       0.0|
|          1.0|       1.0|
|          0.0|       0.0|
|          0.0|       0.0|
|          0.0|       0.0|
|          0.0|       1.0|
|          0.0|       0.0|
|          0.0|       0.0|
|          0.0|       0.0|
|          0.0|       0.0|
|          0.0|       0.0|
|          0.0|       0.0|
|          0.0|       1.0|
|          0.0|       0.0|
|          0.0|       0.0|
+-------------+----------+
only showing top 20 rows



In [11]:
# Evaluate model performance
evaluator = MulticlassClassificationEvaluator(labelCol="churn_indexed", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Accuracy:", accuracy)

Accuracy: 0.8545081967213115


In [12]:
# Save the RandomForest model
rf_model.save("random_forest_model")