{{badge}}

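* In this notebook we run a hyperparameter search in parallel with Spark.
* The entire dataset is replicated once per trial; each replica trains a model with its own randomly sampled hyperparameters in parallel, and the trials are ranked by accuracy so the best configuration can be picked.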

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#use latest spark version
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os

# Tell findspark/pyspark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
})

In [None]:
import findspark

# Locate the Spark install (via SPARK_HOME) so pyspark becomes importable;
# this must run before the pyspark import below.
findspark.init()

from pyspark.sql import SparkSession

# Local-mode session using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
from sklearn import datasets
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import pyspark.sql.functions as F
import random
from pyspark.sql.types import *
from sklearn.model_selection import train_test_split

In [None]:
# Synthetic binary classification problem: 10k rows, 4 features (2 informative).
X, y = datasets.make_classification(
    n_samples=10000,
    n_features=4,
    n_informative=2,
    n_classes=2,
    random_state=1,
    shuffle=True,
)

In [None]:
# Features as a pandas frame (default integer column labels) with the label appended as 'target'.
train = pd.DataFrame(X).assign(target=y)

In [None]:
# Convert the pandas frame to a Spark DataFrame (schema inferred from the pandas dtypes).
train_sp = spark.createDataFrame(data=train)

In [None]:
# Give the Spark columns the string names the model UDF selects by.
train_sp = train_sp.toDF('c0', 'c1', 'c2', 'c3', 'target')

In [None]:
# Preview the first 20 rows (long cell values truncated).
train_sp.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+--------------------+------+
|                  c0|                  c1|                  c2|                  c3|target|
+--------------------+--------------------+--------------------+--------------------+------+
| -1.8873649371603203| -1.1455691898002351|  0.8396761000312767|  -2.008855708086743|     0|
|-0.18266809216582025|-0.12226678277057923| 0.08251252435219325| -0.2054662226628986|     0|
| -0.7315948349672106|  0.6559036936823883| 0.20531124349308186| 0.28714060384238044|     0|
| -0.7749652163170958|  0.7440265567629247| 0.21210307064010814| 0.35187466485355684|     0|
| -1.3394227045324436| -1.0424630852864827|  0.6209706165480363| -1.6479989577746343|     0|
|-0.18017499772535683| -1.6244897462240875| 0.24568656908689943| -1.6598910893214862|     1|
|    0.82509156468382| -0.5728119479422702| -0.2497852096620914|-0.16210051159032354|     0|
|  1.1781029789531323| 0.17113092183663403| -0.4647047960767629|  0.72

* Replicate the dataset n times (one copy per hyperparameter trial)

In [None]:
# Number of hyperparameter trials; the training data is copied once per trial.
N_REPLICATIONS = 100

# One row per replication id 0..N_REPLICATIONS-1. Cross-joining the training
# data with this frame produces one full copy of the data per id.
replication_df = spark.createDataFrame(
    pd.DataFrame({'replication_id': range(N_REPLICATIONS)})
)

In [None]:
# Cartesian product: every training row is paired with every replication_id,
# giving one full copy of the training data per id (rows = train rows x ids).
replicated_train_df = train_sp.crossJoin(replication_df)

In [None]:
# Sanity check on the replication: (row count, column count) of the cross-joined frame.
n_rows = replicated_train_df.count()
n_cols = len(replicated_train_df.columns)
print((n_rows, n_cols))

(1000000, 6)


In [None]:
# NOTE(review): `ref` is not used by any later cell shown here. The run cell
# groups replicated_train_df again itself, so this cell can likely be deleted.
ref = replicated_train_df.groupBy('replication_id')

* Create a pandas UDF that trains and evaluates one model per replica

In [None]:
# Schema of the single-row frame run_model returns for each replica.
outSchema = StructType([
    StructField('replication_id', IntegerType(), True),
    StructField('Accuracy', DoubleType(), True),
    StructField('num_trees', IntegerType(), True),
    StructField('depth', IntegerType(), True),
    StructField('criterion', StringType(), True),
])


@F.pandas_udf(outSchema, F.PandasUDFType.GROUPED_MAP)
def run_model(pdf):
    """Train and score one random forest on one replica of the training data.

    Hyperparameters are drawn from an RNG seeded with the group's
    ``replication_id`` instead of the module-level ``random`` state. With the
    shared state, different replicas drew identical configurations (e.g. ids 4
    and 13 both got 293 trees / depth 9 / entropy). This is presumably because
    Spark's Python workers start from the same ``random`` state. Per-group
    seeding gives each trial its own draw and makes the search reproducible.

    Args:
        pdf: pandas DataFrame with all rows of one replication_id group,
            including feature columns c0..c3, 'target' and 'replication_id'.

    Returns:
        One-row pandas DataFrame whose columns match ``outSchema``.
    """
    # Cast to a Python int: numpy integers are not accepted as `random` seeds
    # on newer Python versions.
    replication_id = int(pdf['replication_id'].iloc[0])

    # 1. Sample hyperparameters from the same ranges as before:
    #    num_trees in [50, 500), depth in [2, 10).
    rng = random.Random(replication_id)
    num_trees = rng.randrange(50, 500)
    depth = rng.randrange(2, 10)
    criterion = rng.choice(['gini', 'entropy'])

    # 2. Hold-out split with a fixed random_state.
    X = pdf[['c0', 'c1', 'c2', 'c3']]
    y = pdf['target']
    Xtrain, Xcv, ytrain, ycv = train_test_split(X, y, test_size=0.33, random_state=42)

    # 3. Fit the forest; seeding it too makes each trial reproducible.
    clf = RandomForestClassifier(
        n_estimators=num_trees,
        max_depth=depth,
        criterion=criterion,
        random_state=replication_id,
    )
    clf.fit(Xtrain, ytrain)

    # 4. Hold-out accuracy (sklearn argument order: y_true, y_pred).
    accuracy = accuracy_score(ycv, clf.predict(Xcv))

    # 5. Single-row result frame.
    return pd.DataFrame(
        {
            'replication_id': replication_id,
            'Accuracy': accuracy,
            'num_trees': num_trees,
            'depth': depth,
            'criterion': criterion,
        },
        index=[0],
    )


* Run the search: apply the UDF to every replica and rank the trials by accuracy

In [None]:
# Train one model per replica in parallel, then order the trials best-first.
results = (
    replicated_train_df
    .groupby("replication_id")
    .apply(run_model)
    .sort(F.desc("Accuracy"))
)

In [None]:
# Show trials best-first. Sort inline rather than reassigning `results`, so
# re-running this cell does not keep stacking sorts onto `results`.
results.sort(F.desc("Accuracy")).show()

+--------------+------------------+---------+-----+---------+
|replication_id|          Accuracy|num_trees|depth|criterion|
+--------------+------------------+---------+-----+---------+
|             4|0.9636363636363636|      293|    9|  entropy|
|            68|0.9636363636363636|      274|    9|  entropy|
|            13|0.9636363636363636|      293|    9|  entropy|
|            87|0.9636363636363636|      274|    9|  entropy|
|            46|0.9633333333333334|      317|    9|  entropy|
|            73|0.9633333333333334|      317|    9|  entropy|
|            53| 0.963030303030303|      177|    9|     gini|
|            99| 0.963030303030303|      107|    8|  entropy|
|            20| 0.963030303030303|      107|    8|  entropy|
|            75| 0.963030303030303|      423|    9|     gini|
|            78| 0.963030303030303|      423|    9|     gini|
|            54|0.9627272727272728|      314|    9|     gini|
|             0|0.9627272727272728|      314|    9|     gini|
|       

In [None]:
# NOTE(review): assumes Google Drive is mounted at /content/drive. No mount
# cell is visible in this notebook — TODO confirm.
OUTPUT_PATH = '/content/drive/My Drive/Colab Notebooks/Models/Quora/Error_example.parquet.gzip'

# Spark writes a directory of part files at OUTPUT_PATH.
# - mode='overwrite': the default save mode errors if the path already exists,
#   which would break a Restart & Run All.
# - compression='gzip': makes the data match the `.gzip` suffix (Spark's
#   default parquet codec is snappy).
results.write.parquet(OUTPUT_PATH, mode='overwrite', compression='gzip')