In [1]:
import numpy as np
import pandas as pd
import pyspark
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf # @udf("integer") def myfunc(x,y): return x - y
from pyspark.sql import functions as F # stddev format_number date_format, dayofyear, when
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

# Record library versions so the run can be reproduced later.
print([(mod.__name__, mod.__version__) for mod in (np, pd, pyspark)])

# Single entry point for the notebook: reuse an existing session or create one.
spark = SparkSession.builder.appName('Drug').getOrCreate()
sc = spark.sparkContext
# NOTE(review): SQLContext has been deprecated since Spark 3.0 in favour of
# SparkSession; kept only in case other cells rely on `sqlContext`.
sqlContext = SQLContext(sc)
sc.setLogLevel("INFO")

[('numpy', '1.20.3'), ('pandas', '1.3.4'), ('pyspark', '3.2.1')]




In [2]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer,
                               OneHotEncoder, StringIndexer)

from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [3]:
# Load the raw CSV with a header row; Spark infers the column types from the data.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('Drug.csv')
)
print(df.count())
df.show()

200
+---+---+---+------+-----------+-------+----+
|_c0|Age|Sex|    BP|Cholesterol|Na_to_K|Drug|
+---+---+---+------+-----------+-------+----+
|  0| 23|  F|  HIGH|       HIGH| 25.355|   0|
|  1| 47|  M|   LOW|       HIGH| 13.093|   3|
|  2| 47|  M|   LOW|       HIGH| 10.114|   3|
|  3| 28|  F|NORMAL|       HIGH|  7.798|   4|
|  4| 61|  F|   LOW|       HIGH| 18.043|   0|
|  5| 22|  F|NORMAL|       HIGH|  8.607|   4|
|  6| 49|  F|NORMAL|       HIGH| 16.275|   0|
|  7| 41|  M|   LOW|       HIGH| 11.037|   3|
|  8| 60|  M|NORMAL|       HIGH| 15.171|   0|
|  9| 43|  M|   LOW|     NORMAL| 19.368|   0|
| 10| 47|  F|   LOW|       HIGH| 11.767|   3|
| 11| 34|  F|  HIGH|     NORMAL| 19.199|   0|
| 12| 43|  M|   LOW|       HIGH| 15.376|   0|
| 13| 74|  F|   LOW|       HIGH| 20.942|   0|
| 14| 50|  F|NORMAL|       HIGH| 12.703|   4|
| 15| 16|  F|  HIGH|     NORMAL| 15.516|   0|
| 16| 69|  M|   LOW|     NORMAL| 11.455|   4|
| 17| 43|  M|  HIGH|       HIGH| 13.972|   1|
| 18| 23|  M|   LOW|       HIG

In [4]:
# Check the inferred types: Sex/BP/Cholesterol are strings (need indexing),
# Age and Drug are integers, Na_to_K is a double.
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- BP: string (nullable = true)
 |-- Cholesterol: string (nullable = true)
 |-- Na_to_K: double (nullable = true)
 |-- Drug: integer (nullable = true)



In [5]:
# Summary statistics with one row per column, which reads better than one column per feature.
df.describe().toPandas().T


Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
_c0,200,99.5,57.879184513951124,0,199
Age,200,44.315,16.544314634751974,15,74
Sex,200,,,F,M
BP,200,,,HIGH,NORMAL
Cholesterol,200,,,HIGH,NORMAL
Na_to_K,200,16.08448499999999,7.223955528459517,6.269,38.247
Drug,200,1.595,1.7163051936027987,0,4


In [6]:
from pyspark.sql.functions import isnull, when, count, col
# Count null values per column; every count should be 0 before modelling.
df.select(*(count(when(col(name).isNull(), name)).alias(name)
            for name in df.columns)).show()

+---+---+---+---+-----------+-------+----+
|_c0|Age|Sex| BP|Cholesterol|Na_to_K|Drug|
+---+---+---+---+-----------+-------+----+
|  0|  0|  0|  0|          0|      0|   0|
+---+---+---+---+-----------+-------+----+



In [7]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer,
                               OneHotEncoder, StringIndexer)

In [8]:
# Sex: string category -> numeric index -> one-hot vector.
Sex_indexer = StringIndexer(outputCol='Sex_index', inputCol='Sex')
Sex_encoder = OneHotEncoder(outputCol='Sex_vec', inputCol=Sex_indexer.getOutputCol())

In [9]:
# Blood pressure: string category -> numeric index -> one-hot vector.
BP_indexer = StringIndexer(outputCol='BP_index', inputCol='BP')
BP_encoder = OneHotEncoder(outputCol='BP_vec', inputCol=BP_indexer.getOutputCol())

In [17]:
# Cholesterol: string category -> numeric index -> one-hot vector.
Cholestero_indexer = StringIndexer(outputCol='Cholestero_index', inputCol='Cholesterol')
Cholestero_encoder = OneHotEncoder(outputCol='Cholestero_vec',
                                   inputCol=Cholestero_indexer.getOutputCol())

In [18]:
# Merge the numeric columns and the one-hot vectors into a single feature vector.
assembler = VectorAssembler(
    outputCol='features',
    inputCols=['Age', 'Sex_vec', 'BP_vec', 'Cholestero_vec', 'Na_to_K'],
)


## Logistic Regression

In [19]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline


In [40]:
# Logistic regression on the assembled features; Drug is the integer class label (0-4 in this data).
lr = LogisticRegression(labelCol='Drug', featuresCol='features')

