# Creating a ML Model with Apache MLlib

In [None]:
from pyspark.sql.functions import col
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import LogisticRegression

In [0]:
# Pulling a dataset
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
 
schema = StructType([
  StructField("age", DoubleType(), False),
  StructField("workclass", StringType(), False),
  StructField("fnlwgt", DoubleType(), False),
  StructField("education", StringType(), False),
  StructField("education_num", DoubleType(), False),
  StructField("marital_status", StringType(), False),
  StructField("occupation", StringType(), False),
  StructField("relationship", StringType(), False),
  StructField("race", StringType(), False),
  StructField("sex", StringType(), False),
  StructField("capital_gain", DoubleType(), False),
  StructField("capital_loss", DoubleType(), False),
  StructField("hours_per_week", DoubleType(), False),
  StructField("native_country", StringType(), False),
  StructField("income", StringType(), False)
])
 
adults = spark.read.format("csv").schema(schema).load("/databricks-datasets/adult/adult.data")


In [0]:
display(adults)

age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income
39.0,State-gov,77516.0,Bachelors,13.0,Never-married,Adm-clerical,Not-in-family,White,Male,2174.0,0.0,40.0,United-States,<=50K
50.0,Self-emp-not-inc,83311.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K
38.0,Private,215646.0,HS-grad,9.0,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,United-States,<=50K
53.0,Private,234721.0,11th,7.0,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,United-States,<=50K
28.0,Private,338409.0,Bachelors,13.0,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0.0,0.0,40.0,Cuba,<=50K
37.0,Private,284582.0,Masters,14.0,Married-civ-spouse,Exec-managerial,Wife,White,Female,0.0,0.0,40.0,United-States,<=50K
49.0,Private,160187.0,9th,5.0,Married-spouse-absent,Other-service,Not-in-family,Black,Female,0.0,0.0,16.0,Jamaica,<=50K
52.0,Self-emp-not-inc,209642.0,HS-grad,9.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,45.0,United-States,>50K
31.0,Private,45781.0,Masters,14.0,Never-married,Prof-specialty,Not-in-family,White,Female,14084.0,0.0,50.0,United-States,>50K
42.0,Private,159449.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,5178.0,0.0,40.0,United-States,>50K


Databricks visualization. Run in Databricks to view.

In [0]:
adults = (
  adults
  .filter( (col('workclass') != ' ?') &
          (col('education') != ' ?') &
          (col('marital_status') != ' ?') &
          (col('occupation') != ' ?') &
          (col('relationship') != ' ?') &
          (col('race') != ' ?') &
          (col('sex') != ' ?') &
          (col('native_country') != ' ?')
          )
)

In [0]:
# Adults cat only
adults_cat = adults.select('workclass', 'education', 'marital_status', 'occupation', 'relationship','race', 'sex','native_country', 'income')

# Creating a vector out of our dataset to be able to use Univariate Feature Selector
formula=RFormula(formula= "income ~ workclass + education + marital_status + occupation + relationship + race + sex + native_country", 
                 featuresCol= "features", labelCol= "label")
vector_df = formula.fit(adults_cat).transform(adults_cat)

selector = UnivariateFeatureSelector(featuresCol='features', outputCol="selectedFeatures", labelCol= 'label')
selector.setFeatureType("categorical").setLabelType("categorical").setSelectionThreshold(4)
model = selector.fit(vector_df)
#model.selectedFeatures
print('Selected Features - Categorical')
print([name for i,name in enumerate(vector_df.columns) if i in model.selectedFeatures])

In [0]:
# View Vectorized table
#vector_df.select("features", "label").show(truncate=False)

