# Environment

In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Connecting to security.0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Connecting to security.0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (185.125.190.39                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InReleas

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()                        # spark session creation.
spark

In [3]:
import numpy as np
import pandas as pd
import random

import matplotlib.pyplot as plt
import seaborn as sns
from patsy import dmatrices

import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor

from sklearn.metrics import mean_squared_error, mean_absolute_error,r2_score
from sklearn.feature_selection import RFE
from sklearn.linear_model import PoissonRegressor,LogisticRegression
from sklearn.model_selection import train_test_split

  import pandas.util.testing as tm


In [4]:
import warnings
warnings.filterwarnings('ignore')
pd.set_option("display.max_columns",None)
sns.set_theme()

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Data , Feature selection and Engineering

In [6]:
path = 'drive/My Drive/datasets'
df = pd.read_csv(path+'/CASpreprocessed.csv')              

In [7]:
df.drop('Unnamed: 0',axis =1, inplace=True)

In [8]:
df['sc'] = df.apply(
    lambda row: 5 if row['seriousInjuryCount'] >=5 else row['seriousInjuryCount'] ,
    axis=1
)

In [9]:
formula = "sc ~ roadCharacter+roadSurface+trafficSign+carStationWagon+cliffBank+debris+ditch+fence+guardRail+houseOrBuilding+motorcycle+NumberOfLanes+overBank+parkedVehicle+postOrPole+schoolBus+slipOrFlood+speedLimit+tree+urban+vanOrUtility+vehicle"

y, X = dmatrices(formula, df, return_type='dataframe')
X.drop("Intercept",axis=1,inplace=True)

In [10]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=42)

In [11]:
def Feature_imp(Model,n,X,y):
    model = Model
    # create the RFE model and select attributes
    rfe = RFE(model,n_features_to_select= n)              # Recursive feature elimination.
    rfe = rfe.fit(X, y)
    # summarize the selection of the attributes
    return pd.DataFrame({'Decision':rfe.support_,'Ranking':rfe.ranking_,'Feature':X.columns})  

Feature = Feature_imp(PoissonRegressor(),10,X_train,y_train)              # PoissonRegressor,  # DecisionTreeClassifier(), LogisticRegression()

In [12]:
lis = Feature[Feature.Decision==True]
X_train_poi = X[lis['Feature'].unique()]
y_train_poi = y

In [13]:
model_poi = sm.GLM(y_train_poi, X_train_poi,family=sm.families.Poisson()).fit()
X_train_poi['Count'] = y_train_poi
X_train_poi['Lambda'] = model_poi.mu

X_train_poi['AUX_OLS'] = X_train_poi.apply(lambda x: ((x['Count'] - x['Lambda'])**2 - x['Lambda']) / x['Lambda'], axis=1)

In [14]:
aux_olsr_results = sm.OLS(  X_train_poi['Lambda']  ,  X_train_poi['AUX_OLS']  ).fit() 
X_train_nb = X_train[lis['Feature'].unique()]
y_train_nb = y_train

X_test_nb = X_test[lis['Feature'].unique()]
y_test_nb = y_test

In [15]:
model_nb = sm.GLM(y_train_nb, X_train_nb,family=sm.families.NegativeBinomial(alpha= aux_olsr_results.params[0])).fit()

In [16]:
ypred_ntrain = model_nb.predict(X_train_nb)
ypred_ntest = model_nb.predict(X_test_nb)

In [17]:
round(ypred_ntest).value_counts()

0.0    153922
1.0       487
2.0        24
3.0         3
5.0         1
4.0         1
6.0         1
dtype: int64

In [18]:
y_test_nb.value_counts()

sc 
0.0    145218
1.0      8072
2.0       891
3.0       168
4.0        54
5.0        36
dtype: int64

In [19]:
print("Mean Absolute Error of the model: ",mean_absolute_error(y_test_nb,ypred_ntest))
print("mean square error of the model: ",mean_squared_error(y_test_nb,ypred_ntest))
print("Root mean square error of the model: ",np.sqrt(mean_squared_error(y_test_nb,ypred_ntest)))

Mean Absolute Error of the model:  0.13195964003353552
mean square error of the model:  0.09165715082261716
Root mean square error of the model:  0.30274932010265054


In [20]:
X_test_nb.to_csv('test.csv')

