In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [389]:
import pyspark as ps

spark = ps.sql.SparkSession.builder \
        .master("local[4]") \
        .appName("df lecture") \
        .getOrCreate()

sc = spark.sparkContext

In [408]:
# %load_ext autoreload
%autoreload 2
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
import nltk
from nltk.tokenize import RegexpTokenizer
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.stem.porter import PorterStemmer
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import ElasticNet
from sklearn.linear_model import Lasso
from sklearn.metrics import confusion_matrix
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import MulticlassMetrics, BinaryClassificationMetrics
import cleaner

%matplotlib inline

In [4]:
review_df = spark.read.json('../break_week/data/dataset/review.json')
user_df = spark.read.json('../break_week/data/dataset/user.json')
business_df = spark.read.json("../break_week/data/dataset/business.json")

In [5]:
review_df.createTempView("review")
user_df.createTempView("user")
business_df.createTempView("business")

In [6]:
df = spark.sql("""SELECT new.user_name, new.user_id, new.business_id, new.friends, \
                b.name AS business_name, b.state, b.city, b.address, b.categories, b.stars AS bus_star,\
                new.text, new.stars AS review_star \
                FROM \
                    (SELECT u.name AS user_name, r.user_id, r.business_id, r.text, r.stars, u.friends \
                    FROM review AS r \
                    LEFT JOIN user AS u \
                    ON r.user_id = u.user_id) AS new\
                INNER JOIN business as b\
                ON new.business_id = b.business_id \
                WHERE ARRAY_CONTAINS(b.categories, 'Restaurants') \
                AND b.state IN ("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI","ID","IL","IN","IA","KS", \
                                "KY","LA","ME","MD","MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY", \
                                "NC","ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT","VT","VA","WA","WV","WI","WY") """)

In [7]:
df.count()

2598115

In [8]:
df_1_5 = df.filter("review_star = 1 OR review_star = 5")
df_1_5.show(2)

