In [1]:
# %matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('appName').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
# from sklearn.model_selection import train_test_split

In [2]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.sql.functions import col, udf, length
from pyspark.sql.types import IntegerType

# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [3]:
from pyspark.ml.feature import NaiveBayes

ImportError: cannot import name 'NaiveBayes' from 'pyspark.ml.feature' (C:\Users\m246172\Anaconda\lib\site-packages\pyspark\ml\feature.py)

In [4]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

## Dataset: MERGED.csv (concat of 7 beer review scrapes from ratebeer.com)

1. Read in the file containing Beer reviews.
2. Create a column that adds the length of the review as a feature.
3. Create a list of transformations to be applied in the pipeline:
   * Change positive and negative to an index.
   * Tokenize the review.
   * Filter out stop words.
   * Calculate term frequency using `HashingTF`.
   * Calculate TF–IDF.
4. Create a feature vector containing the output from the IDFModel (the last stage in the pipeline) and the length.
   * Set up the pipeline and and fit it to the data.
   * Create training and testing data.
   * Create and fit the Naive Bayes model to the training data.
   * Predict outcomes using the testing set.
   * Use `MulticlassClassificationEvaluator` to evaluate the model on the testing set.
**Hypothesis**: Can we predict NEGATIVE/POSITIVE sentiment from text of beer reviews website?


In [5]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [6]:
# Read the csv file into a pandas DataFrame

beer = pd.read_csv('MERGED.csv')
beer.head()

