In [1]:
from pyspark.sql import SparkSession
from sparkmeasure import StageMetrics
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

spark = SparkSession.builder.appName('q4').config("spark.jars", "/home/athina/Downloads/spark-measure_2.12-0.17.jar").getOrCreate()
#For VIRTUAL
#spark = SparkSession.builder.master('spark://picachu-VirtualBox:7077').appName('q4').config("spark.jars", "/home/athina/Downloads/spark-measure_2.12-0.17.jar").getOrCreate()

#For LIVY
#spark=SparkSession.builder.master('spark://localhost:7077').appName("Testing").getOrCreate()
#and changed the filepaths to '/home/administrator/Downloads/movielens/file_name.csv'

stagemetrics = StageMetrics(spark)
stagemetrics.begin()#start measuring performance

df_rating= (spark.read
     .format("csv")
     .option('header', 'true')
     .option('delimiter', ",")
     .option("inferSchema","true")
     .load("rating.csv") 
    )

# get the year and adds it to a new column # Columns: userId, rating and year
withyear=df_rating.select('movieId','rating',regexp_extract('timestamp', r'\d{4}', 0).cast(IntegerType()).alias('year'))

#calculating the average for each movieId and its rating year
average=withyear.groupBy('movieId','year').avg('rating').select('movieId','year',col('avg(rating)').alias('avg_rating')).orderBy('year')

df_movie= (spark.read
     .format("csv")
     .option('header', 'true')
     .option('delimiter', ",")
     .option("inferSchema","true")
     .load("movie.csv") 
    )
#combining movieId with movie's title
average_titles=average.join(df_movie, df_movie['movieId']==average['movieId'],"inner").hint("broadcast").select(average['movieId'],'title','year','avg_rating')

#Taking the first 10 movies with the most rating average 
#If more have the same top average then take them with alphabetical order
windowSpec=Window.partitionBy(average_titles['year']).orderBy(average_titles['avg_rating'].desc(),average_titles['title'].asc())
tops=average_titles.withColumn('row',row_number().over(windowSpec))

#keeping only the top 10 movies of each year
results=tops.filter(tops['row'] <=10).select('movieId','title','year','avg_rating')
results.select('title').filter(tops['year']==2005).show()#requested answer for query 4

stagemetrics.end()#stop measuring performance
stagemetrics.print_report()#print performance metrics
spark.stop() # stop spark

+--------------------+
|               title|
+--------------------+
|Before the Fall (...|
|   Dancemaker (1998)|
|Fear Strikes Out ...|
|Gate of Heavenly ...|
|Life Is Rosy (a.k...|
|Married to It (1991)|
|My Life and Times...|
|Not Love, Just Fr...|
|Paris Was a Woman...|
|Take Care of My C...|
+--------------------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 2
Aggregated Spark stage metrics:
numStages => 12
numTasks => 416
elapsedTime => 59941 (60 s)
stageDuration => 53499 (53 s)
executorRunTime => 89857 (1.5 min)
executorCpuTime => 63721 (1.1 min)
executorDeserializeTime => 4277 (4 s)
executorDeserializeCpuTime => 3201 (3 s)
resultSerializationTime => 52 (52 ms)
jvmGCTime => 2780 (3 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 1091 (1 s)
resultSize => 969815 (947.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 1447131200
recordsRead => 40055086
bytesRead => 1384480482 (1320.0 MB)
recordsWritten =