In [21]:
df = spark.read.option("header",True).csv("test.csv",inferSchema = True)
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- roadCharacter[T.Motorway ramp]: double (nullable = true)
 |-- carStationWagon: double (nullable = true)
 |-- fence: double (nullable = true)
 |-- guardRail: double (nullable = true)
 |-- motorcycle: double (nullable = true)
 |-- NumberOfLanes: double (nullable = true)
 |-- parkedVehicle: double (nullable = true)
 |-- postOrPole: double (nullable = true)
 |-- speedLimit: double (nullable = true)
 |-- tree: double (nullable = true)



In [22]:
tempList = [] #Edit01
for col in df.columns:
        new_name = col.strip()
        new_name = "".join(new_name.split())
        new_name = new_name.replace('.','') # EDIT
        tempList.append(new_name) #Edit02
        
print(tempList) #Just for the sake of it #Edit03

df = df.toDF(*tempList) #Edit04

['_c0', 'roadCharacter[TMotorwayramp]', 'carStationWagon', 'fence', 'guardRail', 'motorcycle', 'NumberOfLanes', 'parkedVehicle', 'postOrPole', 'speedLimit', 'tree']


In [23]:
len(df.columns)

11

In [24]:
df.describe()

DataFrame[summary: string, _c0: string, roadCharacter[TMotorwayramp]: string, carStationWagon: string, fence: string, guardRail: string, motorcycle: string, NumberOfLanes: string, parkedVehicle: string, postOrPole: string, speedLimit: string, tree: string]

# Timing difference in Large Scale Predictions

> **keeping number of partitions fixed = 10** , difference in size of the data : Fraction by 5 , 10 , 20 , 40 , 50 , 70 , 100 ,200


> **keeping the size of the data fixed = fraction(200.0)** , difference in size of Partition : No. of Partitions : 15 , 20 , 30

## different sizes of data

In [39]:
bootstrapped_data = df.sample(withReplacement = True, fraction = 5.0)

bootstrapped_data.createOrReplaceTempView("data")

spark_df = spark.sql("""
select *, _c0%10 as partition_id 
from (
  select *, row_number() over (order by rand()) as user_id
  from data
) 
""")

print("number of rows :",spark_df.count())
print("number of columns :",len(spark_df.columns))

spark_df.groupBy("partition_id").count().toPandas().sort_values(["partition_id"],ascending=True)

import time
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# define a schema for the result set, the user ID and model prediction
schema = StructType([StructField('user_id', LongType(), True),
                     StructField('prediction', DoubleType(), True)])  

# define the Pandas UDF 
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_df):

    # run the model on the partitioned data set 
    ids = sample_df['user_id']
    x_test = sample_df.drop(['user_id', 'partition_id', '_c0'], axis=1)
    pred = model_nb.predict(x_test)

    return pd.DataFrame({'user_id': ids, 'prediction': pred[:]})
  
# partition the data and run the UDF  
results = spark_df.groupby('partition_id').apply(apply_model)
start = time.time()
results.show()
print("time taken in seconds: ", round(time.time()-start))

number of rows : 771896
number of columns : 13
+-------+--------------------+
|user_id|          prediction|
+-------+--------------------+
|     17|0.023248506343369554|
|     21| 0.08743805268583761|
|     35|0.025375380146349012|
|     48|0.024288674033339272|
|     54| 0.03850534187006575|
|     68|0.018816220068673842|
|     75|0.036856342321984065|
|     81| 0.08743805268583761|
|    120| 0.04757553225103889|
|    123|0.018816220068673842|
|    124| 0.04757553225103889|
|    139|  0.0973581371439446|
|    145| 0.07442053866862616|
|    152| 0.07442053866862616|
|    162|0.024288674033339272|
|    190| 0.19923280769879664|
|    204|0.025375380146349012|
|    211| 0.04757553225103889|
|    218| 0.39024801663230124|
|    238|0.007630547852825697|
+-------+--------------------+
only showing top 20 rows

time taken in seconds:  3


In [40]:
bootstrapped_data = df.sample(withReplacement = True, fraction = 10.0)

bootstrapped_data.createOrReplaceTempView("data")

spark_df = spark.sql("""
select *, _c0%10 as partition_id 
from (
  select *, row_number() over (order by rand()) as user_id
  from data
) 
""")

print("number of rows :",spark_df.count())
print("number of columns :",len(spark_df.columns))

spark_df.groupBy("partition_id").count().toPandas().sort_values(["partition_id"],ascending=True)

