# Use Spark for machine learning

The source data used in this tutorial is publicly available from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/dataset/2/adult) (the Adult dataset). It contains `48842` records of American residents and their annual income.
The objective of this tutorial is to train a model that predicts whether a person's annual income is `<=50K` or `>50K`.

Dataset feature description:

| name           | type    | description                                                                                                              |
|----------------|---------|--------------------------------------------------------------------------------------------------------------------------|
| age            | double  | Age of the resident.                                                                                                     |
| workclass      | string  | Simplified employment status of the person. (categorical, 9 distinct values) e.g. State-gov, Private, Self-emp-not-inc, ... |
| fnlwgt         | double  | Final sampling weight: roughly, the number of people in the population that this row represents. (continuous, positive) |
| education      | string  | Education level. (categorical, 16 distinct values) e.g. Bachelors, Some-college, 11th, HS-grad, ...                       |
| education-num  | double  | Numeric encoding of `education`. (16 distinct values) e.g. 7.0, 9.0, 13.0                                                |
| marital-status | string  | Marital status of the person. (categorical, 7 distinct values) e.g. Divorced, Never-married, Separated, ...              |
| occupation     | string  | Rough category of the occupation. (categorical, 15 distinct values) e.g. Tech-support, Craft-repair, Sales, ...          |
| relationship   | string  | Relationship within the family. (categorical, 6 distinct values) e.g. Wife, Own-child, Husband, ...                      |
| race           | string  | Race of the person. (categorical, 5 distinct values) White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black         |
| sex            | string  | Sex of the person. (binary) Female, Male                                                                                 |
| capital-gain   | double  | Capital gains in dollars. (continuous)                                                                                   |
| capital-loss   | double  | Capital losses in dollars. (continuous)                                                                                  |
| hours-per-week | double  | Working hours per week. (continuous, positive)                                                                           |
| native-country | string  | Country of birth. (categorical, 41 distinct values) e.g. United-States, Cambodia, England, Puerto-Rico, Canada, Germany, ... |
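The `type` column above decides how each feature is handled: string features are categorical and must be encoded before training, while double features can be used as numeric inputs directly. A minimal plain-Python sketch of that split, assuming the underscore column names used in the Spark schema of this notebook (`SCHEMA` here is a hand-written stand-in, not a Spark object):

```python
# Hand-written stand-in for the Spark schema: (column name, type) pairs
SCHEMA = [
    ("age", "double"), ("workclass", "string"), ("fnlwgt", "double"),
    ("education", "string"), ("education_num", "double"),
    ("marital_status", "string"), ("occupation", "string"),
    ("relationship", "string"), ("race", "string"), ("sex", "string"),
    ("capital_gain", "double"), ("capital_loss", "double"),
    ("hours_per_week", "double"), ("native_country", "string"),
    ("income", "string"),
]
LABEL = "income"

# String features are categorical and need encoding; doubles are numeric
categorical_cols = [name for name, typ in SCHEMA if typ == "string" and name != LABEL]
numeric_cols = [name for name, typ in SCHEMA if typ == "double"]

print(categorical_cols)
print(numeric_cols)
```

The categorical list produced this way is the same list of columns that the preprocessing step encodes.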


Dataset label description:

| name           | type    | description                                                                 |
|----------------|---------|-----------------------------------------------------------------------------|
| income         | string  | Income bracket of the person: `>50K` if the annual income exceeds 50K, otherwise `<=50K`. |
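Before training, the string label has to become a numeric value, because classifiers in `pyspark.ml` expect a numeric label column. A minimal plain-Python sketch of that mapping, assuming `>50K` is the positive class; it also strips surrounding whitespace and a trailing period, since some copies of this dataset (such as the UCI test split) write labels as `>50K.`. The helper name `income_to_label` is hypothetical:

```python
def income_to_label(income: str) -> float:
    """Hypothetical helper: map the raw income bracket to 1.0 (>50K) or 0.0 (<=50K)."""
    # Normalise surrounding spaces and a possible trailing period, e.g. " >50K." -> ">50K"
    cleaned = income.strip().rstrip(".")
    if cleaned == ">50K":
        return 1.0
    if cleaned == "<=50K":
        return 0.0
    raise ValueError(f"unexpected income value: {income!r}")


for raw in ["<=50K", " >50K", ">50K."]:
    print(repr(raw), "->", income_to_label(raw))
```

In Spark itself the same mapping could be written with `pyspark.sql.functions.when`, or a `StringIndexer` could be run on the label column; with its default frequency ordering the majority class `<=50K` would get index 0.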

```python
from pyspark.sql.types import StructType, DoubleType, StructField, StringType
from pyspark.sql import SparkSession
```

```python
# Initialize a local Spark session, using as many worker threads as logical cores
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("spark_ml") \
    .getOrCreate()
```

```python
# Path to the dataset, relative to this notebook
file_path = "../data/us_census_1994.csv"

# Specify the dataset schema explicitly instead of inferring it.
# Note: Spark's CSV reader marks every column as nullable regardless of the
# False flags below (see the printSchema() output).
schema = StructType([
    StructField("age", DoubleType(), False),
    StructField("workclass", StringType(), False),
    StructField("fnlwgt", DoubleType(), False),
    StructField("education", StringType(), False),
    StructField("education_num", DoubleType(), False),
    StructField("marital_status", StringType(), False),
    StructField("occupation", StringType(), False),
    StructField("relationship", StringType(), False),
    StructField("race", StringType(), False),
    StructField("sex", StringType(), False),
    StructField("capital_gain", DoubleType(), False),
    StructField("capital_loss", DoubleType(), False),
    StructField("hours_per_week", DoubleType(), False),
    StructField("native_country", StringType(), False),
    StructField("income", StringType(), False)
])
```

```python
# Load the dataset. The CSV file starts with a header line, so skip it;
# the explicit schema above still determines column names and types.
df_raw = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load(file_path)
)
```

```python
df_raw.show(5)
```

```
+----+----------------+--------+---------+-------------+------------------+-----------------+-------------+-----+----+------------+------------+--------------+--------------+------+
| age|       workclass|  fnlwgt|education|education_num|    marital_status|       occupation| relationship| race| sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+----+----------------+--------+---------+-------------+------------------+-----------------+-------------+-----+----+------------+------------+--------------+--------------+------+
|39.0|       State-gov| 77516.0|Bachelors|         13.0|     Never-married|     Adm-clerical|Not-in-family|White|Male|      2174.0|         0.0|          40.0| United-States| <=50K|
|50.0|Self-emp-not-inc| 83311.0|Bachelors|         13.0|Married-civ-spouse|  Exec-manageri
```

```python
df_raw.printSchema()
```

```
root
 |-- age: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)
```



## Preprocess

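To use algorithms like `Logistic Regression`, you must first convert the categorical features into numeric variables. There are two common ways to do this: category indexing, which maps each category to an integer in {0, 1, ..., numCategories - 1} and therefore imposes an implicit ordering (better suited to ordinal variables such as Poor/Average/Good), and one-hot encoding.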
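Category indexing is what Spark's `StringIndexer` does. A minimal plain-Python sketch of its default ordering (`stringOrderType="frequencyDesc"`): the most frequent value gets index 0.0, and in recent Spark versions ties are broken alphabetically. The helper `string_index` and the sample values are hypothetical, for illustration only:

```python
from collections import Counter


def string_index(values):
    """Hypothetical helper mimicking StringIndexer's default frequencyDesc ordering."""
    counts = Counter(values)
    # Most frequent first; equal counts are ordered alphabetically
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}


workclass = ["Private", "State-gov", "Private", "Self-emp-not-inc", "State-gov", "Private"]
mapping = string_index(workclass)
print(mapping)
# {'Private': 0.0, 'State-gov': 1.0, 'Self-emp-not-inc': 2.0}
print([mapping[v] for v in workclass])
# [0.0, 1.0, 0.0, 2.0, 1.0, 0.0]
```

The indices follow frequency rather than meaning, so feeding them to a model as plain numbers would impose an ordering that nominal features like `workclass` do not have.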

In this tutorial, we use `StringIndexer` to index each category and then `OneHotEncoder` to turn those indices into one-hot vectors.

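> One-Hot Encoding: converts categories into binary vectors with at most one nonzero value (e.g. Blue: [1, 0], Green: [0, 1], Red: [0, 0]). Spark's `OneHotEncoder` drops the last category by default (`dropLast=True`), which is why one category maps to the all-zero vector.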
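A minimal plain-Python sketch of how an index becomes a one-hot vector, reproducing the Blue/Green/Red example with `dropLast=True`. Spark's `OneHotEncoder` emits sparse vectors; a dense list is used here only for readability, and `one_hot` is a hypothetical helper:

```python
def one_hot(index, num_categories, drop_last=True):
    """Hypothetical helper: dense one-hot vector for a category index."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    # With drop_last=True the last category has no slot and stays all zeros
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


colors = {"Blue": 0.0, "Green": 1.0, "Red": 2.0}
for name, idx in colors.items():
    print(name, one_hot(idx, len(colors)))
# Blue [1.0, 0.0]
# Green [0.0, 1.0]
# Red [0.0, 0.0]
```

Dropping the last slot keeps the vector entries linearly independent: the dropped category is implied whenever all other entries are 0.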

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

categoricalColumns = ["workclass", "education", "marital_status", "occupation",
                      "relationship", "race", "sex", "native_country"]
stages = []  # stages in the Pipeline
for categoricalCol in categoricalColumns:
    # Category indexing: map each distinct string to a numeric index
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # One-hot encode the index into a sparse binary vector
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                            outputCols=[categoricalCol + "classVec"])
    # Add stages. These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]
```
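`VectorAssembler`, imported in the cell above, concatenates several columns into a single feature vector. A rough plain-Python sketch of what that looks like for one row, assuming the one-hot columns come first and the numeric columns follow; `assemble_row`, the toy category lists, and their index order are hypothetical. Because of `dropLast`, each categorical column contributes `numCategories - 1` slots:

```python
def one_hot(index, num_categories):
    # dropLast=True behaviour: the last category maps to all zeros
    vec = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vec[index] = 1.0
    return vec


def assemble_row(row, categories, numeric_cols):
    """Hypothetical helper: concatenate one-hot blocks, then numeric values."""
    features = []
    for col, levels in categories.items():
        features += one_hot(levels.index(row[col]), len(levels))
    features += [float(row[col]) for col in numeric_cols]
    return features


# Toy category lists in an assumed index order (index 0 first)
categories = {
    "sex": ["Male", "Female"],
    "race": ["White", "Black", "Asian-Pac-Islander", "Amer-Indian-Eskimo", "Other"],
}
numeric_cols = ["age", "hours_per_week"]
row = {"sex": "Male", "race": "White", "age": 39.0, "hours_per_week": 40.0}

vec = assemble_row(row, categories, numeric_cols)
print(len(vec), vec)
# 7 [1.0, 1.0, 0.0, 0.0, 0.0, 39.0, 40.0]
```

Summing `numCategories - 1` over the categorical columns and adding the number of numeric columns gives the length of the assembled feature vector (here 1 + 4 + 2 = 7).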