In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Attach to the active SparkSession (or start a new one) and keep a handle on
# its SparkContext, which the RDD-based loading step below uses.
ss = (
    SparkSession
    .builder
    .getOrCreate()
)
sc = ss.sparkContext

In [2]:
def toDoubleSafe(v):
    """Convert one raw field to ``float`` when it is numeric.

    Numeric fields (e.g. "39", "2174") come back as floats so they fit the
    DoubleType columns of the schema; anything else (e.g. "State-gov",
    "<=50K") comes back as a string.

    Args:
        v: a single field value, normally a ``str`` produced by splitting a line.

    Returns:
        ``float(v)`` if ``v`` parses as a number, ``None`` if ``v`` is ``None``,
        otherwise ``str(v)``.
    """
    if v is None:
        # Keep missing values as nulls (all schema fields are nullable)
        # instead of letting float(None) raise TypeError.
        return None
    try:
        return float(v)
    except (TypeError, ValueError):
        # Not numeric, or a type float() rejects: keep it as a string.
        return str(v)

## Create an RDD

In [3]:
# Load the raw census file into 4 partitions, split each line on the ", "
# field separator and convert numeric fields to float (see toDoubleSafe).
# Blank lines (such as a trailing newline at the end of the file) are dropped
# first: they would otherwise become one-field rows that do not match the
# 14-column schema used to build the DataFrame.
census_raw = (
    sc.textFile("../Data/adult.raw", 4)
    .filter(lambda line: bool(line.strip()))
    .map(lambda line: [toDoubleSafe(field) for field in line.split(", ")])
)

## Convert the RDD to a DataFrame.


In [4]:
from pyspark.sql.types import *
# Schema of the raw rows, one (column name, Spark type) pair per field in file
# order. Numeric fields are doubles (they are parsed with toDoubleSafe); every
# field is nullable.
adultschema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in [
        ("age", DoubleType()),
        ("workclass", StringType()),
        ("fnlwgt", DoubleType()),
        ("education", StringType()),
        ("marital_status", StringType()),
        ("occupation", StringType()),
        ("relationship", StringType()),
        ("race", StringType()),
        ("sex", StringType()),
        ("capital_gain", DoubleType()),
        ("capital_loss", DoubleType()),
        ("hours_per_week", DoubleType()),
        ("native_country", StringType()),
        ("income", StringType()),
    ]
])


In [5]:
# Apply the explicit schema to the parsed RDD rows.
dfraw = ss.createDataFrame(data=census_raw, schema=adultschema)

In [6]:
dfraw.show(n=10)

+----+----------------+--------+---------+--------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| age|       workclass|  fnlwgt|education|      marital_status|       occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+----+----------------+--------+---------+--------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|39.0|       State-gov| 77516.0|Bachelors|       Never-married|     Adm-clerical|Not-in-family|White|  Male|      2174.0|         0.0|          40.0| United-States| <=50K|
|50.0|Self-emp-not-inc| 83311.0|Bachelors|  Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|         0.0|         0.0|          13.0| United-States| <=50K|
|38.0|         Private|215646.0|  HS-grad|            Divorced|Handlers-cleaners|Not-in-family|White|  Male|         0.0|         0.0|      

In [7]:
# Frequency of each value (most common first) for the columns that use the
# "?" placeholder; the top value of each is used for imputation below.
for col_name in ("workclass", "occupation", "native_country"):
    (dfraw.groupBy(dfraw[col_name])
          .count()
          .orderBy("count", ascending=False)
          .show())

+----------------+-----+
|       workclass|count|
+----------------+-----+
|         Private|33906|
|Self-emp-not-inc| 3862|
|       Local-gov| 3136|
|               ?| 2799|
|       State-gov| 1981|
|    Self-emp-inc| 1695|
|     Federal-gov| 1432|
|     Without-pay|   21|
|    Never-worked|   10|
+----------------+-----+

+-----------------+-----+
|       occupation|count|
+-----------------+-----+
|   Prof-specialty| 6172|
|     Craft-repair| 6112|
|  Exec-managerial| 6086|
|     Adm-clerical| 5611|
|            Sales| 5504|
|    Other-service| 4923|
|Machine-op-inspct| 3022|
|                ?| 2809|
| Transport-moving| 2355|
|Handlers-cleaners| 2072|
|  Farming-fishing| 1490|
|     Tech-support| 1446|
|  Protective-serv|  983|
|  Priv-house-serv|  242|
|     Armed-Forces|   15|
+-----------------+-----+

+------------------+-----+
|    native_country|count|
+------------------+-----+
|     United-States|43832|
|            Mexico|  951|
|                 ?|  857|
|       Philippin

## Clean the data. 

### Missing data imputation.


In [8]:
# Missing-data imputation: replace the "?" placeholder in each column with
# that column's most common value, as shown by the frequency tables above.
dfrawrp = dfraw.replace(to_replace=["?"], value=["Private"], subset=["workclass"])
dfrawrpl = dfrawrp.replace(to_replace=["?"], value=["Prof-specialty"], subset=["occupation"])
dfrawnona = dfrawrpl.replace(to_replace=["?"], value=["United-States"], subset=["native_country"])

In [9]:
dfrawnona.show(n=20)

+----+----------------+--------+------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
| age|       workclass|  fnlwgt|   education|      marital_status|       occupation| relationship|              race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+----+----------------+--------+------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|39.0|       State-gov| 77516.0|   Bachelors|       Never-married|     Adm-clerical|Not-in-family|             White|  Male|      2174.0|         0.0|          40.0| United-States| <=50K|
|50.0|Self-emp-not-inc| 83311.0|   Bachelors|  Married-civ-spouse|  Exec-managerial|      Husband|             White|  Male|         0.0|         0.0|          13.0| United-States| <=50K|
|38.0|         Private|215646.0|     HS-grad|            Div

### Convert categorical strings to numeric values

In [10]:
#converting strings to numeric values
from pyspark.ml.feature import StringIndexer

def indexStringColumns(df, cols):
    """Replace each string column in ``cols`` by its StringIndexer label index.

    A single multi-column StringIndexer is fitted on ``df``, so all columns are
    indexed in one fit instead of one fit (and data scan) per column. Each
    indexed column keeps its original name but moves to the end of the schema,
    in the order given by ``cols`` -- the same layout the one-column-at-a-time
    version produced.

    Args:
        df: input DataFrame.
        cols: names of the string columns to index.

    Returns:
        A new DataFrame in which every listed column holds double-valued label
        indices (StringIndexer's default ordering: most frequent value -> 0.0).
    """
    cols = list(cols)
    if not cols:
        # Nothing to index.
        return df
    # Temporary output names, renamed back to the original names below.
    tmp_cols = [c + "-num" for c in cols]
    model = StringIndexer(inputCols=cols, outputCols=tmp_cols).fit(df)
    # Drop the original string columns, then give the indices their names.
    newdf = model.transform(df).drop(*cols)
    for c, tmp in zip(cols, tmp_cols):
        newdf = newdf.withColumnRenamed(tmp, c)
    return newdf

# Index every categorical string column, including the label ("income").
dfnumeric = indexStringColumns(
    df=dfrawnona,
    cols=["workclass", "education", "marital_status", "occupation",
          "relationship", "race", "sex", "native_country", "income"],
)

In [11]:
dfnumeric.show(n=20)

+----+--------+------------+------------+--------------+---------+---------+--------------+----------+------------+----+---+--------------+------+
| age|  fnlwgt|capital_gain|capital_loss|hours_per_week|workclass|education|marital_status|occupation|relationship|race|sex|native_country|income|
+----+--------+------------+------------+--------------+---------+---------+--------------+----------+------------+----+---+--------------+------+
|39.0| 77516.0|      2174.0|         0.0|          40.0|      3.0|      2.0|           1.0|       3.0|         1.0| 0.0|0.0|           0.0|   0.0|
|50.0| 83311.0|         0.0|         0.0|          13.0|      1.0|      2.0|           0.0|       2.0|         0.0| 0.0|0.0|           0.0|   0.0|
|38.0|215646.0|         0.0|         0.0|          40.0|      0.0|      0.0|           2.0|       8.0|         1.0| 0.0|0.0|           0.0|   0.0|
|53.0|234721.0|         0.0|         0.0|          40.0|      0.0|      5.0|           0.0|       8.0|         0.0| 1.

In [12]:
from pyspark.ml.feature import OneHotEncoder

def oneHotEncodeColumns(df, cols):
    """Replace each index column in ``cols`` by its one-hot encoded vector.

    A single multi-column OneHotEncoder is fitted instead of one encoder per
    column. ``dropLast=False`` keeps a slot for every category, so each vector's
    length equals that column's number of categories. Each encoded column keeps
    its original name and moves to the end of the schema, in ``cols`` order.

    Args:
        df: input DataFrame whose ``cols`` hold numeric category indices
            (e.g. produced by StringIndexer).
        cols: names of the index columns to encode.

    Returns:
        A new DataFrame in which every listed column holds a one-hot vector.
    """
    cols = list(cols)
    if not cols:
        # Nothing to encode.
        return df
    # Temporary output names, renamed back to the original names below.
    tmp_cols = [c + "-onehot" for c in cols]
    ohe_model = OneHotEncoder(inputCols=cols, outputCols=tmp_cols, dropLast=False).fit(df)
    # Drop the index columns, then give the vectors their names.
    newdf = ohe_model.transform(df).drop(*cols)
    for c, tmp in zip(cols, tmp_cols):
        newdf = newdf.withColumnRenamed(tmp, c)
    return newdf

# One-hot encode the multi-category features; "sex" and the label "income"
# stay as plain indices.
dfhot = oneHotEncodeColumns(
    df=dfnumeric,
    cols=["workclass", "education", "marital_status", "occupation",
          "relationship", "race", "native_country"],
)

In [13]:
from pyspark.ml.feature import OneHotEncoder

# This cell previously redefined `oneHotEncodeColumns` and recomputed `dfhot`
# with the same logic as the previous cell, fitting every encoder a second
# time. The duplicate is removed; `dfhot` from the previous cell is reused.
# Cheap sanity check that all one-hot encoded columns are present.
assert {"workclass", "education", "marital_status", "occupation",
        "relationship", "race", "native_country"} <= set(dfhot.columns), \
    "one-hot encoded columns missing from dfhot"

In [14]:
dfhot.show(n=20)

+----+--------+------------+------------+--------------+---+------+-------------+---------------+--------------+--------------+-------------+-------------+---------------+
| age|  fnlwgt|capital_gain|capital_loss|hours_per_week|sex|income|    workclass|      education|marital_status|    occupation| relationship|         race| native_country|
+----+--------+------------+------------+--------------+---+------+-------------+---------------+--------------+--------------+-------------+-------------+---------------+
|39.0| 77516.0|      2174.0|         0.0|          40.0|0.0|   0.0|(8,[3],[1.0])| (16,[2],[1.0])| (7,[1],[1.0])|(14,[3],[1.0])|(6,[1],[1.0])|(5,[0],[1.0])| (41,[0],[1.0])|
|50.0| 83311.0|         0.0|         0.0|          13.0|0.0|   0.0|(8,[1],[1.0])| (16,[2],[1.0])| (7,[0],[1.0])|(14,[2],[1.0])|(6,[0],[1.0])|(5,[0],[1.0])| (41,[0],[1.0])|
|38.0|215646.0|         0.0|         0.0|          40.0|0.0|   0.0|(8,[0],[1.0])| (16,[0],[1.0])| (7,[2],[1.0])|(14,[8],[1.0])|(6,[1],[1.0])

### Create a feature vector

In [15]:
# Merging the data with Vector Assembler.
from pyspark.ml.feature import VectorAssembler
input_cols = [
    # numeric columns
    "age", "capital_gain", "capital_loss", "fnlwgt", "hours_per_week",
    # label-indexed column (not one-hot encoded)
    "sex",
    # one-hot encoded vector columns
    "workclass", "education", "marital_status", "occupation",
    "relationship", "native_country", "race",
]

# VectorAssembler concatenates the values of inputCols, in order, into a
# single vector column named by outputCol.
va = VectorAssembler(inputCols=input_cols, outputCol="features")

# lpoints: labeled points -- the assembled features plus the income index as "label".
lpoints = (
    va.transform(dfhot)
    .select("features", "income")
    .withColumnRenamed("income", "label")
)

In [16]:
lpoints.rdd.take(num=5)

[Row(features=SparseVector(103, {0: 39.0, 1: 2174.0, 3: 77516.0, 4: 40.0, 9: 1.0, 16: 1.0, 31: 1.0, 40: 1.0, 52: 1.0, 57: 1.0, 98: 1.0}), label=0.0),
 Row(features=SparseVector(103, {0: 50.0, 3: 83311.0, 4: 13.0, 7: 1.0, 16: 1.0, 30: 1.0, 39: 1.0, 51: 1.0, 57: 1.0, 98: 1.0}), label=0.0),
 Row(features=SparseVector(103, {0: 38.0, 3: 215646.0, 4: 40.0, 6: 1.0, 14: 1.0, 32: 1.0, 45: 1.0, 52: 1.0, 57: 1.0, 98: 1.0}), label=0.0),
 Row(features=SparseVector(103, {0: 53.0, 3: 234721.0, 4: 40.0, 6: 1.0, 19: 1.0, 30: 1.0, 45: 1.0, 51: 1.0, 57: 1.0, 99: 1.0}), label=0.0),
 Row(features=SparseVector(103, {0: 28.0, 3: 338409.0, 4: 40.0, 5: 1.0, 6: 1.0, 16: 1.0, 30: 1.0, 37: 1.0, 55: 1.0, 65: 1.0, 99: 1.0}), label=0.0)]

In [17]:
lpoints.show(n=20)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(103,[0,1,3,4,9,1...|  0.0|
|(103,[0,3,4,7,16,...|  0.0|
|(103,[0,3,4,6,14,...|  0.0|
|(103,[0,3,4,6,19,...|  0.0|
|(103,[0,3,4,5,6,1...|  0.0|
|(103,[0,3,4,5,6,1...|  0.0|
|(103,[0,3,4,5,6,2...|  0.0|
|(103,[0,3,4,7,14,...|  1.0|
|(103,[0,1,3,4,5,6...|  1.0|
|(103,[0,1,3,4,6,1...|  1.0|
|(103,[0,3,4,6,15,...|  1.0|
|(103,[0,3,4,9,16,...|  1.0|
|(103,[0,3,4,5,6,1...|  0.0|
|(103,[0,3,4,6,20,...|  0.0|
|(103,[0,3,4,6,18,...|  1.0|
|(103,[0,3,4,6,22,...|  0.0|
|(103,[0,3,4,7,14,...|  0.0|
|(103,[0,3,4,6,14,...|  0.0|
|(103,[0,3,4,6,19,...|  0.0|
|(103,[0,3,4,5,7,1...|  1.0|
+--------------------+-----+
only showing top 20 rows



## Divide the dataset into training and validation sets.

In [18]:
# Divide the dataset into training (80%) and validation (20%) sets.
# A fixed seed makes the split -- and therefore every metric below --
# reproducible; with seed=None randomSplit picks a new random seed each run.
SPLIT_SEED = 42
splits = lpoints.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# cache(): the training algorithm is iterative and both sets are reused many
# times (fitting, evaluation, cross-validation), so keep them in memory.
adulttrain = splits[0].cache()
adultvalid = splits[1].cache()

In [28]:
# Persist both splits as tables. mode("overwrite") lets the notebook be re-run
# from scratch: the default save mode raises an error if the table exists.
adulttrain.write.mode("overwrite").saveAsTable("adulttrain")
adultvalid.write.mode("overwrite").saveAsTable("adultvalid")

## Train the model.

In [19]:
#Train the model.
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=1000, regParam=0.01, fitIntercept=True)
lrmodel = lr.fit(dataset=adulttrain)
# Equivalent form, setting the parameters after construction:
#   lr = LogisticRegression()
#   lrmodel = lr.setParams(regParam=0.01, maxIter=1000, fitIntercept=True).fit(adulttrain)

## Interpret the model parameters.

In [27]:
# Interpret the model parameters: one coefficient per feature-vector slot,
# then the intercept on its own line.
print(lrmodel.coefficients, lrmodel.intercept, sep="\n")

[0.02025392867263919,0.00013954168247343215,0.0005659738079209435,5.737193814137114e-07,0.02730671911802001,-0.5203492177135571,0.03826827574618934,-0.3634222561658451,0.020539996467041792,-0.1697040236431011,0.2587445619753766,0.5927864448543625,-0.5926856419576879,-1.61308307377656,-0.3448663734024702,-0.0062085023961661965,0.7267749116438394,1.1282791209221805,0.12427425502407757,-0.9076950114238427,0.1568166222928785,-1.0536262171639208,-1.4910609913022377,1.5970514081586598,-1.2770927677384616,-0.5365436061461513,1.4988665034002586,-1.0318671012205929,-1.500138177257482,-1.4416361338290495,0.8202191934255989,-0.6646372168729721,-0.314834792325499,-0.3573516363953145,-0.2294097795864684,-0.18547115667094333,0.7075436568583559,0.22447421797713696,0.03246464853240606,0.6651810663787544,-0.03735282194273436,0.20308891049900787,-0.7374607366672291,-0.26029885292884514,-0.19713731782701172,-0.6114454298946684,-0.8181887973359528,0.4344474091688194,0.315264457258125,-0.9548365154178551,0

In [28]:
# Score the held-out validation set: transform() appends the rawPrediction,
# probability and prediction columns to every row.
validpredicts = lrmodel.transform(adultvalid)
validpredicts.show(n=20)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(103,[0,1,3,4,5,6...|  0.0|[0.69192272388484...|[0.66639450967657...|       0.0|
|(103,[0,1,3,4,5,6...|  1.0|[-0.4483369419097...|[0.38975624568947...|       1.0|
|(103,[0,1,3,4,5,6...|  1.0|[0.61528049856638...|[0.64914441213276...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[5.23623103753651...|[0.99470788449110...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[3.14908913762261...|[0.95887281627894...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[3.39871202101943...|[0.96766425852320...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[1.28234631725982...|[0.78284890683665...|       0.0|
|(103,[0,1,3,4,5,6...|  1.0|[1.44776893673236...|[0.80965483368118...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[2.60956419190394...|[0.93147458401316...|       0.0|
|(103,[0,1,3,4,5

## Output
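rawPrediction : a vector of two values — the raw margins (log-odds) for the sample not belonging and belonging to the category (income above 50K). For logistic regression the two values are negatives of each other.

probability : a vector of two values — the estimated probability that the sample is not in the category and the probability that it is; the two sum to 1.

prediction : the predicted class label (0.0 or 1.0) — 1.0 when the probability of belonging to the category exceeds the decision threshold (0.5 by default).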

In [29]:
# Preview the raw margins of the first rows only; collect() would pull the
# whole validation set onto the driver and flood the cell output.
validpredicts.select("rawPrediction").take(20)

[Row(rawPrediction=DenseVector([0.6919, -0.6919])),
 Row(rawPrediction=DenseVector([-0.4483, 0.4483])),
 Row(rawPrediction=DenseVector([0.6153, -0.6153])),
 Row(rawPrediction=DenseVector([5.2362, -5.2362])),
 Row(rawPrediction=DenseVector([3.1491, -3.1491])),
 Row(rawPrediction=DenseVector([3.3987, -3.3987])),
 Row(rawPrediction=DenseVector([1.2823, -1.2823])),
 Row(rawPrediction=DenseVector([1.4478, -1.4478])),
 Row(rawPrediction=DenseVector([2.6096, -2.6096])),
 Row(rawPrediction=DenseVector([2.418, -2.418])),
 Row(rawPrediction=DenseVector([2.9876, -2.9876])),
 Row(rawPrediction=DenseVector([0.0958, -0.0958])),
 Row(rawPrediction=DenseVector([3.4202, -3.4202])),
 Row(rawPrediction=DenseVector([2.3841, -2.3841])),
 Row(rawPrediction=DenseVector([2.4429, -2.4429])),
 Row(rawPrediction=DenseVector([-10.4415, 10.4415])),
 Row(rawPrediction=DenseVector([3.0533, -3.0533])),
 Row(rawPrediction=DenseVector([2.3074, -2.3074])),
 Row(rawPrediction=DenseVector([2.2384, -2.2384])),
 Row(rawPred

In [30]:
# Preview the class probabilities of the first rows only; collect() would pull
# the whole validation set onto the driver and flood the cell output.
validpredicts.select("probability").take(20)

[Row(probability=DenseVector([0.6664, 0.3336])),
 Row(probability=DenseVector([0.3898, 0.6102])),
 Row(probability=DenseVector([0.6491, 0.3509])),
 Row(probability=DenseVector([0.9947, 0.0053])),
 Row(probability=DenseVector([0.9589, 0.0411])),
 Row(probability=DenseVector([0.9677, 0.0323])),
 Row(probability=DenseVector([0.7828, 0.2172])),
 Row(probability=DenseVector([0.8097, 0.1903])),
 Row(probability=DenseVector([0.9315, 0.0685])),
 Row(probability=DenseVector([0.9182, 0.0818])),
 Row(probability=DenseVector([0.952, 0.048])),
 Row(probability=DenseVector([0.5239, 0.4761])),
 Row(probability=DenseVector([0.9683, 0.0317])),
 Row(probability=DenseVector([0.9156, 0.0844])),
 Row(probability=DenseVector([0.92, 0.08])),
 Row(probability=DenseVector([0.0, 1.0])),
 Row(probability=DenseVector([0.9549, 0.0451])),
 Row(probability=DenseVector([0.9095, 0.0905])),
 Row(probability=DenseVector([0.9036, 0.0964])),
 Row(probability=DenseVector([0.0, 1.0])),
 Row(probability=DenseVector([0.1114, 

## Evaluate the model.

In [31]:
#Evaluate the model. default metric : Area Under ROC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
bceval = BinaryClassificationEvaluator()
print(f"{bceval.getMetricName()}:{bceval.evaluate(validpredicts)}")

areaUnderROC:0.9067324977561491


In [32]:
# Evaluate the model with area under the precision-recall curve.
# NOTE: this changes the metric of `bceval` itself for any later use.
bceval.setMetricName("areaUnderPR")
print(f"{bceval.getMetricName()}:{bceval.evaluate(validpredicts)}")

areaUnderPR:0.7591734778102814


### k-fold cross-validation and results.

In [33]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
# 5-fold cross-validation over the regularization strength.
# The selection metric is spelled out (areaUnderPR -- the metric `bceval` has
# after the previous cell) so the result does not depend on earlier cells
# having mutated a shared evaluator. A fixed seed makes the fold assignment
# reproducible.
CV_SEED = 42
cv_eval = BinaryClassificationEvaluator(metricName="areaUnderPR")
# ParamGridBuilder: every combination of the listed parameter values.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.maxIter, [1000])
    .addGrid(lr.regParam, [0.0001, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5])
    .build()
)
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=cv_eval,
    numFolds=5,
    seed=CV_SEED,
)
cvmodel = cv.fit(adulttrain)

In [34]:
# Parameters of the model selected by cross-validation.
best_lr = cvmodel.bestModel
for value in (best_lr.coefficients, best_lr.intercept,
              best_lr.getMaxIter(), best_lr.getRegParam()):
    print(value)

[0.022343633936571895,0.0003110096244057545,0.0006614912482648875,6.800755606948551e-07,0.03075007614792728,-0.7081636076761005,-0.4129061190539692,-0.8967367680608547,-0.46665089807995336,-0.6537091654886582,-0.23912963151832162,0.1788246913066453,-1.1135967201406956,-5.2138168987016,-0.6035563900520047,-0.22266459896194166,0.5703403872837949,1.0014044416164964,-0.10447585524932765,-1.3606070272419342,-0.06046103433736345,-1.4806770567014376,-2.062709505481376,1.5473730254653042,-1.7821036155621057,-0.8562046906670087,1.4468606329499818,-1.438598610273767,-2.251304837171203,-2.611975549816896,1.1900546932437748,-1.3933188835424268,-1.0044117962394592,-1.0596345915332097,-0.8840628028177822,-0.875039362355603,1.1398359442842472,0.0006359614694178862,-0.1553272722159081,0.48577406736233425,-0.22370669380795566,0.006424234435236811,-1.0848278032957617,-0.47612089716403067,-0.3976500450583543,-0.8904097420848538,-1.1487820937366113,0.26875512072260627,0.16565266464557438,-2.03898431706271

In [35]:
# Default metric of BinaryClassificationEvaluator is areaUnderROC.
BinaryClassificationEvaluator(metricName="areaUnderROC").evaluate(cvmodel.bestModel.transform(adultvalid))

0.9097443864956878

In [36]:
BinaryClassificationEvaluator(metricName="areaUnderPR").evaluate(cvmodel.bestModel.transform(adultvalid))

0.7711175368506971

In [37]:
best_valid_predictions = cvmodel.bestModel.transform(adultvalid)
BinaryClassificationEvaluator(metricName="areaUnderROC").evaluate(best_valid_predictions)

0.9097493871983731

In [38]:
# Shut down the SparkSession (and its underlying SparkContext) now that the
# analysis is finished.
ss.stop()