In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create the Spark session used by every cell below, or reuse one that is
# already running under the same application.
spark = (
    SparkSession.builder
    .appName('MlLib')
    .getOrCreate()
)


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/11 16:42:00 WARN Utils: Your hostname, Soroushs-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.101 instead (on interface en0)
25/11/11 16:42:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/11 16:42:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Import dataset

In [3]:
# Load the customer CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
customer_csv = 'datasets/customer_data.csv'
df = spark.read.csv(customer_csv, header=True, inferSchema=True)

# Preview the first five rows.
df.show(5)

+-----------+---+------+------+-------------+------+
|customer_id|age|income|gender|loyalty_score|region|
+-----------+---+------+------+-------------+------+
|        754| 56| 23343|Female|            9|  West|
|        214| 69| 33500|Female|            1| South|
|        125| 46| 73222|  Male|            9|  East|
|        859| 32| 49375|Female|            8|  East|
|        381| 60| 29662|Female|            4| North|
+-----------+---+------+------+-------------+------+
only showing top 5 rows


### Encode gender and use VectorAssembler to combine the selected columns into a feature vector

In [4]:
# List the column names read from the CSV header, to choose which ones feed
# the feature vector built in the next cell.
df.columns

['customer_id', 'age', 'income', 'gender', 'loyalty_score', 'region']

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Stage 1: turn the gender string into a numeric index. With
# handleInvalid="keep", invalid values are given an extra index instead of
# raising an error.
indexer = StringIndexer(
    inputCol="gender",
    outputCol="gender_index",
    handleInvalid="keep",
)

# Stage 2: expand the numeric index into a one-hot vector.
encoder = OneHotEncoder(
    inputCols=["gender_index"],
    outputCols=["gender_ohe"],
)

# Stage 3: concatenate the numeric columns and the one-hot vector (not the raw
# string column) into a single "features" vector.
feature_inputs = ["age", "income", "gender_ohe"]
assembler = VectorAssembler(inputCols=feature_inputs, outputCol="features")

# Fit the three stages in order on df, then apply them to the same frame.
stages = [indexer, encoder, assembler]
pipeline = Pipeline(stages=stages)
model = pipeline.fit(df)
output = model.transform(df)

output.show(5)

# Display the assembled feature vector of the first row.
output.first()['features']


