# Initialize the Spark environment and set its configuration

In [1]:
import findspark
findspark.init()

from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from sparkmeasure import StageMetrics

In [90]:
#spark.stop() #execute this to stop current spark context

In [2]:
from pyspark.sql.functions import desc,asc
import regex
from pyspark.sql.functions import lower, col
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import from_unixtime, hour, dayofyear
import re
from pyspark.sql import Window

In [3]:
# Local mode on all available cores. The spark-measure jar is attached to the
# session so that StageMetrics (imported above) can be used for measurements.
# The jar path is a raw string: "\s" is not a valid Python escape sequence and
# newer interpreters warn about it in ordinary string literals. The runtime
# value of the path is unchanged.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.jars", r"C:\spark-measure_2.11-0.13.jar")
    .appName("MovieLens")
    .getOrCreate()
)

In [82]:
#config 1
#spark = SparkSession.builder.appName("standalone-cluster-MovieLens").master("spark://192.168.56.1:7077").config("spark.driver.memory","1G").config("spark.driver.cores",'1').config("spark.executor.memory",'1G').config("spark.executor.cores",'1').config('spark.cores.max','2').config("spark.jars","C:\spark-measure_2.11-0.13.jar").getOrCreate()

#config 2
#spark = SparkSession.builder.appName("standalone-cluster-MovieLens").master("spark://192.168.56.1:7077").config("spark.driver.memory","1G").config("spark.driver.cores",'1').config("spark.executor.memory",'1500M').config("spark.executor.cores",'2').config('spark.cores.max','4').config("spark.jars","C:\spark-measure_2.11-0.13.jar").getOrCreate()        

In [73]:
#config 3 - uncomment exactly one of these configurations (config 1, 2 or 3) to run it on its own
#spark = SparkSession.builder.appName("standalone-cluster-MovieLens").master("spark://192.168.56.1:7077").config("spark.driver.memory","1500M").config("spark.driver.cores",'1').config("spark.executor.memory",'1G').config("spark.executor.cores",'2').config('spark.cores.max','4').config("spark.jars","C:\spark-measure_2.11-0.13.jar").getOrCreate()        

In [83]:
spark # display the active SparkSession as the cell output (author's note: "spark ui")

# Load Datasets

In [84]:
# Load the movies table from CSV (header row, comma-delimited).
# The path uses forward slashes like the other loaders in this notebook; the
# previous backslash form contained invalid escape sequences ("\M", "\m").
df_movies = (spark.read
             .format('csv')
             .option('header','true')
             .option('delimiter',",")
             .option('inferSchema','true')  # infer column types; otherwise every column is read as a string
             .load("C:/MovieLens20M_dataset/movie.csv"))

In [91]:
# Preview the movies table without truncating long cell values.
df_movies.show(truncate=False)

In [85]:
stagemetrics = StageMetrics(spark)# create a StageMetrics object bound to this session; it is used to measure each query below

In [86]:
# Ratings table: CSV with a header row, comma-delimited.
rating_reader = spark.read.format('csv')
rating_reader = rating_reader.option('header', 'true').option('delimiter', ",")
# inferSchema assigns proper data types to the columns; without it they are read as strings
rating_reader = rating_reader.option('inferSchema', 'true')
df_rating = rating_reader.load("C:/MovieLens20M_dataset/rating.csv")

In [87]:
# Tags table: CSV with a header row, comma-delimited.
tag_reader = spark.read.format('csv')
tag_reader = tag_reader.option('header', 'true').option('delimiter', ",")
# inferSchema assigns proper data types to the columns; without it they are read as strings
tag_reader = tag_reader.option('inferSchema', 'true')
df_tag = tag_reader.load("C:/MovieLens20M_dataset/tag.csv")

In [88]:
# Genome-tags table: CSV with a header row, comma-delimited.
genome_reader = spark.read.format('csv')
genome_reader = genome_reader.option('header', 'true').option('delimiter', ",")
# inferSchema assigns proper data types to the columns; without it they are read as strings
genome_reader = genome_reader.option('inferSchema', 'true')
df_genome_tags = genome_reader.load("C:/MovieLens20M_dataset/genome_tags.csv")

In [92]:
# Preview 5 rows of the ratings table.
df_rating.show(5)

In [93]:
# Preview 5 rows of the tags table.
df_tag.show(5)

In [94]:
# Preview 5 rows of the genome-tags table.
df_genome_tags.show(5)

# Query 1 - Find the number of users that watched the movie "Jumanji"

