# Homework 3, Part I: Spark (25 points)


You are expected to run this notebook on Google Cloud.  Please make sure the Jupyter menu above shows "PySpark" at the upper right, and not "Python 3".  If it says "Python 3" then go to the *Kernel* menu and choose *PySpark*.

## Step 1: "Big Data": Transitive Closure over the Yelp Network

In the previous assignment, you built several primitives, including breadth-first search, to compute over the Yelp social graph.  Some of your computations were limited by the computational resources available.  We will now explore the broader graph.

Hints: For early testing of your solutions to Step 1, you should probably load just a subset of this file into `graph_sdf` to validate your solutions.  Later, go back, add the contents of the other files to `graph_sdf`, and re-run your solutions.  Make sure you re-run your code with the full `graph_sdf` before submitting.

### Step 1.1 Spark Initialization in Google DataProc

Spark is already pre-initialized in DataProc.  You'll want to import some of the libraries, though.

In [1]:
# YOUR CODE HERE
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

### Step 1.2 Loading the Data

Let’s read our social graph data from Yelp, which forms a directed graph.  The set of nodes is not specified separately; the assumption is that the only nodes that matter are linked to other nodes, and thus their IDs will appear in the set of edges.  For example, to load the file `input.txt` into a Spark DataFrame, you can use lines like the following. Note that we aren't loading just `.txt` files in this assignment!

```
# Read lines from the text file, from Google Storage bucket my-bucket
input_sdf = spark.read.format("text").load("gs://my-bucket/mypath/input.txt")
```

We’ll use the suffix `_sdf` to represent “Spark DataFrame,” as we did in Homework 2. You will load a `yelp_reviews_sdf` table from the `yelp_reviews2.csv` file from your HW2. You may find dataset descriptions [here](https://www.kaggle.com/yelp-dataset/yelp-dataset/version/6#yelp_review.csv).

You'll need to upload this to Google Storage.  Uploading, and subsequently reading, this file may take a while. 

Add a function call (e.g., to `selectExpr()`) to rename the user and business ID columns to `from_node` and `to_node`, respectively, and to convert both to strings.

Please name the tables as we have described in this assignment. This helps with our grading and test cases.

In [2]:
# TODO: load yelp_reviews_sdf
# Worth 3 points in total
    
# YOUR CODE HERE
# yelp_reviews_sdf = spark.read.load("gs://bucket-name-af/notebooks/yelp_review2.csv",
yelp_reviews_sdf = spark.read.load("/home/jovyan/work/hw2/data/yelp_review2.csv",
                     format="csv", inferSchema="true", header="true")
yelp_reviews_sdf = yelp_reviews_sdf.selectExpr("user_id as from_node", "business_id as to_node")

In [3]:
# Just to show what it looks like
yelp_reviews_sdf.printSchema()


root
 |-- from_node: string (nullable = true)
 |-- to_node: string (nullable = true)



In [4]:
# This cell intentionally left blank ;-)


Now create a `graph_sdf` and register it as a Spark temp view so that you can call your transitive closure function on it here, regardless of whether that function is written in SparkSQL or Spark DataFrame function calls.

In [5]:
# TODO: Convert from yelp_reviews_sdf to a graph
# Worth 2 points

from pyspark.sql.types import *
import pyspark.sql.functions as F

# YOUR CODE HERE
graph_sdf = yelp_reviews_sdf

# YOUR CODE HERE
graph_sdf.createOrReplaceTempView('graph')

graph_sdf.cache()

DataFrame[from_node: string, to_node: string]

In [6]:
assert graph_sdf.count() > 0

### Step 1.3. Transitive Closure of the Graph

Now we would like to do the following: given a set of nodes, compute the set of all nodes reachable from these nodes.  This can be obtained via a type of transitive closure computation.

Define a function `transitive_closure(graph_sdf, origins_sdf, depth)` that returns a Spark DataFrame.

The result should be the set of all nodes from the input `graph_sdf` that are reachable via graph edges from the nodes in `origins_sdf`, in at most `depth` iterations (hops).  Both `origins_sdf` and the returned result should be DataFrames with a single attribute called `node`.

You should treat the edges in the `graph_sdf` as undirected edges!  You should iterate until you have either hit the maximum depth or the set of newly discovered (frontier) nodes is empty.
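To make the expected semantics concrete, the following is a plain-Python sketch (no Spark) that can serve as a reference when checking results on a very small graph. The function name `closure_reference` and the toy edge list are hypothetical, and the sketch assumes the origin nodes themselves count as reachable.

```python
def closure_reference(edges, origins, depth):
    """Nodes reachable from `origins` in at most `depth` hops (edges undirected)."""
    # Build an undirected adjacency map from (from_node, to_node) pairs
    neighbors = {}
    for a, b in edges:
        neighbors.setdefault(a, set()).add(b)
        neighbors.setdefault(b, set()).add(a)

    reached = set(origins)   # assumption: origins are part of the result
    frontier = set(origins)
    for _ in range(depth):
        # Nodes one hop away from the frontier that have not been seen yet
        new_nodes = set()
        for n in frontier:
            new_nodes |= neighbors.get(n, set())
        new_nodes -= reached
        if not new_nodes:    # empty frontier: nothing more to discover
            break
        reached |= new_nodes
        frontier = new_nodes
    return reached


# Hypothetical toy graph: 0 - a - b - c, 1 - a, and a separate component x - y
toy_edges = [("0", "a"), ("1", "a"), ("a", "b"), ("b", "c"), ("x", "y")]
print(sorted(closure_reference(toy_edges, ["0"], 2)))  # ['0', '1', 'a', 'b']
```

Keeping only the newly discovered nodes in the frontier is what lets the loop stop early; a Spark version can follow the same shape with set difference and duplicate removal on DataFrames.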

Hints: this resembles your BFS algorithm from HW2, but you should take advantage of the opportunities to optimize.  Both the graph and the various node sets can easily have duplicates; you should make heavy use of duplicate removal (and repartition and cache) since you are only computing the set of reachable nodes. Also, to quickly check whether a DataFrame is empty, you can use something similar to the following. 

(You do not need to modify the following code block. Please start your code in the block containing `transitive_closure`.)

In [7]:
def sdf_is_empty(sdf):
    # take(1) returns a list with at most one Row; on an empty DataFrame
    # the list is empty (no exception is raised)
    return len(sdf.take(1)) == 0

Please insert your code for `transitive_closure` below.

In [8]:
# TODO: write the transitive_closure function
# Will be worth 10 points in total

from pyspark.sql.types import *

def transitive_closure(graph, origin, depth):
    # YOUR CODE HERE
    G = graph.dropDuplicates()
    G.createOrReplaceTempView('G')
    G = G.cache()
    
    # initialize frontier & final DFs based on origin input
    origin.createOrReplaceTempView('origin')
    
    final = origin
    final.createOrReplaceTempView('final')
    final = final.cache()

    frontier = origin
    frontier.createOrReplaceTempView('frontier')
    frontier = frontier.cache()
    
    # iterate up to depth, or until frontier is empty
    for i in range(0,depth):
        # find neighbors
        neigh1 = spark.sql('SELECT to_node as node FROM G '\
                            'WHERE EXISTS (SELECT * FROM frontier WHERE G.from_node=frontier.node)')
        neigh2 = spark.sql('SELECT from_node as node FROM G '\
                            'WHERE EXISTS (SELECT * FROM frontier WHERE G.to_node=frontier.node)')
        # newly discovered nodes: neighbors that are not already in the result
        frontier = neigh1.union(neigh2).dropDuplicates().subtract(final)
        frontier = frontier.repartition(10).cache()
        # take(1) returns an empty list (it does not raise) on an empty DataFrame
        if len(frontier.take(1)) == 0:
            break
        frontier.createOrReplaceTempView('frontier')
        # add the new nodes to the final output
        final = final.union(frontier).dropDuplicates()
        final = final.repartition(10).cache()
        final.createOrReplaceTempView('final')
    
    return final

### Step 1.4 Testing transitive closure

Compute a Spark DataFrame called `nodes_sdf` as follows:
* Select from the graph those `from_node` values matching "0", "1", "2", "3", or "4".
* Rename the column to `node`


In [9]:
# TODO: create nodes_sdf
# Worth 5 points

# YOUR CODE HERE
nodes_sdf = spark.sql('SELECT DISTINCT from_node FROM graph '\
                      'WHERE from_node="0" OR from_node="1" OR from_node="2" OR from_node="3" OR from_node="4" ')

# YOUR CODE HERE
nodes_sdf = nodes_sdf.selectExpr("from_node as node")

In [10]:
# You should only have one column
assert len(nodes_sdf.columns) == 1

print (nodes_sdf.count())

5


Compute a Spark DataFrame called `reachable_sdf` with the results of `transitive_closure(graph_sdf, nodes_sdf, 3)`

In [11]:
# YOUR CODE HERE
reachable_sdf = transitive_closure(graph_sdf, nodes_sdf, 3)

In [12]:
## Test case that we get a result
assert reachable_sdf.take(1)[0].node is not None

Add and run two code Cells that call `count()` and then `show()`, respectively, on `reachable_sdf`

In [13]:
reachable_sdf.count()

1236

In [14]:
reachable_sdf.show()

+--------------------+
|                node|
+--------------------+
| When I need it c...|
| Bottle and a Bow...|
| more kawfee plea...|
| we don't do that...|
| Sauerkraut-Marti...|
|     good riddance!"|
| looking very org...|
| if different.  T...|
| but don't go out...|
|          I would.."|
| and now after th...|
|                no."|
| boring right? I ...|
| and very persona...|
|                  28|
| but it's hard to...|
| but with the has...|
| I just took a ri...|
|                  35|
| you got to be at...|
+--------------------+
only showing top 20 rows