In [0]:
# Adults numerical only
adults_num = adults.select('age', 'fnlwgt', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week','income')

# Creating a vector out of our dataset to be able to use Univariate Feature Selector
formula=RFormula(formula= "income ~ age + fnlwgt + education_num + capital_gain + capital_loss + hours_per_week", featuresCol= "features", labelCol= "label")
vector_df = formula.fit(adults_num).transform(adults_num)

# # Using Variable selector for Feature Engineering (num)
selector = UnivariateFeatureSelector(featuresCol='features', outputCol="selectedFeatures", labelCol= 'label')
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(6)
model = selector.fit(vector_df)
#model.selectedFeatures
print('Selected Features - Numerical')
print([name for i,name in enumerate(vector_df.columns) if i in model.selectedFeatures])

In [0]:
df_sel = adults.select('workclass', 'occupation', 'race', 'sex', 'age', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week', 'fnlwgt', 'income')

In [0]:
df_sel.groupBy('income').count().show()

In [0]:
# Columns to transform
cat_cols = ['workclass', 'occupation', 'race', 'sex']

# List of stages for Pipeline
stages = []

for column in cat_cols:
    # Instance encoding with StringIndexer
    stringIndexer = StringIndexer(inputCol=column, outputCol=column + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[column + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]


In [0]:
# Convert label income into label indices using the StringIndexer
label_encode = StringIndexer(inputCol="income", outputCol="label")

# Add to the Pipeline stages
stages += [label_encode]

In [0]:
# Transform all features into a vector using VectorAssembler
#num_cols = ['age', 'education_num', 'capital_gain', 'capital_loss']
num_cols = ['age', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week', 'fnlwgt']
assembler_cols = [c + "classVec" for c in cat_cols] + num_cols
assembler = VectorAssembler(inputCols=assembler_cols, outputCol="features")
stages += [assembler]

In [0]:
pipe = Pipeline().setStages(stages)
pipe_model = pipe.fit(df_sel)
prepared_df = pipe_model.transform(df_sel)

In [0]:
display(prepared_df.limit(5))

workclass,occupation,race,sex,age,education_num,capital_gain,capital_loss,income,workclassIndex,workclassclassVec,occupationIndex,occupationclassVec,raceIndex,raceclassVec,sexIndex,sexclassVec,label,features
State-gov,Adm-clerical,White,Male,39.0,13.0,2174.0,0.0,<=50K,3.0,"Map(vectorType -> sparse, length -> 6, indices -> List(3), values -> List(1.0))",3.0,"Map(vectorType -> sparse, length -> 13, indices -> List(3), values -> List(1.0))",0.0,"Map(vectorType -> sparse, length -> 4, indices -> List(0), values -> List(1.0))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))",0.0,"Map(vectorType -> sparse, length -> 28, indices -> List(3, 9, 19, 23, 24, 25, 26), values -> List(1.0, 1.0, 1.0, 1.0, 39.0, 13.0, 2174.0))"
Self-emp-not-inc,Exec-managerial,White,Male,50.0,13.0,0.0,0.0,<=50K,1.0,"Map(vectorType -> sparse, length -> 6, indices -> List(1), values -> List(1.0))",2.0,"Map(vectorType -> sparse, length -> 13, indices -> List(2), values -> List(1.0))",0.0,"Map(vectorType -> sparse, length -> 4, indices -> List(0), values -> List(1.0))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))",0.0,"Map(vectorType -> sparse, length -> 28, indices -> List(1, 8, 19, 23, 24, 25), values -> List(1.0, 1.0, 1.0, 1.0, 50.0, 13.0))"
Private,Handlers-cleaners,White,Male,38.0,9.0,0.0,0.0,<=50K,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(0), values -> List(1.0))",8.0,"Map(vectorType -> sparse, length -> 13, indices -> List(8), values -> List(1.0))",0.0,"Map(vectorType -> sparse, length -> 4, indices -> List(0), values -> List(1.0))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))",0.0,"Map(vectorType -> sparse, length -> 28, indices -> List(0, 14, 19, 23, 24, 25), values -> List(1.0, 1.0, 1.0, 1.0, 38.0, 9.0))"
Private,Handlers-cleaners,Black,Male,53.0,7.0,0.0,0.0,<=50K,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(0), values -> List(1.0))",8.0,"Map(vectorType -> sparse, length -> 13, indices -> List(8), values -> List(1.0))",1.0,"Map(vectorType -> sparse, length -> 4, indices -> List(1), values -> List(1.0))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))",0.0,"Map(vectorType -> sparse, length -> 28, indices -> List(0, 14, 20, 23, 24, 25), values -> List(1.0, 1.0, 1.0, 1.0, 53.0, 7.0))"
Private,Prof-specialty,Black,Female,28.0,13.0,0.0,0.0,<=50K,0.0,"Map(vectorType -> sparse, length -> 6, indices -> List(0), values -> List(1.0))",0.0,"Map(vectorType -> sparse, length -> 13, indices -> List(0), values -> List(1.0))",1.0,"Map(vectorType -> sparse, length -> 4, indices -> List(1), values -> List(1.0))",1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())",0.0,"Map(vectorType -> sparse, length -> 28, indices -> List(0, 6, 20, 24, 25), values -> List(1.0, 1.0, 1.0, 28.0, 13.0))"


In [0]:
### Randomly split data into training and test sets. set seed for reproducibility
(train, test) = prepared_df.randomSplit([0.7, 0.3], seed=42)
print(train.count())
print(test.count())

In [0]:
display(train.limit(2))

