In [None]:
from pyspark.sql import SparkSession
from sparkmeasure import StageMetrics
from pyspark.sql.functions import year, row_number, col, count, split, explode, rank, dayofyear, hour, sum, concat_ws, collect_list, lower
from pyspark.sql.window import Window

# Spark session for the whole notebook; the sparkMeasure jar is attached so stage metrics can be collected.
spark = (
    SparkSession.builder
    .appName('vaseis-2')
    .config("spark.jars", "<path-to-jar>/spark-measure_2.12-0.17.jar")
    .master("spark://antonis:7077")
    .getOrCreate()
)

# Metrics collector used by every query cell (stagemetrics.begin() / end() / print_report()).
stagemetrics = StageMetrics(spark)


def load_csv(path):
    """Read a comma-delimited CSV file with a header row into a DataFrame.

    Args:
        path: location of the CSV file to load.

    Returns:
        A DataFrame whose column types are inferred from the data.
    """
    return (spark.read
        .format("csv")
        .option('header', 'true')  # csv files have headers
        .option('delimiter', ',')  # delimiter of the csv files that are processed
        .option('inferSchema', 'true')  # if false, all values are strings
        .load(path)  # file to be processed
    )


rating_df = load_csv("rating.csv")
movie_df = load_csv("movie.csv")
# tag_df is required by queries 2, 3, 5 and 10.
# NOTE(review): file name assumed to follow the rating.csv / movie.csv naming — confirm.
tag_df = load_csv("tag.csv")

In [None]:
# QUERY 1
# Count the rating rows recorded for the movie titled "Jumanji (1995)".
stagemetrics.begin()

movie = movie_df.filter(movie_df["title"] == "Jumanji (1995)")  # rows whose title is Jumanji
# first() fetches a single row and returns None when nothing matches,
# so a missing title is reported explicitly instead of as an IndexError.
jumanji_row = movie.select("movieId").first()
if jumanji_row is None:
    raise ValueError('No movie titled "Jumanji (1995)" was found in movie_df')
movieID = jumanji_row["movieId"]  # read the id by column name rather than by position
q1 = rating_df.filter(rating_df["movieId"] == movieID).select(rating_df["userId"]).count()  # number of ratings of that movie
print(q1)

stagemetrics.end()
stagemetrics.print_report()

In [None]:
# QUERY 2
# Titles, in alphabetical order, of the movies that carry a tag containing "boring" (case-insensitive).
stagemetrics.begin()

boring_movies = (
    tag_df
    .select('userId', 'movieId', lower(col('tag')).alias('tag'))
    # Filter on the lower-cased alias: tag_df["tag"] points at the source column,
    # which would make the match case-sensitive again.
    .filter(col('tag').contains('boring'))
    .drop_duplicates(subset=['movieId'])  # one row per movie, however many times it was tagged
    .select("movieId")
)
boring_movie_names = boring_movies.join(movie_df, boring_movies.movieId == movie_df.movieId).orderBy(movie_df['title']).select(movie_df['title'])
boring_movie_names.show()  # show() prints the first 20 rows by default

stagemetrics.end()
stagemetrics.print_report()

In [None]:
# QUERY 3
# Users who applied the tag "bollywood" (case-insensitive) and have given a rating above 3.0, ordered by userId.
stagemetrics.begin()

bollywood_tag = (
    tag_df
    .select('userId', 'movieId', lower(col('tag')).alias('tag'))
    # Compare against the lower-cased alias: tag_df["tag"] points at the source column,
    # so tags such as "Bollywood" would otherwise be missed.
    .filter(col('tag') == 'bollywood')
    .select('userId', 'movieId')
)
# NOTE(review): the join key is userId only, so any rating by the user qualifies,
# not just the rating of the tagged movie — confirm this is intended.
user_bollywood_good_rating = bollywood_tag.join(rating_df.select('userId','rating'), ["userId"], 'inner').distinct()
# Keep users with at least one rating above 3.0, one row per user.
user_bollywood_good_rating = user_bollywood_good_rating.filter(col("rating")>3.0).dropDuplicates(['userId']).orderBy(col('userId')).select('userId')
user_bollywood_good_rating.show()


stagemetrics.end()
stagemetrics.print_report()

In [None]:
# QUERY 4
# Titles, in alphabetical order, of the 10 movies with the highest average rating among ratings made in 2005.
stagemetrics.begin()

# Restrict to the target year before aggregating, so only that year's ratings are grouped and ranked.
q4_df = (
    rating_df
    .withColumn('year', year(rating_df['timestamp']))
    .filter(col('year') == 2005)
    .groupBy('year', 'movieId')
    .avg('rating')
)
window = Window.partitionBy(q4_df['year']).orderBy(q4_df['avg(rating)'].desc())

# row_number() numbers rows 1..n per year; ties in the average are broken arbitrarily,
# so at most 10 rows per year pass the filter.
q4_df = (
    q4_df
    .select('*', row_number().over(window).alias('row_num'))
    .filter(col('row_num') <= 10)
    .join(movie_df.select('movieId', 'title'), ['movieId'])
    .select(col('title'))
    .orderBy(col('title').asc())  # sort last so the displayed order is that of the final result
)
q4_df.show()

