# SparkML Logistic Regression Model
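**Goal:** Build a logistic regression model on the green taxicab trip data set (January 2024) to predict each trip's payment type. Only trips with payment type 1 or 2 are kept, so this is a binary classification problem.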


### Create a Spark Session

In [1]:
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Make the system BLAS directories visible to the JVM that PySpark launches.
# Library paths already present in LD_LIBRARY_PATH are kept rather than
# overwritten, and re-running this cell does not add duplicate entries.
blas_lib_dirs = ['/usr/lib', '/usr/lib/x86_64-linux-gnu']
inherited_lib_dirs = [
    lib_dir
    for lib_dir in os.environ.get('LD_LIBRARY_PATH', '').split(':')
    if lib_dir and lib_dir not in blas_lib_dirs
]
os.environ['LD_LIBRARY_PATH'] = ':'.join(blas_lib_dirs + inherited_lib_dirs)

# Direct JVM to use system BLAS
# NOTE(review): the training cell further down still logs
# "Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS", so
# these netlib options may not be taking effect with the installed Spark
# version -- confirm which system property the bundled netlib expects.
conf = SparkConf()
conf.set("spark.driver.extraJavaOptions", 
         "-Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/lib/x86_64-linux-gnu")
conf.set("spark.executor.extraJavaOptions", 
         "-Dcom.github.fommil.netlib.NativeSystemBLAS.natives=/usr/lib/x86_64-linux-gnu")

# Create (or reuse) the SparkSession and expose its SparkContext.
# getOrCreate() keeps this cell re-runnable: constructing SparkContext directly
# fails when a context is already active in the kernel.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

25/03/27 13:48:24 WARN Utils: Your hostname, codespaces-35d966 resolves to a loopback address: 127.0.0.1; using 10.0.1.243 instead (on interface eth0)
25/03/27 13:48:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/27 13:48:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Load the input data file


In [11]:
# Read the trip records from the parquet file
input_path='../data/green_tripdata_2024-01.parquet'
raw_df = spark.read.parquet(input_path)

# Inspect what was loaded: column types, a few sample rows, summary statistics
raw_df.printSchema()
raw_df.show(5)
raw_df.describe().show()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- trip_type: long (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

+--------+--------------------+---------------------+------------------+----------+-----

[Stage 134:>                                                        (0 + 1) / 1]

+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+---------+---------------------+------------------+------------------+-------------------+--------------------+
|summary|           VendorID|store_and_fwd_flag|        RatecodeID|     PULocationID|     DOLocationID|   passenger_count|     trip_distance|       fare_amount|             extra|           mta_tax|        tip_amount|       tolls_amount|ehail_fee|improvement_surcharge|      total_amount|      payment_type|          trip_type|congestion_surcharge|
+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+---------+---------------------+------------------+--------------

                                                                                

### Clean and filter data

In [3]:
# keep only the trips whose payment type is 1 or 2
input_df = raw_df.filter(raw_df.payment_type.isin(1, 2))
input_df.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2024-01-01 00:46:55|  2024-01-01 00:58:25|                 N|         1|         236|         239|              1|         1.98|       12.8|  1.0|    0.

### Define Functions to vectorize input data

In [4]:
from pyspark.ml import Pipeline, Estimator
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.sql import DataFrame as SparkDataFrame

# Prepare data for machine learning
# from dataframe categorical and numeric columns create label and features

def vectorizeCategories(labelCol: str, categoricalColumns:list[str]) -> list[Estimator]:
    """Build the indexing and encoding stages for the categorical features and the label.

    Args:
        labelCol: Name of the column to predict. It is indexed into a column
            named "label".
        categoricalColumns: Names of the categorical feature columns.

    Returns:
        The pipeline stages in order: for each categorical column, a
        StringIndexer writing "<column>Index" followed by a OneHotEncoder
        writing "<column>classVec"; then a final StringIndexer for the label.
    """
    stages: list[Estimator] = []
    for column in categoricalColumns:
        indexedCol = column + "Index"
        # Index the category values, then one-hot encode the resulting index
        stages.append(StringIndexer(inputCol=column, outputCol=indexedCol))
        stages.append(OneHotEncoder(inputCols=[indexedCol], outputCols=[column + "classVec"]))
    # The label is indexed last and is not one-hot encoded
    stages.append(StringIndexer(inputCol=labelCol, outputCol="label"))
    return stages

def createVectorizePipeline(labelCol: str, categoricalCols:list[str], numericCols:list[str]) -> Pipeline:
    """Create an unfitted Pipeline that produces "label" and "features" columns.

    Args:
        labelCol: Name of the column to predict.
        categoricalCols: Names of the categorical feature columns; each is
            indexed and one-hot encoded by the stages from vectorizeCategories.
        numericCols: Names of the numeric feature columns, passed to the
            assembler as they are.

    Returns:
        A Pipeline whose stages are the categorical/label stages followed by a
        VectorAssembler that combines the "<column>classVec" columns and the
        numeric columns into a single "features" column.
    """
    # The encoded column names must match the outputs written by vectorizeCategories
    encodedCols = [name + "classVec" for name in categoricalCols]
    featureAssembler = VectorAssembler(inputCols=encodedCols + numericCols, outputCol="features")
    pipelineStages = vectorizeCategories(labelCol, categoricalCols) + [featureAssembler]
    return Pipeline(stages=pipelineStages) # type: ignore


### Select category to predict and input features

In [5]:
# Column to predict, and the input columns used as features
label_col = 'payment_type'
categorical_feature_cols = [
    'PULocationID',
    'DOLocationID',
    'RatecodeID',
    'VendorID',
]
numeric_feature_cols = [
    'trip_distance',
    'passenger_count',
    'fare_amount',
]
vectorizePipeline = createVectorizePipeline(
    label_col, categorical_feature_cols, numeric_feature_cols
)

# Fit the vectorizing stages on the filtered data, then add "label" and "features" to it
vectorizedModel = vectorizePipeline.fit(input_df)
vectorized_df = vectorizedModel.transform(input_df)

# Keep the model inputs alongside the original columns they were derived from
selectedcols = ["label", "features", label_col, *categorical_feature_cols, *numeric_feature_cols]
mldata_df = vectorized_df.select(selectedcols)

# Preview the encoded label and feature vector next to the original payment type
vectorized_df.select("label", "features", label_col).show(5, truncate=False)

+-----+------------------------------------------------------------------+------------+
|label|features                                                          |payment_type|
+-----+------------------------------------------------------------------+------------+
|0.0  |(457,[23,217,448,453,454,455,456],[1.0,1.0,1.0,1.0,1.98,1.0,12.8])|1           |
|0.0  |(457,[8,265,448,453,454,455,456],[1.0,1.0,1.0,1.0,6.54,5.0,30.3]) |1           |
|0.0  |(457,[0,223,448,453,454,455,456],[1.0,1.0,1.0,1.0,3.08,1.0,19.8]) |1           |
|1.0  |(457,[0,229,448,454,455,456],[1.0,1.0,1.0,2.4,1.0,14.2])          |2           |
|0.0  |(457,[0,252,448,453,454,455,456],[1.0,1.0,1.0,1.0,5.14,1.0,22.6]) |1           |
+-----+------------------------------------------------------------------+------------+
only showing top 5 rows



### Train model

In [None]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Split the data 80/20 into training and test sets.
# The seed is fixed so the split (and therefore the fitted model and the
# evaluation metrics) is repeatable across runs of the notebook.
train_df, test_df = mldata_df.randomSplit([0.8, 0.2], seed=42)
# Keep only the columns the model needs
train_df = train_df.select("features", "label")
test_df = test_df.select("features", "label")

# Fit on the training set, then score the held-out test set
lrModel = lr.fit(train_df)
prediction_df = lrModel.transform(test_df)
prediction_df.show()

25/03/27 13:48:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
[Stage 116:>                                                        (0 + 1) / 1]

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(457,[0,208,448,4...|  0.0|[1.12506591588908...|[0.75492718231170...|       0.0|
|(457,[0,208,448,4...|  0.0|[1.10878506778362...|[0.75190254114437...|       0.0|
|(457,[0,208,448,4...|  0.0|[1.09512833864164...|[0.74934619057151...|       0.0|
|(457,[0,208,448,4...|  0.0|[1.09629775555117...|[0.74956577397679...|       0.0|
|(457,[0,208,448,4...|  0.0|[1.09822587963526...|[0.74992754130756...|       0.0|
|(457,[0,208,448,4...|  0.0|[1.09478034120206...|[0.74928082176683...|       0.0|
|(457,[0,208,448,4...|  0.0|[1.09443234376249...|[0.74921544161980...|       0.0|
|(457,[0,208,448,4...|  0.0|[1.09718188731653...|[0.74973170384953...|       0.0|
|(457,[0,208,448,4...|  0.0|[1.09063880788949...|[0.74850199421039...|       0.0|
|(457,[0,208,448

                                                                                

### Evaluate model

In [None]:
# Import evaluation metrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
# Area under the ROC curve, computed from the raw prediction column
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
auc = binary_evaluator.evaluate(prediction_df)

# Label-vs-prediction metrics: one evaluator, re-pointed at each metric in turn
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
multi_metrics = {
    metric: multi_evaluator.setMetricName(metric).evaluate(prediction_df)
    for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
}
accuracy = multi_metrics["accuracy"]
precision = multi_metrics["weightedPrecision"]
recall = multi_metrics["weightedRecall"]
f1 = multi_metrics["f1"]

# Training diagnostics recorded on the fitted model
training_summary = lrModel.summary
print("Training Summary:")
print(f"Total Iterations: {training_summary.totalIterations}")
print(f"Objective History: {training_summary.objectiveHistory}")

# Sample of the scored test rows
print("\nPrediction DataFrame (5 rows):")
prediction_df.show(5)

# Fitted model parameters
print(f"\nCoefficients: {lrModel.coefficients}")
print(f"Intercept: {lrModel.intercept}")

# Evaluation results on the test set
print("\nModel evaluation metrics:")
for metric_label, metric_value in [
    ("AUC", auc),
    ("Accuracy", accuracy),
    ("Precision", precision),
    ("Recall", recall),
    ("F1 Score", f1),
]:
    print(f"{metric_label}: {metric_value}")


Training Summary:
Total Iterations: 89
Objective History: [0.6137972013637448, 0.5646019537083564, 0.5390120352295413, 0.5365443899778046, 0.5358016301577658, 0.5352593383299111, 0.535107382840165, 0.5350556347531025, 0.5350086902834854, 0.5349768081779772, 0.5349463873139287, 0.5349442838899335, 0.5349272950473247, 0.5349232196539377, 0.5349150124253206, 0.5349088678039944, 0.5349067482381371, 0.5349044360907995, 0.5349032847457135, 0.5349009105651661, 0.5348977386827329, 0.534895317360264, 0.5348932806118321, 0.5348924528826469, 0.5348916057728473, 0.534889980450233, 0.5348891860713666, 0.5348879096236417, 0.534887160102492, 0.534886226246888, 0.5348849068137914, 0.5348833213806403, 0.5348822153649311, 0.5348801250567945, 0.5348780366517608, 0.5348754551832077, 0.534871163672003, 0.5348703475485266, 0.5348690936145638, 0.5348681133687798, 0.5348673341874128, 0.5348666113540664, 0.53486637327322, 0.5348661690899833, 0.534865965377952, 0.5348656851249122, 0.5348656194021931, 0.53486539