In [95]:
stagemetrics.begin()

# Number of DISTINCT users that have at least one rating for a movie whose
# title contains "Jumanji".
# The previous version grouped by user and then summed the per-user counts,
# which is the number of matching rating rows. A user with ratings on more
# than one matching title was therefore counted more than once.
jumanji_movies = df_movies.filter(df_movies.title.like("%Jumanji%"))
joinedq1 = (
    jumanji_movies
    .join(df_rating, df_rating["movieId"] == df_movies["movieId"], "inner")
    .agg(countDistinct(df_rating["userId"]).alias('Jumanji_viewers'))
)
joinedq1.show()

stagemetrics.end()
stagemetrics.print_report()

# Query 2 - Find the names of the movies that users tagged as "boring"

In [46]:

# Lower-case the tag column so that later tag comparisons ignore letter case.
df_low=df_tag.withColumn("tag", lower(col("tag")))


In [47]:
# Strip every character that is not a letter or digit from the lower-cased tag.
# Assumption made by the author: this removes special characters around a word
# such as 'boring'; multi-word tags such as 'boring plot' are deliberately not
# treated as matches by the later queries.
df_low_clr = df_low.select(col('userId'),col('movieId'),regexp_replace(df_low.tag,'[^a-zA-Z0-9]',"").alias('tag_clr'))

In [96]:
stagemetrics.begin()

# Movies that carry the cleaned tag 'boring', one row per movie, ordered by title.
# The DataFrame is kept in `joinq2` and displayed in a separate step; previously
# `joinq2` was bound to the None returned by .show().
# `like('boring')` had no wildcards, so it is written as the equality it was.
joinq2 = (
    df_movies
    .join(df_low_clr, df_movies['movieId'] == df_low_clr['movieId'], 'inner')
    .drop(df_movies.movieId)
    .filter(df_low_clr.tag_clr == 'boring')
    .dropDuplicates(subset=['movieId'])  # keep a single (arbitrary) tagging row per movie
    .select(col('userId'), col('movieId'), col('title'), col('tag_clr').alias('tag'))
    .orderBy(col('title').asc())
)
joinq2.show(truncate=False)

stagemetrics.end()
stagemetrics.print_report()


# Query 3 - Find the users (user ids) that have rated movies tagged 'Bollywood' and have given a rating > 3

In [98]:
# Ratings above 3. movieId is kept so each rating can be matched to the movie
# it was given to.
df3_ratings = df_rating.filter(df_rating.rating > 3).select(df_rating.userId, df_rating.movieId, df_rating.rating)
# (user, movie) pairs whose cleaned tag is exactly 'bollywood'.
df_tagboll = df_low_clr.filter(df_low_clr.tag_clr.like('bollywood')).select(df_low_clr.movieId,df_low_clr.userId,df_low_clr.tag_clr.alias('tag'))

stagemetrics.begin()

# Join on BOTH userId and movieId so that the rating > 3 belongs to the same
# movie the user tagged 'bollywood'. The previous join used userId only, which
# paired every bollywood tag of a user with every rating > 3 that user gave to
# any movie.
# NOTE(review): this reads the task as "the user tagged the movie 'bollywood'
# and rated that movie > 3". If the intended meaning is "any user who rated a
# bollywood-tagged movie > 3", join on movieId only instead - confirm.
same_user_and_movie = (
    (df3_ratings['userId'] == df_tagboll['userId'])
    & (df3_ratings['movieId'] == df_tagboll['movieId'])
)
join3_ratag = (
    df3_ratings
    .join(df_tagboll, same_user_and_movie, "inner")
    .select(df_tagboll['movieId'], df_tagboll['userId'].alias("userIdJoined"), df_tagboll['tag'], df3_ratings['rating'])
    .dropDuplicates(subset=['userIdJoined'])  # one row per user
    .orderBy(col('userIdJoined').asc())
)
# Displayed separately so that join3_ratag holds the DataFrame, not None.
join3_ratag.show(truncate=False)

stagemetrics.end()
stagemetrics.print_report()


# Query 4

In [99]:
# Find the maximum rating in the dataset; the author observed it to be 5.0.
df_rating.groupby().max('rating').collect()[0]

In [100]:
stagemetrics.begin()

# Per-year windows: one ordered by descending average rating, one by title.
by_avg_rating = Window.partitionBy("rate_year").orderBy(col('avg(rating)').desc())
by_title = Window.partitionBy("rate_year").orderBy(col('title').asc())

