# Getting the necessory libraries and building a spark session

In [47]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

In [48]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [49]:
rdd = spark.read.csv('/content/drug200.csv', header=True, inferSchema= True)
rdd.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- BP: string (nullable = true)
 |-- Cholesterol: string (nullable = true)
 |-- Na_to_K: double (nullable = true)
 |-- Drug: string (nullable = true)



# Separate the columns based on the data types to be processed

In [50]:
rdd.dtypes

[('Age', 'int'),
 ('Sex', 'string'),
 ('BP', 'string'),
 ('Cholesterol', 'string'),
 ('Na_to_K', 'double'),
 ('Drug', 'string')]

In [51]:
str_type = [f for f, t in rdd.dtypes if ((t == 'string')&(f!='Drug')) ]
str_type

['Sex', 'BP', 'Cholesterol']

In [52]:
num_type = [f for f, t in rdd.dtypes if t != 'string' ]
num_type

['Age', 'Na_to_K']

In [53]:
str_index = [f+'_index' for f in str_type]
str_ohe = [f+'_ohe' for f in str_type]
all_data = num_type + str_ohe

# Encode the Drug column as it is the target variable

In [54]:
drug_indexer = StringIndexer(inputCol="Drug", outputCol="Drug_index")

In [55]:
rdd1 = drug_indexer.fit(rdd).transform(rdd)

# Split the data and build a pipeline of transformations

In [56]:
train, test = rdd1.randomSplit([0.8,0.2])

In [57]:
indexer = StringIndexer(inputCols=str_type, outputCols=str_index)
ohe = OneHotEncoder(inputCols=str_index, outputCols=str_ohe)
vas = VectorAssembler(inputCols=all_data, outputCol='features')
rf = RandomForestClassifier(featuresCol='features', labelCol='Drug_index', predictionCol='prediction')

In [58]:
pipeline = Pipeline(stages=[indexer,ohe,vas,rf])

In [59]:
model = pipeline.fit(train)

# Evaluate the model on the test data

In [60]:
pred = model.transform(test)

In [61]:
pred.select('Drug','Drug_index','prediction').show()

+-----+----------+----------+
| Drug|Drug_index|prediction|
+-----+----------+----------+
|DrugY|       0.0|       0.0|
|drugX|       1.0|       1.0|
|DrugY|       0.0|       0.0|
|DrugY|       0.0|       0.0|
|drugA|       2.0|       2.0|
|DrugY|       0.0|       0.0|
|DrugY|       0.0|       0.0|
|drugA|       2.0|       2.0|
|drugC|       4.0|       0.0|
|DrugY|       0.0|       0.0|
|DrugY|       0.0|       0.0|
|DrugY|       0.0|       0.0|
|DrugY|       0.0|       0.0|
|drugX|       1.0|       1.0|
|drugA|       2.0|       2.0|
|drugX|       1.0|       1.0|
|DrugY|       0.0|       0.0|
|drugA|       2.0|       2.0|
|drugC|       4.0|       4.0|
|drugX|       1.0|       1.0|
+-----+----------+----------+
only showing top 20 rows



In [62]:
eval = MulticlassClassificationEvaluator(labelCol='Drug_index', predictionCol='prediction', metricName='accuracy')
eval.evaluate(pred)

0.9714285714285714

# Save the model on your machine

In [63]:
model.write().overwrite().save('rf_pyspark')

In [69]:
!zip -r rf_pyspark.zip rf_pyspark
from google.colab import files
files.download('rf_pyspark.zip')

updating: rf_pyspark/ (stored 0%)
updating: rf_pyspark/stages/ (stored 0%)
updating: rf_pyspark/stages/3_RandomForestClassifier_226e036b61c1/ (stored 0%)
updating: rf_pyspark/stages/3_RandomForestClassifier_226e036b61c1/data/ (stored 0%)
updating: rf_pyspark/stages/3_RandomForestClassifier_226e036b61c1/data/._SUCCESS.crc (stored 0%)
updating: rf_pyspark/stages/3_RandomForestClassifier_226e036b61c1/data/_SUCCESS (stored 0%)
updating: rf_pyspark/stages/3_RandomForestClassifier_226e036b61c1/data/.part-00000-f8087799-27f1-439f-9a62-b2f2f1f986f6-c000.snappy.parquet.crc (stored 0%)
updating: rf_pyspark/stages/3_RandomForestClassifier_226e036b61c1/data/part-00000-f8087799-27f1-439f-9a62-b2f2f1f986f6-c000.snappy.parquet (deflated 32%)
updating: rf_pyspark/stages/3_RandomForestClassifier_226e036b61c1/treesMetadata/ (stored 0%)
updating: rf_pyspark/stages/3_RandomForestClassifier_226e036b61c1/treesMetadata/._SUCCESS.crc (stored 0%)
updating: rf_pyspark/stages/3_RandomForestClassifier_226e036b61c

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Get the model back

In [64]:
from pyspark.ml.pipeline import PipelineModel
saved_model = PipelineModel.load('rf_pyspark')

In [66]:
pred2 = saved_model.transform(test)
eval = MulticlassClassificationEvaluator(labelCol='Drug_index', predictionCol='prediction', metricName='accuracy')
eval.evaluate(pred2)

0.9714285714285714