# PySpark Cookbook

### Tomasz Drabas, Denny Lee
#### Version: 0.1
#### Date: 2/28/2018

# Loading the data

In [1]:
import pyspark.sql.functions as func
# Location of the raw census extract, relative to this notebook.
census_path = '../data/census_income.csv'

# Read the CSV: column names come from the header row, column types are
# inferred by Spark.
census = spark.read.csv(census_path, header=True, inferSchema=True)

# Strip leading and trailing spaces from every string column so that
# categorical values compare cleanly later on.
string_columns = [name for name, dtype in census.dtypes if dtype == 'string']
for name in string_columns:
    census = census.withColumn(name, func.trim(census[name]))

# Row count of the cleaned DataFrame (cell output).
census.count()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,,pyspark,idle,,,✔


SparkSession available as 'spark'.
32561

In [2]:
# Print a tabular preview of the cleaned census DataFrame.
census.show()

+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+-----+
|age|       workclass|fnlwgt|   education|education-num|      marital-status|       occupation| relationship|              race|   sex|capital-gain|capital-loss|hours-per-week|native-country|label|
+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+-----+
| 39|       State-gov| 77516|   Bachelors|           13|       Never-married|     Adm-clerical|Not-in-family|             White|  Male|        2174|           0|            40| United-States|<=50K|
| 50|Self-emp-not-inc| 83311|   Bachelors|           13|  Married-civ-spouse|  Exec-managerial|      Husband|             White|  Male|           0|           0|            13| United-States|<=50K|
| 38|     

In [3]:
# Print every column's name, inferred type and nullability.
census.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- label: string (nullable = true)

# Exploring data

## Data prep

List of columns to keep

In [4]:
# The label comes first, followed by the numeric features we want to keep.
leading_columns = [
    'label',
    'age',
    'capital-gain',
    'capital-loss',
    'hours-per-week',
]

# Every string column except the last column of the schema (the label,
# which is already listed above).
string_feature_columns = [
    name
    for name, dtype in census.dtypes[:-1]
    if dtype == 'string'
]

cols_to_keep = leading_columns + string_feature_columns

cols_to_keep

['label', 'age', 'capital-gain', 'capital-loss', 'hours-per-week', 'workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country']

Get numeric and categorical columns

In [5]:
import pyspark.mllib.stat as st
import numpy as np

# Restrict the data to the selected columns (label first).
census_subset = census.select(cols_to_keep)

subset_dtypes = census_subset.dtypes

# Integer-typed columns are treated as numeric features.
cols_num = [name for name, dtype in subset_dtypes if dtype == 'int']

# String-typed columns are categorical features; position 0 is skipped
# because it holds the label.
cols_cat = [name for name, dtype in subset_dtypes[1:] if dtype == 'string']

cols_num, cols_cat

