# Pyspark.ml Mini Project

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 70 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 58.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=a06b1e3eb9a65bddc75ec15c3cb96596764ff300b11d24a5009bd6d41e7fbc13
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [3]:
import pyspark
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession

In [5]:
# Start (or reuse) the Spark session for this notebook under the app name "DT".
spark = (
    SparkSession.builder
    .appName("DT")
    .getOrCreate()
)

In [8]:
# Load the pet food dataset into a Spark DataFrame: the first CSV row is
# treated as the header and column types are inferred from the data.
df = spark.read.options(header=True, inferSchema=True).csv("pet_food.csv")

In [9]:
# Peek at the first four rows as a list of Row objects.
df.head(4)

[Row(A=4, B=2, C=12.0, D=3, Spoiled=1.0),
 Row(A=5, B=6, C=12.0, D=7, Spoiled=1.0),
 Row(A=6, B=2, C=13.0, D=6, Spoiled=1.0),
 Row(A=4, B=2, C=12.0, D=1, Spoiled=1.0)]

In [10]:
# Render the first 20 rows of the raw data as a text table.
df.show(n=20)

+---+---+----+---+-------+
|  A|  B|   C|  D|Spoiled|
+---+---+----+---+-------+
|  4|  2|12.0|  3|    1.0|
|  5|  6|12.0|  7|    1.0|
|  6|  2|13.0|  6|    1.0|
|  4|  2|12.0|  1|    1.0|
|  4|  2|12.0|  3|    1.0|
| 10|  3|13.0|  9|    1.0|
|  8|  5|14.0|  5|    1.0|
|  5|  8|12.0|  8|    1.0|
|  6|  5|12.0|  9|    1.0|
|  3|  3|12.0|  1|    1.0|
|  9|  8|11.0|  3|    1.0|
|  1| 10|12.0|  3|    1.0|
|  1|  5|13.0| 10|    1.0|
|  2| 10|12.0|  6|    1.0|
|  1| 10|11.0|  4|    1.0|
|  5|  3|12.0|  2|    1.0|
|  4|  9|11.0|  8|    1.0|
|  5|  1|11.0|  1|    1.0|
|  4|  9|12.0| 10|    1.0|
|  5|  8|10.0|  9|    1.0|
+---+---+----+---+-------+
only showing top 20 rows



In [39]:
# Build a VectorAssembler that packs the separate feature columns A-D into
# one vector column named 'Chemicals', then apply it to the loaded frame.
vec_assembler = VectorAssembler(
    outputCol='Chemicals',
    inputCols=['A', 'B', 'C', 'D'],
)
vec_df = vec_assembler.transform(df)

In [37]:
# vec_df is already produced in the cell that builds the assembler, so this
# cell no longer recomputes it; it only verifies the assembled feature column
# is present before the frame is used further.
assert 'Chemicals' in vec_df.columns, "expected assembled 'Chemicals' column"

In [38]:
# Print the column names and types of the assembled frame.
vec_df.printSchema()

