In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructField,
    StructType,
    IntegerType,
    DoubleType,
    StringType,
)
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import when, count, col, isnan, isnull, mean, stddev
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    VectorAssembler,
    StandardScaler,
    OneHotEncoder,
    StringIndexer,
    ChiSqSelector,
)
from pyspark.ml import regression
from pyspark.ml.regression import GeneralizedLinearRegression, LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns


In [2]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("LogisticRegressionModel").getOrCreate()

In [3]:
# read csv file using spark
df = spark.read.csv("newdata.csv", header=True, inferSchema=True)
df.show(3)

+-------+-------+----+-------+----------+---------+---------------+----------------+-------------+--------------+-----------+-----------------+------------+-----------+--------------+--------------+----------------+---------------+--------------+-----------------+-------------------+------------------+-----------------+----------------+-----------------+---------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------------+--------------+----------------+----------------+--------------------+--------------------+-------------------+------------------+-------------------+-------------------+------------------+
|default|housing|loan|deposit|customerID|job_admin|job_blue-collar|job_entrepreneur|job_housemaid|job_management|job_retired|job_self-employed|job_services|job_student|job_technician|job_unemployed|marital_divorced|marital_married|marital_single|education_primary|education_secondary|education_tertiar

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [5]:
indexer = StringIndexer(inputCol="deposit", outputCol="label")
df_indexed = indexer.fit(df).transform(df)

In [6]:
feature_columns = ["default", "housing", "loan", "customerID", "job_admin", "job_blue-collar",
                   "job_entrepreneur", "job_housemaid", "job_management", "job_retired",
                   "job_self-employed", "job_services", "job_student", "job_technician",
                   "job_unemployed", "marital_divorced", "marital_married", "marital_single",
                   "education_primary", "education_secondary", "education_tertiary",
                   "education_unknown", "contact_cellular", "contact_telephone",
                   "contact_unknown", "month_apr", "month_aug", "month_dec",
                   "month_feb", "month_jan", "month_jul", "month_jun", "month_mar",
                   "month_may", "month_nov", "month_oct", "month_sep",
                   "poutcome_failure", "poutcome_other", "poutcome_success",
                   "poutcome_unknown", "age", "balance", "day", "duration",
                   "campaign", "pdays", "previous"]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_features = assembler.transform(df_indexed)

In [7]:
train_data, test_data = df_features.randomSplit([0.8, 0.2], seed=1234)

In [8]:
lr = LogisticRegression(featuresCol='features', labelCol='label')
lr_model = lr.fit(train_data)

**How to save and load model.**

In [9]:
lr_model.save("lr_model")

Py4JJavaError: An error occurred while calling o194.save.
: java.io.IOException: Path lr_model already exists. To overwrite it, please use write.overwrite().save(path) for Scala and use write().overwrite().save(path) for Java and Python.
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:683)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)


In [10]:
# Save the DataFrames
train_data.write.parquet("lr_model/train_data.parquet")
test_data.write.parquet("lr_model/test_data.parquet")

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/c:/Users/hieud/Documents/BIGDATA/project/lr_model/train_data.parquet already exists. Set mode as "overwrite" to overwrite the existing path.

In [11]:
print("Training Dataset Count: " + str(train_data.count()))
print("Test Dataset Count: " + str(test_data.count()))
test_data.show(5)

Training Dataset Count: 8833
Test Dataset Count: 2189
+-------+-------+----+-------+----------+---------+---------------+----------------+-------------+--------------+-----------+-----------------+------------+-----------+--------------+--------------+----------------+---------------+--------------+-----------------+-------------------+------------------+-----------------+----------------+-----------------+---------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------------+--------------+----------------+----------------+--------------------+-------------------+-------------------+--------------------+-------------------+-------------------+------------------+-----+--------------------+
|default|housing|loan|deposit|customerID|job_admin|job_blue-collar|job_entrepreneur|job_housemaid|job_management|job_retired|job_self-employed|job_services|job_student|job_technician|job_unemployed|marital_divorced|mari

In [12]:
from pyspark.ml.classification import LogisticRegressionModel

# Load the saved model
loaded_model = LogisticRegressionModel.load("lr_model")

# Make predictions with the loaded model
predictions = loaded_model.transform(test_data)

In [13]:
predictions = loaded_model.transform(test_data)
predictions.select("features", "label", "prediction").show()

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(48,[3,6,16,20,22...|  0.0|       1.0|
|(48,[3,14,16,19,2...|  0.0|       0.0|
|(48,[3,13,17,19,2...|  0.0|       0.0|
|(48,[3,14,16,19,2...|  0.0|       0.0|
|(48,[3,10,16,20,2...|  0.0|       0.0|
|(48,[3,13,16,19,2...|  0.0|       0.0|
|(48,[3,14,16,18,2...|  0.0|       0.0|
|(48,[3,10,17,20,2...|  0.0|       0.0|
|(48,[3,7,16,20,23...|  0.0|       0.0|
|(48,[3,10,16,19,2...|  0.0|       0.0|
|(48,[3,11,16,21,2...|  0.0|       0.0|
|(48,[3,4,16,19,24...|  0.0|       0.0|
|(48,[3,8,17,20,22...|  0.0|       0.0|
|(48,[3,8,16,19,22...|  0.0|       0.0|
|(48,[3,13,16,20,2...|  0.0|       0.0|
|(48,[3,5,16,19,22...|  0.0|       0.0|
|(48,[3,11,16,18,2...|  0.0|       0.0|
|(48,[3,4,17,19,22...|  0.0|       0.0|
|(48,[3,8,17,20,22...|  0.0|       0.0|
|(48,[3,9,16,19,23...|  0.0|       0.0|
+--------------------+-----+----------+
only showing top 20 rows



In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
# Calculate evaluation metrics
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1_score = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
mse = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC").evaluate(predictions)
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})

# Calculate error rate
total_instances = predictions.count()
incorrect_instances = predictions.filter(predictions.label != predictions.prediction).count()
error_rate = incorrect_instances / total_instances

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-Score: {f1_score}")
print(f"Root Mean Squared Error: {mse ** 0.5}")
print(f"Error Rate: {error_rate}")

Accuracy: 0.9968021927820923
Precision: 0.9968117629085269
Recall: 0.9968021927820923
F1-Score: 0.9968017574621731
Root Mean Squared Error: 0.9999857623288803
Error Rate: 0.0031978072179077205


In [15]:
import shutil


folder_to_zip = 'lr_model'
output_filename = 'lr_model.zip'

# Create a zip file
shutil.make_archive(output_filename.replace('.zip', ''), 'zip', folder_to_zip)


'c:\\Users\\hieud\\Documents\\BIGDATA\\project\\lr_model.zip'