stagemetrics.end()
stagemetrics.print_report()

In [None]:
# QUERY 5
# For every movie tagged during 2015, gather its tags into one comma-separated string, ordered by title.
stagemetrics.begin()

tags_in_2015 = tag_df.filter(year(tag_df['timestamp']) == 2015).select('movieId', 'tag')
movie_titles = movie_df.select('movieId', 'title')
tagged_movies = tags_in_2015.join(movie_titles, ['movieId'])

# One output row per (movieId, title); the tags of the group are joined with ", ".
tag_df_2015 = (
    tagged_movies
    .groupBy('movieId', 'title')
    .agg(concat_ws(", ", collect_list(tagged_movies.tag)).alias('tags'))
    .orderBy(tagged_movies['title'].asc())
)
tag_df_2015.show()

stagemetrics.end()
stagemetrics.print_report()

In [None]:
# QUERY 6
# Movies ordered by how many ratings they received, most-rated first.
stagemetrics.begin()

ratings_per_movie = rating_df.groupBy('movieId').count()
# Attach the title to each per-movie count, then sort by that count.
q6_df = (
    ratings_per_movie
    .join(movie_df.select('movieId', 'title'), ['movieId'])
    .orderBy(ratings_per_movie['count'].desc())
)
q6_df.select(col('title'), col('count')).show()

stagemetrics.end()
stagemetrics.print_report()

In [None]:
# QUERY 7
# The 10 users with the most ratings in 1995, listed by ascending userId.
stagemetrics.begin()

# Number of ratings each user submitted in each calendar year.
ratings_per_user_year = (
    rating_df
    .withColumn('year', year(rating_df['timestamp']))
    .groupBy('year', 'userId')
    .count()
)
window = Window.partitionBy(ratings_per_user_year['year']).orderBy(ratings_per_user_year['count'].desc())
# Keep the first 10 rows of every year according to the descending count.
q7_df = (
    ratings_per_user_year
    .select('*', row_number().over(window).alias('row_num'))
    .filter(col('row_num') <= 10)
)
(
    q7_df
    .filter(q7_df['year'] == 1995)
    .select(col('userId'))
    .orderBy(col('userId').asc())
    .show()
)

stagemetrics.end()
stagemetrics.print_report()

In [None]:
# QUERY 8
# For every genre, the movie with the most ratings, listed by genre.
stagemetrics.begin()

# One row per (movie, genre): the genres column is a "|"-separated list.
q8_df = movie_df.withColumn('genre',explode(split('genres',"[|]")))
q8_df = q8_df.join(rating_df,['movieId'],'left')
# Count non-null ratings rather than rows: after the left join an unrated movie still
# contributes one row (with null rating columns) and must count as 0, not 1.
# movieId is part of the key so that distinct movies sharing a title are not pooled.
q8_df = q8_df.groupBy('genre','movieId','title').agg(count('rating').alias('count'))
window = Window.partitionBy(q8_df['genre']).orderBy(q8_df['count'].desc())
# row_number() == 1 keeps a single movie per genre; ties are broken arbitrarily.
q8_df = q8_df.select('*', row_number().over(window).alias('rank')).filter(col('rank') == 1).orderBy(col('genre').asc())
q8_df.select(col('genre'), col('title'), col('count')).show()

stagemetrics.end()
stagemetrics.print_report()

In [None]:
# QUERY 9
# Total number of ratings that share their movie, year, day of year and hour with at least one other rating.
stagemetrics.begin()

rating_ts = rating_df['timestamp']
# Bucket every rating by movie and by the hour in which it was submitted.
q9_df = (
    rating_df
    .withColumn('year', year(rating_ts))
    .withColumn('doy', dayofyear(rating_ts))
    .withColumn('hour', hour(rating_ts))
    .groupBy('movieId', 'year', 'doy', 'hour')
    .count()
)
# Only buckets holding more than one rating contribute to the total.
q9 = q9_df.filter(q9_df['count'] > 1).select(sum('count'))
q9.show()

stagemetrics.end()
stagemetrics.print_report()

In [None]:
# QUERY 10
# Per genre, the number of movies tagged "funny" that have at least one rating above 3.5.
stagemetrics.begin()

# Filter and de-duplicate each side before joining, so the join works on one row per movie
# instead of every (rating, tag) pair of a movie. The resulting set of movies is the same.
funny_movies = tag_df.filter(col('tag') == "funny").select('movieId', 'tag').distinct()
well_rated_movies = rating_df.filter(col('rating') > 3.5).select('movieId').distinct()
q10_df = funny_movies.join(well_rated_movies, ['movieId'], 'inner')
q10_df = q10_df.join(movie_df, ['movieId'], 'inner')
# One row per (movie, genre): the genres column is a "|"-separated list.
q10_df = q10_df.withColumn("genre",explode(split("genres","[|]")))
q10_df = q10_df.groupBy('genre').count().orderBy(col('genre').asc())
q10_df.show()

stagemetrics.end()
stagemetrics.print_report()