+---------+--------------------+--------------------+--------------------+--------------------+-----+---------+--------------------+--------------------+--------+--------------------+-----------+
|user_name|             user_id|         business_id|             friends|       business_name|state|     city|             address|          categories|bus_star|                text|review_star|
+---------+--------------------+--------------------+--------------------+--------------------+-----+---------+--------------------+--------------------+--------+--------------------+-----------+
|   Justin|0y8ORuC2X1i1UF6SG...|--9e1ONYQuAa-CB_R...|[sf-8AusztxHc4o5b...|Delmonico Steakhouse|   NV|Las Vegas|3355 Las Vegas Bl...|[Cajun/Creole, St...|     4.0|WOW.

I came to V...|          5|
|        J|A4GnBOU7ZCTcoQK4e...|--9e1ONYQuAa-CB_R...|[MGPQVLsODMm9ZtYQ...|Delmonico Steakhouse|   NV|Las Vegas|3355 Las Vegas Bl...|[Cajun/Creole, St...|     4.0|This restaurant i...|          5|
+---------+---------

In [66]:
df_1_5.count()

1333392

In [9]:
df_1_5.select("state").groupBy("state").count().show(50,False)

+-----+------+
|state|count |
+-----+------+
|AZ   |492259|
|SC   |3481  |
|VA   |1     |
|NV   |543049|
|WI   |32850 |
|CA   |2     |
|NC   |95105 |
|IL   |11183 |
|IN   |12    |
|OH   |82870 |
|PA   |72525 |
|NY   |35    |
|CO   |4     |
|AK   |16    |
+-----+------+



In [10]:
df1 = df_1_5.where("state = 'WI'").select(["text", "review_star"])

In [11]:
# tokenizer = Tokenizer(inputCol="text", outputCol="words")
# wordsData = tokenizer.transform(df1)

In [12]:
df_wi = df1.toPandas()

In [13]:
df_wi.head(10)

Unnamed: 0,text,review_star
0,Enjoyed a delicious meal with family on Friday...,5
1,Had a great time with family at this cool plac...,5
2,Came several times with my friends. Very good ...,5
3,Very disappointed. It used to be one my favori...,1
4,My favorite place to have chicken wings! Ike t...,5
5,Location is perfect is u r shopping or after a...,5
6,Newly opened Chinese home style cuisine. Great...,5
7,Great Belgian restaurant. Had mussels and frie...,5
8,One of the best and authentic Chinese restaura...,5
9,Love this place! Authentic and fresh dishes! T...,5


In [14]:
corpus = df_wi["text"]

In [331]:
corpus[3]

"Very disappointed. It used to be one my favorite restaurants in the town: fresh food, reasonable price and the freedom to make my own bowl! Now they changed their system so their ppl make your bowl. AND the female server who made my bowl, on oct 26, was very rude! She was rushing and making sure I didn't get too much of the food! I'm never coming back again. I will also spread the words to my friends not to come.\n\nPs: they charged for extra $2 for getting proteins, which I didn't know until I paid!! And this was very invisible on the menu!"

In [332]:
sw = set(stopwords.words("english"))
# sw.update(["i", "and", "i'm", "she", "he"])
tokenizer = RegexpTokenizer("[\w']+")
st = PorterStemmer()
lemma = WordNetLemmatizer()

In [333]:
cleaned = cleaner.clean_stem(corpus, tokenizer, lemma, sw)

In [336]:
cleaned[3]

['very',
 'disappointed',
 'it',
 'used',
 'one',
 'favorite',
 'restaurant',
 'town',
 'fresh',
 'food',
 'reasonable',
 'price',
 'freedom',
 'make',
 'bowl',
 'now',
 'changed',
 'system',
 'ppl',
 'make',
 'bowl',
 'and',
 'female',
 'server',
 'made',
 'bowl',
 'oct',
 'rude',
 'she',
 'rushing',
 'making',
 'sure',
 'i',
 'get',
 'much',
 'food',
 "i'm",
 'never',
 'coming',
 'back',
 'i',
 'also',
 'spread',
 'word',
 'friend',
 'come',
 'p',
 'charged',
 'extra',
 'getting',
 'protein',
 'i',
 'know',
 'i',
 'paid',
 'and',
 'invisible',
 'menu']

In [59]:
tfidf_vectorizer= TfidfVectorizer(max_df=0.95, min_df=2,
                                stop_words='english')

tfidf = tfidf_vectorizer.fit_transform(cleaned)

In [60]:
X = tfidf.toarray()
X.shape

(32850, 18102)

In [61]:
y = df_wi["review_star"]

In [62]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25)

In [63]:
# nb = GaussianNB()
# nb.fit(X_train, y_train)

### LogisticRegression Linear model

In [70]:
lreg = LogisticRegression()
lreg.fit(X_train, y_train)

LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
          intercept_scaling=1, max_iter=100, multi_class='ovr', n_jobs=1,
          penalty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False)

In [124]:
y_pred = lreg.predict(X_test)

In [416]:
#Estimating evaluation metrics
matrix, recall, precision, accuracy = cleaner.metrics(y_test=y_test, y_predict=y_pred)

In [417]:
#confusion matrix
matrix

array([[6434,  219],
       [  50, 1510]])

In [418]:
print("Recall: {}%".format(round(recall*100, 2)))
print("Precision: {}%".format(round(precision*100, 2)))
print("Accuracy: {}%".format(round(accuracy*100, 2)))

Recall: 99.23%
Precision: 96.71%
Accuracy: 96.72%


In [72]:
terms = tfidf_vectorizer.get_feature_names()

In [73]:
cleaner.show_topics(lreg.coef_, terms, length=40)

Topic 1: ['great', 'delicious', 'amazing', 'best', 'excellent', 'love', 'awesome', 'favorite', 'friendly', 'perfect', 'fantastic', 'madison', 'definitely', 'loved', 'wonderful', 'good', 'happy', 'fresh', 'nice', 'perfectly', 'highly', 'tasty', 'outstanding', 've', 'enjoyed', 'fun', 'little', 'attentive', 'fast', 'super', 'yummy', 'incredible', 'thank', 'flavorful', 'bit', 'helpful', 'yum', 'try', 'notch', 'reasonable']


In [74]:
cleaner.show_topics(lreg.coef_, terms, length=40)

Topic 1: ['worst', 'terrible', 'horrible', 'bland', 'awful', 'rude', 'disappointing', 'poor', 'mediocre', 'bad', 'minute', 'told', 'ok', 'dry', 'asked', 'disgusting', 'tasted', 'cold', 'overpriced', 'tasteless', 'money', 'slow', 'dirty', 'ordered', 'gross', 'worse', 'customer', 'charged', 'frozen', 'left', 'flavorless', 'management', 'said', 'disappointment', 'waste', 'unfortunately', 'soggy', 'waited', 'barely', 'sorry']


In [75]:
df_1_5.select("review_star").groupBy("review_star").count().show()

+-----------+-------+
|review_star|  count|
+-----------+-------+
|          5|1031519|
|          1| 301873|
+-----------+-------+



In [259]:
pos_terms, neg_terms = cleaner.show_topics(lreg.coef_, terms, length=40)

In [260]:
pos_terms

[('great', 9.6339199219398921),
 ('delicious', 9.5086695708994373),
 ('amazing', 7.9170599102536379),
 ('best', 7.3589483682208217),
 ('excellent', 6.6895446573983914),
 ('love', 6.3849393517080557),
 ('awesome', 5.6882532659470213),
 ('favorite', 5.6378331048222989),
 ('friendly', 5.3344655463092208),
 ('perfect', 5.1487301606386211),
 ('fantastic', 4.9271576211302959),
 ('madison', 4.5292909003664485),
 ('definitely', 4.486343438415247),
 ('loved', 4.0306943032514306),
 ('wonderful', 3.9535371632915139),
 ('good', 3.8106663126652971),
 ('happy', 3.7261373823249087),
 ('fresh', 3.4802625832525842),
 ('nice', 3.3065880195360347),
 ('perfectly', 3.0823391451468418),
 ('highly', 2.9778623076666277),
 ('tasty', 2.9082167789912066),
 ('outstanding', 2.8545282365526417),
 ('ve', 2.8206297853641185),
 ('enjoyed', 2.6887189898520636),
 ('fun', 2.596468899456712),
 ('little', 2.5549889693513079),
 ('attentive', 2.5345851110853932),
 ('fast', 2.4638339969770398),
 ('super', 2.3439106507574947),

In [261]:
neg_terms

[('worst', -7.8428487355547984),
 ('terrible', -6.3079030915106147),
 ('horrible', -5.751475018400706),
 ('bland', -5.1065524302327372),
 ('awful', -4.8790553173531208),
 ('rude', -4.6553109680826168),
 ('disappointing', -4.5527040275278674),
 ('poor', -4.5292789281254802),
 ('mediocre', -4.5238552456152945),
 ('bad', -4.4682088611261186),
 ('minute', -4.313426659289445),
 ('told', -3.9543664624267665),
 ('ok', -3.858301806403829),
 ('dry', -3.8292530674849785),
 ('asked', -3.5862004447195974),
 ('disgusting', -3.5675109238113314),
 ('tasted', -3.536419801643381),
 ('cold', -3.448647333203243),
 ('overpriced', -3.3473641044353819),
 ('tasteless', -3.3292811357064571),
 ('money', -3.2726399181447468),
 ('slow', -3.2396165759347912),
 ('dirty', -3.2147656019520117),
 ('ordered', -3.2110760494267212),
 ('gross', -3.1794458033762432),
 ('worse', -3.1079818464443512),
 ('customer', -2.9925095943697189),
 ('charged', -2.9689180757806759),
 ('frozen', -2.8625362889855053),
 ('left', -2.862506

### SPARK DATAFRAME PIPELINE

In [618]:
df_spk = df1.limit(7000)
df_spk.show(2)

+--------------------+-----------+
|                text|review_star|
+--------------------+-----------+
|Enjoyed a delicio...|          5|
|Had a great time ...|          5|
+--------------------+-----------+
only showing top 2 rows



In [628]:
df_test, df_train = df_spk.randomSplit(weights=[0.3, 0.7], seed=1)

In [629]:
print("Number of rows for trainset: {}".format(df_train.count()))
print("Number of rows for testset: {}".format(df_test.count()))

Number of rows for trainset: 4876
Number of rows for testset: 2124


In [630]:
regex_tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tokenized = regex_tokenizer.transform(df_train)

In [631]:
countTokens = udf(lambda words: len(words), IntegerType())

In [632]:
tokenized_df = tokenized.select("text", "words", "review_star") \
    .withColumn("tokens", countTokens(col("words")))

In [633]:
stops_removed_df = remover.transform(tokenized_df)
stops_removed_df.show(3, truncate=True)

+--------------------+--------------------+-----------+------+--------------------+
|                text|               words|review_star|tokens|            filtered|
+--------------------+--------------------+-----------+------+--------------------+
|"A once in a life...|[a, once, in, a, ...|          5|   180|[lifetime, experi...|
|"Forrest, what's ...|[forrest, what, s...|          5|   304|[forrest, going, ...|
|"Garbage baked in...|[garbage, baked, ...|          1|   210|[garbage, baked, ...|
+--------------------+--------------------+-----------+------+--------------------+
only showing top 3 rows



In [634]:
input_df = stops_removed_df.selectExpr("filtered", "review_star as label")
input_df.show(1)

+--------------------+-----+
|            filtered|label|
+--------------------+-----+
|[lifetime, experi...|    5|
+--------------------+-----+
only showing top 1 row



In [337]:
input_df.select("filtered").show(4, False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|filtered                                                                                                                                                                                                                                                                                                                                                                       |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [635]:
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures")
featurizedData = hashingTF.transform(input_df)

In [640]:
featurizedData.show(1)

+--------------------+-----+--------------------+
|            filtered|label|         rawFeatures|
+--------------------+-----+--------------------+
|[lifetime, experi...|    5|(262144,[8443,880...|
+--------------------+-----+--------------------+
only showing top 1 row



In [641]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
tfidfModel = idf.fit(featurizedData).transform(featurizedData)

Py4JJavaError: An error occurred while calling o5957.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 956.0 failed 1 times, most recent failure: Lost task 0.0 in stage 956.0 (TID 24925, localhost, executor driver): TaskResultLost (result lost from block manager)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1128)
	at org.apache.spark.mllib.feature.IDF.fit(IDF.scala:54)
	at org.apache.spark.ml.feature.IDF.fit(IDF.scala:92)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


In [495]:
tfidfModel.show(2, False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [496]:
tfidfModel.select("features").show(1, False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                                                                                                                                                                                                                                                        

In [None]:
lr = LogisticRegression(maxIter=10, regParam=0.01)
model = lr.fit(tfidfModel)

In [498]:
#testing the model

In [499]:
# df_test = df1.subtract(df_train).limit(1250)
# df_test.show(1)

+--------------------+-----------+
|                text|review_star|
+--------------------+-----------+
|I was visiting Ma...|          5|
+--------------------+-----------+
only showing top 1 row



### Building test TFIDF matrix

In [637]:
tokenized_test = regex_tokenizer.transform(df_test)


In [638]:
countTokens = udf(lambda words: len(words), IntegerType())

In [632]:
tokenized_df = tokenized_test.select("text", "words", "review_star") \
    .withColumn("tokens", countTokens(col("words")))

In [633]:
stops_removed_df = remover.transform(tokenized_df)
stops_removed_df.show(3, truncate=True)

+--------------------+--------------------+-----------+------+--------------------+
|                text|               words|review_star|tokens|            filtered|
+--------------------+--------------------+-----------+------+--------------------+
|"A once in a life...|[a, once, in, a, ...|          5|   180|[lifetime, experi...|
|"Forrest, what's ...|[forrest, what, s...|          5|   304|[forrest, going, ...|
|"Garbage baked in...|[garbage, baked, ...|          1|   210|[garbage, baked, ...|
+--------------------+--------------------+-----------+------+--------------------+
only showing top 3 rows



In [500]:
input_df_test = stops_removed_df_test.selectExpr("filtered", "review_star as label")
featurizedData_test = hashingTF.transform(input_df_test)
tfidfModel_test = idf.fit(featurizedData_test).transform(featurizedData_test)

In [501]:
prediction = model.transform(tfidfModel_test)

In [502]:
prediction.where(prediction.prediction != prediction.label).show()

+--------+-----+-----------+--------+-------------+-----------+----------+
|filtered|label|rawFeatures|features|rawPrediction|probability|prediction|
+--------+-----+-----------+--------+-------------+-----------+----------+
+--------+-----+-----------+--------+-------------+-----------+----------+



In [503]:
prediction.show(1)

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|            filtered|label|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|[enjoyed, delicio...|    5|(262144,[24113,27...|(262144,[24113,27...|[-2.5715539192712...|[8.65336402638732...|       5.0|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
only showing top 1 row



In [505]:
df_pred = prediction.select(["prediction", "label"]).createTempView("predictions2")

In [506]:
metric_df = spark.sql("SELECT SUM(CASE WHEN prediction = 5 AND label = 5 THEN 1 ELSE 0 END) AS tp, \
                              SUM(CASE WHEN prediction = 1 AND label = 1 THEN 1 ELSE 0 END) AS tn, \
                              SUM(CASE WHEN prediction = 5 AND label = 1 THEN 1 ELSE 0 END) AS fp, \
                              SUM(CASE WHEN prediction = 1 AND label = 5 THEN 1 ELSE 0 END) AS fn \
                      FROM predictions2")

In [507]:
metric_df.show()

+----+----+---+---+
|  tp|  tn| fp| fn|
+----+----+---+---+
|3930|1070|  0|  0|
+----+----+---+---+



In [508]:
matrix, recall, precision, accuracy = cleaner.metrics(df=metric_df)

In [509]:
#confusion matrix
matrix

array([[3930,    0],
       [   0, 1070]])

In [510]:
print("Recall: {}%".format(round(recall*100, 2)))
print("Precision: {}%".format(round(precision*100, 2)))
print("Accuracy: {}%".format(round(accuracy*100, 2)))

Recall: 100.0%
Precision: 100.0%
Accuracy: 100.0%


In [514]:
beta = model.coefficientMatrix.values

In [515]:
len(beta)

90906

### CountVectorizer

In [516]:
cv = CountVectorizer(inputCol="filtered", outputCol="features", minDF=2.0)
model_cv = cv.fit(input_df)
result = model_cv.transform(input_df)

In [517]:
len(model_cv.vocabulary)

8546

In [518]:
lr_model_cv = lr.fit(result)

In [568]:
input_df_test.show(1)

+--------------------+-----+
|            filtered|label|
+--------------------+-----+
|[enjoyed, delicio...|    5|
+--------------------+-----+
only showing top 1 row



In [569]:
#dataframe without labels
input_df_test_cv = input_df_test.select("filtered")

In [576]:
model_cv = cv.fit(input_df_test)
result_test = model_cv.transform(input_df_test)
prediction = lr_model_cv.transform(result_test)

In [577]:
prediction.where(prediction.prediction != prediction.label).show()

+--------+-----+--------+-------------+-----------+----------+
|filtered|label|features|rawPrediction|probability|prediction|
+--------+-----+--------+-------------+-----------+----------+
+--------+-----+--------+-------------+-----------+----------+



In [572]:
len(model_cv.vocabulary)

8546

In [578]:
# prediction = prediction.join(input_df_test, ["filtered"], "label")

In [579]:
df_pred = prediction.select(["prediction", "label"]).createTempView("predictions4")

In [580]:
metric_df = spark.sql("SELECT SUM(CASE WHEN prediction = 5 AND label = 5 THEN 1 ELSE 0 END) AS tp, \
                              SUM(CASE WHEN prediction = 1 AND label = 1 THEN 1 ELSE 0 END) AS tn, \
                              SUM(CASE WHEN prediction = 5 AND label = 1 THEN 1 ELSE 0 END) AS fp, \
                              SUM(CASE WHEN prediction = 1 AND label = 5 THEN 1 ELSE 0 END) AS fn \
                      FROM predictions4")

In [581]:
metric_df.show()

+----+----+---+---+
|  tp|  tn| fp| fn|
+----+----+---+---+
|3930|1070|  0|  0|
+----+----+---+---+



In [527]:
matrix, recall, precision, accuracy = cleaner.metrics(df=metric_df)

In [528]:
#confusion matrix
matrix

array([[3930,    0],
       [   0, 1070]])

In [616]:
lr_model_cv.coefficientMatrix.toSparse


<bound method DenseMatrix.toSparse of DenseMatrix(6, 8546, [-0.0014, -0.0013, -0.0015, -0.0012, -0.002, -0.0009, -0.001, -0.0014, ..., 0.1127, -0.0709, -0.0482, 0.1296, 0.2178, -0.6063, -0.0029, 0.1646], 1)>

In [617]:
lr_model_cv.coefficientMatrix

DenseMatrix(6, 8546, [-0.0014, -0.0013, -0.0015, -0.0012, -0.002, -0.0009, -0.001, -0.0014, ..., 0.1127, -0.0709, -0.0482, 0.1296, 0.2178, -0.6063, -0.0029, 0.1646], 1)

In [530]:
print("Recall: {}%".format(round(recall*100, 2)))
print("Precision: {}%".format(round(precision*100, 2)))
print("Accuracy: {}%".format(round(accuracy*100, 2)))

Recall: 100.0%
Precision: 100.0%
Accuracy: 100.0%


In [534]:
model_cv.vocabulary

['food',
 'great',
 'place',
 'good',
 'service',
 'one',
 'like',
 'madison',
 'time',
 'back',
 've',
 'go',
 'get',
 'best',
 'also',
 'restaurant',
 'really',
 'delicious',
 'ordered',
 'menu',
 'well',
 'us',
 'love',
 'order',
 'cheese',
 'got',
 'pizza',
 'always',
 'even',
 'amazing',
 'friendly',
 'staff',
 'nice',
 'm',
 'chicken',
 'never',
 'try',
 'came',
 'bar',
 'ever',
 'definitely',
 'first',
 'made',
 'dinner',
 'went',
 'wait',
 'sauce',
 'eat',
 'come',
 'people',
 'night',
 'favorite',
 'excellent',
 'little',
 'fresh',
 'make',
 'much',
 're',
 'lunch',
 'table',
 'beer',
 'didn',
 'everything',
 'two',
 'meal',
 'know',
 'experience',
 'recommend',
 'every',
 'minutes',
 'salad',
 'going',
 'way',
 'atmosphere',
 '5',
 'better',
 'coffee',
 'drinks',
 'times',
 'new',
 'sandwich',
 'burger',
 'perfect',
 'tried',
 'right',
 'want',
 'side',
 '2',
 'small',
 'sure',
 'pretty',
 'many',
 'take',
 'awesome',
 'say',
 'day',
 'breakfast',
 'think',
 'area',
 'said',