In [41]:
# Encoding stages first, then feature assembly, then the estimator.
pipeline = Pipeline(stages=[
    Sex_indexer, Sex_encoder,
    BP_indexer, BP_encoder,
    Cholestero_indexer, Cholestero_encoder,
    assembler,
    lr,
])

In [42]:
# 70/30 train/test split. A fixed seed keeps the split, and every metric
# computed downstream, identical across Restart & Run All. Without it,
# each run evaluates on different rows.
train, test = df.randomSplit([0.7, 0.3], seed=42)

In [43]:
# Fit every pipeline stage (indexers, encoders, assembler, LR) on the training split only.
lr_model = pipeline.fit(dataset=train)

In [44]:
# Score the held-out split; this adds rawPrediction, probability and prediction columns.
results = lr_model.transform(dataset=test)
results.show(n=5)

+---+---+---+----+-----------+-------+----+---------+-------------+--------+-------------+----------------+--------------+--------------------+--------------------+--------------------+----------+
|_c0|Age|Sex|  BP|Cholesterol|Na_to_K|Drug|Sex_index|      Sex_vec|BP_index|       BP_vec|Cholestero_index|Cholestero_vec|            features|       rawPrediction|         probability|prediction|
+---+---+---+----+-----------+-------+----+---------+-------------+--------+-------------+----------------+--------------+--------------------+--------------------+--------------------+----------+
|  2| 47|  M| LOW|       HIGH| 10.114|   3|      0.0|(1,[0],[1.0])|     1.0|(2,[1],[1.0])|             0.0| (1,[0],[1.0])|[47.0,1.0,0.0,1.0...|[-47.177738669328...|[4.79624393829567...|       3.0|
|  7| 41|  M| LOW|       HIGH| 11.037|   3|      0.0|(1,[0],[1.0])|     1.0|(2,[1],[1.0])|             0.0| (1,[0],[1.0])|[41.0,1.0,0.0,1.0...|[-21.883900054816...|[5.54046308349572...|       3.0|
| 10| 47|  F| L

In [45]:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [47]:
# Multiclass evaluator. metricName is set explicitly because the default is
# 'f1' (weighted F-measure), not area under the ROC curve: scores produced by
# this evaluator are F1 values, whatever variable name they are stored under.
my_eval = MulticlassClassificationEvaluator(predictionCol='prediction',
                                            labelCol='Drug',
                                            metricName='f1')

In [48]:
# Side-by-side view of true label vs. predicted class.
results.select(['Drug', 'prediction']).show()

+----+----------+
|Drug|prediction|
+----+----------+
|   3|       3.0|
|   3|       3.0|
|   3|       3.0|
|   0|       0.0|
|   0|       0.0|
|   0|       0.0|
|   4|       4.0|
|   1|       1.0|
|   4|       4.0|
|   2|       2.0|
|   4|       4.0|
|   1|       1.0|
|   0|       0.0|
|   0|       0.0|
|   0|       0.0|
|   4|       4.0|
|   1|       1.0|
|   2|       2.0|
|   4|       4.0|
|   1|       1.0|
+----+----------+
only showing top 20 rows



In [49]:
# Weighted F1 of the logistic-regression model on the held-out split.
# This is NOT ROC AUC (that needs BinaryClassificationEvaluator and binary
# labels), so the variable is named for what it holds. Passing metricName here
# pins the metric regardless of how my_eval was constructed.
lr_f1 = my_eval.evaluate(results, {my_eval.metricName: 'f1'})
lr_f1

0.9587737127371274

## Random Forest

In [52]:
from pyspark.ml.classification import RandomForestClassifier
# Random forest on the same assembled features as the LR model.
# A fixed seed makes bootstrap sampling and per-split feature subsetting
# reproducible; without it the reported score varies from run to run.
rf = RandomForestClassifier(featuresCol='features', labelCol='Drug',
                            predictionCol='prediction',
                            numTrees=20, maxDepth=20, seed=42)

In [54]:
# Same encoding/assembly stages as before, with the random forest as the final estimator.
pipeline = Pipeline(stages=[
    Sex_indexer, Sex_encoder,
    BP_indexer, BP_encoder,
    Cholestero_indexer, Cholestero_encoder,
    assembler,
    rf,
])

In [55]:
# Fit the random-forest pipeline on the same training split used for LR.
rf_model = pipeline.fit(dataset=train)

In [56]:
# Score the held-out split with the forest.
# NOTE: this rebinds `results`, replacing the logistic-regression predictions.
results = rf_model.transform(dataset=test)

In [58]:
# Side-by-side view of true label vs. random-forest prediction.
results.select(['Drug', 'prediction']).show()

+----+----------+
|Drug|prediction|
+----+----------+
|   3|       3.0|
|   3|       3.0|
|   3|       3.0|
|   0|       0.0|
|   0|       0.0|
|   0|       0.0|
|   4|       4.0|
|   1|       1.0|
|   4|       4.0|
|   2|       2.0|
|   4|       4.0|
|   1|       1.0|
|   0|       0.0|
|   0|       0.0|
|   0|       0.0|
|   4|       4.0|
|   1|       1.0|
|   2|       2.0|
|   4|       4.0|
|   1|       1.0|
+----+----------+
only showing top 20 rows



In [59]:
# Weighted F1 of the random forest on the held-out split (not ROC AUC).
# NOTE(review): a perfect 1.0 on roughly 60 test rows (30% of 200) may just
# reflect a small, easily separable dataset; confirm with cross-validation
# before drawing conclusions about LR vs. RF.
rf_f1 = my_eval.evaluate(results, {my_eval.metricName: 'f1'})
rf_f1

1.0