In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from nltk.stem.snowball import SnowballStemmer
#spark ML imports
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StringIndexer, CountVectorizer, IDF
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import Word2Vec
from pyspark.ml.feature import VectorAssembler
%matplotlib inline

In [5]:
# Create the Spark session.
#
# Resource settings (executor/driver memory, cores) are read when the SparkContext
# starts, so they are handed to the builder *before* getOrCreate(). Setting them on
# the private `_conf` of an already-running context does not resize the driver or
# the executors.
# NOTE(review): the original also tried to set 'spark.app.name' to
# 'Spark Updated Conf', which contradicts appName('Final_project'); the explicit
# appName is kept here -- confirm that is the intended name.
# NOTE(review): if a session already exists in this kernel, getOrCreate() returns
# it and these static settings are not re-applied; restart the kernel first.
# 'spark.driver.memory' in particular only takes effect if the driver JVM has not
# been launched yet.
spark_settings = [
    ('spark.executor.memory', '5g'),
    ('spark.executor.cores', '4'),
    ('spark.cores.max', '4'),
    ('spark.driver.memory', '8g'),
]

builder = SparkSession.builder.appName('Final_project')
for setting_key, setting_value in spark_settings:
    builder = builder.config(setting_key, setting_value)
spark = builder.getOrCreate()

sc = spark.sparkContext

# Snapshot of the effective configuration (kept under the original name `conf`).
conf = sc.getConf()

In [9]:
# Build a Hive-aware SQL context on top of the running SparkContext; it is used
# below to read tables by name.
# NOTE(review): HiveContext is deprecated since Spark 2.0 in favour of
# SparkSession.builder.enableHiveSupport() -- consider migrating.
from pyspark.sql import HiveContext
hive_context = HiveContext(sparkContext=sc)

In [50]:
# Load the previously processed dataset from Hive as a Spark DataFrame.
source_table = "akarshsahu.processed_dataset"
df = hive_context.table(source_table)

In [51]:
# Preview two rows as a pandas frame; limit() is applied before toPandas() so
# only those two rows are collected.
df.limit(2).toPandas()

Unnamed: 0,Movie_Id,Cust_Id,Rating,tconst,Year,Name,primaryTitle,titleType,runtimeMinutes,startYear,...,Actor_6,Actor_7,Actor_8,Actor_9,Director_0,Director_1,Director_2,Director_3,Director_4,Director_5
0,7706,770129,4.0,tt0090022,1985,Silverado,Silverado,movie,133.0,1985,...,,,,,Lawrence Kasdan,,,,,
1,7706,1931185,5.0,tt0090022,1985,Silverado,Silverado,movie,133.0,1985,...,,,,,Lawrence Kasdan,,,,,


In [16]:
# Print every column with its data type and nullability.
df.printSchema()

