In [1]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import when, col, to_date, concat, lit, month, year, count, concat_ws,  date_trunc, when, trim, date_format, mean as _mean, median,  lag, sum as spark_sum, monotonically_increasing_id
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator 
from pyspark.ml import Pipeline


# Initialize a Spark session
spark = SparkSession.builder.appName("BankMarketingRegression").getOrCreate()

# Load the CSV file
df = spark.read.csv('bank_marketing.csv', header=True, inferSchema=True)

# Show the first few rows of the DataFrame
df.show()


+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management| married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician|  single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur| married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar| married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
| 33|     unknown|  single|  unknown|     no|      1|     no|  no|unknown|  5|  may

Data Cleaning

In [2]:
# Remove duplicates
df = df.dropDuplicates()

In [3]:
# Handle missing values
# For simplicity, filling numeric columns with the mean and string columns with 'unknown'
numeric_cols = [col_name for col_name, dtype in df.dtypes if dtype in ['int', 'double']]
string_cols = [col_name for col_name, dtype in df.dtypes if dtype == 'string']

# Fill numeric columns with mean
for col_name in numeric_cols:
    mean_val = df.select(_mean(col(col_name))).collect()[0][0]
    data = df.fillna(mean_val, subset=[col_name])

In [4]:
# Fill string columns with 'unknown'
df = df.fillna('unknown', subset=string_cols)

# Convert data types (example: converting a column 'age' to integer)
df = df.withColumn("age", col("age").cast("integer"))

In [5]:
# Trim whitespace from string columns
for col_name in string_cols:
    df = df.withColumn(col_name, trim(col(col_name)))

In [6]:
# Filter outliers (example: removing rows where 'age' > 100)
df = df.filter(col("age") <= 100)

In [7]:
df.show(5)
# End of Data Cleaning

+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|        job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 28| unemployed|  single| tertiary|     no|      0|    yes|  no|unknown|  5|  may|     125|       2|   -1|       0| unknown| no|
| 32| technician| married|secondary|     no|      0|    yes|  no|unknown|  6|  may|     176|       2|   -1|       0| unknown| no|
| 56|blue-collar| married|secondary|     no|   1602|    yes|  no|unknown|  6|  may|     427|       1|   -1|       0| unknown| no|
| 30|blue-collar|divorced|secondary|     no|    251|    yes| yes|unknown|  7|  may|     120|       2|   -1|       0| unknown| no|
| 55|    retired| married|secondary|     no|    102|    yes|  no|unknown|  7|  may|      7

In [8]:
# Select relevant columns for predicting balance
data = df.select('age', 'job', 'marital', 'education', 'balance')

# Select relevant columns for predicting balance
# data = data.select('age', 'job', 'marital', 'education', 'balance')

# Handle missing values by treating 'unknown' as a separate category
data = data.withColumn('job', when(data.job == 'unknown', 'other').otherwise(data.job))
data = data.withColumn('education', when(data.education == 'unknown', 'other').otherwise(data.education))

# Index categorical columns
categorical_columns = ['job', 'marital', 'education']
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in categorical_columns]

# Assemble all features into a single vector
assembler = VectorAssembler(
    inputCols=['age'] + [column + "_index" for column in categorical_columns],
    outputCol="features")

# Split the data into training and testing sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

test_data_df = test_data.toPandas()
mean_subscription_count = test_data_df['balance'].mean()
median_subscription_count = test_data_df['balance'].median()

# Create a pipeline
pipeline = Pipeline(stages=indexers + [assembler])

# Fit the pipeline to the training data
pipeline_model = pipeline.fit(train_data)
train_data = pipeline_model.transform(train_data)
test_data = pipeline_model.transform(test_data)


# Select the features and target variable for training
train_data = train_data.select('features', 'balance')
test_data = test_data.select('features', 'balance', *categorical_columns, 'age')

# Train a Linear Regression model
lr = LinearRegression(featuresCol='features', labelCol='balance')
lr_model = lr.fit(train_data)

# Make predictions on the test data
predictions = lr_model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol='balance', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)

print('Mean Balance:', mean_subscription_count)
print('Median Balance:', median_subscription_count)

