# <center>Big Data &ndash; Exercises</center>
## <center>Fall 2021 &ndash; Week 9 &ndash; ETH Zurich</center>
## <center>Spark Dataframes and SparkSQL</center>

# Preparation for the exercise in Spark

1. Change to the exercise09 repository.

2. Start Docker: <br>
```docker-compose up -d```

3. Copy the data into the Docker container: <br> ```docker cp orders.jsonl jupyter:/home/jovyan/work``` <br>

## <center>1. Spark Dataframes</center>

Spark Dataframes allow the user to perform simple and efficient operations on data, as long as the data is structured and has a schema. Dataframes are similar to relational tables in relational databases: conceptually a dataframe is a specialization of a Spark RDD with schema information attached. You can find more information in Karau, H. et al. (2015). Learning Spark, Chapter 9 (optional reading).

### 1.1. Data preprocessing

In [None]:
import json
from pyspark.sql import SparkSession
from pyspark import SparkConf

spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

path = "orders.jsonl"
orders_df = spark.read.json(path).cache()

The type of our dataset object is DataFrame

In [None]:
type(orders_df)

Print the schema

In [None]:
orders_df.printSchema()

Print one row

In [None]:
orders_df.limit(1).collect()

You can access the underlying RDD object and use any functions you learned for Spark RDDs.

In [None]:
orders_df.rdd.filter(lambda ordr: ordr.customer.last_name == "Landry").count()

### 1.2. Dataframe Operations
We now run some queries using Dataframe operations. [Here](https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#untyped-dataset-operations-aka-dataframe-operations) is a guide on DF operations, with a link to the [API Documentation](https://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html).

We can select columns and show the result

In [None]:
orders_df.select("customer.first_name", "customer.last_name").limit(5).show()

As you can see, we can navigate to nested fields with dot notation. This also works in a filter:

In [None]:
orders_df.filter(orders_df["customer.last_name"] == "Landry").count()

How about nested arrays?

In [None]:
orders_df.select("order_id", "items").orderBy("order_id").limit(5).show()

Let us try to find orders of a fan.

In [None]:
orders_df.filter(orders_df["items.product"] == "fan").count()

The above code doesn't work, because ```items.product``` is an array of strings and cannot be compared to a single string. Use ```array_contains``` instead.

In [None]:
from pyspark.sql.functions import array_contains

orders_df.filter(array_contains("items.product", "fan")).count()
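
To see why the equality test is rejected while ```array_contains``` works, it can help to mimic the projection in plain Python. The sketch below is only an analogy, not Spark code: the two sample orders are made up for illustration, and a Python list stands in for the array that ```items.product``` produces.

```python
# Plain-Python analogy (not Spark code); the sample orders are hypothetical.
orders = [
    {"order_id": 0, "items": [{"product": "fan", "quantity": 5},
                              {"product": "kettle", "quantity": 6}]},
    {"order_id": 1, "items": [{"product": "headphones", "quantity": 9}]},
]


def products_of(order):
    # Mimics the projection items.product: one list of names per order.
    return [item["product"] for item in order["items"]]


# Comparing the whole list to a string never matches. Spark goes further
# and refuses the query, because the two types differ.
equal_matches = [o["order_id"] for o in orders if products_of(o) == "fan"]

# A membership test is what array_contains expresses.
contains_matches = [o["order_id"] for o in orders if "fan" in products_of(o)]

print(equal_matches)     # []
print(contains_matches)  # [0]
```

The membership test looks inside the array, which is the behaviour we want when asking whether an order includes a given product.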

Let us try to unnest the data.

Unnest the products with explode.

Explode will generate as many rows as there are elements in the array and match them to other attributes.

In [None]:
from pyspark.sql.functions import explode

orders_df.select(explode("items").alias("i"), "i.product", "order_id").orderBy("order_id").limit(5).show()
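
The row-multiplying behaviour of explode can be pictured with a small plain-Python analogy. This is a sketch under assumptions rather than Spark code: the sample orders and the helper ```explode_items``` are hypothetical and only mirror the shape of the query above.

```python
# Plain-Python analogy of explode (not Spark code); sample data is hypothetical.
orders = [
    {"order_id": 0, "items": [{"product": "fan", "quantity": 5},
                              {"product": "kettle", "quantity": 6}]},
    {"order_id": 1, "items": [{"product": "headphones", "quantity": 9}]},
    {"order_id": 2, "items": []},
]


def explode_items(rows):
    # Emit one output row per array element, repeating order_id each time.
    for row in rows:
        for item in row["items"]:
            yield {"i": item, "product": item["product"],
                   "order_id": row["order_id"]}


exploded = list(explode_items(orders))
for row in exploded:
    print(row["order_id"], row["product"])
```

Order 2 has an empty array and therefore contributes no rows, which matches how explode treats empty arrays in Spark.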

Now we can use this table to filter.

In [None]:
exploded_df = orders_df.select(explode("items").alias("i"), "i.product", "order_id")
exploded_df.filter(exploded_df["product"] == "fan").count()

You might have tried to filter on the ```i.product``` column with a ```.filter``` directly after the ```.select```. That does not work, because the column does not exist in ```orders_df```, so a clause like ```(orders_df["i.product"] == "fan")``` cannot be resolved. A possible workaround with Dataframe operations is to pass a string clause to ```.filter```, so that the ```product``` column is resolved only after the ```.select``` has added it.

In [None]:
orders_df.select(explode("items").alias("i"), "i.product", "order_id").filter("product = 'fan'").count()

Project the nested columns

In [None]:
orders_df.select(explode("items").alias("i"), "*").select(
    "order_id", "customer.*", "date", "i.*").limit(3).show()

### 1.3 Exercises

1) Find the average quantity at which each product is purchased. Only show the top 10 products by quantity. (Hint: you may need to import the function ```desc``` from ```pyspark.sql.functions``` to define descending order.)

2) Find the most expensive order.

## <center>2. Spark SQL</center>

Spark SQL lets users formulate their queries in SQL. It requires Dataframes, which, as mentioned before, are similar to relational tables. Besides offering a familiar interface, SQL queries may perform better than equivalent RDD code: they inherit the efficiency of Dataframe operations and are optimized automatically.

First we need to install the sparksql magic command.

In [None]:
!pip install sparksql-magic

In [None]:
%load_ext sparksql_magic

In order to use SQL, we need to register the Dataframe as a temporary view.

This view only exists for the current session.

In [None]:
orders_df.createOrReplaceTempView("orders")

### 2.1 Queries

Finally, we can run SQL queries on the registered table. We run the same queries as in the previous section, this time in SQL.

As you can see we can navigate to the nested items with the dot.

In [None]:
%%sparksql
-- Finally, run SQL queries on the registered tables
-- As you can see we can navigate to the nested items with the dot
SELECT count(*)
FROM orders
WHERE orders.customer.last_name == "Landry"

How about nested arrays?

In [None]:
%%sparksql
-- How about nested arrays?
SELECT order_id, items
FROM orders AS o
ORDER BY order_id
LIMIT 5

Let us try to find orders of a fan.

In [None]:
%%sparksql 
SELECT count(*)
FROM orders
WHERE items.product = "fan"

The above code doesn't work, because ```items.product``` is an array of strings and cannot be compared to a single string. Use ```array_contains``` instead.

In [None]:
%%sparksql

SELECT count(*)
FROM orders
WHERE array_contains(items.product, "fan")

Let us try to unnest the data.

Unnest the products with explode.

Explode will generate as many rows as there are elements in the array and match them to other attributes.

In [None]:
%%sparksql
SELECT explode(items) as i, i.product, order_id
FROM orders
ORDER BY order_id
limit 5

Now we can use this table to filter.

In [None]:
%%sparksql
-- Filter on product
SELECT count(*)
FROM (
    SELECT explode(items) as i, i.product, order_id
    FROM orders
    ORDER BY order_id
)
WHERE product = "fan"

You might have tried to filter on ```i.product``` directly in the ```WHERE``` clause of the same query. That does not work, because columns built in the ```SELECT``` clause are not available to the ```WHERE``` clause. To access the unnested columns directly, we need to unnest the data and make it part of our ```FROM``` clause. ```LATERAL VIEW``` lets us do just that, matching each non-array attribute to an unnested row from the array.

In [None]:
%%sparksql
SELECT *
FROM orders lateral view explode(items) item_table as flat_items
WHERE flat_items.product = "fan"
ORDER BY order_id
LIMIT 3

Project the nested columns

In [None]:
%%sparksql
SELECT order_id, customer.first_name, customer.last_name, date, flat_items.*
FROM orders lateral view explode(items) item_table as flat_items
WHERE flat_items.product = "fan"
ORDER BY order_id
LIMIT 3

Having built an unnested table, we can now easily aggregate over the previously nested columns.
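
As a rough illustration of what such an aggregation computes, the plain-Python analogy below groups already unnested rows by order and sums a previously nested field. It is not Spark code: the rows in ```flat_rows``` are hypothetical, and the grouping stands in for a ```GROUP BY order_id``` with ```SUM(quantity)```.

```python
from collections import defaultdict

# Plain-Python analogy (not Spark code); the unnested rows are hypothetical.
flat_rows = [
    {"order_id": 0, "product": "fan", "quantity": 5},
    {"order_id": 0, "product": "kettle", "quantity": 6},
    {"order_id": 1, "product": "fan", "quantity": 7},
]

# Similar in spirit to: SELECT order_id, SUM(quantity) ... GROUP BY order_id
units_per_order = defaultdict(int)
for row in flat_rows:
    units_per_order[row["order_id"]] += row["quantity"]

print(dict(units_per_order))  # {0: 11, 1: 7}
```

Because every array element has become its own row, the nested ```quantity``` field can be summed like any ordinary column.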

### 2.2 Exercises

1) Find the average quantity at which each product is purchased. Only show the top 10 products by quantity. 

2) Find the most expensive order

## <center>3. Exercise: PageRank (Optional)</center>

The PageRank algorithm, named after Google's Larry Page, assigns a measure of importance to each node (page) in a graph based on the importance of incoming edges (links). The importance of each edge is, in turn, derived from the importance of the source node and its out-degree. PageRank was designed to rank web pages based on hyperlinks between pages, but it can also be used to rank scientific articles or influential users in a social network.

The algorithm maintains two datasets: one collection of (*pageID*, *linkList*) elements containing the list of neighbors of each page, and one collection of (*pageID*, *rank*) elements containing the current rank for each page. The algorithm proceeds as follows:
1. Initialize each page's rank to $1.0$.
2. On each iteration, have page $x$ send a contribution of $\frac{rank(x)}{numNeighbors(x)}$ to each of its neighbors (the pages it links to). Each page then sums the contributions it receives.
3. Set each page's rank to $0.15 + 0.85 \times contributionsReceived$.

The algorithm repeats steps 2 and 3 until it converges. You can check this [video](https://www.youtube.com/watch?v=P8Kt6Abq_rM) for a more detailed explanation of the PageRank algorithm.

Implement the PageRank algorithm in Spark for a simple dataset, running the loop for a fixed number of iterations.

You can create the dataset with ```parallelize```, for instance:
```
links = sc.parallelize([(1, 2),(1, 4),(2, 1),(2, 3),(3, 2)])
```
where 1, 2, 3, and 4 are page IDs and each pair (*a*, *b*) is a link from page *a* to page *b*.
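
Before moving to Spark, it may help to trace the three steps on this small graph in plain Python. The sketch below is not a Spark solution and makes two assumptions that the description leaves open: a page without outgoing links (page 4 here) sends no contributions, and a page that receives nothing ends up with rank 0.15. The function name ```pagerank``` and its parameter ```num_iterations``` are chosen for illustration.

```python
# Plain-Python sketch of the PageRank loop described above (not Spark code).
links = [(1, 2), (1, 4), (2, 1), (2, 3), (3, 2)]


def pagerank(links, num_iterations):
    # Build the (pageID, linkList) collection.
    neighbors = {}
    for src, dst in links:
        neighbors.setdefault(src, []).append(dst)
    pages = {page for link in links for page in link}

    # Step 1: every page starts with rank 1.0.
    ranks = {page: 1.0 for page in pages}

    for _ in range(num_iterations):
        # Step 2: each page sends rank / numNeighbors to its neighbors.
        # Assumption: pages without outgoing links send nothing.
        received = {page: 0.0 for page in pages}
        for page, outgoing in neighbors.items():
            share = ranks[page] / len(outgoing)
            for target in outgoing:
                received[target] += share
        # Step 3: set each rank to 0.15 + 0.85 * contributionsReceived.
        ranks = {page: 0.15 + 0.85 * received[page] for page in pages}
    return ranks


ranks = pagerank(links, num_iterations=1)
for page in sorted(ranks):
    print(page, round(ranks[page], 3))
```

After a single iteration, page 2 receives contributions from pages 1 and 3 and reaches a rank of about 1.425, while pages 1, 3, and 4 each end up at about 0.575. The same values are a useful sanity check for the first iteration of a Spark implementation.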

### 3.1 Use Spark RDDs

In [None]:
links = sc.parallelize([(1, 2),(1, 4),(2, 1),(2, 3),(3, 2)]).groupByKey().cache()

#Your code here

ranks.collect()

### 3.2 Use Spark DataFrames

In [None]:
links = sc.parallelize([(1, 2),(1, 4),(2, 1),(2, 3),(3, 2)])
links_df = spark.createDataFrame(links).toDF("page_id", "linked_id").cache()

In [None]:
#Your code here

ranks.show()

### 3.3 Use Spark SQL
Hint: you can use
```
new_df = spark.sql("... SQL query ...")
new_df.createOrReplaceTempView("new_table")
```
to run a query inside a for loop and make the updated *new_table* available to SQL at every step.

In [None]:
links = sc.parallelize([(1, 2),(1, 4),(2, 1),(2, 3),(3, 2)])
links_df = spark.createDataFrame(links).toDF("page_id", "linked_id").cache()

links_df.createOrReplaceTempView("links")

In [None]:
#Your code here

ranks.show()

## Exam

Run the code below to create a Spark DataFrame:

In [None]:
!pip install sparksql-magic

In [7]:
import json


from pyspark import SparkContext

spark = SparkContext("local", "exam")

In [9]:
%reload_ext sparksql_magic
from pyspark.sql import SQLContext

spark_sql = SQLContext(spark)
orders_df = spark_sql.read.json('orders.jsonl')
orders_df.createOrReplaceTempView("orders")

Print the schema:

In [10]:
orders_df.printSchema()

root
 |-- customer: struct (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- last_name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- product: string (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |-- order_id: long (nullable = true)



Print one row using Spark SQL.

In [11]:
%%sparksql
SELECT *
FROM orders
LIMIT 1

| customer | date | items | order_id |
|---|---|---|---|
| Row(first_name='Preston', last_name='Landry') | 2018-2-4 | [Row(price=1.53, product='fan', quantity=5), Row(price=1.33, product='computer screen', quantity=6), Row(price=1.06, product='kettle', quantity=6), Row(price=1.96, product='stuffed animal', quantity=3), Row(price=1.09, product='the book', quantity=7), Row(price=1.42, product='headphones', quantity=9), Row(price=1.67, product='whiskey bottle', quantity=3)] | 0 |


In [12]:
%%sparksql
SELECT *
FROM (
    SELECT order_id, date, customer.first_name, customer.last_name, size(items) as items_count, array_max(items.price) as max_price
    FROM orders
)
WHERE items_count > 4
ORDER BY max_price
LIMIT 5

| order_id | date | first_name | last_name | items_count | max_price |
|---|---|---|---|---|---|
| 21840 | 2017-6-6 | Hanna | Sicilia | 6 | 1.15 |
| 96473 | 2018-3-1 | Roderick | Badash | 5 | 1.19 |
| 49908 | 2016-5-8 | Sawyer | Gruber | 5 | 1.2 |
| 65618 | 2017-2-1 | Iris | Rosenbloom | 5 | 1.2 |
| 54844 | 2017-6-9 | Santiago | Egan | 5 | 1.2 |


##### Note: the examples provided above do not cover all the query operations you might need during the exam. For more information on DataFrame transformations (e.g. ``filter``), use ``help``:

In [14]:
help(orders_df.filter)

Help on method filter in module pyspark.sql.dataframe:

filter(condition) method of pyspark.sql.dataframe.DataFrame instance
    Filters rows using the given condition.
    
    :func:`where` is an alias for :func:`filter`.
    
    .. versionadded:: 1.3.0
    
    Parameters
    ----------
    condition : :class:`Column` or str
        a :class:`Column` of :class:`types.BooleanType`
        or a string of SQL expression.
    
    Examples
    --------
    >>> df.filter(df.age > 3).collect()
    [Row(age=5, name='Bob')]
    >>> df.where(df.age == 2).collect()
    [Row(age=2, name='Alice')]
    
    >>> df.filter("age > 3").collect()
    [Row(age=5, name='Bob')]
    >>> df.where("age = 2").collect()
    [Row(age=2, name='Alice')]



In [17]:
%%sparksql
-- Filter on product
SELECT COUNT(DISTINCT order_id)
    FROM (
    SELECT explode(items) as i, i.product, order_id
    FROM orders
    ORDER BY order_id
    )
WHERE product = "fan"

| count(DISTINCT order_id) |
|---|
| 32778 |


In [24]:
%%sparksql
SELECT order_id, customer.first_name, customer.last_name, date, flat_items, flat_items.price
FROM orders
    lateral view explode(items) as flat_items
WHERE flat_items.product = "fan"
ORDER BY order_id
LIMIT 3

| order_id | first_name | last_name | date | flat_items | price |
|---|---|---|---|---|---|
| 0 | Preston | Landry | 2018-2-4 | Row(price=1.53, product='fan', quantity=5) | 1.53 |
| 1 | Jamari | Dominguez | 2016-1-8 | Row(price=1.61, product='fan', quantity=7) | 1.61 |
| 2 | Brendon | Sicilia | 2016-6-6 | Row(price=1.1, product='fan', quantity=7) | 1.1 |


In [26]:
%%sparksql
SELECT order_id, SUM(flat_items.quantity * flat_items.price) as total
FROM orders lateral view explode(items) as flat_items
GROUP BY order_id
ORDER BY total desc
LIMIT 1

| order_id | total |
|---|---|
| 99636 | 104.95999999999998 |
