In [1]:
# Logistic Regression Example by Spark/PySpark

# source: 
#   building model: https://docs.databricks.com/_static/notebooks/binary-classification.html
#   data: https://archive.ics.uci.edu/ml/datasets/Adult

# import SparkSession
# SparkSession : The entry point to programming Spark with the Dataset and DataFrame API.
# A SparkSession can be used create DataFrame, register DataFrame as tables, execute 
# SQL over tables, cache tables, and read parquet files. To create a SparkSession, use 
# the following builder pattern:
"""
spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
"""

from pyspark.sql import SparkSession

In [2]:
spark

In [3]:
col_names = ['age', 'workclass', 'fnlwgt', 'education', 
             'education_num', 'marital_status', 'occupation', 
             'relationship', 'race', 'sex', 'capital_gain', 
             'capital_loss', 'hours_per_week',
             'native_country', 'income']
col_names

['age',
 'workclass',
 'fnlwgt',
 'education',
 'education_num',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_per_week',
 'native_country',
 'income']

In [4]:
input_path = './data/adult/adult.data'

# .read => denotes DataFrameReader
# read data and create a data set
dataset = spark.read\
   .format("csv")\
   .option("header","true")\
   .option("inferSchema", "true")\
   .load(input_path)\

dataset.show(truncate=False)

+---+-----------------+--------+-------------+-------------+----------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|age|workclass        |fnlwgt  |education    |education_num|marital_status        |occupation        |relationship  |race               |sex    |capital_gain|capital_loss|hours_per_week|native_country|income|
+---+-----------------+--------+-------------+-------------+----------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|39 | State-gov       |77516.0 | Bachelors   |13.0         | Never-married        | Adm-clerical     | Not-in-family| White             | Male  |2174.0      |0.0         |40.0          | United-States| <=50K|
|50 | Self-emp-not-inc|83311.0 | Bachelors   |13.0         | Married-civ-spouse   | Exec-managerial  | Husband      | White             | Male  |0.0         |0.0   

In [5]:
dataset.select('age', 'workclass', 'fnlwgt', 'education', 'income').show(truncate=False)

+---+-----------------+--------+-------------+------+
|age|workclass        |fnlwgt  |education    |income|
+---+-----------------+--------+-------------+------+
|39 | State-gov       |77516.0 | Bachelors   | <=50K|
|50 | Self-emp-not-inc|83311.0 | Bachelors   | <=50K|
|38 | Private         |215646.0| HS-grad     | <=50K|
|53 | Private         |234721.0| 11th        | <=50K|
|28 | Private         |338409.0| Bachelors   | <=50K|
|37 | Private         |284582.0| Masters     | <=50K|
|49 | Private         |160187.0| 9th         | <=50K|
|52 | Self-emp-not-inc|209642.0| HS-grad     | >50K |
|31 | Private         |45781.0 | Masters     | >50K |
|42 | Private         |159449.0| Bachelors   | >50K |
|37 | Private         |280464.0| Some-college| >50K |
|30 | State-gov       |141297.0| Bachelors   | >50K |
|23 | Private         |122272.0| Bachelors   | <=50K|
|32 | Private         |205019.0| Assoc-acdm  | <=50K|
|40 | Private         |121772.0| Assoc-voc   | >50K |
|34 | Private         |24548

In [6]:
cols = dataset.columns
cols

['age',
 'workclass',
 'fnlwgt',
 'education',
 'education_num',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_per_week',
 'native_country',
 'income']

In [7]:
dataset.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)



In [8]:
"""
Preprocess Data
Since we are going to try algorithms like Logistic Regression, 
we will have to convert the categorical variables in the dataset 
into numeric variables. There are 2 ways we can do this.

Category Indexing

    This is basically assigning a numeric value to each category 
    from {0, 1, 2, ...numCategories-1}. This introduces an implicit 
    ordering among your categories, and is more suitable for ordinal 
    variables (eg: Poor: 0, Average: 1, Good: 2)

One-Hot Encoding

    This converts categories into binary vectors with at most one 
    nonzero value (eg: (Blue: [1, 0]), (Green: [0, 1]), (Red: [0, 0]))

In this dataset, we have ordinal variables like education (Preschool - Doctorate), 
and also nominal variables like relationship (Wife, Husband, Own-child, etc). For 
simplicity's sake, we will use One-Hot Encoding to convert all categorical variables 
into binary vectors. It is possible here to improve prediction accuracy by converting 
each categorical column with an appropriate method.

Here, we will use a combination of StringIndexer and OneHotEncoderEstimator 
to convert the categorical variables. The OneHotEncoderEstimator will return 
a SparseVector.

Since we will have more than 1 stage of feature transformations, we use a 
Pipeline to tie the stages together. This simplifies our code.

"""