import time
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# define a schema for the result set, the user ID and model prediction
schema = StructType([StructField('user_id', LongType(), True),
                     StructField('prediction', DoubleType(), True)])  

# define the Pandas UDF 
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_df):

    # run the model on the partitioned data set 
    ids = sample_df['user_id']
    x_test = sample_df.drop(['user_id', 'partition_id', '_c0'], axis=1)
    pred = model_nb.predict(x_test)

    return pd.DataFrame({'user_id': ids, 'prediction': pred[:]})
  
# partition the data and run the UDF  
results = spark_df.groupby('partition_id').apply(apply_model)
start = time.time()
results.show()
print("time taken in seconds: ", round(time.time()-start))

number of rows : 1542136
number of columns : 13
+-------+--------------------+
|user_id|          prediction|
+-------+--------------------+
|      7|  0.0973581371439446|
|1217711| 0.19923280769879664|
|      9|0.036366726412750856|
|1217745| 0.04757553225103889|
|     10|0.018816220068673842|
|1217761| 0.06014143773450335|
|     16| 0.04757553225103889|
|1217768|  0.0973581371439446|
|     17| 0.37353556137770405|
|1217784| 0.14577151196928223|
|     18|0.018816220068673842|
|1217801| 0.04757553225103889|
|     35|0.025375380146349012|
|1217851|  0.0973581371439446|
|     50| 0.07442053866862616|
|1217859|0.037993819923439955|
|     66| 0.04757553225103889|
|1217862|0.018816220068673842|
|     73| 0.01939693501551258|
|1217886| 0.04757553225103889|
+-------+--------------------+
only showing top 20 rows

time taken in seconds:  8


In [41]:
bootstrapped_data = df.sample(withReplacement = True, fraction = 20.0)

bootstrapped_data.createOrReplaceTempView("data")

spark_df = spark.sql("""
select *, _c0%10 as partition_id 
from (
  select *, row_number() over (order by rand()) as user_id
  from data
) 
""")

print("number of rows :",spark_df.count())
print("number of columns :",len(spark_df.columns))

spark_df.groupBy("partition_id").count().toPandas().sort_values(["partition_id"],ascending=True)

import time
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# define a schema for the result set, the user ID and model prediction
schema = StructType([StructField('user_id', LongType(), True),
                     StructField('prediction', DoubleType(), True)])  

# define the Pandas UDF 
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_df):

    # run the model on the partitioned data set 
    ids = sample_df['user_id']
    x_test = sample_df.drop(['user_id', 'partition_id', '_c0'], axis=1)
    pred = model_nb.predict(x_test)

    return pd.DataFrame({'user_id': ids, 'prediction': pred[:]})
  
# partition the data and run the UDF  
results = spark_df.groupby('partition_id').apply(apply_model)
start = time.time()
results.show()
print("time taken in seconds: ", round(time.time()-start))

number of rows : 3088890
number of columns : 13
+-------+--------------------+
|user_id|          prediction|
+-------+--------------------+
|      1| 0.04757553225103889|
|2706007|0.012400064873366227|
|     19| 0.05281419685629926|
|2706009| 0.10376127111111834|
|     20|  0.0877256866193537|
|2706021| 0.03339877769934377|
|     26|  0.0973581371439446|
|2706028|0.025375380146349012|
|     56| 0.04757553225103889|
|2706030|  0.0973581371439446|
|     57|  0.0973581371439446|
|2706037| 0.08743805268583761|
|     58| 0.07879698191875664|
|2706049| 0.06962497000744693|
|     61|0.037993819923439955|
|2706056| 0.04757553225103889|
|     65| 0.04757553225103889|
|2706060| 0.07442053866862616|
|     70|0.012400064873366227|
|2706070| 0.06776462605653523|
+-------+--------------------+
only showing top 20 rows

time taken in seconds:  16


In [42]:
bootstrapped_data = df.sample(withReplacement = True, fraction = 40.0)

bootstrapped_data.createOrReplaceTempView("data")

spark_df = spark.sql("""
select *, _c0%10 as partition_id 
from (
  select *, row_number() over (order by rand()) as user_id
  from data
) 
""")

print("number of rows :",spark_df.count())
print("number of columns :",len(spark_df.columns))

spark_df.groupBy("partition_id").count().toPandas().sort_values(["partition_id"],ascending=True)