workclass,occupation,race,sex,age,education_num,capital_gain,capital_loss,income,workclassIndex,workclassclassVec,occupationIndex,occupationclassVec,raceIndex,raceclassVec,sexIndex,sexclassVec,label,features
Federal-gov,Adm-clerical,Amer-Indian-Eskimo,Male,46.0,9.0,0.0,0.0,<=50K,5.0,"Map(vectorType -> sparse, length -> 6, indices -> List(5), values -> List(1.0))",3.0,"Map(vectorType -> sparse, length -> 13, indices -> List(3), values -> List(1.0))",3.0,"Map(vectorType -> sparse, length -> 4, indices -> List(3), values -> List(1.0))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))",0.0,"Map(vectorType -> sparse, length -> 28, indices -> List(5, 9, 22, 23, 24, 25), values -> List(1.0, 1.0, 1.0, 1.0, 46.0, 9.0))"
Federal-gov,Adm-clerical,Asian-Pac-Islander,Female,33.0,10.0,0.0,0.0,<=50K,5.0,"Map(vectorType -> sparse, length -> 6, indices -> List(5), values -> List(1.0))",3.0,"Map(vectorType -> sparse, length -> 13, indices -> List(3), values -> List(1.0))",2.0,"Map(vectorType -> sparse, length -> 4, indices -> List(2), values -> List(1.0))",1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())",0.0,"Map(vectorType -> sparse, length -> 28, indices -> List(5, 9, 21, 24, 25), values -> List(1.0, 1.0, 1.0, 33.0, 10.0))"


In [0]:
# Fit model to prepped data
lrModel = LogisticRegression(labelCol= 'label', featuresCol='features', maxIter= 10).fit(train)
 
# predictions
predictions = lrModel.transform(test)
preds = predictions.select("label", "prediction", "probability")
# ROC for training data
#display(lrModel, prepared_df, "ROC")
display(preds)

label,prediction,probability
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8328634130167307, 0.16713658698326928))"
1.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(0.11583081826998315, 0.8841691817300168))"
0.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(0.25763857736692686, 0.7423614226330731))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6008341821883177, 0.39916581781168226))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9646372946761301, 0.03536270532386987))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9646372946761301, 0.03536270532386987))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9704265486897544, 0.02957345131024558))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9159830162414963, 0.08401698375850375))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9554710115489088, 0.04452898845109121))"
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9180768928767216, 0.08192310712327844))"


In [0]:
# Confusion Matrix
display(
  preds.crosstab('label', 'prediction')
)

label_prediction,0.0,1.0
1.0,1151,1075
0.0,6324,438


In [0]:
print(f'Precision: {1075/(1075+438)}')
print(f'Recall: {1075/(1075+1151)}')
print(f'Specificity: {6324/(6324+438)}')

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
 
logit = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(logit.regParam, [0.01, 0.5, 2.0])
             .addGrid(logit.threshold, [0.35, 0.38])
             .build())

In [0]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=logit, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
 
# Run cross validations
cv_model = cv.fit(train)

print('cutoff:', cv_model.bestModel.getThreshold())
print('regParam:', cv_model.bestModel.getRegParam())

# Predictions
preds2 = cv_model.bestModel.transform(test)

# Confusion Matrix
display(
  preds2.crosstab('label', 'prediction')
)


label_prediction,0.0,1.0
1.0,762,1449
0.0,5819,958


In [0]:
# Predictions
preds2 = cv_model.bestModel.transform(test)

# Confusion Matrix
display(
  preds2.crosstab('label', 'prediction')
)

label_prediction,0.0,1.0
1.0,762,1449
0.0,5819,958


In [0]:
logit = LogisticRegression(labelCol="label", featuresCol="features", threshold=0.38, maxIter=100)
log_fit = logit.fit(train)

# Make predictions on test data using the transform() method.
preds2 = log_fit.transform(test)

# Confusion Matrix
display(
  preds2.crosstab('label', 'prediction')
)

label_prediction,0.0,1.0
1.0,825,1386
0.0,5967,810


In [0]:
print(f'Precision: {1386/(1386+810)}')
print(f'Recall: {1386/(1386+825)}')
print(f'Specificity: {5967/(5967+810)}')

In [0]:
display(log_fit, test, 'ROC')

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.999999999999956
0.0,0.0434782608695652,0.999999999999956
0.0,0.0869565217391304,0.9987407842466588
0.0,0.1304347826086956,0.9825766224890088
0.0,0.1739130434782608,0.959326987098154
0.0,0.217391304347826,0.9419623279672809
0.0,0.2608695652173913,0.8809799010330135
0.0,0.3043478260869565,0.7568368228697316
0.0109890109890109,0.3043478260869565,0.723099060638785
0.0109890109890109,0.3478260869565217,0.7219955581238138


In [0]:
# Evaluate model
evaluator2 = BinaryClassificationEvaluator()
evaluator2.evaluate(preds2)

In [0]:
print('Model Intercept: ', lrModel.intercept)

weights = lrModel.coefficients
weights = [(float(w),) for w in weights]  # convert numpy type to float, and to tuple
weightsDF = spark.createDataFrame(weights, ["Feature Weight"])
display(weightsDF)

Feature Weight
0.0362884342580394
-0.1936545848870595
-0.0442360209897806
-0.2675410076677062
0.558401966412394
0.4519237539665536
0.4197628291570981
0.0509437278364125
0.873648525133189
-0.0494869832009546
