## Section 1) Mount Azure storage blob container

In [1]:
# Mount Blob to DBFS
dbutils.fs.mount(
  source = "wasbs://datalake@dsba6190storageepsilon.blob.core.windows.net/",
  mount_point = "/mnt/epsilon/",
  extra_configs = {"fs.azure.account.key.dsba6190storageepsilon.blob.core.windows.net": "HB87PDIxoCmvJYrC5JqMzEic3ySYwEll03NoGfRtJiviX6oQmS1Khnhz4wjIP30r41vIGIn5Cqo9+AStVPZD5Q=="})

In [None]:
#dbutils.fs.ls("/mnt/")
display(dbutils.fs.mounts())

mountPoint,source,encryptionType
/mnt/Delta/,wasbs://datalake@dsba6190storagedelta.blob.core.windows.net/,
/databricks-datasets,databricks-datasets,
/mnt/epsilon/,wasbs://datalake@dsba6190storageepsilon.blob.core.windows.net/,
/mnt/delta-sai/,wasbs://datalake@dsba6190storagedelta.blob.core.windows.net/,
/mnt/gamma/,wasbs://datawarehouse@dsba6190storagegamma.blob.core.windows.net/,
/mnt/zeta/,wasbs://datalake@dsba6190storageks.blob.core.windows.net/,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/databricks-results,databricks-results,
/mnt//,wasbs://data@dsba6190storagebeta.blob.core.windows.net/,
/mnt/beta/,wasbs://data@dsba6190storagebeta.blob.core.windows.net/,


## Section 1b) Read in Your Data

### Description of the Data: 
The dataset is credited to Ronny Kohavi and Barry Becker and was drawn from the 1994 United States Census Bureau data and involves using personal details such as education level to predict whether an individual will earn more or less than $50,000 per year.
### Data Source: 
https://archive.ics.uci.edu/ml/datasets/adult

In [None]:
df = sqlContext.read.format('csv') \
                    .options(header='true', inferSchema='true', delimiter= ',') \
                    .load('/mnt/epsilon/adult.csv')


#df.describe()
df=df.withColumnRenamed("occupation6","occupation")
df=df.withColumnRenamed("occupation7","relationship")
df.show(5)

+---+---------+------+----------+-------------+--------------+------------------+--------------+------+---+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt| education|education-num|marital-status|        occupation|  relationship|  race|sex|capital-gain|capital-loss|hours-per-week|native-country|Salary|
+---+---------+------+----------+-------------+--------------+------------------+--------------+------+---+------------+------------+--------------+--------------+------+
| 39|     Govt| 77516| Bachelors|           13| Never-married|      Adm-clerical| Not-in-family| White|  0|        2174|           0|            40| United-States|     1|
| 50| employed| 83311| Bachelors|           13|       Married|   Exec-managerial|       Husband| White|  0|           0|           0|            13| United-States|     1|
| 38|  Private|215646|   HS-grad|            9|      Divorced| Handlers-cleaners| Not-in-family| White|  0|           0|           0|            

## Section 2) Shaping the Data for Machine Learning
Transform the data for use in machine learning. This includes One Hot Encoding, Vectorization, etc.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

#Define which columns are numerical versus categorical

label = "Salary"
categoricalColumns = ["workclass","education","marital-status","occupation","relationship","race","sex","native-country"]

numericalColumns = ["age","fnlwgt","education-num","capital-gain","capital-loss","hours-per-week"]

#categoricalColumnsclassVec = ["col1classVec",
#                              "col2classVec"]
categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]

#Set up stages
stages = []

#Index the categorical columns and perform One Hot Encoding
for categoricalColumn in categoricalColumns:
  print(categoricalColumn)
  ## Category Indexing with StringIndexer
  stringIndexer = StringIndexer(inputCol=categoricalColumn, outputCol = categoricalColumn+"Index").setHandleInvalid("skip")
  ## Use OneHotEncoder to convert categorical variables into binary SparseVectors
  encoder = OneHotEncoder(inputCol=categoricalColumn+"Index", outputCol=categoricalColumn+"classVec")
  ## Add stages
  stages += [stringIndexer, encoder]

## Convert label into label indices using the StringIndexer
label_stringIndexer = StringIndexer(inputCol = label, outputCol = "label").setHandleInvalid("skip")
stages += [label_stringIndexer]


##Assemble the data together as a vector
assemblerInputs = categoricalColumnsclassVec + numericalColumns
assembler = VectorAssembler(inputCols = assemblerInputs, outputCol = "features")

stages += [assembler]

#Scale features using Normalization
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol = "features",
                        outputCol = "scaledFeatures",
                        withStd = True,
                        withMean = True)
stages += [scaler]

prepPipeline = Pipeline().setStages(stages)
pipelineModel = prepPipeline.fit(df)
dataset = pipelineModel.transform(df)