# Each rating reduced to (movieId, year of the rating, rating).
rating_years = df_rating.select(
    df_rating.movieId,
    year(df_rating.timestamp).alias('rate_year'),
    df_rating.rating,
)

# Average rating per (title, rating year).
avg_per_title_year = (
    rating_years
    .join(df_movies, rating_years['movieId'] == df_movies['movieId'], 'inner')
    .drop(df_movies.movieId)
    .groupBy(col('title'), col('rate_year'))
    .avg('rating')
)

# Keep the 10 best-ranked titles of every year, then number them alphabetically.
rating_years_q = (
    avg_per_title_year
    .withColumn("rating_rank", row_number().over(by_avg_rating))
    .orderBy(col('rate_year').asc())
    .filter(col('rating_rank') <= 10)
    .withColumn("alphab_rank", row_number().over(by_title))
)
rating_years_q.show(truncate=False)

stagemetrics.end()
stagemetrics.print_report()

In [101]:
# Inspect the Query 4 result for the rating year 2005 only.
rating_years_q.filter(col('rate_year') == 2005).show(truncate=False)

# QUERY 5

In [102]:
# Start measuring Query 5; the matching end()/print_report() calls are in the following cell.
stagemetrics.begin()
# Pair every tag with the title of the movie it was applied to, keeping the tag's timestamp.
df_join5 =  df_movies.join(df_tag,df_movies['movieId'] == df_tag['movieId'],'inner').select(df_tag['tag'],df_movies['title'],df_tag.timestamp)

In [103]:
# Tags whose timestamp falls in 2015, gathered per movie title.
tags_2015 = df_join5.filter(year(df_join5.timestamp)==2015)
tags_per_title = tags_2015.groupBy(col("title")).agg(collect_list(col("tag")).alias('tag'))
# Turn each tag list into one comma-separated string and sort by title.
tags_joined = tags_per_title.withColumn("tag", concat_ws(",", col("tag"))).orderBy(col('title').asc())
# The result of show() is what gets assigned here, exactly as in the one-line form.
df_join5_conc = tags_joined.show(truncate=False)

stagemetrics.end()
stagemetrics.print_report()

# QUERY 6

In [104]:
stagemetrics.begin()

# Number of ratings per movie title.
ratings_per_title = (
    df_movies
    .join(df_rating, df_movies['movieId'] == df_rating['movieId'], "inner")
    .drop(df_rating.movieId)
    .groupBy(df_movies.title)
    .count()
)
# Most-rated titles first; the result of show() is what gets assigned to join6q.
join6q = ratings_per_title.orderBy(col("count").desc()).show(truncate=False)


stagemetrics.end()
stagemetrics.print_report()

# Query 7 Find the top 10 users each year with the most ratings

In [105]:
stagemetrics.begin()

# Per-year windows: one ordered by descending rating count, one by user id.
by_rating_count = Window.partitionBy('rating_year').orderBy(col('rating_count').desc())
by_user_id = Window.partitionBy('rating_year').orderBy(col('userId').asc())

# Ratings per (user, year of the rating), with readable column names.
ratings_per_user_year = (
    df_rating
    .groupBy(col('userId'), year(df_rating.timestamp))
    .count()
    .select(
        col('userId'),
        col('year(timestamp)').alias('rating_year'),
        col('count').cast('int').alias('rating_count'),
    )
)

# Keep the 10 best-ranked users of every year, then number them by user id.
count_ratings = (
    ratings_per_user_year
    .withColumn('count_rank', row_number().over(by_rating_count))
    .orderBy(col('rating_year').asc())
    .filter(col('count_rank') <= 10)
    .withColumn('id_alphab_rank', row_number().over(by_user_id))
)
count_ratings.show(truncate=False)

stagemetrics.end()
stagemetrics.print_report()

In [106]:
# Time the query above; this cell only presents its top-10 result for the year 1995.
count_ratings.filter(col('rating_year') == 1995).show()

# Query 8

In [68]:
# Split the pipe-separated genres string and emit one row per (movie, genre),
# so a movie with several genres appears once for each of them.
movies_genres = df_movies.withColumn("genres",explode(split("genres","[|]")))