root
 |-- A: integer (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: double (nullable = true)
 |-- D: integer (nullable = true)
 |-- Spoiled: double (nullable = true)
 |-- Chemicals: vector (nullable = true)



In [67]:
# Display the first 20 rows, now including the assembled 'Chemicals' vector.
vec_df.show(n=20)

+---+---+----+---+-------+-------------------+
|  A|  B|   C|  D|Spoiled|          Chemicals|
+---+---+----+---+-------+-------------------+
|  4|  2|12.0|  3|    1.0| [4.0,2.0,12.0,3.0]|
|  5|  6|12.0|  7|    1.0| [5.0,6.0,12.0,7.0]|
|  6|  2|13.0|  6|    1.0| [6.0,2.0,13.0,6.0]|
|  4|  2|12.0|  1|    1.0| [4.0,2.0,12.0,1.0]|
|  4|  2|12.0|  3|    1.0| [4.0,2.0,12.0,3.0]|
| 10|  3|13.0|  9|    1.0|[10.0,3.0,13.0,9.0]|
|  8|  5|14.0|  5|    1.0| [8.0,5.0,14.0,5.0]|
|  5|  8|12.0|  8|    1.0| [5.0,8.0,12.0,8.0]|
|  6|  5|12.0|  9|    1.0| [6.0,5.0,12.0,9.0]|
|  3|  3|12.0|  1|    1.0| [3.0,3.0,12.0,1.0]|
|  9|  8|11.0|  3|    1.0| [9.0,8.0,11.0,3.0]|
|  1| 10|12.0|  3|    1.0|[1.0,10.0,12.0,3.0]|
|  1|  5|13.0| 10|    1.0|[1.0,5.0,13.0,10.0]|
|  2| 10|12.0|  6|    1.0|[2.0,10.0,12.0,6.0]|
|  1| 10|11.0|  4|    1.0|[1.0,10.0,11.0,4.0]|
|  5|  3|12.0|  2|    1.0| [5.0,3.0,12.0,2.0]|
|  4|  9|11.0|  8|    1.0| [4.0,9.0,11.0,8.0]|
|  5|  1|11.0|  1|    1.0| [5.0,1.0,11.0,1.0]|
|  4|  9|12.0

In [68]:
# Create training and test datasets: split the assembled rows randomly with
# weights 0.75 / 0.25, using a fixed seed of 1.
splits = vec_df.randomSplit(weights=[0.75, 0.25], seed=1)

In [69]:
# The two-way split yields the training portion first, then the test portion.
train_df, test_df = splits

In [70]:
# Total number of rows in the assembled dataset.
vec_df.count()

490

In [71]:
# Number of rows that landed in the training split.
train_df.count()

371

In [72]:
#Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier

In [88]:
# Decision tree classifier that reads features from the 'Chemicals' vector
# column and uses 'Spoiled' as the label column.
DT = DecisionTreeClassifier(
    featuresCol='Chemicals',
    labelCol='Spoiled',
)

In [89]:
# Fit the decision tree on the training split.
model = DT.fit(dataset=train_df)

In [90]:
# Score the held-out test split with the fitted model.
pred = model.transform(dataset=test_df)

In [91]:
# Show the first 20 scored rows, including the columns added by the model.
pred.show(n=20)

+---+---+----+---+-------+-------------------+-------------+--------------------+----------+
|  A|  B|   C|  D|Spoiled|          Chemicals|rawPrediction|         probability|prediction|
+---+---+----+---+-------+-------------------+-------------+--------------------+----------+
|  1|  2| 9.0|  1|    0.0|  [1.0,2.0,9.0,1.0]|  [239.0,1.0]|[0.99583333333333...|       0.0|
|  1|  4| 9.0|  6|    0.0|  [1.0,4.0,9.0,6.0]|  [239.0,1.0]|[0.99583333333333...|       0.0|
|  1|  4|13.0| 10|    1.0|[1.0,4.0,13.0,10.0]|   [0.0,98.0]|           [0.0,1.0]|       1.0|
|  1|  5| 8.0|  3|    0.0|  [1.0,5.0,8.0,3.0]|  [239.0,1.0]|[0.99583333333333...|       0.0|
|  1|  5| 8.0| 10|    0.0| [1.0,5.0,8.0,10.0]|  [239.0,1.0]|[0.99583333333333...|       0.0|
|  1|  7|11.0|  9|    1.0| [1.0,7.0,11.0,9.0]|   [0.0,98.0]|           [0.0,1.0]|       1.0|
|  1|  8| 7.0| 10|    0.0| [1.0,8.0,7.0,10.0]|  [239.0,1.0]|[0.99583333333333...|       0.0|
|  1|  8| 8.0|  6|    0.0|  [1.0,8.0,8.0,6.0]|  [239.0,1.0]|[0.9958333

In [82]:
# Compare the true label with the predicted label, side by side with the
# feature vector, for the first 15 test rows.
pred.select('Chemicals', 'Spoiled', 'prediction').show(n=15)

+-------------------+-------+----------+
|          Chemicals|Spoiled|prediction|
+-------------------+-------+----------+
|  [1.0,2.0,9.0,1.0]|    0.0|       0.0|
|  [1.0,4.0,9.0,6.0]|    0.0|       0.0|
|[1.0,4.0,13.0,10.0]|    1.0|       1.0|
|  [1.0,5.0,8.0,3.0]|    0.0|       0.0|
| [1.0,5.0,8.0,10.0]|    0.0|       0.0|
| [1.0,7.0,11.0,9.0]|    1.0|       1.0|
| [1.0,8.0,7.0,10.0]|    0.0|       0.0|
|  [1.0,8.0,8.0,6.0]|    0.0|       0.0|
|  [1.0,8.0,8.0,8.0]|    0.0|       0.0|
|  [1.0,9.0,9.0,7.0]|    0.0|       0.0|
|[1.0,10.0,11.0,4.0]|    1.0|       1.0|
|[1.0,10.0,12.0,3.0]|    1.0|       1.0|
|  [2.0,1.0,7.0,9.0]|    0.0|       0.0|
| [2.0,2.0,6.0,10.0]|    0.0|       0.0|
|  [2.0,2.0,8.0,1.0]|    0.0|       0.0|
+-------------------+-------+----------+
only showing top 15 rows



In [85]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [95]:

# Evaluator that computes accuracy by comparing the 'prediction' column
# against the 'Spoiled' label column.
decTree_eval = MulticlassClassificationEvaluator(
    metricName='accuracy',
    predictionCol='prediction',
    labelCol='Spoiled',
)


In [96]:
# Compute the accuracy of the test-set predictions.
decTree_accuracy = decTree_eval.evaluate(dataset=pred)

In [97]:
# Report the accuracy rounded to two decimal places.
print(
    "Decision Tree accuracy is {0:.2f}.".format(decTree_accuracy)
)

Decision Tree accuracy is 0.98.


In [None]:
# Conclusion: chemical C caused the early spoiling.
# Back the conclusion with evidence by printing the fitted tree's feature
# importances, paired with the assembler's input column names (same order as
# the feature vector). String formatting is used instead of round() because
# the wildcard import of pyspark.sql.functions shadows that builtin.
for feature_name, importance in zip(vec_assembler.getInputCols(),
                                    model.featureImportances.toArray()):
    print("{0}: {1:.4f}".format(feature_name, importance))