workclass
education
marital-status
occupation
relationship
race
sex
native-country


## Section 3a) Create a Machine Learning Model
## Machine learning algorithms from Spark MLlib and train models using your data.
Logistic Regression & Random Forest
### Description of the Predictive Use Case: 
Predicting whether an individual will earn more or less than $50,000 per year

In [None]:
#Test train split on our dataset
train, test = dataset.randomSplit([0.70, 0.30], seed = 1111)

In [None]:
#Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
#initialize logistic regression object
lr = LogisticRegression(labelCol="label", featuresCol="features")
#Create a parameter grid for tuning the model
lrparamGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10, 20, 50])
             .build())
#Define how you want the model to be evaluated
lrevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName = "areaUnderROC")

#Define the type of cross-validation you want to perform
# Create 5-fold CrossValidator
lrcv = CrossValidator(estimator = lr,
                    estimatorParamMaps = lrparamGrid,
                    evaluator = lrevaluator,
                    numFolds = 5)
#Fit the model to the data
lrcvModel = lrcv.fit(train)
print(lrcvModel)
#Score the testing dataset using your fitted model for evaluation purposes
lrpredictions = lrcvModel.transform(test)
print('Accuracy:', lrevaluator.evaluate(lrpredictions))
print('AUC:', BinaryClassificationMetrics(lrpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(lrpredictions['label','prediction'].rdd).areaUnderPR)

CrossValidatorModel_3bbae54541a0
Accuracy: 0.9040797820470264
AUC: 0.8080818789687665
PR: 0.5114596617470211


In [None]:
#Naive Bayes Classifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

#Initialize Naive Bayes object
nb = NaiveBayes(labelCol="label", featuresCol="features")

#Create a parameter grid for tuning the model
nbparamGrid = (ParamGridBuilder()
               .addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
               .build())
#Define how you want the model to be evaluated
nbevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# Create 5-fold CrossValidator
nbcv = CrossValidator(estimator = nb,
                      estimatorParamMaps = nbparamGrid,
                      evaluator = nbevaluator,
                      numFolds = 5)
#fit the model to the data
nbcvModel = nbcv.fit(train)
print(nbcvModel)

nbpredictions = nbcvModel.transform(test)

print('Accuracy:', lrevaluator.evaluate(lrpredictions))
print('AUC:', BinaryClassificationMetrics(lrpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(lrpredictions['label','prediction'].rdd).areaUnderPR)

CrossValidatorModel_e167637407f3
Accuracy: 0.9040797820470264
AUC: 0.8080818789687665
PR: 0.5114596617470211


In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
#initialize Decision tree object
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

dtparamGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 5, 10])
             .addGrid(dt.maxBins, [10, 20])
             .build())

dtevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# Create 5-fold CrossValidator
dtcv = CrossValidator(estimator = dt,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = dtevaluator,
                      numFolds = 5)

dtcvModel = dtcv.fit(train)
print(dtcvModel)

dtpredictions = dtcvModel.transform(test)

