# Logistic Regression Implementation

Aim : Predict if the person makes over 50k or not.

Defining Libraries

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan,col,count,when,trim

creating spark session

In [3]:
spark = SparkSession.builder.appName("LogisticRegression").config("spark.driver.bindAddress", "10.0.2.15").getOrCreate()

23/07/25 14:42:13 WARN Utils: Your hostname, UbuntuOS resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/07/25 14:42:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/07/25 14:42:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/25 14:42:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Read Input Data CSV File

In [4]:
person_df = spark.read.csv("classification_2.csv",header=True,inferSchema=True)
person_df.show() 

                                                                                

+---+-----------------+------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|age|        workclass|fnlwgt|    education|education-num|      marital-status|        occupation|  relationship|               race|    sex|capital-gain|capital-loss|Hours-per-week|        native|salary|
+---+-----------------+------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov| 77516|    Bachelors|           13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|        2174|           0|            40| United-States| <=50K|
| 50| Self-emp-not-inc| 83311|    Bachelors|           13|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|           0|           0|            1

Describing dataset to get better insights

In [5]:
person_df.describe().show()

                                                                                

+-------+-----------------+------------+------------------+-------------+------------------+--------------+-----------------+------------+-------------------+-------+------------------+------------------+------------------+-----------+------+
|summary|              age|   workclass|            fnlwgt|    education|     education-num|marital-status|       occupation|relationship|               race|    sex|      capital-gain|      capital-loss|    Hours-per-week|     native|salary|
+-------+-----------------+------------+------------------+-------------+------------------+--------------+-----------------+------------+-------------------+-------+------------------+------------------+------------------+-----------+------+
|  count|            32561|       32561|             32561|        32561|             32561|         32561|            32561|       32561|              32561|  32561|             32561|             32561|             32561|      32561| 32561|
|   mean|38.58164675532078| 

Replace column names

In [6]:
# Rename specific columns using withColumnRenamed
person_df = person_df.withColumnRenamed("education-num", "education_num")
person_df = person_df.withColumnRenamed("marital-status", "marital_status")
person_df = person_df.withColumnRenamed("capital-gain", "capital_gain")
person_df = person_df.withColumnRenamed("capital-loss", "capital_loss")
person_df = person_df.withColumnRenamed("hours-per-week", "hours_per_week")

Display Structure of Dataset

In [7]:
person_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- native: string (nullable = true)
 |-- salary: string (nullable = true)



Rows and columns count of Original Dataset

In [8]:
print(f"Rows:{person_df.count()},Columns:{len(person_df.columns)}")

Rows:32561,Columns:15


Trim the dataset to reduce dimentionality and work with important data 

In [9]:
person_df = person_df.select("age","education","occupation","sex","capital_gain","capital_loss","hours_per_week","salary")
person_df.head(10)

