In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
sc = SparkContext.getOrCreate()
ss = SparkSession.builder.getOrCreate()

In [2]:
def toDoubleSafe(v):
    try:
        return float(v)
    except ValueError:
        return str(v) #if it is not a float type return as a string.

## Create an RDD

In [3]:
#load and convert the data
census_raw = sc.textFile("../Data/adult.raw", 4).map(lambda x:  x.split(", "))
census_raw = census_raw.map(lambda row:  [toDoubleSafe(x) for x in row])

## Convert the RDD to DataFrame.


In [4]:
from pyspark.sql.types import *
adultschema = StructType([
    StructField("age",DoubleType(),True),
    StructField("capital_gain",DoubleType(),True),
    StructField("capital_loss",DoubleType(),True),
    StructField("education",StringType(),True),
    StructField("fnlwgt",DoubleType(),True),
    StructField("hours_per_week",DoubleType(),True),
    StructField("income",StringType(),True),
    StructField("marital_status",StringType(),True),
    StructField("native_country",StringType(),True),
    StructField("occupation",StringType(),True),
    StructField("race",StringType(),True),
    StructField("relationship",StringType(),True),
    StructField("sex",StringType(),True),
    StructField("workclass",StringType(),True),
])

# Create a dataframe.
from pyspark.sql import Row
columns = ["age", "workclass", "fnlwgt", "education", "marital_status",
           "occupation", "relationship", "race", "sex", "capital_gain", "capital_loss",
           "hours_per_week", "native_country", "income"]
dfraw = ss.createDataFrame(census_raw.map(lambda row: Row(**{x[0]: x[1] for x in zip(columns, row)})), \
                           adultschema)
dfraw.show(5)

+----+------------+------------+---------+--------+--------------+------+------------------+--------------+-----------------+-----+-------------+------+----------------+
| age|capital_gain|capital_loss|education|  fnlwgt|hours_per_week|income|    marital_status|native_country|       occupation| race| relationship|   sex|       workclass|
+----+------------+------------+---------+--------+--------------+------+------------------+--------------+-----------------+-----+-------------+------+----------------+
|39.0|      2174.0|         0.0|Bachelors| 77516.0|          40.0| <=50K|     Never-married| United-States|     Adm-clerical|White|Not-in-family|  Male|       State-gov|
|50.0|         0.0|         0.0|Bachelors| 83311.0|          13.0| <=50K|Married-civ-spouse| United-States|  Exec-managerial|White|      Husband|  Male|Self-emp-not-inc|
|38.0|         0.0|         0.0|  HS-grad|215646.0|          40.0| <=50K|          Divorced| United-States|Handlers-cleaners|White|Not-in-family|  Mal

In [5]:
# Original:
print(census_raw.take(1))

# Returns a list of tuples.
# zip() : returns a list of tuples, where the i-th tuple contains the i-th element from each of the argument sequences or iterables.
print(census_raw.map(lambda x :  zip(columns, x)).take(1))

# Transform a list into a list with keyword arguments.
print(census_raw.map(lambda x : {x[0]: x[1] for x in zip(columns, x)}).take(1))

# Transform into a row using variable with keywords.
# As this is keyworded, createDataFrame() will match the column name and apply the defined schema.
print(census_raw.map(lambda x : Row(**{x[0]: x[1] for x in zip(columns, x)})).take(1))

[[39.0, 'State-gov', 77516.0, 'Bachelors', 'Never-married', 'Adm-clerical', 'Not-in-family', 'White', 'Male', 2174.0, 0.0, 40.0, 'United-States', '<=50K']]
[<zip object at 0x121a4a410>]
[{'age': 39.0, 'workclass': 'State-gov', 'fnlwgt': 77516.0, 'education': 'Bachelors', 'marital_status': 'Never-married', 'occupation': 'Adm-clerical', 'relationship': 'Not-in-family', 'race': 'White', 'sex': 'Male', 'capital_gain': 2174.0, 'capital_loss': 0.0, 'hours_per_week': 40.0, 'native_country': 'United-States', 'income': '<=50K'}]
[Row(age=39.0, capital_gain=2174.0, capital_loss=0.0, education='Bachelors', fnlwgt=77516.0, hours_per_week=40.0, income='<=50K', marital_status='Never-married', native_country='United-States', occupation='Adm-clerical', race='White', relationship='Not-in-family', sex='Male', workclass='State-gov')]