import time
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# define a schema for the result set, the user ID and model prediction
schema = StructType([StructField('user_id', LongType(), True),
                     StructField('prediction', DoubleType(), True)])  

# define the Pandas UDF 
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_df):

    # run the model on the partitioned data set 
    ids = sample_df['user_id']
    x_test = sample_df.drop(['user_id', 'partition_id', '_c0'], axis=1)
    pred = model_nb.predict(x_test)

    return pd.DataFrame({'user_id': ids, 'prediction': pred[:]})
  
# partition the data and run the UDF  
results = spark_df.groupby('partition_id').apply(apply_model)
start = time.time()
results.show()
print("time taken in seconds: ", round(time.time()-start))

number of rows : 6181659
number of columns : 13
+-------+--------------------+
|user_id|          prediction|
+-------+--------------------+
|      9|0.003231947212957...|
|5242883| 0.01055398055029299|
|     11|0.024047872897950972|
|5242893| 0.04757553225103889|
|     23|0.030094358744664266|
|5242909|0.012400064873366227|
|     39|  0.0973581371439446|
|5242920| 0.04757553225103889|
|     51| 0.04049263025436044|
|5242932|0.012400064873366227|
|     82|0.018816220068673842|
|5242940| 0.07442053866862616|
|    105|  0.0973581371439446|
|5242963|  0.1907006198541338|
|    115| 0.06051055789988303|
|5242972| 0.08743805268583761|
|    116| 0.15229351448544487|
|5242983| 0.03850534187006575|
|    120| 0.07442053866862616|
|5242998| 0.07442053866862616|
+-------+--------------------+
only showing top 20 rows

time taken in seconds:  29


In [43]:
bootstrapped_data = df.sample(withReplacement = True, fraction = 50.0)

bootstrapped_data.createOrReplaceTempView("data")

spark_df = spark.sql("""
select *, _c0%10 as partition_id 
from (
  select *, row_number() over (order by rand()) as user_id
  from data
) 
""")

print("number of rows :",spark_df.count())
print("number of columns :",len(spark_df.columns))

spark_df.groupBy("partition_id").count().toPandas().sort_values(["partition_id"],ascending=True)

import time
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# define a schema for the result set, the user ID and model prediction
schema = StructType([StructField('user_id', LongType(), True),
                     StructField('prediction', DoubleType(), True)])  

# define the Pandas UDF 
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_df):

    # run the model on the partitioned data set 
    ids = sample_df['user_id']
    x_test = sample_df.drop(['user_id', 'partition_id', '_c0'], axis=1)
    pred = model_nb.predict(x_test)

    return pd.DataFrame({'user_id': ids, 'prediction': pred[:]})
  
# partition the data and run the UDF  
results = spark_df.groupby('partition_id').apply(apply_model)
start = time.time()
results.show()
print("time taken in seconds: ", round(time.time()-start))

number of rows : 7725511
number of columns : 13
+-------+--------------------+
|user_id|          prediction|
+-------+--------------------+
|     13| 0.04757553225103889|
|7340034|0.036366726412750856|
|     24| 0.04757553225103889|
|7340051|  0.0427279323285803|
|     70| 0.04757553225103889|
|7340052| 0.01695457486686251|
|     71| 0.07442053866862616|
|7340056|  0.1789324369524283|
|     81| 0.07442053866862616|
|7340065|0.012400064873366227|
|     84| 0.09108449763862994|
|7340075| 0.08657118023598755|
|     97| 0.03850534187006575|
|7340080|  0.0973581371439446|
|     98| 0.04757553225103889|
|7340089| 0.03850534187006575|
|     99| 0.13867291536136658|
|7340090|  0.0973581371439446|
|    118| 0.06273639244683395|
|7340100| 0.05281419685629926|
+-------+--------------------+
only showing top 20 rows

time taken in seconds:  50


In [44]:
bootstrapped_data = df.sample(withReplacement = True, fraction = 70.0)

bootstrapped_data.createOrReplaceTempView("data")

spark_df = spark.sql("""
select *, _c0%10 as partition_id 
from (
  select *, row_number() over (order by rand()) as user_id
  from data
) 
""")

print("number of rows :",spark_df.count())
print("number of columns :",len(spark_df.columns))

spark_df.groupBy("partition_id").count().toPandas().sort_values(["partition_id"],ascending=True)

import time
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# define a schema for the result set, the user ID and model prediction
schema = StructType([StructField('user_id', LongType(), True),
                     StructField('prediction', DoubleType(), True)])  

