# Predict whether salary will be above or below 50K
  * Project by Akshay

## Objective
* Predict whether the salary of people is above or below 50K on the basis of various factors such as workclass, education, occupation, marital_status, sex, race.

* Use a random forest algorithm
  * Train on a portion of the dataset
  * Test the trained model against the remainder of the dataset
    * Accuracy can be determined because the dataset is labeled (i.e., this uses supervised learning)

## Download dataset
  * Dataset: https://archive.ics.uci.edu/ml/machine-learning-databases/adult/

### Column Description
  * The dataset does not contain the header row and we need to add it explicitly.
  * Below is the description of  columns in the order they appear in the dataset:
  	*<br/><b>age</b>: continuous.
	*<br/><b>workclass</b>: Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked.
	*<br/><b>fnlwgt</b>: continuous.
	*<br/><b>education</b>: Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc, 9th, 7th-8th, 12th, Masters, 1st-4th, 10th, Doctorate, 5th-6th, Preschool.
	*<br/><b>education-num</b>: continuous.
	*<br/><b>marital-status</b>: Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent, Married-AF-spouse.
	*<br/><b>occupation</b>: Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners, Machine-op-inspct, Adm-clerical, Farming-fishing, Transport-moving, Priv-house-serv, Protective-serv, Armed-Forces.
	*<br/><b>relationship</b>: Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried.
	*<br/><b>race</b>: White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black.
	*<br/><b>sex</b>: Female, Male.
	*<br/><b>capital-gain</b>: continuous.
	*<br/><b>capital-loss</b>: continuous.
	*<br/><b>hours-per-week</b>: continuous.
	*<br/><b>native-country</b>: United-States, Cambodia, England, Puerto-Rico, Canada, Germany, Outlying-US(Guam-USVI-etc), India, Japan, Greece, South, China, Cuba, Iran, Honduras, Philippines, Italy, Poland, Jamaica, Vietnam, Mexico, Portugal, Ireland, France, Dominican-Republic, Laos, Ecuador, Taiwan, Haiti, Columbia, Hungary, Guatemala, Nicaragua, Scotland, Thailand, Yugoslavia, El-Salvador, Trinadad&Tobago, Peru, Hong, Holand-Netherlands

In [5]:
%sh
mkdir -p kaggle_project
curl 'https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data' > kaggle_project/adult.data
ls kaggle_project

In [6]:
#read the adult.data file as CSV
# we keep header option as false since the file does not have any headers and we want default names
df_data = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'false')\
  .option('inferSchema', 'true')\
  .load("file:/databricks/driver/kaggle_project/adult.data")
df_data.show(3)

In [7]:
df_data.printSchema()

##Select only the relevant columns from the dataset and rename the columns

In [9]:
df = df_data.selectExpr('_c0 as age','_c1 as WorkClass', '_c3 as education', '_c5 as marital_status', '_c6 as occupation', '_c8 as race', '_c9 as sex', '_c12 as hoursPerWeek', '_c13 as native_country','_c14 as Salary')

## Data Cleaning

In [11]:
#Remove all rows where WorkClass = ? or Occupation = ?
df = df.where(~(df.WorkClass.contains('?') | df.occupation.contains('?')))

## Data explored and explained

In [13]:
df.printSchema()

In [14]:
df.show(30)

In [15]:
df.registerTempTable('adultTbl')

In [16]:
%sql
--We see the number of distinct values for each column
select count(distinct sex) as DistinctSexCount, count(distinct education) as DistinctEduCnt, count(distinct marital_status) as DistinctMarCnt, count(distinct occupation) as DistinctOccCnt, count(distinct workclass) as DistinctWorkClassCnt, count(distinct race) as DistinctRaceCnt, count(distinct age) as DistinctAgeCnt, count(distinct native_country) as DistinctCountryCnt, count(distinct hoursPerWeek) as DistinctHoursCnt from adultTbl;

In [17]:
df.select('Salary').distinct().show()

In [18]:
df.select('Salary').groupBy('Salary').count().show()

## Visualizations in Tableau 
  
 * The tableau book (Kaggle_project_visualizations.twb) has been provided along with this notebook.
 * The visualizations have also been provided in jpeg format. 
 * Alternately, they can be found at the below links:
   * Salary vs Occupation vs Sex
     * https://photos.app.goo.gl/PU21ojo1BQwJfcXu2
   * Salary vs Work vs Class
     * https://photos.app.goo.gl/YWFh3WrGtzkIO0Ta2
   * Salary vs Race vs Age-group
     * https://photos.app.goo.gl/VxaKiV0gUrcCuuoc2

In [20]:
df.count()

## Data transformation

In [22]:
splitted_data = df.randomSplit([0.8, 0.18, 0.02], 24)
train_data = splitted_data[0]
test_data = splitted_data[1]
predict_data = splitted_data[2]

