# <center>Big Data &ndash; Exercises &ndash; Solutions</center>
## <center>Fall 2024 &ndash; Week 9 &ndash; ETH Zurich</center>
## <center>Spark Dataframes and SparkSQL</center>

## Preparation for the exercise in Spark

1. Drag this notebook into the `notebooks` folder of your exam magic box

2. Start docker with ```docker-compose up -d```

3. Launch Jupyter from the docker container

## <center>1. Spark Dataframes</center>

Spark Dataframes allow the user to perform simple and efficient operations on data, as long as the data is structured and has a schema. Dataframes are similar to relational tables in relational databases but they allow for nestedness: conceptually a dataframe is a specialization of a Spark RDD with schema information attached. You can find more information in Karau, H. et al. (2015). Learning Spark, Chapter 9 (optional reading).

### 1.1. Data preprocessing

In [1]:
import json
from pyspark.sql import SparkSession
from pyspark import SparkConf

spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

path = "orders.jsonl"
orders_df = spark.read.json(path).cache()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/23 13:39:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


The type of our dataset object is DataFrame

In [2]:
type(orders_df)

pyspark.sql.dataframe.DataFrame

Print the schema

In [3]:
orders_df.printSchema()

root
 |-- customer: struct (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- last_name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- product: string (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |-- order_id: long (nullable = true)



Print one row

In [4]:
orders_df.limit(1).collect()

[Row(customer=Row(first_name='Preston', last_name='Landry'), date='2018-2-4', items=[Row(price=1.53, product='fan', quantity=5), Row(price=1.33, product='computer screen', quantity=6), Row(price=1.06, product='kettle', quantity=6), Row(price=1.96, product='stuffed animal', quantity=3), Row(price=1.09, product='the book', quantity=7), Row(price=1.42, product='headphones', quantity=9), Row(price=1.67, product='whiskey bottle', quantity=3)], order_id=0)]

You can access the underlying RDD object and use any functions you learned for Spark RDDs.

In [5]:
orders_df.rdd.filter(lambda ordr: ordr.customer.last_name == "Landry").count()

1960

Check the Spark version

In [6]:
print(spark.version)

3.5.3


### 1.2. Dataframe Operations
We will perform some queries using DataFrame operations. [Here](https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#untyped-dataset-operations-aka-dataframe-operations) is a guide on DataFrame operations, with a link to the [API Documentation](https://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html). Both links point to the Spark 2.3.0 documentation, whereas this notebook runs Spark 3.5.3.

We can select columns and show the result

In [7]:
orders_df.select("customer.first_name", "customer.last_name").limit(5).show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|   Preston|   Landry|
|    Jamari|Dominguez|
|   Brendon|  Sicilia|
|    Armani|   Ardeni|
|    Jamari|     Miao|
+----------+---------+



As you can see, we can navigate inside nested items with dot notation.

In [8]:
orders_df.filter(orders_df["customer.last_name"] == "Landry").count()

1960

How about nested arrays?

In [9]:
orders_df.select("order_id", "items").orderBy("order_id").limit(5).show()

+--------+--------------------+
|order_id|               items|
+--------+--------------------+
|       0|[{1.53, fan, 5}, ...|
|       1|[{1.61, fan, 7}, ...|
|       2|[{1.41, the book,...|
|       3|[{1.05, computer ...|
|       4|[{1.92, headphone...|
+--------+--------------------+



Let us try to find orders of a fan.

In [10]:
orders_df.filter(orders_df["items.product"] == "fan").count()

AnalysisException: [DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES] Cannot resolve "(items.product = fan)" due to data type mismatch: the left and right operands of the binary operator have incompatible types ("ARRAY<STRING>" and "STRING").;
'Filter (items#10.product = fan)
+- Relation [customer#8,date#9,items#10,order_id#11L] json


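The above code doesn't work! As the error message says, ```items.product``` is an array of strings (one product per item in the order), so it cannot be compared with a single string. We can use ```array_contains()``` instead, which we have to import first.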

In [11]:
from pyspark.sql.functions import array_contains

orders_df.filter(array_contains("items.product", "fan")).count()

32778
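
To see why the comparison fails while `array_contains` works, here is a minimal plain-Python sketch (not Spark code) built from order 1 of the dataset. The dictionary layout is an assumption made for illustration; it only mimics one row of `orders_df`.

```python
# One order, written as a plain dictionary (illustrative layout, not a Spark Row)
order = {
    "order_id": 1,
    "items": [
        {"price": 1.61, "product": "fan", "quantity": 7},
        {"price": 1.39, "product": "whiskey bottle", "quantity": 2},
    ],
}

# "items.product" projects the field out of every array element,
# so the result is a list of strings, not a single string
products = [item["product"] for item in order["items"]]
print(products)            # ['fan', 'whiskey bottle']

# Comparing the whole list with a string is a type mismatch
print(products == "fan")   # False

# A membership test is the analogue of array_contains
print("fan" in products)   # True
```

Python quietly evaluates the mismatched comparison to `False`, whereas Spark rejects it at analysis time with the `DATATYPE_MISMATCH` error shown above.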

Let us try to unnest the data.

We can unnest the products with explode.

Explode will generate as many rows as there are elements in the array and match them to other attributes. You should name the newly generated exploded column in order to be able to refer to it. 

Here you can find the syntax.

In [12]:
from pyspark.sql.functions import explode

orders_df.select(explode("items").alias("i"), "order_id").select("i.product", "order_id").orderBy("order_id").limit(5).show()

+---------------+--------+
|        product|order_id|
+---------------+--------+
|            fan|       0|
|computer screen|       0|
|         kettle|       0|
| stuffed animal|       0|
|       the book|       0|
+---------------+--------+



Now we can use this table to filter.

In [13]:
exploded_df = orders_df.select(explode("items").alias("i"), "order_id").select("i.product", "order_id")
exploded_df.filter(exploded_df["product"] == "fan").count()

39922
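
Note that this count (39922) is larger than the one obtained with `array_contains` (32778). The plain-Python sketch below suggests why, using the product lists of orders 0, 1 and 2 from the dataset: `array_contains` counts each order at most once, while `explode` produces one row per item, so an order that lists "fan" twice contributes two rows. Keeping only the product names in a `products` list is a simplification made for this illustration.

```python
orders = [
    {"order_id": 0, "products": ["fan", "computer screen", "kettle", "stuffed animal",
                                 "the book", "headphones", "whiskey bottle"]},
    {"order_id": 1, "products": ["fan", "whiskey bottle"]},
    {"order_id": 2, "products": ["the book", "notebook", "fan", "stuffed animal",
                                 "headphones", "whiskey bottle", "fan"]},
]

# explode: one output row per array element, paired with the order_id
exploded = [(product, order["order_id"]) for order in orders for product in order["products"]]

# array_contains-style count: orders that contain a fan at least once
orders_with_fan = sum("fan" in order["products"] for order in orders)

# count on the exploded rows: every fan item is counted separately
fan_rows = sum(product == "fan" for product, _ in exploded)

print(len(exploded))     # 16
print(orders_with_fan)   # 3
print(fan_rows)          # 4 (order 2 lists a fan twice)
```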

You might have tried to access the ```i.product``` column directly using a ```.filter``` right after the ```.select```. That, however, does not work, because the column is not available to ```orders_df``` when creating a clause like ```(orders_df["i.product"] == "fan")```. To filter on a freshly exploded column, it is best to proceed in steps and create an intermediate table.

A possible workaround when using DataFrame operations is to pass a string clause to ```.filter```, so that the ```product``` column is resolved only after the ```.select``` has added it.

In [14]:
orders_df.select(explode("items").alias("i"), "order_id").select("i.product", "order_id").filter("product = 'fan'").count()

39922

Project the nested columns

In [15]:
orders_df.select(explode("items").alias("i"), "*").select(
    "order_id", "customer.*", "date", "i.*").limit(3).show()

+--------+----------+---------+--------+-----+---------------+--------+
|order_id|first_name|last_name|    date|price|        product|quantity|
+--------+----------+---------+--------+-----+---------------+--------+
|       0|   Preston|   Landry|2018-2-4| 1.53|            fan|       5|
|       0|   Preston|   Landry|2018-2-4| 1.33|computer screen|       6|
|       0|   Preston|   Landry|2018-2-4| 1.06|         kettle|       6|
+--------+----------+---------+--------+-----+---------------+--------+



Just like in SQL, ```*``` is a shortcut for selecting all of the fields involved.

### 1.3 Exercises

#### 1. Find the number of distinct products

In [16]:
exploded_df = orders_df.select(explode("items").alias("i")).select("i.product")
exploded_df.select("product").distinct().count()

10

#### 2. Find the average quantity at which each product is purchased. Only show the top 10 products by average quantity.
(Hint: you need to import the function ```desc``` from ```pyspark.sql.functions``` to define descending order)

In [17]:
from pyspark.sql.functions import desc

exploded_df = orders_df.select(explode("items").alias("i")).select("i.product", "i.quantity")
exploded_df.select("product", "quantity").groupBy("product").avg("quantity").orderBy(desc("avg(quantity)")).limit(10).show()

+---------------+-----------------+
|        product|    avg(quantity)|
+---------------+-----------------+
|        toaster|5.515549016184942|
|       the book|5.514178678641427|
|         kettle|5.512053325314489|
|computer screen|5.504839685420448|
|     mouse trap|5.503895651308093|
|            fan|5.496342868593758|
|     headphones|5.485920795060985|
|       notebook|5.483182341458532|
| whiskey bottle|5.475555222463714|
| stuffed animal|5.470854598218753|
+---------------+-----------------+



#### 3. Find the most expensive order

In [18]:
exploded_df = orders_df.select(explode("items").alias("i"), "order_id").select("order_id", "i.quantity", "i.price")
exploded_df.select(
    "order_id", (exploded_df["quantity"] * exploded_df["price"]).alias("total")
).groupBy("order_id").sum("total").orderBy(desc("sum(total)")).limit(1).show()

+--------+------------------+
|order_id|        sum(total)|
+--------+------------------+
|   99636|104.95999999999998|
+--------+------------------+
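
The aggregation behind this query can be sketched in plain Python: multiply quantity by price for every exploded item, sum per order, then take the maximum. The rows below are the items of orders 1 and 3 from the dataset; the tuple layout is an assumption for illustration. Since prices have two decimals and quantities are integers, the trailing digits in `104.95999999999998` are a binary floating-point artifact, and rounding to two decimals is enough for display.

```python
from collections import defaultdict

# (order_id, quantity, price) rows, as they would look after exploding the items
rows = [
    (1, 7, 1.61), (1, 2, 1.39),
    (3, 10, 1.05), (3, 10, 1.5), (3, 10, 1.42),
]

# groupBy("order_id").sum("total")
totals = defaultdict(float)
for order_id, quantity, price in rows:
    totals[order_id] += quantity * price

# orderBy(desc(...)).limit(1)
best_order, best_total = max(totals.items(), key=lambda kv: kv[1])
print(best_order, round(best_total, 2))   # 3 39.7
```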



## <center>2. Spark SQL</center>

Spark SQL enables users to write queries using an SQL-like dialect, but it requires DataFrames, since they closely resemble relational tables. In addition to providing a familiar interface, SQL queries can deliver better performance compared to RDDs, leveraging the efficiency of DataFrame operations and Spark's automatic query optimization.

First we need to install and load the sparksql magic command

In [19]:
!pip install sparksql-magic --quiet

In [20]:
%load_ext sparksql_magic

In order to use SQL, we need to register the DataFrame as a temporary view.

This view only exists for the current session.

In [21]:
orders_df.createOrReplaceTempView("orders")

### 2.1 Queries

Finally, we can run SQL queries on the registered view. We will run the same queries as in the previous section, but with SQL.

As you can see we can navigate to the nested items with the dot.

In [22]:
%%sparksql
SELECT count(*)
FROM orders
WHERE orders.customer.last_name == "Landry"

0
count(1)
1960


How about nested arrays?

In [23]:
%%sparksql
SELECT order_id, items
FROM orders AS o
ORDER BY order_id
LIMIT 5

0,1
order_id,items
0,"[Row(price=1.53, product='fan', quantity=5), Row(price=1.33, product='computer screen', quantity=6), Row(price=1.06, product='kettle', quantity=6), Row(price=1.96, product='stuffed animal', quantity=3), Row(price=1.09, product='the book', quantity=7), Row(price=1.42, product='headphones', quantity=9), Row(price=1.67, product='whiskey bottle', quantity=3)]"
1,"[Row(price=1.61, product='fan', quantity=7), Row(price=1.39, product='whiskey bottle', quantity=2)]"
2,"[Row(price=1.41, product='the book', quantity=6), Row(price=1.3, product='notebook', quantity=5), Row(price=1.1, product='fan', quantity=7), Row(price=1.5, product='stuffed animal', quantity=10), Row(price=1.39, product='headphones', quantity=8), Row(price=1.78, product='whiskey bottle', quantity=3), Row(price=1.15, product='fan', quantity=8)]"
3,"[Row(price=1.05, product='computer screen', quantity=10), Row(price=1.5, product='stuffed animal', quantity=10), Row(price=1.42, product='whiskey bottle', quantity=10)]"
4,"[Row(price=1.92, product='headphones', quantity=2), Row(price=1.44, product='fan', quantity=2), Row(price=1.84, product='kettle', quantity=4), Row(price=1.44, product='stuffed animal', quantity=5)]"


Let us try to find orders of a fan.

In [24]:
%%sparksql 
SELECT count(*)
FROM orders
WHERE items.product = "fan"

AnalysisException: [DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES] Cannot resolve "(items.product = fan)" due to data type mismatch: the left and right operands of the binary operator have incompatible types ("ARRAY<STRING>" and "STRING").; line 3 pos 6;
'Aggregate [unresolvedalias(count(1), None)]
+- 'Filter (items#10.product = fan)
   +- SubqueryAlias orders
      +- View (`orders`, [customer#8,date#9,items#10,order_id#11L])
         +- Relation [customer#8,date#9,items#10,order_id#11L] json


The above code doesn't work! Once again, we need to use ```array_contains``` instead.

In [25]:
%%sparksql
SELECT count(*)
FROM orders
WHERE array_contains(items.product, "fan")

0
count(1)
32778


Let us try to unnest the data.

We can unnest the products with explode.

Explode will generate as many rows as there are elements in the array and match them to the other attributes.

In [26]:
%%sparksql
SELECT i.product, order_id
FROM (
    SELECT explode(items) AS i, order_id
    FROM orders
) exploded_items
ORDER BY order_id
LIMIT 5

0,1
product,order_id
fan,0
computer screen,0
kettle,0
stuffed animal,0
the book,0


Now we can use this table to filter. For example, we want to find out how many times "fan" appears.

In [27]:
%%sparksql
SELECT count(*)
FROM (
    SELECT explode(items) AS i
    FROM orders
    )
WHERE i.product = "fan"

0
count(1)
39922


You might have tried to access the ```i.product``` column directly in the same ```SELECT``` clause. That, however, just like before, does not work, because the column is not available to the ```WHERE``` clause right away. In order to access the generated columns directly, we need to unnest the data and make it part of our ```FROM``` clause. ```LATERAL VIEW``` lets us do just that, matching each non-array attribute to an unnested row from the array.

In [28]:
%%sparksql
SELECT *
FROM orders lateral view explode(items) AS flat_items
WHERE flat_items.product = "fan"
ORDER BY order_id
LIMIT 3

0,1,2,3,4
customer,date,items,order_id,flat_items
"Row(first_name='Preston', last_name='Landry')",2018-2-4,"[Row(price=1.53, product='fan', quantity=5), Row(price=1.33, product='computer screen', quantity=6), Row(price=1.06, product='kettle', quantity=6), Row(price=1.96, product='stuffed animal', quantity=3), Row(price=1.09, product='the book', quantity=7), Row(price=1.42, product='headphones', quantity=9), Row(price=1.67, product='whiskey bottle', quantity=3)]",0,"Row(price=1.53, product='fan', quantity=5)"
"Row(first_name='Jamari', last_name='Dominguez')",2016-1-8,"[Row(price=1.61, product='fan', quantity=7), Row(price=1.39, product='whiskey bottle', quantity=2)]",1,"Row(price=1.61, product='fan', quantity=7)"
"Row(first_name='Brendon', last_name='Sicilia')",2016-6-6,"[Row(price=1.41, product='the book', quantity=6), Row(price=1.3, product='notebook', quantity=5), Row(price=1.1, product='fan', quantity=7), Row(price=1.5, product='stuffed animal', quantity=10), Row(price=1.39, product='headphones', quantity=8), Row(price=1.78, product='whiskey bottle', quantity=3), Row(price=1.15, product='fan', quantity=8)]",2,"Row(price=1.1, product='fan', quantity=7)"


With this we can also project the nested columns

In [29]:
%%sparksql
SELECT order_id, customer.first_name, customer.last_name, date, flat_items.*
FROM orders lateral view explode(items) AS flat_items
WHERE flat_items.product = "fan"
ORDER BY order_id
LIMIT 3

0,1,2,3,4,5,6
order_id,first_name,last_name,date,price,product,quantity
0,Preston,Landry,2018-2-4,1.53,fan,5
1,Jamari,Dominguez,2016-1-8,1.61,fan,7
2,Brendon,Sicilia,2016-6-6,1.1,fan,7
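
As a rough mental model of the `LATERAL VIEW explode(...)` queries above, the plain-Python sketch below repeats each row once per array element and attaches the element under a new name, while keeping all original columns (including the array itself). The helper `lateral_view_explode` is hypothetical and only exists for this illustration; the data comes from orders 1 and 2, with order 2 abbreviated to three of its items.

```python
orders = [
    {"order_id": 1, "items": [
        {"price": 1.61, "product": "fan", "quantity": 7},
        {"price": 1.39, "product": "whiskey bottle", "quantity": 2},
    ]},
    {"order_id": 2, "items": [  # abbreviated: only three of the seven items
        {"price": 1.1, "product": "fan", "quantity": 7},
        {"price": 1.5, "product": "stuffed animal", "quantity": 10},
        {"price": 1.15, "product": "fan", "quantity": 8},
    ]},
]

def lateral_view_explode(rows, array_column, alias):
    """Yield each row once per array element, with the element added under `alias`."""
    for row in rows:
        for element in row[array_column]:
            yield {**row, alias: element}

# WHERE flat_items.product = "fan"
fan_rows = [
    row for row in lateral_view_explode(orders, "items", "flat_items")
    if row["flat_items"]["product"] == "fan"
]

for row in fan_rows:
    print(row["order_id"], row["flat_items"]["price"], row["flat_items"]["quantity"])
# 1 1.61 7
# 2 1.1 7
# 2 1.15 8
```

Because the flattened element is just another column of the row, it can be filtered, projected and aggregated like any top-level attribute.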


Having built an unnested table, we can now easily aggregate over the previously nested columns

### 2.2 Exercises

#### 1. Find the number of distinct products

In [30]:
%%sparksql
SELECT COUNT(*)
FROM (
    SELECT DISTINCT flat_items.product
    FROM orders lateral view explode(items) AS flat_items)

0
count(1)
10


#### 2. Find the average quantity at which each product is purchased. Only show the top 10 products by average quantity.

In [31]:
%%sparksql
SELECT flat_items.product, AVG(flat_items.quantity) AS av_quantity
FROM orders lateral view explode(items) flat_table AS flat_items
GROUP BY flat_items.product
ORDER BY av_quantity DESC
LIMIT 10

0,1
product,av_quantity
toaster,5.515549016184942
the book,5.514178678641427
kettle,5.512053325314489
computer screen,5.504839685420448
mouse trap,5.503895651308093
fan,5.496342868593758
headphones,5.485920795060985
notebook,5.483182341458532
whiskey bottle,5.475555222463714


#### 3. Find the most expensive order

In [32]:
%%sparksql
SELECT order_id, SUM(flat_items.quantity * flat_items.price) AS total
FROM orders lateral view explode(items) AS flat_items
GROUP BY order_id
ORDER BY total DESC
LIMIT 1

0,1
order_id,total
99636,104.95999999999998


## <center>3. More queries</center>

We will now explore the dataset that is going to be used in this week's graded exercise. It is the same language game dataset as in exercise08. If you already have it from last week, you just have to copy the `confusion-2014-03-02` folder into the `notebooks` folder of your exam magic box; otherwise, here are the instructions again.

1. Move to the `notebooks` folder in the terminal
2. Download the data: <br>
   ```wget https://f003.backblazeb2.com/file/larsyencken-eu-public/greatlanguagegame/confusion-2014-03-02.tbz2``` <br>
   __or__ <br>
   ```curl -O https://f003.backblazeb2.com/file/larsyencken-eu-public/greatlanguagegame/confusion-2014-03-02.tbz2```
3. Extract the data: <br>
   ```tar -jxvf confusion-2014-03-02.tbz2```
4. Change directory to ```confusion-2014-03-02```
5. Extract the part of the dataset that we will work with in this exercise: <br>
   ```head -n 3000000 confusion-2014-03-02.json > confusion-part.json```

### 3.1 Data processing

In [33]:
path = "confusion-2014-03-02/confusion-part.json"
dataset = spark.read.json(path).cache()

Have a look at the data

In [34]:
dataset.limit(3).show()



+--------------------+-------+----------+---------+--------------------+---------+
|             choices|country|      date|    guess|              sample|   target|
+--------------------+-------+----------+---------+--------------------+---------+
|[Maori, Mandarin,...|     AU|2013-08-19|Norwegian|48f9c924e0d98c959...|Norwegian|
|[Danish, Dinka, K...|     AU|2013-08-19|    Dinka|af5e8f27cef9e689a...|    Dinka|
|[German, Hungaria...|     AU|2013-08-19|  Turkish|509c36eb58dbce009...|   Samoan|
+--------------------+-------+----------+---------+--------------------+---------+



Print the schema

In [35]:
dataset.printSchema()

root
 |-- choices: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- country: string (nullable = true)
 |-- date: string (nullable = true)
 |-- guess: string (nullable = true)
 |-- sample: string (nullable = true)
 |-- target: string (nullable = true)



### 3.2 Spark Dataframe queries

#### 1. Find the number of games where the guessed language and the target language are both Maltese.

In [36]:
dataset.filter(dataset["target"] == "Maltese").filter(dataset["guess"] == "Maltese").count()

11258

#### 2. Return the number of distinct "target" languages.

In [37]:
dataset.select("target").distinct().count()

78

#### 3. Return the sample IDs (i.e., the "sample" field) of the first three games where the guessed language is correct (equal to the target one) ordered by date (descending), then by language (ascending), then by country (descending). 
(Hint: passing `truncate=False` to `show()` allows you to see the full output, otherwise you can simply use `collect()` instead) 

In [38]:
filtered = dataset.select("sample").filter(dataset["target"] == dataset["guess"])
filtered.orderBy(dataset["date"].desc(), dataset["target"].asc(), dataset["country"].desc()).limit(3).show(truncate=False)

+--------------------------------+
|sample                          |
+--------------------------------+
|fdf23d0a7063ba2fcef4b18eb7d57ad8|
|00b85faa8b878a14f8781be334deb137|
|1dd8e1883037c6305b87afe382c4feba|
+--------------------------------+
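
A multi-key ordering with mixed directions can be reproduced in plain Python with a sequence of stable sorts, applied from the least significant key to the most significant one. The rows and sample IDs below are hypothetical and only serve to illustrate the ordering. Note also that `date` is a string column in this dataset, so it is compared lexicographically; this gives the chronological order here because the dates are zero-padded (`2013-08-19`).

```python
# Hypothetical games in which the guess was correct
games = [
    {"sample": "s1", "date": "2013-08-19", "target": "Dinka", "country": "AU"},
    {"sample": "s2", "date": "2013-08-20", "target": "Samoan", "country": "AU"},
    {"sample": "s3", "date": "2013-08-20", "target": "Dinka", "country": "AU"},
    {"sample": "s4", "date": "2013-08-20", "target": "Dinka", "country": "NZ"},
]

# Python's sort is stable, so sorting by the least significant key first
# and the most significant key last yields the combined ordering
ordered = sorted(games, key=lambda g: g["country"], reverse=True)   # country DESC
ordered = sorted(ordered, key=lambda g: g["target"])                # target ASC
ordered = sorted(ordered, key=lambda g: g["date"], reverse=True)    # date DESC

first_three = [g["sample"] for g in ordered[:3]]
print(first_three)   # ['s4', 's3', 's2']
```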



#### 4. Aggregate all games by country and "guess" language, counting the number of guesses for each group and return the frequencies of the two most frequent country/language combinations.

In [39]:
dataset.groupBy(["country", "guess"]).count().orderBy(desc("count")).select("count").limit(2).collect()

[Row(count=20932), Row(count=20780)]

#### 5. Sort the languages by decreasing overall percentage of correct guesses and return the first four languages.
(Hint: `withColumnRenamed()` allows you to set the names of the generated columns and remember it is possible to `join()` Dataframes)

In [40]:
correct_c = dataset.filter(dataset["target"] == dataset["guess"]).groupBy("target").count().withColumnRenamed("count", "correct_guesses")
total_c = dataset.groupBy("target").count().withColumnRenamed("count", "total_guesses")
joined_df = correct_c.join(total_c, "target")
joined_df.select("target", (joined_df["correct_guesses"] / joined_df["total_guesses"]).alias("percentage"))\
    .orderBy(desc("percentage")).limit(4).show()

+-------+------------------+
| target|        percentage|
+-------+------------------+
| French|0.9617235377572363|
| German|0.9482122107988057|
|Italian|0.9191241444382873|
|Russian|0.9079549864183158|
+-------+------------------+
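
One subtlety of this approach is that `join()` performs an inner join by default, so a language that was never guessed correctly has no row in `correct_c` and disappears from the result. This does not affect the top four, but it matters if you look at the bottom of the ranking. The plain-Python sketch below illustrates the effect on the three games shown in section 3.1; the `(guess, target)` tuple layout is an assumption for illustration.

```python
from collections import Counter

# (guess, target) pairs of the first three games in the dataset
games = [("Norwegian", "Norwegian"), ("Dinka", "Dinka"), ("Turkish", "Samoan")]

correct = Counter(target for guess, target in games if guess == target)
total = Counter(target for _, target in games)

# Inner join on target: only languages present on both sides survive
inner = {t: correct[t] / total[t] for t in total if t in correct}

# Left join from the totals: languages without correct guesses get 0.0
left = {t: correct.get(t, 0) / total[t] for t in total}

print(inner)   # {'Norwegian': 1.0, 'Dinka': 1.0}
print(left)    # {'Norwegian': 1.0, 'Dinka': 1.0, 'Samoan': 0.0}
```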



### 3.3 Spark SQL queries
We will now go over the same queries, but using Spark SQL instead.

In [41]:
dataset.createOrReplaceTempView("dataset")

In [42]:
%%sparksql
SELECT *
FROM dataset
LIMIT 3

0,1,2,3,4,5
choices,country,date,guess,sample,target
"['Maori', 'Mandarin', 'Norwegian', 'Tongan']",AU,2013-08-19,Norwegian,48f9c924e0d98c959d8a6f1862b3ce9a,Norwegian
"['Danish', 'Dinka', 'Khmer', 'Lao']",AU,2013-08-19,Dinka,af5e8f27cef9e689a070b8814dcc02c3,Dinka
"['German', 'Hungarian', 'Samoan', 'Turkish']",AU,2013-08-19,Turkish,509c36eb58dbce009ccf93f375358d53,Samoan


#### 1. Find the number of games where the guessed language and the target language are both Maltese.

In [43]:
%%sparksql
SELECT count(*) FROM dataset
WHERE target == "Maltese" 
AND guess == "Maltese"

0
count(1)
11258


#### 2. Return the number of distinct "target" languages.

In [44]:
%%sparksql
SELECT count(distinct(target))
FROM dataset

0
count(DISTINCT target)
78


#### 3. Return the sample IDs (i.e., the "sample" field) of the first three games where the guessed language is correct (equal to the target one) ordered by date (descending), then by language (ascending), then by country (descending).

In [45]:
%%sparksql
SELECT sample
FROM dataset
WHERE target = guess
ORDER BY date DESC, target ASC, country DESC
LIMIT 3

0
sample
fdf23d0a7063ba2fcef4b18eb7d57ad8
00b85faa8b878a14f8781be334deb137
1dd8e1883037c6305b87afe382c4feba


#### 4. Aggregate all games by country and "guess" language, counting the number of guesses for each group and return the frequencies of the two most frequent country/language combinations.

In [46]:
%%sparksql
SELECT count(guess)
FROM dataset 
GROUP BY country, guess
ORDER BY count(guess) desc
LIMIT 2

0
count(guess)
20932
20780


#### 5. Sort the languages by decreasing overall percentage of correct guesses and return the first four languages.

In [47]:
%%sparksql
WITH correct AS (
    SELECT target, count(*) AS correct_guesses
    FROM dataset
    WHERE target = guess
    GROUP BY target
    ),
total AS (
    SELECT target, count(*) as total_guesses
    FROM dataset
    GROUP BY target
    )
SELECT target, correct_guesses/total_guesses AS fract
FROM correct JOIN total USING(target)
ORDER BY correct_guesses/total_guesses DESC 
LIMIT 4

0,1
target,fract
French,0.9617235377572363
German,0.9482122107988057
Italian,0.9191241444382873
Russian,0.9079549864183158


## <center>4. Optional Exercise: PageRank</center>

The PageRank algorithm, named after Google's Larry Page, assigns a measure of importance to each node (page) in a graph based on the importance of incoming edges (links). The importance of each edge is, in turn, derived from the importance of the source node and its out-degree. PageRank was designed to rank web pages based on hyperlinks between pages, but it can also be used to rank scientific articles or influential users in a social network.

The algorithm maintains two datasets: one collection of (*pageID*, *linkList*) elements containing the list of neighbors of each page, and one collection of (*pageID*, *rank*) elements containing the current rank for each page. The algorithm proceeds as follows:
1. Initialize each page's rank to $1.0$.
2. On each iteration, have page $x$ send a contribution of $\frac{rank(x)}{numNeighbors(x)}$ to its neighbors (the pages it has links to).
3. Set each page's rank to $0.15 + 0.85 \times contributionsReceived$.

The algorithm runs multiple iterations (of steps 2 and 3) until it converges.

Implement the PageRank algorithm in Spark for a simple dataset, running the loop for a fixed number of iterations.

For instance, you can create the dataset with `parallelize` as follows:
```
links = sc.parallelize([(1, 2),(1, 4),(2, 1),(2, 3),(3, 2)])
```
where 1, 2, 3 and 4 are page IDs and each pair (a, b) is a link from page a to page b.
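
Before moving to Spark, the three steps can be sketched in plain Python on the same five links. This is a minimal illustration, not the Spark solution: the function name `pagerank` and its parameters are hypothetical, and, like the RDD solution in 4.1, it only keeps ranks for pages that received at least one contribution.

```python
from collections import defaultdict

def pagerank(edges, iterations=10):
    # Adjacency lists: page -> pages it links to
    neighbors = defaultdict(list)
    for src, dst in edges:
        neighbors[src].append(dst)

    # Step 1: every page with outgoing links starts with rank 1.0
    ranks = {page: 1.0 for page in neighbors}

    for _ in range(iterations):
        # Step 2: each page splits its rank evenly among its neighbors
        contributions = defaultdict(float)
        for page, targets in neighbors.items():
            if page not in ranks:
                continue  # page received nothing in the previous iteration
            share = ranks[page] / len(targets)
            for target in targets:
                contributions[target] += share

        # Step 3: damped update from the contributions received
        ranks = {page: 0.15 + 0.85 * total for page, total in contributions.items()}

    return ranks

result = pagerank([(1, 2), (1, 4), (2, 1), (2, 3), (3, 2)])
for page in sorted(result):
    print(page, round(result[page], 4))
# 1 0.4915
# 2 0.7568
# 3 0.4915
# 4 0.3523
```

After 10 iterations these values agree with the output of the RDD solution in 4.1, which makes the sketch a convenient reference when debugging the Spark versions.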

### 4.1 Use Spark RDDs

In [48]:
links = sc.parallelize([(1, 2),(1, 4),(2, 1),(2, 3),(3, 2)]).groupByKey().cache()

ranks = links.mapValues(lambda x: 1.0)

for i in range(10):
    contributions = links.join(ranks).flatMap(
        lambda xs: [(dest, xs[1][1]/len(list(xs[1][0]))) for dest in list(xs[1][0])]
    )
    ranks = contributions.reduceByKey(lambda x, y: x + y).mapValues(lambda v: 0.15 + 0.85*v)
    
ranks.collect()

[(2, 0.7568028566886496),
 (4, 0.3522676188962165),
 (1, 0.49149688216717613),
 (3, 0.49149688216717613)]

### 4.2 Use Spark DataFrames

In [49]:
links = sc.parallelize([(1, 2),(1, 4),(2, 1),(2, 3),(3, 2)])
links_df = spark.createDataFrame(links).toDF("page_id", "linked_id").cache()

In [50]:
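links_c = links_df.groupBy("page_id").count()
# Caution: links_df holds one row per link, so pages 1 and 2 (two outgoing links each)
# get two initial rank rows here. The first iteration therefore counts their
# contributions twice, which is why the ranks below differ from those in 4.1.
# Initializing from links_df.select("page_id").distinct() would avoid this.
ranks = links_df.selectExpr("page_id", "1.0 as rank")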

for i in range(10):
    ranks = ranks.join(links_df, "page_id").join(links_c, "page_id").selectExpr(
        "*", "rank / count as contrib"
    ).groupBy("linked_id").sum("contrib").withColumnRenamed("sum(contrib)", "sum_contrib").selectExpr(
        "linked_id as page_id", "(0.15 + 0.85*sum_contrib) as rank"
    ).distinct()
    
ranks.show()

+-------+---------+
|page_id|     rank|
+-------+---------+
|      1|0.5070700|
|      3|0.5070700|
|      2|0.8035221|
|      4|0.3678407|
+-------+---------+



### 4.3 Use Spark SQL
Hint: you can use
```
new_df = spark.sql("... SQL query ...")
new_df.createOrReplaceTempView("new_table")
```
to perform a query inside a for loop and make the updated *new_table* available from SQL at every step.

In [51]:
links = sc.parallelize([(1, 2),(1, 4),(2, 1),(2, 3),(3, 2)])
links_df = spark.createDataFrame(links).toDF("page_id", "linked_id").cache()

links_df.createOrReplaceTempView("links")

In [53]:
# Calculate link counts
links_c = spark.sql("""
    SELECT page_id, COUNT(linked_id) AS count_links
    FROM links
    GROUP BY page_id
""")
links_c.createOrReplaceTempView("links_c")

# Initialize ranks
ranks = spark.sql("""
    SELECT page_id, 1.0 AS rank
    FROM links
    GROUP BY page_id
""")
ranks.createOrReplaceTempView("ranks")

# PageRank algorithm iterations
for i in range(15):
    new_ranks = spark.sql("""
        SELECT l.linked_id AS page_id, (0.15 + 0.85 * SUM(r.rank / c.count_links)) AS rank
        FROM links l
        JOIN ranks r ON l.page_id = r.page_id
        JOIN links_c c ON l.page_id = c.page_id
        GROUP BY l.linked_id
    """)

    # Drop the old 'ranks' view
    spark.catalog.dropTempView("ranks")

    # Create new 'ranks' view with updated data
    new_ranks.createOrReplaceTempView("ranks")

# Show final ranks
ranks = spark.sql("SELECT * FROM ranks")
ranks.show()

+-------+--------+
|page_id|    rank|
+-------+--------+
|      1|0.468064|
|      3|0.468064|
|      2|0.754215|
|      4|0.351405|
+-------+--------+