# define the Pandas UDF 
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_df):

    # run the model on the partitioned data set 
    ids = sample_df['user_id']
    x_test = sample_df.drop(['user_id', 'partition_id', '_c0'], axis=1)
    pred = model_nb.predict(x_test)

    return pd.DataFrame({'user_id': ids, 'prediction': pred[:]})
  
# partition the data and run the UDF  
results = spark_df.groupby('partition_id').apply(apply_model)
start = time.time()
results.show()
print("time taken in seconds: ", round(time.time()-start))

number of rows : 10812470
number of columns : 13
+--------+--------------------+
| user_id|          prediction|
+--------+--------------------+
|       1| 0.07879698191875664|
|10147505|0.025375380146349012|
|       6|0.004631865193592804|
|10147514|  0.0973581371439446|
|      15|0.025375380146349012|
|10147525| 0.31697100132425815|
|      23|0.006613827425844225|
|10147527| 0.39024801663230124|
|      34| 0.04757553225103889|
|10147531|0.009194831690301247|
|      40| 0.04757553225103889|
|10147536|  0.0973581371439446|
|      50|0.025375380146349012|
|10147541|0.012400064873366227|
|      60| 0.06705749560136744|
|10147542| 0.04757553225103889|
|      90| 0.13867291536136658|
|10147547| 0.19923280769879664|
|      97| 0.07442053866862616|
|10147569| 0.04757553225103889|
+--------+--------------------+
only showing top 20 rows

time taken in seconds:  49


In [45]:
bootstrapped_data = df.sample(withReplacement = True, fraction = 100.0)

bootstrapped_data.createOrReplaceTempView("data")

spark_df = spark.sql("""
select *, _c0%10 as partition_id 
from (
  select *, row_number() over (order by rand()) as user_id
  from data
) 
""")

print("number of rows :",spark_df.count())
print("number of columns :",len(spark_df.columns))

spark_df.groupBy("partition_id").count().toPandas().sort_values(["partition_id"],ascending=True)

import time
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# define a schema for the result set, the user ID and model prediction
schema = StructType([StructField('user_id', LongType(), True),
                     StructField('prediction', DoubleType(), True)])  

# define the Pandas UDF 
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_df):

    # run the model on the partitioned data set 
    ids = sample_df['user_id']
    x_test = sample_df.drop(['user_id', 'partition_id', '_c0'], axis=1)
    pred = model_nb.predict(x_test)

    return pd.DataFrame({'user_id': ids, 'prediction': pred[:]})
  
# partition the data and run the UDF  
results = spark_df.groupby('partition_id').apply(apply_model)
start = time.time()
results.show()
print("time taken in seconds: ", round(time.time()-start))

number of rows : 15451139
number of columns : 13
+--------+--------------------+
| user_id|          prediction|
+--------+--------------------+
|       6|0.018566257101194004|
|14206501| 0.07442053866862616|
|      10|  0.0973581371439446|
|14206506| 0.07442053866862616|
|      17| 0.07442053866862616|
|14206520| 0.07442053866862616|
|      22|  0.0973581371439446|
|14206538| 0.04757553225103889|
|      29|0.018459683461361755|
|14206539| 0.07442053866862616|
|      31| 0.04757553225103889|
|14206546|0.025375380146349012|
|      32|0.025375380146349012|
|14206557|  0.0973581371439446|
|      40|  0.0973581371439446|
|14206560| 0.04757553225103889|
|      45| 0.18639448878883308|
|14206561| 0.07442053866862616|
|      47|0.012954859518850292|
|14206573|  0.0973581371439446|
+--------+--------------------+
only showing top 20 rows

time taken in seconds:  43


In [52]:
bootstrapped_data = df.sample(withReplacement = True, fraction = 200.0)

bootstrapped_data.createOrReplaceTempView("data")

spark_df = spark.sql("""
select *, _c0%10 as partition_id 
from (
  select *, row_number() over (order by rand()) as user_id
  from data
) 
""")

print("number of rows :",spark_df.count())
print("number of columns :",len(spark_df.columns))

spark_df.groupBy("partition_id").count().toPandas().sort_values(["partition_id"],ascending=True)

import time
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# define a schema for the result set, the user ID and model prediction
schema = StructType([StructField('user_id', LongType(), True),
                     StructField('prediction', DoubleType(), True)])  

