In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
        .appName("chef") \
        .getOrCreate()

In [3]:
# Import csv of preparation and rating
df = spark.read.csv('data/heroku_kp3hp48w.recipes.csv',header=True,sep='\t')

In [4]:
df.count()

6674

In [5]:
df.show()

+--------------------+--------------------+------+----------------+
|               title|         preparation|rating|makeitagainscore|
+--------------------+--------------------+------+----------------+
|Pasta with Lentil...|1. In a large pot...|     3|             75%|
|Key Lime Pie with...|Combine huckleber...|     4|            100%|
|       Old Fashioned|In old-fashioned ...|   3.5|            100%|
|       Apple Galette|Blend flour and s...|   3.5|            100%|
|Lemon-Spice Bread...|Preheat oven to 3...|     4|            100%|
|    Soft Egg Ravioli|Mix all ingredien...|     4|            100%|
|Mac and Cheese wi...|Prepare barbecue ...|     3|             78%|
|Salt-Roasted Port...|Mix first 7 ingre...|     3|             70%|
|Handmade Pasta wi...|Place 2 cups flou...|     4|            100%|
|Pan-Seared Sea Sc...|Place wine and sh...|     3|             70%|
|Chorizo-Filled Dates|Cook chorizo in h...|   3.5|            100%|
|         Shabu-Shabu|Arrange steak on ...|     

In [6]:
df= df.dropna(how='any')

In [7]:
# Create a length column to be used as a future feature
from pyspark.sql.functions import length
data = df.withColumn('length', length(df['title']))

In [8]:
data.show()

+--------------------+--------------------+------+----------------+------+
|               title|         preparation|rating|makeitagainscore|length|
+--------------------+--------------------+------+----------------+------+
|Pasta with Lentil...|1. In a large pot...|     3|             75%|    27|
|Key Lime Pie with...|Combine huckleber...|     4|            100%|    62|
|       Old Fashioned|In old-fashioned ...|   3.5|            100%|    13|
|       Apple Galette|Blend flour and s...|   3.5|            100%|    13|
|Lemon-Spice Bread...|Preheat oven to 3...|     4|            100%|    46|
|    Soft Egg Ravioli|Mix all ingredien...|     4|            100%|    16|
|Mac and Cheese wi...|Prepare barbecue ...|     3|             78%|    40|
|Salt-Roasted Port...|Mix first 7 ingre...|     3|             70%|    24|
|Handmade Pasta wi...|Place 2 cups flou...|     4|            100%|    57|
|Pan-Seared Sea Sc...|Place wine and sh...|     3|             70%|    65|
|Chorizo-Filled Dates|Coo

In [9]:
data.count()

6662

In [10]:
from pyspark.sql.types import DecimalType
data = data.withColumn("rating_numeric", data["rating"].cast(DecimalType(precision=10, scale=1)))

In [11]:
data.show()

+--------------------+--------------------+------+----------------+------+--------------+
|               title|         preparation|rating|makeitagainscore|length|rating_numeric|
+--------------------+--------------------+------+----------------+------+--------------+
|Pasta with Lentil...|1. In a large pot...|     3|             75%|    27|           3.0|
|Key Lime Pie with...|Combine huckleber...|     4|            100%|    62|           4.0|
|       Old Fashioned|In old-fashioned ...|   3.5|            100%|    13|           3.5|
|       Apple Galette|Blend flour and s...|   3.5|            100%|    13|           3.5|
|Lemon-Spice Bread...|Preheat oven to 3...|     4|            100%|    46|           4.0|
|    Soft Egg Ravioli|Mix all ingredien...|     4|            100%|    16|           4.0|
|Mac and Cheese wi...|Prepare barbecue ...|     3|             78%|    40|           3.0|
|Salt-Roasted Port...|Mix first 7 ingre...|     3|             70%|    24|           3.0|
|Handmade 

In [12]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def classify_rating(rating):
    if rating >= 4:
        return 'good'
    else:
        return 'bad'

classify_rating = udf(classify_rating, StringType())
data = data.withColumn('class', classify_rating(data['rating_numeric']))

In [13]:
data.show()