print(f"Root Mean Squared Error (RMSE): {rmse:.4f}")

# Show some predictions in a user-friendly format
predictions = predictions.withColumn('predicted_balance', col('prediction'))
predictions.select('age', 'job', 'marital', 'education', 'balance', 'predicted_balance').show(10, truncate=False)



Mean Balance: 1327.779056772463
Median Balance: 436.0
Root Mean Squared Error (RMSE): 3005.5529
+---+-------+-------+---------+-------+------------------+
|age|job    |marital|education|balance|predicted_balance |
+---+-------+-------+---------+-------+------------------+
|18 |student|single |primary  |608    |993.2594755023945 |
|19 |student|single |primary  |103    |1018.358974892383 |
|19 |student|single |primary  |134    |1018.358974892383 |
|20 |admin. |single |secondary|66     |763.8103656717127 |
|20 |student|single |other    |8860   |1123.6569450273723|
|20 |student|single |secondary|-322   |883.0615327923706 |
|20 |student|single |secondary|502    |883.0615327923706 |
|20 |student|single |tertiary |2764   |963.2600035373712 |
|21 |student|single |other    |200    |1148.756444417361 |
|21 |student|single |primary  |1596   |1068.5579736723603|
+---+-------+-------+---------+-------+------------------+
only showing top 10 rows



In [9]:
# Select relevant features and the target variable for duration prediction
duration_columns = ['age', 'job', 'marital', 'education', 'balance', 'housing', 'loan', 'contact', 'campaign', 'pdays', 'previous', 'poutcome', 'duration']
df_duration = df.select(duration_columns)

# Convert categorical features to numerical
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df_duration) for column in categorical_columns]
encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_encoded") for column in categorical_columns]

# Assemble features into a single vector
assembler = VectorAssembler(inputCols=['age', 'balance', 'campaign', 'pdays', 'previous'] + 
                                      [column+"_encoded" for column in categorical_columns], outputCol="features")

# Initialize RandomForestRegressor
rf_duration = RandomForestRegressor(featuresCol="features", labelCol="duration")

# Create a pipeline
pipeline_duration = Pipeline(stages=indexers + encoders + [assembler, rf_duration])

# Split the data into training and test sets
train_data_duration, test_data_duration = df_duration.randomSplit([0.8, 0.2], seed=1234)


test_data_df = train_data_duration.toPandas()
mean_subscription_count = test_data_df['duration'].mean()
median_subscription_count = test_data_df['duration'].median()


# Train the model
model_duration = pipeline_duration.fit(train_data_duration)

# Make predictions on the test set
predictions_duration = model_duration.transform(test_data_duration)

# Evaluate the model
evaluator_duration = RegressionEvaluator(labelCol="duration", predictionCol="prediction")
rmse_duration = evaluator_duration.evaluate(predictions_duration)
print(f"Test set RMSE for duration prediction: {rmse_duration}")


print('Mean Duration:', mean_subscription_count)
print('Median Duration:', median_subscription_count)

# Show the predictions
predictions_duration.select("age", "job", "marital", "education", "balance", "housing", "loan", "contact", 
                            "campaign", "pdays", "previous", "poutcome", 
                            "duration", "prediction").show()


Test set RMSE for duration prediction: 267.9650461192681
Mean Duration: 257.4216201271772
Median Duration: 180.0
+---+-----------+-------+---------+-------+-------+----+---------+--------+-----+--------+--------+--------+------------------+
|age|        job|marital|education|balance|housing|loan|  contact|campaign|pdays|previous|poutcome|duration|        prediction|
+---+-----------+-------+---------+-------+-------+----+---------+--------+-----+--------+--------+--------+------------------+
| 18|    student| single|  unknown|     35|     no|  no|telephone|       2|   -1|       0| unknown|     104|264.48215146950866|
| 19|    student| single|  primary|    103|     no|  no| cellular|       2|   97|       2| success|      96|259.29310784644906|
| 20|    student| single|secondary|     88|     no|  no|telephone|       1|  181|       4|   other|     621|252.11533540356064|
| 20|    student| single|secondary|    291|     no|  no|telephone|       5|  371|       5| failure|     172| 237.863800