# define the Pandas UDF 
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_df):

    # run the model on the partitioned data set 
    ids = sample_df['user_id']
    x_test = sample_df.drop(['user_id', 'partition_id', '_c0'], axis=1)
    pred = model_nb.predict(x_test)

    return pd.DataFrame({'user_id': ids, 'prediction': pred[:]})
  
# partition the data and run the UDF  
results = spark_df.groupby('partition_id').apply(apply_model)
start = time.time()
results.show()
print("time taken in seconds: ", round(time.time()-start))

number of rows : 30889158
number of columns : 13
+--------+--------------------+
| user_id|          prediction|
+--------+--------------------+
|       6| 0.04757553225103889|
|30408723|  0.2032427827628562|
|      18|  0.0973581371439446|
|30408726|  0.0973581371439446|
|      28|  0.0973581371439446|
|30408733| 0.04757553225103889|
|      58|  0.2983054982779649|
|30408746|0.009194831690301247|
|      75| 0.03850534187006575|
|30408749|  0.0973581371439446|
|      83| 0.27670356887636377|
|30408752|0.006330588843698663|
|     106|0.037993819923439955|
|30408774| 0.07219250010881767|
|     107| 0.02175040097576503|
|30408777|0.006330588843698663|
|     127| 0.13521540110809097|
|30408800|0.011751358296161788|
|     129| 0.06776462605653523|
|30408809| 0.09313249442763089|
+--------+--------------------+
only showing top 20 rows

time taken in seconds:  149


## different number of partitions : factor data by 200.0

In [53]:
spark_df = spark.sql("""
select *, _c0%15 as partition_id 
from (
  select *, row_number() over (order by rand()) as user_id
  from data
) 
""")

print("number of rows :",spark_df.count())
print("number of columns :",len(spark_df.columns))

spark_df.groupBy("partition_id").count().toPandas().sort_values(["partition_id"],ascending=True)

number of rows : 30889158
number of columns : 13


Unnamed: 0,partition_id,count
14,0,2051976
1,1,2071074
13,2,2067250
4,3,2067516
7,4,2049202
5,5,2044147
3,6,2088074
9,7,2075064
8,8,2052425
6,9,2040632


In [54]:
import time
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# define a schema for the result set, the user ID and model prediction
schema = StructType([StructField('user_id', LongType(), True),
                     StructField('prediction', DoubleType(), True)])  

# define the Pandas UDF 
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_df):

    # run the model on the partitioned data set 
    ids = sample_df['user_id']
    x_test = sample_df.drop(['user_id', 'partition_id', '_c0'], axis=1)
    pred = model_nb.predict(x_test)

    return pd.DataFrame({'user_id': ids, 'prediction': pred[:]})
  
# partition the data and run the UDF  
results = spark_df.groupby('partition_id').apply(apply_model)
start = time.time()
results.show()
print("time taken in seconds: ", round(time.time()-start))

+--------+--------------------+
| user_id|          prediction|
+--------+--------------------+
|       2|0.006330588843698663|
|30408719|  0.0973581371439446|
|       6|0.025375380146349012|
|30408772| 0.37353556137770405|
|       7|0.025375380146349012|
|30408774|0.012400064873366227|
|      55| 0.04757553225103889|
|30408781| 0.09108449763862994|
|      91|0.012954859518850292|
|30408782| 0.04757553225103889|
|      96|  0.0973581371439446|
|30408783|0.036366726412750856|
|     100|0.006330588843698663|
|30408798| 0.04757553225103889|
|     102|0.036366726412750856|
|30408841| 0.04757553225103889|
|     112|0.012400064873366227|
|30408860|0.002961056931896...|
|     115|  0.0973581371439446|
|30408903| 0.04757553225103889|
+--------+--------------------+
only showing top 20 rows

time taken in seconds:  181


### partitions : 20

In [55]:
spark_df = spark.sql("""
select *, _c0%20 as partition_id 
from (
  select *, row_number() over (order by rand()) as user_id
  from data
) 
""")

print("number of rows :",spark_df.count())
print("number of columns :",len(spark_df.columns))

spark_df.groupBy("partition_id").count().toPandas().sort_values(["partition_id"],ascending=True)

number of rows : 30884267
number of columns : 13


Unnamed: 0,partition_id,count
18,0,1522186
1,1,1572389
17,2,1531537
5,3,1535273
11,4,1538251
6,5,1538175
3,6,1544597
13,7,1550215
12,8,1542467
9,9,1518785


