# PySpark with HDFS
Import libraries

In [1]:
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import lit, udf
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier as RF, LogisticRegression as LR
import warnings
warnings.filterwarnings('ignore')

Waiting for a Spark session to start...

Waiting for a Spark session to start...

In [2]:
def ith_(v, i):
    """Return element ``i`` of vector ``v`` as a float, or None if unavailable.

    Intended to be wrapped in a Spark UDF to pull a single class probability
    out of an ML ``probability`` vector column.

    Args:
        v: An indexable sequence (e.g. a Spark ML DenseVector), or None when
            the column value is NULL.
        i: Integer position of the element to extract.

    Returns:
        ``float(v[i])``, or None when ``v`` is None/not indexable, ``i`` is out
        of range, or the element cannot be converted to a float. Returning None
        lets Spark record a SQL NULL instead of failing the whole job.
    """
    try:
        return float(v[i])
    except (ValueError, TypeError, IndexError):
        # ValueError: element is not numeric (e.g. a non-numeric string).
        # TypeError: v is None (NULL vector) or not indexable / element is None.
        # IndexError: i lies outside the vector.
        return None

# Expose ith_ to Spark SQL as a UDF whose result column is DoubleType.
ith = udf(ith_, returnType=DoubleType())

## Load data from HDFS
Load the data and cast the string columns to numeric types (features to double, label to integer).<br>
The data was generated with `../../generate_data.py mydata 1000000` and uploaded with `hdfs dfs -put mydata.csv /`.

In [3]:
df_load = spark.read.csv('hdfs:///mydata.csv', header='true')

# Without a schema every CSV column is read as a string: convert the five
# features to doubles and the 0/1 target to an integer (applied in this order).
column_types = {
    'x1': DoubleType(),
    'x2': DoubleType(),
    'x3': DoubleType(),
    'x4': DoubleType(),
    'x5': DoubleType(),
    'y': IntegerType(),
}
for column_name, spark_type in column_types.items():
    df_load = df_load.withColumn(column_name, df_load[column_name].cast(spark_type))

print(df_load.count())
df_load.head(10)

1000000