print "Number of training records: " + str(train_data.count())
print "Number of testing records : " + str(test_data.count())
print "Number of prediction records : " + str(predict_data.count())

## Data modeling

In [24]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline, Model

In [25]:
#Check the schema once again to know the names of the columns
df.printSchema()

In [26]:
SexTypes = int(df.select('sex').distinct().count())
EducationTypes = int(df.select('education').distinct().count())
Marital_stat_type = int(df.select('marital_status').distinct().count())
OccupationTypes = int(df.select('occupation').distinct().count())
WorkClassTypes = int(df.select('workclass').distinct().count())
RaceTypes = int(df.select('race').distinct().count())
DistinctAges = int(df.select('age').distinct().count()) 
DistinctCountries = int(df.select('native_country').distinct().count())
DistinctHours = int(df.select('hoursPerWeek').distinct().count())

print [['SexTypes', SexTypes], ['EducationTypes', EducationTypes], ['Marital_stat_type', Marital_stat_type], ['OccupationTypes', OccupationTypes], ['WorkClassTypes', WorkClassTypes],['RaceTypes', RaceTypes],['DistinctAges', DistinctAges],['DistinctCountries', DistinctCountries],['DistinctHours', DistinctHours]]
resultDF = sqlContext.createDataFrame([['SexTypes', SexTypes], ['EducationTypes', EducationTypes], ['Marital_stat_type', Marital_stat_type], ['OccupationTypes', OccupationTypes], ['WorkClassTypes', WorkClassTypes],['RaceTypes', RaceTypes],['DistinctAges', DistinctAges],['DistinctCountries', DistinctCountries],['DistinctHours', DistinctHours]], ['metric', 'value'])
display(resultDF)

## Observation:
 * Thus we see that there are <b>large number of distinct values</b> for the columns <b>Age</b>, <b>Country</b> and <b>HoursPerWeek</b>.
 * We will leave out these features as it will affect the accuracy of our model.
 * Thus we will determine whether the Salary of a person is <= 50K or >50K on the basis of his <b>education</b>, <b>occupation</b>, <b>sex</b>, <b>marital_status</b>, <b>workClass</b> and <b>Race</b>.

In [28]:
stringIndexer_label = StringIndexer(inputCol="Salary", outputCol="label").fit(df)
# Note the above outputCol is label (the predicted column). Here we predict the Salary(above) from the attributes below.
stringIndexer_WC = StringIndexer(inputCol="WorkClass", outputCol="WORKCLASS_IX")
stringIndexer_edu = StringIndexer(inputCol="education", outputCol="EDUCATION_IX")
stringIndexer_mariStat = StringIndexer(inputCol="marital_status", outputCol="MARITAL_STATUS_IX")
stringIndexer_occuptn = StringIndexer(inputCol="occupation", outputCol="OCCUPATION_IX")
stringIndexer_race = StringIndexer(inputCol="race", outputCol="RACE_IX")
stringIndexer_sex = StringIndexer(inputCol="sex", outputCol="SEX_IX")

In [29]:
# Select the input columns for the model (and put them into one features column)
vectorAssembler_features = VectorAssembler(inputCols=["WORKCLASS_IX", "EDUCATION_IX", "MARITAL_STATUS_IX", "OCCUPATION_IX", "RACE_IX", "SEX_IX"], outputCol="features")

In [30]:
# The model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

In [31]:
# Columns for the output
# Convert from indexed labels (added above) back to original labels.
# https://spark.apache.org/docs/2.0.2/api/python/pyspark.ml.html#pyspark.ml.feature.IndexToString
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=stringIndexer_label.labels)

In [32]:
# The ML pipeline
pipeline_rf = Pipeline(stages=[stringIndexer_label, stringIndexer_WC, stringIndexer_edu, stringIndexer_mariStat, stringIndexer_occuptn, stringIndexer_race, stringIndexer_sex, vectorAssembler_features, rf, labelConverter])

In [33]:
# Model training
model_rf = pipeline_rf.fit(train_data)

## Model evaluation

In [35]:
# Model quality
predictions = model_rf.transform(test_data)
evaluatorRF = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluatorRF.evaluate(predictions)
print("Accuracy = %g" % accuracy)
print("Test Error = %g" % (1.0 - accuracy))

In [36]:
display(predictions)

In [37]:
#Number of predictions predicting above 50K salary and below 50K salary
# above 50K -> 1.0
# below 50K -> 0.0
predictions.select('prediction').groupBy('prediction').count().show()

In [38]:
correct = predictions.where("(label = prediction)").count()
incorrect = predictions.where("(label != prediction)").count()

resultDF = sqlContext.createDataFrame([['correct', correct], ['incorrect', incorrect]], ['metric', 'value'])
display(resultDF)