In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import from_unixtime, hour, dayofyear
from pyspark.sql.functions import isnull, when, count, col
from pyspark.sql.functions import rank, col
from pyspark.sql.functions import *
from pyspark.sql import Window
import pyspark.sql.functions as f
import pyspark.sql.functions as F
import pandas as pd
from sparkmeasure import StageMetrics
import numpy as np
import random
from pyspark.sql.functions import broadcast
import re
import matplotlib.pyplot as plt



spark = SparkSession \
.builder \
.appName('Vaseis2')\
.config("spark.jars", "<path-to-jar>/spark-measure_2.12-0.17.jar") \
.master("spark://antonis:7077")\
.getOrCreate()


stagemetrics = StageMetrics(spark)


movies = (spark.read
      .format("csv")
      .option('header', 'true') #means that the first line contains column names
      .option("delimiter", ",") #set the delimiter to comma
      .option("inferSchema", "true") #automatically try to infer the column data types
      .load("movie.csv") #filename to read from
     )

ratings = (spark.read
      .format("csv")
      .option('header', 'true') #means that the first line contains column names
      .option("delimiter", ",") #set the delimiter to comma
      .option("inferSchema", "true") #automatically try to infer the column data types
      .load("rating.csv") #filename to read from
     )


tag = (spark.read
      .format("csv")
      .option('header', 'true') #means that the first line contains column names
      .option("delimiter", ",") #set the delimiter to comma
      .option("inferSchema", "true") #automatically try to infer the column data types
      .load("tag.csv") #filename to read from
     )



In [2]:
#QUERY 1

stagemetrics.begin()

merged = movies.join(ratings, on=['movieId'], how='left_outer')

seen_jumanji = merged\
.filter(F.col("title") == "Jumanji (1995)").count()

print(seen_jumanji)

stagemetrics.end()
stagemetrics.print_report()

22243

