#### Experiments using a simulated dataset
#### Spark UDFs

In [1]:
# Initiate spark
import findspark
findspark.init()
import pyspark # This needs to have been installed first
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark import SparkContext, SparkConf
from pyspark.sql import *

In [2]:
# Initial set up

# check version
import sys
print(sys.version)

# Show all output from cells
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# Show graphics within Jupyter
%matplotlib inline

import matplotlib as plt

# Show edges of the histograms
plt.rcParams["patch.force_edgecolor"] = True

# Timing code
import time

# Set seaborn themes
import seaborn as sns
sns.set()
import os
os.getcwd()

3.6.2 |Anaconda custom (64-bit)| (default, Sep 19 2017, 08:03:39) [MSC v.1900 64 bit (AMD64)]


'C:\\Users\\areda\\Documents'

In [3]:
import pandas as pd
import numpy as np

In [4]:
# Create (or reuse) the Spark session. The master URL "local" runs Spark with a
# single worker thread, which bounds the UDF timings measured below.
spark = (
    SparkSession.builder
    .master("local")
    .appName("AlexReda")
    .getOrCreate()
)

In [5]:
# Version of the running Spark session; pandas_udf (used below) requires Spark >= 2.3.
spark.version

'2.3.0'

In [6]:
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
import pandas as pd
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, StringType, ArrayType

#### First investigation

In [7]:
# Sizes for the simulated problem: a small training set and a much larger
# test set that is scored through the Spark UDFs below.
n_samples_train = 1_000
n_samples_test = 10_000
n_samples_all = n_samples_test + n_samples_train
n_features = 50

In [8]:
# Simulated binary classification data with a fixed seed for reproducibility.
X, y = make_classification(
    n_samples=n_samples_all,
    n_features=n_features,
    random_state=123,
)

In [9]:
# Hold out exactly `n_samples_test` rows for scoring; the rest are for training.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=n_samples_test, random_state=45
)

In [10]:
# Column labels feature0 ... feature{n_features - 1}.
column_names = ['feature' + str(i) for i in range(n_features)]

In [11]:
# Test features as a pandas frame with a leading integer `id` column (taken
# from the default RangeIndex) so each prediction can be tied back to its row.
X_test2 = (
    pd.DataFrame(X_test, columns=column_names)
    .reset_index()
    .rename(columns={'index': 'id'})
)

In [12]:
# Preview the first five rows of the test frame.
X_test2.head(5)

Unnamed: 0,id,feature0,feature1,feature2,feature3,feature4,feature5,feature6,feature7,feature8,...,feature40,feature41,feature42,feature43,feature44,feature45,feature46,feature47,feature48,feature49
0,0,-0.410779,1.408338,-0.069688,0.040203,-0.307234,-0.844737,-0.138596,1.549422,-1.880608,...,-0.693239,0.130851,-1.39952,0.488927,-0.513195,-1.18052,-0.779563,1.022411,0.865472,0.965532
1,1,-1.432422,2.648975,0.664868,-0.948636,-1.2031,-3.198291,-0.537351,-0.315041,1.633572,...,-1.377846,0.232574,0.106566,-0.049747,0.440557,1.644224,-1.688193,0.113779,1.438163,1.676255
2,2,0.308283,-0.777478,0.57228,-0.494407,0.234693,0.664338,-0.198858,1.007483,1.761043,...,0.171687,1.292698,-0.264595,0.313309,0.461846,-0.793724,2.184697,0.136228,-1.30659,-0.718391
3,3,-1.135503,0.636659,1.083565,-1.079299,0.749069,1.992796,-0.829618,0.052515,-1.170957,...,0.603715,0.954866,0.235125,2.690548,-1.218967,0.075196,0.129194,-0.340945,1.116894,0.90413
4,4,-0.138736,-1.195967,-0.924284,-0.092341,-1.439911,-0.227875,-0.753525,0.067751,-0.567689,...,-0.926318,0.822914,-0.828078,-0.368547,1.343977,0.380322,-0.065282,-0.293283,-1.852242,-0.522319


#### Train model

In [13]:
# Small grid search over tree depth for a 100-tree random forest, scored by
# ROC AUC. `gs_rf` is the fitted search object the UDFs below predict with.
param_grid = {'n_estimators': [100], 'max_depth': [2, 4, None]}
search = GridSearchCV(
    estimator=RandomForestClassifier(random_state=42),
    param_grid=param_grid,
    scoring='roc_auc',
)
gs_rf = search.fit(X_train, y_train)
print(f'ROC AUC: {gs_rf.best_score_:.3f}')

ROC AUC: 0.988


In [14]:
# Convert the pandas test frame to a Spark DataFrame for UDF scoring.
X_test3 = spark.createDataFrame(data=X_test2)

#### Option 1: Using a plain Python (row-at-a-time) UDF

In [15]:
@udf(returnType=DoubleType())
def predict_udf(*cols):
    """Row-at-a-time Spark UDF: predicted probability for a single row.

    Spark calls this once per row, passing the feature values as positional
    arguments. The tuple is wrapped as a one-row batch for the model, and the
    second column of ``predict_proba`` is returned as a Python float.
    """
    single_row = [cols]
    probabilities = gs_rf.predict_proba(single_row)
    return float(probabilities[0, 1])