root
 |-- Movie_Id: integer (nullable = true)
 |-- Cust_Id: integer (nullable = true)
 |-- Rating: double (nullable = true)
 |-- tconst: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- runtimeMinutes: double (nullable = true)
 |-- startYear: integer (nullable = true)
 |-- isAdult: integer (nullable = true)
 |-- genres: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)
 |-- Actor_0: string (nullable = true)
 |-- Actor_1: string (nullable = true)
 |-- Actor_2: string (nullable = true)
 |-- Actor_3: string (nullable = true)
 |-- Actor_4: string (nullable = true)
 |-- Actor_5: string (nullable = true)
 |-- Actor_6: string (nullable = true)
 |-- Actor_7: string (nullable = true)
 |-- Actor_8: string (nullable = true)
 |-- Actor_9: string (nullable = true)
 |-- Director_0: string (nullable = tr

In [52]:
# Turn the comma-separated `genres` string into an array column `genres_split`.
tag_split = F.split(F.col('genres'), ",")
df = df.withColumn('genres_split', tag_split)

# Show the raw string next to the resulting array for the first 10 rows.
df.select(['genres', 'genres_split']).show(n=10)

+------------------+--------------------+
|            genres|        genres_split|
+------------------+--------------------+
|Action,Crime,Drama|[Action, Crime, D...|
|Action,Crime,Drama|[Action, Crime, D...|
|Action,Crime,Drama|[Action, Crime, D...|
|Action,Crime,Drama|[Action, Crime, D...|
|Action,Crime,Drama|[Action, Crime, D...|
|Action,Crime,Drama|[Action, Crime, D...|
|Action,Crime,Drama|[Action, Crime, D...|
|Action,Crime,Drama|[Action, Crime, D...|
|Action,Crime,Drama|[Action, Crime, D...|
|Action,Crime,Drama|[Action, Crime, D...|
+------------------+--------------------+
only showing top 10 rows



In [53]:
# Count how many rows carry each individual genre tag, most frequent first.

from pyspark.sql.functions import explode
from pyspark.sql.functions import trim, col

# One output row per (input row, genre) pair.
exploded_genres = df.select(explode('genres_split').alias('genres_split'))
# Strip surrounding whitespace so differently padded spellings are counted together.
trimmed_genres = exploded_genres.withColumn('genres_split', trim(col('genres_split')))
genre_frequencies = trimmed_genres.groupby('genres_split').count()
# The aggregated result has one row per genre, so it is collected to pandas.
tag_counts = genre_frequencies.orderBy(col('count').desc()).toPandas()

In [54]:
# Display the per-genre row counts (already sorted by descending count).
tag_counts

Unnamed: 0,genres_split,count
0,Drama,41755335
1,Comedy,31446564
2,Action,20343640
3,Crime,16401315
4,Romance,16247175
5,Adventure,13957764
6,Thriller,12954014
7,Mystery,7206974
8,Fantasy,6213506
9,Sci-Fi,6208678


In [24]:
# Preview two rows again to confirm the new `genres_split` array column is present.
df.limit(2).toPandas()

Unnamed: 0,Movie_Id,Cust_Id,Rating,tconst,Year,Name,primaryTitle,titleType,runtimeMinutes,startYear,...,Actor_7,Actor_8,Actor_9,Director_0,Director_1,Director_2,Director_3,Director_4,Director_5,genres_split
0,7706,770129,4.0,tt0090022,1985,Silverado,Silverado,movie,133.0,1985,...,,,,Lawrence Kasdan,,,,,,"[Action, Crime, Drama]"
1,7706,1931185,5.0,tt0090022,1985,Silverado,Silverado,movie,133.0,1985,...,,,,Lawrence Kasdan,,,,,,"[Action, Crime, Drama]"


In [55]:
# Genre names are taken from the first column of the counts table; the literal
# '\N' entry is skipped so it does not get its own indicator column later.
tags = [genre for genre in tag_counts.iloc[:, 0].tolist() if genre != '\\N']

In [57]:
# Add one integer indicator column per genre tag (1 = the row has that genre).
#
# The test is made against the individual comma-separated genre tokens, not
# against the raw string: a substring check (`contains`) flags 'Music' for rows
# whose genre is actually 'Musical'. Tokens are split on commas with optional
# surrounding whitespace so they line up with the trimmed names in `tags`.
genre_tokens = F.split(F.trim(col('genres')), r'\s*,\s*')
for tag in tags:
    df = df.withColumn(tag, F.array_contains(genre_tokens, tag).cast('integer'))

In [68]:
# Collapse the per-genre indicator columns into eight broader genre groups.
from pyspark.sql.functions import greatest

# Output column -> genre indicator columns that feed it.
# Dict insertion order determines the order in which columns are appended.
genre_groups = {
    'Act_Cri_Thr_Myst_Hor': ['Action', 'Crime', 'Thriller', 'Mystery', 'Horror'],
    'Ad_Fan_Sci_Ani': ['Adventure', 'Fantasy', 'Sci-Fi', 'Animation'],
    'Bio_War_Hist_Doc': ['Biography', 'War', 'History', 'Documentary'],
    'Mus_Musi_Fam': ['Music', 'Musical', 'Family'],
    'Noir_West': ['Film-Noir', 'Western'],
    'Ad_Rom': ['Adult', 'Romance'],
    'Drama_n': ['Drama', 'Sport'],
    'Comedy_n': ['Comedy'],
}

newdf = df
for group_name, member_genres in genre_groups.items():
    member_columns = [df[genre_name] for genre_name in member_genres]
    if len(member_columns) > 1:
        # Taking the largest member indicator marks the group when any member is set.
        group_column = greatest(*member_columns)
    else:
        # A single-member group is a plain copy of that indicator.
        group_column = member_columns[0]
    newdf = newdf.withColumn(group_name, group_column)

In [77]:
# Drop the original per-genre indicator columns (one per entry in `tags`);
# only the grouped genre columns are kept.
newdf = newdf.drop(*tags)

In [78]:
# List the remaining column names to check the result of the drop.
newdf.columns

['Movie_Id',
 'Cust_Id',
 'Rating',
 'tconst',
 'Year',
 'Name',
 'primaryTitle',
 'titleType',
 'runtimeMinutes',
 'startYear',
 'isAdult',
 'genres',
 'averageRating',
 'numVotes',
 'Actor_0',
 'Actor_1',
 'Actor_2',
 'Actor_3',
 'Actor_4',
 'Actor_5',
 'Actor_6',
 'Actor_7',
 'Actor_8',
 'Actor_9',
 'Director_0',
 'Director_1',
 'Director_2',
 'Director_3',
 'Director_4',
 'Director_5',
 'genres_split',
 'Act_Cri_Thr_Myst_Hor',
 'Ad_Fan_Sci_Ani',
 'Bio_War_Hist_Doc',
 'Mus_Musi_Fam',
 'Noir_West',
 'Ad_Rom',
 'Drama_n',
 'Comedy_n']

In [79]:
# Preview ten rows of the final frame, including the grouped genre columns.
newdf.limit(10).toPandas()

Unnamed: 0,Movie_Id,Cust_Id,Rating,tconst,Year,Name,primaryTitle,titleType,runtimeMinutes,startYear,...,Director_5,genres_split,Act_Cri_Thr_Myst_Hor,Ad_Fan_Sci_Ani,Bio_War_Hist_Doc,Mus_Musi_Fam,Noir_West,Ad_Rom,Drama_n,Comedy_n
0,7706,770129,4.0,tt0090022,1985,Silverado,Silverado,movie,133.0,1985,...,,"[Action, Crime, Drama]",1,0,0,0,0,0,1,0
1,7706,1931185,5.0,tt0090022,1985,Silverado,Silverado,movie,133.0,1985,...,,"[Action, Crime, Drama]",1,0,0,0,0,0,1,0
2,7706,250166,5.0,tt0090022,1985,Silverado,Silverado,movie,133.0,1985,...,,"[Action, Crime, Drama]",1,0,0,0,0,0,1,0
3,7706,328654,3.0,tt0090022,1985,Silverado,Silverado,movie,133.0,1985,...,,"[Action, Crime, Drama]",1,0,0,0,0,0,1,0
4,7706,2414873,4.0,tt0090022,1985,Silverado,Silverado,movie,133.0,1985,...,,"[Action, Crime, Drama]",1,0,0,0,0,0,1,0
5,7706,1239283,2.0,tt0090022,1985,Silverado,Silverado,movie,133.0,1985,...,,"[Action, Crime, Drama]",1,0,0,0,0,0,1,0
6,7706,1057021,4.0,tt0090022,1985,Silverado,Silverado,movie,133.0,1985,...,,"[Action, Crime, Drama]",1,0,0,0,0,0,1,0
7,7706,2633985,3.0,tt0090022,1985,Silverado,Silverado,movie,133.0,1985,...,,"[Action, Crime, Drama]",1,0,0,0,0,0,1,0
8,7706,2419562,5.0,tt0090022,1985,Silverado,Silverado,movie,133.0,1985,...,,"[Action, Crime, Drama]",1,0,0,0,0,0,1,0
9,7706,1047788,2.0,tt0090022,1985,Silverado,Silverado,movie,133.0,1985,...,,"[Action, Crime, Drama]",1,0,0,0,0,0,1,0


In [None]:
## Write to Hive tables

# The same frame is saved under each target database; 'overwrite' replaces any
# table that already exists under that name.
target_tables = (
    "akarshsahu.processed_dataset_v2",
    "aghose.processed_dataset_v2",
    "mwehr.processed_dataset_v2",
)
for table_name in target_tables:
    newdf.write.mode('overwrite').saveAsTable(table_name)