In [None]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
spark_version = "spark-3.2.0"
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease [15.9 kB]
Get:13 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:15 

In [None]:
from pyspark.sql import SparkSession

# Reuse the active session if one exists, otherwise create one named "Hashing".
spark = (
    SparkSession.builder
    .appName("Hashing")
    .getOrCreate()
)

In [None]:
# Imports
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# Read in CSV
from pyspark import SparkFiles
# The file is read straight from its absolute path. SparkFiles.get() is meant
# for files shipped with SparkContext.addFile(), which this file never was.
# Both quote and escape are the double-quote character, so quotes embedded in
# a review are read as doubled quotes inside a quoted field.
df = spark.read.csv(
    "/content/IMDB Dataset.csv",
    sep=",",
    quote='"',
    escape='"',
    encoding="utf-8",
    header=True,
)

# Show DataFrame
df.show(10, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Report the size of the raw DataFrame (count() triggers a Spark job)
row, col = df.count(), len(df.columns)

print(
    f'Dimension of the Dataframe is: {(row, col)}',
    f'Number of Rows are: {row}',
    f'Number of Columns are: {col}',
    sep="\n",
)

Dimension of the Dataframe is: (50000, 2)
Number of Rows are: 50000
Number of Columns are: 2


In [None]:
# Split every review into a list of tokens stored in a new "words" column
tokened = Tokenizer(outputCol="words", inputCol="review")
tokened_transformed = tokened.transform(df)
tokened_transformed.show()

# Size of the tokenized frame
row, col = tokened_transformed.count(), len(tokened_transformed.columns)

print(
    f'Dimension of the Dataframe is: {(row, col)}',
    f'Number of Rows are: {row}',
    f'Number of Columns are: {col}',
    sep="\n",
)

+--------------------+---------+--------------------+
|              review|sentiment|               words|
+--------------------+---------+--------------------+
|One of the other ...| positive|[one, of, the, ot...|
|A wonderful littl...| positive|[a, wonderful, li...|
|I thought this wa...| positive|[i, thought, this...|
|Basically there's...| negative|[basically, there...|
|Petter Mattei's "...| positive|[petter, mattei's...|
|Probably my all-t...| positive|[probably, my, al...|
|I sure would like...| positive|[i, sure, would, ...|
|This show was an ...| negative|[this, show, was,...|
|Encouraged by the...| negative|[encouraged, by, ...|
|If you like origi...| positive|[if, you, like, o...|
|Phil the Alien is...| negative|[phil, the, alien...|
|I saw this movie ...| negative|[i, saw, this, mo...|
|So im not a big f...| negative|[so, im, not, a, ...|
|The cast played S...| negative|[the, cast, playe...|
|This a fantastic ...| positive|[this, a, fantast...|
|Kind of drawn in ...| negat

In [None]:
# Drop stop words from "words" and keep the remainder in "Wordsfiltered"
remover = StopWordsRemover(outputCol="Wordsfiltered", inputCol="words")
removed_frame = remover.transform(tokened_transformed)
removed_frame.show()

# Size of the filtered frame
row, col = removed_frame.count(), len(removed_frame.columns)

print(
    f'Dimension of the Dataframe is: {(row, col)}',
    f'Number of Rows are: {row}',
    f'Number of Columns are: {col}',
    sep="\n",
)

+--------------------+---------+--------------------+--------------------+
|              review|sentiment|               words|       Wordsfiltered|
+--------------------+---------+--------------------+--------------------+
|One of the other ...| positive|[one, of, the, ot...|[one, reviewers, ...|
|A wonderful littl...| positive|[a, wonderful, li...|[wonderful, littl...|
|I thought this wa...| positive|[i, thought, this...|[thought, wonderf...|
|Basically there's...| negative|[basically, there...|[basically, famil...|
|Petter Mattei's "...| positive|[petter, mattei's...|[petter, mattei's...|
|Probably my all-t...| positive|[probably, my, al...|[probably, all-ti...|
|I sure would like...| positive|[i, sure, would, ...|[sure, like, see,...|
|This show was an ...| negative|[this, show, was,...|[show, amazing,, ...|
|Encouraged by the...| negative|[encouraged, by, ...|[encouraged, posi...|
|If you like origi...| positive|[if, you, like, o...|[like, original, ...|
|Phil the Alien is...| ne

In [None]:
# Hash the filtered tokens into a sparse term-frequency vector
hashing = HashingTF(outputCol="hashedValues", inputCol="Wordsfiltered")

# Apply the hashing step and preview the result
hashed_df = hashing.transform(removed_frame)
hashed_df.show()

# Size of the hashed frame
row, col = hashed_df.count(), len(hashed_df.columns)

print(
    f'Dimension of the Dataframe is: {(row, col)}',
    f'Number of Rows are: {row}',
    f'Number of Columns are: {col}',
    sep="\n",
)

+--------------------+---------+--------------------+--------------------+--------------------+
|              review|sentiment|               words|       Wordsfiltered|        hashedValues|
+--------------------+---------+--------------------+--------------------+--------------------+
|One of the other ...| positive|[one, of, the, ot...|[one, reviewers, ...|(262144,[3280,436...|
|A wonderful littl...| positive|[a, wonderful, li...|[wonderful, littl...|(262144,[120,521,...|
|I thought this wa...| positive|[i, thought, this...|[thought, wonderf...|(262144,[1043,139...|
|Basically there's...| negative|[basically, there...|[basically, famil...|(262144,[6512,853...|
|Petter Mattei's "...| positive|[petter, mattei's...|[petter, mattei's...|(262144,[2751,392...|
|Probably my all-t...| positive|[probably, my, al...|[probably, all-ti...|(262144,[5381,158...|
|I sure would like...| positive|[i, sure, would, ...|[sure, like, see,...|(262144,[1889,545...|
|This show was an ...| negative|[this, s

In [None]:
# 70/30 train/test split of the raw reviews; the seed keeps the split repeatable
training, testing = df.randomSplit(weights=[0.7, 0.3], seed=1)

In [None]:
# Row counts of the full dataset and of each split. The first line reports
# the whole DataFrame, so it is labelled "Total" rather than "Training".
print("Total Dataset Count: " + str(df.count()))
print("Training Dataset Count: " + str(training.count()))
print("Test Dataset Count: " + str(testing.count()))

Training Dataset Count: 50000
Training Dataset Count: 35094
Test Dataset Count: 14906


In [None]:
# Preview the first 20 rows of the training split (long cells are truncated)
training.show(n=20, truncate=True)

+--------------------+---------+
|              review|sentiment|
+--------------------+---------+
|\b\b\b\bA Turkish...| positive|
|!!!! MILD SPOILER...| negative|
|!!!! MILD SPOILER...| negative|
|!!!! POSSIBLE MIL...| negative|
|" While sporadica...| negative|
|"... the beat is ...| positive|
|"2001: A Space Od...| positive|
|"200l: A Space Od...| positive|
|"8 SIMPLE RULES.....| positive|
|"9/11," hosted by...| positive|
|"A Cry in the Dar...| positive|
|"A Tale of Two Si...| positive|
|"A Thief in the N...| positive|
|"A bored televisi...| negative|
|"A death at a col...| negative|
|"A lot of the fil...| negative|
|"A research scien...| negative|
|"A total waste of...| negative|
|"A trio of treasu...| negative|
|"A truly nice sto...| positive|
+--------------------+---------+
only showing top 20 rows



In [None]:
# Find the data types: trailing expression, so the notebook displays the list
# of (column name, type) pairs for the training split
training.dtypes


[('review', 'string'), ('sentiment', 'string')]

In [None]:
# Imports
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [None]:
# LOGISTIC REGRESSION MODEL

# Pipeline stages: label encoding -> tokenizing -> stop-word removal ->
# term-frequency hashing -> classifier.
# NOTE: the intermediate column names are misleading. "Wordsfiltered" holds
# the raw tokens and "hashedValues" holds the stop-word-filtered tokens; only
# "features" contains the hashed vectors.
# NOTE(review): StringIndexer is used with its default ordering, so which
# sentiment becomes label 0.0 is not pinned here; pass stringOrderType
# explicitly if a fixed mapping is required - confirm.
label_indexer = StringIndexer(outputCol="label", inputCol="sentiment")
tokenizer = Tokenizer(outputCol="Wordsfiltered", inputCol="review")
stopremove = StopWordsRemover(outputCol="hashedValues", inputCol="Wordsfiltered")
hashingTF = HashingTF(outputCol="features", inputCol="hashedValues")
lr = LogisticRegression(regParam=0.001, maxIter=20)

# Assemble the pipeline, fit it on the training reviews and score the test set
pipeline = Pipeline(
    stages=[label_indexer, tokenizer, stopremove, hashingTF, lr]
)
lrmodel = pipeline.fit(training)
predictions_lr = lrmodel.transform(testing)

# Preview test reviews whose true label is 0, ordered by descending probability
(
    predictions_lr
    .where(predictions_lr.label == 0)
    .select("review", "Wordsfiltered", "features", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30)
)

# F1 and accuracy of the logistic regression model on the test set
f1_eval = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
accuracy_score = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
print("Logistic Regression F1 Score: ", f1_eval.evaluate(predictions_lr))
print("Logistic Regression Accuracy: ", accuracy_score.evaluate(predictions_lr))

+------------------------------+------------------------------+------------------------------+------------------------------+-----+----------+
|                        review|                 Wordsfiltered|                      features|                   probability|label|prediction|
+------------------------------+------------------------------+------------------------------+------------------------------+-----+----------+
|By now you've probably hear...|[by, now, you've, probably,...|(262144,[432,921,1189,2325,...|                     [1.0,0.0]|  0.0|       0.0|
|In a style reminiscent of t...|[in, a, style, reminiscent,...|(262144,[303,437,666,861,18...|[0.9999999999999996,4.44089...|  0.0|       0.0|
|Life is comprised of infini...|[life, is, comprised, of, i...|(262144,[303,2705,2977,3176...|[0.9999999999999951,4.88498...|  0.0|       0.0|
|Universal Studios version o...|[universal, studios, versio...|(262144,[3924,3928,5942,596...|[0.9999999999999867,1.33226...|  0.0|       0.0|

In [None]:
### RANDOM FOREST MODEL

# Pipeline stages: label encoding -> tokenizing -> stop-word removal ->
# term-frequency hashing -> classifier.
# NOTE: "Wordsfiltered" holds the raw tokens and "hashedValues" holds the
# stop-word-filtered tokens; only "features" contains the hashed vectors.
label_indexer = StringIndexer(inputCol='sentiment',outputCol='label')
tokenizer = Tokenizer(inputCol="review", outputCol="Wordsfiltered")
stopremove = StopWordsRemover(inputCol='Wordsfiltered',outputCol='hashedValues')
hashingTF = HashingTF(inputCol="hashedValues", outputCol='features')
# A random forest is a stochastic model; the explicit seed (same value as the
# train/test split) makes the fitted trees repeatable between runs.
rf = RandomForestClassifier(seed=1)

# Define pipeline
pipeline = Pipeline(stages=[label_indexer, tokenizer, stopremove, hashingTF, rf])

# Fit the pipeline to training reviews.
rfmodel = pipeline.fit(training)

# Transform the testing data with the fitted model
predictions_rf = rfmodel.transform(testing)

# Preview test reviews whose true label is 0, ordered by descending probability
predictions_rf.filter(predictions_rf['label'] == 0) \
    .select("review","Wordsfiltered","features","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

# Evaluate Random Forest model
f1_eval = MulticlassClassificationEvaluator(metricName='f1',predictionCol="prediction")
print("Random Forest F1 Score: ", f1_eval.evaluate(predictions_rf))
accuracy_score = MulticlassClassificationEvaluator(metricName='accuracy',predictionCol="prediction")
print("Random Forest Accuracy: ", accuracy_score.evaluate(predictions_rf))

+------------------------------+------------------------------+------------------------------+------------------------------+-----+----------+
|                        review|                 Wordsfiltered|                      features|                   probability|label|prediction|
+------------------------------+------------------------------+------------------------------+------------------------------+-----+----------+
|When people nowadays hear o...|[when, people, nowadays, he...|(262144,[1891,2306,3121,356...|[0.5923609418468325,0.40763...|  0.0|       0.0|
|The best of the seven Sam F...|[the, best, of, the, seven,...|(262144,[2366,6034,7589,840...|[0.5885786355466045,0.41142...|  0.0|       0.0|
|It's been a long time since...|[it's, been, a, long, time,...|(262144,[445,844,999,3048,4...|[0.5862893296639776,0.41371...|  0.0|       0.0|
|Way back in 1955, the Briti...|[way, back, in, 1955,, the,...|(262144,[766,991,1889,2977,...|[0.58393075994923,0.4160692...|  0.0|       0.0|

In [None]:
#Import naive
from pyspark.ml.classification import NaiveBayes

In [None]:
### NAIVE BAYES MODEL

# Pipeline stages: label encoding -> tokenizing -> stop-word removal ->
# term-frequency hashing -> classifier.
# NOTE: "Wordsfiltered" holds the raw tokens and "hashedValues" holds the
# stop-word-filtered tokens; only "features" contains the hashed vectors.
label_indexer = StringIndexer(outputCol="label", inputCol="sentiment")
tokenizer = Tokenizer(outputCol="Wordsfiltered", inputCol="review")
stopremove = StopWordsRemover(outputCol="hashedValues", inputCol="Wordsfiltered")
hashingTF = HashingTF(outputCol="features", inputCol="hashedValues")
nb = NaiveBayes(smoothing=1)

# Assemble the pipeline, fit it on the training reviews and score the test set
pipeline = Pipeline(
    stages=[label_indexer, tokenizer, stopremove, hashingTF, nb]
)
nbmodel = pipeline.fit(training)
predictions_nb = nbmodel.transform(testing)

# Preview test reviews whose true label is 0, ordered by descending probability
(
    predictions_nb
    .where(predictions_nb.label == 0)
    .select("review", "Wordsfiltered", "features", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30)
)

# F1 and accuracy of the naive Bayes model on the test set
f1_eval = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
accuracy_score = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
print("Naive Bayes F1 Score: ", f1_eval.evaluate(predictions_nb))
print("Naive Bayes Accuracy: ", accuracy_score.evaluate(predictions_nb))

+------------------------------+------------------------------+------------------------------+----------------------------+-----+----------+
|                        review|                 Wordsfiltered|                      features|                 probability|label|prediction|
+------------------------------+------------------------------+------------------------------+----------------------------+-----+----------+
|Billy Hughes is a mute youn...|[billy, hughes, is, a, mute...|(262144,[1880,2705,4210,844...|[1.0,1.1001633128428086E-16]|  0.0|       0.0|
|This small John Ford wester...|[this, small, john, ford, w...|(262144,[5381,6558,7433,762...|[1.0,1.1000321771336301E-16]|  0.0|       0.0|
|Back in 2004 I saw "True", ...|[back, in, 2004, i, saw, "t...|(262144,[2306,3785,4798,503...|[1.0,1.0910169090802258E-16]|  0.0|       0.0|
|The third collaboration for...|[the, third, collaboration,...|(262144,[2626,4093,6501,840...|[1.0,1.0885433043067308E-16]|  0.0|       0.0|
|Minor Spoile

In [None]:
# Imports
from pyspark.ml.classification import LinearSVC

In [None]:
### SVM MODEL

# Pipeline stages: label encoding -> tokenizing -> stop-word removal ->
# term-frequency hashing -> classifier.
# NOTE: "Wordsfiltered" holds the raw tokens and "hashedValues" holds the
# stop-word-filtered tokens; only "features" contains the hashed vectors.
# These stage objects are reused by the cross-validation cells further down.
label_indexer = StringIndexer(outputCol="label", inputCol="sentiment")
tokenizer = Tokenizer(outputCol="Wordsfiltered", inputCol="review")
stopremove = StopWordsRemover(outputCol="hashedValues", inputCol="Wordsfiltered")
hashingTF = HashingTF(outputCol="features", inputCol="hashedValues")
lsvc = LinearSVC()

# Assemble the pipeline, fit it on the training reviews and score the test set
pipeline = Pipeline(
    stages=[label_indexer, tokenizer, stopremove, hashingTF, lsvc]
)
lsvcmodel = pipeline.fit(training)
predictions_svm = lsvcmodel.transform(testing)

# Preview test reviews whose true label is 0 (no "probability" column is
# selected for this model)
(
    predictions_svm
    .where(predictions_svm.label == 0)
    .select("review", "Wordsfiltered", "features", "label", "prediction")
    .show(n=10, truncate=30)
)

# F1 and accuracy of the SVM model on the test set
f1_eval = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
accuracy_score = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
print("SVM F1 Score: ", f1_eval.evaluate(predictions_svm))
print("SVM Accuracy: ", accuracy_score.evaluate(predictions_svm))

+------------------------------+------------------------------+------------------------------+-----+----------+
|                        review|                 Wordsfiltered|                      features|label|prediction|
+------------------------------+------------------------------+------------------------------+-----+----------+
|" Now in India's sunny 'cli...|[", now, in, india's, sunny...|(262144,[535,1765,2701,7625...|  0.0|       0.0|
|" Så som i himmelen " .. as...|[", så, som, i, himmelen, "...|(262144,[5150,8538,12716,15...|  0.0|       0.0|
|"A Guy Thing" may not be a ...|["a, guy, thing", may, not,...|(262144,[6690,10077,13020,1...|  0.0|       0.0|
|"A Minute to Pray, A Second...|["a, minute, to, pray,, a, ...|(262144,[2701,6699,7136,902...|  0.0|       0.0|
|"A Mouse in the House" is a...|["a, mouse, in, the, house"...|(262144,[9747,10172,16259,1...|  0.0|       0.0|
|"A Slight Case of Murder" i...|["a, slight, case, of, murd...|(262144,[4757,5429,8538,151...|  0.0|    

In [None]:
# Cross validation for SVM model
from pyspark.ml.feature import HashingTF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Define pipeline (reuses the stage objects created in the SVM model cell)
pipeline = Pipeline(stages=[label_indexer, tokenizer, stopremove, hashingTF, lsvc])

# The parameter grid is empty, so a single default configuration is scored:
# the cross-validation only estimates accuracy, it does not tune anything.
# The seed fixes the random fold assignment so the averaged metric is
# repeatable between runs.
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=ParamGridBuilder().build(),
                    evaluator=MulticlassClassificationEvaluator(metricName="accuracy"),
                    numFolds=5,
                    seed=1)
model_svc = cv.fit(training)
# Transform the testing data with the cross-validated model
predictions = model_svc.transform(testing)
# Preview test reviews whose true label is 0
predictions.filter(predictions['label'] == 0) \
    .select("review","Wordsfiltered","features","label","prediction") \
    .show(n = 10, truncate = 30)


+------------------------------+------------------------------+------------------------------+-----+----------+
|                        review|                 Wordsfiltered|                      features|label|prediction|
+------------------------------+------------------------------+------------------------------+-----+----------+
|" Now in India's sunny 'cli...|[", now, in, india's, sunny...|(262144,[535,1765,2701,7625...|  0.0|       0.0|
|" Så som i himmelen " .. as...|[", så, som, i, himmelen, "...|(262144,[5150,8538,12716,15...|  0.0|       0.0|
|"A Guy Thing" may not be a ...|["a, guy, thing", may, not,...|(262144,[6690,10077,13020,1...|  0.0|       0.0|
|"A Minute to Pray, A Second...|["a, minute, to, pray,, a, ...|(262144,[2701,6699,7136,902...|  0.0|       0.0|
|"A Mouse in the House" is a...|["a, mouse, in, the, house"...|(262144,[9747,10172,16259,1...|  0.0|       0.0|
|"A Slight Case of Murder" i...|["a, slight, case, of, murd...|(262144,[4757,5429,8538,151...|  0.0|    

In [None]:
# Mean cross-validated metric for each entry of the SVM parameter grid
avgMetricsGrid_svc = model_svc.avgMetrics
print(avgMetricsGrid_svc)

[0.8723427748328098]


In [None]:
# SVM accuracy: highest mean cross-validated metric over the parameter grid
# (avgMetricsGrid_svc comes from model_svc.avgMetrics in the previous cell)
modelAcc_svc = max(avgMetricsGrid_svc)
print("accuracy for this grid ", modelAcc_svc)

accuracy for this grid  0.8723427748328098


In [None]:
# Best model: the fitted pipeline kept by the SVM cross-validator
model1 = model_svc.bestModel
# Trailing expression: the notebook displays the schema of the scored test frame
model1.transform(testing)

DataFrame[review: string, sentiment: string, label: double, Wordsfiltered: array<string>, hashedValues: array<string>, features: vector, rawPrediction: vector, prediction: double]

In [None]:
# Score the test set with the best SVM pipeline and preview the reviews whose
# true label is 0
predictions_best = model1.transform(testing)
(
    predictions_best
    .where(predictions_best.label == 0)
    .select("review", "Wordsfiltered", "features", "label", "prediction")
    .show(n=10, truncate=30)
)

+------------------------------+------------------------------+------------------------------+-----+----------+
|                        review|                 Wordsfiltered|                      features|label|prediction|
+------------------------------+------------------------------+------------------------------+-----+----------+
|" Now in India's sunny 'cli...|[", now, in, india's, sunny...|(262144,[535,1765,2701,7625...|  0.0|       0.0|
|" Så som i himmelen " .. as...|[", så, som, i, himmelen, "...|(262144,[5150,8538,12716,15...|  0.0|       0.0|
|"A Guy Thing" may not be a ...|["a, guy, thing", may, not,...|(262144,[6690,10077,13020,1...|  0.0|       0.0|
|"A Minute to Pray, A Second...|["a, minute, to, pray,, a, ...|(262144,[2701,6699,7136,902...|  0.0|       0.0|
|"A Mouse in the House" is a...|["a, mouse, in, the, house"...|(262144,[9747,10172,16259,1...|  0.0|       0.0|
|"A Slight Case of Murder" i...|["a, slight, case, of, murd...|(262144,[4757,5429,8538,151...|  0.0|    

In [None]:
# Test-set accuracy of the best SVM pipeline
accuracy_score = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    metricName="accuracy",
)
print("SVM best model Accuracy: ", accuracy_score.evaluate(predictions_best))

SVM best model Accuracy:  0.8770964712196431


In [None]:
# Cross validation for Logistic regression
from pyspark.ml.feature import HashingTF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Define pipeline (reuses the feature stages from the SVM model cell and the
# `lr` estimator from the logistic regression cell)
pipeline = Pipeline(stages=[label_indexer, tokenizer, stopremove, hashingTF, lr])

# 2 x 2 grid over the regularization strength and the convergence tolerance
paramGrid = ParamGridBuilder().addGrid(lr.regParam, (0.01, 0.1))\
                              .addGrid(lr.tol, (1e-5, 1e-6))\
                              .build()
# The seed fixes the random fold assignment so the averaged metrics, and
# therefore the selected parameters, are repeatable between runs.
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=MulticlassClassificationEvaluator(metricName="accuracy"),
                    numFolds=5,
                    seed=1)
model = cv.fit(training)
# Transform the testing data with the cross-validated model
predictions = model.transform(testing)
# Preview test reviews whose true label is 0
predictions.filter(predictions['label'] == 0) \
    .select("review","Wordsfiltered","features","label","prediction") \
    .show(n = 10, truncate = 30)



+------------------------------+------------------------------+------------------------------+-----+----------+
|                        review|                 Wordsfiltered|                      features|label|prediction|
+------------------------------+------------------------------+------------------------------+-----+----------+
|" Now in India's sunny 'cli...|[", now, in, india's, sunny...|(262144,[535,1765,2701,7625...|  0.0|       0.0|
|" Så som i himmelen " .. as...|[", så, som, i, himmelen, "...|(262144,[5150,8538,12716,15...|  0.0|       0.0|
|"A Guy Thing" may not be a ...|["a, guy, thing", may, not,...|(262144,[6690,10077,13020,1...|  0.0|       0.0|
|"A Minute to Pray, A Second...|["a, minute, to, pray,, a, ...|(262144,[2701,6699,7136,902...|  0.0|       0.0|
|"A Mouse in the House" is a...|["a, mouse, in, the, house"...|(262144,[9747,10172,16259,1...|  0.0|       0.0|
|"A Slight Case of Murder" i...|["a, slight, case, of, murd...|(262144,[4757,5429,8538,151...|  0.0|    

In [None]:
# Test-set accuracy of the cross-validated logistic regression pipeline
accuracy_score = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    metricName="accuracy",
)
print("Logistic Regression Accuracy: ", accuracy_score.evaluate(predictions))

Logistic Regression Accuracy:  0.8683751509459278


In [None]:
import numpy as np

# Print the parameter map whose mean cross-validated metric is the highest
best_index = np.argmax(model.avgMetrics)
print(model.getEstimatorParamMaps()[best_index])

{Param(parent='LogisticRegression_3f72a14a176e', name='regParam', doc='regularization parameter (>= 0).'): 0.1, Param(parent='LogisticRegression_3f72a14a176e', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0).'): 1e-05}


In [None]:
# Mean cross-validated metric for each entry of the logistic regression grid
avgMetricsGrid = model.avgMetrics
print(avgMetricsGrid)

[0.8570387061541025, 0.8570387061541025, 0.8609394834152646, 0.8609394834152646]


In [None]:
# Accuracy for Logistic regression after Cross validation: highest mean
# cross-validated metric over the parameter grid
modelAcc = max(avgMetricsGrid)
print("accuracy for this grid ", modelAcc)

accuracy for this grid  0.8609394834152646


In [None]:
### DATA TRANSFORMATION - NEW REVIEWS FOR CLASSIFICATION #####

In [None]:
pip install sqlalchemy



In [None]:
!pip install psycopg2



In [None]:
import os

import psycopg2
import pandas as pd
from pyspark.sql import SparkSession
from sqlalchemy import create_engine

# The PostgreSQL connection string is read from the DATABASE_URL environment
# variable so that no credentials are stored in the notebook. Expected form:
#   postgresql://postgres:postgres@localhost:5432/movie_review
# NOTE(review): a connection string with a password was committed in an
# earlier revision of this cell; that password should be rotated.
database_url = os.environ.get("DATABASE_URL")
if not database_url:
    raise RuntimeError(
        "DATABASE_URL is not set; export the PostgreSQL connection string "
        "before running this cell."
    )
engine = create_engine(database_url)

# Pull the new reviews into pandas first
pdf = pd.read_sql('SELECT * FROM IMDB_REVIEWS', engine)

# Convert Pandas dataframe to spark DataFrame
df_dvd = spark.createDataFrame(pdf)
# Print the schema of the newly loaded reviews (not of the training frame `df`)
print(df_dvd.schema)
df_dvd.show()

  """)


StructType(List(StructField(review,StringType,true),StructField(sentiment,StringType,true)))
+---+--------------------+--------------------+--------------------+
|ind|               title|                 url|              review|
+---+--------------------+--------------------+--------------------+
|  0|     The Survivalist|https://www.imdb....|It's day 592 of C...|
|  1| The Addams Family 2|https://www.imdb....|Wednesday uses Un...|
|  2|          Witch Hunt|https://www.imdb....|Martha (Elizabeth...|
|  3|      American Night|https://www.imdb....|This film start w...|
|  4|Space Jam: A New ...|https://www.imdb....|LeBron James work...|
|  5|       Escape Room 2|https://www.imdb....|Zoey Davis (Taylo...|
|  6|Six Minutes to Mi...|https://www.imdb....|In 1939, Thomas M...|
|  7|           The Stand|https://www.imdb....|It's a nine part ...|
|  8|             Clarice|https://www.imdb....|Overall this seri...|
|  9|     Broken Diamonds|https://www.imdb....|Very well acted. ...|
| 10|     

In [None]:
# Read in CSV
#from pyspark import SparkFiles
#df_dvd = spark.read.csv(SparkFiles.get("/content/new_upcoming_dvd_reviews.csv"),sep=",", escape='"', encoding="utf-8", quote='"',  header=True)

# Show DataFrame
#df_dvd.show(10, truncate=False)

In [None]:
# Score the newly loaded reviews with the best SVM pipeline (model1) and
# preview the predicted label for the first ten titles
test = df_dvd
predictions_ul = model1.transform(test)
(
    predictions_ul
    .select("ind", "title", "URL", "review", "prediction")
    .show(n=10, truncate=30)
)


+---+-----------------------+------------------------------+------------------------------+----------+
|ind|                  title|                           URL|                        review|prediction|
+---+-----------------------+------------------------------+------------------------------+----------+
|  0|        The Survivalist|https://www.imdb.com/title/...|It's day 592 of Covid-19 De...|       1.0|
|  1|    The Addams Family 2|https://www.imdb.com/title/...|Wednesday uses Uncle Fester...|       0.0|
|  2|             Witch Hunt|https://www.imdb.com/title/...|Martha (Elizabeth Mitchell)...|       1.0|
|  3|         American Night|https://www.imdb.com/title/...|This film start with a man ...|       1.0|
|  4|Space Jam: A New Legacy|https://www.imdb.com/title/...|LeBron James worked hard to...|       1.0|
|  5|          Escape Room 2|https://www.imdb.com/title/...|Zoey Davis (Taylor Russell)...|       1.0|
|  6|Six Minutes to Midnight|https://www.imdb.com/title/...|In 1939, Thom