In [1]:
# CIS 545

# Homework 2, Advanced Part: Iterative Spark Computations
### Worth 20 points in total

Building upon your experiences with graph data, we will now use Spark to compute PageRank.  Following our discussion of graphs, we will use an edge list, which is a variation of the adjacency list.

# Step 5: PageRank

Many of you may already know PageRank computation by its reputation:  it is used to measure the importance of a Web page.  (Contrary to popular belief, PageRank is named after Larry Page, not Web pages…)  PageRank is actually a tweaked version of a network centrality measure called *eigenvector centrality*.  One way to implement PageRank is as an iterative computation.  We take each graph node $x$ and in iteration 0 assign it a corresponding PageRank $p_x$:

$p_x^{(0)} = 1 / N$

where $N$ is the total number of nodes.

Now in each iteration $i$ we recompute:

$p_x^{(i)} = \alpha \sum_{j \in B(x)} \frac{1}{N_j} p_j^{(i-1)} + \beta$

![Graph](graph.png)

Where $B(x)$ is the set of nodes linking to node $x$, and $N_j$ is the outdegree of each such node $j$.  Typically, repeating the PageRank computation for a number of iterations (15 or so) results in convergence within an acceptable tolerance.  For this assignment we’ll assume $\beta = 0.15$ and $\alpha = 0.85$ (anecdotally, these are the most common values used in practice).

*Example*. In the figure above, nodes $j_1$ and $j_2$ represent the back-link set $B(x)$ for node $x$.  $N_{j_1}$ is 3 and $N_{j_2}$ is 2.  Thus in each iteration $i$, we recompute the PageRank score for $x$ by adding a third of the PageRank score of $j_1$ and half of the PageRank score of $j_2$ (both from the previous iteration $i-1$), then scaling by $\alpha$ and adding $\beta$.
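
To make the example concrete, the short sketch below evaluates one update for node $x$ in plain Python. The out-degrees (3 and 2) come from the example; the previous-iteration scores `p_j1` and `p_j2` and the helper name `update_score` are made up purely for illustration.

```python
# One PageRank update for node x, following the update rule above.
# Assumption: the previous-iteration scores below are illustrative values only.
ALPHA, BETA = 0.85, 0.15

def update_score(backlinks, alpha=ALPHA, beta=BETA):
    """backlinks: list of (previous score of j, out-degree N_j) for each j in B(x)."""
    return alpha * sum(p_j / n_j for p_j, n_j in backlinks) + beta

p_j1, n_j1 = 0.3, 3   # j1 has three outgoing edges, so x receives a third of its score
p_j2, n_j2 = 0.4, 2   # j2 has two outgoing edges, so x receives half of its score

p_x = update_score([(p_j1, n_j1), (p_j2, n_j2)])
print(round(p_x, 4))
```

With these assumed inputs the sum inside the formula is $0.3/3 + 0.4/2 = 0.3$, so the new score is $0.85 \cdot 0.3 + 0.15$.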

*Hint*.  Build some “helper” DataFrames.  We suggest at least 2 DataFrames, where the first is used to build the second, and the second is used in your solution:
1. a DataFrame with each `from_node` and the proportion of weight it transfers to each outgoing edge.  For instance, if the `from_node` is node $j$, then the proportion of weight should be $1/N_j$.
2. a DataFrame, again with the `from_node`, each node it transfers weight to, and the proportion of weight computed in (1).  For instance, if the `from_node` is $j$ and the `to_node` is $x$, then the tuple should be $(j, x, 1/N_j)$.
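
The two helpers can be prototyped in plain Python before writing the Spark version. The sketch below is only a stand-in for the DataFrames: the edge list is a toy example, and the names `weight_share` and `weighted_edges` are illustrative, not part of the assignment.

```python
from collections import Counter

# Toy edge list of (from_node, to_node) pairs; assumed for illustration only.
edges = [(1, 2), (1, 3), (1, 4), (2, 3), (2, 1), (3, 2)]

# Helper 1: from_node -> share of weight sent along each outgoing edge (1 / N_j).
out_degree = Counter(src for src, _ in edges)
weight_share = {node: 1.0 / deg for node, deg in out_degree.items()}

# Helper 2: one (from_node, to_node, 1 / N_j) tuple per edge.
weighted_edges = [(src, dst, weight_share[src]) for src, dst in edges]

for row in weighted_edges:
    print(row)
```

In Spark, the first helper corresponds to a `GROUP BY from_node` with a count, and the second to a join of the edge list back onto that result.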

*Submission*. See the external document for submission information.  Remember to first do the basic part of **Homework 2**.

## 5.1  Initialization and Marshalling

### 5.1.1 Spark setup and data load

Initialize PySpark as in the basic Homework 2.  Load `pr_graph.txt` as a text file with a single column.

In [2]:
# TODO: Read pr_graph.txt as SDF

import numpy as np
import pandas as pd
import networkx as nx

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

import os
# make sure pyspark tells workers to use python3 not 2 if both are installed
# os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
# os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/ipython3'

spark = SparkSession.builder.appName('Graphs-HW2-Adv').getOrCreate()

# YOUR CODE HERE
pr_sdf = spark.read.csv('pr_graph.txt',header=None)

In [3]:
pr_sdf.show(10)

+-----+
|  _c0|
+-----+
|1 2 0|
|1 3 0|
|1 4 0|
|1 5 0|
|2 3 0|
|2 1 0|
|2 5 0|
|3 2 0|
|3 3 0|
|4 5 0|
+-----+
only showing top 10 rows



In [4]:
# [CIS 545 Test Cases] (2 pts)


print('[CIS 545 Test Cases] (2 pts)')

[CIS 545 Test Cases] (2 pts)


### 5.1.2 Wrangling the Graph Data

As you might have noticed, the `pr_sdf` you loaded in 5.1.1 contains a single column of three values. In this section you will need to use the _SPLIT()_ function to update `pr_sdf` to have 3 columns, and you must cast all three columns to integers: <br>
* `from_node` (int)
* `to_node` (int)
* `reserved` (int)

_Hint_: The split function in Spark can be called directly from Spark SQL (`SELECT SPLIT(column, 'del') ...`) or by `import`ing `pyspark.sql.functions` and referring to the function in Python.

You may need to cast your columns since they start off as strings.  In Python, you can call `my_sdf.column.cast('type')` to convert data types.  In SQL it’s `SELECT CAST(my_sdf.column AS type)`.

In [5]:
# TODO: Convert pr_sdf into (from_node, to_node, reserved) with integer fields

# YOUR CODE HERE
# pyspark.sql.functions is already imported as F above.
# Split the single space-delimited column once, then cast each piece to an integer.
parts = F.split(pr_sdf['_c0'], ' ')

pr_sdf = pr_sdf.select(parts[0].cast('int').alias('from_node'),
                       parts[1].cast('int').alias('to_node'),
                       parts[2].cast('int').alias('reserved'))


In [6]:
pr_sdf.show(10)

+---------+-------+--------+
|from_node|to_node|reserved|
+---------+-------+--------+
|        1|      2|       0|
|        1|      3|       0|
|        1|      4|       0|
|        1|      5|       0|
|        2|      3|       0|
|        2|      1|       0|
|        2|      5|       0|
|        3|      2|       0|
|        3|      3|       0|
|        4|      5|       0|
+---------+-------+--------+
only showing top 10 rows



In [7]:
pr_sdf.schema

StructType(List(StructField(from_node,IntegerType,true),StructField(to_node,IntegerType,true),StructField(reserved,IntegerType,true)))

In [8]:
# [CIS 545 Test Cases] (3 pts)

results = pr_sdf.take(20)

if 'from_node' not in pr_sdf.columns:
    raise KeyError('Unexpected column names')
if 'to_node' not in pr_sdf.columns:
    raise KeyError('Unexpected column names')


print('[CIS 545 Test Cases] (3 pts)')

[CIS 545 Test Cases] (3 pts)


## 5.2 Basic PageRank

Write the function `pagerank(G, num_iter)` which takes a graph DataFrame G corresponding to your graph, and runs for `num_iter` steps.  It should return a DataFrame with columns (`node_id`, `pagerank`).

Initialize your PageRank values for each node in the “base case”.  Then, in each iteration, use the helper DataFrames to compute PageRank scores for each node in the next iteration.
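
As a reference point for checking a Spark implementation on small inputs, here is a minimal plain-Python sketch of the same loop. The function name `pagerank_sketch`, its dictionary return type, and the toy edge list are assumptions made for illustration; the assignment itself expects a DataFrame with columns (`node_id`, `pagerank`).

```python
def pagerank_sketch(edges, num_iter, alpha=0.85, beta=0.15):
    """Iterate the update rule over a list of (from_node, to_node) pairs.

    Hypothetical helper for illustration; returns {node_id: pagerank}.
    """
    nodes = sorted({n for edge in edges for n in edge})
    out_degree = {}
    for src, _ in edges:
        out_degree[src] = out_degree.get(src, 0) + 1

    # Base case: every node starts at 1 / N.
    ranks = {node: 1.0 / len(nodes) for node in nodes}

    for _ in range(num_iter):
        # Sum the weight arriving at each node from its back-links.
        incoming = {node: 0.0 for node in nodes}
        for src, dst in edges:
            incoming[dst] += ranks[src] / out_degree[src]
        ranks = {node: alpha * incoming[node] + beta for node in nodes}
    return ranks

toy_edges = [(1, 2), (2, 1), (2, 3), (3, 1)]
print(pagerank_sketch(toy_edges, 5))
```

Each pass reads only the scores from the previous iteration, which is why the new values are built in a fresh dictionary rather than updated in place.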

You will likely find it easier to express some of the computations in Spark SQL.  If you want to use `DataFrame.select` instead, you may find it useful to use Spark's `F.udf` function to create functions that can be called on each row of the DataFrame.  You can create a function that returns a double as follows:

```
my_fn = F.udf(lambda x: f(x), DoubleType())
```

Then you can call it like:
```
my_sdf.select(my_fn(my_arg).alias('col_name'))
```

In [9]:
ALPHA = 0.85  # damping factor from the assignment
BETA = 0.15   # constant term from the assignment

def pagerank(G, num_iter):
    """Run num_iter PageRank iterations over edge list G; return (node_id, pagerank)."""
    G.createOrReplaceTempView('pr')
    # Helper 1: each from_node and the share of weight it sends along each out-edge (1/N_j).
    graph1 = spark.sql('''
            SELECT from_node, 1/count(to_node) AS weight_proportion
            FROM pr
            GROUP BY from_node
            ''').cache()
    graph1.createOrReplaceTempView('gr1')
    # Helper 2: every edge (j, x) annotated with 1/N_j.
    graph2 = spark.sql('''
            SELECT pr.from_node, pr.to_node, gr1.weight_proportion
            FROM pr
            LEFT JOIN gr1
            ON pr.from_node = gr1.from_node
            ''')

    # The graph is tiny, so collect the weighted edges and iterate with NumPy on the driver.
    edges = graph2.toPandas()
    sources = edges.from_node.tolist()
    targets = edges.to_node.tolist()
    weights = edges.weight_proportion.tolist()
    nodes = sorted(set(sources) | set(targets))   # no hard-coded node count
    index = {node: k for k, node in enumerate(nodes)}
    n = len(nodes)

    # transfer[x, j] = 1/N_j if there is an edge j -> x, else 0 (float matrix).
    transfer = np.zeros((n, n))
    for j, x, w in zip(sources, targets, weights):
        transfer[index[x], index[j]] = w

    ranks = np.full(n, 1.0 / n)                   # iteration 0: p_x = 1/N
    for _ in range(num_iter):                     # also terminates when num_iter is 0
        ranks = ALPHA * transfer.dot(ranks) + BETA

    result = pd.DataFrame({'node_id': nodes, 'pagerank': ranks})
    return spark.createDataFrame(result)


In [10]:
pagerank(pr_sdf, 5).orderBy("pagerank").show()

+-------+-------------------+
|node_id|           pagerank|
+-------+-------------------+
|      4|0.36084700241725365|
|      1| 0.4495011122882807|
|      6|   0.48955231046338|
|      3| 0.6585104089419872|
|      2| 0.6985616071170864|
|      5| 0.8185605449305454|
|      7| 0.8622351388414664|
+-------+-------------------+



In [11]:
# [CIS 545 Test Cases] (3 pts)


print('[CIS 545 Test Cases] (3 pts)')

[CIS 545 Test Cases] (3 pts)


## 5.3 Removal of Self-Loops

The existing graph has a few self-loops.  Let's see what happens if you remove them.  For this one, take `pr_sdf` and remove all self-edges, creating `pr_no_loops_sdf`.  Run `pagerank(pr_no_loops_sdf, 5)`, sort in decreasing order by pagerank, and put the results in a list `pageranks`.
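
Removing a self-loop changes more than one edge: it also lowers the out-degree of that node, so the remaining out-edges each carry a larger share of its weight. The plain-Python sketch below shows this on a toy edge list (assumed for illustration; the helper `out_degree` is hypothetical).

```python
# Toy edge list with one self-loop (3 -> 3); assumed for illustration only.
edges = [(1, 2), (2, 3), (3, 2), (3, 3)]

# Keep only edges whose endpoints differ.
no_loops = [(src, dst) for src, dst in edges if src != dst]

def out_degree(edge_list, node):
    """Number of edges in edge_list that leave the given node."""
    return sum(1 for src, _ in edge_list if src == node)

# Node 3 previously split its weight between itself and node 2;
# without the self-loop it sends its whole weight to node 2.
print(1 / out_degree(edges, 3), 1 / out_degree(no_loops, 3))
```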

In [12]:
# TODO: create pr_no_loops_sdf and feed it into pagerank.  
# The final result should be an ordered list of Rows (nodes and pageranks) called pageranks.
from pyspark.sql.functions import desc

# Drop self-edges. pagerank() recomputes the 1/N_j weights from the edges it is given,
# so there is no need to precompute them here.
pr_no_loops_sdf = pr_sdf.where(F.col('from_node') != F.col('to_node')) \
                        .select('from_node', 'to_node').cache()

# Sort by decreasing PageRank and collect the scores into a Python list.
ranked = pagerank(pr_no_loops_sdf, 5).orderBy(desc('pagerank')).select('pagerank').toPandas()
pageranks = ranked['pagerank'].tolist()

In [13]:
pageranks

[0.8792241643754263,
 0.7750296824993153,
 0.7306856585644919,
 0.5711990636189779,
 0.5176590027614371,
 0.4685776700442682,
 0.3953928831360832]

In [14]:
# [CIS 545 Test Cases] (2 pts)

if len(pageranks) != 7:
    raise ValueError('Should have 7 nodes!')
    

print('[CIS 545 Test Cases] (2 pts)')

[CIS 545 Test Cases] (2 pts)


# 6.1 Analysis of Graph Data

We will now use everything we have learned thus far to analyze a dataset. We will be using the `yelp_review2.csv` dataset we used for the basic part of this assignment. The goal of this section is to learn something about the distribution of reviewers. When given a large dataset such as this one, it is very useful to ask yourself questions like these to build effective priors to use when designing machine learning models.

### 6.1.1
First, load the `yelp_review2.csv` and `yelp_business.csv` files you downloaded in the basic part of this assignment. Make sure they are available as SQL tables as well. This should be the same code you wrote in the basic part.

In [15]:
# TODO: Load yelp_reviews_sdf and yelp_business_sdf and 
#       yelp_reviews, yelp_business Spark SQL Tables
# yelp_reviews_sdf = ...
# yelp_business_sdf = ...
# yelp_reviews_sdf.createOrReplaceTempView('yelp_reviews')
# yelp_business_sdf.createOrReplaceTempView('yelp_business')

yelp_reviews_sdf = spark.read.load('yelp_review2.csv', format='csv',header=True)
yelp_business_sdf = spark.read.load('yelp_business.csv', format='csv',header=True)

yelp_reviews_sdf.createOrReplaceTempView('yelp_reviews')
yelp_business_sdf.createOrReplaceTempView('yelp_business')

In [16]:
yelp_business_sdf.show(5)

+--------------------+--------------------+------------+--------------------+--------------+-----+-----------+----------+------------+-----+------------+-------+--------------------+
|         business_id|                name|neighborhood|             address|          city|state|postal_code|  latitude|   longitude|stars|review_count|is_open|          categories|
+--------------------+--------------------+------------+--------------------+--------------+-----+-----------+----------+------------+-----+------------+-------+--------------------+
|FYWN1wneV18bWNgQj...|    Dental by Design|        null|4855 E Warner Rd,...|     Ahwatukee|   AZ|      85044|33.3306902|-111.9785992|  4.0|          22|      1|Dentists;General ...|
|He-G7vWjzVUysIKrf...| Stephen Szabo Salon|        null|  3101 Washington Rd|      McMurray|   PA|      15317|40.2916853| -80.1048999|  3.0|          11|      1|Hair Stylists;Hai...|
|KQPW8lFf1y5BT2Mxi...|Western Motor Veh...|        null|6025 N 27th Ave, ...|       P

In [17]:
yelp_reviews_sdf.show(5)

+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+
|           review_id|             user_id|         business_id|stars|      date|                text|useful|funny|cool|
+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+
|vkVSCC7xljjrAI4UG...|bv2nCi5Qv5vroFiqK...|AEx2SYEUJmTxVVB18...|    5|2016-05-28|Super simple plac...|     0|    0|   0|
|n6QzIUObkYshz4dz2...|bv2nCi5Qv5vroFiqK...|VR6GpWIda3SfvPC-l...|    5|2016-05-28|Small unassuming ...|     0|    0|   0|
|MV3CcKScW05u5LVfF...|bv2nCi5Qv5vroFiqK...|CKC0-MOWMqoeWf6s-...|    5|2016-05-28|Lester's is locat...|     0|    0|   0|
|IXvOzsEMYtiJI0CAR...|bv2nCi5Qv5vroFiqK...|ACFtxLv8pGrrxMm6E...|    4|2016-05-28|Love coming here....|     0|    0|   0|
|L_9BTb55X0GDtThi6...|bv2nCi5Qv5vroFiqK...|s2I_Ni76bjJNK9yG6...|    4|2016-05-28|Had their chocola...|     0|    0|   0|
+--------------------+----------


Just like in the basic part, construct a *directed* graph with edges from users to businesses indicating reviews.

To do this, you should extract from `yelp_reviews_sdf` the `user_id` as the `from_node`, the `business_id` as the `to_node`, and the `stars` field as the `score`.  Put this into a dataframe called `review_graph_sdf`, and make it available as a table in SQL called `review_graph`.

Some of the values may be null; remove these for `user_id` (`from_node`) or `business_id` (`to_node`).

In [18]:
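# TODO: Create review_graph_sdf and review_graph SQL Table

# review_graph_sdf = ...
# ...

# YOUR CODE HERE
# Import only the names used below; a wildcard import would shadow builtins such as sum and max.
from pyspark.sql.functions import col, desc
review_graph_sdf = spark.sql('''
            SELECT user_id as from_node, business_id as to_node, stars as score
            FROM yelp_reviews
            WHERE user_id IS NOT NULL AND business_id IS NOT NULL
            ''').cache()
# Make the graph available to Spark SQL as well.
review_graph_sdf.createOrReplaceTempView('review_graph')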

In [19]:
review_graph_sdf.show(5)

+--------------------+--------------------+-----+
|           from_node|             to_node|score|
+--------------------+--------------------+-----+
|bv2nCi5Qv5vroFiqK...|AEx2SYEUJmTxVVB18...|    5|
|bv2nCi5Qv5vroFiqK...|VR6GpWIda3SfvPC-l...|    5|
|bv2nCi5Qv5vroFiqK...|CKC0-MOWMqoeWf6s-...|    5|
|bv2nCi5Qv5vroFiqK...|ACFtxLv8pGrrxMm6E...|    4|
|bv2nCi5Qv5vroFiqK...|s2I_Ni76bjJNK9yG6...|    4|
+--------------------+--------------------+-----+
only showing top 5 rows



Now, to make this process a bit faster, we will limit our search to a subset of `review_graph`.
Create a subset of `review_graph_sdf` called `subset_review_graph_sdf` containing only reviews of businesses (`to_node`) in the city of `Vaughan`.

_Hint_: `yelp_business_sdf` contains location information

In [20]:
# TODO: Create subset_review_graph_sdf and SQL table subset_review_graph

# subset_review_graph_sdf = ...
review_graph_sdf.createOrReplaceTempView('review_graph')
subset_review_graph_sdf = spark.sql('''
            SELECT *
            FROM review_graph
            WHERE to_node IN (SELECT business_id FROM yelp_business WHERE city == 'Vaughan')            
            ''').cache()

In [21]:
subset_review_graph_sdf.show(10)

+--------------------+--------------------+-----+
|           from_node|             to_node|score|
+--------------------+--------------------+-----+
|40OP-bcwLep3I1nHN...|WFB1fn8rWNukmmIfT...|    5|
|nOTl4aPC4tKHK35T3...|D-lzSVYyaobiguo7t...|    3|
|FEg8v92qx3kK4Hu4T...|AoW2A9bi4g_0AS2uK...|    5|
|FEg8v92qx3kK4Hu4T...|iGqGde420TlBrKcU4...|    3|
|JGZV0RT2Z7sBDv938...|NqKrfQmxethHjz59P...|    2|
|C1pox_TJtah6daXL-...|D0tYz9YSVTP5cZgxp...|    5|
|H6uC5xkIvsCUE4cZZ...|5GCaHoHo547U7wkco...|    1|
|G-LPOI3oW9T24kEy7...|R1wv7_R7i8aCD1Olu...|    1|
|ZESB_yTuNTchP-ReY...|S4QVuwfzn9T2-torV...|    1|
|ZESB_yTuNTchP-ReY...|WxfaCysgEvkBxUWo-...|    4|
+--------------------+--------------------+-----+
only showing top 10 rows



In [22]:
# [CIS 545 Test Cases] (3 pts)


print('[CIS 545 Test Cases] (3 pts)')

[CIS 545 Test Cases] (3 pts)


### 6.1.2 A greedy approach

The goal of this section is to find out the minimum number of unique reviewers needed to review half of the businesses in Vaughan.

First, check that you have the correct number of businesses below:

In [23]:
# TODO: Select all unique businesses from subset_review_graph and put them in
#       a new sdf called business_sdf, keeping the column name to_node

# business_sdf = ...
subset_review_graph_sdf.createOrReplaceTempView('subset_review_graph')
business_sdf= spark.sql('''
            SELECT DISTINCT to_node 
            FROM subset_review_graph
            ''').cache()

In [24]:
business_sdf.show(10)

+--------------------+
|             to_node|
+--------------------+
|Q7oKidQDV52LEPfiw...|
|gxA-C5tbo0I1xxVyT...|
|RMjCnixEY5i12Ciqn...|
|JUFIUjadF1rCMQJ01...|
|CqTPLUHBM9AM3TEqP...|
|Us67uenyjsmaqqgZ6...|
|5GAXZ7gJ81TSR0-Q6...|
|NalTCdZL6gFsXxaQT...|
|iuya9nmV6ievrTNSO...|
|HtNjfGlSm5JmmcqaV...|
+--------------------+
only showing top 10 rows



In [25]:
# [CIS 545 Test Cases] (2 pts)

if business_sdf.count() != 768:
    raise ValueError('You do not have the correct number of businesses')

print('[CIS 545 Test Cases] (2 pts)')

[CIS 545 Test Cases] (2 pts)


Now, find the smallest number of people needed to cover at least 50% (384) of the businesses. There are many ways to do this; the most straightforward is to take a greedy approach. First find the user with the most reviews in `subset_review_graph`, then remove all the businesses they reviewed and repeat the process.

_Note_: This, like most big data problems, might take a while to run.

_Hint_: If you pass the test case below your algorithm is optimal enough :)
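
The greedy loop is easy to prototype on a small in-memory example before running it in Spark. The sketch below is plain Python under stated assumptions: `greedy_cover` is a hypothetical helper, and the review pairs are toy data. Note that a greedy pick order gives a good cover but is not guaranteed to be the true minimum, which is why the test case allows some slack.

```python
def greedy_cover(reviews, target):
    """Pick users greedily until at least `target` distinct businesses are covered.

    reviews: iterable of (user_id, business_id) pairs.
    Hypothetical helper for illustration; returns the chosen users in pick order.
    """
    # Map each user to the set of businesses they reviewed (duplicates collapse).
    by_user = {}
    for user, business in reviews:
        by_user.setdefault(user, set()).add(business)

    covered, chosen = set(), []
    while len(covered) < target:
        # User who adds the most not-yet-covered businesses.
        best = max(by_user, key=lambda u: len(by_user[u] - covered), default=None)
        if best is None or not (by_user[best] - covered):
            break  # nobody adds anything new; the target is unreachable
        chosen.append(best)
        covered |= by_user.pop(best)
    return chosen

toy_reviews = [('u1', 'a'), ('u1', 'b'), ('u1', 'c'),
               ('u2', 'c'), ('u2', 'd'),
               ('u3', 'd'), ('u3', 'e'), ('u3', 'f')]
print(greedy_cover(toy_reviews, 4))
```

Tracking covered businesses as a set means a user who reviewed the same business twice is credited only once, which a plain row count would not guarantee.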

In [26]:
# TODO: Find the minimum number of users (from_node) needed to cover at least 384
#       businesses. Store all those users' IDs in a new SDF called top_users_sdf.
#       top_users_sdf should have 1 column called user_id

# ... 
# ...
# top_users_sdf = ...
schema = StructType([
            StructField("user_id", StringType(), True)
        ])

busi_num = 0
# Seed with a placeholder row so union() has a DataFrame to extend; it is filtered out at the end.
top_users_sdf = spark.createDataFrame([{'user_id':''}],schema).cache()
subset_review_graph_sdf = subset_review_graph_sdf.select('from_node','to_node').cache()
subset_review_graph_sdf.groupBy('from_node').count().orderBy(desc('count')).show()
while True:
    # User with the most remaining reviews. count() counts review rows, so it equals the
    # number of newly covered businesses only if no user reviewed the same business twice.
    find_top = subset_review_graph_sdf.groupBy('from_node').count().orderBy(desc('count')).take(1)
    if not find_top:  # take() returns an empty list once no reviews remain
        break
    most_review_user = find_top[0][0]
    sdf = spark.createDataFrame([{'user_id':most_review_user}],schema).cache()
    top_users_sdf = top_users_sdf.union(sdf).cache()
    busi_num = busi_num + find_top[0][1]
    print(busi_num)
    if busi_num >= 384:
        break
    # Drop every review of a business that this user already covers.
    list_sdf = subset_review_graph_sdf.where(subset_review_graph_sdf['from_node']==most_review_user).cache()
    subset_review_graph_sdf = subset_review_graph_sdf.join(list_sdf, subset_review_graph_sdf.to_node==list_sdf.to_node, 'leftanti').cache()
# Remove the placeholder seed row.
top_users_sdf = top_users_sdf.where(col('user_id')!='').cache()
# top_users_sdf.show()



+--------------------+-----+
|           from_node|count|
+--------------------+-----+
|CxDOIDnH8gp9KXzpB...|  160|
|A-IkCqnYosZa49XD9...|  132|
|Wu0yySWcHQ5tZ_59H...|   79|
|6-g1Aw92UoDijvc4k...|   77|
|WeVkkF5L39888IPPl...|   69|
|2CALR5iCk-ZkyFcKJ...|   59|
|qCYSdhsOzHBKT-V72...|   49|
|5snWEoA7Qsu-H7nY4...|   40|
|F3dKpfp0EpxkL-rDZ...|   38|
|l7sZTLRUBK0k3xjRT...|   36|
|rrVtOCkC50Bv_hA7j...|   34|
|Em4XpeXTKXVy4GjBL...|   31|
|0J9mtVJ4_QGsXYVK8...|   30|
|ludl6VlDreQcK4ZgO...|   30|
|EQecQy0e8i8caL5Zh...|   29|
|aR6vaoYj_SuTmz7EZ...|   28|
|MZSXGjRozn8mNMUtP...|   27|
|SJJWIZLPGXtGTy9UE...|   26|
|XChCfeJ6Yx2NDJIpI...|   25|
|65yB0ydGXOZ_-T6J_...|   24|
+--------------------+-----+
only showing top 20 rows

160
256
289
317
341
359
374
387


In [27]:
top_users_sdf.show(10)

+--------------------+
|             user_id|
+--------------------+
|CxDOIDnH8gp9KXzpB...|
|A-IkCqnYosZa49XD9...|
|6-g1Aw92UoDijvc4k...|
|WeVkkF5L39888IPPl...|
|2CALR5iCk-ZkyFcKJ...|
|Wu0yySWcHQ5tZ_59H...|
|F3dKpfp0EpxkL-rDZ...|
|ludl6VlDreQcK4ZgO...|
+--------------------+



In [28]:
# [CIS 545 Test Cases] (5 pts)

if top_users_sdf.count() > 9:
    raise ValueError('You can cover 384 businesses with fewer users')
    
print('[CIS 545 Test Cases] (5 pts)')

[CIS 545 Test Cases] (5 pts)
