# Spark Context

**SparkContext** is the internal engine that allows the connections with the clusters. If you want to run an operation, you need a **SparkContext**.

## Create a SparkContext

In [1]:
import pyspark
from pyspark import SparkContext

sc = SparkContext()

Now that the **SparkContext** is ready, you can create a collection of data called **RDD**--_Resilient Distributed Dataset_. Computation in an **RDD** is automatically _parallelized_ across the cluster.

In [2]:
nums = sc.parallelize([1,2,3,4])

You can access the first row with `take`.

In [3]:
nums.take(1)

[1]

You can apply a transformation to the data with a _lambda_ function. In the PySpark example below, you return the square of nums. It is a map transformation.

In [4]:
squared = nums.map(lambda x: x*x).collect()

for num in squared:
    print('%i ' % (num))

1 
4 
9 
16 


## SparkSession

A more convenient way is to use the DataFrame. SparkContext is already set, you can use it to create the DataFrame. You also need to declare the **SparkSession**.

**SparkSession** allows connecting the engine with different data sources. It is used to initiate the functionalities of Spark SQL.

In [5]:
from pyspark.sql import Row
from pyspark.sql import SparkSession

In [6]:
session = SparkSession.builder.getOrCreate()

Let's create a list of tuples. Each tuple will contain the name of the people and their age. Four steps are required:

1. Create the list of tuple with the information.

In [7]:
session

In [8]:
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]

2. Build the RDD.

In [9]:
rdd = sc.parallelize(list_p)

3. Convert to tuples.

In [10]:
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

4. Create a DataFrame context.

In [11]:
df_ppl = session.createDataFrame(ppl)

To access the type of each feature, use `printSchema()`.

In [12]:
df_ppl.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



## Machine Learning Example with PySpark

Now that you have a brief idea of Spark and SparkSession, you are ready to build your first _Machine Learning_ program using Spark.

Following are the steps to build a Machine Learning program with PySpark:

1. Basic operation with PySpark
2. Data preprocessing
3. Build a data processing pipeline
4. Build the classifier: logistic
5. Train and evaluate the model
6. Tune the hyperparameter

Note that, the dataset we use is not very big and you may think that the computation takes a long time. Spark is designed to process a considerable amount of data. Spark's performance increases relatively to other machine learning libraries when the dataset grows larger.

### Step 1: Basic operation with PySpark

In [13]:
import pandas as pd

url = "https://raw.githubusercontent.com/sadhana1002/PredictingSalaryClass-Classification/master/adult.csv"
df = session.createDataFrame(pd.read_csv(url, 
                                      names=['Age','workclass',
                                             'fnlwgt','education',
                                             'education_num',
                                             'marital',
                                             'occupation',
                                             'relationship','race',
                                             'sex','capital_gain',
                                             'capital_loss',
                                             'hours_week',
                                             'native_country','label']))

In [14]:
df.printSchema()

