In [32]:
import pandas as pd
import numpy as np
import os

Reference: https://dhiraj-p-rai.medium.com/logistic-regression-in-spark-ml-8a95b5f5434c

In [33]:
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import when

In [34]:
# Create (or reuse) a Spark session running locally with two worker threads.
spark = (
    pyspark.sql.SparkSession.builder
    .master('local[2]')
    .appName('H1B-3')
    .getOrCreate()
)

# Read the training CSV; the first row is the header and Spark infers column types.
training_data = spark.read.csv(
    '../DATA/training_downsampling.csv',
    header=True,
    inferSchema=True,
)



In [35]:
# Read the test CSV with the same reader settings as the training split:
# first row is the header, column types are inferred by Spark.
test_data = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('../DATA/test_downsampling.csv')
)




In [36]:
# Remove the '_c0' column from both splits.
# NOTE(review): '_c0' is presumably an unnamed index column written when the
# CSVs were exported — confirm against the export step.
training_data, test_data = (
    frame.drop(col('_c0')) for frame in (training_data, test_data)
)


In [37]:
# Candidate feature columns: every training column except EMPLOYER_NAME.
# A filter is used instead of list.remove() so this cell can be re-run after
# EMPLOYER_NAME has already been dropped from training_data; list.remove()
# raises ValueError when the name is no longer present.
cols = [name for name in training_data.columns if name != 'EMPLOYER_NAME']

In [38]:
def _drop_employer_and_cast(frame):
    """Drop EMPLOYER_NAME and cast the two numeric columns to int.

    Applies, in order: drop of EMPLOYER_NAME, then an int cast of
    CASE_DURATION, then an int cast of FULL_TIME_POSITION. Returns the
    resulting DataFrame; the input DataFrame is not modified.
    """
    frame = frame.drop(col('EMPLOYER_NAME'))
    for int_column in ("CASE_DURATION", "FULL_TIME_POSITION"):
        frame = frame.withColumn(int_column, col(int_column).cast("int"))
    return frame


# Both splits go through exactly the same preparation.
training_data = _drop_employer_and_cast(training_data)
test_data = _drop_employer_and_cast(test_data)


In [39]:
from pyspark.ml.feature import VectorAssembler

# The label must not be part of the feature vector. Filtering (rather than
# list.remove() wrapped in a bare `except: pass`) makes the "already removed"
# case a no-op without swallowing unrelated errors such as KeyboardInterrupt.
cols = [name for name in cols if name != 'CASE_STATUS']

# Pack all remaining columns into a single vector column named "features".
# handleInvalid='skip' makes the assembler drop rows that contain invalid
# values (null/NaN) instead of raising, so row counts may shrink after
# transform.
assembler = VectorAssembler(
    inputCols=cols,
    outputCol="features",
    handleInvalid='skip',
)


In [40]:
# Number of distinct column names in the training frame.
len(set(training_data.columns))

421

In [41]:
# Append the assembled "features" vector column to both splits.
training_data, test_data = map(
    assembler.transform, (training_data, test_data)
)


In [42]:
# Preview the assembled feature vectors (first 20 rows, long cells truncated).
training_data.select("features").show(n=20, truncate=True)

