In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from sparkmeasure import StageMetrics

#create a new Spark Session
spark = SparkSession \
    .builder \
    .master('spark://con:7077') \
    .appName('PySpark Project') \
    .config("spark.drivers.cores", "2") \
    .config("spark.drivers.memory", "4G") \
    .getOrCreate()

#create spark metrics object
stagemetrics = StageMetrics(spark)

In [2]:
spark

In [3]:
#load a csv file as a Spark DataFrame
movie = (spark.read
      .format("csv")
      .option('header', 'true') #means that the first line contains column names
      .option("delimiter", ",") #set the delimiter to comma
      .option("inferSchema", "true") #automatically try to infer the column data types
      .load("data/movie.csv") #filename to read from
     )
tag = (spark.read
      .format("csv")
      .option('header', 'true') #means that the first line contains column names
      .option("delimiter", ",") #set the delimiter to comma
      .option("inferSchema", "true") #automatically try to infer the column data types
      .load("data/tag.csv") #filename to read from
     )
rating = (spark.read
      .format("csv")
      .option('header', 'true') #means that the first line contains column names
      .option("delimiter", ",") #set the delimiter to comma
      .option("inferSchema", "true") #automatically try to infer the column data types
      .load("data/rating.csv") #filename to read from
     )

# Register DataFrames as an SQL temporary view
movie.createOrReplaceTempView("movie")
rating.createOrReplaceTempView("rating")
tag.createOrReplaceTempView("tag")

In [4]:
# 1st Query
#start measuring performance
stagemetrics.begin()

