## Salary

**GOAL:** predict whether an adult’s income exceeds $50K/year based on census data

**TOOLS:**
  * pandas
  * Spark MLlib


----------------------------------------------

Let's prepare the environment so that we can use `pyspark` later.

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version
!pip install pyspark

You can either download the data from [here](https://archive.ics.uci.edu/ml/datasets/adult) or use the `requests` library for Python 3 to fetch it.

We save the results into local files for further use.


In [0]:
import requests

urldata = 'http://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data'
r = requests.get(urldata, allow_redirects=True)
r.raise_for_status()  # fail early if the download did not succeed
with open('adult.data', 'wb') as f:  # the context manager closes the file
    f.write(r.content)

urltest = 'http://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test'
r = requests.get(urltest, allow_redirects=True)
r.raise_for_status()
with open('adult.test', 'wb') as f:
    f.write(r.content)

For now, we will use only `pandas`.

In [0]:
import pandas as pd
import numpy as np

You can check (e.g., with `head` or `cat`) that the downloaded `csv` files have no header row. Let's define the column names ourselves.

In [0]:
column_names = [
    'age',
    'workclass',
    'fnlwgt',
    'education',
    'education-num',
    'marital-status',
    'occupation',
    'relationship',
    'race',
    'sex',
    'capital-gain',
    'capital-loss',
    'hours-per-week',
    'native-country',
    'salary'
]

Let's load the data (training and testing).

In [132]:
train_df = pd.read_csv('adult.data', names=column_names)
# The first line of adult.test is not a data record, so skip it
test_df = pd.read_csv('adult.test', names=column_names, skiprows=1)
print(len(train_df.columns) == len(test_df.columns))

True


In [118]:
train_df.columns

Index(['age', 'workclass', 'fnlwgt', 'education', 'education-num',
       'marital-status', 'occupation', 'relationship', 'race', 'sex',
       'capital-gain', 'capital-loss', 'hours-per-week', 'native-country',
       'salary'],
      dtype='object')

Two things need attention.


1.   Our dataframes contain heterogeneous data types (`str`, `int`, etc.), and the string values may carry surrounding whitespace.
2.   `test_df` does not contain anyone born in the Netherlands (`Holand-Netherlands` in the data).

We therefore have to *clean* the string columns (e.g., strip whitespace), remove the `Holand-Netherlands` rows from the training set, and save the results to *cleaned* local files for further use.
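The category mismatch between the two sets can be found programmatically rather than by inspection. The following is a minimal sketch on toy frames; the helper name `categories_missing_from_test` and the toy values are assumptions made for illustration, not part of the original notebook.

```python
import pandas as pd

# Toy stand-ins for the real frames; the values are illustrative only.
train_toy = pd.DataFrame(
    {"native-country": ["United-States", "Cuba", "Holand-Netherlands"]}
)
test_toy = pd.DataFrame({"native-country": ["United-States", "Cuba"]})


def categories_missing_from_test(train, test, column):
    """Return the categories of `column` that occur in train but never in test."""
    return sorted(set(train[column].unique()) - set(test[column].unique()))


missing = categories_missing_from_test(train_toy, test_toy, "native-country")
print(missing)
```

On the real data, the helper is best called after whitespace has been stripped from the string columns, so that the comparison is not affected by stray spaces.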



In [0]:
train_df = train_df.apply(lambda x: x.str.strip() if x.dtype == 'object' else x)
train_df_cp = train_df.copy()

train_df_cp = train_df_cp.loc[train_df_cp['native-country'] != 'Holand-Netherlands']
train_df_cp.to_csv('train.csv', index=False, header=False)

test_df = test_df.apply(lambda x: x.str.strip() if x.dtype == 'object' else x)
test_df.to_csv('test.csv', index=False, header=False)
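As an alternative to stripping after loading, `pandas.read_csv` accepts `skipinitialspace=True`, which drops the spaces that follow each delimiter at parse time. The sketch below uses a small in-memory sample that mimics the `, ` separators of the raw files; the sample rows are illustrative, and this option only handles leading spaces, not trailing ones.

```python
import io

import pandas as pd

# Two illustrative rows using ", " as separator, like the raw files.
raw = io.StringIO("39, State-gov, 77516\n50, Self-emp-not-inc, 83311\n")

df = pd.read_csv(
    raw,
    names=["age", "workclass", "fnlwgt"],
    skipinitialspace=True,  # drop spaces that follow each comma
)
print(df["workclass"].tolist())
```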

In [115]:
!head train.csv


39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K
50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K
38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States,<=50K
53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States,<=50K
28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,<=50K
37,Private,284582,Masters,14,Married-civ-spouse,Exec-managerial,Wife,White,Female,0,0,40,United-States,<=50K
49,Private,160187,9th,5,Married-spouse-absent,Other-service,Not-in-family,Black,Female,0,0,16,Jamaica,<=50K
52,Self-emp-not-inc,209642,HS-grad,9,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,45,United-States,>50K
31,Private,45781,Masters,14,Never-married,Prof-specialty,Not-in-family,White,Female,14084,0,50,United-States,>50K
42

In [0]:
print('Training data shape: ', train_df.shape)
print('Testing data shape: ', test_df.shape)


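Let's check whether the dataset is complete, that is, whether the training set has missing values. (Hint: `isnull()` reports **none**, but this dataset marks unknown values with the string `?`, which `isnull()` does not count.)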

In [122]:
train_df.isnull().sum()

age               0
workclass         0
fnlwgt            0
education         0
education-num     0
marital-status    0
occupation        0
relationship      0
race              0
sex               0
capital-gain      0
capital-loss      0
hours-per-week    0
native-country    0
salary            0
dtype: int64
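The zero counts above only cover `NaN`/`None`. The Adult data marks unknown values with the string `?`, so a separate check is needed to see them. Here is a minimal sketch on a toy frame (the rows are illustrative, not taken from the dataset) that counts the placeholders and converts them to `NaN`.

```python
import numpy as np
import pandas as pd

# Toy frame with "?" placeholders; the rows are illustrative only.
toy = pd.DataFrame({
    "workclass": ["Private", "?", "State-gov"],
    "occupation": ["?", "?", "Sales"],
    "age": [39, 50, 38],
})

# isnull() does not treat "?" as missing
total_nulls = int(toy.isnull().sum().sum())
print(total_nulls)

# Count the "?" placeholders in the string columns
placeholder_counts = toy[["workclass", "occupation"]].eq("?").sum()
print(placeholder_counts)

# Convert the placeholders to NaN so that isnull() can see them
cleaned = toy.replace("?", np.nan)
print(cleaned.isnull().sum())
```

Whether to drop such rows, impute them, or keep `?` as its own category is a modeling decision; the notebook keeps them as a category.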

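Categorical variables have the `object` dtype.
 
Categorical variables must be encoded numerically before most machine learning models can use them. Decision trees can in principle split on categories directly, although `scikit-learn`'s implementations still expect numeric input.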

In [123]:
train_df.dtypes.value_counts()

object    9
int64     6
dtype: int64

The following code prints the number of distinct categories for each categorical variable.

In [124]:
train_df.select_dtypes('object').apply(pd.Series.nunique, axis=0)

workclass          9
education         16
marital-status     7
occupation        15
relationship       6
race               5
sex                2
native-country    42
salary             2
dtype: int64

In [125]:
test_df.select_dtypes('object').apply(pd.Series.nunique, axis=0)

workclass          9
education         16
marital-status     7
occupation        15
relationship       6
race               5
sex                2
native-country    41
salary             2
dtype: int64

One-hot encoding: we do not want the target `salary` to be expanded into two columns (`<=50K` and `>50K`), so we encode it manually as 0/1 before calling `get_dummies`.

In [126]:
train_df['salary'] = train_df['salary'].apply(lambda x: 0 if x == '<=50K' else 1)
test_df['salary'] = test_df['salary'].apply(lambda x: 0 if x == '<=50K' else 1)

train_df = pd.get_dummies(train_df)  # get categorical data
test_df = pd.get_dummies(test_df)

print('Training Features shape: ', train_df.shape)
print('Testing Features shape: ', test_df.shape)

Training Features shape:  (32561, 109)
Testing Features shape:  (16281, 108)


In [127]:
# Align the training and testing data, keep only columns present in both dataframes
# (this drops the dummy column for a category that appears only in the training set)
train_df, test_df = train_df.align(test_df, join = 'inner', axis = 1)

print('Training Features shape: ', train_df.shape)
print('Testing Features shape: ', test_df.shape)

Training Features shape:  (32561, 108)
Testing Features shape:  (16281, 108)
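The one-column difference before alignment comes from a category that exists only in the training set. The toy sketch below (the frames and the `country` column are made up for illustration) shows how `get_dummies` produces the extra column and how an inner `align` on columns removes it.

```python
import pandas as pd

# Toy frames: one category appears only in the training data.
train_toy = pd.DataFrame(
    {"age": [39, 50, 38], "country": ["US", "Cuba", "Holand-Netherlands"]}
)
test_toy = pd.DataFrame({"age": [25, 44], "country": ["US", "Cuba"]})

train_enc = pd.get_dummies(train_toy)
test_enc = pd.get_dummies(test_toy)
print(train_enc.shape, test_enc.shape)

# Columns that exist only in the encoded training frame
only_in_train = sorted(set(train_enc.columns) - set(test_enc.columns))
print(only_in_train)

# Keep only the columns shared by both frames; rows are left untouched
train_aligned, test_aligned = train_enc.align(test_enc, join="inner", axis=1)
print(train_aligned.shape, test_aligned.shape)
```

Printing `only_in_train` before aligning is a cheap way to confirm that the dropped column is the expected one and not a symptom of inconsistent category spellings.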


Split the dataframes into features (independent variables) and target (dependent variable).

In [0]:
X_train = train_df.drop('salary', axis=1)
y_train = train_df['salary']

X_test = test_df.drop('salary', axis=1)
y_test = test_df['salary']

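We need to scale our data because `scikit-learn`'s `LogisticRegression` applies `L2` regularization by default, which penalizes all coefficients equally; without scaling, features measured on larger scales would be regularized differently from those on smaller scales.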

In [0]:
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler(feature_range = (0, 1))

scaler.fit(X_train)

X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)
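The scaler is fit on the training data only, so the test set is transformed with the training minimum and maximum. The sketch below reproduces the min-max arithmetic by hand with NumPy on toy values (the arrays and the helper `min_max_scale` are illustrative assumptions, not the notebook's code) to show that test values outside the training range land outside `[0, 1]`.

```python
import numpy as np

# Toy single-feature data; the values are illustrative only.
X_train_toy = np.array([[0.0], [10.0]])
X_test_toy = np.array([[5.0], [20.0]])

# "Fit": learn the per-column minimum and maximum from the training data
col_min = X_train_toy.min(axis=0)
col_max = X_train_toy.max(axis=0)


def min_max_scale(X):
    """Scale X with the training statistics: (X - min) / (max - min)."""
    return (X - col_min) / (col_max - col_min)


train_scaled = min_max_scale(X_train_toy)
test_scaled = min_max_scale(X_test_toy)
print(train_scaled.ravel())
print(test_scaled.ravel())
```

Reusing the training statistics keeps information about the test set out of the preprocessing step, at the cost of occasional test values outside the nominal range.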

Finally, let's train the model and compute its accuracy on the test set.

In [130]:
from sklearn.linear_model import LogisticRegression

lr = LogisticRegression(solver='liblinear')

lr.fit(X_train, y_train)

lr_pred = lr.predict(X_test)

from sklearn.metrics import accuracy_score

accuracy_score(y_test, lr_pred)

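0.19200294822185368

This score is far below what always predicting `<=50K` would achieve (roughly three quarters of the records fall in that class), which points to a label problem rather than a weak model. The labels in `adult.test` end with a period (`<=50K.` and `>50K.`), so the comparison `x == '<=50K'` used above never matches and every test label is encoded as 1. Stripping the trailing period from the test labels before encoding them (and before writing `test.csv`) avoids this.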
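Labels in `adult.test` carry a trailing period, which silently breaks an exact string comparison against `'<=50K'`. The following sketch on a toy `Series` (the three labels are illustrative) contrasts the naive encoding with one that normalizes the labels first; the names `naive`, `normalized` and `fixed` are assumptions made for this example.

```python
import pandas as pd

# Toy labels in the style of adult.test (note the trailing period).
test_labels = pd.Series(["<=50K.", ">50K.", "<=50K."])

# Naive encoding: the comparison never matches, so every label becomes 1
naive = test_labels.apply(lambda x: 0 if x == "<=50K" else 1)
print(naive.tolist())

# Normalize first (strip whitespace and the trailing period), then map explicitly
normalized = test_labels.str.strip().str.rstrip(".")
fixed = normalized.map({"<=50K": 0, ">50K": 1})
print(fixed.tolist())
```

Using an explicit mapping instead of an `else` branch has the side benefit that unexpected label spellings show up as `NaN` rather than being silently counted as the positive class.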

Let's switch to Spark.

In [0]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
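from pyspark.ml.feature import StringIndexer, VectorAssembler
try:
    from pyspark.ml.feature import OneHotEncoderEstimator  # Spark 2.3 and 2.4
except ImportError:
    # Spark 3.0+ renamed OneHotEncoderEstimator to OneHotEncoder
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator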
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [0]:
spark = SparkSession.builder.appName("Predict Adult Salary").getOrCreate()

In [0]:
schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("workclass", StringType(), True),
    StructField("fnlwgt", IntegerType(), True),
    StructField("education", StringType(), True),
    StructField("education-num", IntegerType(), True),
    StructField("marital-status", StringType(), True),
    StructField("occupation", StringType(), True),
    StructField("relationship", StringType(), True),
    StructField("race", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("capital-gain", IntegerType(), True),
    StructField("capital-loss", IntegerType(), True),
    StructField("hours-per-week", IntegerType(), True),
    StructField("native-country", StringType(), True),
    StructField("salary", StringType(), True)
])

In [0]:
train_df = spark.read.csv('train.csv', header=False, schema=schema)

test_df = spark.read.csv('test.csv', header=False, schema=schema)

In [0]:
categorical_variables = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country']
continuous_variables = ['age', 'fnlwgt', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']