In [16]:
# Build the query with the row-at-a-time UDF. Spark is lazy, so this times
# query construction only; no predictions are computed until an action runs.
# perf_counter is monotonic and high-resolution, unlike time.time().
t0 = time.perf_counter()
df_pred_a = X_test3.select(
    col('id'),
    predict_udf(*column_names).alias('prediction')
)
t1 = time.perf_counter()
t1 - t0

1.6713435649871826

In [17]:
# .show(5) is the action that actually runs the row-at-a-time UDF. Spark may
# evaluate only as many rows as it needs to display 5, so this is not
# necessarily a full pass over every test row.
t0 = time.perf_counter()
df_pred_a.show(5)
t1 = time.perf_counter()
t1 - t0

+---+----------+
| id|prediction|
+---+----------+
|  0|      0.95|
|  1|       0.1|
|  2|      0.91|
|  3|       0.1|
|  4|      0.08|
+---+----------+
only showing top 5 rows



154.20745062828064

#### Option 2: Using a pandas (vectorized) UDF

In [19]:
@pandas_udf(returnType=DoubleType())
def predict_pandas_udf(*cols):
    """Vectorized (scalar pandas) UDF: one predicted probability per row.

    Spark passes one pandas.Series per input column for a whole batch of rows.
    Spark requires the returned Series to have the same length as the inputs.
    The value returned per row is the second column of ``gs_rf.predict_proba``.
    """
    # Stack the column Series side by side into a feature matrix. `.values`
    # gives the model a plain array, matching how it was fitted (on a NumPy
    # array). A named DataFrame may make newer scikit-learn warn about
    # feature names on every batch.
    X = pd.concat(cols, axis=1).values
    return pd.Series(gs_rf.predict_proba(X)[:, 1])


In [20]:
# Build the query with the pandas UDF. Lazy, so this times query construction
# only; predictions run when an action is triggered.
t0 = time.perf_counter()
df_pred_b = X_test3.select(
    col('id'),
    predict_pandas_udf(*column_names).alias('prediction')
)
t1 = time.perf_counter()
t1 - t0

1.1119499206542969

In [21]:
# .show(5) is the action that runs the pandas UDF. Spark may evaluate only
# the rows it needs to display 5.
t0 = time.perf_counter()
df_pred_b.show(5)
t1 = time.perf_counter()
t1 - t0

+---+----------+
| id|prediction|
+---+----------+
|  0|      0.95|
|  1|       0.1|
|  2|      0.91|
|  3|       0.1|
|  4|      0.08|
+---+----------+
only showing top 5 rows



17.683598041534424

#### Option 3

Multiclass prediction with a pandas UDF that returns one probability per class

In [47]:
@pandas_udf(returnType=ArrayType(DoubleType()))
def predict_pandas_udf(*cols):
    """Vectorized UDF returning the full class-probability vector for each row.

    Each output element is a list of floats, one per column of
    ``gs_rf.predict_proba`` (i.e. ordered like ``gs_rf.classes_``).

    NOTE(review): this reuses the name ``predict_pandas_udf`` and silently
    replaces the single-probability UDF defined for Option 2. Re-running the
    Option 2 cells after this one will call this array-valued version instead.
    A distinct name (e.g. ``predict_pandas_udf_multi``) would avoid that, but
    the Option 3 query cell must be renamed at the same time.
    """
    # Plain array input, matching how the model was fitted (avoids
    # feature-name warnings from newer scikit-learn).
    X = pd.concat(cols, axis=1).values
    probabilities = gs_rf.predict_proba(X)
    return pd.Series([row.tolist() for row in probabilities])


In [54]:
# Build the multiclass query. The array-valued UDF yields one probability per
# class, which is then split into one `prediction_<class>` column per entry of
# gs_rf.classes_. Lazy, so this times query construction only.
t0 = time.perf_counter()
df_pred_multi = (
    X_test3.select(
        col('id'),
        predict_pandas_udf(*column_names).alias('predictions')
    )
    # Select each item of the prediction array into its own column.
    .select(
        col('id'),
        *[col('predictions')[i].alias(f'prediction_{c}')
          for i, c in enumerate(gs_rf.classes_)]
    )
)
t1 = time.perf_counter()
t1 - t0

In [55]:
# .show(5) is the action that runs the multiclass pandas UDF. Spark may
# evaluate only the rows it needs to display 5.
t0 = time.perf_counter()
df_pred_multi.show(5)
t1 = time.perf_counter()
t1 - t0

+---+------------+------------+
| id|prediction_0|prediction_1|
+---+------------+------------+
|  0|        0.05|        0.95|
|  1|         0.9|         0.1|
|  2|        0.09|        0.91|
|  3|         0.9|         0.1|
|  4|        0.92|        0.08|
+---+------------+------------+
only showing top 5 rows



16.64201521873474

#### Experimenting with a k-nearest-neighbors model on a non-simulated dataset