print(movie.join(rating, movie.movieId == rating.movieId, 'inner').filter(movie.title.contains('Jumanji')).count()

#stop measuring performance
stagemetrics.end()
#print performance metrics
stagemetrics.print_report()

22243

Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 3
numTasks => 8
elapsedTime => 8299 (8 s)
stageDuration => 8146 (8 s)
executorRunTime => 22025 (22 s)
executorCpuTime => 17738 (18 s)
executorDeserializeTime => 559 (0.6 s)
executorDeserializeCpuTime => 141 (0.1 s)
resultSerializationTime => 0 (0 ms)
jvmGCTime => 216 (0.2 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 57 (57 ms)
resultSize => 12762 (12.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 192
recordsRead => 20000264
bytesRead => 692174705 (660.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 6
shuffleTotalBlocksFetched => 6
shuffleLocalBlocksFetched => 6
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 354 (354 Bytes)
shuffleLocalBytesRead => 354 (354 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 354

In [5]:
# 2nd Query
#start measuring performance
stagemetrics.begin()

movie.join(tag, tag.movieId == movie.movieId, 'inner').filter(lower(tag.tag).contains('boring')).drop_duplicates(subset=['movieId']).orderBy(movie.title).select(movie.title).show(5, truncate=60)

#stop measuring performance
stagemetrics.end()
#print performance metrics
stagemetrics.print_report()

+------------------------------------+
|                               title|
+------------------------------------+
|         (500) Days of Summer (2009)|
|101 Reykjavik (101 Reykjavík) (2000)|
|             12 Years a Slave (2013)|
|                         1408 (2007)|
|   1492: Conquest of Paradise (1992)|
+------------------------------------+
only showing top 5 rows


Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 3
numTasks => 205
elapsedTime => 6791 (7 s)
stageDuration => 6481 (6 s)
executorRunTime => 16002 (16 s)
executorCpuTime => 4736 (5 s)
executorDeserializeTime => 4331 (4 s)
executorDeserializeCpuTime => 1390 (1 s)
resultSerializationTime => 265 (0.3 s)
jvmGCTime => 222 (0.2 s)
shuffleFetchWaitTime => 1 (1 ms)
shuffleWriteTime => 141 (0.1 s)
resultSize => 1375754 (1343.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 1648885760
recordsRead => 492842
bytesRead

In [6]:
# 3rd Query
#start measuring performance
stagemetrics.begin()

spark.sql("SELECT DISTINCT rating.userId FROM rating JOIN tag ON tag.userId = rating.userId AND tag.movieId = rating.movieId WHERE rating > 3 AND LOWER(tag) LIKE '%bollywood%' AND LOWER(tag) NOT LIKE '%not bollywood%' ORDER BY rating.userId").show(5)

#stop measuring performance
stagemetrics.end()
#print performance metrics
stagemetrics.print_report()

+------+
|userId|
+------+
| 10573|
| 19837|
| 23333|
| 25004|
| 31338|
+------+
only showing top 5 rows


Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 3
numTasks => 210
elapsedTime => 12833 (13 s)
stageDuration => 12609 (13 s)
executorRunTime => 33226 (33 s)
executorCpuTime => 24343 (24 s)
executorDeserializeTime => 3030 (3 s)
executorDeserializeCpuTime => 947 (0.9 s)
resultSerializationTime => 79 (79 ms)
jvmGCTime => 813 (0.8 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 56 (56 ms)
resultSize => 1326140 (1295.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 253755392
recordsRead => 12661130
bytesRead => 712627509 (679.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 24
shuffleTotalBlocksFetched => 24
shuffleLocalBlocksFetched => 24
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 1416 (1416 Bytes)
shuffleLocalBytesRead => 14

In [7]:
# 4th Query
#start measuring performance
stagemetrics.begin()

spark.sql("SELECT first(title) AS Title, AVG(rating) AS Average FROM movie INNER JOIN rating ON movie.movieId = rating.movieId WHERE YEAR(timestamp)=1995 GROUP BY rating.movieId ORDER BY Average DESC, Title ASC").show(10, truncate=60)

#stop measuring performance
stagemetrics.end()
#print performance metrics
stagemetrics.print_report()

+------------------------------------------------------------+-------+
|                                                       Title|Average|
+------------------------------------------------------------+-------+
|                                 Seven (a.k.a. Se7en) (1995)|    5.0|
|Double Life of Veronique, The (Double Vie de Véronique, L...|    4.0|
|                                 Fish Called Wanda, A (1988)|    3.0|
|                                           Get Shorty (1995)|    3.0|
+------------------------------------------------------------+-------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 3
numTasks => 207
elapsedTime => 16667 (17 s)
stageDuration => 16534 (17 s)
executorRunTime => 42449 (42 s)
executorCpuTime => 32571 (33 s)
executorDeserializeTime => 2816 (3 s)
executorDeserializeCpuTime => 834 (0.8 s)
resultSerializationTime => 78 (78 ms)
jvmGCTime => 1124 (1 s)
shuffleFetchWaitTime => 0 (0 ms)

In [8]:
# 5th Query
# #start measuring performance
stagemetrics.begin()

spark.sql("SELECT title, concat_ws(',', collect_list(tag)) Tags FROM tag INNER JOIN movie ON tag.movieId = movie.movieId WHERE YEAR(timestamp) = 2015 GROUP BY tag.movieId, title ORDER BY title").show(5, truncate=60)

#stop measuring performance
stagemetrics.end()
#print performance metrics
stagemetrics.print_report()

+----------------------------------+------------------------------------------------------------+
|                             title|                                                        Tags|
+----------------------------------+------------------------------------------------------------+
|""Great Performances"" Cats (1998)|                                                        BD-R|
|                'burbs, The (1989)|            1980's,black comedy,dark comedy,Joe Dante,quirky|
|       (500) Days of Summer (2009)|intelligent,nonlinear,artistic,bittersweet,Funny,humor,hu...|
| ...tick... tick... tick... (1970)|                                                        BD-R|
|                          1 (2014)|                                                     Sukumar|
+----------------------------------+------------------------------------------------------------+
only showing top 5 rows


Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stag

In [9]:
# 6th Query
# #start measuring performance
stagemetrics.begin()

spark.sql("SELECT first(title) Title, count(rating) Total_Ratings FROM rating JOIN movie ON movie.movieId = rating.movieId GROUP BY rating.movieId ORDER BY Total_Ratings DESC").show(5, truncate=60)

#stop measuring performance
stagemetrics.end()
#print performance metrics
stagemetrics.print_report()

+--------------------------------+-------------+
|                           Title|Total_Ratings|
+--------------------------------+-------------+
|             Pulp Fiction (1994)|        67310|
|             Forrest Gump (1994)|        66172|
|Shawshank Redemption, The (1994)|        63366|
|Silence of the Lambs, The (1991)|        63299|
|            Jurassic Park (1993)|        59715|
+--------------------------------+-------------+
only showing top 5 rows


Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 3
numTasks => 207
elapsedTime => 21385 (21 s)
stageDuration => 21306 (21 s)
executorRunTime => 63329 (1.1 min)
executorCpuTime => 40118 (40 s)
executorDeserializeTime => 1575 (2 s)
executorDeserializeCpuTime => 713 (0.7 s)
resultSerializationTime => 35 (35 ms)
jvmGCTime => 7145 (7 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 151 (0.2 s)
resultSize => 1352377 (1320.0 KB)
diskBytesSpilled => 69445851 (66.

In [10]:
# 7th Query
# #start measuring performance
stagemetrics.begin()

spark.sql("SELECT userId, count(*) Total_Ratings FROM rating WHERE YEAR(timestamp) = 1995 GROUP BY userId ORDER BY Total_Ratings DESC, userId ASC").show(10)

#stop measuring performance
stagemetrics.end()
#print performance metrics
stagemetrics.print_report()

+------+-------------+
|userId|Total_Ratings|
+------+-------------+
|131160|            3|
| 28507|            1|
+------+-------------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 2
numTasks => 206
elapsedTime => 14798 (15 s)
stageDuration => 14795 (15 s)
executorRunTime => 40108 (40 s)
executorCpuTime => 30233 (30 s)
executorDeserializeTime => 2055 (2 s)
executorDeserializeCpuTime => 721 (0.7 s)
resultSerializationTime => 34 (34 ms)
jvmGCTime => 1563 (2 s)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 7 (7 ms)
resultSize => 1226634 (1197.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 69206016
recordsRead => 20000263
bytesRead => 690681057 (658.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 2
shuffleTotalBlocksFetched => 2
shuffleLocalBlocksFetched => 2
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 129 (129 Bytes)
shu

In [11]:
# 8th Query
# #start measuring performance
stagemetrics.begin()

# Find the genres
genre = movie.select('title', 'movieId', split('genres', '[|]').alias('Genres')).select('title', 'movieId', explode('Genres')).where("col != '(no genres listed)'")

# Find most popular movies
popular = spark.sql("SELECT movieId, COUNT(movieId) Popularity FROM rating GROUP BY movieId ORDER BY Popularity DESC")

popular.createOrReplaceTempView('popular')
genre.createOrReplaceTempView('genre')

# Group movies and order them by genre
genre = spark.sql("SELECT title, movieId, first(col) Genre FROM genre GROUP BY title, movieId ORDER BY Genre")
genre.createOrReplaceTempView('genre')

# Join Genres and Popular movies together
popular = spark.sql("SELECT Genre, title, Popularity FROM popular INNER JOIN genre ON popular.movieId = genre.movieId ORDER BY Popularity DESC")
popular.createOrReplaceTempView('popular')

# Group Genres and order by Genre ASC, Popularity DESC
spark.sql("SELECT Genre, first(title) Title, first(Popularity) Popularity FROM popular GROUP BY Genre ORDER BY Genre, Popularity DESC").show(5)

#stop measuring performance
stagemetrics.end()
#print performance metrics
stagemetrics.print_report()

+---------+--------------------+----------+
|    Genre|               Title|Popularity|
+---------+--------------------+----------+
|   Action|Jurassic Park (1993)|     59715|
|Adventure|Who Framed Roger ...|     21739|
|Animation|Nightmare Before ...|     20509|
| Children|E.T. the Extra-Te...|     32685|
|   Comedy|As Good as It Get...|     21684|
+---------+--------------------+----------+
only showing top 5 rows


Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 7
numTasks => 925
elapsedTime => 20876 (21 s)
stageDuration => 20574 (21 s)
executorRunTime => 52222 (52 s)
executorCpuTime => 29660 (30 s)
executorDeserializeTime => 7586 (8 s)
executorDeserializeCpuTime => 3360 (3 s)
resultSerializationTime => 174 (0.2 s)
jvmGCTime => 949 (0.9 s)
shuffleFetchWaitTime => 8 (8 ms)
shuffleWriteTime => 4479 (4 s)
resultSize => 2168498 (2.0 MB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemor

In [12]:
# 9th Query
# #start measuring performance
stagemetrics.begin()

rating.groupBy('movieId', 'timestamp').agg(countDistinct('userId').alias('Total')).filter('Total>1').select(sum('Total')).show()

#stop measuring performance
stagemetrics.end()
#print performance metrics
stagemetrics.print_report()

+----------+
|sum(Total)|
+----------+
|      4322|
+----------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 4
numTasks => 407
elapsedTime => 61410 (1.0 min)
stageDuration => 61386 (1.0 min)
executorRunTime => 195708 (3.3 min)
executorCpuTime => 129613 (2.2 min)
executorDeserializeTime => 1777 (2 s)
executorDeserializeCpuTime => 1155 (1 s)
resultSerializationTime => 8 (8 ms)
jvmGCTime => 8672 (9 s)
shuffleFetchWaitTime => 7 (7 ms)
shuffleWriteTime => 7177 (7 s)
resultSize => 1109247 (1083.0 KB)
diskBytesSpilled => 351896690 (335.0 MB)
memoryBytesSpilled => 2215796094 (2.0 GB)
peakExecutionMemory => 4613734400
recordsRead => 20000263
bytesRead => 690681057 (658.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 40000714
shuffleTotalBlocksFetched => 41400
shuffleLocalBlocksFetched => 41400
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 842786958 (803.0 MB)
shuffleLocalBytesRead =>

In [13]:
# 10th Query
# #start measuring performance
stagemetrics.begin()

# Find the genres
genre = movie.select('title', 'movieId', split('genres', '[|]').alias('Genres')).select('title', 'movieId', explode('Genres')).where("col != '(no genres listed)'")

# Find movies with rating > 3.5 and tagged as funny
funny = spark.sql("SELECT movieId FROM tag WHERE LOWER(tag) LIKE '%funny%' AND LOWER(tag) NOT LIKE '%not funny%' GROUP BY movieId")

rated = spark.sql("SELECT movieId FROM rating WHERE rating > 3.5 GROUP BY movieId")

funny.createOrReplaceTempView('funny')
rated.createOrReplaceTempView('rated')

fr = funny.join(rated, funny.movieId == rated.movieId, 'inner').select(tag.movieId)

fr.join(genre, fr.movieId == genre.movieId, 'inner').groupby('col').count().sort('col').show(5)

#stop measuring performance
stagemetrics.end()
#print performance metrics
stagemetrics.print_report()

+---------+-----+
|      col|count|
+---------+-----+
|   Action|  212|
|Adventure|  190|
|Animation|   92|
| Children|  113|
|   Comedy|  728|
+---------+-----+
only showing top 5 rows


Scheduling mode = FIFO
Spark Context default degree of parallelism = 4
Aggregated Spark stage metrics:
numStages => 6
numTasks => 611
elapsedTime => 17996 (18 s)
stageDuration => 18554 (19 s)
executorRunTime => 47558 (48 s)
executorCpuTime => 28771 (29 s)
executorDeserializeTime => 4224 (4 s)
executorDeserializeCpuTime => 1938 (2 s)
resultSerializationTime => 74 (74 ms)
jvmGCTime => 1178 (1 s)
shuffleFetchWaitTime => 19 (19 ms)
shuffleWriteTime => 2196 (2 s)
resultSize => 2081564 (2032.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 2446692800
recordsRead => 10488252
bytesRead => 714121157 (681.0 MB)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 89259
shuffleTotalBlocksFetched => 3340
shuffleLocalBlocksFetched => 3340
shuffleRemote

In [15]:
spark.stop()