Scheduling mode = FIFO
Spark Context default degree of parallelism = 12
Aggregated Spark stage metrics:
numStages => 4
numTasks => 214
elapsedTime => 3460 (3 s)
stageDuration => 5964 (6 s)
executorRunTime => 38088 (38 s)
executorCpuTime => 28765 (29 s)
executorDeserializeTime => 793 (0.8 s)
executorDeserializeCpuTime => 394 (0.4 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 3075 (3 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 532 (0.5 s)
resultSize => 994163 (970.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 1439170560
recordsRead => 20000264
bytesRead => 693171282 (661.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 20000464
shuffleTotalBlocksFetched => 2601
shuffleLocalBlocksFetched => 2601
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 70268602 (67.0 MB)
shuffleLocalBytesRead => 70268602 (67.0 MB)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 By

In [16]:
#QUERY 2

stagemetrics.begin()

tags_and_movies= movies.join(tag, on=['movieId'], how='left_outer')

boring = tags_and_movies\
.select('title')\
.where(tags_and_movies['tag'].contains("boring")).dropDuplicates()

boring = boring\
.orderBy(col('title'))

boring.show()

stagemetrics.end()
stagemetrics.print_report()

+--------------------+
|               title|
+--------------------+
|(500) Days of Sum...|
|101 Reykjavik (10...|
|12 Years a Slave ...|
|         1408 (2007)|
|1492: Conquest of...|
|2001: A Space Ody...|
|2010: The Year We...|
|         2046 (2004)|
|     21 Grams (2003)|
|24 Hour Party Peo...|
|3-Iron (Bin-jip) ...|
|40-Year-Old Virgi...|
|    6 Bullets (2012)|
| 633 Squadron (1964)|
| 7 Plus Seven (1970)|
|      8 Women (2002)|
|A.I. Artificial I...|
|  About a Boy (2002)|
|According to Gret...|
|   Adaptation (2002)|
+--------------------+
only showing top 20 rows


Scheduling mode = FIFO
Spark Context default degree of parallelism = 12
Aggregated Spark stage metrics:
numStages => 3
numTasks => 207
elapsedTime => 313 (0.3 s)
stageDuration => 283 (0.3 s)
executorRunTime => 740 (0.7 s)
executorCpuTime => 592 (0.6 s)
executorDeserializeTime => 389 (0.4 s)
executorDeserializeCpuTime => 271 (0.3 s)
resultSerializationTime => 4 (4 ms)
jvmGCTime => 47 (47 ms)
shuffleFetchWaitTime => 0 (

In [17]:
#QUERY 3

stagemetrics.begin()

tags_and_ratings = ratings.join(tag,on=['userId'], how='left_outer')


# bollywood= tags_and_ratings\
# .select('userId')\
# .where(F.col('tag').contains("bollywood")).distinct()


bollywood= tags_and_ratings\
.select('userId')\
.where((F.col('tag').like("%bollywood") | F.col('tag').like("%BOLLYWOOD") | F.col('tag').like("%Bollywood")) & (F.col('rating') > 3)).distinct()

bollywood.sort('userId')
          
bollywood.show()

stagemetrics.end()
stagemetrics.print_report()

+------+
|userId|
+------+
| 93037|
| 37355|
| 20388|
|    65|
|  6500|
| 50616|
| 53397|
| 20066|
|124998|
| 52698|
|124139|
|131829|
| 54586|
| 68558|
| 19837|
|  8513|
| 77137|
| 35170|
| 41165|
| 25004|
+------+
only showing top 20 rows


Scheduling mode = FIFO
Spark Context default degree of parallelism = 12
Aggregated Spark stage metrics:
numStages => 6
numTasks => 118
elapsedTime => 2243 (2 s)
stageDuration => 2213 (2 s)
executorRunTime => 23505 (24 s)
executorCpuTime => 21982 (22 s)
executorDeserializeTime => 222 (0.2 s)
executorDeserializeCpuTime => 133 (0.1 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 567 (0.6 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 20 (20 ms)
resultSize => 325324 (317.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 69992448
recordsRead => 12195701
bytesRead => 713730828 (680.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 32
shuffleTotalBlocksFetched => 31
shuff

In [18]:
#QUERY 4

stagemetrics.begin()
# top_rated = ratings\
# .groupBy("movieId",ratings['timestamp'].substr(1,4))\
# .agg(avg(col("rating")))\
# .withColumnRenamed("avg(rating)", "avg_rating")\
# .sort(desc("avg_rating"))

# top_rated_movies = top_rated.join(movies, top_rated.movieId == movies.movieId)
# top_rated_movies.show(10)

query4 = ratings.join(movies,on=['movieId'], how ='left_outer')

top_rated = query4\
.groupBy("title",ratings['timestamp'].substr(1,4).alias("Year"))\
.agg(avg(col("rating")))\
.withColumnRenamed("avg(rating)", "avg_rating")\
.sort(desc("avg_rating"))



def get_topN(df, group_by_columns, order_by_column, n=10):
    window_group_by_columns = Window.partitionBy(group_by_columns)
    ordered_df = df.select(df.columns + [
        f.row_number().over(window_group_by_columns.orderBy(order_by_column.desc())).alias('row_rank')])
    topN_df = ordered_df.filter(f"row_rank <= {n}").drop("row_rank")
    return topN_df

final_top_rated = get_topN(top_rated, top_rated["Year"],top_rated["avg_rating"], 10)




results = final_top_rated\
.filter(F.col("Year") == "2005")\
.orderBy(F.col("title").asc())


results.show()

stagemetrics.end()
stagemetrics.print_report()

+--------------------+----+----------+
|               title|Year|avg_rating|
+--------------------+----+----------+
|   Dancemaker (1998)|2005|       5.0|
|Fear Strikes Out ...|2005|       5.0|
|Gate of Heavenly ...|2005|       5.0|
|Life Is Rosy (a.k...|2005|       5.0|
|Married to It (1991)|2005|       5.0|
|My Life and Times...|2005|       5.0|
|Not Love, Just Fr...|2005|       5.0|
|Paris Was a Woman...|2005|       5.0|
|Take Care of My C...|2005|       5.0|
|Too Much Sleep (1...|2005|       5.0|
+--------------------+----+----------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 12
Aggregated Spark stage metrics:
numStages => 6
numTasks => 813
elapsedTime => 4093 (4 s)
stageDuration => 4030 (4 s)
executorRunTime => 39560 (40 s)
executorCpuTime => 33634 (34 s)
executorDeserializeTime => 1772 (2 s)
executorDeserializeCpuTime => 1168 (1 s)
resultSerializationTime => 34 (34 ms)
jvmGCTime => 1463 (1 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 2277 

In [7]:
#QUERY 5

stagemetrics.begin()

tags = tag\
.select("movieId","tag")\
.where(col('timestamp').substr(1,4) == '2015')

tags_and_movies = tags.join(movies, on=['movieId'])


tags_and_movies = tags_and_movies\
.sort(col("title").asc())

tags_and_movies.show(50)

stagemetrics.end()
stagemetrics.print_report()

+-------+--------------------+--------------------+--------------------+
|movieId|                 tag|               title|              genres|
+-------+--------------------+--------------------+--------------------+
|  51372|                BD-R|""Great Performan...|             Musical|
|   2072|              1980's|  'burbs, The (1989)|              Comedy|
|   2072|         dark comedy|  'burbs, The (1989)|              Comedy|
|   2072|           Joe Dante|  'burbs, The (1989)|              Comedy|
|   2072|        black comedy|  'burbs, The (1989)|              Comedy|
|   2072|              quirky|  'burbs, The (1989)|              Comedy|
|  69757|          depressing|(500) Days of Sum...|Comedy|Drama|Romance|
|  69757|              stupid|(500) Days of Sum...|Comedy|Drama|Romance|
|  69757|            artistic|(500) Days of Sum...|Comedy|Drama|Romance|
|  69757|           nonlinear|(500) Days of Sum...|Comedy|Drama|Romance|
|  69757|     no happy ending|(500) Days of Sum...|

In [None]:
#QUERY 6

stagemetrics.begin()

ratings_count = ratings\
.groupBy("movieId")\
.agg(count("userId"))\
.withColumnRenamed("count(userId)", "num_ratings")\





ratings_per_movie = ratings_count.join(movies, ratings_count.movieId == movies.movieId)


ratings_per_movie = ratings_per_movie\
.sort(desc("num_ratings"))

ratings_per_movie.show(20, truncate=False)


stagemetrics.end()
stagemetrics.print_report()

In [3]:
#QUERY 7

stagemetrics.begin()

most_ratings = ratings\
.groupBy("userId",ratings['timestamp'].substr(1,4).alias("Year"))\
.agg(count(col("rating")))\
.withColumnRenamed("count(rating)", "counts")\
.orderBy(desc("counts"))


def get_topN(df, group_by_columns, order_by_column, n=10):
    window_group_by_columns = Window.partitionBy(group_by_columns)
    ordered_df = df.select(df.columns + [
        f.row_number().over(window_group_by_columns.orderBy(order_by_column.desc())).alias('row_rank')])
    topN_df = ordered_df.filter(f"row_rank <= {n}").drop("row_rank")
    return topN_df

final_most_ratings = get_topN(most_ratings, most_ratings["Year"],most_ratings["counts"], 10)

results = final_most_ratings\
.filter(F.col("Year") == "1995")\
.sort(col('userId').asc())

results.show()

stagemetrics.end()
stagemetrics.print_report()

+------+----+------+
|userId|Year|counts|
+------+----+------+
| 28507|1995|     1|
|131160|1995|     3|
+------+----+------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 12
Aggregated Spark stage metrics:
numStages => 5
numTasks => 614
elapsedTime => 3528 (4 s)
stageDuration => 3490 (3 s)
executorRunTime => 37084 (37 s)
executorCpuTime => 31070 (31 s)
executorDeserializeTime => 1402 (1 s)
executorDeserializeCpuTime => 815 (0.8 s)
resultSerializationTime => 6 (6 ms)
jvmGCTime => 1770 (2 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 38 (38 ms)
resultSize => 1564090 (1527.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 54525952
recordsRead => 20000263
bytesRead => 691677634 (659.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 8
shuffleTotalBlocksFetched => 8
shuffleLocalBlocksFetched => 8
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 624 (624 Bytes)
shuffleLocalBytesR

In [65]:
#QUERY 8

stagemetrics.begin()


movies_and_ratings = movies.join(ratings,on=['movieId'], how='left_outer')


genres = movies_and_ratings.withColumn("genres",explode(split("genres","[|]")))


FinalGenres = genres\
.groupby("genres","title")\
.agg(count(col("rating")))\
.withColumnRenamed("count(rating)", "num_ratings")\
.sort(desc("num_ratings"))

# FinalGenres.show(2000)

TheFinalGenres = FinalGenres\
.groupby("genres","title")\
.agg(max(col("num_ratings")))\
.withColumnRenamed("max(num_ratings)","ratings_num")\
.sort(desc("ratings_num"))

def retrieve_topN(df, group_by_columns, order_by_column, n):
    window_group_by_columns = Window.partitionBy(group_by_columns)
    ordered_df = df.select(df.columns + [
        f.row_number().over(window_group_by_columns.orderBy(order_by_column.desc())).alias('row_rank')])
    topN_df = ordered_df.filter(f"row_rank <= {n}").drop("row_rank")
    return topN_df


final_most_ratings_genres = retrieve_topN(TheFinalGenres, TheFinalGenres["genres"],TheFinalGenres["ratings_num"], 1)

final_most_ratings_genres = final_most_ratings_genres\
.sort(col('genres').asc())

final_most_ratings_genres.show()

stagemetrics.end()
stagemetrics.print_report()

+------+----+------+
|userId|Year|counts|
+------+----+------+
|135090|2012|  2101|
| 76630|2012|  2085|
| 98420|2012|  2044|
| 13064|2012|  1749|
|  3284|2012|  1742|
|104063|2012|  1732|
| 40483|2012|  1723|
| 12644|2012|  1703|
| 50367|2012|  1686|
|102911|2012|  1570|
|101044|2014|  2505|
|137277|2014|  2387|
| 91349|2014|  2159|
| 54107|2014|  2018|
| 37253|2014|  1985|
| 93152|2014|  1888|
|123352|2014|  1841|
| 97860|2014|  1530|
| 30317|2014|  1353|
| 74222|2014|  1343|
+------+----+------+
only showing top 20 rows

+------------------+--------------------+-----------+
|            genres|               title|ratings_num|
+------------------+--------------------+-----------+
|             Crime| Pulp Fiction (1994)|      67310|
|           Romance| Forrest Gump (1994)|      66172|
|          Thriller| Pulp Fiction (1994)|      67310|
|         Adventure|Jurassic Park (1993)|      59715|
|             Drama| Pulp Fiction (1994)|      67310|
|               War| Forrest Gump (199

In [66]:
#QUERY 9

stagemetrics.begin()




df4 = ratings\
.groupby("timestamp","movieId")\
.agg(count(col("userId")))\
.withColumnRenamed("count(userId)", "number of users")\
.filter(F.col("number of users") != 1)\
.sort(desc("number of users"))


df5 = df4\
.select(sum("number of users"))



df5.show()

stagemetrics.end()
stagemetrics.print_report()

+--------------------+
|sum(number of users)|
+--------------------+
|                4322|
+--------------------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 12
Aggregated Spark stage metrics:
numStages => 3
numTasks => 213
elapsedTime => 7481 (7 s)
stageDuration => 7479 (7 s)
executorRunTime => 86480 (1.4 min)
executorCpuTime => 67575 (1.1 min)
executorDeserializeTime => 398 (0.4 s)
executorDeserializeCpuTime => 329 (0.3 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 10140 (10 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 646 (0.6 s)
resultSize => 840670 (820.0 KB)
diskBytesSpilled => 236498117 (225.0 MB)
memoryBytesSpilled => 2318401536 (2.0 GB)
peakExecutionMemory => 2516582400
recordsRead => 20000263
bytesRead => 691677634 (659.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 20000244
shuffleTotalBlocksFetched => 2600
shuffleLocalBlocksFetched => 2600
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 34466

In [6]:
#QUERY 10

stagemetrics.begin()

tags_and_ratings = ratings.join(tag,on=["movieId"], how='left_outer')

df1 = tags_and_ratings\
.select("movieId","tag")\
.where((F.col('tag') == "funny") & (F.col('rating') > 3.5)).dropDuplicates()


movies_funny = df1.join(movies,on=["movieId"], how='left_outer')



df2 = movies_funny.withColumn("genres",explode(split("genres","[|]")))

final_df = df2\
.groupby("genres")\
.agg(count(col("title")))\
.withColumnRenamed("count(title)", "number of movies")\
.sort(col("genres").asc())

final_df.show()

stagemetrics.end()
stagemetrics.print_report()

+-----------+----------------+
|     genres|number of movies|
+-----------+----------------+
|     Action|             123|
|  Adventure|             128|
|  Animation|              83|
|   Children|              97|
|     Comedy|             527|
|      Crime|              82|
|Documentary|              15|
|      Drama|             182|
|    Fantasy|              78|
|  Film-Noir|               3|
|     Horror|              44|
|       IMAX|              22|
|    Musical|              39|
|    Mystery|              20|
|    Romance|             138|
|     Sci-Fi|              60|
|   Thriller|              69|
|        War|              12|
|    Western|               9|
+-----------+----------------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 12
Aggregated Spark stage metrics:
numStages => 5
numTasks => 419
elapsedTime => 3654 (4 s)
stageDuration => 3801 (4 s)
executorRunTime => 35848 (36 s)
executorCpuTime => 27703 (28 s)
executorDeserializeTime => 980 (

In [5]:
spark