+--------------------+--------------------+------+----------------+------+--------------+-----+
|               title|         preparation|rating|makeitagainscore|length|rating_numeric|class|
+--------------------+--------------------+------+----------------+------+--------------+-----+
|Pasta with Lentil...|1. In a large pot...|     3|             75%|    27|           3.0|  bad|
|Key Lime Pie with...|Combine huckleber...|     4|            100%|    62|           4.0| good|
|       Old Fashioned|In old-fashioned ...|   3.5|            100%|    13|           3.5|  bad|
|       Apple Galette|Blend flour and s...|   3.5|            100%|    13|           3.5|  bad|
|Lemon-Spice Bread...|Preheat oven to 3...|     4|            100%|    46|           4.0| good|
|    Soft Egg Ravioli|Mix all ingredien...|     4|            100%|    16|           4.0| good|
|Mac and Cheese wi...|Prepare barbecue ...|     3|             78%|    40|           3.0|  bad|
|Salt-Roasted Port...|Mix first 7 ingre.

In [14]:
data.groupBy('class').count().show()

+-----+-----+
|class|count|
+-----+-----+
|  bad| 5464|
| good| 1198|
+-----+-----+



Baseline model accuracy is 82%

#### A baseline result is the simplest possible prediction. Classification: If you have a classification problem, you can select the class that has the most observations and use that class as the result for all predictions.

#### Feature Transformations

In [15]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer, NGram

In [16]:
# Create all the features to the data set
class_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="title", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
# ngram = NGram(n=2, inputCol="token_text", outputCol="ngrams")
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [17]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [18]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(
    stages=[class_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [19]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data)
cleaned = cleaner.transform(data)

In [20]:
cleaned.columns

['title',
 'preparation',
 'rating',
 'makeitagainscore',
 'length',
 'rating_numeric',
 'class',
 'label',
 'token_text',
 'stop_tokens',
 'hash_token',
 'idf_token',
 'features']

In [21]:
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[68874,13...|
|  1.0|(262145,[13778,14...|
|  0.0|(262145,[4200,247...|
|  0.0|(262145,[98869,20...|
|  1.0|(262145,[67305,70...|
|  1.0|(262145,[12531,12...|
|  0.0|(262145,[32310,65...|
|  0.0|(262145,[111817,2...|
|  1.0|(262145,[14894,15...|
|  0.0|(262145,[23181,43...|
|  0.0|(262145,[181360,2...|
|  0.0|(262145,[240559,2...|
|  1.0|(262145,[12826,50...|
|  0.0|(262145,[126861,1...|
|  0.0|(262145,[216711,2...|
|  0.0|(262145,[42141,94...|
|  0.0|(262145,[90825,11...|
|  0.0|(262145,[42235,94...|
|  0.0|(262145,[30121,65...|
|  0.0|(262145,[6113,169...|
+-----+--------------------+
only showing top 20 rows



In [22]:
# Break data down into a training set and a testing set
(train, test) = cleaned.randomSplit([0.7, 0.3])

### NaiveBayes

In [23]:
from pyspark.ml.classification import NaiveBayes

# Create a Naive Bayes model and fit training data
nb = NaiveBayes(smoothing=1.0, modelType='multinomial')
nb_predictor = nb.fit(train)

In [24]:
# Tranform the model with the testing data
test_results = nb_predictor.transform(test)

In [25]:
test_results.select('rating','class','label','prediction').show(20)

+------+-----+-----+----------+
|rating|class|label|prediction|
+------+-----+-----+----------+
|     3|  bad|  0.0|       0.0|
|   3.5|  bad|  0.0|       0.0|
|     4| good|  1.0|       0.0|
|     3|  bad|  0.0|       0.0|
|   3.5|  bad|  0.0|       0.0|
|     3|  bad|  0.0|       0.0|
|   3.5|  bad|  0.0|       0.0|
|   3.5|  bad|  0.0|       0.0|
|     3|  bad|  0.0|       0.0|
|   3.5|  bad|  0.0|       0.0|
|   3.5|  bad|  0.0|       0.0|
|   3.5|  bad|  0.0|       0.0|
|     4| good|  1.0|       0.0|
|   3.5|  bad|  0.0|       0.0|
|     4| good|  1.0|       0.0|
|     4| good|  1.0|       0.0|
|     3|  bad|  0.0|       0.0|
|     3|  bad|  0.0|       0.0|
|   3.5|  bad|  0.0|       0.0|
|   3.5|  bad|  0.0|       0.0|
+------+-----+-----+----------+
only showing top 20 rows



In [26]:
#Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print(f"Accuracy of model at predicting reviews was: {acc}")

Accuracy of model at predicting reviews was: 0.7482345312483297