print('Accuracy:', dtevaluator.evaluate(dtpredictions))
print('AUC:', BinaryClassificationMetrics(dtpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(dtpredictions['label','prediction'].rdd).areaUnderPR)

CrossValidatorModel_e77eee1dbeb5
Accuracy: 0.7459087142411932
AUC: 0.8075382755086205
PR: 0.5311362363870251


In [None]:
##Random Forest
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rfparamGrid = (ParamGridBuilder().addGrid(rf.maxDepth, [2, 5, 10]).addGrid(rf.maxBins, [5, 10, 20]).addGrid(rf.numTrees, [5, 20, 50]).build())
rfevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# Create 5-fold CrossValidator
rfcv = CrossValidator(estimator = rf,
                      estimatorParamMaps = rfparamGrid,
                      evaluator = rfevaluator,
                      numFolds = 5)
rfcvModel = rfcv.fit(train)
print(rfcvModel)
rfpredictions = rfcvModel.transform(test)
print('RMSE:', rfevaluator.evaluate(rfpredictions))

CrossValidatorModel_fafc39eb6435
RMSE: 0.9090756099767751


In [None]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

gb = GBTClassifier(labelCol="label", featuresCol="features")

gbparamGrid = (ParamGridBuilder()
             .addGrid(gb.maxDepth, [2, 5, 10])
             .addGrid(gb.maxBins, [10, 20, 40])
             .addGrid(gb.maxIter, [5, 10, 20])
             .build())

gbevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# Create 5-fold CrossValidator
gbcv = CrossValidator(estimator = gb,
                      estimatorParamMaps = gbparamGrid,
                      evaluator = gbevaluator,
                      numFolds = 5)

gbcvModel = gbcv.fit(train)
print(gbcvModel)

gbpredictions = gbcvModel.transform(test)

print('Accuracy:', gbevaluator.evaluate(gbpredictions))
print('AUC:', BinaryClassificationMetrics(gbpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(gbpredictions['label','prediction'].rdd).areaUnderPR)

CrossValidatorModel_c3696e2fa37e
Accuracy: 0.9156455363156804
AUC: 0.8302687102073987
PR: 0.5246391724649102


## Section 3b) Evaluate the Model(s)

1. Evaluate the models and pick which of the algorithms worked the best and why.

2. Report the best model and its parameters below.

In [None]:
for model in ["lrpredictions", "dtpredictions", "rfpredictions", "nbpredictions", "gbpredictions"]:
    df = globals()[model]
    
    tp = df[(df.label == 1) & (df.prediction == 1)].count()
    tn = df[(df.label == 0) & (df.prediction == 0)].count()
    fp = df[(df.label == 0) & (df.prediction == 1)].count()
    fn = df[(df.label == 1) & (df.prediction == 0)].count()
    a = ((tp + tn)/df.count())
    
    if(tp + fn == 0.0):
        r = 0.0
        p = float(tp) / (tp + fp)
    elif(tp + fp == 0.0):
        r = float(tp) / (tp + fn)
        p = 0.0
    else:
        r = float(tp) / (tp + fn)
        p = float(tp) / (tp + fp)
    
    if(p + r == 0):
        f1 = 0
    else:
        f1 = 2 * ((p * r)/(p + r))
    
    print("Model:", model)
    print("True Positives:", tp)
    print("True Negatives:", tn)
    print("False Positives:", fp)
    print("False Negatives:", fn)
    print("Total:", df.count())
    print("Accuracy:", a)
    print("Recall:", r)
    print("Precision: ", p)
    print("F1 score:", f1)
    print('AUC:', BinaryClassificationMetrics(df['label','prediction'].rdd).areaUnderROC)
print("\n")

Model: lrpredictions
True Positives: 1308
True Negatives: 7005
False Positives: 450
False Negatives: 1027
Total: 9790
Accuracy: 0.8491317671092952
Recall: 0.5601713062098501
Precision:  0.7440273037542662
F1 score: 0.6391399951136086
AUC: 0.8080818789687665
Model: dtpredictions
True Positives: 1361
True Negatives: 6971
False Positives: 484
False Negatives: 974
Total: 9790
Accuracy: 0.8510725229826354
Recall: 0.5828693790149893
Precision:  0.737669376693767
F1 score: 0.6511961722488038
AUC: 0.8075382755086205
Model: rfpredictions
True Positives: 1248
True Negatives: 7104
False Positives: 351
False Negatives: 1087
Total: 9790
Accuracy: 0.8531154239019407
Recall: 0.5344753747323341
Precision:  0.7804878048780488
F1 score: 0.6344687341128622
AUC: 0.8238905878254242
Model: nbpredictions
True Positives: 558
True Negatives: 7136
False Positives: 319
False Negatives: 1777
Total: 9790
Accuracy: 0.7859039836567926
Recall: 0.23897216274089936
Precision:  0.636259977194983
F1 score: 0.347447073474

## Section 4) Save Your Transformation Pipeline and Model(s)

In [None]:
#Save the pipeline
pipelineModel.save("/mnt/epsilon/rosh/extra/pipeline")
display(dbutils.fs.ls("/mnt/epsilon/rosh/extra/pipeline"))

path,name,size,modificationTime
dbfs:/mnt/epsilon/rosh/extra/pipeline/metadata/,metadata/,0,0
dbfs:/mnt/epsilon/rosh/extra/pipeline/stages/,stages/,0,0


In [None]:
#save the models
lrcvModel.save("/mnt/epsilon/rosh/extra/models/LogisticRegression")
nbcvModel.save("/mnt/epsilon/rosh/extra/models/NaiveBayes")
dtcvModel.save("/mnt/epsilon/rosh/extra/models/DecisionTree")
rfcvModel.save("/mnt/epsilon/rosh/extra/models/RandomForest")
gbcvModel.save("/mnt/epsilon/rosh/extra/models/GBTree")

display(dbutils.fs.ls("/mnt/epsilon/rosh/extra/models"))

path,name,size,modificationTime
dbfs:/mnt/epsilon/rosh/extra/models/DecisionTree/,DecisionTree/,0,0
dbfs:/mnt/epsilon/rosh/extra/models/GBTree/,GBTree/,0,0
dbfs:/mnt/epsilon/rosh/extra/models/LogisticRegression/,LogisticRegression/,0,0
dbfs:/mnt/epsilon/rosh/extra/models/NaiveBayes/,NaiveBayes/,0,0
dbfs:/mnt/epsilon/rosh/extra/models/RandomForest/,RandomForest/,0,0