In [69]:
# One genre per movie: the FIRST genre listed in the pipe-separated `genres`
# string (assumption made by the author for the later queries).
# The genre is taken directly with split(...)[0]. The previous version exploded
# the genres and then called dropDuplicates on movieId, which keeps an
# arbitrary row per movie and so did not guarantee the first genre.
# Movies whose genre is '(no genres listed)' are excluded (also an assumption).
genres_nodupes = (
    df_movies
    .withColumn("genres", split(col("genres"), "[|]").getItem(0))
    .filter(~col('genres').like('%(no genres listed)%'))
    .orderBy(col('movieId').asc())
)

In [107]:
stagemetrics.begin()
# Number of ratings per movie.
rating_group = df_rating.groupBy(df_rating.movieId).count()

# Within each genre, order movies by descending rating count.
by_genre_count = Window.partitionBy("genres").orderBy(col('count').desc())

# Attach the rating count to each movie's single genre.
genre_with_counts = (
    genres_nodupes
    .join(rating_group, rating_group['movieId'] == genres_nodupes['movieId'], "inner")
    .drop(genres_nodupes.movieId)
    .dropDuplicates(subset=['movieId'])
)

# Keep only the top-ranked movie of every genre, listed alphabetically by genre.
quer8_join = (
    genre_with_counts
    .withColumn("count_rtng_rank", row_number().over(by_genre_count))
    .filter(col('count_rtng_rank') == 1)
    .orderBy(col("genres").asc())
)

In [108]:
# Display the Query 8 result, then stop the measurement started in the
# previous cell and print its report.
quer8_join.show(truncate=False)

stagemetrics.end()
stagemetrics.print_report()

# Query 9

In [78]:
# Register the ratings DataFrame as a temporary view named "rating_sql".
# NOTE(review): no cell visible in this notebook queries the view - confirm it is still needed.
df_rating.createOrReplaceTempView("rating_sql")

In [109]:
stagemetrics.begin()
# Extract the "yyyy-MM-dd HH" prefix of each rating timestamp (4-2-2 digits
# separated by '-', whitespace, 2 digits). The pattern is a raw string: the
# previous literal mixed invalid escapes such as "\d" and "\-" with doubled
# backslashes. It matches the same text as before.
DAY_HOUR_PATTERN = r'\d{4}-\d{2}-\d{2}\s\d{2}'

# Count ratings per (movie, day+hour), keep the groups with more than one
# rating and add their sizes up. This gives the number of users that watched
# the same movie in the same hour of the same day.
# `sum` here is the Spark column function brought in by the wildcard import of
# pyspark.sql.functions, not the Python builtin.
df_hour_day = (
    df_rating
    .select(col('movieId'), col('userId'), regexp_extract(df_rating.timestamp, DAY_HOUR_PATTERN, 0).alias('day_hour'))
    .groupBy(col('movieId'), col('day_hour'))
    .count()
    .filter(col('count') > 1)
    .agg(sum(col('count')))
)
# Displayed separately so that df_hour_day holds the DataFrame, not None.
df_hour_day.show()
stagemetrics.end()
stagemetrics.print_report()

# Query 10 

In [110]:
# Assumptions for this query:
# - as before, only one genre per movie (genres_nodupes) is taken into account
#   when a movie belongs to more than one genre;
# - for the 'funny' tag, multi-word tags that merely contain "funny" are
#   ignored; only tags equal to "funny" after cleaning count, which includes
#   tags that had exclamation marks or other special characters around the
#   word (see df_low_clr).
stagemetrics.begin()

# Attach the (single) genre of each movie to every cleaned tag row.
join_movies10 = df_low_clr.join(genres_nodupes,df_low_clr['movieId']  == genres_nodupes['movieId']).drop(genres_nodupes.movieId)

In [111]:
# Ratings joined with the tagged-and-genred movies.
rated_with_tags = (
    df_rating
    .join(join_movies10, df_rating['movieId'] == join_movies10['movieId'], 'inner')
    .drop(df_rating.movieId)
)
# Rows with a rating above 3.5 and the cleaned tag 'funny', one row per movie.
funny_well_rated = (
    rated_with_tags
    .filter(col('rating') > 3.5)
    .filter(join_movies10.tag_clr.like("funny"))
    .dropDuplicates(subset=['movieId'])
)
# Movies per genre, ordered alphabetically by genre name
# (order by the count descending instead if a ranking by size is wanted).
df_q10 = funny_well_rated.groupBy(col('genres')).count().orderBy(col('genres').asc())

In [112]:
# Display the Query 10 result, then stop the measurement that was started two
# cells earlier and print its report.
df_q10.show(truncate=False)
stagemetrics.end()
stagemetrics.print_report()