In [5]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.2.2'
#spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.39)] [Connecting to security.ub0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.91.39)]                                                                               Hit:2 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Conn                                                                               Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Connecting to ppa.launchpa                                                                               Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Conn                           

In [6]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()

In [7]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://moviepickfiles.s3.us-east-2.amazonaws.com/combined_df.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("combined_df.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+---+---------+------------------+-----+------------+--------------------+--------------------+-------+------------+---------+----------+
|_c0|     Mood|    original_title|   id|release_year|               genre|            overview|runtime|vote_average|   budget|   revenue|
+---+---------+------------------+-----+------------+--------------------+--------------------+-------+------------+---------+----------+
|  0|    Happy|Back to the Future|  105|        1985|Adventure,Comedy,...|Eighties teenager...|  116.0|         8.0| 19000000| 381109762|
|  1|    Happy|      Forrest Gump|   13|        1994|Comedy,Drama,Romance|A man with a low ...|  142.0|         8.2| 55000000| 677945399|
|  2|      Sad|      Forrest Gump|   13|        1994|Comedy,Drama,Romance|A man with a low ...|  142.0|         8.2| 55000000| 677945399|
|  3|    Happy|      Forrest Gump|   13|        1994|Comedy,Drama,Romance|A man with a low ...|  142.0|         8.2| 55000000| 677945399|
|  4| Inspired|   The Dark Knight|

In [31]:
df.columns

['_c0',
 'Mood',
 'original_title',
 'id',
 'release_year',
 'genre',
 'overview',
 'runtime',
 'vote_average',
 'budget',
 'revenue']

In [32]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = df.withColumn('length', length(df['overview']))

data_df = data_df.drop('_c0',
 'original_title',
 'id',
 'release_year',
 'genre',
 'runtime',
 'vote_average',
 'budget',
 'revenue')

data_df.show()


+---------+--------------------+------+
|     Mood|            overview|length|
+---------+--------------------+------+
|    Happy|Eighties teenager...|   327|
|    Happy|A man with a low ...|   406|
|      Sad|A man with a low ...|   406|
|    Happy|A man with a low ...|   406|
| Inspired|Batman raises the...|   396|
| Inspired|Batman raises the...|   396|
|    Angry|Batman raises the...|   396|
|    Happy|An epic love stor...|   301|
| Romantic|An epic love stor...|   301|
|      Sad|An epic love stor...|   301|
|Emotional|Princess Leia is ...|   312|
|    Happy|"Bolt is the star...|   263|
|      Sad|Into the world of...|   153|
|    Angry|In a dystopian fu...|   117|
|    Angry|In a dystopian fu...|   117|
|    Happy|Nemo, an adventur...|   333|
| Inspired|Nemo, an adventur...|   333|
| Thrilled|When soldier Robi...|   266|
| Romantic|When the kingdom'...|   512|
|    Happy|When the kingdom'...|   512|
+---------+--------------------+------+
only showing top 20 rows



### Feature Transformations


In [37]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='Mood',outputCol='label')
tokenizer = Tokenizer(inputCol="overview", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')


In [38]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [39]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [40]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [None]:
# Show label and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[33933,69...|
|  1.0|(262145,[15889,13...|
|  1.0|(262145,[25570,63...|
|  0.0|(262145,[6286,272...|
|  0.0|(262145,[6979,255...|
|  1.0|(262145,[24417,24...|
|  1.0|(262145,[12084,48...|
|  1.0|(262145,[3645,963...|
|  0.0|(262145,[53777,10...|
|  0.0|(262145,[138356,2...|
|  0.0|(262145,[24113,25...|
|  1.0|(262145,[68867,13...|
|  1.0|(262145,[24417,36...|
|  0.0|(262145,[18098,24...|
|  1.0|(262145,[24417,25...|
|  1.0|(262145,[24417,25...|
|  0.0|(262145,[31704,21...|
|  1.0|(262145,[25570,27...|
|  1.0|(262145,[12329,15...|
|  1.0|(262145,[8287,139...|
+-----+--------------------+
only showing top 20 rows



In [41]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [42]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+---------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|     Mood|            overview|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+---------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    Angry|After the re-emer...|   137|  5.0|[after, the, re-e...|[re-emergence, wo...|(262144,[14829,96...|(262144,[14829,96...|(262145,[14829,96...|[-1059.7413851541...|[1.0,1.3883638198...|       0.0|
|    Angry|In a dystopian fu...|   117|  5.0|[in, a, dystopian...|[dystopian, futur...|(262144,[36671,63...|(262144,[36671,63...|(262145,[36671,63...|[-910.32113073013...|[1.0,1.446575

In [43]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.052128