+-----------+---+------+------+-------------+------+------------+-------------+--------------------+
|customer_id|age|income|gender|loyalty_score|region|gender_index|   gender_ohe|            features|
+-----------+---+------+------+-------------+------+------------+-------------+--------------------+
|        754| 56| 23343|Female|            9|  West|         0.0|(3,[0],[1.0])|[56.0,23343.0,1.0...|
|        214| 69| 33500|Female|            1| South|         0.0|(3,[0],[1.0])|[69.0,33500.0,1.0...|
|        125| 46| 73222|  Male|            9|  East|         1.0|(3,[1],[1.0])|[46.0,73222.0,0.0...|
|        859| 32| 49375|Female|            8|  East|         0.0|(3,[0],[1.0])|[32.0,49375.0,1.0...|
|        381| 60| 29662|Female|            4| North|         0.0|(3,[0],[1.0])|[60.0,29662.0,1.0...|
+-----------+---+------+------+-------------+------+------------+-------------+--------------------+
only showing top 5 rows


DenseVector([56.0, 23343.0, 1.0, 0.0, 0.0])

In [12]:
from pyspark.sql.functions import col, when

# Binary target: 1 when loyalty_score is at least 7, otherwise 0.
is_loyal = col("loyalty_score") >= 7
output = output.withColumn("label", when(is_loyal, 1).otherwise(0))

output.show(5)

+-----------+---+------+------+-------------+------+------------+-------------+--------------------+-----+
|customer_id|age|income|gender|loyalty_score|region|gender_index|   gender_ohe|            features|label|
+-----------+---+------+------+-------------+------+------------+-------------+--------------------+-----+
|        754| 56| 23343|Female|            9|  West|         0.0|(3,[0],[1.0])|[56.0,23343.0,1.0...|    1|
|        214| 69| 33500|Female|            1| South|         0.0|(3,[0],[1.0])|[69.0,33500.0,1.0...|    0|
|        125| 46| 73222|  Male|            9|  East|         1.0|(3,[1],[1.0])|[46.0,73222.0,0.0...|    1|
|        859| 32| 49375|Female|            8|  East|         0.0|(3,[0],[1.0])|[32.0,49375.0,1.0...|    1|
|        381| 60| 29662|Female|            4| North|         0.0|(3,[0],[1.0])|[60.0,29662.0,1.0...|    0|
+-----------+---+------+------+-------------+------+------------+-------------+--------------------+-----+
only showing top 5 rows


### Select only the feature vector and label for modeling.

In [14]:
# Keep only the two columns the classifier needs; every other column is
# dropped from `output` from here on.
model_columns = ['features', 'label']
output = output.select(*model_columns)

output.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[56.0,23343.0,1.0...|    1|
|[69.0,33500.0,1.0...|    0|
|[46.0,73222.0,0.0...|    1|
|[32.0,49375.0,1.0...|    1|
|[60.0,29662.0,1.0...|    0|
+--------------------+-----+
only showing top 5 rows


### Partition data into training and test sets

In [15]:
# 80/20 train/test split. The seed is fixed so the split, and therefore every
# metric computed from it, is repeatable across notebook runs (given the same
# input data and partitioning). Without a seed each run samples differently.
train_set, test_set = output.randomSplit([0.8, 0.2], seed=42)


### Define and fit a logistic regression model using the training data.

In [22]:
from pyspark.ml.classification import LogisticRegression

# Configure the classifier to read the assembled "features" vector and the
# binary "label" column.
regressor = LogisticRegression(
    featuresCol='features',
    labelCol='label',
)

# Fit on the training split only. Note that `model` now refers to the fitted
# logistic regression, replacing the fitted preprocessing pipeline that held
# this name earlier.
model = regressor.fit(train_set)

### Apply the trained model to the test data to predict outcomes.

In [27]:
# Score the held-out split; transform() appends the prediction columns.
predictions = model.transform(test_set)

# Show the true label beside the model outputs for the first 20 rows, without
# truncating the vector columns.
shown_columns = ["label", "prediction", "probability", 'rawPrediction']
predictions.select(*shown_columns).show(20, truncate=False)


+-----+----------+----------------------------------------+------------------------------------------+
|label|prediction|probability                             |rawPrediction                             |
+-----+----------+----------------------------------------+------------------------------------------+
|0    |0.0       |[0.5536795632131665,0.44632043678683353]|[0.21554895147643272,-0.21554895147643272]|
|0    |0.0       |[0.5981997337266606,0.4018002662733394] |[0.39796958598966925,-0.39796958598966925]|
|0    |0.0       |[0.585305799704103,0.414694200295897]   |[0.34459306333660533,-0.34459306333660533]|
|0    |0.0       |[0.581447979529271,0.41855202047072904] |[0.32872032444527854,-0.32872032444527854]|
|0    |0.0       |[0.6078345557402057,0.39216544425979427]|[0.43821893023265623,-0.43821893023265623]|
|1    |0.0       |[0.5798950821130343,0.4201049178869657] |[0.3223427096945656,-0.3223427096945656]  |
|1    |0.0       |[0.5775922097453148,0.4224077902546852] |[0.31289692514

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate the test-set predictions by area under the ROC curve, comparing the
# "label" column against the model's "rawPrediction" scores.
# "areaUnderPR" is the alternative metric name.
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    rawPredictionCol="rawPrediction",
    labelCol="label",
)

# An AUC of 0.5 corresponds to random ranking; values closer to 1.0 are better.
roc_auc = evaluator.evaluate(predictions)
print(f"Test AUC (Area Under ROC): {roc_auc:.3f}")


Test AUC (Area Under ROC): 0.500


In [None]:
# Persist the fitted logistic regression model. Plain save() raises if the
# target path already exists, which breaks every re-run of the notebook, so
# write through an overwriting writer instead.
model.write().overwrite().save("models/logistic_regression")