
# Exercise 5 (Spark in Scala)   &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;     [4 points]
---

For this exercise, you will work in this JupyterLab notebook and solve the tasks listed in it. In addition to writing Spark code, these tasks require you to analyse various query plans and to reason about them.

To familiarise yourself with Spark and the Scala language, we also provide two JupyterLab notebooks, which you can upload to JupyterLab and run yourself. For a deeper understanding, and to look up the types and definitions of the various functions, we recommend consulting the Spark and Spark SQL documentation.

## a) From SQL to Dataframe (and back again)

#### For each of the following queries, find an equivalent query: rewrite the Spark SQL queries using only the Dataframe API, and the Dataframe API query using only Spark SQL
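As a hedged illustration of this kind of correspondence, the sketch below counts the rows of a filtered join once in Spark SQL and once with the Dataframe API. The tables owners and items and their columns are hypothetical toy data, not part of the StackExchange data set; in this notebook, getOrCreate() simply returns the existing session.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Reuses the running session if there is one; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("sql-vs-dataframe").getOrCreate()
import spark.implicits._

// Hypothetical toy tables (not part of the StackExchange data set).
val owners = Seq((1, 0), (2, 5), (3, 0)).toDF("Id", "UpVotes")
val items  = Seq((10, 1), (11, 1), (12, 2), (13, 3)).toDF("ItemId", "OwnerId")
owners.createOrReplaceTempView("owners")
items.createOrReplaceTempView("items")

// Spark SQL: implicit (comma) join, join condition and filter both in WHERE.
val sqlCount = spark.sql(
  "SELECT COUNT(*) FROM owners o, items i WHERE o.Id = i.OwnerId AND o.UpVotes = 0"
).first().getLong(0)

// Dataframe API: explicit equi-join on the same columns, then the filter, then count().
val dfCount = owners.as("o").join(items.as("i"), col("o.Id") === col("i.OwnerId")).filter(col("o.UpVotes") === 0).count()

println(s"SQL: $sqlCount, Dataframe: $dfCount")
```

The equality predicates of the comma-separated FROM list become the join condition, and the remaining predicates become filter calls; both variants should yield the same count.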


In [None]:
// Load the StackExchange tables from HDFS. The first CSV line is used as the header;
// since no schema is inferred, every column is read as a string.
val badgesDF = spark.read.options(Map("header"->"true")).format("csv").load("/user/adbs23_shared/stackexchange/badges.csv")
val commentsDF = spark.read.options(Map("header"->"true")).format("csv").load("/user/adbs23_shared/stackexchange/comments.csv")
val postsDF = spark.read.options(Map("header"->"true")).format("csv").load("/user/adbs23_shared/stackexchange/posts.csv")
val postlinksDF = spark.read.options(Map("header"->"true")).format("csv").load("/user/adbs23_shared/stackexchange/postlinks.csv")
val usersDF = spark.read.options(Map("header"->"true")).format("csv").load("/user/adbs23_shared/stackexchange/users.csv")
val votesDF = spark.read.options(Map("header"->"true")).format("csv").load("/user/adbs23_shared/stackexchange/votes.csv")


// Creating the views for SparkSQL
badgesDF.createOrReplaceTempView("badges")
commentsDF.createOrReplaceTempView("comments")
postsDF.createOrReplaceTempView("posts")
postlinksDF.createOrReplaceTempView("postlinks")
usersDF.createOrReplaceTempView("users")
votesDF.createOrReplaceTempView("votes")



In [None]:
badgesDF.printSchema()
commentsDF.printSchema()
postsDF.printSchema()
postlinksDF.printSchema()
usersDF.printSchema()
votesDF.printSchema()

#### Query 1: Transform the given Spark SQL query into the Dataframe API

In [None]:
val query1 = spark.sql("""
SELECT COUNT(*) FROM comments as c, votes as v, users as u
WHERE u.Id = c.UserId AND u.Id = v.UserId
AND v.BountyAmount<=100 AND u.UpVotes=0;
""")
query1.show()
query1.explain()

In [None]:
val query1DF = 
query1DF.show()
query1DF.explain()

#### Query 2: Transform the given Dataframe API query into Spark SQL

In [None]:
import org.apache.spark.sql.functions.{count, desc}

val query2 = usersDF.filter("not isnull(location)").groupBy("location", "creationdate").agg(count("*").as("cnt")).orderBy(desc("cnt")).limit(5)
query2.show(false)
query2.explain()

In [None]:
val query2SQL = spark.sql("")
query2SQL.show(false)
query2SQL.explain()

#### Query 3: Transform the given Spark SQL query into the Dataframe API

In [None]:
%%time

val query = """
SELECT COUNT(*) FROM
posts p, users u, badges b, votes v, postLinks pl
WHERE b.UserId = u.Id AND p.OwnerUserId = u.Id
AND u.Id = v.UserId AND p.Id = pl.RelatedPostId
AND p.Score>=0;
"""

val query3 = spark.sql(query)
query3.show(false)
query3.explain()

In [None]:
val query3DF = 
query3DF.show(false)
query3DF.explain()

---

## b) Analyzing Spark SQL Plans
 


It is known that queries such as Query 3, which follow a simple but common structure, are not evaluated optimally by state-of-the-art DBMSs [1][2]. The reason is that these systems execute the query as a sequence of binary joins, which can produce far more intermediate results than are needed to compute the final count. By rewriting the query so that each subquery groups by its join attribute and propagates only tuple counts upwards, instead of materializing every joined tuple, a significant part of this overhead can be avoided.
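The count-propagation idea can be sketched in plain Scala (no Spark needed) on hypothetical miniature versions of the five relations, keeping only their join columns. The naive version enumerates every joined tuple, as a chain of joins would; the second version follows the nesting of the rewritten query below, grouping by the join key and multiplying counts instead. All names and data here are made up for illustration.

```scala
// Hypothetical miniature relations; only the join columns (and Score) are kept.
final case class Post(id: Int, ownerUserId: Int, score: Int)

val postLinks = Seq(100, 100, 101, 102)                           // RelatedPostId
val posts     = Seq(Post(100, 1, 3), Post(101, 1, -2), Post(102, 2, 0))
val users     = Seq(1, 2, 3)                                      // Id
val badges    = Seq(1, 1, 2, 3)                                   // UserId
val votes     = Seq(1, 2, 2, 2)                                   // UserId

// Naive evaluation: materialize every joined tuple, then count them.
val naive = (for {
  pl <- postLinks
  p  <- posts if p.score >= 0 && p.id == pl
  u  <- users if u == p.ownerUserId
  b  <- badges if b == u
  v  <- votes if v == u
} yield (pl, p.id, u, b, v)).size

// Number of occurrences of each join-key value in a relation.
def counts(keys: Seq[Int]): Map[Int, Long] =
  keys.groupBy(identity).map { case (k, ks) => k -> ks.size.toLong }

val cPl = counts(postLinks)
val cU  = counts(users)
val cB  = counts(badges)
val cV  = counts(votes)

// c_pl_p: per OwnerUserId, the number of (postlink, post) pairs with Score >= 0.
val cPlP: Map[Int, Long] =
  posts.filter(_.score >= 0).groupBy(_.ownerUserId).map { case (owner, ps) =>
    owner -> ps.map(p => cPl.getOrElse(p.id, 0L)).sum
  }
// c_pl_p_u and c_pl_p_u_b: multiply in the user and badge counts per user id
// (a missing key means count 0, i.e. inner-join semantics).
val cPlPU  = cPlP.map { case (uid, c) => uid -> c * cU.getOrElse(uid, 0L) }
val cPlPUB = cPlPU.map { case (uid, c) => uid -> c * cB.getOrElse(uid, 0L) }
// Final SUM over the join with votes.
val optimized = cPlPUB.toSeq.map { case (uid, c) => c * cV.getOrElse(uid, 0L) }.sum

println(s"naive = $naive, count propagation = $optimized")
```

Both values agree, but only the naive version ever holds the joined tuples; the maps built by count propagation have at most one entry per distinct join-key value.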

The following query, query3_opt, is a semantically equivalent rewrite of query3 with a lower runtime.

Investigate and compare the query plans of query3 and query3_opt.

1) Which physical operators does Spark SQL use to realize the join operations?

2) How are the physical join operators split up into codegen stages in these plans, and why is it desirable to have few large codegen stages rather than many small ones?

3) How many rows are produced as intermediate outputs in total during the execution of query3 and of query3_opt?

You can find the execution graphs, including the number of output rows of each operator, in the SQL tab of the Spark web UI.

[1] Georg Gottlob, Matthias Lanzinger, Davide Mario Longo, Cem Okulmus, Reinhard
Pichler, and Alexander Selzer. 2023. Structure-Guided Query Evaluation: Towards
Bridging the Gap from Theory to Practice. CoRR abs/2303.02723 (2023).
https://doi.org/10.48550/arXiv.2303.02723 arXiv:2303.02723

[2] Reinhard Pichler and Sebastian Skritek. 2013. Tractable counting of the answers
to conjunctive queries. J. Comput. Syst. Sci. 79, 6 (2013), 984–1001.
https://doi.org/10.1016/j.jcss.2013.01.012

In [None]:
%%time

val query_opt = """
SELECT SUM(c_pl_p_u_b * c_v) AS cnt FROM
    (SELECT UserId, 1 AS c_v FROM votes) v,
    (SELECT pl_p_u.Id, SUM(c_pl_p_u * c_b) AS c_pl_p_u_b FROM
        (SELECT UserId, 1 AS c_b FROM badges) b,
        (SELECT u.Id, SUM(c_pl_p * c_u) AS c_pl_p_u FROM
            (SELECT Id, 1 AS c_u FROM users) u,
            (SELECT OwnerUserId, SUM(c_pl * c_p) AS c_pl_p FROM
                (SELECT RelatedPostId, 1 AS c_pl FROM postlinks) pl,
                (SELECT OwnerUserId, Id, 1 AS c_p FROM posts WHERE Score>=0) p
                WHERE p.Id = pl.RelatedPostId
                GROUP BY OwnerUserId) pl_p
        WHERE u.Id = pl_p.OwnerUserId
        GROUP BY u.Id) AS pl_p_u
    WHERE pl_p_u.Id = b.UserId
    GROUP BY pl_p_u.Id) pl_p_u_b
    WHERE pl_p_u_b.Id = v.UserId
"""

val query3_opt = spark.sql(query_opt)
query3_opt.show(false) // 'false' turns off truncation of row entries
query3_opt.explain()
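For questions 1 and 2, the plans can also be printed in more detail than with explain(). The sketch below assumes Spark 3.0 or later, where Dataset.explain accepts a mode string; showPlans is a hypothetical helper, and the three small tables r1, r2 and r3 only serve to make the cell runnable on its own.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// In this notebook, getOrCreate() simply returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("plan-inspection").getOrCreate()
import spark.implicits._

// Hypothetical helper: print a plan in the two modes most relevant here.
def showPlans(name: String, df: DataFrame): Unit = {
  println(s"===== $name (formatted) =====")
  df.explain("formatted") // numbered operators; codegen stages shown as [codegen id : n]
  println(s"===== $name (codegen) =====")
  df.explain("codegen")   // generated Java code of the whole-stage codegen subtrees
}

// Hypothetical toy three-way join, only to demonstrate the helper.
val r1 = Seq(1, 1, 2).toDF("a")
val r2 = Seq((1, 10), (2, 20)).toDF("a", "b")
val r3 = Seq(10, 10, 20).toDF("b")
val toy = r1.join(r2, "a").join(r3, "b")

showPlans("toy", toy)
// In this notebook: showPlans("query3", query3); showPlans("query3_opt", query3_opt)
```

In the formatted output, operators marked with * run inside a whole-stage codegen stage. With adaptive query execution enabled, the printed plan may be the initial rather than the final one, so it is worth cross-checking it against the SQL tab of the web UI.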
