In [1]:
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName('DogFood')
    .getOrCreate()
)

In [3]:
# Load the dog-food CSV: the first row supplies the column names and the
# column types are inferred from the data.
df = spark.read.csv(
    'dog_food.csv',
    header=True,
    inferSchema=True,
)

In [4]:
# Peek at the first five rows of the raw data.
df.show(n=5)

+---+---+----+---+-------+
|  A|  B|   C|  D|Spoiled|
+---+---+----+---+-------+
|  4|  2|12.0|  3|    1.0|
|  5|  6|12.0|  7|    1.0|
|  6|  2|13.0|  6|    1.0|
|  4|  2|12.0|  1|    1.0|
|  4|  2|12.0|  3|    1.0|
+---+---+----+---+-------+
only showing top 5 rows



In [5]:
# Summary statistics (count, mean, stddev, min, max) for each column.
summary_stats = df.describe()
summary_stats.show()

+-------+------------------+------------------+------------------+------------------+-------------------+
|summary|                 A|                 B|                 C|                 D|            Spoiled|
+-------+------------------+------------------+------------------+------------------+-------------------+
|  count|               490|               490|               490|               490|                490|
|   mean|  5.53469387755102| 5.504081632653061| 9.126530612244897| 5.579591836734694| 0.2857142857142857|
| stddev|2.9515204234399057|2.8537966089662063|2.0555451971054275|2.8548369309982857|0.45221563164613465|
|    min|                 1|                 1|               5.0|                 1|                0.0|
|    max|                10|                10|              14.0|                10|                1.0|
+-------+------------------+------------------+------------------+------------------+-------------------+



In [6]:
# Print the column names and the types the CSV reader inferred at load time.
df.printSchema()

root
 |-- A: integer (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: double (nullable = true)
 |-- D: integer (nullable = true)
 |-- Spoiled: double (nullable = true)



### Data Wrangling

In [7]:
# Register the DataFrame as a temporary view called 'df' so it can be
# queried with spark.sql.
df.createOrReplaceTempView(name='df')

In [8]:
# List the distinct values of column C through the 'df' temporary view.
distinct_c_query = "select distinct C from df"
spark.sql(distinct_c_query).show()

+----+
|   C|
+----+
| 8.0|
| 7.0|
|11.0|
|14.0|
|10.0|
|13.0|
| 6.0|
| 5.0|
| 9.0|
|12.0|
+----+



In [9]:
# Column C was read as a double, but its distinct values are all whole
# numbers, so store it as an integer column like A, B and D.
c_as_int = df['C'].cast('int')
df = df.withColumn('C', c_as_int)

In [10]:
# Re-print the schema to check the type of column C after the cast.
df.printSchema()

root
 |-- A: integer (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: integer (nullable = true)
 |-- D: integer (nullable = true)
 |-- Spoiled: double (nullable = true)



In [11]:
from pyspark.ml.feature import VectorAssembler

In [12]:
# Display the column names; the non-label ones are the inputs handed to the
# VectorAssembler below.
df.columns

['A', 'B', 'C', 'D', 'Spoiled']

In [13]:
# Pack the four input columns into a single vector column named 'features'.
feature_cols = ['A', 'B', 'C', 'D']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

In [14]:
# Apply the assembler to df; the result is used below to select the
# 'features' vector column together with the 'Spoiled' label.
output = assembler.transform(df)

In [15]:
# Keep only what the model needs: the feature vector and the label.
model_cols = ['features', 'Spoiled']
final_data = output.select(model_cols)

In [16]:
# Preview the first five rows of the modelling table.
final_data.show(n=5)

+------------------+-------+
|          features|Spoiled|
+------------------+-------+
|[4.0,2.0,12.0,3.0]|    1.0|
|[5.0,6.0,12.0,7.0]|    1.0|
|[6.0,2.0,13.0,6.0]|    1.0|
|[4.0,2.0,12.0,1.0]|    1.0|
|[4.0,2.0,12.0,3.0]|    1.0|
+------------------+-------+
only showing top 5 rows



### Model Building

In [18]:
from pyspark.ml.classification import RandomForestClassifier

In [19]:
# Random forest with 200 trees that predicts 'Spoiled' from the assembled
# 'features' vector. The explicit seed fixes the forest's randomness so the
# fitted model, its feature importances and the evaluation metrics can be
# reproduced on a fresh kernel.
rfc = RandomForestClassifier(
    labelCol='Spoiled',
    featuresCol='features',
    numTrees=200,
    seed=42,
)

In [20]:
# Fit the forest on the full dataset (no train/test split yet); this model is
# only used to inspect the feature importances in the next cell.
# Note: rfc_model is rebound further down when the forest is refit on the
# training split.
rfc_model = rfc.fit(final_data)

In [21]:
# Importance of each feature in the fitted forest; positions 0-3 correspond to
# the assembler inputs A, B, C, D in that order.
rfc_model.featureImportances

SparseVector(4, {0: 0.0205, 1: 0.0186, 2: 0.9382, 3: 0.0227})

**Feature C dominates the feature importances, so it is concluded that chemical C is the cause of early spoiling.**

In [22]:
# 70/30 train/test split. The seed pins which rows land in each split so the
# metrics reported below are repeatable for the same input data.
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)

In [23]:
# Refit the forest on the training split only. This rebinds rfc_model,
# replacing the model that was fitted on the full dataset earlier.
rfc_model = rfc.fit(train_data)

In [24]:
# Score the held-out test split with the model trained on train_data.
rfc_preds = rfc_model.transform(test_data)

In [25]:
# Preview the first five scored test rows.
rfc_preds.show(n=5)

+------------------+-------+--------------------+--------------------+----------+
|          features|Spoiled|       rawPrediction|         probability|prediction|
+------------------+-------+--------------------+--------------------+----------+
|[1.0,1.0,12.0,4.0]|    1.0|[4.10057338918824...|[0.02050286694594...|       1.0|
| [1.0,2.0,9.0,4.0]|    0.0|[194.281401504126...|[0.97140700752063...|       0.0|
| [1.0,3.0,8.0,5.0]|    0.0|[194.507774297803...|[0.97253887148901...|       0.0|
| [1.0,3.0,9.0,8.0]|    0.0|[186.546914473146...|[0.93273457236573...|       0.0|
| [1.0,4.0,8.0,1.0]|    0.0|[196.170662526655...|[0.98085331263327...|       0.0|
+------------------+-------+--------------------+--------------------+----------+
only showing top 5 rows



In [26]:
# Print the schema of the scored DataFrame to see which columns the model
# added next to 'features' and 'Spoiled'.
rfc_preds.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Spoiled: double (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



### Model Evaluation

In [27]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [28]:
# Area under the ROC curve, computed from the model's raw prediction column
# (library defaults written out explicitly).
my_eval = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='Spoiled',
    metricName='areaUnderROC',
)
# Plain accuracy, computed from the hard class predictions.
acc = MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol='Spoiled',
    metricName='accuracy',
)

In [29]:
# Evaluate the test-set predictions with both evaluators (AUC first, then accuracy).
rfc_eval, rfc_acc = my_eval.evaluate(rfc_preds), acc.evaluate(rfc_preds)

In [30]:
# Report both test-set metrics, one per line.
report = f"Random Forest Model AUC: {rfc_eval}\nRandom Forest Model Accuracy: {rfc_acc}"
print(report)

Random Forest Model AUC: 0.9900826446280991
Random Forest Model Accuracy: 0.9805194805194806