Unnamed: 0.1,Unnamed: 0,Beer Names,Rating,Locations,Reviews,Users
0,1,Beamish Irish Stout,5.0,"Cork, Ireland","""il solito"" al pub che frequento di più era la...",flogisto
1,2,Simpler Times Pilsner,0.6,"Monrovia, California",Jesus this is bad. Not sure what discriminates...,PhillyBeer2112
2,3,Revolution Fistmas Ale,1.9,"Chicago, Illinois","HAHA, funny name and interesting can from chal...",cheap
3,4,Samuel Adams Utopias,2.0,"Boston, Massachusetts","Mildly syrupy, but not overly sweet. Woody and...",tryeverything
4,5,Ora Limoncello IPA,1.4,"Tottenham, Greater London",På GBBF Winter 2020 i Birmingham. Delt med Run...,Finn


In [14]:
sparkbeer=spark.createDataFrame(beer.astype(str))

In [15]:
type(sparkbeer)

pyspark.sql.dataframe.DataFrame

In [16]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(sparkbeer)
cleaned = cleaner.transform(sparkbeer)

IllegalArgumentException: 'Field "class" does not exist.\nAvailable fields: Unnamed: 0, Beer Names, Rating, Locations, Reviews, Users'

In [17]:
type(beer)

pandas.core.frame.DataFrame

In [18]:
sparkbeer=spark.createDataFrame(beer.astype(str))

In [19]:
type(sparkbeer)

pyspark.sql.dataframe.DataFrame

In [20]:
sparkbeer.show()

+----------+--------------------+------+--------------------+--------------------+--------------------+
|Unnamed: 0|          Beer Names|Rating|           Locations|             Reviews|               Users|
+----------+--------------------+------+--------------------+--------------------+--------------------+
|         1| Beamish Irish Stout|   5.0|       Cork, Ireland|"il solito" al pu...|            flogisto|
|         2|Simpler Times Pil...|   0.6|Monrovia, California|Jesus this is bad...|      PhillyBeer2112|
|         3|Revolution Fistma...|   1.9|   Chicago, Illinois|HAHA, funny name ...|               cheap|
|         4|Samuel Adams Utopias|   2.0|Boston, Massachus...|Mildly syrupy, bu...|       tryeverything|
|         5|  Ora Limoncello IPA|   1.4|Tottenham, Greate...|På GBBF Winter 20...|                Finn|
|         6|            ABK Hell|   1.5| Kaufbeuren, Bavaria|Light caramel & c...|         FACambridge|
|         7|            ABK Fels|   1.1| Kaufbeuren, Bavaria|Aro

In [5]:
# Assign the data to X and y

X = beer[["Beer Names"]]
y = beer["Rating"].values.reshape(-1, 1)
print(X.shape, y.shape)

NameError: name 'beer' is not defined

In [21]:
review_data = Tokenizer(inputCol="Reviews", outputCol="words")
review_data

Tokenizer_f808b4537894

In [22]:
reviewed = review_data.transform(sparkbeer)
reviewed.show()

+----------+--------------------+------+--------------------+--------------------+--------------------+--------------------+
|Unnamed: 0|          Beer Names|Rating|           Locations|             Reviews|               Users|               words|
+----------+--------------------+------+--------------------+--------------------+--------------------+--------------------+
|         1| Beamish Irish Stout|   5.0|       Cork, Ireland|"il solito" al pu...|            flogisto|["il, solito", al...|
|         2|Simpler Times Pil...|   0.6|Monrovia, California|Jesus this is bad...|      PhillyBeer2112|[jesus, this, is,...|
|         3|Revolution Fistma...|   1.9|   Chicago, Illinois|HAHA, funny name ...|               cheap|[haha,, funny, na...|
|         4|Samuel Adams Utopias|   2.0|Boston, Massachus...|Mildly syrupy, bu...|       tryeverything|[mildly, syrupy,,...|
|         5|  Ora Limoncello IPA|   1.4|Tottenham, Greate...|På GBBF Winter 20...|                Finn|[på, gbbf, winter...|


In [10]:
def word_list_length(word_list):
    return len(word_list)

In [11]:
count_tokens = udf(word_list_length, IntegerType())
count_tokens

<function __main__.word_list_length(word_list)>

In [23]:
tokenized = tokenizer.transform(sparkbeer)

IllegalArgumentException: 'Field "text" does not exist.\nAvailable fields: Unnamed: 0, Beer Names, Rating, Locations, Reviews, Users'

In [24]:
tokenized.select("Reviews", "words")\
    .withColumn("tokens", count_tokens(col("words"))).show(truncate=False)

NameError: name 'tokenized' is not defined

In [25]:
from pyspark.sql import types
sparkbeer["Reviews"].cast(types.ArrayType(types.StringType()))

Column<b'Reviews'>

In [26]:
remover = StopWordsRemover(inputCol="words", outputCol="filteredwords")

In [27]:
type(sparkbeer)

pyspark.sql.dataframe.DataFrame

In [28]:
sparkbeer.dtypes

[('Unnamed: 0', 'string'),
 ('Beer Names', 'string'),
 ('Rating', 'string'),
 ('Locations', 'string'),
 ('Reviews', 'string'),
 ('Users', 'string')]

In [29]:
filteredSparkDF = remover.transform(reviewed)
filteredSparkDF.show()

+----------+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Unnamed: 0|          Beer Names|Rating|           Locations|             Reviews|               Users|               words|       filteredwords|
+----------+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|         1| Beamish Irish Stout|   5.0|       Cork, Ireland|"il solito" al pu...|            flogisto|["il, solito", al...|["il, solito", al...|
|         2|Simpler Times Pil...|   0.6|Monrovia, California|Jesus this is bad...|      PhillyBeer2112|[jesus, this, is,...|[jesus, bad., sur...|
|         3|Revolution Fistma...|   1.9|   Chicago, Illinois|HAHA, funny name ...|               cheap|[haha,, funny, na...|[haha,, funny, na...|
|         4|Samuel Adams Utopias|   2.0|Boston, Massachus...|Mildly syrupy, bu...|       tryeverything|[mildly, syrupy,,...|

In [30]:
filteredSparkDF.select("filteredwords").show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|filteredwords                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
+---------------------

In [31]:
# Run the hashing term frequency
hashing = HashingTF(inputCol="filteredwords", outputCol="hashedValues", numFeatures=pow(2,4))

In [32]:
hashed_df = hashing.transform(filteredSparkDF)

In [33]:
hashed_df.show(truncate=False)

+----------+--------------------------------------+------+-------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [34]:
hashed_df.dtypes

[('Unnamed: 0', 'string'),
 ('Beer Names', 'string'),
 ('Rating', 'string'),
 ('Locations', 'string'),
 ('Reviews', 'string'),
 ('Users', 'string'),
 ('words', 'array<string>'),
 ('filteredwords', 'array<string>'),
 ('hashedValues', 'vector')]

In [35]:
idf = IDF(inputCol="hashedValues", outputCol="features")
idfModel = idf.fit(hashed_df)
rescaledData = idfModel.transform(hashed_df)

In [36]:
rescaledData.select("words", "features").show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Use train_test_split to create training and testing data

### BEGIN SOLUTION
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

### END SOLUTION

In [None]:
# Create the model using LinearRegression

### BEGIN SOLUTION
from sklearn.linear_model import LinearRegression
model = LinearRegression()
### END SOLUTION

In [None]:
# Fit the model to the training data and calculate the scores for the training and testing data

### BEGIN SOLUTION
model.fit(X_train, y_train)
training_score = model.score(X_train, y_train)
testing_score = model.score(X_test, y_test)

### END SOLUTION 

print(f"Training Score: {training_score}")
print(f"Testing Score: {testing_score}")

In [None]:
# Plot the Residuals for the Training and Testing data

### BEGIN SOLUTION
plt.scatter(model.predict(X_train), model.predict(X_train) - y_train, c="blue", label="Training Data")
plt.scatter(model.predict(X_test), model.predict(X_test) - y_test, c="orange", label="Testing Data")
plt.legend()
plt.hlines(y=0, xmin=y.min(), xmax=y.max())
plt.title("Residual Plot")
### END SOLUTION