In [1]:
# CIS 545

# Homework 3, Part I: Spark

## Spark Setup

Please make sure the Jupyter menu above shows "PySpark" at the upper right, and not "Python 3".  If it says "Python 3" then go to the *Kernel* menu and choose *PySpark*.


## Step 1

### Breadth-first Search over the NotreDame Graph

In the previous assignment, you built several primitives, including breadth-first search, to compute over the Yelp social graph.  Some of your computations were limited by the computational resources available.  We will now explore the broader graph.

### Read **prior** to starting assignment

You will see some of the following in the cells below, and you should treat them as follows:
- `[hw3-cellname]` This is a cell label. You should not delete these.
- For the CIS 545 test case cells that add to `score`, the additions are just padding; they have no purpose other than letting the autograder run smoothly. The `score` variable **is not** the score you will receive from the autograder.
- `try`/`except`: This is used to set the name of your dataset. Please read the instructions carefully.

Hints: For early testing of your solutions to Step 2, you should probably just load a subset of this file into `graph_sdf` to validate your solutions.  Later go back, add the contents of the other files to `graph_sdf`, and re-run your solutions.  Make sure you rerun your code with the full `graph_sdf` before submitting. 

### Google Code Spark Initialization

The cells below set up the environment, for your reference.

In [2]:
!pip install pandas


pyspark 2.3.2 requires py4j==0.10.7, which is not installed.
You are using pip version 10.0.1, however version 19.0.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.


In [3]:
# [hw3-import]
# Run this cell to import the various PySpark and OS operations

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import * 
import pyspark.sql.functions as F
import pyspark.sql.functions as f

import pandas as pd
import logging
import os

In [4]:
# [hw3-sparkcheck]
# Just a check to make sure you have the pyspark kernel.

try:
    if spark is None:
        raise ValueError("PySpark Kernel not ready! ")
        # The following sentence only run on local machine. 
#         spark = SparkSession.builder.appName('Graphs-HW3').getOrCreate()
except NameError:
    raise ValueError("PySpark Kernel not ready! ")
    # The following sentence only run on local machine. 
#     spark = SparkSession.builder.appName('Graphs-HW3').getOrCreate()

In [5]:
# This is a padding cell for Spark operation

In [6]:
# [hw3-pad]
# This cell is for grading purposes only!

score = 0 # for manual grading

print("[CIS 545 Homework 3] Test Case Padding")

[CIS 545 Homework 3] Test Case Padding


### Step 1.1. Loading the Data

Let’s read our graph data from NotreDame, which forms a directed graph.  Here, the set of nodes is not specified separately; the assumption is that the only nodes that matter are linked to other nodes, and thus their IDs will appear in the set of edges.  For example, to load the file `input.txt` into a Spark DataFrame, you can use lines like the following. Note that the data in this assignment is not limited to `.txt` files!

```
# Read lines from the text file, from Google Storage bucket my-bucket
input_sdf = spark.read.format("text").load("gs://my-bucket/mypath/input.txt")
```

We’ll use the suffix `_sdf` to denote a Spark DataFrame, as we did in Homework 2. You will load an `ndgraph_sdf` table from the `web-NotreDame.csv` file. 

Downloading these files may take a while, so don’t worry if it is slow. See if you can also add a function call (e.g., to `selectExpr()`) that renames the edge columns to `from_node` and `to_node` and converts them to integers. Remove duplicates to create `graph_sdf` with all of the data.

Please name the tables as we have described in this assignment. This helps with our grading and test cases.

In [7]:
# [hw3-filepath]
# We'll use the variable dataset to store the filepath for your csv

try:
    # This is how you should load your data
    dataset = 'web-NotreDame.csv'
except:
    # Note that this is not going to be a valid link until we grade!
    dataset = 'https://s3.amazonaws.com/cis545-hw3data/web-NotreDame.csv'

Above, note that we stored the name of your CSV in `dataset`. Use the variable `dataset` when importing below, rather than writing a filepath directly. If you need to change the path, you may reassign the variable.

Note that you should not change the S3 link, as doing so (or not using the variable `dataset` below) will lead to issues autograding your homework.

In [8]:
# [hw3-importdata]
# Import your dataset

# YOUR CODE HERE
ndgraph_sdf = spark.read.format("csv").load("gs://bucket-name_rj16/notebooks/web-NotreDame.csv", header=True)
ndgraph_sdf.createOrReplaceTempView('ndgraph')

ndgraph_sdf = spark.sql('select `from` as from_node, `to` as to_node from ndgraph')
ndgraph_sdf = ndgraph_sdf.withColumn("from_node", ndgraph_sdf["from_node"].cast(IntegerType()))
ndgraph_sdf = ndgraph_sdf.withColumn("to_node", ndgraph_sdf["to_node"].cast(IntegerType()))
ndgraph_sdf.createOrReplaceTempView('ndgraph')

The following cells are used for grading and data entry purposes, if necessary. Please do not edit them, even though the notebook allows it. If you must add anything, add cells **below** them.

In [9]:
# [hw3-dataload]
# Do not edit!

You may edit cells below **here**.

In [10]:
# [hw3-printschema]
# View the schema presented 

ndgraph_sdf.printSchema()

root
 |-- from_node: integer (nullable = true)
 |-- to_node: integer (nullable = true)



In [11]:
# [hw3-showndgraph]
# View the output of ndgraph. You should see your correct columns

ndgraph_sdf.show()

+---------+-------+
|from_node|to_node|
+---------+-------+
|        0|      0|
|        0|      1|
|        0|      2|
|        0|      3|
|        0|      4|
|        0|      5|
|        0|      6|
|        0|      7|
|        0|      8|
|        0|      9|
|        0|     10|
|        0|     11|
|        0|     12|
|        0|     13|
|        0|     14|
|        0|     15|
|        0|     16|
|        1|      0|
|        1|      7|
|        1|     17|
+---------+-------+
only showing top 20 rows



In [12]:
# [test-colcount]
# [CIS 545 Homework 3] Test Case (test case pad below!)

assert (len(ndgraph_sdf.columns) == 2)


In [13]:
# [CIS 545 Test Cases] (2 pts)
print("[CIS 545 Homework 3] Test Case - 2 points")
score = score + 2

[CIS 545 Homework 3] Test Case - 2 points


In [14]:
# [test-colset]
# [CIS 545 Homework 3] Test Case


In [15]:
# [CIS 545 Test Cases] (5 pts)
print("[CIS 545 Homework 3] Test Case - 5 points")
score = score + 5

[CIS 545 Homework 3] Test Case - 5 points


You will want to create a `graph_sdf` and register it as a Spark temp view so that you can call your transitive closure function on it.

Hints: For early testing of your solutions to Step 2, you should probably just load a subset of this file into `graph_sdf` to validate your solutions.  Later go back, add the contents of the other files to graph_sdf, and re-run your solutions.  Make sure you rerun your code with the full `graph_sdf` before submitting. 

In [16]:
# [hw3-pysparkF]
# Import PySpark types and functions

from pyspark.sql.types import *
import pyspark.sql.functions as F

In [17]:
# [hw3-graph]
# Create graph_sdf from ndgraph_sdf and create a SQL table if you decide to use it later.

# YOUR CODE HERE
graph_sdf = ndgraph_sdf.distinct()
#graph_sdf = ndgraph_sdf[ndgraph_sdf['from_node']<150]
graph_sdf.createOrReplaceTempView('graph')


In [18]:
graph_sdf.count()

1497134

In [19]:
# [hw3-padding]
# Just a padding cell
print("[CIS 545 Homework 3] Padding")

[CIS 545 Homework 3] Padding


In [20]:
# [hw3-cache]
# You should now cache for efficiency
graph_sdf.cache()

DataFrame[from_node: int, to_node: int]

### Step 1.2. Transitive Closure of the Graph

Now we would like to do the following: given a set of nodes, compute the set of all nodes reachable from these nodes.  This can be obtained via a type of transitive closure computation. If the term is unfamiliar, look up "transitive closure" before continuing. 

Define a function `transitive_closure(graph_sdf, origins_sdf, depth)` that returns a Spark DataFrame.

The result should be the set of all nodes from the input `graph_sdf` that are reachable via graph edges from the set of `origins_sdf`, in at most `depth` iterations (hops).  Both `origins_sdf` and the returned result should be DataFrames with a single attribute called `node`.  

You should treat the edges in the `graph_sdf` as directed edges!  You should iterate until you have either hit the maximum depth or the set of newly discovered (frontier) nodes is empty.
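
The intended semantics can be checked on a small example before moving to Spark. The sketch below is plain Python rather than PySpark, and the names `reachable_within` and `edges` are hypothetical, not part of the assignment. It assumes that the origins themselves count as reachable (zero hops); adjust if your reading of the specification differs.

```python
from collections import defaultdict

def reachable_within(edges, origins, depth):
    """Return the set of nodes reachable from `origins` in at most `depth` hops.

    Assumption: the origins themselves count as reachable (zero hops).
    """
    # Adjacency sets for directed edges: from_node -> {to_node, ...}
    successors = defaultdict(set)
    for from_node, to_node in edges:
        successors[from_node].add(to_node)

    visited = set(origins)
    frontier = set(origins)
    for _ in range(depth):
        # Nodes one hop beyond the frontier that have not been seen yet
        next_frontier = set()
        for node in frontier:
            next_frontier |= successors.get(node, set()) - visited
        if not next_frontier:
            break  # nothing new was discovered, so stop early
        visited |= next_frontier
        frontier = next_frontier
    return visited

# Tiny directed graph: 1 -> 2 -> 3 -> 4, plus 5 -> 1 and a duplicate edge
edges = [(1, 2), (2, 3), (3, 4), (5, 1), (1, 2)]
print(sorted(reachable_within(edges, {1}, 2)))  # prints [1, 2, 3]
```

Node 5 never appears in the result because the edge `5 -> 1` is followed only in its stated direction. In the Spark version, the set difference against `visited` corresponds to an anti-join, and the set-valued variables correspond to de-duplicated DataFrames.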

Hints: this resembles your BFS algorithm from HW2, but you should take advantage of the opportunities to optimize.  Both the graph and the various node sets can easily have duplicates; you should make heavy use of duplicate removal (and repartition and cache) since you are only computing the set of reachable nodes. Also, to quickly check whether a DataFrame is empty, you can use something similar to the following. 

(You do not need to update the following code block. Please start your code in the block containing `transitive_closure`.)

In [21]:
# [hw3-sdfempty]
# This code block may be used in transitive_closure

def sdf_is_empty(sdf):
    # take(1) returns an empty list (it does not raise) when the DataFrame has no rows
    return len(sdf.take(1)) == 0

Please insert your code for `transitive_closure` below. You may find repartitioning or caching valuable in speeding up your queries.

In [22]:
# [hw3-transitive]
# Write your transitive closure function here. Take note of the types of the inputs!

def transitive_closure(graph, origin, depth):
    # Result schema: each reachable node with the hop count at which it was first found
    schema = StructType([
        StructField("node", IntegerType(), True),
        StructField("distance", IntegerType(), False)])
    # The origin nodes form the initial frontier at distance 0
    origin_nodes = [(row.node, 0) for row in origin.select('node').collect()]
    frontier_sdf = spark.createDataFrame(origin_nodes, schema)
    if depth == 0:
        return frontier_sdf
    visited_sdf = spark.createDataFrame([], schema)
    return_sdf = frontier_sdf
    for i in range(0, depth):
        visited_sdf = visited_sdf.union(frontier_sdf)
        # Follow directed edges out of the current frontier
        edge_sdf = frontier_sdf.join(graph, frontier_sdf.node == graph.from_node)
        next_sdf = edge_sdf.select(edge_sdf.to_node.alias('node')).withColumn("distance", F.lit(i + 1))
        # Keep only nodes that have not been visited yet
        next_sdf = next_sdf.join(visited_sdf, visited_sdf.node == next_sdf.node, "leftanti")
        return_sdf = return_sdf.union(next_sdf).distinct()
        frontier_sdf = next_sdf.distinct()
        # Stop early once no new nodes are discovered
        if len(frontier_sdf.take(1)) == 0:
            return return_sdf
    return return_sdf


### Step 1.3 Testing transitive closure

Compute a Spark DataFrame called `nodes_sdf` as follows:
* Select `from_node` id 113, 114, 115 and 116
* Rename the column to `node`


In [23]:
# [hw3-nodes]
# Create nodes_sdf as described above

nodes_sdf = spark.sql("SELECT from_node AS node FROM graph WHERE from_node>=113 AND from_node<=116")
nodes_sdf = nodes_sdf.distinct()

In [24]:
# [hw3-nodeshow]
# View nodes_sdf

nodes_sdf.show()

+----+
|node|
+----+
| 115|
| 114|
| 113|
| 116|
+----+



In [25]:
for i in nodes_sdf.select('node').collect():
    print(i.node)

115
114
113
116


In [26]:
# [test-nodes]
# Testing nodes setup


In [27]:
# [CIS 545 Test Cases] (5 pts)
print("[CIS 545 Homework 3] Test Case - 5 points")
score = score + 5

[CIS 545 Homework 3] Test Case - 5 points


Compute a Spark DataFrame called `reachable_sdf` with the results of `transitive_closure(graph_sdf, nodes_sdf, 3)` over the NotreDame dataset. 

In [28]:
# [hw3-reachable]
# Create reachable_sdf using transitive closure
reachable_sdf = transitive_closure(graph_sdf, nodes_sdf, 3)

In [29]:
# [test-getresult]
# This is a no-points test case, making sure we get a result
assert (reachable_sdf.take(1)[0].node != None)

In [30]:
# [hw3-padding]
# Just a padding cell

print("[CIS 545 Homework 3] Padding")

[CIS 545 Homework 3] Padding


In [31]:
reachable_sdf.count()

426

Run the code cells below to call `show()` and `count()` on `reachable_sdf`.

In [32]:
# [hw3-showreach]
# Show reachable_sdf

reachable_sdf.show(20)

+-----+--------+
| node|distance|
+-----+--------+
| 4383|       2|
| 3146|       2|
|11759|       3|
| 1124|       1|
| 3359|       2|
|   16|       3|
|11762|       3|
|14424|       3|
|12183|       3|
| 4366|       2|
| 3150|       2|
| 3297|       2|
|12311|       3|
| 1141|       1|
| 4363|       2|
| 3291|       2|
|12333|       3|
| 4386|       2|
| 4368|       2|
| 3244|       2|
+-----+--------+
only showing top 20 rows



In [33]:
# [hw3-count]
# Show the count of reachable_sdf

reachable_count = reachable_sdf.count()
reachable_count

426

In [34]:
# [test-reachable1]
# Tests on reachable_sdf


In [35]:
# [CIS 545 Test Cases] (5 pts)
print("[CIS 545 Homework 3] Test Case - 5 points")
score = score + 5

[CIS 545 Homework 3] Test Case - 5 points


In [36]:
# [test-reachable1]
# Tests on reachable_sdf


In [37]:
# [CIS 545 Test Cases] (5 pts)
print("[CIS 545 Homework 3] Test Case - 5 points")
score = score + 5

[CIS 545 Homework 3] Test Case - 5 points
