# Tree Methods Consulting Project 

You've been hired by a dog food company to try to predict why some batches of their dog food are spoiling much quicker than intended! Unfortunately this Dog Food company hasn't upgraded to the latest machinery, meaning that the amounts of the five preservative chemicals they are using can vary a lot, but which is the chemical that has the strongest effect? The dog food company first mixes up a batch of preservative that contains 4 different preservative chemicals (A,B,C,D) and then is completed with a "filler" chemical. The food scientists beelive one of the A,B,C, or D preservatives is causing the problem, but need your help to figure out which one!
Use Machine Learning with RF to find out which parameter had the most predicitive power, thus finding out which chemical causes the early spoiling! So create a model and then find out how you can decide which chemical is the problem!

* Pres_A : Percentage of preservative A in the mix
* Pres_B : Percentage of preservative B in the mix
* Pres_C : Percentage of preservative C in the mix
* Pres_D : Percentage of preservative D in the mix
* Spoiled: Label indicating whether or not the dog food batch was spoiled.
___

**Think carefully about what this problem is really asking you to solve. While we will use Machine Learning to solve this, it won't be with your typical train/test split workflow. If this confuses you, skip ahead to the solution code along walk-through!**
____

# Configuration

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('dogfoodproj').getOrCreate()

In [None]:
spark.conf.set('spark.sql.repl.eagerEval.enabled',True)
spark.conf.set('spark.sql.repl.eagerEval.maxNumRows',5)

# Imports

In [None]:
import pyspark.sql.functions as F

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# Data 

In [None]:
data = spark.read.csv('dog_food.csv', inferSchema=True, header=True)

In [None]:
data

A,B,C,D,Spoiled
4,2,12.0,3,1.0
5,6,12.0,7,1.0
6,2,13.0,6,1.0
4,2,12.0,1,1.0
4,2,12.0,3,1.0


In [None]:
data.describe()

summary,A,B,C,D,Spoiled
count,490.0,490.0,490.0,490.0,490.0
mean,5.53469387755102,5.504081632653061,9.126530612244895,5.579591836734694,0.2857142857142857
stddev,2.9515204234399057,2.8537966089662063,2.0555451971054275,2.8548369309982857,0.4522156316461346
min,1.0,1.0,5.0,1.0,0.0
max,10.0,10.0,14.0,10.0,1.0


In [None]:
data.printSchema()

root
 |-- A: integer (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: double (nullable = true)
 |-- D: integer (nullable = true)
 |-- Spoiled: double (nullable = true)



In [None]:
a = output_list[0][0]['count']

for pres, label in zip(output_list, ["A","B","C","D"]):
    
    spld = pres[0]['count']
    notspld = pres[1]['count']
    ratio = spld / (spld + notspld)
    print(f"Ratio of Spoiled when preservative {label} above average: \n {ratio}")    

Ratio of Spoiled when preservative A above average: 
 0.3034188034188034
Ratio of Spoiled when preservative B above average: 
 0.2490118577075099
Ratio of Spoiled when preservative C above average: 
 0.8475609756097561
Ratio of Spoiled when preservative D above average: 
 0.2764227642276423


In [None]:
data.select("C").distinct().collect()

[Row(C=8.0),
 Row(C=7.0),
 Row(C=11.0),
 Row(C=14.0),
 Row(C=10.0),
 Row(C=13.0),
 Row(C=6.0),
 Row(C=5.0),
 Row(C=9.0),
 Row(C=12.0)]

In [None]:
data = data.withColumn("C",F.col("C").cast("int")) \
           .withColumn("Spoiled",F.col("Spoiled").cast("int"))

# Data Exploration

In [None]:
output_list = []

for col in ["A","B","C","D"]:
    mean = data.select(F.mean(col)).collect()[0][f'avg({col})']
    output_list.append(data.select(f"{col}","Spoiled").filter(f"{col} > {mean}").groupBy("Spoiled").count().collect())
    
output_list

[[Row(Spoiled=1, count=71), Row(Spoiled=0, count=163)],
 [Row(Spoiled=1, count=63), Row(Spoiled=0, count=190)],
 [Row(Spoiled=1, count=139), Row(Spoiled=0, count=25)],
 [Row(Spoiled=1, count=68), Row(Spoiled=0, count=178)]]

In [None]:
for pres, label in zip(output_list, ["A","B","C","D"]):
    
    spld = pres[0]['count']
    notspld = pres[1]['count']
    ratio = spld / (spld + notspld)
    print(f"Ratio of Spoiled when preservative {label} above average: \n {ratio}")    

Ratio of Spoiled when preservative A above average: 
 0.3034188034188034
Ratio of Spoiled when preservative B above average: 
 0.2490118577075099
Ratio of Spoiled when preservative C above average: 
 0.8475609756097561
Ratio of Spoiled when preservative D above average: 
 0.2764227642276423


In [None]:
data.groupBy("Spoiled").count() #Slightly unbalanced

Spoiled,count
True,140
False,350


# Spark Data Formatting

In [None]:
data.columns

['A', 'B', 'C', 'D', 'Spoiled']

In [None]:
assembler = VectorAssembler(inputCols=['A', 'B', 'C', 'D'],outputCol='features')
fmt_data = assembler.transform(data)

In [None]:
fmt_data = fmt_data.select(F.col("Spoiled").alias("label"),"features")

# Modeling

In [None]:
rfc = RandomForestClassifier()

In [None]:
rfc_model = rfc.fit(fmt_data)

In [None]:
# Features Importances

rfc_model.featureImportances

SparseVector(4, {0: 0.0219, 1: 0.0231, 2: 0.9364, 3: 0.0186})