In [40]:

from pandas import read_csv
from sklearn.model_selection import KFold
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.pipeline import FeatureUnion
from sklearn.neighbors import NearestNeighbors
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest
# load data: `names` is passed explicitly, so pandas treats the first line of
# the file as data rather than a header.
url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"
names = ['preg', 'plas', 'pres', 'skin', 'test', 'mass', 'pedi', 'age', 'class']
dataframe = read_csv(url, names=names)
array = dataframe.values
# The first eight columns are the features; the ninth (`class`) is the target.
X, Y = array[:, :8], array[:, 8]

# Feature union: 3 PCA components concatenated with the 6 highest-scoring
# univariate features.
features = [
    ('pca', PCA(n_components=3)),
    ('select_best', SelectKBest(k=6)),
]
feature_union = FeatureUnion(features)

# Pipeline: feature union -> 3-nearest-neighbours classifier.
estimators = [
    ('feature_union', feature_union),
    ('knn', KNeighborsClassifier(n_neighbors=3)),
]
model = Pipeline(estimators)

In [66]:
# Preview the Pima feature matrix. Built from `X` directly because `X2` is only
# defined further down, so `X2.head()` fails on a fresh kernel.
pd.DataFrame(X).head()

Unnamed: 0,0,1,2,3,4,5,6,7
0,6.0,148.0,72.0,35.0,0.0,33.6,0.627,50.0
1,1.0,85.0,66.0,29.0,0.0,26.6,0.351,31.0
2,8.0,183.0,64.0,0.0,0.0,23.3,0.672,32.0
3,1.0,89.0,66.0,23.0,94.0,28.1,0.167,21.0
4,0.0,137.0,40.0,35.0,168.0,43.1,2.288,33.0


In [82]:
# Short column labels v0 ... v7 for the eight Pima features.
column_names2 = ['v' + str(i) for i in range(8)]

In [83]:
# Inspect the generated column labels.
column_names2

['v0', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6', 'v7']

In [87]:
# Pima features as a pandas frame with the v0..v7 column labels.
X2 = pd.DataFrame(data=X, columns=column_names2)

In [88]:
# Fit the PCA/SelectKBest + KNN pipeline; `fit` returns the pipeline itself.
m = model.fit(X=X2, y=Y)

  


In [89]:
# Spark DataFrame of the Pima features for UDF scoring.
X3 = spark.createDataFrame(data=X2)

In [90]:
# Display the first 20 rows of the Spark frame.
X3.show(n=20)

+----+-----+----+----+-----+----+-------------------+----+
|  v0|   v1|  v2|  v3|   v4|  v5|                 v6|  v7|
+----+-----+----+----+-----+----+-------------------+----+
| 6.0|148.0|72.0|35.0|  0.0|33.6|              0.627|50.0|
| 1.0| 85.0|66.0|29.0|  0.0|26.6|0.35100000000000003|31.0|
| 8.0|183.0|64.0| 0.0|  0.0|23.3|              0.672|32.0|
| 1.0| 89.0|66.0|23.0| 94.0|28.1|0.16699999999999998|21.0|
| 0.0|137.0|40.0|35.0|168.0|43.1| 2.2880000000000003|33.0|
| 5.0|116.0|74.0| 0.0|  0.0|25.6|              0.201|30.0|
| 3.0| 78.0|50.0|32.0| 88.0|31.0|              0.248|26.0|
|10.0|115.0| 0.0| 0.0|  0.0|35.3|              0.134|29.0|
| 2.0|197.0|70.0|45.0|543.0|30.5|              0.158|53.0|
| 8.0|125.0|96.0| 0.0|  0.0| 0.0|0.23199999999999998|54.0|
| 4.0|110.0|92.0| 0.0|  0.0|37.6|              0.191|30.0|
|10.0|168.0|74.0| 0.0|  0.0|38.0|              0.537|34.0|
|10.0|139.0|80.0| 0.0|  0.0|27.1| 1.4409999999999998|57.0|
| 1.0|189.0|60.0|23.0|846.0|30.1|0.39799999999999996|59.

In [57]:
@udf(returnType=DoubleType())
def predict_udf2(*cols):
    """Row-at-a-time Spark UDF that scores one row with the fitted pipeline `m`.

    The per-row feature values arrive as positional arguments. They are wrapped
    as a one-row batch, and the second column of ``m.predict_proba`` is
    returned as a Python float.
    """
    batch_of_one = [cols]
    return float(m.predict_proba(batch_of_one)[0][1])

In [92]:
# Score the Pima data with the KNN-pipeline UDF (`predict_udf2` over the eight
# v0..v7 columns). `predict_udf` must not be used here: it wraps the
# 50-feature random forest. Lazy, so this times query construction only.
t0 = time.perf_counter()
df_pred_x = X3.select(
    predict_udf2(*column_names2).alias('prediction')
)
t1 = time.perf_counter()
t1 - t0

0.10574889183044434

In [75]:
# NOTE(review): scratch cell; no visible cell uses its output. Candidate for deletion.
list(range(8))

[0, 1, 2, 3, 4, 5, 6, 7]