# PySpark Cookbook

### Tomasz Drabas, Denny Lee
#### Version: 0.1
#### Date: 2/28/2018

# Loading the data

In [140]:
import pyspark.sql.functions as func
census_path = '../data/census_income.csv'

census = spark.read.csv(census_path, header=True, inferSchema=True)

for col, typ in census.dtypes:
    if typ == 'string':
        census = census.withColumn(col, func.ltrim(func.rtrim(census[col])))
census.count()

32561

In [142]:
census.show()

+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+-----+
|age|       workclass|fnlwgt|   education|education-num|      marital-status|       occupation| relationship|              race|   sex|capital-gain|capital-loss|hours-per-week|native-country|label|
+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+-----+
| 39|       State-gov| 77516|   Bachelors|           13|       Never-married|     Adm-clerical|Not-in-family|             White|  Male|        2174|           0|            40| United-States|<=50K|
| 50|Self-emp-not-inc| 83311|   Bachelors|           13|  Married-civ-spouse|  Exec-managerial|      Husband|             White|  Male|           0|           0|            13| United-States|<=50K|
| 38|     

In [143]:
census.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- label: string (nullable = true)

## Loading into RDD

In [127]:
census_rdd = sc.textFile(census_path)

header = census_rdd.first().split(',')
census_split = (
    census_rdd
    .map(lambda row: row.split(','))
    .map(lambda row: [e.strip() for e in row])
    .filter(lambda row: row != header) # remove header
)

census_split.take(1)

[['39', 'State-gov', '77516', 'Bachelors', '13', 'Never-married', 'Adm-clerical', 'Not-in-family', 'White', 'Male', '2174', '0', '40', 'United-States', '<=50K']]

# Exploring data

## Data prep

In [162]:
import pyspark.mllib.stat as st
import numpy as np

census_subset = census.select(cols_to_keep)

cols_num = [e[0] for e in census_subset.dtypes if e[1] == 'int']
cols_cat = [e[0] for e in census_subset.dtypes[1:] if e[1] == 'string']
cols_num, cols_cat

