In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# Reuse the already-running SparkContext if there is one, otherwise start it.
sc = SparkContext.getOrCreate()
# SQLContext must be imported explicitly: neither import above provides it, so
# on a fresh kernel the original cell failed with NameError unless the
# environment had pre-loaded the name.
sqlContext = SQLContext(sc)

In [2]:
def toDoubleSafe(v):
    """Parse ``v`` as a float, falling back to its string form.

    Returns ``float(v)`` when the conversion succeeds; when ``float`` raises
    ``ValueError`` (non-numeric text), returns ``str(v)`` instead.
    """
    try:
        parsed = float(v)
    except ValueError:
        # Not numeric text: hand the value back as a plain string.
        return str(v)
    return parsed

In [3]:
# Load the raw file (4 partitions), split every line on ", " and convert each
# field with toDoubleSafe: numeric text becomes a float, the rest stays a string.
census_raw = (
    sc.textFile("../Data/adult.raw", 4)
      .map(lambda line: line.split(", "))
      .map(lambda fields: [toDoubleSafe(field) for field in fields])
)

In [4]:
from pyspark.sql.types import *
from pyspark.sql import Row

# Column names in the order the values appear in each parsed record.
columns = [
    "age", "workclass", "fnlwgt", "education", "marital_status",
    "occupation", "relationship", "race", "sex",
    "capital_gain", "capital_loss", "hours_per_week",
    "native_country", "income",
]

