In [1]:
from sys import argv
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by every later cell.
# NOTE: despite the name `sc`, this is a SparkSession, not a SparkContext.
sc = (
    SparkSession.builder
    .appName("LogisticRegression")
    .getOrCreate()
)

In [2]:
# Location of the census CSV, relative to the notebook's working directory.
input_file = "./data/USCensus1990.data.txt"

# First row is the header; column types are inferred by Spark.
data_raw = sc.read.csv(
    path=input_file,
    header=True,
    inferSchema=True,
)

In [3]:
# Count the rows of the raw frame (this triggers a Spark job) and report it.
n_rows_raw = data_raw.count()
print("how many rows in original data: ", n_rows_raw)

how many rows in original data:  2458285


In [4]:
# Show the column names and inferred types of the raw frame.
data_raw.printSchema()

root
 |-- caseid: integer (nullable = true)
 |-- dAge: integer (nullable = true)
 |-- dAncstry1: integer (nullable = true)
 |-- dAncstry2: integer (nullable = true)
 |-- iAvail: integer (nullable = true)
 |-- iCitizen: integer (nullable = true)
 |-- iClass: integer (nullable = true)
 |-- dDepart: integer (nullable = true)
 |-- iDisabl1: integer (nullable = true)
 |-- iDisabl2: integer (nullable = true)
 |-- iEnglish: integer (nullable = true)
 |-- iFeb55: integer (nullable = true)
 |-- iFertil: integer (nullable = true)
 |-- dHispanic: integer (nullable = true)
 |-- dHour89: integer (nullable = true)
 |-- dHours: integer (nullable = true)
 |-- iImmigr: integer (nullable = true)
 |-- dIncome1: integer (nullable = true)
 |-- dIncome2: integer (nullable = true)
 |-- dIncome3: integer (nullable = true)
 |-- dIncome4: integer (nullable = true)
 |-- dIncome5: integer (nullable = true)
 |-- dIncome6: integer (nullable = true)
 |-- dIncome7: integer (nullable = true)
 |-- dIncome8: integer (nu

In [5]:
# Keep only rows where the target column iSex is present.
has_target = data_raw["iSex"].isNotNull()
data_df = data_raw.where(has_target)

In [6]:
from pyspark.ml.feature import VectorAssembler

# Every column except the target (iSex) and the row id (caseid) is a feature.
excluded = {"iSex", "caseid"}
cols = data_df.columns
vec_cols = [name for name in cols if name not in excluded]

# Pack the feature columns into a single vector column named "features",
# then keep just that vector alongside the target.
# NOTE: this cell rebinds data_df, so it cannot be re-run without first
# re-running the cell that creates data_df.
assembler = VectorAssembler(inputCols=vec_cols, outputCol="features")
data_df = assembler.transform(data_df).select("features", "iSex")

In [7]:
# Peek at one row to confirm the assembled vector and the target column.
data_df.show(1)

+--------------------+----+
|            features|iSex|
+--------------------+----+
|(67,[0,2,5,6,7,8,...|   1|
+--------------------+----+
only showing top 1 row



In [None]:
# The dataset was already transformed with VectorAssembler above; next, cast iSex into the integer label column.

In [8]:
from pyspark.sql.types import IntegerType

# Expose the target as an integer column called "label" and drop iSex,
# leaving exactly the two columns ("features", "label") used downstream.
label_col = data_df["iSex"].cast(IntegerType()).alias("label")
data_df = data_df.select("features", label_col)

In [9]:
# Confirm the frame now holds only the feature vector and the integer label.
data_df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)



In [10]:
from pyspark.ml.linalg import Vectors, DenseVector, VectorUDT
from pyspark.sql import functions as F
from pyspark.sql.types import *
import numpy as np

## Build the initial weight vector w (all zeros, one entry per feature).
## A one-row DataFrame with one column per weight is assembled into a
## "w_vec" vector column, then pulled back to the driver as a dense vector.
n_features = len(vec_cols)
w_cols = [f"v{i}" for i in range(n_features)]
v = [tuple([0.0] * n_features)]

df_w = sc.createDataFrame(v, w_cols)
vecAssembler = VectorAssembler(inputCols=w_cols, outputCol="w_vec")
df_w = vecAssembler.transform(df_w).select("w_vec")

first_row = df_w.take(1)[0]
w = Vectors.dense(first_row["w_vec"])

def gradient(xi, yi):
    """Per-example gradient of the logistic loss at the current weights `w`.

    Computes ``(sigmoid(yi * <xi, w>) - 1) * yi * xi`` using the module-level
    weight vector ``w``.

    Args:
        xi: feature vector exposing ``.dot(...)`` and ``.toArray()``
            (pyspark.ml.linalg dense or sparse vectors both do).
        yi: numeric label of the example.

    Returns:
        A dense vector with one gradient component per feature.

    NOTE(review): this is the gradient of log(1 + exp(-y * w.x)), which
    presumes labels in {-1, +1}. The notebook output shows `label` values of
    0 and 1; with yi == 0 the result is all zeros -- confirm the intended
    label encoding before using this for training.
    """
    margin = yi * xi.dot(w)
    prob = 1.0 / (1.0 + np.exp(-1.0 * margin))
    # Convert to a NumPy array first: scalar arithmetic is not supported on
    # sparse vectors, and `features` holds sparse vectors in this notebook.
    return Vectors.dense((prob - 1.0) * yi * xi.toArray())
# Wrap `gradient` as a Spark UDF that returns a vector column.
dot_prod_udf = F.udf(gradient, VectorUDT())

# Pass the vector column itself. Wrapping it in F.array(...) would hand the
# UDF a one-element list instead of a vector, so `xi.dot(...)` would fail as
# soon as an action evaluates the column.
# NOTE: despite its name, the 'dot_prod' column holds the per-row gradient.
data_prod = data_df.withColumn('dot_prod', dot_prod_udf(F.col('features'), F.col('label')))

In [11]:
from pyspark.ml.feature import StandardScaler as scaler

def _to_dense(vec):
    """Return a DenseVector holding the same values as `vec`."""
    return DenseVector(vec.toArray())


# UDF that converts the assembled feature vectors to dense form.
Densify = F.udf(_to_dense, VectorUDT())
data_dense = data_df.withColumn("feature_vector", Densify(F.col("features")))

# Fit the scaler on the dense features, then append "scaled_features".
standardscaler = scaler(inputCol="feature_vector", outputCol="scaled_features")
scaler_model = standardscaler.fit(data_dense)
data = scaler_model.transform(data_dense)
data.show(5)

+--------------------+-----+--------------------+--------------------+
|            features|label|      feature_vector|     scaled_features|
+--------------------+-----+--------------------+--------------------+
|(67,[0,2,5,6,7,8,...|    1|[5.0,0.0,1.0,0.0,...|[2.44082028870012...|
|(67,[0,1,2,5,6,7,...|    1|[6.0,1.0,1.0,0.0,...|[2.92898434644015...|
|(67,[0,1,2,5,6,7,...|    1|[3.0,1.0,2.0,0.0,...|[1.46449217322007...|
|(67,[0,1,2,5,6,7,...|    1|[4.0,1.0,2.0,0.0,...|[1.95265623096010...|
|(67,[0,1,2,7,8,11...|    1|[7.0,1.0,1.0,0.0,...|[3.41714840418018...|
+--------------------+-----+--------------------+--------------------+
only showing top 5 rows



In [12]:
# Keep only the model inputs: the scaled feature vector and the label.
data = data.select(["scaled_features", "label"])

# 80/20 train/test split with a fixed seed.
splits = data.randomSplit(weights=[0.8, 0.2], seed=123)
train_data, test_data = splits

In [13]:
from pyspark.ml.classification import LogisticRegression
import time
start = time.time()

# Fit logistic regression on the training split and attach predictions to
# the test split; both calls fall inside the timed span.
lr = LogisticRegression(featuresCol="scaled_features", labelCol="label", maxIter=100)
model = lr.fit(train_data)
predict_data = model.transform(test_data)

elapsed = float(time.time() - start)
# NOTE(review): the reported size is an estimate (raw row count printed
# earlier in the notebook times the 0.8 split weight), not train_data.count().
n_points = int(2458285*0.8)
print(f"Training on {n_points} data points with Spark Logistic Regression takes: {elapsed} seconds")


Training on 1966628 data points with Spark Logistic Regression takes: 191.67984890937805 seconds


In [14]:
# Compare the true label with the predicted class for the first few test rows.
label_vs_prediction = predict_data.select("label", "prediction")
label_vs_prediction.show(10)

+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 10 rows



In [15]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Area under ROC needs a continuous score per row, so evaluate on the model's
# "rawPrediction" column. Feeding the hard 0/1 "prediction" column collapses
# the ROC curve to a single operating point and misreports the AUC.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")
predict_data.select("scaled_features","label","prediction","probability").show(5)
# predict_data was produced from test_data, so this is the test-set metric.
print("The area under ROC for test set is {}".format(evaluator.evaluate(predict_data)))

+--------------------+-----+----------+--------------------+
|     scaled_features|label|prediction|         probability|
+--------------------+-----+----------+--------------------+
|[0.0,0.0,0.589570...|    0|       0.0|[0.99999999999993...|
|[0.0,0.0,0.589570...|    0|       0.0|[0.99999999999993...|
|[0.0,0.0,0.589570...|    0|       0.0|[0.99999999999993...|
|[0.0,0.0,0.589570...|    0|       0.0|[0.99999999999190...|
|[0.0,0.0,0.589570...|    0|       0.0|[0.99999999999190...|
+--------------------+-----+----------+--------------------+
only showing top 5 rows

The area under ROC for train set is 1.0
