In [3]:
datafile = "/user/root/AdultCensusIncome.csv"

#Read the data to a spark data frame.
data_all = spark.read.format('csv').options(header='true', inferSchema='true', ignoreLeadingWhiteSpace='true', ignoreTrailingWhiteSpace='true').load(datafile)
print("Number of rows: {},  Number of coulumns : {}".format(data_all.count(), len(data_all.columns)))
data_all.printSchema() 

#Replace "-" with "_" in column names
columns_new = [col.replace("-", "_") for col in data_all.columns]
data_all = data_all.toDF(*columns_new)
data_all.printSchema()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1583908949111_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Number of rows: 32561,  Number of coulumns : 15
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: stri

In [4]:
#Basic data exploration

##1. Sub set the data and print some important columns
print("Select few columns to see the data")
data_all.select(['income','age','hours_per_week', 'education']).show(10)

## Find the number of distict values
print("Number of distinct values for income")
ds_sub = data_all.select('income').distinct()
ds_sub.show()

##Add a numberic column(income_code) derived from income column
print("Added numeric column(income_code) derived from income column")
from pyspark.sql.functions import expr

df_new = data_all.withColumn("income_code", expr("case \
                                            when income like '%<=50K%' then 0 \
                                            when income like '%>50K%' then 1 \
                                            else 2 end "))

df_new.select(['income', 'age', 'hours_per_week', 'education', 'income_code']).show(10)

##Summary  statistical operations on dataframe
print("Print a statistical summary of a few columns")
df_new.select(['income','age','hours_per_week', 'education','income_code']).describe().show()

print("Calculate Co variance between a few columns to understand features to use")
mycov = df_new.stat.cov('income_code','hours_per_week')
print("Covariance between income and hours_per_week is", round(mycov,1))

mycov = df_new.stat.cov('income_code','age')
print("Covariance between income and age is", round(mycov,1))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Select few columns to see the data
+------+---+--------------+---------+
|income|age|hours_per_week|education|
+------+---+--------------+---------+
| <=50K| 39|            40|Bachelors|
| <=50K| 50|            13|Bachelors|
| <=50K| 38|            40|  HS-grad|
| <=50K| 53|            40|     11th|
| <=50K| 28|            40|Bachelors|
| <=50K| 37|            40|  Masters|
| <=50K| 49|            16|      9th|
|  >50K| 52|            45|  HS-grad|
|  >50K| 31|            50|  Masters|
|  >50K| 42|            40|Bachelors|
+------+---+--------------+---------+
only showing top 10 rows

Number of distinct values for income
+------+
|income|
+------+
| <=50K|
|  >50K|
+------+

Added numeric column(income_code) derived from income column
+------+---+--------------+---------+-----------+
|income|age|hours_per_week|education|income_code|
+------+---+--------------+---------+-----------+
| <=50K| 39|            40|Bachelors|          0|
| <=50K| 50|            13|Bachelors|          0|
| <=

In [5]:
# Choose feature columns and the label column.
label = "income"
xvars = ["age", "hours_per_week", 'education'] #numeric and string

print("label = {}".format(label))
print("features = {}".format(xvars))

#Check label counts to check data bias
print("Count of rows that are <=50K", data_all[data_all.income=="<=50K"].count())
print("Count of rows that are >50K", data_all[data_all.income==">50K"].count())


select_cols = xvars
select_cols.append(label)
data = data_all.select(select_cols)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

label = income
features = ['age', 'hours_per_week', 'education']
Count of rows that are <=50K 24720
Count of rows that are >50K 7841

In [6]:
train, test = data.randomSplit([0.75, 0.25], seed=123)

print("train ({}, {})".format(train.count(), len(train.columns)))
print("test ({}, {})".format(test.count(), len(test.columns)))

train_data_path = "/spark_ml/AdultCensusIncomeTrain"
test_data_path = "/spark_ml/AdultCensusIncomeTest"

train.write.mode('overwrite').orc(train_data_path)
test.write.mode('overwrite').orc(test_data_path)
print("train and test datasets saved to {} and {}".format(train_data_path, test_data_path))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

train (24469, 4)
test (8092, 4)
train and test datasets saved to /spark_ml/AdultCensusIncomeTrain and /spark_ml/AdultCensusIncomeTest

In [7]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