In [6]:
#Check the most commonly used vals.
dfraw.groupBy(dfraw["workclass"]).count().orderBy("count",ascending=False).show()
dfraw.groupBy(dfraw["occupation"]).count().orderBy("count",ascending=False).show()
dfraw.groupBy(dfraw["native_country"]).count().orderBy("count",ascending=False).show()

+----------------+-----+
|       workclass|count|
+----------------+-----+
|         Private|33906|
|Self-emp-not-inc| 3862|
|       Local-gov| 3136|
|               ?| 2799|
|       State-gov| 1981|
|    Self-emp-inc| 1695|
|     Federal-gov| 1432|
|     Without-pay|   21|
|    Never-worked|   10|
+----------------+-----+

+-----------------+-----+
|       occupation|count|
+-----------------+-----+
|   Prof-specialty| 6172|
|     Craft-repair| 6112|
|  Exec-managerial| 6086|
|     Adm-clerical| 5611|
|            Sales| 5504|
|    Other-service| 4923|
|Machine-op-inspct| 3022|
|                ?| 2809|
| Transport-moving| 2355|
|Handlers-cleaners| 2072|
|  Farming-fishing| 1490|
|     Tech-support| 1446|
|  Protective-serv|  983|
|  Priv-house-serv|  242|
|     Armed-Forces|   15|
+-----------------+-----+

+------------------+-----+
|    native_country|count|
+------------------+-----+
|     United-States|43832|
|            Mexico|  951|
|                 ?|  857|
|       Philippin

## Clean the data. 

### Missing data imputation.


In [7]:
#Missing data imputation - Impute the most common row for "?".
dfrawrp = dfraw.replace(["?"], ["Private"], ["workclass"])
dfrawrpl = dfrawrp.replace(["?"], ["Prof-specialty"], ["occupation"])
dfrawnona = dfrawrpl.replace(["?"], ["United-States"], ["native_country"])

In [8]:
dfrawnona.show()

+----+------------+------------+------------+--------+--------------+------+--------------------+--------------+-----------------+------------------+-------------+------+----------------+
| age|capital_gain|capital_loss|   education|  fnlwgt|hours_per_week|income|      marital_status|native_country|       occupation|              race| relationship|   sex|       workclass|
+----+------------+------------+------------+--------+--------------+------+--------------------+--------------+-----------------+------------------+-------------+------+----------------+
|39.0|      2174.0|         0.0|   Bachelors| 77516.0|          40.0| <=50K|       Never-married| United-States|     Adm-clerical|             White|Not-in-family|  Male|       State-gov|
|50.0|         0.0|         0.0|   Bachelors| 83311.0|          13.0| <=50K|  Married-civ-spouse| United-States|  Exec-managerial|             White|      Husband|  Male|Self-emp-not-inc|
|38.0|         0.0|         0.0|     HS-grad|215646.0|      

### Convert strings to categorical values

In [9]:
#converting strings to numeric values
from pyspark.ml.feature import StringIndexer

def indexStringColumns(df, cols):
    #variable newdf will be updated several times
    newdf = df
    
    for c in cols:
        #For each given colum, fits StringIndexerModel.
        si = StringIndexer(inputCol=c, outputCol=c+"-num")
        sm = si.fit(newdf)
        #Creates a DataFame by putting the transformed values in the new colum with suffix "-num" 
        #and then drops the original columns.
        #and drop the "-num" suffix. 
        newdf = sm.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-num", c)
    return newdf