In [56]:
import time
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# define a schema for the result set, the user ID and model prediction
schema = StructType([StructField('user_id', LongType(), True),
                     StructField('prediction', DoubleType(), True)])  

# define the Pandas UDF 
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_df):

    # run the model on the partitioned data set 
    ids = sample_df['user_id']
    x_test = sample_df.drop(['user_id', 'partition_id', '_c0'], axis=1)
    pred = model_nb.predict(x_test)

    return pd.DataFrame({'user_id': ids, 'prediction': pred[:]})
  
# partition the data and run the UDF  
results = spark_df.groupby('partition_id').apply(apply_model)
start = time.time()
results.show()
print("time taken in seconds: ", round(time.time()-start))

+--------+--------------------+
| user_id|          prediction|
+--------+--------------------+
|       3| 0.07442053866862616|
|30408739| 0.04757553225103889|
|      47| 0.15229351448544487|
|30408751| 0.06776462605653523|
|      82|0.014946364505814666|
|30408793|  0.1907006198541338|
|      96| 0.06705749560136744|
|30408797| 0.04757553225103889|
|     101|0.001723824942387...|
|30408806| 0.03850534187006575|
|     112|0.045086629087996585|
|30408812|  0.0973581371439446|
|     133| 0.15229351448544487|
|30408816|0.012400064873366227|
|     192|0.005388109837376994|
|30408829| 0.04757553225103889|
|     235|  0.0973581371439446|
|30408841|0.025375380146349012|
|     237|  0.0973581371439446|
|30408866| 0.07442053866862616|
+--------+--------------------+
only showing top 20 rows

time taken in seconds:  201


### partitions : 30

In [57]:
spark_df = spark.sql("""
select *, _c0%30 as partition_id 
from (
  select *, row_number() over (order by rand()) as user_id
  from data
) 
""")

print("number of rows :",spark_df.count())
print("number of columns :",len(spark_df.columns))

spark_df.groupBy("partition_id").count().toPandas().sort_values(["partition_id"],ascending=True)



number of rows : 30886275
number of columns : 13


Unnamed: 0,partition_id,count
28,0,1017349
5,1,1043099
27,2,1040901
9,3,1043979
16,4,1025385
11,5,1029442
8,6,1053537
19,7,1034320
17,8,1024273
15,9,1016451


In [58]:
import time
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# define a schema for the result set, the user ID and model prediction
schema = StructType([StructField('user_id', LongType(), True),
                     StructField('prediction', DoubleType(), True)])  

# define the Pandas UDF 
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_df):

    # run the model on the partitioned data set 
    ids = sample_df['user_id']
    x_test = sample_df.drop(['user_id', 'partition_id', '_c0'], axis=1)
    pred = model_nb.predict(x_test)

    return pd.DataFrame({'user_id': ids, 'prediction': pred[:]})
  
# partition the data and run the UDF  
results = spark_df.groupby('partition_id').apply(apply_model)
start = time.time()
results.show()
print("time taken in seconds: ", round(time.time()-start))

+--------+--------------------+
| user_id|          prediction|
+--------+--------------------+
|      23| 0.07442053866862616|
|30408731| 0.19923280769879664|
|      30| 0.19923280769879664|
|30408743|0.006330588843698663|
|      47|0.011751358296161788|
|30408781| 0.08631943283429547|
|      80|  0.0973581371439446|
|30408782| 0.07442053866862616|
|     146| 0.31697100132425815|
|30408817|  0.0051257264834694|
|     197| 0.19923280769879664|
|30408867| 0.04757553225103889|
|     202| 0.04757553225103889|
|30408873| 0.19923280769879664|
|     224| 0.31697100132425815|
|30408894| 0.07442053866862616|
|     244|  0.0973581371439446|
|30408926| 0.06705749560136744|
|     255|  0.0973581371439446|
|30408947|  0.0973581371439446|
+--------+--------------------+
only showing top 20 rows

time taken in seconds:  138


### partitions : 50

In [59]:
spark_df = spark.sql("""
select *, _c0%50 as partition_id 
from (
  select *, row_number() over (order by rand()) as user_id
  from data
) 
""")

print("number of rows :",spark_df.count())
print("number of columns :",len(spark_df.columns))

spark_df.groupBy("partition_id").count().toPandas().sort_values(["partition_id"],ascending=True)

