# PySpark ML Project
This notebook works through the same problem discussed in [Pandas_income_project], with the purpose of learning the differences between [MLlib] in PySpark and [Scikitlearn and Pandas].

## Table of contents
1. [Framing the problem](#problem)
2. [Imports](#imports)
3. [SparkSession](#SparkSession)
4. [Read and Explore the data](#data) <br>
    4.1. [Original raw data](#data1)<br>
    4.2. [Data modified in the pandas Jupyter notebook](#data2)
5. [Preprocessing and Feature engineering](#preproc)<br>
    5.1. [Categorical variables](#cat) <br>
    5.2. [Continuous variables](#cont)
6. [Model and Prediction](#pred)
7. [Metric](#metric)


## 1. Framing the problem
<a id="problem"></a>

The goal is to predict whether an adult’s income exceeds $50K/year based on census data. \
The data can be downloaded from the UC Irvine Machine Learning Repository.

## 2. Imports
<a id="imports"></a>

In [94]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
# OneHotEncoderEstimator is the Spark 2.3/2.4 name; Spark 3.0 renamed it to OneHotEncoder.
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from pyspark.sql import functions as F

from functools import reduce


## 3. SparkSession
<a id="SparkSession"></a>

The entry point to all functionality in Spark is the SparkSession class. The code in the following sections runs on our local machine.
To create a basic SparkSession, we can use SparkSession.builder:

In [95]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## 4. Read and Explore the data
<a id="data"></a>

First I define the schema (column names and data types) that I will use when reading in the data.

In [96]:
schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("workclass", StringType(), True),
    StructField("fnlwgt", IntegerType(), True),
    StructField("education", StringType(), True),
    StructField("education-num", IntegerType(), True),
    StructField("marital-status", StringType(), True),
    StructField("occupation", StringType(), True),
    StructField("relationship", StringType(), True),
    StructField("race", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("capital-gain", IntegerType(), True),
    StructField("capital-loss", IntegerType(), True),
    StructField("hours-per-week", IntegerType(), True),
    StructField("native-country", StringType(), True),
    StructField("salary", StringType(), True)
])

### 4.1 Original raw data
<a id="data1"></a>

Two different data sets are loaded here: the original raw data set, and the data set that was modified in the Pandas_income_project Jupyter notebook.

This part can be skipped, and you can proceed from 4.2. Here is the raw data:

In [106]:
#adult_df = spark.read.csv("./data/adult.data", header=False, inferSchema=False, schema=schema)

#adult_df.printSchema()
#display(adult_df)

In [105]:
#adult_df.limit(5).toPandas()

In [104]:
#adult_test_df = spark.read.csv("./data/adult.test", header=True, inferSchema=False, schema=schema)

#adult_test_df.printSchema()
#display(adult_test_df)

In [103]:
#adult_test_df.limit(5).toPandas()

### 4.2 Data modified in the pandas Jupyter notebook
<a id="data2"></a>

In this project I used the modified data, since I could easily delete the first row of the test data and could also use the defined schema for the column names.

In [107]:
train_df = spark.read.csv('./data/train.csv', header=False, schema=schema)

##### The schema doesn't work for the test data because of differing data types in a column, so I use withColumnRenamed and the reduce function to set the column names.

In [108]:
# The explicit schema fails on the test file, so let Spark infer the column types instead.
test_df = spark.read.csv('./data/test.csv', inferSchema=True, header=False)

oldColumns = test_df.schema.names
newColumns = ["age", "workclass", "fnlwgt", "education", "education-num", "marital-status", "occupation", "relationship",
              "race", "sex", "capital-gain", "capital-loss", "hours-per-week", "native-country", "salary"]
# Fold over the column positions, renaming one column per step.
test_df = reduce(lambda df, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), test_df)
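
To make the reduce call above easier to follow, here is a minimal pure-Python sketch of the same fold. It works on a plain list of column names rather than a Spark DataFrame. The helper rename_column is a hypothetical stand-in for withColumnRenamed, and the names _c0, _c1, _c2 assume Spark's default naming for CSV columns read without a header.

```python
from functools import reduce

# Spark names header-less CSV columns _c0, _c1, ...; three are enough for the sketch.
old_columns = ["_c0", "_c1", "_c2"]
new_columns = ["age", "workclass", "fnlwgt"]


def rename_column(columns, old, new):
    """Hypothetical stand-in for DataFrame.withColumnRenamed, acting on a list of names."""
    return [new if name == old else name for name in columns]


# reduce threads the accumulator (the current column list) through one rename per index.
renamed = reduce(
    lambda columns, idx: rename_column(columns, old_columns[idx], new_columns[idx]),
    range(len(old_columns)),
    old_columns,
)
print(renamed)
```

Each step returns a new list instead of changing the old one, which mirrors how withColumnRenamed returns a new DataFrame. In PySpark itself, DataFrame.toDF(*newColumns) should give the same result in a single call.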


The output of .show() is not very readable; it is better to use .toPandas() or, when working on Databricks, the display function.

In [109]:
train_df.show(10)

+---+----------------+------+---------+-------------+--------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|education|education-num|      marital-status|       occupation| relationship| race|   sex|capital-gain|capital-loss|hours-per-week|native-country|salary|
+---+----------------+------+---------+-------------+--------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 39|       State-gov| 77516|Bachelors|           13|       Never-married|     Adm-clerical|Not-in-family|White|  Male|        2174|           0|            40| United-States| <=50K|
| 50|Self-emp-not-inc| 83311|Bachelors|           13|  Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|           0|           0|            13| United-States| <=50K|
| 38|         Private|215646|  HS-grad|            9|            Divorced|Handlers-cl

In [112]:
train_df.limit(5).toPandas()

| | age | workclass | fnlwgt | education | education-num | marital-status | occupation | relationship | race | sex | capital-gain | capital-loss | hours-per-week | native-country | salary |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 39 | State-gov | 77516 | Bachelors | 13 | Never-married | Adm-clerical | Not-in-family | White | Male | 2174 | 0 | 40 | United-States | <=50K |
| 1 | 50 | Self-emp-not-inc | 83311 | Bachelors | 13 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 0 | 13 | United-States | <=50K |
| 2 | 38 | Private | 215646 | HS-grad | 9 | Divorced | Handlers-cleaners | Not-in-family | White | Male | 0 | 0 | 40 | United-States | <=50K |
| 3 | 53 | Private | 234721 | 11th | 7 | Married-civ-spouse | Handlers-cleaners | Husband | Black | Male | 0 | 0 | 40 | United-States | <=50K |
| 4 | 28 | Private | 338409 | Bachelors | 13 | Married-civ-spouse | Prof-specialty | Wife | Black | Female | 0 | 0 | 40 | Cuba | <=50K |


In [113]:
test_df.limit(5).toPandas()

| | age | workclass | fnlwgt | education | education-num | marital-status | occupation | relationship | race | sex | capital-gain | capital-loss | hours-per-week | native-country | salary |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 25 | Private | 226802.0 | 11th | 7.0 | Never-married | Machine-op-inspct | Own-child | Black | Male | 0.0 | 0.0 | 40.0 | United-States | <=50K. |
| 1 | 38 | Private | 89814.0 | HS-grad | 9.0 | Married-civ-spouse | Farming-fishing | Husband | White | Male | 0.0 | 0.0 | 50.0 | United-States | <=50K. |
| 2 | 28 | Local-gov | 336951.0 | Assoc-acdm | 12.0 | Married-civ-spouse | Protective-serv | Husband | White | Male | 0.0 | 0.0 | 40.0 | United-States | >50K. |
| 3 | 44 | Private | 160323.0 | Some-college | 10.0 | Married-civ-spouse | Machine-op-inspct | Husband | Black | Male | 7688.0 | 0.0 | 40.0 | United-States | >50K. |
| 4 | 18 | ? | 103497.0 | Some-college | 10.0 | Never-married | ? | Own-child | White | Female | 0.0 | 0.0 | 30.0 | United-States | <=50K. |


## 5. Preprocessing and Feature engineering
<a id="preproc"></a>

##### Some important distinctions between Spark and Scikit-learn/Pandas are:
1) Spark DataFrames are immutable. Thus, a transformation never changes a DataFrame in place; it returns a new DataFrame, typically with new columns added.

2) MLlib expects all features to be contained within a single column.

### 5.1. Categorical variables
<a id="cat"></a>
Here the "pyspark.ml.feature" module is used to apply all of the necessary encoding transformations to the categorical variables.

The [StringIndexer] class performs label encoding and must be applied before the [OneHotEncoderEstimator], which in turn performs one-hot encoding. The [VectorAssembler] class takes multiple columns as input and outputs a single column whose content is a vector holding the values of all the input columns.
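
As a rough illustration of these two encoding steps, the following pure-Python sketch imitates what label encoding followed by one-hot encoding produces for a single column. It is not the MLlib implementation. It assumes the default settings: indices ordered by descending frequency, and the last category dropped and encoded as all zeros. The alphabetical tie-breaking, the function names and the sample values are illustrative assumptions.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct string to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda value: (-counts[value], value))
    return {value: index for index, value in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list; with drop_last the final category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


# Made-up sample of the 'workclass' column.
workclass = ["Private", "State-gov", "Private", "Self-emp-not-inc", "Private", "State-gov"]
mapping = fit_string_indexer(workclass)
encoded = [one_hot(mapping[value], len(mapping)) for value in workclass]

print(mapping)      # label encoding learned from the data
print(encoded[0])   # one-hot vector for "Private"
print(encoded[3])   # one-hot vector for "Self-emp-not-inc" (the dropped last category)
```

The one-hot step needs the integer index as input, which is why the indexer has to run first.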


In [114]:
categorical_variables = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country']
indexers = [StringIndexer(inputCol=column, outputCol=column+"-index") for column in categorical_variables]
encoder = OneHotEncoderEstimator(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=["{0}-encoded".format(indexer.getOutputCol()) for indexer in indexers]
)
assembler = VectorAssembler(
    inputCols=encoder.getOutputCols(),
    outputCol="categorical-features"
)
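pipeline = Pipeline(stages=indexers + [encoder, assembler])
train_df = pipeline.fit(train_df).transform(train_df)
# Caution: this re-fits the indexers and the encoder on the test data, so a category keeps the
# same index and vector slot as in training only if both sets order their categories identically.
# Fitting once on train_df and reusing that fitted model for test_df avoids a possible mismatch.
test_df = pipeline.fit(test_df).transform(test_df)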
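
One thing to watch in the cell above is that the pipeline is fitted separately on train_df and test_df. The pure-Python sketch below uses a simplified frequency-ordered encoder as a stand-in for StringIndexer, with made-up sample values. It shows how two separate fits can assign different indices to the same category, and how reusing the mapping learned on the training data keeps the encoding consistent.

```python
from collections import Counter


def fit_indexer(values):
    """Frequency-ordered label encoding, a simplified stand-in for StringIndexer.fit."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda value: (-counts[value], value))
    return {value: index for index, value in enumerate(ordered)}


# Made-up samples: "Private" dominates the training split, "Local-gov" the test split.
train_workclass = ["Private", "Private", "Private", "Local-gov", "Local-gov", "State-gov"]
test_workclass = ["Local-gov", "Local-gov", "Private", "State-gov"]

train_mapping = fit_indexer(train_workclass)
test_mapping = fit_indexer(test_workclass)

print(train_mapping)  # "Private" gets index 0 here
print(test_mapping)   # "Local-gov" gets index 0 here

# Reusing the mapping learned on the training split keeps the encoding consistent.
test_encoded = [train_mapping[value] for value in test_workclass]
print(test_encoded)
```

In PySpark terms, the usual pattern would be to keep the result of pipeline.fit(train_df) and call its transform method on both DataFrames. Whether that runs unchanged on this data depends on the test set containing no categories unseen in training, which is not verified here.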

After applying the transformations, we end up with a single column, categorical-features, that contains a vector with every encoded categorical variable.

In [115]:
train_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- workclass-index: double (nullable = false)
 |-- education-index: double (nullable = false)
 |-- marital-status-index: double (nullable = false)
 |-- occupation-index: double (nullable = false)
 |-- relationship-index: double (nullable = false)
 |-- race-index: double (nullable = false)
 |-- sex-index: double (nullable = false)
 |-- native-country-index: double 

### 5.2. Continuous variables
<a id="cont"></a>
We combine the continuous variables with the encoded categorical variables into a single column.

In [116]:
continuous_variables = ['age', 'fnlwgt', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']
assembler = VectorAssembler(
    inputCols=['categorical-features', *continuous_variables],
    outputCol='features'
)
train_df = assembler.transform(train_df)
test_df = assembler.transform(test_df)

The features column is the final column, which we’ll use to train our model. Here it holds a SparseVector. To save space, sparse vectors store only the non-zero entries, so the 0s from one-hot encoding are not stored.

In [117]:
train_df.limit(5).toPandas()['features'][0]

SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 13.0, 96: 2174.0, 98: 40.0})
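
To read the printed vector, the sketch below expands it into a dense list in plain Python, using only the size and the index/value pairs shown above. It assumes the continuous variables occupy the last six positions in the order they were passed to the assembler, which follows from the inputCols list in cell 116.

```python
# Size and non-zero entries copied from the SparseVector printed above.
size = 99
nonzero = {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0,
           93: 39.0, 94: 77516.0, 95: 13.0, 96: 2174.0, 98: 40.0}

# Expand to a dense list: every index that is not stored is implicitly 0.0.
dense = [nonzero.get(index, 0.0) for index in range(size)]

# The assembler appended the continuous variables after 'categorical-features',
# so they occupy the last six slots in the order they were listed.
continuous_variables = ['age', 'fnlwgt', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']
continuous_values = dict(zip(continuous_variables, dense[-len(continuous_variables):]))

print(continuous_values)
print(f"{len(nonzero)} of {size} entries are stored")
```

The recovered values match the first training row shown earlier (age 39, fnlwgt 77516, capital-gain 2174). Index 97, capital-loss, is missing from the sparse form because its value is 0.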

#### Finally, we encode our target label.

In [118]:
indexer = StringIndexer(inputCol='salary', outputCol='label')
train_df = indexer.fit(train_df).transform(train_df)
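# The test labels carry a trailing period ("<=50K."), so a separate indexer is fitted here.
# The two mappings line up as long as the "<=50K" class is the most frequent in both sets,
# because StringIndexer assigns index 0 to the most frequent value by default.
test_df = indexer.fit(test_df).transform(test_df)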
train_df.limit(10).toPandas()['label']

0    0.0
1    0.0
2    0.0
3    0.0
4    0.0
5    0.0
6    0.0
7    1.0
8    1.0
9    1.0
Name: label, dtype: float64

## 6. Model and Prediction
<a id="pred"></a>
We fit a logistic regression model on the training data and use it to predict the labels of the test data.

In [119]:
lr = LogisticRegression(featuresCol='features', labelCol='label')
model = lr.fit(train_df)

In [120]:
pred = model.transform(test_df)
pred.limit(10).toPandas()[['label', 'prediction']]

| | label | prediction |
|---|---|---|
| 0 | 0.0 | 0.0 |
| 1 | 0.0 | 0.0 |
| 2 | 1.0 | 0.0 |
| 3 | 1.0 | 1.0 |
| 4 | 0.0 | 0.0 |
| 5 | 0.0 | 0.0 |
| 6 | 0.0 | 0.0 |
| 7 | 1.0 | 1.0 |
| 8 | 0.0 | 0.0 |
| 9 | 0.0 | 0.0 |


## 7. Metric
<a id="metric"></a>
We measure the model's performance using its accuracy, that is, the fraction of test rows whose predicted label equals the true label.

In [121]:
pred.withColumn('correct', F.col('label') == F.col('prediction')).toPandas()['correct'].mean()

0.8397518579939807
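
As a small worked example of the same calculation, the sketch below computes the accuracy in plain Python for the ten label/prediction pairs displayed in section 6. It covers only those ten rows, so its result differs from the full test-set figure above.

```python
# Labels and predictions copied from the ten rows displayed in section 6.
labels = [0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0]
predictions = [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0]

# A row counts as correct when the predicted label equals the true label.
correct = [label == prediction for label, prediction in zip(labels, predictions)]

# Accuracy is the share of correct rows; True counts as 1 when summed.
accuracy = sum(correct) / len(correct)
print(accuracy)  # 0.9 for these ten rows
```

The notebook cell does the same thing after collecting all predictions to pandas. For larger data, MLlib's MulticlassClassificationEvaluator with metricName set to "accuracy" should let the calculation stay in Spark.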