reg = 0.1
print("Using LogisticRegression model with Regularization Rate of {}.".format(reg))

# create a new Logistic Regression model.
lr = LogisticRegression(regParam=reg)

dtypes = dict(train.dtypes)
dtypes.pop(label)

si_xvars = []
ohe_xvars = []
featureCols = []
for idx,key in enumerate(dtypes):
    if dtypes[key] == "string":
        featureCol = "-".join([key, "encoded"])
        featureCols.append(featureCol)
        
        tmpCol = "-".join([key, "tmp"])
        si_xvars.append(StringIndexer(inputCol=key, outputCol=tmpCol, handleInvalid="skip")) #, handleInvalid="keep"
        ohe_xvars.append(OneHotEncoderEstimator(inputCols=[tmpCol], outputCols=[featureCol]))
    else:
        featureCols.append(key)

# string-index the label column into a column named "label"
si_label = StringIndexer(inputCol=label, outputCol='label')

# assemble the encoded feature columns in to a column named "features"
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")


stages = []
stages.extend(si_xvars)
stages.extend(ohe_xvars)
stages.append(si_label)
stages.append(assembler)
stages.append(lr)
pipe = Pipeline(stages=stages)
print("Pipeline Created")

model = pipe.fit(train)
print("Model Trained")
print("Model is ", model)
print("Model Stages", model.stages)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Using LogisticRegression model with Regularization Rate of 0.1.
Pipeline Created
Model Trained
Model is  PipelineModel_697f2c47874d
Model Stages [StringIndexer_eb4260535b16, OneHotEncoderEstimator_9ef3e47fe66c, StringIndexer_6c9c932623c1, VectorAssembler_d4a6d1c4a6d3, LogisticRegressionModel: uid = LogisticRegression_fed6a208d061, numClasses = 2, numFeatures = 17]

In [8]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# make prediction
pred = model.transform(test)

# evaluate. note only 2 metrics are supported out of the box by Spark ML.
bce = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
au_roc = bce.setMetricName('areaUnderROC').evaluate(pred)
au_prc = bce.setMetricName('areaUnderPR').evaluate(pred)

print("Area under ROC: {}".format(au_roc))
print("Area Under PR: {}".format(au_prc))

pred[pred.prediction==1.0][pred.income,pred.label,pred.prediction].show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Area under ROC: 0.7964496884726682
Area Under PR: 0.5358180243123482
+------+-----+----------+
|income|label|prediction|
+------+-----+----------+
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
| <=50K|  0.0|       1.0|
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
| <=50K|  0.0|       1.0|
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
| <=50K|  0.0|       1.0|
| <=50K|  0.0|       1.0|
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
+------+-----+----------+
only showing top 20 rows

In [9]:

model_name = "AdultCensus.mml"
model_fs = "/spark_ml/" + model_name

model.write().overwrite().save(model_fs)
print("saved model to {}".format(model_fs))

# load the model file and check its same as the in-memory model
model2 = PipelineModel.load(model_fs)
assert str(model2) == str(model)
print("Successfully loaded from {}".format(model_fs))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

saved model to /spark_ml/AdultCensus.mml
Successfully loaded from /spark_ml/AdultCensus.mml

In [10]:
import os
from mleap.pyspark.spark_support import SimpleSparkSerializer
# serialize the model to a zip file in JSON format
model_name_export = "adult_census_pipeline.zip"
model_name_path = os.getcwd()
model_file = os.path.join(model_name_path, model_name_export)
# remove an old model file, if needed.
try:
    os.remove(model_file)
except OSError:
    pass
model_file_path = "jar:file:{}".format(model_file)
model.serializeToBundle(model_file_path, model.transform(train))
print("persist the mleap bundle from local to hdfs")
from subprocess import Popen, PIPE
proc = Popen(["hadoop", "fs", "-put", "-f", model_file, os.path.join("/spark_ml", model_name_export)], stdout=PIPE, stderr=PIPE)
s_output, s_err = proc.communicate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
[Errno 2] No such file or directory: 'hadoop'
Traceback (most recent call last):
  File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
    restore_signals, start_new_session)
  File "/usr/lib/python3.5/subprocess.py", line 1551, in _execute_child
    raise child_exception_type(errno_num, err_msg)
FileNotFoundError: [Errno 2] No such file or directory: 'hadoop'