dfnumeric = indexStringColumns(dfrawnona, ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country", "income"])

In [10]:
dfnumeric.show()

+----+------------+------------+--------+--------------+---------+---------+--------------+----------+------------+----+---+--------------+------+
| age|capital_gain|capital_loss|  fnlwgt|hours_per_week|workclass|education|marital_status|occupation|relationship|race|sex|native_country|income|
+----+------------+------------+--------+--------------+---------+---------+--------------+----------+------------+----+---+--------------+------+
|39.0|      2174.0|         0.0| 77516.0|          40.0|      3.0|      2.0|           1.0|       3.0|         1.0| 0.0|0.0|           0.0|   0.0|
|50.0|         0.0|         0.0| 83311.0|          13.0|      1.0|      2.0|           0.0|       2.0|         0.0| 0.0|0.0|           0.0|   0.0|
|38.0|         0.0|         0.0|215646.0|          40.0|      0.0|      0.0|           2.0|       8.0|         1.0| 0.0|0.0|           0.0|   0.0|
|53.0|         0.0|         0.0|234721.0|          40.0|      0.0|      5.0|           0.0|       8.0|         0.0| 1.

In [11]:
from pyspark.ml.feature import OneHotEncoder
def oneHotEncodeColumns(df, cols):
    newdf = df
    for c in cols:
        #For each given colum, create OneHotEncoder. 
        #dropLast : Whether to drop the last category in the encoded vector (default: true)
        onehotenc = OneHotEncoder(inputCol=c, outputCol=c+"-onehot", dropLast=False)
        #Creates a DataFame by putting the transformed values in the new colum with suffix "-onehot" 
        #and then drops the original columns.
        #and drop the "-onehot" suffix. 
        newdf = onehotenc.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-onehot", c)
    return newdf

dfhot = oneHotEncodeColumns(dfnumeric, ["workclass", "education", "marital_status", "occupation", "relationship", "race", "native_country"])

In [12]:
dfhot.show()

+----+------------+------------+--------+--------------+---+------+-------------+---------------+--------------+--------------+-------------+-------------+---------------+
| age|capital_gain|capital_loss|  fnlwgt|hours_per_week|sex|income|    workclass|      education|marital_status|    occupation| relationship|         race| native_country|
+----+------------+------------+--------+--------------+---+------+-------------+---------------+--------------+--------------+-------------+-------------+---------------+
|39.0|      2174.0|         0.0| 77516.0|          40.0|0.0|   0.0|(8,[3],[1.0])| (16,[2],[1.0])| (7,[1],[1.0])|(14,[3],[1.0])|(6,[1],[1.0])|(5,[0],[1.0])| (41,[0],[1.0])|
|50.0|         0.0|         0.0| 83311.0|          13.0|0.0|   0.0|(8,[1],[1.0])| (16,[2],[1.0])| (7,[0],[1.0])|(14,[2],[1.0])|(6,[0],[1.0])|(5,[0],[1.0])| (41,[0],[1.0])|
|38.0|         0.0|         0.0|215646.0|          40.0|0.0|   0.0|(8,[0],[1.0])| (16,[0],[1.0])| (7,[2],[1.0])|(14,[8],[1.0])|(6,[1],[1.0])

### Create a feature vector

In [13]:
# Merging the data with Vector Assembler.
from pyspark.ml.feature import VectorAssembler
input_cols=["age","capital_gain","capital_loss","fnlwgt","hours_per_week","sex","workclass","education","marital_status","occupation","relationship","native_country","race"]

#VectorAssembler takes a number of collumn names(inputCols) and output column name (outputCol)
#and transforms a DataFrame to assemble the values in inputCols into one single vector with outputCol.
va = VectorAssembler(outputCol="features", inputCols=input_cols)
#lpoints - labeled data.
lpoints = va.transform(dfhot).select("features", "income").withColumnRenamed("income", "label")

In [14]:
lpoints.rdd.take(5)

[Row(features=SparseVector(103, {0: 39.0, 1: 2174.0, 3: 77516.0, 4: 40.0, 9: 1.0, 16: 1.0, 31: 1.0, 40: 1.0, 52: 1.0, 57: 1.0, 98: 1.0}), label=0.0),
 Row(features=SparseVector(103, {0: 50.0, 3: 83311.0, 4: 13.0, 7: 1.0, 16: 1.0, 30: 1.0, 39: 1.0, 51: 1.0, 57: 1.0, 98: 1.0}), label=0.0),
 Row(features=SparseVector(103, {0: 38.0, 3: 215646.0, 4: 40.0, 6: 1.0, 14: 1.0, 32: 1.0, 45: 1.0, 52: 1.0, 57: 1.0, 98: 1.0}), label=0.0),
 Row(features=SparseVector(103, {0: 53.0, 3: 234721.0, 4: 40.0, 6: 1.0, 19: 1.0, 30: 1.0, 45: 1.0, 51: 1.0, 57: 1.0, 99: 1.0}), label=0.0),
 Row(features=SparseVector(103, {0: 28.0, 3: 338409.0, 4: 40.0, 5: 1.0, 6: 1.0, 16: 1.0, 30: 1.0, 37: 1.0, 55: 1.0, 65: 1.0, 99: 1.0}), label=0.0)]

In [15]:
lpoints.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(103,[0,1,3,4,9,1...|  0.0|
|(103,[0,3,4,7,16,...|  0.0|
|(103,[0,3,4,6,14,...|  0.0|
|(103,[0,3,4,6,19,...|  0.0|
|(103,[0,3,4,5,6,1...|  0.0|
|(103,[0,3,4,5,6,1...|  0.0|
|(103,[0,3,4,5,6,2...|  0.0|
|(103,[0,3,4,7,14,...|  1.0|
|(103,[0,1,3,4,5,6...|  1.0|
|(103,[0,1,3,4,6,1...|  1.0|
|(103,[0,3,4,6,15,...|  1.0|
|(103,[0,3,4,9,16,...|  1.0|
|(103,[0,3,4,5,6,1...|  0.0|
|(103,[0,3,4,6,20,...|  0.0|
|(103,[0,3,4,6,18,...|  1.0|
|(103,[0,3,4,6,22,...|  0.0|
|(103,[0,3,4,7,14,...|  0.0|
|(103,[0,3,4,6,14,...|  0.0|
|(103,[0,3,4,6,19,...|  0.0|
|(103,[0,3,4,5,7,1...|  1.0|
+--------------------+-----+
only showing top 20 rows



## Divide the dataset into training and testing sets.

In [16]:
#Divide the dataset into training and testing sets.
splits = lpoints.randomSplit([0.8, 0.2])

#cache() : the algorithm is interative and training and data sets are going to be reused many times.
adulttrain = splits[0].cache()
adultvalid = splits[1].cache()

In [17]:
adulttrain.write.saveAsTable("adulttrain")
adultvalid.write.saveAsTable("adultvalid")

## Train the model.

In [18]:
#Train the model.
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(regParam=0.01, maxIter=1000, fitIntercept=True)
lrmodel = lr.fit(adulttrain)
#The above lines are same as..
#lr = LogisticRegression()
#lrmodel = lr.setParams(regParam=0.01, maxIter=1000, fitIntercept=True).fit(adulttrain)

## Interpret the model parameters.

In [19]:
#Interpret the model parameters
print(lrmodel.coefficients)
print(lrmodel.intercept)

[0.02018527662862479,0.00014267788604173205,0.0005512681747893359,5.51697040018831e-07,0.026724362791813904,-0.5319625312361737,0.02876629258403501,-0.38061299736152965,0.05456768119420952,-0.15246517592629183,0.28049078189096993,0.5622322239004728,-0.524830721960051,-1.3359465678348355,-0.34130330863613356,-0.004701469175193283,0.7651219397366995,1.0810252027160827,0.1696387773229618,-0.947919971842666,0.20446082350113265,-1.12618759629697,-1.4901165222293469,1.629269607379185,-1.1266486205478683,-0.6522324351446221,1.5972707343276396,-1.3272959707993568,-1.5067044467674686,-1.7670592686059454,0.8386927301458394,-0.6828800100827963,-0.2906199732505725,-0.37681924561378244,-0.29240191660699033,-0.23036839969460288,0.7616622656062908,0.21622565294068488,0.01771396049387079,0.6835759127697387,-0.0838933680168185,0.21667926434585005,-0.7349458019026479,-0.2976153009914244,-0.13370717954452432,-0.5682592181676618,-0.8127824116371712,0.47004305054148515,0.36029660347441517,-0.97185715505372

In [20]:
#Evaluate models using test dataset.
#First, transform the validation set.
validpredicts = lrmodel.transform(adultvalid)
validpredicts.show()

#rawPrediction : includes two values - log-odds that a sample doesn't and does belong to the category (making > 50,000).
#probability : the probability that the sample is not in the category.
#prediction : proability that the sample belongs to the category.
#validpredicts.select("rawPrediction").collect()
#validpredicts.select("probability").collect()


+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(103,[0,1,3,4,5,6...|  0.0|[0.65273620039858...|[0.65762679401211...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[0.75583497274098...|[0.68044877904877...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[3.69023178049778...|[0.97564191450204...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[5.22290469954717...|[0.99463726869528...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[3.41467495245978...|[0.96816002930221...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[4.08134172762075...|[0.98339556681037...|       0.0|
|(103,[0,1,3,4,5,6...|  1.0|[1.40144728386494...|[0.80241345021242...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[2.35489014698306...|[0.91332213839078...|       0.0|
|(103,[0,1,3,4,5,6...|  1.0|[3.10411230831667...|[0.95706205539186...|       0.0|
|(103,[0,1,3,4,5

## Evaluate the model.

In [21]:
#Evaluate the model. default metric : Area Under ROC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
bceval = BinaryClassificationEvaluator()
print (bceval.getMetricName() +":" + str(bceval.evaluate(validpredicts)))

areaUnderROC:0.9024136346918922


In [22]:
#Evaluate the model. metric : Area Under PR
bceval.setMetricName("areaUnderPR")
print (bceval.getMetricName() +":" + str(bceval.evaluate(validpredicts)))

areaUnderPR:0.7591751098858374


In [23]:
# n-fold validation and the results.
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
cv = CrossValidator().setEstimator(lr).setEvaluator(bceval).setNumFolds(5)
#ParamGridBuilder() – combinations of parameters and their values.
paramGrid = ParamGridBuilder().addGrid(lr.maxIter, [1000]).addGrid(lr.regParam, [0.0001, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5]).build()
#setEstimatorParamMaps() takes ParamGridBuilder().
cv.setEstimatorParamMaps(paramGrid)
cvmodel = cv.fit(adulttrain)

In [24]:
print(cvmodel.bestModel.coefficients)
print(cvmodel.bestModel.intercept)
print(cvmodel.bestModel._java_obj.getMaxIter())
print(cvmodel.bestModel._java_obj.getRegParam())

[0.022456569848210317,0.00030947667004765635,0.0006466088616701111,6.32798180288299e-07,0.030109477578081186,-0.718596958410882,-0.4130490772382551,-0.9087994927142261,-0.4143616378855607,-0.6380007251813382,-0.2150081224290168,0.1511067399810792,-1.0398320556929355,-4.79751327710646,-0.58811178840551,-0.20758049092743244,0.619720382470168,0.9656710824164961,-0.04531956700907306,-1.3995379911831927,0.016078975278237533,-1.571974722687792,-2.0206842923389075,1.5982005363172302,-1.5354454615882882,-1.0037651170654671,1.565355684816618,-1.8728922151435476,-2.3010886508202657,-5.9122772885480215,1.2974579667465478,-1.4607890494371756,-1.0159799832762715,-1.1261096256402234,-1.004267853036903,-0.9519211363891856,1.2639893326906986,-0.0037968571196130988,-0.17147586231410616,0.5205675226645546,-0.28698523320134167,0.02689715853557866,-1.0850181624859174,-0.5109842845569028,-0.32052956733852306,-0.835586020928271,-1.129396451028462,0.31500099458533304,0.22277611528504668,-2.108860467442049,-0

In [25]:
BinaryClassificationEvaluator().evaluate(cvmodel.bestModel.transform(adultvalid))

0.9044081052518386

In [26]:
BinaryClassificationEvaluator().setMetricName("areaUnderPR").evaluate(cvmodel.bestModel.transform(adultvalid))

0.7698303355382903

In [27]:
BinaryClassificationEvaluator().setMetricName("areaUnderROC").evaluate(cvmodel.bestModel.transform(adultvalid))

0.904408105251841