root
 |-- Age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: long (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: long (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: long (nullable = true)
 |-- capital_loss: long (nullable = true)
 |-- hours_week: long (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



You can see the data with show.

In [15]:
df.show(5, truncate = False)

+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+----------+--------------+------+
|Age|workclass        |fnlwgt|education |education_num|marital            |occupation        |relationship  |race  |sex    |capital_gain|capital_loss|hours_week|native_country|label |
+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+----------+--------------+------+
|39 | State-gov       |77516 | Bachelors|13           | Never-married     | Adm-clerical     | Not-in-family| White| Male  |2174        |0           |40        | United-States| <=50K|
|50 | Self-emp-not-inc|83311 | Bachelors|13           | Married-civ-spouse| Exec-managerial  | Husband      | White| Male  |0           |0           |13        | United-States| <=50K|
|38 | Private         |215646| HS-grad  |9            | Divorced          | Hand

In [16]:
df.show(5, truncate = True)

+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+----------+--------------+------+
|Age|        workclass|fnlwgt| education|education_num|            marital|        occupation|  relationship|  race|    sex|capital_gain|capital_loss|hours_week|native_country| label|
+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+----------+--------------+------+
| 39|        State-gov| 77516| Bachelors|           13|      Never-married|      Adm-clerical| Not-in-family| White|   Male|        2174|           0|        40| United-States| <=50K|
| 50| Self-emp-not-inc| 83311| Bachelors|           13| Married-civ-spouse|   Exec-managerial|       Husband| White|   Male|           0|           0|        13| United-States| <=50K|
| 38|          Private|215646|   HS-grad|            9|           Divorced| Hand

To convert the continuous variable in the right format, you can use recast the columns. You can use withColumn to tell Spark which column to operate the transformation.

#### Import all from `sql.types`

In [17]:
from pyspark.sql.types import *

#### Write a custom function to convert the data type of DataFrame columns

In [18]:
def convertColumn(df, names, newType):
    
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
        
    return df

#### List of continuous features

In [19]:
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']

#### Convert the type

In [20]:
df = convertColumn(df, CONTI_FEATURES, FloatType())

#### Check the dataset

In [21]:
df.printSchema()

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



#### Select columns

You can select and show the rows with select and the names of the features. Below, age and fnlwgt are selected.

In [22]:
df.select('age','fnlwgt').show(5)

+----+--------+
| age|  fnlwgt|
+----+--------+
|39.0| 77516.0|
|50.0| 83311.0|
|38.0|215646.0|
|53.0|234721.0|
|28.0|338409.0|
+----+--------+
only showing top 5 rows



#### Count by group

If you want to count the number of occurrence by group, you can chain:
- `groupBy()`
- `count()`

... together. In the PySpark example below, you count the number of rows by the education level.

In [23]:
df.groupBy("education").count().sort("count",ascending=True).show()

+-------------+-----+
|    education|count|
+-------------+-----+
|    Preschool|   51|
|      1st-4th|  168|
|      5th-6th|  333|
|    Doctorate|  413|
|         12th|  433|
|          9th|  514|
|  Prof-school|  576|
|      7th-8th|  646|
|         10th|  933|
|   Assoc-acdm| 1067|
|         11th| 1175|
|    Assoc-voc| 1382|
|      Masters| 1723|
|    Bachelors| 5355|
| Some-college| 7291|
|      HS-grad|10501|
+-------------+-----+



#### Describe the data

To get a summary statistics, of the data, you can use describe(). It will compute the:
- count
- mean
- standard deviation
- min
- max

In [24]:
df.describe().show()

+-------+------------------+------------+------------------+-------------+------------------+---------+-----------------+------------+-------------------+-------+------------------+------------------+------------------+--------------+------+
|summary|               age|   workclass|            fnlwgt|    education|     education_num|  marital|       occupation|relationship|               race|    sex|      capital_gain|      capital_loss|        hours_week|native_country| label|
+-------+------------------+------------+------------------+-------------+------------------+---------+-----------------+------------+-------------------+-------+------------------+------------------+------------------+--------------+------+
|  count|             32561|       32561|             32561|        32561|             32561|    32561|            32561|       32561|              32561|  32561|             32561|             32561|             32561|         32561| 32561|
|   mean| 38.58164675532078|    

If you want the summary statistic of only one column, add the name of the column inside `describe()`.

In [25]:
df.describe('capital_gain').show()

+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840342|
|    min|               0.0|
|    max|           99999.0|
+-------+------------------+



#### Crosstab computation

In some occasions, it can be interesting to see the descriptive statistics between two pairwise columns. For instance, you can count the number of people with income below or above 50k by education level. This operation is called a crosstab.

In [26]:
df.crosstab('age', 'label').sort("age_label").show()

+---------+------+-----+
|age_label| <=50K| >50K|
+---------+------+-----+
|     17.0|   395|    0|
|     18.0|   550|    0|
|     19.0|   710|    2|
|     20.0|   753|    0|
|     21.0|   717|    3|
|     22.0|   752|   13|
|     23.0|   865|   12|
|     24.0|   767|   31|
|     25.0|   788|   53|
|     26.0|   722|   63|
|     27.0|   754|   81|
|     28.0|   748|  119|
|     29.0|   679|  134|
|     30.0|   690|  171|
|     31.0|   705|  183|
|     32.0|   639|  189|
|     33.0|   684|  191|
|     34.0|   643|  243|
|     35.0|   659|  217|
|     36.0|   635|  263|
+---------+------+-----+
only showing top 20 rows



You can see no people have revenue above 50k when they are young.

#### Drop column

There are two intuitive commands to drop columns:
- **drop()**: Drop a column
- **dropna()**: Drop NA's

Below you drop the column education_num

In [27]:
df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

#### Filter data

You can use filter() to apply descriptive statistics in a subset of data. For instance, you can count the number of people above 40:

In [28]:
df.filter(df.age > 40).count()

13443

#### Descriptive statistics by group

Finally, you can group data by group and compute statistical operations like the mean.

In [29]:
df.groupby('marital').agg({'capital_gain': 'mean'}).show()

+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|             Widowed| 571.0715005035247|
| Married-spouse-a...| 653.9832535885167|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
|            Divorced| 728.4148098131893|
|       Never-married|376.58831788823363|
|           Separated| 535.5687804878049|
+--------------------+------------------+



### Step 2: Data preprocessing

Data processing is a critical step in machine learning. After you remove garbage data, you get some important insights.

For instance, you know that age is not a linear function with the income. When people are young, their income is usually lower than mid-age. After retirement, a household uses their saving, meaning a decrease in income. To capture this pattern, you can add a square to the age feature.

#### Add age square

To add a new feature, you need to:
- Select the column
- Apply the transformation and add it to the DataFrame

In [30]:
from pyspark.sql.functions import *

In [31]:
# Select the column

age_square = df.select(col("age")**2)

In [32]:
# Apply the transformation and add it to the DataFrame

df = df.withColumn("age_square", col("age")**2)
df.printSchema()

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)



You can see that agesquare has been successfully added to the data frame. You can change the order of the variables with select. Below, you bring agesquare right after age.

In [33]:
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']

In [34]:
df = df.select(COLUMNS)
df.first()

Row(age=39.0, age_square=1521.0, workclass=' State-gov', fnlwgt=77516.0, education=' Bachelors', education_num=13.0, marital=' Never-married', occupation=' Adm-clerical', relationship=' Not-in-family', race=' White', sex=' Male', capital_gain=2174.0, capital_loss=0.0, hours_week=40.0, native_country=' United-States', label=' <=50K')

#### Exclude Holand-Netherlands

When a group within a feature has only one observation, it brings no information to the model. On the contrary, it can lead to an error during the cross-validation.

Let's check the origin of the household

In [35]:
df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()

+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|            Honduras|                   13|
|             Hungary|                   13|
| Outlying-US(Guam...|                   14|
|          Yugoslavia|                   16|
|                Laos|                   18|
|            Thailand|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|         

The feature native_country has only one household coming from Netherland. We can exclude it.

In [36]:
df_remove = df.filter(df.native_country != 'Holand-Netherlands')

### Step 3: Build a data processing pipeline

Similar to scikit-learn, Pyspark has a pipeline API.

A pipeline is very convenient to maintain the structure of the data. You push the data into the pipeline. Inside the pipeline, various operations are done, the output is used to feed the algorithm.

For instance, one universal transformation in machine learning consists of converting a string to one hot encoder, i.e., one column by a group. One hot encoder is usually a matrix full of zeroes.

The steps to transform the data are very similar to scikit-learn. You need to:

- Index the string to numeric
- Create the one hot encoder
- Transform the data
- Two APIs do the job:
    - StringIndexer
    - OneHotEncoder

First of all, you select the string column to index. The inputCol is the name of the column in the dataset. outputCol is the new name given to the transformed column.

#### Fit the data and transform it

Create the news columns based on the group. For instance, if there are 10 groups in the feature, the new matrix will have 10 columns, one for each group.

In [37]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [43]:
stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec").fit(indexed)
encoded = encoder.transform(indexed)
encoded.select('workclass','workclass_encoded').show(20)

+-----------------+-----------------+
|        workclass|workclass_encoded|
+-----------------+-----------------+
|        State-gov|              4.0|
| Self-emp-not-inc|              1.0|
|          Private|              0.0|
|          Private|              0.0|
|          Private|              0.0|
|          Private|              0.0|
|          Private|              0.0|
| Self-emp-not-inc|              1.0|
|          Private|              0.0|
|          Private|              0.0|
|          Private|              0.0|
|        State-gov|              4.0|
|          Private|              0.0|
|          Private|              0.0|
|          Private|              0.0|
|          Private|              0.0|
| Self-emp-not-inc|              1.0|
|          Private|              0.0|
|          Private|              0.0|
| Self-emp-not-inc|              1.0|
+-----------------+-----------------+
only showing top 20 rows



#### Build the pipeline

You will build a pipeline to convert all the precise features and add them to the final dataset. The pipeline will have four operations, but feel free to add as many operations as you want.
- Encode the categorical data
- Index the label feature
- Add continuous variable
- Assemble the steps

Each step is stored in a list named stages. This list will tell the VectorAssembler what operation to perform inside the pipeline.

#### Encode the categorical data

This step is very similar to the above example, except that you loop over all the categorical features.

In [44]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder

In [45]:
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline

for categoricalCol in CATE_FEATURES:

    stringIndexer = StringIndexer(inputCol=categoricalCol,
                                  outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                            outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [46]:
stages

[StringIndexer_d772cfb6f496,
 OneHotEncoder_fad166fc8259,
 StringIndexer_61677eb7b79a,
 OneHotEncoder_1d88f17fefe1,
 StringIndexer_9c935c36c284,
 OneHotEncoder_9d2569712155,
 StringIndexer_a0264a5bde10,
 OneHotEncoder_7f159f13efc9,
 StringIndexer_156fe1cb3fdc,
 OneHotEncoder_f039c67c0d6f,
 StringIndexer_5fc92b16d783,
 OneHotEncoder_0dba2b88f38d,
 StringIndexer_597ea0780d41,
 OneHotEncoder_7fcad3f6d53d,
 StringIndexer_129fb25ac9e5,
 OneHotEncoder_59d93a2b3b38]

#### Index the label feature

Spark, like many other libraries, does not accept string values for the label. You convert the label feature with StringIndexer and add it to the list stages

In [60]:
# Convert label into label indices using the StringIndexer

label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

The inputCols of the VectorAssembler is a list of columns. You can create a new list containing all the new columns. The code below populate the list with encoded categorical features and continuous features.

In [61]:
# Add continuous variable

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

Finally, you pass all the steps in the VectorAssembler

In [62]:
# Assemble the steps

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

Now that all the steps are ready, you push the data to the pipeline.

In [63]:
# Create a Pipeline

pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

If you check the new dataset, you can see that it contains all the features, transformed and not transformed. You are only interested by the newlabel and features. The features includes all the transformed features and the continuous variables.

In [64]:
model.take(1)

[Row(age=39.0, age_square=1521.0, workclass=' State-gov', fnlwgt=77516.0, education=' Bachelors', education_num=13.0, marital=' Never-married', occupation=' Adm-clerical', relationship=' Not-in-family', race=' White', sex=' Male', capital_gain=2174.0, capital_loss=0.0, hours_week=40.0, native_country=' United-States', label=' <=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(41, {0: 1.0}), newlabel=0.0, features=SparseVector(100, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 94: 39.0, 95: 77516.0, 96: 2174.0, 9

### Step 4: Build the classifier: logistic

To make the computation faster, we convert features to DenseVector type.

In [65]:
from pyspark.ml.linalg import DenseVector

input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

You are ready to create the train data as a DataFrame. You use the SparkSession.

In [67]:
df_train = session.createDataFrame(input_data, ["label", "features"])
df_train.show(2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows



#### Create a train/test set

You split the dataset 80/20 with randomSplit.

In [69]:
# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

Let's count how many people with income below/above 50k in both training and test set

In [71]:
train_data.groupby('label').agg({'label': 'count'}).show()
test_data.groupby('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19826|
|  1.0|        6302|
+-----+------------+

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        4894|
|  1.0|        1539|
+-----+------------+



#### Build the logistic regressor

Last but not least, you can build the classifier. Pyspark has an API called LogisticRegression to perform logistic regression.

You initialize lr by indicating the label column and feature columns. You set a maximum of 10 iterations and add a regularization parameter with a value of 0.3. Note that in the next section, you will use cross-validation with a parameter grid to tune the model

In [72]:
# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

You can see the coefficients from the regression

In [73]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

Coefficients: [-0.018863050234839432,-0.06422116579115773,0.022255729512822713,-0.14844822637836383,-0.03113278018629248,0.2951262002293314,0.24334660524641202,-0.4599538178022337,-0.14156782855713737,-0.04299938177320906,0.23317778533012906,0.37975764312729604,0.023306794502272153,-0.2131681665958368,-0.01093471808834952,-0.23193326477915652,-0.2663170289412882,0.5342262585001979,-0.2627271374773837,-0.17073113887170185,0.5118739125588695,-0.21907760273917623,-0.19131191487682556,0.4050141915572931,-0.3156346514809409,-0.16712985646590578,-0.18086167130818823,-0.14194438495925646,-0.14682557774685617,0.23684679672709705,-0.02512641152660794,0.3356278935576671,-0.09670273711760363,0.06865310839483384,-0.2531861765712007,-0.15212658583842312,-0.14908668146560394,-0.09085827510816771,-0.23903459339481317,-0.27181869803813163,0.14068158811077774,0.1359615011281939,-0.258839312416689,0.3331109354127363,-0.18661962059819398,-0.28521194962220014,-0.2142812845954518,0.4865508741627895,0.08128

### Step 5: Train and evaluate the model

To generate predictions for your test set, you can use linearModel with transform() on test_data.

In [74]:
# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

You can print the variables in predictions

In [75]:
predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



We are interested in the label, prediction and the probability.

In [76]:
selected = predictions.select("label", "prediction", "probability")
selected.show(20)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.95376152667862...|
|  0.0|       0.0|[0.81717452074922...|
|  0.0|       0.0|[0.66425573247645...|
|  0.0|       0.0|[0.79263770206661...|
|  0.0|       0.0|[0.74674281823849...|
|  0.0|       0.0|[0.77215470057475...|
|  0.0|       0.0|[0.76971036054327...|
|  0.0|       0.0|[0.79607655032804...|
|  0.0|       0.0|[0.82630914804835...|
|  0.0|       0.0|[0.76338299457109...|
|  0.0|       0.0|[0.87760250880957...|
|  0.0|       0.0|[0.84173110299343...|
|  0.0|       0.0|[0.84125005216033...|
|  0.0|       0.0|[0.84641376482877...|
|  0.0|       0.0|[0.57004817920740...|
|  0.0|       0.0|[0.62797467523073...|
|  0.0|       0.0|[0.58986632717154...|
|  0.0|       0.0|[0.76688381469957...|
|  0.0|       0.0|[0.82555152178817...|
|  0.0|       0.0|[0.87708252522929...|
+-----+----------+--------------------+
only showing top 20 rows



#### Evaluate the model

You need to look at the accuracy metric to see how well (or bad) the model performs. Currently, there is no API to compute the accuracy measure in Spark. The default value is the ROC, receiver operating characteristic curve. It is a different metric that take into account the false positive rate.

Before you look at the ROC, let's construct the accuracy measure. You are more familiar with this metric. The accuracy measure is the sum of the correct prediction over the total number of observations.

You create a DataFrame with the label and the prediction.

In [77]:
cm = predictions.select("label", "prediction")

You can check the number of class in the label and the prediction

In [78]:
cm.groupby('label').agg({'label': 'count'}).show()
cm.groupby('prediction').agg({'prediction': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        4894|
|  1.0|        1539|
+-----+------------+

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5755|
|       1.0|              678|
+----------+-----------------+



For instance, in the test set, there is 1568 household with an income above 50k and 4934 below. The classifier, however, predicted 651 households with income above 50k.

> Note: Your numbers may be slightly different.

You can compute the accuracy by computing the count when the label is correctly classified over the total number of rows.

In [79]:
cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8272967511270014

You can wrap everything together and write a function to compute the accuracy.

In [80]:
def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 

In [81]:
accuracy_m(model = linearModel)

Model accuracy: 82.730%


#### ROC metrics

The module `BinaryClassificationEvaluator` includes the ROC measures. The Receiver Operating Characteristic curve is another common tool used with binary classification. It is very similar to the precision/recall curve, but instead of plotting precision versus recall, the ROC curve shows the true positive rate (i.e. recall) against the false positive rate. The false positive rate is the ratio of negative instances that are incorrectly classified as positive. It is equal to one minus the true negative rate. The true negative rate is also called specificity. Hence the ROC curve plots sensitivity (recall) versus 1 - specificity.

In [82]:
# Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8907092213270923
areaUnderROC


### Step 6 (Stretch): Tune the hyperparameter

Last but not least, you can tune the hyperparameters. Similar to scikit-learn you create a parameter grid, and you add the parameters you want to tune.

To reduce the time of the computation, you only tune the regularization parameter with only two values.

In [83]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Finally, you evaluate the model with using the cross-validation method with 5 folds. It takes some time to train.

In [84]:
from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)

# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 75.653 seconds


The best regularization hyperparameter is 0.01, with an accuracy of 85.316 percent.

In [85]:
accuracy_m(model = cvModel)

Model accuracy: 85.326%


You can extract the recommended parameter by chaining `cvModel.bestModel` with `extractParamMap()`.

In [86]:
bestModel = cvModel.bestModel
bestModel.extractParamMap()

{Param(parent='LogisticRegression_3705c5ce9a17', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_3705c5ce9a17', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_3705c5ce9a17', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_3705c5ce9a17', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_3705c5ce9a17', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_3705c5ce9a17', name='labelCol', doc='label column name.'): 'label',
 Param(parent='LogisticRegression_3705c5ce9a17', name='maxBlockSizeInMB', doc='maximum memory in MB for s