[Row(x1=0.08233868872869078, x2=-1.5688842960628786, x3=0.24286278226347638, x4=1.6632669176333197, x5=-0.638417836433558, y=1), Row(x1=-0.0688196903983117, x2=0.23467582062946052, x3=0.9681355622321666, x4=-0.6297384641938888, x5=0.8729121946592299, y=0), Row(x1=-0.2671644068093285, x2=0.6601887170491937, x3=-0.033269126866050346, x4=-0.417056393443199, x5=-0.9561963008953746, y=0), Row(x1=1.8705222962022887, x2=-0.02353846676989591, x3=0.8665311665699504, x4=-2.1866373785552993, x5=0.6339444908592159, y=1), Row(x1=-0.5566611969808073, x2=0.4716568808612375, x3=-1.2678458345274355, x4=1.3686253831257036, x5=0.8072574299126498, y=1), Row(x1=-0.47545895695246654, x2=-1.6658449595941496, x3=1.447444059706974, x4=0.4644738374060886, x5=1.0652560426581885, y=0), Row(x1=1.399884785512308, x2=1.5972976628674194, x3=-0.5567132761483432, x4=-2.1678218520132, x5=0.9733066975630595, y=0), Row(x1=1.198401852092572, x2=0.5813847750589491, x3=0.4990789029506081, x4=-1.5154440818994184, x5=-0.159702

## Create pipeline
Assign features and dependent variable.<br>
Build a random forest model.

In [4]:
featureCols = ['x1', 'x2', 'x3', 'x4', 'x5']
assembler_features = VectorAssembler(inputCols=featureCols, outputCol='features')
# StringIndexer's default ordering ('frequencyDesc') assigns index 0.0 to the
# most frequent class, so label 1.0 would mean "the rarer y value" rather than
# y == 1. 'alphabetAsc' pins the mapping to y=0 -> 0.0 and y=1 -> 1.0, which the
# truth-table cell relies on when it treats label 1 as the positive class.
# NOTE: the stringOrderType parameter requires Spark >= 2.3.
labelIndexer = StringIndexer(inputCol='y', outputCol='label', stringOrderType='alphabetAsc')

dfX = [assembler_features, labelIndexer]
pipeline = Pipeline(stages=dfX)

allData = pipeline.fit(df_load).transform(df_load)

trainingData, testData = allData.randomSplit([0.8, 0.2], seed=0)
# featureSubsetStrategy='all' considers every feature at every split, so the
# 20 trees differ only through bootstrap sampling (effectively bagged trees).
rf = RF(labelCol='label', featuresCol='features', numTrees=20, featureSubsetStrategy='all')
fit = rf.fit(trainingData)
transformed = fit.transform(testData)
results = transformed.select(['probability', 'label'])

print(results.count())
results.head(10)

199995


[Row(probability=DenseVector([1.0, 0.0]), label=0.0), Row(probability=DenseVector([0.9961, 0.0039]), label=0.0), Row(probability=DenseVector([0.3586, 0.6414]), label=1.0), Row(probability=DenseVector([0.4463, 0.5537]), label=0.0), Row(probability=DenseVector([0.4261, 0.5739]), label=1.0), Row(probability=DenseVector([0.3126, 0.6874]), label=1.0), Row(probability=DenseVector([0.3285, 0.6715]), label=1.0), Row(probability=DenseVector([0.3285, 0.6715]), label=1.0), Row(probability=DenseVector([0.3285, 0.6715]), label=1.0), Row(probability=DenseVector([0.4463, 0.5537]), label=1.0)]

## Create truth table
Extract the class-1 probability from the DenseVector.<br>
Build the truth table with a Spark `groupBy`. This scales to large data, whereas the same aggregation on a pandas DataFrame may not.

In [5]:
# Threshold P(label == 1) at 0.5 to turn the probability vector into a 0/1 prediction.
validation = results.select(['label', (ith("probability", lit(1)) > 0.5).cast('integer').alias('prediction')])
# Aggregate in Spark; only the (at most four) confusion-matrix rows reach pandas.
truth_table = validation.groupBy(['label', 'prediction']).count().orderBy(['label', 'prediction'])

tt = truth_table.toPandas()
# Summing the matching rows yields 0 when a (label, prediction) pair never occurs
# (e.g. the model predicts a single class); `.values[0]` would raise IndexError.
tp = int(tt.loc[(tt.label == 1) & (tt.prediction == 1), 'count'].sum())
fp = int(tt.loc[(tt.label == 0) & (tt.prediction == 1), 'count'].sum())
fn = int(tt.loc[(tt.label == 1) & (tt.prediction == 0), 'count'].sum())
tn = int(tt.loc[(tt.label == 0) & (tt.prediction == 0), 'count'].sum())

# Guard each ratio so degenerate confusion matrices report NaN instead of raising.
total = tp + tn + fp + fn
accuracy = (tp + tn) / total if total else float('nan')
precision = tp / (tp + fp) if (tp + fp) else float('nan')
recall = tp / (tp + fn) if (tp + fn) else float('nan')
f1 = (2.0 * precision * recall) / (precision + recall) if (precision + recall) else float('nan')

print("Out of sample accuracy =", accuracy)
print("Out of sample precision =", precision)
print("Out of sample recall =", recall)
print("Out of sample F1 =", f1)
tt

Out of sample accuracy = 0.6623815595389885
Out of sample precision = 0.6008046104673405
Out of sample recall = 0.9717952812487525
Out of sample F1 = 0.7425399028452464


   label  prediction  count
0    0.0           0  35103
1    0.0           1  64696
2    1.0           0   2826
3    1.0           1  97370

## Aside
To make the model a bit of a challenge, the dependent variable was defined as a circle inside a circle.

![Sample](data_sample.png)

This has nothing to do with Spark or HDFS, but it is a fun way to demonstrate how domain knowledge and better data transformations can be more valuable than tuning model parameters.

In [6]:
# Engineered feature: squared radius in the plane spanned by (x1 + x2 + x3 + x4)
# and x5. Presumably this mirrors how generate_data.py assigns y ("a circle inside
# a circle") -- TODO confirm against the generator. withColumn replaces x6, so
# re-running this cell is idempotent.
df_load = df_load.withColumn("x6", ((df_load["x1"] + df_load["x2"] + df_load["x3"] + df_load["x4"])**2 + df_load["x5"]**2).cast(DoubleType()))

featureCols = ['x1', 'x2', 'x3', 'x4', 'x5', 'x6']
assembler_features = VectorAssembler(inputCols=featureCols, outputCol='features')
# Pin y=0 -> 0.0 and y=1 -> 1.0; the default 'frequencyDesc' ordering would make
# label 1.0 mean "the rarer y value". Requires Spark >= 2.3.
labelIndexer = StringIndexer(inputCol='y', outputCol="label", stringOrderType='alphabetAsc')

dfX = [assembler_features, labelIndexer]
pipeline = Pipeline(stages=dfX)

allData = pipeline.fit(df_load).transform(df_load)

trainingData, testData = allData.randomSplit([0.8, 0.2], seed=0)
lr = LR(labelCol='label', featuresCol='features')
fit = lr.fit(trainingData)
transformed = fit.transform(testData)
results = transformed.select(['probability', 'label'])

# Threshold P(label == 1) at 0.5 and build the truth table in Spark.
validation = results.select(['label', (ith("probability", lit(1)) > 0.5).cast('integer').alias('prediction')])
truth_table = validation.groupBy(['label', 'prediction']).count().orderBy(['label', 'prediction'])

tt = truth_table.toPandas()
# Summing the matching rows yields 0 for an absent (label, prediction) pair
# instead of raising IndexError as `.values[0]` would.
tp = int(tt.loc[(tt.label == 1) & (tt.prediction == 1), 'count'].sum())
fp = int(tt.loc[(tt.label == 0) & (tt.prediction == 1), 'count'].sum())
fn = int(tt.loc[(tt.label == 1) & (tt.prediction == 0), 'count'].sum())
tn = int(tt.loc[(tt.label == 0) & (tt.prediction == 0), 'count'].sum())

# Guard each ratio so degenerate confusion matrices report NaN instead of raising.
total = tp + tn + fp + fn
accuracy = (tp + tn) / total if total else float('nan')
precision = tp / (tp + fp) if (tp + fp) else float('nan')
recall = tp / (tp + fn) if (tp + fn) else float('nan')
f1 = (2.0 * precision * recall) / (precision + recall) if (precision + recall) else float('nan')

print("Out of sample accuracy =", accuracy)
print("Out of sample precision =", precision)
print("Out of sample recall =", recall)
print("Out of sample F1 =", f1)
tt

Out of sample accuracy = 0.9770594264856621
Out of sample precision = 0.9766101694915255
Out of sample recall = 0.9776238572398099
Out of sample F1 = 0.977116750458862


   label  prediction  count
0    0.0           0  97453
1    0.0           1   2346
2    1.0           0   2242
3    1.0           1  97954

Granted, this data is clearly rigged, and knowledge of how it's rigged is exploited for the transform. <br>
It's still worth noting, because many real business problems can also be solved with better intuition rather than more data or model tuning.