(['age', 'capital-gain', 'capital-loss', 'hours-per-week'], ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country'])

## Numerical data

In [163]:
rdd_num = census_subset.select(cols_num).rdd.map(lambda row: [e for e in row])
stats_num = st.Statistics.colStats(numeric_rdd)

for col, min_, mean_, max_, var_ in zip(
      cols_num
    , stats_num.min()
    , stats_num.mean()
    , stats_num.max()
    , stats_num.variance()
):
    print('{0}: min->{1:.1f}, mean->{2:.1f}, max->{3:.1f}, stdev->{4:.1f}'
          .format(col, min_, mean_, max_, np.sqrt(var_)))

age: min->17.0, mean->38.6, max->90.0, stdev->13.6
capital-gain: min->0.0, mean->1077.6, max->99999.0, stdev->7385.3
capital-loss: min->0.0, mean->87.3, max->4356.0, stdev->403.0
hours-per-week: min->1.0, mean->40.4, max->99.0, stdev->12.3

## Categorical data

In [164]:
rdd_cat = census_subset.select(cols_cat).rdd.map(lambda row: [e for e in row])

results_cat = {}

for i, col in enumerate(cols_cat):
    results_cat[col] = rdd_cat.groupBy(lambda row: row[i]).map(lambda el: (el[0], len(el[1]))).collect()

for k in results_cat:
    print(k, sorted(results_cat[k], key=lambda el: el[1], reverse=True), '\n')

sex [('Male', 21790), ('Female', 10771)] 

race [('White', 27816), ('Black', 3124), ('Asian-Pac-Islander', 1039), ('Amer-Indian-Eskimo', 311), ('Other', 271)] 

native-country [('United-States', 29170), ('Mexico', 643), ('?', 583), ('Philippines', 198), ('Germany', 137), ('Canada', 121), ('Puerto-Rico', 114), ('El-Salvador', 106), ('India', 100), ('Cuba', 95), ('England', 90), ('Jamaica', 81), ('South', 80), ('China', 75), ('Italy', 73), ('Dominican-Republic', 70), ('Vietnam', 67), ('Guatemala', 64), ('Japan', 62), ('Poland', 60), ('Columbia', 59), ('Taiwan', 51), ('Haiti', 44), ('Iran', 43), ('Portugal', 37), ('Nicaragua', 34), ('Peru', 31), ('France', 29), ('Greece', 29), ('Ecuador', 28), ('Ireland', 24), ('Hong', 20), ('Trinadad&Tobago', 19), ('Cambodia', 19), ('Laos', 18), ('Thailand', 18), ('Yugoslavia', 16), ('Outlying-US(Guam-USVI-etc)', 14), ('Hungary', 13), ('Honduras', 13), ('Scotland', 12), ('Holand-Netherlands', 1)] 

marital-status [('Married-civ-spouse', 14976), ('Never-m

## Correlations

In [165]:
correlations = st.Statistics.corr(rdd_num)

In [175]:
for i, el_i in enumerate(abs(correlations) > 0.05):
    print(cols_num[i])
    
    for j, el_j in enumerate(el_i):
        if el_j and j != i:
            print('    ', cols_num[j], correlations[i][j])
            
    print('\n')

age
     capital-gain 0.077674498166
     capital-loss 0.057774539479
     hours-per-week 0.0687557075095


capital-gain
     age 0.077674498166
     hours-per-week 0.0784086153901


capital-loss
     age 0.057774539479
     hours-per-week 0.0542563622727


hours-per-week
     age 0.0687557075095
     capital-gain 0.0784086153901
     capital-loss 0.0542563622727

# Statistical testing

In [197]:
import pyspark.mllib.linalg as ln

census_occupation = census.groupby('label').pivot('occupation').count()

census_occupation_coll = (
    census_occupation
    .rdd
    .map(lambda row: (row[1:]))
    .flatMap(lambda row: row)
    .collect()
)

census_occupation_coll

[1652, 3263, 8, 3170, 2098, 879, 1284, 1752, 3158, 148, 2281, 438, 2667, 645, 1277, 191, 507, 1, 929, 1968, 115, 86, 250, 137, 1, 1859, 211, 983, 283, 320]

In [198]:
len_row = len(census_occupation.collect()[0]) - 1
dense_mat = ln.Matrices.dense(len_row, 2, census_occupation_coll)

In [200]:
chi_sq = st.Statistics.chiSqTest(dense_mat)

In [201]:
print(chi_sq.pValue)

0.0

# Transforming the data

List of columns to keep

In [161]:
cols_to_keep = census.dtypes

cols_to_keep = (
    ['label','age', 'capital-gain', 'capital-loss','hours-per-week'] + 
    [e[0] for e in cols_to_keep[:-1] if e[1] == 'string']
)

cols_to_keep

['label', 'age', 'capital-gain', 'capital-loss', 'hours-per-week', 'workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country']

Number of distinct values

In [56]:
len_ftrs = []

for col in cols_to_keep[5:]:
    len_ftrs.append((col, census.select(col).distinct().count()))
    
len_ftrs = dict(len_ftrs)

Using hashing trick

In [180]:
import pyspark.mllib.feature as feat

final_data = (
    census
    .select(cols_to_keep)
    .rdd
    .map(lambda row: [
        list(
            feat.HashingTF(len_ftrs[col])
            .transform(row[i])
            .toArray()
        ) if i > 4
        else [row[i]] 
        for i, col in enumerate(cols_to_keep)]
    )
)

final_data.take(3)

[[['<=50K'], [39], [2174], [0], [40], [0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 3.0, 3.0, 0.0], [1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], [1.0, 1.0, 2.0, 3.0, 5.0, 1.0, 0.0], [0.0, 2.0, 0.0, 1.0, 1.0, 0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 3.0, 0.0, 0.0, 2.0], [2.0, 2.0, 0.0, 3.0, 3.0, 3.0], [0.0, 0.0, 0.0, 2.0, 3.0], [1.0, 3.0], [1.0, 0.0, 0.0, 3.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 0.0]], [['<=50K'], [50], [0], [0], [13], [1.0, 0.0, 2.0, 2.0, 1.0, 3.0, 2.0, 3.0, 2.0], [1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], [3.0, 1.0, 3.0, 3.0, 5.0, 3.0, 0.0], [1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 2.0, 0.0, 0.0, 2.0, 0.0, 1.0, 5.0], [2.0, 1.0, 0.0, 2.0, 1.0, 1.0], [0.0, 0.0, 0.0, 2.0, 3.0], [1.0, 3.0], [1.0, 0.0, 0.0, 3.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2

Encode label

In [78]:
def labelEncode(label):
    return [int(label[0] == '>50K')]

In [81]:
final_data.map(lambda row: labelEncode(row[0]) + [item for sublist in row[1:]
                  for item in sublist]).take(10)

[[0, 39, 2174, 0, 40, 0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 3.0, 3.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 2.0, 3.0, 5.0, 1.0, 0.0, 0.0, 2.0, 0.0, 1.0, 1.0, 0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 3.0, 0.0, 0.0, 2.0, 2.0, 2.0, 0.0, 3.0, 3.0, 3.0, 0.0, 0.0, 0.0, 2.0, 3.0, 1.0, 3.0, 1.0, 0.0, 0.0, 3.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 0.0], [0, 50, 0, 0, 13, 1.0, 0.0, 2.0, 2.0, 1.0, 3.0, 2.0, 3.0, 2.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 1.0, 3.0, 3.0, 5.0, 3.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 2.0, 0.0, 0.0, 2.0, 0.0, 1.0, 5.0, 2.0, 1.0, 0.0, 2.0, 1.0, 1.0, 0.0, 0.0, 0.0, 2.0, 3.0, 1.0, 3.0, 1.0, 0.0, 0.0, 3.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,

In [None]:
encoding

# Creating an RDD for training

# Predicting a workclass of census respondents

# Forecasting income levels of census respondents

# Building clustering models

# Computing performance statistics