

# Step 5: PageRank

Many of you may already know PageRank computation by its reputation:  it is used to measure the importance of a Web page.  (Contrary to popular belief, PageRank is named after Larry Page, not Web pages…)  PageRank is actually a tweaked version of a network centrality measure called *eigenvector centrality*.  One way to implement PageRank is as an iterative computation.  We take each graph node $x$ and in iteration 0 assign it a corresponding PageRank $p_x$:

$p_x^0= 1 / N$

where $N$ is the total number of nodes.

Now in each iteration $i$ we recompute:

$p_x^{(i)} = \alpha * \Sigma_{j \in B(x)} (1 / N_j) p_j^{(i-1)} + \beta$

![Graph](graph.png)

Where $B(x)$ is the set of nodes linking to node $x$, and $N_j$ is the outdegree of each such node $j$.  Typically, repeating the PageRank computation for a number of iterations (15 or so) results in convergence within an acceptable tolerance.  For this assignment we’ll assume $\beta = 0.15$ and $\alpha = 0.85$ (anecdoctally these are the most common values used in practice).

*Example*. In the figure to the right, nodes $j_1$ and $j_2$ represent the back-link set $B(x)$ for node $x$.  $N_{j1}$ is 3 and $N_{j2}$ is 2.  Thus in each iteration $i$, we recompute the PageRank score for $x$ by adding half of the PageRank score for $j_2$ and a third of the PageRank score of $j_3$ (both from the previous iteration $i-1$).

*Hint*.  Build some “helper” DataFrames.  We suggest at least 2 DataFrames, where the first is used the build the second, and the second is used in your solution:
1. a DataFrame with each from_node and the proportion of weight it transfers to each outgoing edge.  For instance, if the from_node is node j then the proportion of weight should be $1/N_j$.
2. a DataFrame, again with the from_node, each node it transfers weight to, and the proportion of weight computed in (1).  For instance, if the `from_node` is $j$ and the to_node is $x$, then the tuple should be $(j, x, 1/N_j)$.

*Submission*. See the external document for submission information.  Remember to first do the basic part of **Homework 2**.

## 5.1  Initialization and Marshalling

### 5.1.1 Spark setup and data load

Initialize PySpark as in the basic Homework 2.  Load `pr_graph.txt` as a text file with a single column.

In [1]:
import numpy as np
import pandas as pd
import networkx as nx

# TODO: Connect to Spark session as in Homework 2.
# Then load the file

# Worth 5 points


# YOUR CODE HERE
raise NotImplementedError()

NotImplementedError: 

In [None]:
if pr_sdf.count() != 19:
    raise ValueError('Unexpected graph size')

### 5.1.2 Wrangling the Graph Data

There are 3 columns in the file:
* `from_node`
* `to_node`
* `reserved`

You can ignore `reserved`.  Use Spark's *split()* function to update `pr_sdf` to have two columns, `from_node` and `to_node`.  Make each an integer.

The split function in Spark, works similarly to the one in Python.  It can be called directly from Spark SQL (`select split(x,’ ’) …`) or by `import`ing `pyspark.sql.functions` and referring to the function in Python.

You may need to cast your columns since they start off as strings.  In Python, you can call `my_sdf.column.cast(‘type’)` to convert data types.  In SQL it’s `SELECT CAST(my_sdf.column AS type).`  

In [None]:
# TODO: Convert pr_sdf into (from_node, to_node) with integer fields
# Worth 5 points

# YOUR CODE HERE
raise NotImplementedError()

In [None]:
results = pr_sdf.take(20)

if 'from_node' not in pr_sdf.columns:
    raise KeyError('Unexpected column names')
if 'to_node' not in pr_sdf.columns:
    raise KeyError('Unexpected column names')


## 5.2 Basic PageRank

Write the function `pagerank(G, num_iter)` which takes a graph DataFrame G corresponding to your graph, and runs for `num_iter` steps.  It should return a DataFrame with columns (`node_id`, `pagerank`).

Initialize your PageRank values for each node in the “base case”.  Then, in each iteration, use the helper DataFrames to compute PageRank scores for each node in the next iteration.

You will likely find it easier to express some of the computations in SparkSQL.  If you want to use spark.select, you may find it useful to use the Spark F.udf function to create functions that can be called over each row in the DataFrame.  You can create a function that returns a double as follows:

```
my_fn = F.udf(lambda x: f(x), DoubleType())
```

Then you can call it like:
```
	my_sdf.select(my_fn(my_arg)).alias(‘col_name’)
```

In [None]:
# TODO: write the function
# Worth 10 points
def pagerank(G, num_iter):
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
pagerank(pr_sdf, 5).orderBy("pagerank").show()

### 5.3 Removal of Self-Loops

The existing graph has a few self-loops.  Let's see what happens if you remove them.  For this one, take `pr_sdf` and remove all self-edges, creating `pr_no_loops_sdf`.  Run `pagerank(pr_no_loops_sdf, 5)`, sort in decreasing order by pagerank, and put the results in a list `pageranks`.

In [None]:
# TODO: create pr_no_loops_sdf and feed it into pagerank.  
# The final result should be an ordered list of Rows (nodes and pageranks) called pageranks.

# YOUR CODE HERE
raise NotImplementedError()
pageranks

In [None]:
if len(pageranks) != 7:
    raise ValueError('Should have 7 nodes!')
    