[Row(age=39, education=' Bachelors', occupation=' Adm-clerical', sex=' Male', capital_gain=2174, capital_loss=0, hours_per_week=40, salary=' <=50K'),
 Row(age=50, education=' Bachelors', occupation=' Exec-managerial', sex=' Male', capital_gain=0, capital_loss=0, hours_per_week=13, salary=' <=50K'),
 Row(age=38, education=' HS-grad', occupation=' Handlers-cleaners', sex=' Male', capital_gain=0, capital_loss=0, hours_per_week=40, salary=' <=50K'),
 Row(age=53, education=' 11th', occupation=' Handlers-cleaners', sex=' Male', capital_gain=0, capital_loss=0, hours_per_week=40, salary=' <=50K'),
 Row(age=28, education=' Bachelors', occupation=' Prof-specialty', sex=' Female', capital_gain=0, capital_loss=0, hours_per_week=40, salary=' <=50K'),
 Row(age=37, education=' Masters', occupation=' Exec-managerial', sex=' Female', capital_gain=0, capital_loss=0, hours_per_week=40, salary=' <=50K'),
 Row(age=49, education=' 9th', occupation=' Other-service', sex=' Female', capital_gain=0, capital_los

Rows and column count of Trimed Dataset

In [10]:
print(f"Rows:{person_df.count()},Columns:{len(person_df.columns)}")

Rows:32561,Columns:8


Display Column list

In [11]:
person_df.columns

['age',
 'education',
 'occupation',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_per_week',
 'salary']

# # Data PreProcessing

# Handle Missing Data

Remove rows with "?" values 

In [12]:

for column_name in person_df.columns:
        person_df = person_df.filter(~col(column_name).contains("?"))

In [13]:
person_df.show(100)

+---+-------------+------------------+-------+------------+------------+--------------+------+
|age|    education|        occupation|    sex|capital_gain|capital_loss|hours_per_week|salary|
+---+-------------+------------------+-------+------------+------------+--------------+------+
| 39|    Bachelors|      Adm-clerical|   Male|        2174|           0|            40| <=50K|
| 50|    Bachelors|   Exec-managerial|   Male|           0|           0|            13| <=50K|
| 38|      HS-grad| Handlers-cleaners|   Male|           0|           0|            40| <=50K|
| 53|         11th| Handlers-cleaners|   Male|           0|           0|            40| <=50K|
| 28|    Bachelors|    Prof-specialty| Female|           0|           0|            40| <=50K|
| 37|      Masters|   Exec-managerial| Female|           0|           0|            40| <=50K|
| 49|          9th|     Other-service| Female|           0|           0|            16| <=50K|
| 52|      HS-grad|   Exec-managerial|   Male|    

To validate if any column has value "?" after handling missing value

In [14]:
person_df.select([count(when(col(c) == '?', c)).alias(c) for c in person_df.columns]).show()

+---+---------+----------+---+------------+------------+--------------+------+
|age|education|occupation|sex|capital_gain|capital_loss|hours_per_week|salary|
+---+---------+----------+---+------------+------------+--------------+------+
|  0|        0|         0|  0|           0|           0|             0|     0|
+---+---------+----------+---+------------+------------+--------------+------+



In [15]:
person_df.count()

30718

In [16]:
person_df.show()

+---+-------------+------------------+-------+------------+------------+--------------+------+
|age|    education|        occupation|    sex|capital_gain|capital_loss|hours_per_week|salary|
+---+-------------+------------------+-------+------------+------------+--------------+------+
| 39|    Bachelors|      Adm-clerical|   Male|        2174|           0|            40| <=50K|
| 50|    Bachelors|   Exec-managerial|   Male|           0|           0|            13| <=50K|
| 38|      HS-grad| Handlers-cleaners|   Male|           0|           0|            40| <=50K|
| 53|         11th| Handlers-cleaners|   Male|           0|           0|            40| <=50K|
| 28|    Bachelors|    Prof-specialty| Female|           0|           0|            40| <=50K|
| 37|      Masters|   Exec-managerial| Female|           0|           0|            40| <=50K|
| 49|          9th|     Other-service| Female|           0|           0|            16| <=50K|
| 52|      HS-grad|   Exec-managerial|   Male|    

Binary Conversion of target label "salary"

In [17]:
unique_salaries_before = person_df.select("salary").distinct()
unique_salaries_before.show()

+------+
|salary|
+------+
|  >50K|
| <=50K|
+------+



Triming the spaces in the salary column

In [18]:
person_df = person_df.withColumn("salary", trim(col("salary")))


In [19]:
person_df.show()

+---+-------------+------------------+-------+------------+------------+--------------+------+
|age|    education|        occupation|    sex|capital_gain|capital_loss|hours_per_week|salary|
+---+-------------+------------------+-------+------------+------------+--------------+------+
| 39|    Bachelors|      Adm-clerical|   Male|        2174|           0|            40| <=50K|
| 50|    Bachelors|   Exec-managerial|   Male|           0|           0|            13| <=50K|
| 38|      HS-grad| Handlers-cleaners|   Male|           0|           0|            40| <=50K|
| 53|         11th| Handlers-cleaners|   Male|           0|           0|            40| <=50K|
| 28|    Bachelors|    Prof-specialty| Female|           0|           0|            40| <=50K|
| 37|      Masters|   Exec-managerial| Female|           0|           0|            40| <=50K|
| 49|          9th|     Other-service| Female|           0|           0|            16| <=50K|
| 52|      HS-grad|   Exec-managerial|   Male|    

# Conversion of Target label

Converting salary column values to binary

In [20]:
person_df = person_df.withColumn("salary", when(col("salary") == ">50K", 1).otherwise(0))

person_df.show()

+---+-------------+------------------+-------+------------+------------+--------------+------+
|age|    education|        occupation|    sex|capital_gain|capital_loss|hours_per_week|salary|
+---+-------------+------------------+-------+------------+------------+--------------+------+
| 39|    Bachelors|      Adm-clerical|   Male|        2174|           0|            40|     0|
| 50|    Bachelors|   Exec-managerial|   Male|           0|           0|            13|     0|
| 38|      HS-grad| Handlers-cleaners|   Male|           0|           0|            40|     0|
| 53|         11th| Handlers-cleaners|   Male|           0|           0|            40|     0|
| 28|    Bachelors|    Prof-specialty| Female|           0|           0|            40|     0|
| 37|      Masters|   Exec-managerial| Female|           0|           0|            40|     0|
| 49|          9th|     Other-service| Female|           0|           0|            16|     0|
| 52|      HS-grad|   Exec-managerial|   Male|    

# Encoding & Scaling

Type - Label Encoding \
Scaling - Standard Scaler

In [21]:
# Importing necessary encoding and scaling libraries
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler

Performing label encoding on categorial columns

In [22]:
# categorial columns to be converted
categorical_columns = ["education", "occupation", "sex"]

# Creating StringIndexer for each categorical column
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep") for column in categorical_columns]


using pipeline to fit and transform on dataset

In [23]:
pipeline = Pipeline(stages=indexers)
preprocessed_data = pipeline.fit(person_df).transform(person_df)

preprocessed_data.show()

                                                                                

+---+-------------+------------------+-------+------------+------------+--------------+------+---------------+----------------+---------+
|age|    education|        occupation|    sex|capital_gain|capital_loss|hours_per_week|salary|education_index|occupation_index|sex_index|
+---+-------------+------------------+-------+------------+------------+--------------+------+---------------+----------------+---------+
| 39|    Bachelors|      Adm-clerical|   Male|        2174|           0|            40|     0|            2.0|             3.0|      0.0|
| 50|    Bachelors|   Exec-managerial|   Male|           0|           0|            13|     0|            2.0|             2.0|      0.0|
| 38|      HS-grad| Handlers-cleaners|   Male|           0|           0|            40|     0|            0.0|             8.0|      0.0|
| 53|         11th| Handlers-cleaners|   Male|           0|           0|            40|     0|            5.0|             8.0|      0.0|
| 28|    Bachelors|    Prof-specia

Vectoring the Numerical Columns

In [28]:
numerical_columns = ["age","capital_gain", "capital_loss", "hours_per_week"]

In [29]:
assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in numerical_columns]

Standard Scaler Implementation

In [30]:
scaler = [StandardScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in numerical_columns]

Build a pipeling for indexing and scaling to perform on Numberical Columns

In [31]:
pipeline = Pipeline(stages=(assemblers+scaler))
preprocessed_data = pipeline.fit(preprocessed_data).transform(preprocessed_data)

preprocessed_data.show()

                                                                                

+---+-------------+------------------+-------+------------+------------+--------------+------+---------------+----------------+---------+-------+----------------+----------------+------------------+--------------------+--------------------+-------------------+---------------------+
|age|    education|        occupation|    sex|capital_gain|capital_loss|hours_per_week|salary|education_index|occupation_index|sex_index|age_vec|capital_gain_vec|capital_loss_vec|hours_per_week_vec|          age_scaled| capital_gain_scaled|capital_loss_scaled|hours_per_week_scaled|
+---+-------------+------------------+-------+------------+------------+--------------+------+---------------+----------------+---------+-------+----------------+----------------+------------------+--------------------+--------------------+-------------------+---------------------+
| 39|    Bachelors|      Adm-clerical|   Male|        2174|           0|            40|     0|            2.0|             3.0|      0.0| [39.0]|      

In [32]:
import pyspark.sql.functions as F

Replacing original columns with vector and scaled values

In [34]:
preprocessed_data = preprocessed_data.withColumn("education",F.col("education_index"))
preprocessed_data = preprocessed_data.withColumn("age",F.col("age_scaled"))
preprocessed_data = preprocessed_data.withColumn("occupation",F.col("occupation_index"))
preprocessed_data = preprocessed_data.withColumn("sex",F.col("sex_index"))
preprocessed_data = preprocessed_data.withColumn("capital_gain",F.col("capital_gain_scaled"))
preprocessed_data = preprocessed_data.withColumn("capital_loss",F.col("capital_loss_scaled"))
preprocessed_data = preprocessed_data.withColumn("hours-per_week",F.col("hours_per_week_scaled"))

preprocessed_data.show()

+--------------------+---------+----------+---+--------------------+------------+--------------+------+---------------+----------------+---------+-------+----------------+----------------+------------------+--------------------+--------------------+-------------------+---------------------+--------------------+
|                 age|education|occupation|sex|        capital_gain|capital_loss|hours_per_week|salary|education_index|occupation_index|sex_index|age_vec|capital_gain_vec|capital_loss_vec|hours_per_week_vec|          age_scaled| capital_gain_scaled|capital_loss_scaled|hours_per_week_scaled|      hours-per_week|
+--------------------+---------+----------+---+--------------------+------------+--------------+------+---------------+----------------+---------+-------+----------------+----------------+------------------+--------------------+--------------------+-------------------+---------------------+--------------------+
|[2.9729626401141958]|      2.0|       3.0|0.0|[0.28994926881

selecting specific columns for model building

In [36]:
columns = ["age","education","occupation","sex","capital_gain","capital_loss","hours_per_week","salary"]
person_df_one = preprocessed_data.select(*[col(c) for c in columns])
person_df_one.show()

+--------------------+---------+----------+---+--------------------+------------+--------------+------+
|                 age|education|occupation|sex|        capital_gain|capital_loss|hours_per_week|salary|
+--------------------+---------+----------+---+--------------------+------------+--------------+------+
|[2.9729626401141958]|      2.0|       3.0|0.0|[0.28994926881088...|       [0.0]|            40|     0|
| [3.811490564248969]|      2.0|       2.0|0.0|               [0.0]|       [0.0]|            13|     0|
|[2.8967328288292165]|      0.0|       8.0|0.0|               [0.0]|       [0.0]|            40|     0|
| [4.040179998103907]|      5.0|       8.0|0.0|               [0.0]|       [0.0]|            40|     0|
|[2.1344347159794226]|      2.0|       0.0|1.0|               [0.0]|       [0.0]|            40|     0|
| [2.820503017544237]|      3.0|       2.0|1.0|               [0.0]|       [0.0]|            40|     0|
|[3.7352607529639896]|     10.0|       5.0|1.0|               [0

In [37]:
columns = ["age","education","occupation","sex","hours_per_week","salary"]
person_df_two = preprocessed_data.select(*[col(c) for c in columns])
person_df_two.show()

+--------------------+---------+----------+---+--------------+------+
|                 age|education|occupation|sex|hours_per_week|salary|
+--------------------+---------+----------+---+--------------+------+
|[2.9729626401141958]|      2.0|       3.0|0.0|            40|     0|
| [3.811490564248969]|      2.0|       2.0|0.0|            13|     0|
|[2.8967328288292165]|      0.0|       8.0|0.0|            40|     0|
| [4.040179998103907]|      5.0|       8.0|0.0|            40|     0|
|[2.1344347159794226]|      2.0|       0.0|1.0|            40|     0|
| [2.820503017544237]|      3.0|       2.0|1.0|            40|     0|
|[3.7352607529639896]|     10.0|       5.0|1.0|            16|     0|
| [3.963950186818928]|      0.0|       2.0|0.0|            45|     1|
| [2.363124149834361]|      3.0|       0.0|1.0|            50|     1|
| [3.201652073969134]|      2.0|       2.0|0.0|            40|     1|
| [2.820503017544237]|      1.0|       2.0|0.0|            80|     1|
|[2.2868943385493816

# Model Building

Spliting dataset into train and test data

In [38]:
# Importing Pipeline and Model
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
assembler = VectorAssembler(inputCols=['age', 'education', 'occupation', 'sex', 'capital_gain', 'capital_loss', 'hours_per_week'],
                                    outputCol='features')
assembler_data = assembler.transform(person_df_one)


In [39]:
assembler_data.show()

+--------------------+---------+----------+---+--------------------+------------+--------------+------+--------------------+
|                 age|education|occupation|sex|        capital_gain|capital_loss|hours_per_week|salary|            features|
+--------------------+---------+----------+---+--------------------+------------+--------------+------+--------------------+
|[2.9729626401141958]|      2.0|       3.0|0.0|[0.28994926881088...|       [0.0]|            40|     0|[2.97296264011419...|
| [3.811490564248969]|      2.0|       2.0|0.0|               [0.0]|       [0.0]|            13|     0|[3.81149056424896...|
|[2.8967328288292165]|      0.0|       8.0|0.0|               [0.0]|       [0.0]|            40|     0|(7,[0,2,6],[2.896...|
| [4.040179998103907]|      5.0|       8.0|0.0|               [0.0]|       [0.0]|            40|     0|[4.04017999810390...|
|[2.1344347159794226]|      2.0|       0.0|1.0|               [0.0]|       [0.0]|            40|     0|[2.13443471597942...|


In [40]:
train_data,test_data = assembler_data.randomSplit([0.8,0.2],seed=42)

In [41]:
train_data.show()

[Stage 53:>                                                         (0 + 1) / 1]

+--------------------+---------+----------+---+--------------------+------------+--------------+------+--------------------+
|                 age|education|occupation|sex|        capital_gain|capital_loss|hours_per_week|salary|            features|
+--------------------+---------+----------+---+--------------------+------------+--------------+------+--------------------+
|[1.2959067918446494]|      0.0|       3.0|0.0|               [0.0]|       [0.0]|            20|     0|(7,[0,2,6],[1.295...|
|[1.2959067918446494]|      0.0|       4.0|1.0|               [0.0]|       [0.0]|            20|     0|[1.29590679184464...|
|[1.2959067918446494]|      0.0|       5.0|0.0|               [0.0]|       [0.0]|            35|     0|(7,[0,2,6],[1.295...|
|[1.2959067918446494]|      0.0|       5.0|0.0|               [0.0]|       [0.0]|            40|     0|(7,[0,2,6],[1.295...|
|[1.2959067918446494]|      0.0|       9.0|0.0|               [0.0]|       [0.0]|            40|     0|(7,[0,2,6],[1.295...|


                                                                                

# Training the model

Using Logistic Regression

In [42]:
from pyspark.ml.classification import LogisticRegression

Implementing Logistic regression by passing features and label

In [43]:
classification = LogisticRegression(featuresCol="features",labelCol="salary")

classification_model = classification.fit(train_data)

result_data = classification_model.transform(test_data)

result_data.show()
 

                                                                                

+--------------------+---------+----------+---+--------------------+------------+--------------+------+--------------------+--------------------+--------------------+----------+
|                 age|education|occupation|sex|        capital_gain|capital_loss|hours_per_week|salary|            features|       rawPrediction|         probability|prediction|
+--------------------+---------+----------+---+--------------------+------------+--------------+------+--------------------+--------------------+--------------------+----------+
|[1.2959067918446494]|      0.0|       4.0|1.0|               [0.0]|       [0.0]|            20|     0|[1.29590679184464...|[4.11600031867852...|[0.98395211739413...|       0.0|
|[1.2959067918446494]|      1.0|       1.0|0.0|               [0.0]|       [0.0]|            16|     0|[1.29590679184464...|[2.72600616617521...|[0.93854387884635...|       0.0|
|[1.2959067918446494]|      5.0|       0.0|0.0|               [0.0]|       [0.0]|            15|     0|(7,[0,1

                                                                                

# Performance Matrices

ROC_AUC

In [44]:
# Importing the evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
  
# Calling the evaluator
res = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='salary')
  
# Evaluating the AUC on results
ROC_AUC = res.evaluate(result_data)

print(ROC_AUC)

                                                                                

0.6457431051426803


F1 Score

In [45]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Convert predicted labels and actual labels to the required format (Double type)
predictions = result_data.withColumn("prediction", result_data["prediction"].cast("double"))
predictions = result_data.withColumn("label", result_data["salary"].cast("double"))

# Evaluate the F1 score

evaluator = MulticlassClassificationEvaluator(metricName="f1")
f1_score = evaluator.evaluate(predictions)

print("F1 Score:", f1_score)



F1 Score: 0.7734794174649425


                                                                                

Accuracy

In [46]:
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Accuracy:", accuracy)



Accuracy: 0.8011466011466012


                                                                                

In [None]:
spark.stop()