+--------------------+
|            features|
+--------------------+
|(420,[49,63,159,2...|
|(420,[9,64,159,19...|
|(420,[38,64,159,2...|
|(420,[7,64,159,19...|
|(420,[16,65,159,1...|
|(420,[39,73,159,2...|
|(420,[54,65,159,2...|
|(420,[4,64,159,16...|
|(420,[33,64,159,1...|
|(420,[37,64,159,2...|
|(420,[6,64,159,16...|
|(420,[4,64,159,16...|
|(420,[3,64,159,18...|
|(420,[42,60,159,2...|
|(420,[22,69,159,2...|
|(420,[16,64,159,1...|
|(420,[4,64,159,16...|
|(420,[49,64,159,2...|
|(420,[35,65,159,1...|
|(420,[4,62,159,16...|
+--------------------+
only showing top 20 rows



In [43]:
# Binary label: 1 when CASE_STATUS is 'CERTIFIED', 0 for every other value.
# Caution: this overwrites CASE_STATUS in place, so re-running the cell on
# already-encoded data no longer matches the string 'CERTIFIED'.
certified_flag = when(col("CASE_STATUS") == 'CERTIFIED', 1).otherwise(0)

training_data = training_data.withColumn("CASE_STATUS", certified_flag)
test_data = test_data.withColumn("CASE_STATUS", certified_flag)


In [44]:
from pyspark.ml.classification import LogisticRegression

In [45]:
# Binomial logistic regression on the assembled feature vector.
# NOTE(review): regParam is 0, so no regularisation penalty is applied;
# elasticNetParam presumably has no effect in that case — confirm intent.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='CASE_STATUS',
    maxIter=1000,
    regParam=0,
    elasticNetParam=0.5,
    family="binomial",
)

In [46]:
# Fit the logistic-regression estimator on the training split.
model = lr.fit(dataset=training_data)



In [47]:
# Score both splits with the fitted model.
predict_train, predict_test = [
    model.transform(frame) for frame in (training_data, test_data)
]


In [48]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluator configured to report the 'f1' metric against the CASE_STATUS label.
evaluator = MulticlassClassificationEvaluator(
    labelCol='CASE_STATUS',
    probabilityCol='probability',
    metricName='f1',
)

# Print the column layout of the scored test frame.
predict_test.printSchema()


root
 |-- AK: integer (nullable = true)
 |-- AL: integer (nullable = true)
 |-- AR: integer (nullable = true)
 |-- AZ: integer (nullable = true)
 |-- CA: integer (nullable = true)
 |-- CO: integer (nullable = true)
 |-- CT: integer (nullable = true)
 |-- DC: integer (nullable = true)
 |-- DE: integer (nullable = true)
 |-- FL: integer (nullable = true)
 |-- FM: integer (nullable = true)
 |-- GA: integer (nullable = true)
 |-- GU: integer (nullable = true)
 |-- HI: integer (nullable = true)
 |-- IA: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- IL: integer (nullable = true)
 |-- IN: integer (nullable = true)
 |-- KS: integer (nullable = true)
 |-- KY: integer (nullable = true)
 |-- LA: integer (nullable = true)
 |-- MA: integer (nullable = true)
 |-- MD: integer (nullable = true)
 |-- ME: integer (nullable = true)
 |-- MH: integer (nullable = true)
 |-- MI: integer (nullable = true)
 |-- MN: integer (nullable = true)
 |-- MO: integer (nullable = true)
 |-- MP: intege

In [49]:
# F1 on the test split, using the evaluator configured above.
evaluator.evaluate(dataset=predict_test)

In [None]:
# F1 on the training split, for comparison with the test score.
evaluator.evaluate(dataset=predict_train)



0.6783501545289743

In [None]:
def _predictions_and_labels(scored):
    """Collect predictions and labels from one scored DataFrame in one pass.

    Selecting both columns and collecting once keeps every prediction paired
    with its own label and runs a single Spark job per split, instead of
    relying on two independent collect() actions returning rows in the same
    order.

    Returns a (predictions, labels) tuple of 1-D NumPy arrays.
    """
    rows = scored.select('prediction', 'CASE_STATUS').collect()
    predictions = np.array([row['prediction'] for row in rows])
    labels = np.array([row['CASE_STATUS'] for row in rows])
    return predictions, labels


test_prediction, test_label = _predictions_and_labels(predict_test)
train_prediction, train_label = _predictions_and_labels(predict_train)




In [None]:
from sklearn.metrics import f1_score, balanced_accuracy_score, accuracy_score

# Test split, line 1: macro-averaged F1 followed by micro-averaged F1.
test_f1_macro = f1_score(test_label, test_prediction, average='macro')
test_f1_micro = f1_score(test_label, test_prediction, average='micro')
print(test_f1_macro, test_f1_micro)

# Test split, line 2: balanced accuracy followed by plain accuracy.
test_balanced_accuracy = balanced_accuracy_score(test_label, test_prediction)
test_accuracy = accuracy_score(test_label, test_prediction)
print(test_balanced_accuracy, test_accuracy)


0.4838475589439145 0.7679000177952993
0.6711614372331238 0.7679000177952993


In [None]:
# Training split, line 1: macro-averaged F1 followed by micro-averaged F1.
train_f1_macro = f1_score(train_label, train_prediction, average='macro')
train_f1_micro = f1_score(train_label, train_prediction, average='micro')
print(train_f1_macro, train_f1_micro)

# Training split, line 2: balanced accuracy followed by plain accuracy.
train_balanced_accuracy = balanced_accuracy_score(train_label, train_prediction)
train_accuracy = accuracy_score(train_label, train_prediction)
print(train_balanced_accuracy, train_accuracy)


0.6783507883296468 0.6751457232621807
0.6751459652334784 0.6751457232621807


In [None]:
# Per-class F1 (average=None returns one score per class), train then test.
per_class_f1_train = f1_score(train_label, train_prediction, average=None)
per_class_f1_test = f1_score(test_label, test_prediction, average=None)
per_class_f1_train, per_class_f1_test


(array([0.73210691, 0.63357783, 0.73925971, 0.60845869]),
 array([0.30379162, 0.864018  , 0.52702079, 0.24055983]))