In [None]:
!pip install nltk # install nltk onto databricks
import nltk
nltk.download('stopwords') #add stopwords

import os.path
import sklearn
from pyspark.sql.types import *
from nltk.stem import LancasterStemmer
import re
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import string
from pyspark.sql.functions import concat_ws, udf



Collecting nltk
  Downloading nltk-3.7-py3-none-any.whl (1.5 MB)
[?25l[K     |▏                               | 10 kB 6.7 MB/s eta 0:00:01[K     |▍                               | 20 kB 2.6 MB/s eta 0:00:01[K     |▋                               | 30 kB 3.8 MB/s eta 0:00:01[K     |▉                               | 40 kB 3.7 MB/s eta 0:00:01[K     |█                               | 51 kB 3.8 MB/s eta 0:00:01[K     |█▎                              | 61 kB 4.3 MB/s eta 0:00:01[K     |█▌                              | 71 kB 4.5 MB/s eta 0:00:01[K     |█▊                              | 81 kB 5.1 MB/s eta 0:00:01[K     |██                              | 92 kB 5.2 MB/s eta 0:00:01[K     |██▏                             | 102 kB 4.7 MB/s eta 0:00:01[K     |██▍                             | 112 kB 4.7 MB/s eta 0:00:01[K     |██▋                             | 122 kB 4.7 MB/s eta 0:00:01[K     |██▉                             | 133 kB 4.7 MB/s eta 0:00:01[K     |███

In [None]:
#list of words to filter out of statements
stop_words = set(stopwords.words('english'))
# stemmer to turn words into their roots
lan=LancasterStemmer()

@udf
def clean(text):
    print(text)
    text = text.lower()#convert to lowercase
    text = re.sub(r"http\S+|www\S+|https\S+|ftp\S+", '', text, flags=re.MULTILINE)#remove links
    text = re.sub('[^\w ]','',text)#remove whitespaces 
    text = text.translate(str.maketrans('', '', string.punctuation))#remove punctuation 
    tweet_tokens = word_tokenize(text,preserve_line=True)#tokenize words
    filtered_words = [lan.stem(w) for w in tweet_tokens if not w in stop_words]#stemming

    return ' '.join(filtered_words)

In [None]:
reviewsDF = spark.read.csv('/FileStore/tables/reviews-2.csv', header = True) #read in dataset
reviewsDF = reviewsDF.select('overall',concat_ws(' ',reviewsDF['summary'],reviewsDF['reviewText']).alias("text"))#combine summary and reviewText
reviewsDF = reviewsDF.withColumn('text', clean(reviewsDF['text']))
reviewsDF.show(5)

+-------+--------------------+
|overall|                text|
+-------+--------------------+
|      5|fiv star advert r...|
|      5|good fac lik od f...|
|      1|smel aw bought sm...|
|      5|tru noth lik aqu ...|
|      5|bvlgari shampoo e...|
+-------+--------------------+
only showing top 5 rows



In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
#tokenize words
tokenizer = Tokenizer(inputCol="text", outputCol="words")
wordsData = tokenizer.transform(reviewsDF)
#convert to vector
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
#copute IDF of rawFeatures
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
#add to RDD
reviewsRDD = rescaledData.select("overall", "features").rdd
reviewsRDD.take(1)

Out[4]: [Row(overall='5', features=SparseVector(20, {0: 0.5772, 4: 0.7366, 10: 0.3344, 15: 0.4318, 19: 0.6349}))]

In [None]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

#split data (80% train 20% test)
transformedRDD = reviewsRDD.map(lambda row: LabeledPoint(row['overall'], row['features'].toArray()))

splits = [0.8, 0.2]
training_data, test_data = transformedRDD.randomSplit(splits, 0)

print("Number of training set rows: %d" % training_data.count())
print("Number of test set rows: %d" % test_data.count())

Number of training set rows: 4196
Number of test set rows: 1073


In [None]:
from pyspark.mllib.tree import RandomForest
#Train RandomForest Model

#classes represent star ratings
#set number of trees for eval to 10 and set maxDepth to 20
#set random seed to 0 so results can be replicated
model = RandomForest.trainClassifier(sc.parallelize(training_data.collect()), numClasses=6, categoricalFeaturesInfo={}, numTrees=10, maxDepth=20, seed=0)

In [None]:
x_test = test_data.map(lambda x: x.features)
y_test = test_data.map(lambda x: x.label)
#use model to predict on test data
y_pred = model.predict(x_test)
#combine y_test and y_pred
both = list(zip(y_test.collect(),y_pred.collect()))
#evaluate accuracy based on actual (y_test) and predicted rating (y_pred)
#accuracy = sum of correct predictions / number of test points
acc = sum(1 for test, pred in both if test == pred) / float(test_data.count())
print("Model accuracy: %.2f%%" % (acc * 100))

Model accuracy: 97.39%