# Target schema: numeric measurements are doubles, everything else is a string.
# The fields are listed alphabetically, which is also the field order of the
# Row objects printed further down in this notebook.
adultschema = StructType([
    StructField("age", DoubleType(), True),
    StructField("capital_gain", DoubleType(), True),
    StructField("capital_loss", DoubleType(), True),
    StructField("education", StringType(), True),
    StructField("fnlwgt", DoubleType(), True),
    StructField("hours_per_week", DoubleType(), True),
    StructField("income", StringType(), True),
    StructField("marital_status", StringType(), True),
    StructField("native_country", StringType(), True),
    StructField("occupation", StringType(), True),
    StructField("race", StringType(), True),
    StructField("relationship", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("workclass", StringType(), True),
])

# Create a dataframe: each record becomes a keyword-built Row (column name ->
# value), and the schema above is applied to those rows.
dfraw = sqlContext.createDataFrame(
    census_raw.map(lambda values: Row(**dict(zip(columns, values)))),
    adultschema
)

In [5]:
# Step-by-step view of how one parsed record is turned into a Row.
# print(...) with a single argument prints the same thing on Python 2 as the
# old print statement, parses on Python 3, and matches the print(...) calls
# used by other cells of this notebook.

# Original: a plain list of parsed values.
print(census_raw.take(1))

# zip() pairs the i-th column name with the i-th value of the record.
# list() materialises the pairs so the result is a list of tuples even where
# zip() is lazy (Python 3).
print(census_raw.map(lambda values: list(zip(columns, values))).take(1))

# Turn the (name, value) pairs into a dict keyed by column name.
print(census_raw.map(lambda values: dict(zip(columns, values))).take(1))

# Expand the dict into keyword arguments to build a Row.
# As this is keyworded, createDataFrame() will match the column name and apply the defined schema.
print(census_raw.map(lambda values: Row(**dict(zip(columns, values)))).take(1))

[[39.0, 'State-gov', 77516.0, 'Bachelors', 'Never-married', 'Adm-clerical', 'Not-in-family', 'White', 'Male', 2174.0, 0.0, 40.0, 'United-States', '<=50K']]
[[('age', 39.0), ('workclass', 'State-gov'), ('fnlwgt', 77516.0), ('education', 'Bachelors'), ('marital_status', 'Never-married'), ('occupation', 'Adm-clerical'), ('relationship', 'Not-in-family'), ('race', 'White'), ('sex', 'Male'), ('capital_gain', 2174.0), ('capital_loss', 0.0), ('hours_per_week', 40.0), ('native_country', 'United-States'), ('income', '<=50K')]]
[{'hours_per_week': 40.0, 'workclass': 'State-gov', 'relationship': 'Not-in-family', 'age': 39.0, 'marital_status': 'Never-married', 'sex': 'Male', 'race': 'White', 'income': '<=50K', 'native_country': 'United-States', 'capital_loss': 0.0, 'education': 'Bachelors', 'fnlwgt': 77516.0, 'capital_gain': 2174.0, 'occupation': 'Adm-clerical'}]
[Row(age=39.0, capital_gain=2174.0, capital_loss=0.0, education='Bachelors', fnlwgt=77516.0, hours_per_week=40.0, income='<=50K', marita

In [6]:
# Check the most commonly used values of the columns that contain "?" entries.
# show() prints the table itself and returns None, so wrapping it in print
# only added a stray "None" line after every table.
for column_name in ["workclass", "occupation", "native_country"]:
    dfraw.groupBy(dfraw[column_name]).count().orderBy("count", ascending=False).show()

+----------------+-----+
|       workclass|count|
+----------------+-----+
|         Private|33906|
|Self-emp-not-inc| 3862|
|       Local-gov| 3136|
|               ?| 2799|
|       State-gov| 1981|
|    Self-emp-inc| 1695|
|     Federal-gov| 1432|
|     Without-pay|   21|
|    Never-worked|   10|
+----------------+-----+

None
+-----------------+-----+
|       occupation|count|
+-----------------+-----+
|   Prof-specialty| 6172|
|     Craft-repair| 6112|
|  Exec-managerial| 6086|
|     Adm-clerical| 5611|
|            Sales| 5504|
|    Other-service| 4923|
|Machine-op-inspct| 3022|
|                ?| 2809|
| Transport-moving| 2355|
|Handlers-cleaners| 2072|
|  Farming-fishing| 1490|
|     Tech-support| 1446|
|  Protective-serv|  983|
|  Priv-house-serv|  242|
|     Armed-Forces|   15|
+-----------------+-----+

None
+------------------+-----+
|    native_country|count|
+------------------+-----+
|     United-States|43832|
|            Mexico|  951|
|                 ?|  857|
|      

In [7]:
# Missing data imputation: every "?" is replaced, one column at a time, with
# the most frequent value of that column (taken from the counts shown above).
dfrawrp = dfraw.na.replace(
    to_replace=["?"], value=["Private"], subset=["workclass"])
dfrawrpl = dfrawrp.na.replace(
    to_replace=["?"], value=["Prof-specialty"], subset=["occupation"])
dfrawnona = dfrawrpl.na.replace(
    to_replace=["?"], value=["United-States"], subset=["native_country"])

In [8]:
# Preview the imputed DataFrame; show() prints the first rows as a text table.
dfrawnona.show()

+----+------------+------------+------------+--------+--------------+------+--------------------+--------------+-----------------+------------------+-------------+------+----------------+
| age|capital_gain|capital_loss|   education|  fnlwgt|hours_per_week|income|      marital_status|native_country|       occupation|              race| relationship|   sex|       workclass|
+----+------------+------------+------------+--------+--------------+------+--------------------+--------------+-----------------+------------------+-------------+------+----------------+
|39.0|      2174.0|         0.0|   Bachelors| 77516.0|          40.0| <=50K|       Never-married| United-States|     Adm-clerical|             White|Not-in-family|  Male|       State-gov|
|50.0|         0.0|         0.0|   Bachelors| 83311.0|          13.0| <=50K|  Married-civ-spouse| United-States|  Exec-managerial|             White|      Husband|  Male|Self-emp-not-inc|
|38.0|         0.0|         0.0|     HS-grad|215646.0|      

In [40]:
#converting strings to numeric values
from pyspark.ml.feature import StringIndexer

def indexStringColumns(df, cols):
    """Replace each column named in ``cols`` with its StringIndexer output.

    For every column a StringIndexer is fitted on the current DataFrame and
    its output is written to a temporary "<column>-num" column. The original
    column is then dropped and the temporary one renamed back to the original
    name, so the returned DataFrame keeps the same column names.

    :param df: input DataFrame.
    :param cols: names of the columns to index.
    :return: a new DataFrame with the listed columns replaced by their indices.
    """
    indexed = df
    for name in cols:
        temp_name = name + "-num"
        # Fit on the frame produced by the previous iteration.
        model = StringIndexer(inputCol=name, outputCol=temp_name).fit(indexed)
        indexed = (model.transform(indexed)
                        .drop(name)
                        .withColumnRenamed(temp_name, name))
    return indexed

# Index every string-typed column of the imputed data, including the label ("income").
dfnumeric = indexStringColumns(
    dfrawnona,
    [
        "workclass", "education", "marital_status", "occupation",
        "relationship", "race", "sex", "native_country", "income",
    ],
)

In [41]:
# Preview the indexed DataFrame: the former string columns now hold numeric indices.
dfnumeric.show()

+----+------------+------------+--------+--------------+---------+---------+--------------+----------+------------+----+---+--------------+------+
| age|capital_gain|capital_loss|  fnlwgt|hours_per_week|workclass|education|marital_status|occupation|relationship|race|sex|native_country|income|
+----+------------+------------+--------+--------------+---------+---------+--------------+----------+------------+----+---+--------------+------+
|39.0|      2174.0|         0.0| 77516.0|          40.0|      3.0|      2.0|           1.0|       3.0|         1.0| 0.0|0.0|           0.0|   0.0|
|50.0|         0.0|         0.0| 83311.0|          13.0|      1.0|      2.0|           0.0|       2.0|         0.0| 0.0|0.0|           0.0|   0.0|
|38.0|         0.0|         0.0|215646.0|          40.0|      0.0|      0.0|           2.0|       8.0|         1.0| 0.0|0.0|           0.0|   0.0|
|53.0|         0.0|         0.0|234721.0|          40.0|      0.0|      5.0|           0.0|       8.0|         0.0| 1.

In [58]:
from pyspark.ml.feature import OneHotEncoder
def oneHotEncodeColumns(df, cols):
    """Replace each column named in ``cols`` with its one-hot encoded vector.

    For every column a OneHotEncoder (``dropLast=False``, i.e. the last
    category keeps its own slot in the vector) writes a temporary
    "<column>-onehot" column. The original column is then dropped and the
    temporary one renamed back to the original name.

    :param df: input DataFrame whose listed columns hold category indices.
    :param cols: names of the columns to encode.
    :return: a new DataFrame with the listed columns replaced by vectors.
    """
    encoded = df
    for name in cols:
        temp_name = name + "-onehot"
        encoder = OneHotEncoder(inputCol=name, outputCol=temp_name, dropLast=False)
        encoded = (encoder.transform(encoded)
                          .drop(name)
                          .withColumnRenamed(temp_name, name))
    return encoded

# One-hot encode the indexed categorical columns; "sex" and "income" are not
# in the list and stay as plain numeric columns.
dfhot = oneHotEncodeColumns(
    dfnumeric,
    [
        "workclass", "education", "marital_status", "occupation",
        "relationship", "race", "native_country",
    ],
)

In [59]:
# Preview the one-hot encoded DataFrame; the encoded columns print as sparse vectors.
dfhot.show()

+----+------------+------------+--------+--------------+---+------+-------------+---------------+--------------+--------------+-------------+-------------+---------------+
| age|capital_gain|capital_loss|  fnlwgt|hours_per_week|sex|income|    workclass|      education|marital_status|    occupation| relationship|         race| native_country|
+----+------------+------------+--------+--------------+---+------+-------------+---------------+--------------+--------------+-------------+-------------+---------------+
|39.0|      2174.0|         0.0| 77516.0|          40.0|0.0|   0.0|(8,[3],[1.0])| (16,[2],[1.0])| (7,[1],[1.0])|(14,[3],[1.0])|(6,[1],[1.0])|(5,[0],[1.0])| (41,[0],[1.0])|
|50.0|         0.0|         0.0| 83311.0|          13.0|0.0|   0.0|(8,[1],[1.0])| (16,[2],[1.0])| (7,[0],[1.0])|(14,[2],[1.0])|(6,[0],[1.0])|(5,[0],[1.0])| (41,[0],[1.0])|
|38.0|         0.0|         0.0|215646.0|          40.0|0.0|   0.0|(8,[0],[1.0])| (16,[0],[1.0])| (7,[2],[1.0])|(14,[8],[1.0])|(6,[1],[1.0])

In [50]:
# Merging the data with VectorAssembler.
from pyspark.ml.feature import VectorAssembler

# Columns that go into the feature vector, in this exact order.
input_cols = [
    "age", "capital_gain", "capital_loss", "fnlwgt", "hours_per_week", "sex",
    "workclass", "education", "marital_status", "occupation", "relationship",
    "native_country", "race",
]

# VectorAssembler takes a list of column names (inputCols) and an output
# column name (outputCol), and transforms a DataFrame by assembling the values
# of inputCols into one single vector stored in outputCol.
va = VectorAssembler(inputCols=input_cols, outputCol="features")

# lpoints - labeled data: the assembled features plus "income" renamed to "label".
lpoints = (va.transform(dfhot)
             .select("features", "income")
             .withColumnRenamed("income", "label"))

In [51]:
# Preview the labeled points: one "features" vector and one "label" per row.
lpoints.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(103,[0,1,3,4,9,1...|  0.0|
|(103,[0,3,4,7,16,...|  0.0|
|(103,[0,3,4,6,14,...|  0.0|
|(103,[0,3,4,6,19,...|  0.0|
|(103,[0,3,4,5,6,1...|  0.0|
|(103,[0,3,4,5,6,1...|  0.0|
|(103,[0,3,4,5,6,2...|  0.0|
|(103,[0,3,4,7,14,...|  1.0|
|(103,[0,1,3,4,5,6...|  1.0|
|(103,[0,1,3,4,6,1...|  1.0|
|(103,[0,3,4,6,15,...|  1.0|
|(103,[0,3,4,9,16,...|  1.0|
|(103,[0,3,4,5,6,1...|  0.0|
|(103,[0,3,4,6,20,...|  0.0|
|(103,[0,3,4,6,18,...|  1.0|
|(103,[0,3,4,6,22,...|  0.0|
|(103,[0,3,4,7,14,...|  0.0|
|(103,[0,3,4,6,14,...|  0.0|
|(103,[0,3,4,6,19,...|  0.0|
|(103,[0,3,4,5,7,1...|  1.0|
+--------------------+-----+
only showing top 20 rows



In [60]:
# Divide the dataset into training (80%) and validation (20%) sets.
# A fixed seed makes the split - and therefore every metric computed below -
# reproducible across kernel restarts; without it each run drew a new split.
splits = lpoints.randomSplit([0.8, 0.2], seed=42)

# cache(): the algorithm is iterative, so the training and validation sets are
# going to be reused many times.
adulttrain = splits[0].cache()
adultvalid = splits[1].cache()

In [69]:
# Train the model.
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    maxIter=1000,
    regParam=0.01,
    fitIntercept=True,
)
lrmodel = lr.fit(adulttrain)
# Equivalent form, configuring the estimator after construction:
#lr = LogisticRegression()
#lrmodel = lr.setParams(regParam=0.01, maxIter=1000, fitIntercept=True).fit(adulttrain)

In [70]:
#Interpret the model parameters
# coefficients: the fitted weights, printed as one vector.
print(lrmodel.coefficients)
# intercept: the fitted bias term (the model was trained with fitIntercept=True).
print(lrmodel.intercept)

[0.020886655493,0.00013814208456,0.000536023519324,6.62226843118e-07,0.0270660441379,-0.487503141373,0.0161366388365,-0.383200902707,0.0907533856955,-0.122649521649,0.258675488878,0.579727153328,-0.724588401247,-1.42443086934,-0.327174695493,-0.0275767138075,0.752433780449,1.11409927583,0.112983115872,-1.03163157169,0.209985019504,-1.14798545423,-1.34006633173,1.68985368222,-1.25924296867,-0.666037687585,1.71649694015,-1.2636995093,-1.4126439157,-1.74917838824,0.829447909905,-0.671006009667,-0.309814152094,-0.325528322448,-0.29411582029,-0.230698769544,0.702149407656,0.188305041635,0.0412512600486,0.660236786877,-0.0307415709937,0.229514872575,-0.726889295886,-0.30092451067,-0.110126977672,-0.62334158471,-0.893698429616,0.426415766812,0.287298545489,-0.860518044844,0.38269314022,0.449933930274,-0.100124028741,-0.784646063433,-0.285406836653,1.27012603093,-0.529018656198,0.222965040344,-0.58954925667,0.31958228822,0.0517411473652,-0.431112372582,0.5186145274,-0.47000451594,0.00018149742

In [80]:
#Evaluate models using the validation dataset.
#First, transform the validation set; this adds the three columns described below.
validpredicts = lrmodel.transform(adultvalid)
validpredicts.show()

#rawPrediction : vector with two values - log-odds that a sample doesn't and does belong to the category (making > 50,000).
#probability : vector of class probabilities; the first entry is the probability that the sample is NOT in the category
#              (presumably followed by the probability that it is - the printed output is truncated, verify with the collect() below).
#prediction : the predicted class label (0.0 or 1.0), not a probability.
#validpredicts.select("rawPrediction").collect()
#validpredicts.select("probability").collect()


+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(103,[0,1,3,4,5,6...|  0.0|[0.66359415944655...|[0.66006730376248...|       0.0|
|(103,[0,1,3,4,5,6...|  1.0|[-0.1135414329086...|[0.47164509703669...|       1.0|
|(103,[0,1,3,4,5,6...|  0.0|[0.94540936331228...|[0.72019102745677...|       0.0|
|(103,[0,1,3,4,5,6...|  1.0|[0.64857384994214...|[0.65668901109215...|       0.0|
|(103,[0,1,3,4,5,6...|  1.0|[-0.9479794955166...|[0.27929134323132...|       1.0|
|(103,[0,1,3,4,5,6...|  0.0|[4.36950698924466...|[0.98750072994855...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[5.22655782204167...|[0.99465671918182...|       0.0|
|(103,[0,1,3,4,5,6...|  0.0|[3.00146514442795...|[0.95264027328105...|       0.0|
|(103,[0,1,3,4,5,6...|  1.0|[1.01444144459535...|[0.73388844702834...|       0.0|
|(103,[0,1,3,4,5

In [81]:
# Evaluate the model with the evaluator's default metric (printed below as areaUnderROC).
from pyspark.ml.evaluation import BinaryClassificationEvaluator

bceval = BinaryClassificationEvaluator()
print("%s:%s" % (bceval.getMetricName(), bceval.evaluate(validpredicts)))

areaUnderROC:0.905679568497


In [82]:
# Evaluate the model again, this time with the area under the precision-recall curve.
# NOTE: setMetricName changes bceval itself, so any later use of bceval keeps
# "areaUnderPR" as its metric.
bceval.setMetricName("areaUnderPR")
print("%s:%s" % (bceval.getMetricName(), bceval.evaluate(validpredicts)))

areaUnderPR:0.755839393918


In [90]:
# n-fold (here 5-fold) cross-validation over the regularisation strength.
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

# ParamGridBuilder() builds every combination of the listed parameter values:
# maxIter is fixed at 1000, regParam is searched over seven values.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.maxIter, [1000])
             .addGrid(lr.regParam, [0.0001, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5])
             .build())

# NOTE(review): bceval is the evaluator whose metric an earlier cell switched
# to "areaUnderPR"; when cells run in order, models are compared on that
# metric - confirm this is intended.
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=bceval,
    numFolds=5,
)
cvmodel = cv.fit(adulttrain)

In [91]:
# Inspect the model selected by cross-validation.
# print(...) with a single argument prints the same thing on Python 2 as the
# old print statement, parses on Python 3, and matches the print(...) calls
# used by other cells of this notebook.
print(cvmodel.bestModel.coefficients)
print(cvmodel.bestModel.intercept)
# The hyper-parameters of the winning model are read from the wrapped Java object.
print(cvmodel.bestModel._java_obj.getMaxIter())
print(cvmodel.bestModel._java_obj.getRegParam())

[0.0234338788762,0.000304796114907,0.00062874334433,7.77035489089e-07,0.0307924540079,-0.641099248989,-0.443318171871,-0.925069998667,-0.385154125631,-0.613372592103,-0.251827208678,0.158669624309,-1.26846300514,-4.89332618123,-0.581189343102,-0.24593770866,0.598810471799,0.987127931754,-0.111951081402,-1.52397636113,0.0128174398478,-1.61319972409,-1.82924200939,1.67144413992,-1.74256992422,-1.00488031153,1.69687112398,-1.760260614,-2.06164116827,-5.66628580183,1.2366685519,-1.43102072707,-1.02259537336,-1.02000653922,-1.01902169702,-0.970378937429,1.16944496017,-0.0479286537312,-0.140962876756,0.481171600081,-0.231439290576,0.0337140396313,-1.08113353137,-0.511945959385,-0.304846798908,-0.900248079746,-1.27130944447,0.255039899041,0.13218758793,-1.80910679783,0.276849741455,-0.389592730611,0.119805536582,-1.00326154074,-0.0370740551346,0.671326335818,-0.874230777888,-0.992978546689,-1.90048459587,-0.916423335565,-1.19798966162,-1.70286290138,-0.63243262278,-1.89392968124,-1.3855307651

In [92]:
# Score the cross-validated best model on the validation split with a fresh
# evaluator, i.e. its default metric (reported as areaUnderROC earlier in this notebook).
BinaryClassificationEvaluator().evaluate(cvmodel.bestModel.transform(adultvalid))

0.9088674800439503

In [93]:
# Same validation-set check for the best model, using the area under the
# precision-recall curve as the metric.
BinaryClassificationEvaluator().setMetricName("areaUnderPR").evaluate(cvmodel.bestModel.transform(adultvalid))

0.7691322262239652