(['age', 'capital-gain', 'capital-loss', 'hours-per-week'], ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country'])

## Numerical data

In [6]:
# RDD of plain Python lists holding only the numeric columns.
rdd_num = census_subset.select(cols_num).rdd.map(list)

# Column-wise summary statistics over the numeric RDD.
stats_num = st.Statistics.colStats(rdd_num)

col_mins = stats_num.min()
col_means = stats_num.mean()
col_maxs = stats_num.max()
col_vars = stats_num.variance()

summary_fmt = '{0}: min->{1:.1f}, mean->{2:.1f}, max->{3:.1f}, stdev->{4:.1f}'

# One summary line per numeric column; the standard deviation is derived
# from the reported variance.
for idx, name in enumerate(cols_num):
    print(summary_fmt.format(
        name,
        col_mins[idx],
        col_means[idx],
        col_maxs[idx],
        np.sqrt(col_vars[idx]),
    ))

age: min->17.0, mean->38.6, max->90.0, stdev->13.6
capital-gain: min->0.0, mean->1077.6, max->99999.0, stdev->7385.3
capital-loss: min->0.0, mean->87.3, max->4356.0, stdev->403.0
hours-per-week: min->1.0, mean->40.4, max->99.0, stdev->12.3

## Categorical data

In [7]:
# RDD of plain Python lists holding the categorical columns plus the label.
rdd_cat = (
    census_subset
    .select(cols_cat + ['label'])
    .rdd
    .map(lambda row: [e for e in row])
)

# Maps column name -> list of (value, count) pairs.
results_cat = {}

for i, col in enumerate(cols_cat + ['label']):
    # Count occurrences with map + reduceByKey instead of groupBy + len:
    # groupBy gathers every full row of a group just so it can be counted,
    # whereas reduceByKey only ever moves partial counts around.
    # `idx=i` pins the column position when the lambda is created, so the
    # lambda never depends on the loop variable's later value.
    results_cat[col] = (
        rdd_cat
        .map(lambda row, idx=i: (row[idx], 1))
        .reduceByKey(lambda x, y: x + y)
        .collect()
    )

# Print each column's value frequencies, most frequent first.
for k in results_cat:
    print(
        k
        , sorted(
            results_cat[k]
            , key=lambda el: el[1]
            , reverse=True)
        , '\n')

sex [('Male', 21790), ('Female', 10771)] 

race [('White', 27816), ('Black', 3124), ('Asian-Pac-Islander', 1039), ('Amer-Indian-Eskimo', 311), ('Other', 271)] 

label [('<=50K', 24720), ('>50K', 7841)] 

native-country [('United-States', 29170), ('Mexico', 643), ('?', 583), ('Philippines', 198), ('Germany', 137), ('Canada', 121), ('Puerto-Rico', 114), ('El-Salvador', 106), ('India', 100), ('Cuba', 95), ('England', 90), ('Jamaica', 81), ('South', 80), ('China', 75), ('Italy', 73), ('Dominican-Republic', 70), ('Vietnam', 67), ('Guatemala', 64), ('Japan', 62), ('Poland', 60), ('Columbia', 59), ('Taiwan', 51), ('Haiti', 44), ('Iran', 43), ('Portugal', 37), ('Nicaragua', 34), ('Peru', 31), ('France', 29), ('Greece', 29), ('Ecuador', 28), ('Ireland', 24), ('Hong', 20), ('Trinadad&Tobago', 19), ('Cambodia', 19), ('Laos', 18), ('Thailand', 18), ('Yugoslavia', 16), ('Outlying-US(Guam-USVI-etc)', 14), ('Hungary', 13), ('Honduras', 13), ('Scotland', 12), ('Holand-Netherlands', 1)] 

marital-statu

## Correlations

In [8]:
# Pairwise correlation matrix of the numeric columns (library default
# method); rows and columns follow the order of cols_num because rdd_num
# was built from select(cols_num).
correlations = st.Statistics.corr(rdd_num)
correlations

array([[ 1.        ,  0.0776745 ,  0.05777454,  0.06875571],
       [ 0.0776745 ,  1.        , -0.03161506,  0.07840862],
       [ 0.05777454, -0.03161506,  1.        ,  0.05425636],
       [ 0.06875571,  0.07840862,  0.05425636,  1.        ]])

In [9]:
# Minimum absolute correlation for a pair to be reported.
corr_threshold = 0.05

# For every numeric column, list the other columns whose absolute
# correlation with it exceeds the threshold.
for i, name_i in enumerate(cols_num):
    print(name_i)

    for j, name_j in enumerate(cols_num):
        if j == i:
            # skip the trivial self-correlation on the diagonal
            continue
        if abs(correlations[i][j]) > corr_threshold:
            print('    ', name_j, correlations[i][j])

    print()

age
     capital-gain 0.077674498166
     capital-loss 0.057774539479
     hours-per-week 0.0687557075095

capital-gain
     age 0.077674498166
     hours-per-week 0.0784086153901

capital-loss
     age 0.057774539479
     hours-per-week 0.0542563622727

hours-per-week
     age 0.0687557075095
     capital-gain 0.0784086153901
     capital-loss 0.0542563622727

# Statistical testing

In [84]:
import pyspark.mllib.linalg as ln

# Contingency table: one row per occupation, one count column per label
# value. A pivot cell with no matching rows comes back as null, which cannot
# be stored in a DenseMatrix, so such cells are filled with 0.
census_occupation = (
    census
    .groupby('occupation')
    .pivot('label')
    .count()
    .fillna(0)
)

# Collect the (small) pivot once, so the row count and the flattened values
# are guaranteed to come from the same materialisation.
occupation_rows = census_occupation.collect()

# Row-major list of counts; the first field (occupation name) is dropped.
census_occupation_coll = [
    float(cnt)
    for row in occupation_rows
    for cnt in row[1:]
]

len_row = len(occupation_rows)
# Number of label values = all pivot columns except the 'occupation' key.
len_col = len(census_occupation.columns) - 1

dense_mat = ln.DenseMatrix(
    len_row
    , len_col
    , census_occupation_coll
    , True  # values are supplied row by row
)

# Chi-square test of independence between occupation and label.
chi_sq = st.Statistics.chiSqTest(dense_mat)

print(chi_sq.pValue)
print(chi_sq.nullHypothesis)

0.0
the occurrence of the outcomes is statistically independent.

In [87]:
# Show the contingency table passed to the chi-square test as a NumPy array
# (one row per occupation, one column per label value).
dense_mat.toArray()

array([[  2.66700000e+03,   9.83000000e+02],
       [  2.09800000e+03,   1.96800000e+03],
       [  2.28100000e+03,   1.85900000e+03],
       [  1.28400000e+03,   8.60000000e+01],
       [  8.79000000e+02,   1.15000000e+02],
       [  3.17000000e+03,   9.29000000e+02],
       [  1.27700000e+03,   3.20000000e+02],
       [  1.48000000e+02,   1.00000000e+00],
       [  4.38000000e+02,   2.11000000e+02],
       [  3.15800000e+03,   1.37000000e+02],
       [  6.45000000e+02,   2.83000000e+02],
       [  1.75200000e+03,   2.50000000e+02],
       [  8.00000000e+00,   1.00000000e+00],
       [  1.65200000e+03,   1.91000000e+02],
       [  3.26300000e+03,   5.07000000e+02]])

# Transforming the data

Number of distinct values in each categorical column

In [None]:
# Maps each categorical column (everything after the first five entries of
# cols_to_keep) to the number of distinct values it takes in the data.
len_ftrs = {
    col: census.select(col).distinct().count()
    for col in cols_to_keep[5:]
}

Using the hashing trick

In [None]:
import pyspark.mllib.feature as feat

# Positions 0-4 of cols_to_keep hold the label and the numeric features;
# every later position is a categorical column that gets hashed.
#
# One HashingTF per categorical column, built once up front instead of once
# per row per column. The vector width is half the number of distinct values
# (as before), floored at 1 so a single-valued column cannot produce a
# zero-width hasher.
hashers = {
    col: feat.HashingTF(max(1, int(len_ftrs[col] / 2.0)))
    for col in cols_to_keep[5:]
}

# Each output row is a list of lists: [label], [age], ..., then one hashed
# vector per categorical column.
final_data = (
    census
    .select(cols_to_keep)
    .rdd
    .map(lambda row: [
        list(
            hashers[col]
            # HashingTF.transform expects a document, i.e. an iterable of
            # terms. Passing the bare string would hash every character
            # separately, so the value is wrapped in a one-element list and
            # hashed as a single term.
            .transform([row[i]])
            .toArray()
        ) if i > 4
        else [row[i]]
        for i, col in enumerate(cols_to_keep)]
    )
)

final_data.take(3)

Encode label

In [None]:
def labelEncode(label):
    """Encode an income label as a one-element list holding 0 or 1.

    `label` is indexed at position 0; the result is [1] when that first
    element equals '>50K' and [0] otherwise.
    """
    is_high_income = label[0] == '>50K'
    return [1 if is_high_income else 0]

def _encode_and_flatten(row):
    """Turn a list-of-lists row into one flat list, encoded label first."""
    flat = labelEncode(row[0])
    for sublist in row[1:]:
        flat.extend(sublist)
    return flat


# NOTE: this rebinds final_data, so the cell is meant to be run exactly once
# after the hashing step.
final_data = final_data.map(_encode_and_flatten)

# Creating an RDD for training

In [None]:
import pyspark.mllib.feature as ft
import pyspark.mllib.linalg as ln
import pyspark.mllib.regression as reg

def _to_income_point(row):
    """Build a LabeledPoint with the encoded income label as the target."""
    target, predictors = row[0], row[1:]
    return reg.LabeledPoint(target, ln.Vectors.dense(predictors))


# Training set for the income classifiers.
final_data_income = final_data.map(_to_income_point)
final_data_income.take(2)

In [None]:
def _to_hours_point(row):
    """Build a LabeledPoint with hours-per-week (position 4) as the target.

    All remaining positions, including the encoded income label at
    position 0, are used as features.
    """
    target = row[4]
    predictors = row[:4] + row[5:]
    return reg.LabeledPoint(target, ln.Vectors.dense(predictors))


# Training set for the hours-per-week regression.
final_data_hours = final_data.map(_to_hours_point)
final_data_hours.take(2)

# Predicting hours of work for census respondents

Get some testing data

In [None]:
# Small sanity-check set: the first 10 points of final_data_hours turned
# back into an RDD.
# NOTE(review): these points are also part of the data the regression is
# trained on, so this is not a held-out test set.
test_data_reg=sc.parallelize(final_data_hours.take(10))

Linear regression (benchmark)

In [None]:
# Fit the benchmark linear regression for hours-per-week with 10 SGD
# iterations; all other training options are left at the library defaults.
workhours_model_lm = reg.LinearRegressionWithSGD.train(
    final_data_hours
    , iterations = 10
)

In [None]:
# Observed hours-per-week for the sanity-check points.
hours_observed = test_data_reg.map(lambda point: point.label).collect()

# Linear-regression predictions for the same points.
hours_predicted = (
    workhours_model_lm
    .predict(test_data_reg.map(lambda point: point.features))
    .collect()
)

# Print observed / predicted side by side.
for t, p in zip(hours_observed, hours_predicted):
    print(t, p)

# Forecasting income levels of census respondents

Get some testing data

In [None]:
# Small sanity-check set: the first 10 points of final_data_income turned
# back into an RDD.
# NOTE(review): these points are also part of the data the classifiers are
# trained on, so this is not a held-out test set.
test_data_class=sc.parallelize(final_data_income.take(10))

Logistic regression

In [None]:
import pyspark.mllib.classification as cl

# Fit a logistic regression for the income label with 10 SGD iterations;
# all other training options are left at the library defaults.
income_model_lr = cl.LogisticRegressionWithSGD.train(
    final_data_income
    , iterations=10
)

In [None]:
# True income labels for the sanity-check points.
income_true = test_data_class.map(lambda point: point.label).collect()

# Logistic-regression predictions for the same points.
income_pred_lr = (
    income_model_lr
    .predict(test_data_class.map(lambda point: point.features))
    .collect()
)

# Print true / predicted side by side.
for t, p in zip(income_true, income_pred_lr):
    print(t, p)

In [None]:
# Inspect the threshold attribute of the fitted logistic regression model.
income_model_lr.threshold

In [None]:
# Inspect the fitted weight vector of the logistic regression model.
income_model_lr.weights

Support Vector Machines

In [None]:
# Fit a linear SVM for the income label: 100 SGD iterations, step 0.98,
# and a mini-batch fraction of one third.
income_model_svm = cl.SVMWithSGD.train(
    final_data_income
    , iterations=100
    , step=0.98
    , miniBatchFraction=1/3.0
)

In [None]:
# True income labels for the sanity-check points.
svm_true = test_data_class.map(lambda point: point.label).collect()

# SVM predictions for the same points.
svm_pred = (
    income_model_svm
    .predict(test_data_class.map(lambda point: point.features))
    .collect()
)

# Print true / predicted side by side.
for t, p in zip(svm_true, svm_pred):
    print(t, p)

In [None]:
# Inspect the fitted weight vector of the SVM model.
income_model_svm.weights

# Building clustering models

In [None]:
import pyspark.mllib.clustering as clu

# Fit a 2-cluster k-means model on the feature values only: row[1:] drops
# the encoded income label stored at position 0. The seed is fixed so the
# random initialisation is repeatable.
# NOTE(review): initializationSteps presumably has no effect with
# initializationMode='random' — confirm against the KMeans.train docs.
model = clu.KMeans.train(
    final_data.map(lambda row: row[1:])
    , 2
    , maxIterations=10
    , initializationMode='random'
    , seed=666
    , initializationSteps=5
    , epsilon=1e-4
)

In [None]:
import sklearn.metrics as m

# Cluster assignment for every row, using the same feature slice the model
# was trained on (everything after the encoded label).
cluster_features = final_data.map(lambda row: row[1:])
predicted = model.predict(cluster_features).collect()

# Encoded income label of every row, used as the reference grouping.
true = final_data.map(lambda row: row[0]).collect()

# Homogeneity first, then completeness.
for score_fn in (m.homogeneity_score, m.completeness_score):
    print(score_fn(true, predicted))

# Computing performance statistics

In [None]:
import pyspark.mllib.evaluation as ev

Regression metrics

In [None]:
def _hours_prediction_pair(point):
    """Return (predicted hours, observed hours) for one LabeledPoint."""
    predicted_hours = float(workhours_model_lm.predict(point.features))
    return (predicted_hours, point.label)


# Despite the variable name, each tuple is ordered (prediction, observation).
true_pred_reg = final_data_hours.map(_hours_prediction_pair)

metrics_lm = ev.RegressionMetrics(true_pred_reg)

In [None]:
# Report the regression quality measures, one per line.
regression_report = (
    ('R^2: ', metrics_lm.r2),
    ('Explained Variance: ', metrics_lm.explainedVariance),
    ('meanAbsoluteError: ', metrics_lm.meanAbsoluteError),
)

for caption, value in regression_report:
    print(caption, value)

Classification metrics

In [None]:
# Pair the logistic regression prediction with the true label for every
# training point. Despite the variable name, each tuple is ordered
# (prediction, label).
# NOTE(review): the model's predict() output is used as the score here; if
# it returns thresholded 0/1 classes rather than raw scores, the areas below
# are computed from a single operating point — confirm this is intended.
true_pred_class_lr = (
    final_data_income
    .map(lambda row: (
        float(income_model_lr.predict(row.features))
        , row.label))
)

metrics_lr = ev.BinaryClassificationMetrics(true_pred_class_lr)

print('areaUnderPR: ', metrics_lr.areaUnderPR)
# Fixed: this line previously printed areaUnderPR under the ROC caption.
print('areaUnderROC: ', metrics_lr.areaUnderROC)

In [None]:
# Share of training points whose logistic regression prediction differs
# from the true label.
lr_mismatches = true_pred_class_lr.filter(lambda lp: lp[0] != lp[1]).count()
lr_total = true_pred_class_lr.count()

trainErr = lr_mismatches / float(lr_total)
print("Training Error = " + str(trainErr))

In [None]:
# Pair the SVM prediction with the true label for every training point.
# Despite the variable name, each tuple is ordered (prediction, label).
# NOTE(review): the model's predict() output is used as the score here; if
# it returns thresholded 0/1 classes rather than raw scores, the areas below
# are computed from a single operating point — confirm this is intended.
true_pred_class_svm = (
    final_data_income
    .map(lambda row: (
        float(income_model_svm.predict(row.features))
        , row.label))
)

metrics_svm = ev.BinaryClassificationMetrics(true_pred_class_svm)

print('areaUnderPR: ', metrics_svm.areaUnderPR)
# Fixed: this line previously printed areaUnderPR under the ROC caption.
print('areaUnderROC: ', metrics_svm.areaUnderROC)

In [None]:
# Share of training points whose SVM prediction differs from the true label.
svm_mismatches = true_pred_class_svm.filter(lambda lp: lp[0] != lp[1]).count()
svm_total = true_pred_class_svm.count()

trainErr = svm_mismatches / float(svm_total)

print("Training Error = " + str(trainErr))

Confusion matrix

In [None]:
# Confusion-matrix cells for the SVM: count how often each
# (prediction, label) pair occurs. At most four pairs are fetched.
svm_pair_counts = (
    true_pred_class_svm
    .map(lambda pair: (pair, 1))
    .reduceByKey(lambda left, right: left + right)
)
svm_pair_counts.take(4)

In [None]:
# Confusion-matrix cells for the logistic regression: count how often each
# (prediction, label) pair occurs. At most four pairs are fetched.
lr_pair_counts = (
    true_pred_class_lr
    .map(lambda pair: (pair, 1))
    .reduceByKey(lambda left, right: left + right)
)
lr_pair_counts.take(4)