number of rows : 30886233
number of columns : 13


Unnamed: 0,partition_id,count
47,0,605482
9,1,630486
44,2,602712
13,3,625678
26,4,624061
17,5,612774
12,6,612651
31,7,625834
27,8,620012
23,9,614723


In [60]:
import time
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# define a schema for the result set, the user ID and model prediction
schema = StructType([StructField('user_id', LongType(), True),
                     StructField('prediction', DoubleType(), True)])  

# define the Pandas UDF 
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_df):

    # run the model on the partitioned data set 
    ids = sample_df['user_id']
    x_test = sample_df.drop(['user_id', 'partition_id', '_c0'], axis=1)
    pred = model_nb.predict(x_test)

    return pd.DataFrame({'user_id': ids, 'prediction': pred[:]})
  
# partition the data and run the UDF  
results = spark_df.groupby('partition_id').apply(apply_model)
start = time.time()
results.show()
print("time taken in seconds: ", round(time.time()-start))

+--------+--------------------+
| user_id|          prediction|
+--------+--------------------+
|       4| 0.09108449763862994|
|30408719| 0.03850534187006575|
|      72| 0.08286375082092703|
|30408749| 0.06962497000744693|
|     110|0.022864782666027414|
|30408778|0.025375380146349012|
|     111| 0.39024801663230124|
|30408810|0.049704121965567165|
|     112| 0.08743805268583761|
|30408902| 0.04757553225103889|
|     120| 0.19923280769879664|
|30408923| 0.06705749560136744|
|     192| 0.13521540110809097|
|30408999| 0.07442053866862616|
|     244| 0.07442053866862616|
|30409032| 0.07442053866862616|
|     636| 0.16957162715734622|
|30409093| 0.09108449763862994|
|     663| 0.10171406380508481|
|30409127| 0.07442053866862616|
+--------+--------------------+
only showing top 20 rows

time taken in seconds:  87


### partitions: 70

In [63]:
bootstrapped_data = df.sample(withReplacement = True, fraction = 500.0)

bootstrapped_data.createOrReplaceTempView("data")

spark_df = spark.sql("""
select *, _c0%70 as partition_id 
from (
  select *, row_number() over (order by rand()) as user_id
  from data
) 
""")

print("number of rows :",spark_df.count())
print("number of columns :",len(spark_df.columns))

spark_df.groupBy("partition_id").count().toPandas().sort_values(["partition_id"],ascending=True)

number of rows : 77211981
number of columns : 13


Unnamed: 0,partition_id,count
67,0,1126062
11,1,1116121
62,2,1087999
16,3,1106243
34,4,1103143
...,...,...
1,65,1125985
64,66,1109124
66,67,1100850
59,68,1115426


In [64]:
import time
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# define a schema for the result set, the user ID and model prediction
schema = StructType([StructField('user_id', LongType(), True),
                     StructField('prediction', DoubleType(), True)])  

# define the Pandas UDF 
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_df):

    # run the model on the partitioned data set 
    ids = sample_df['user_id']
    x_test = sample_df.drop(['user_id', 'partition_id', '_c0'], axis=1)
    pred = model_nb.predict(x_test)

    return pd.DataFrame({'user_id': ids, 'prediction': pred[:]})
  
# partition the data and run the UDF  
results = spark_df.groupby('partition_id').apply(apply_model)
start = time.time()
results.show()
print("time taken in seconds: ", round(time.time()-start))

+--------+--------------------+
| user_id|          prediction|
+--------+--------------------+
|       4|  0.1907006198541338|
|76241560| 0.09108449763862994|
|     423| 0.15229351448544487|
|76241567| 0.07442053866862616|
|     470|  0.0973581371439446|
|76241648|0.009478606868139125|
|     492|0.018816220068673842|
|76241686| 0.10335862293544673|
|     549| 0.06962497000744693|
|76241738|  0.1548926934977234|
|     898| 0.07442053866862616|
|76241790| 0.08743805268583761|
|     900|  0.4146665257959259|
|76241807|0.006454917582147434|
|     980|  0.0427279323285803|
|76241988| 0.04757553225103889|
|    1008|  0.1907006198541338|
|76242002| 0.04757553225103889|
|    1016| 0.03850534187006575|
|76242093|  0.0973581371439446|
+--------+--------------------+
only showing top 20 rows

time taken in seconds:  343