"\nPreprocess Data\nSince we are going to try algorithms like Logistic Regression, \nwe will have to convert the categorical variables in the dataset \ninto numeric variables. There are 2 ways we can do this.\n\nCategory Indexing\n\n    This is basically assigning a numeric value to each category \n    from {0, 1, 2, ...numCategories-1}. This introduces an implicit \n    ordering among your categories, and is more suitable for ordinal \n    variables (eg: Poor: 0, Average: 1, Good: 2)\n\nOne-Hot Encoding\n\n    This converts categories into binary vectors with at most one \n    nonzero value (eg: (Blue: [1, 0]), (Green: [0, 1]), (Red: [0, 0]))\n\nIn this dataset, we have ordinal variables like education (Preschool - Doctorate), \nand also nominal variables like relationship (Wife, Husband, Own-child, etc). For \nsimplicity's sake, we will use One-Hot Encoding to convert all categorical variables \ninto binary vectors. It is possible here to improve prediction accuracy by converting \ne

In [9]:
# The following code basically indexes each categorical column 
# using the StringIndexer, and then converts the indexed categories 
# into one-hot encoded variables. The resulting output has the 
# binary vectors appended to the end of each row.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
categoricalColumns = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country"]
stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    # encoder = OneHotEncoderEstimator(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

In [10]:
stages

[StringIndexer_e69acebe7c8d,
 OneHotEncoderEstimator_44a1923b2750,
 StringIndexer_186aa27ad3e6,
 OneHotEncoderEstimator_323a17d47218,
 StringIndexer_095873925c46,
 OneHotEncoderEstimator_8891a1c97166,
 StringIndexer_f6e12eb123b1,
 OneHotEncoderEstimator_cf1c1f9f1dc6,
 StringIndexer_902b659754ee,
 OneHotEncoderEstimator_93da782a5647,
 StringIndexer_2c47e0bdb71f,
 OneHotEncoderEstimator_1718879f4211,
 StringIndexer_544c9e5450fc,
 OneHotEncoderEstimator_ae02f827ddd5,
 StringIndexer_8ce2a228156a,
 OneHotEncoderEstimator_c55aeb978323]

In [11]:
# We use the StringIndexer again to encode our labels to label indices.
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="income", outputCol="label")
stages += [label_stringIdx]

In [12]:
stages

[StringIndexer_e69acebe7c8d,
 OneHotEncoderEstimator_44a1923b2750,
 StringIndexer_186aa27ad3e6,
 OneHotEncoderEstimator_323a17d47218,
 StringIndexer_095873925c46,
 OneHotEncoderEstimator_8891a1c97166,
 StringIndexer_f6e12eb123b1,
 OneHotEncoderEstimator_cf1c1f9f1dc6,
 StringIndexer_902b659754ee,
 OneHotEncoderEstimator_93da782a5647,
 StringIndexer_2c47e0bdb71f,
 OneHotEncoderEstimator_1718879f4211,
 StringIndexer_544c9e5450fc,
 OneHotEncoderEstimator_ae02f827ddd5,
 StringIndexer_8ce2a228156a,
 OneHotEncoderEstimator_c55aeb978323,
 StringIndexer_1ff91a1af058]

In [13]:
# Use a VectorAssembler to combine all the feature columns into a 
# single vector column. This includes both the numeric columns and 
# the one-hot encoded binary vector columns in our dataset.

# Transform all features into a vector using VectorAssembler
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]


In [14]:
stages

[StringIndexer_e69acebe7c8d,
 OneHotEncoderEstimator_44a1923b2750,
 StringIndexer_186aa27ad3e6,
 OneHotEncoderEstimator_323a17d47218,
 StringIndexer_095873925c46,
 OneHotEncoderEstimator_8891a1c97166,
 StringIndexer_f6e12eb123b1,
 OneHotEncoderEstimator_cf1c1f9f1dc6,
 StringIndexer_902b659754ee,
 OneHotEncoderEstimator_93da782a5647,
 StringIndexer_2c47e0bdb71f,
 OneHotEncoderEstimator_1718879f4211,
 StringIndexer_544c9e5450fc,
 OneHotEncoderEstimator_ae02f827ddd5,
 StringIndexer_8ce2a228156a,
 OneHotEncoderEstimator_c55aeb978323,
 StringIndexer_1ff91a1af058,
 VectorAssembler_f1cfbc5faa40]

In [15]:
# Run the stages as a Pipeline. 
# This puts the data through all of the feature transformations we described in a single call.

from pyspark.ml.classification import LogisticRegression
  
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)

In [16]:
preppedDataDF

DataFrame[age: int, workclass: string, fnlwgt: double, education: string, education_num: double, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: double, capital_loss: double, hours_per_week: double, native_country: string, income: string, workclassIndex: double, workclassclassVec: vector, educationIndex: double, educationclassVec: vector, marital_statusIndex: double, marital_statusclassVec: vector, occupationIndex: double, occupationclassVec: vector, relationshipIndex: double, relationshipclassVec: vector, raceIndex: double, raceclassVec: vector, sexIndex: double, sexclassVec: vector, native_countryIndex: double, native_countryclassVec: vector, label: double, features: vector]

In [17]:
# Fit model to prepped data
lrModel = LogisticRegression().fit(preppedDataDF)

# ROC for training data
display(lrModel, preppedDataDF, "ROC")

LogisticRegressionModel: uid = LogisticRegression_649fa0f39dff, numClasses = 2, numFeatures = 100

