# Spark ML
#### Dataset Introduction
This is a popular dataset for classification. Given a feature vector of 14 census attributes, the problem is to predict whether a person's income is greater than 50K.

#### Open Files (use traindata to train, testdata to test)

In [3]:
dbutils.fs.mount("s3a://"+"AKIAJH57T"+"SADMXPN"+"3NWA:cl7ON3wPVCf"+"a42eAzHjRD"+"v0iVJgsApuS"+"H3qwyMwF"+"@mlonspark", "/mnt/mlonspark")
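Embedding the access key directly in the notebook makes it easy to leak. The following is a minimal sketch, not part of the original notebook, of building the same mount source string from environment variables. The helper name `build_s3a_source` and the variable names `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` are assumptions. The secret is URL-encoded because characters such as `/` would otherwise break the URI.

```python
import os

try:
    from urllib.parse import quote  # Python 3
except ImportError:
    from urllib import quote  # Python 2


def build_s3a_source(access_key, secret_key, bucket):
    """Return an s3a:// URI with the secret key URL-encoded (hypothetical helper)."""
    return "s3a://{}:{}@{}".format(access_key, quote(secret_key, safe=""), bucket)


# Assumed environment variable names; the fallbacks are placeholders only.
access_key = os.environ.get("AWS_ACCESS_KEY_ID", "EXAMPLE_KEY")
secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "example/secret")
source = build_s3a_source(access_key, secret_key, "mlonspark")

# In the notebook this would then be used as:
# dbutils.fs.mount(source, "/mnt/mlonspark")
```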

In [4]:
# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import (HashingTF, IDF, IndexToString, StringIndexer,
                                Tokenizer, VectorAssembler, VectorIndexer)
from pyspark.ml.linalg import Vectors

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)


In [5]:
trainFileRDD = sc.textFile("/mnt/mlonspark/adult.traindata.gz")
testFileRDD = sc.textFile("/mnt/mlonspark/adult.testdata.gz")

print(trainFileRDD.take(5))


#### Description of Fields
Note: For all categorical data, the number corresponds to a category, e.g. 1 = "Private", 2 = "Self-emp-not-inc" for the workclass (2nd) column.

* 0-age: continuous.
* 1-workclass: Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked.
* 2-fnlwgt: continuous.
* 3-education: Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc, 9th, 7th-8th, 12th, Masters, 1st-4th, 10th, Doctorate, 5th-6th, Preschool.
* 4-education-num: continuous.
* 5-marital-status: Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent, Married-AF-spouse.
* 6-occupation: Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners, Machine-op-inspct, Adm-clerical, Farming-fishing, Transport-moving, Priv-house-serv, Protective-serv, Armed-Forces.
* 7-relationship: Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried.
* 8-race: White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black.
* 9-sex: Female, Male.
* 10-capital-gain: continuous.
* 11-capital-loss: continuous.
* 12-hours-per-week: continuous.
* 13-native-country: United-States, Cambodia, England, Puerto-Rico, Canada, Germany, Outlying-US(Guam-USVI-etc), India, Japan, Greece, South, China, Cuba, Iran, Honduras, Philippines, Italy, Poland, Jamaica, Vietnam, Mexico, Portugal, Ireland, France, Dominican-Republic, Laos, Ecuador, Taiwan, Haiti, Columbia, Hungary, Guatemala, Nicaragua, Scotland, Thailand, Yugoslavia, El-Salvador, Trinadad&Tobago, Peru, Hong, Holand-Netherlands.
* 14-income: >50K, <=50K

#### Create a dataframe with the following fields matching the dataset:
With quotes: "age", "workclass", "fnlwgt", "education", "education_num", "marital_status", "occupation", "relationship", "race", "sex", "capital_gain", "capital_loss", "hours_per_week", "native_country", "income"

Without Quotes: age ,  workclass ,  fnlwgt ,  education ,  education_num ,  marital_status ,  occupation ,  relationship ,  race ,  sex ,  capital_gain ,  capital_loss ,  hours_per_week ,  native_country ,  income 

(You'll find the above 2 lines useful when copy/pasting)

In [8]:
def strToint(x):
  # Convert numeric fields to int; leave categorical strings unchanged.
  try:
    return int(x)
  except ValueError:
    return x

# Training data: strip spaces, split on commas, keep only complete 15-field rows.
trainFileRDD1 = trainFileRDD.map(lambda x: x.replace(" ",""))
trainFileRDD2 = trainFileRDD1.map(lambda x: x.split(","))
trainFileRDD3 = trainFileRDD2.filter(lambda x : len(x) == 15)
trainData = trainFileRDD3.map(lambda x :  [strToint(col) for col in x])

# Test data: same steps, plus dropping the last character of each line
# (presumably a trailing "." after the income label in the test file).
testFileRDD11 = testFileRDD.map(lambda x: x.replace(" ",""))
testFileRDD1 = testFileRDD11.map(lambda x: x[:-1])
testFileRDD2 = testFileRDD1.map(lambda x: x.split(","))
testFileRDD3 = testFileRDD2.filter(lambda x : len(x) == 15)
testData = testFileRDD3.map(lambda x :  [strToint(col) for col in x])
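
The cleaning steps in the cell above can be tried on a single line without Spark. This is a rough sketch in plain Python. The helper names `str_to_int` and `parse_line` are hypothetical, and the sample lines are illustrative and assume the test file differs only by one trailing character per line.

```python
def str_to_int(value):
    """Return int(value) when value is numeric, otherwise the original string."""
    try:
        return int(value)
    except ValueError:
        return value


def parse_line(line, strip_trailing_char=False):
    """Parse one raw CSV line into 15 typed fields, or None if malformed."""
    cleaned = line.replace(" ", "")
    if strip_trailing_char:
        cleaned = cleaned[:-1]  # mirrors the x[:-1] step applied to the test file
    fields = cleaned.split(",")
    if len(fields) != 15:
        return None  # same effect as the len(x) == 15 filter
    return [str_to_int(f) for f in fields]


# Illustrative sample lines (assumed format, not read from the mounted files).
train_line = ("39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, "
              "Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K")
test_line = train_line + "."

print(parse_line(train_line))
print(parse_line(test_line, strip_trailing_char=True))
print(parse_line(""))  # a blank line has fewer than 15 fields, so this prints None
```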

In [9]:
trainData.take(5)

In [10]:
trainDataSchema = spark.createDataFrame(trainData,["age", "workclass", "fnlwgt", "education", "education_num", "marital_status", "occupation", "relationship", "race", "sex", "capital_gain", "capital_loss", "hours_per_week", "native_country", "income"])
testDataSchema = spark.createDataFrame(testData,["age", "workclass", "fnlwgt", "education", "education_num", "marital_status", "occupation", "relationship", "race", "sex", "capital_gain", "capital_loss", "hours_per_week", "native_country", "income"])
#trainDataSchema.head(5)
testDataSchema.take(5)

#### Use StringIndexer to encode the "string" typed fields (i.e. generate the dataset we used in the Random Forest Section)
That is, produce the following two dataframe objects:
* trainLFDF: a dataframe of [features: list, label: int]
* testLFDF: a dataframe of [features: list, label: int]
* where features is a list of double values, with all string values converted to double values

Hint: Create one StringIndexer object for each column that needs converting from String to double, then pass the array of StringIndexer objects to a Pipeline as its stages.

Use VectorAssembler

See here for an example of StringIndexer: http://spark.apache.org/docs/latest/ml-features.html#stringindexer

See here for an example of a Pipeline:
http://spark.apache.org/docs/latest/ml-guide.html#example-pipeline
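
To make the string-to-double conversion concrete, here is a plain-Python sketch of the idea behind StringIndexer: labels are ordered by frequency and the most frequent label gets index 0.0. The function `fit_string_index` is a hypothetical stand-in, not the Spark implementation, and the alphabetical tie-breaking is an assumption made here for determinism.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct label to a double index, most frequent label first."""
    counts = Counter(values)
    # Descending frequency; ties broken alphabetically (assumption for this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Illustrative values for the workclass column.
workclass = ["Private", "State-gov", "Private", "Self-emp-inc", "Private", "State-gov"]
mapping = fit_string_index(workclass)
indexed = [mapping[v] for v in workclass]

print(mapping)
print(indexed)
```

The mapping is learned once (the "fit" step) and then applied to every row (the "transform" step). This is why the same fitted pipeline should be used for both the training and the test data.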

In [12]:
display(trainDataSchema)

In [13]:
trainDataSchema.printSchema()

In [14]:
trainDataSchema.collect()

In [15]:
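# Fit a single indexer on its own to check that it works before building the full pipeline.
StringIndexer().setInputCol("workclass").setOutputCol("workclassIndex").fit(trainDataSchema)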

In [16]:
workclassIndexer = StringIndexer().setInputCol("workclass").setOutputCol("workclassIndex")
educationIndexer = StringIndexer().setInputCol("education").setOutputCol("educationIndex")
marital_statusIndexer = StringIndexer().setInputCol("marital_status").setOutputCol("marital_statusIndex")
occupationIndexer = StringIndexer().setInputCol("occupation").setOutputCol("occupationIndex")
relationshipIndexer = StringIndexer().setInputCol("relationship").setOutputCol("relationshipIndex")
raceIndexer = StringIndexer().setInputCol("race").setOutputCol("raceIndex")
incomeIndexer = StringIndexer().setInputCol("income").setOutputCol("incomeIndex")
sexIndexer = StringIndexer().setInputCol("sex").setOutputCol("sexIndex")
native_countryIndexer = StringIndexer().setInputCol("native_country").setOutputCol("native_countryIndex")
featuresAssembler = VectorAssembler(inputCols=["age", "workclassIndex", "fnlwgt", "educationIndex", "education_num", "marital_statusIndex", "occupationIndex", "relationshipIndex", "raceIndex", "sexIndex", "capital_gain", "capital_loss", "hours_per_week", "native_countryIndex"],outputCol="Features")

pipeline = Pipeline().setStages([workclassIndexer,educationIndexer,marital_statusIndexer,occupationIndexer,relationshipIndexer,raceIndexer,incomeIndexer,sexIndexer,native_countryIndexer,featuresAssembler])
model = pipeline.fit(trainDataSchema)
trainDataT = model.transform(trainDataSchema)
trainDataTf = trainDataT.select("Features","incomeIndex")
testDataTf = model.transform(testDataSchema).select("Features","incomeIndex")
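
The nine indexers above follow one naming pattern, so the stages could also be generated from a list of column names. This is a sketch under that assumption. The helpers `indexed_name`, `assembler_inputs` and `build_pipeline` are hypothetical names, and the Spark imports are kept inside `build_pipeline` so that the column logic can be checked without a cluster.

```python
COLUMNS = ["age", "workclass", "fnlwgt", "education", "education_num",
           "marital_status", "occupation", "relationship", "race", "sex",
           "capital_gain", "capital_loss", "hours_per_week", "native_country",
           "income"]
CATEGORICAL = ["workclass", "education", "marital_status", "occupation",
               "relationship", "race", "sex", "native_country", "income"]
LABEL = "income"


def indexed_name(col):
    """Output column name used for an indexed string column."""
    return col + "Index"


def assembler_inputs(columns, categorical, label):
    """Feature columns in dataset order, using the indexed name for string columns."""
    return [indexed_name(c) if c in categorical else c
            for c in columns if c != label]


def build_pipeline():
    """Build the indexing and assembling pipeline (requires pyspark)."""
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, VectorAssembler

    indexers = [StringIndexer(inputCol=c, outputCol=indexed_name(c))
                for c in CATEGORICAL]
    assembler = VectorAssembler(
        inputCols=assembler_inputs(COLUMNS, CATEGORICAL, LABEL),
        outputCol="Features")
    return Pipeline(stages=indexers + [assembler])


print(assembler_inputs(COLUMNS, CATEGORICAL, LABEL))
```

In the notebook, `build_pipeline().fit(trainDataSchema)` would then take the place of the hand-written stage list.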


In [17]:
testDataTf.collect()

#### RandomForest
1. Create another pipeline to train a RandomForestClassifier on the training dataset (with maxBins=50, numTrees=10, maxDepth=10).
1. Use the model to predict the test dataset.
1. Use MulticlassClassificationEvaluator to get the accuracy of the model on the test dataset (the "accuracy" metric).

Example here: http://spark.apache.org/docs/latest/ml-ensembles.html#example-classification

In [19]:
Rf = RandomForestClassifier().setLabelCol("incomeIndex").setFeaturesCol("Features").setNumTrees(10).setMaxDepth(10).setMaxBins(50)
pipelineRf = Pipeline().setStages([Rf])
modelRf = pipelineRf.fit(trainDataTf)
predictions = modelRf.transform(testDataTf)

In [20]:
display(predictions)

In [21]:
evaluator = MulticlassClassificationEvaluator()\
  .setLabelCol("incomeIndex")\
  .setPredictionCol("prediction")\
  .setMetricName("accuracy")
accuracy = evaluator.evaluate(predictions)

print("Test Error = %5.2f%%" % ((1.0 - accuracy)*100))
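
For reference, the reported figure is one minus the fraction of correct predictions. Here is a small plain-Python sketch of that arithmetic, using made-up labels and predictions rather than output from the model above. The function name `accuracy_score` is hypothetical.

```python
def accuracy_score(labels, predictions):
    """Fraction of predictions that match their labels."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    if not labels:
        raise ValueError("cannot compute accuracy of an empty dataset")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / float(len(labels))


# Made-up example: 4 of the 5 predictions are correct.
labels = [0.0, 0.0, 1.0, 1.0, 0.0]
predicted = [0.0, 1.0, 1.0, 1.0, 0.0]

acc = accuracy_score(labels, predicted)
print("Test Error = %5.2f%%" % ((1.0 - acc) * 100))  # prints "Test Error = 20.00%"
```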