DataFrame[age: int, workclass: string, fnlwgt: double, education: string, education_num: double, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: double, capital_loss: double, hours_per_week: double, native_country: string, income: string, workclassIndex: double, workclassclassVec: vector, educationIndex: double, educationclassVec: vector, marital_statusIndex: double, marital_statusclassVec: vector, occupationIndex: double, occupationclassVec: vector, relationshipIndex: double, relationshipclassVec: vector, raceIndex: double, raceclassVec: vector, sexIndex: double, sexclassVec: vector, native_countryIndex: double, native_countryclassVec: vector, label: double, features: vector]

'ROC'

In [18]:
display(lrModel, preppedDataDF)

LogisticRegressionModel: uid = LogisticRegression_649fa0f39dff, numClasses = 2, numFeatures = 100

DataFrame[age: int, workclass: string, fnlwgt: double, education: string, education_num: double, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: double, capital_loss: double, hours_per_week: double, native_country: string, income: string, workclassIndex: double, workclassclassVec: vector, educationIndex: double, educationclassVec: vector, marital_statusIndex: double, marital_statusclassVec: vector, occupationIndex: double, occupationclassVec: vector, relationshipIndex: double, relationshipclassVec: vector, raceIndex: double, raceclassVec: vector, sexIndex: double, sexclassVec: vector, native_countryIndex: double, native_countryclassVec: vector, label: double, features: vector]

In [19]:
# Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)
display(dataset)


DataFrame[label: double, features: vector, age: int, workclass: string, fnlwgt: double, education: string, education_num: double, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: double, capital_loss: double, hours_per_week: double, native_country: string, income: string]

In [20]:
### Randomly split data into training and test sets. set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

22838
9723


In [21]:
# Logistic Regression

# You can read more about Logistic Regression from the classification and 
# regression section of MLlib Programming Guide. In the Pipelines API, 
# we are now able to perform Elastic-Net Regularization with Logistic Regression, 
# as well as other linear methods.

In [22]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)


In [23]:
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)

In [24]:
# View model's predictions and probabilities of each prediction class
# You can select any columns in the above schema to view as well. For example's sake we will choose age & occupation
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)


DataFrame[label: double, prediction: double, probability: vector, age: int, occupation: string]

In [25]:
selected.show(truncate=False)

+-----+----------+----------------------------------------+---+---------------+
|label|prediction|probability                             |age|occupation     |
+-----+----------+----------------------------------------+---+---------------+
|0.0  |0.0       |[0.6923453279519433,0.3076546720480568] |26 | Prof-specialty|
|0.0  |0.0       |[0.6211553145296423,0.37884468547035766]|30 | Prof-specialty|
|0.0  |0.0       |[0.6584529417752994,0.3415470582247006] |31 | Prof-specialty|
|0.0  |0.0       |[0.6582662002284263,0.34173379977157375]|32 | Prof-specialty|
|0.0  |0.0       |[0.6150342380572296,0.3849657619427704] |39 | Prof-specialty|
|0.0  |0.0       |[0.5398608213408302,0.46013917865916976]|47 | Prof-specialty|
|0.0  |0.0       |[0.6004473238924508,0.39955267610754913]|50 | Prof-specialty|
|0.0  |0.0       |[0.589862498197214,0.41013750180278596] |51 | Prof-specialty|
|0.0  |0.0       |[0.5824187454881695,0.4175812545118305] |60 | Prof-specialty|
|0.0  |0.0       |[0.5920153502136111,0.

In [26]:
from pyspark.sql.functions import col
selected.filter(col("label") == 1.0).show(10, truncate=False)

+-----+----------+----------------------------------------+---+---------------+
|label|prediction|probability                             |age|occupation     |
+-----+----------+----------------------------------------+---+---------------+
|1.0  |1.0       |[0.2535583150731027,0.7464416849268972] |41 | Prof-specialty|
|1.0  |1.0       |[0.20737798172699914,0.7926220182730008]|42 | Prof-specialty|
|1.0  |1.0       |[0.22291188767425668,0.7770881123257433]|52 | Prof-specialty|
|1.0  |1.0       |[0.4053769739216828,0.5946230260783171] |40 | Prof-specialty|
|1.0  |0.0       |[0.7074755052614815,0.2925244947385185] |29 | Prof-specialty|
|1.0  |0.0       |[0.622138267748489,0.37786173225151093] |37 | Prof-specialty|
|1.0  |0.0       |[0.5759428639453484,0.4240571360546516] |39 | Prof-specialty|
|1.0  |0.0       |[0.5373214942301673,0.4626785057698326] |55 | Prof-specialty|
|1.0  |1.0       |[0.38970819989569183,0.6102918001043083]|34 | Prof-specialty|
|1.0  |1.0       |[0.28794378401126286,0

In [27]:
# Evaluate Model

# We can use BinaryClassificationEvaluator to evaluate our model. 
# We can set the required column names in rawPredictionCol and 
# labelCol Param and the metric in metricName Param.

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.9014206228690932

In [28]:
# Note that the default metric for the BinaryClassificationEvaluator is areaUnderROC

evaluator.getMetricName()

'areaUnderROC'