# Homework 3, Part I: Spark (25 points)


You are expected to run this notebook on Google Cloud.  Please make sure the Jupyter menu above shows "PySpark" at the upper right, and not "Python 3".  If it says "Python 3" then go to the *Kernel* menu and choose *PySpark*.

## Step 1: "Big Data": Transitive Closure over the Yelp Network

In the previous assignment, you built several primitives, including breadth-first search, to compute over the Yelp social graph. Some of your computations were limited by the available computational resources. We will now explore the broader graph.

Hints: For early testing of your Step 1 solutions, you should probably load just a subset of the data into `graph_sdf` to validate your solutions. Later, go back, add the contents of the other files to `graph_sdf`, and re-run your solutions. Make sure you rerun your code with the full `graph_sdf` before submitting.

### Step 1.1 Spark Initialization in Google Cloud Dataproc

Spark is already pre-initialized in Dataproc. You'll want to import some of the libraries, though.

In [1]:
```
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import mean, count, col

import os
# make sure pyspark tells workers to use python3 not 2 if both are installed
# os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
# os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/ipython3'

spark = SparkSession.builder.appName('Graphs-HW2').getOrCreate()
```

### Step 1.2 Loading the Data

Let’s read our social graph data from Yelp, which forms a directed graph. Here, the set of nodes is not specified explicitly; the assumption is that the only nodes that matter are linked to other nodes, and thus their IDs will appear in the set of edges. For example, to load the file `input.txt` into a Spark DataFrame, you can use lines like the following. Note that not every file in this assignment is a `.txt` file!

```
# Read lines from the text file, from Google Storage bucket my-bucket
input_sdf = spark.read.format("text").load("gs://my-bucket/mypath/input.txt")
```

We’ll use the suffix `_sdf` to represent “Spark DataFrame,” as in Homework 2. You will load a `yelp_reviews_sdf` table from the `yelp_review2.csv` file from your HW2. A description of the dataset is available [here](https://www.kaggle.com/yelp-dataset/yelp-dataset/version/6#yelp_review.csv).

You'll need to upload this file to Google Cloud Storage. Uploading, and subsequently reading, this file may take a while.

Add a function call (e.g., to `selectExpr()`) that renames the `user_id` and `business_id` columns to `from_node` and `to_node`, respectively, and converts both to strings.

Please name the tables as we have described in this assignment. This helps with our grading and test cases.

In [2]:
```
# TODO: load yelp_reviews_sdf
# Worth 3 points in total

yelp_reviews_sdf = spark.read.option("header", "true").load('./data/yelp_review2.csv', format="csv")
```



In [3]:
```
# Just to show what it looks like
yelp_reviews_sdf.printSchema()
```

```
root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: string (nullable = true)
 |-- funny: string (nullable = true)
 |-- cool: string (nullable = true)
```



In [4]:
```
# This cell intentionally left blank ;-)
```


Now create a `graph_sdf` and register it as a Spark temporary view so that you can call your transitive closure function on it here, regardless of whether that function is written in Spark SQL or with Spark DataFrame function calls.

In [5]:
```
# TODO: Convert from yelp_reviews_sdf to a graph
# Worth 2 points

# YOUR CODE HERE
# Rename user_id/business_id to from_node/to_node and make sure both are strings
graph_sdf = yelp_reviews_sdf.selectExpr(
    "CAST(user_id AS STRING) AS from_node",
    "CAST(business_id AS STRING) AS to_node",
)
graph_sdf.createOrReplaceTempView('graph')
graph_sdf.cache()
```

```
DataFrame[from_node: string, to_node: string]
```

In [6]:
```
assert graph_sdf.count() > 0
```

### Step 1.3. Transitive Closure of the Graph

Now we would like to do the following: given a set of nodes, compute the set of all nodes reachable from these nodes.  This can be obtained via a type of transitive closure computation.

Define a function `transitive_closure(graph_sdf, origins_sdf, depth)` that returns a Spark DataFrame.

The result should be the set of all nodes in the input `graph_sdf` that are reachable via graph edges from the set `origins_sdf` in at most `depth` iterations (hops). Both `origins_sdf` and the returned result should be DataFrames with a single attribute called `node`.

You should treat the edges in the `graph_sdf` as undirected edges!  You should iterate until you have either hit the maximum depth or the set of newly discovered (frontier) nodes is empty.
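A small in-memory reference can help check a Spark implementation on toy inputs. The sketch below is plain Python (no Spark), and `reference_closure` is a hypothetical helper name. It follows the frontier loop just described: expand the frontier by one hop over undirected edges, drop nodes already reached, and stop after `depth` hops or when the frontier is empty.

```python
from collections import defaultdict


def reference_closure(edges, origins, depth):
    """Nodes reachable from `origins` within `depth` hops, with every edge treated as undirected."""
    neighbors = defaultdict(set)
    for u, v in edges:
        neighbors[u].add(v)
        neighbors[v].add(u)  # undirected: also record the reverse direction

    reachable = set(origins)
    frontier = set(origins)
    hops = 0
    # Stop at the depth limit or as soon as a hop discovers nothing new
    while frontier and hops < depth:
        next_frontier = set()
        for node in frontier:
            next_frontier |= neighbors[node]
        frontier = next_frontier - reachable  # keep only newly discovered nodes
        reachable |= frontier
        hops += 1
    return reachable
```

For instance, on the `test_g1.csv` edges shown below, starting from node `"6"` with `depth=1` gives `{"5", "6", "7"}`; the IDs are strings because the CSV reader loads them as strings.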

Hints: this resembles your BFS algorithm from HW2, but you should take advantage of the opportunities to optimize.  Both the graph and the various node sets can easily have duplicates; you should make heavy use of duplicate removal (and repartition and cache) since you are only computing the set of reachable nodes. Also, to quickly check whether a DataFrame is empty, you can use something similar to the following. 

(You do not need to modify the following code block. Please put your code in the cell containing `transitive_closure`.)

In [7]:
```
def sdf_is_empty(sdf):
    # take(1) fetches at most one row; on an empty DataFrame it returns [] rather than raising
    return len(sdf.take(1)) == 0
```

In [8]:
```
graph_sdf.show()
```

```
+--------------------+--------------------+
|           from_node|             to_node|
+--------------------+--------------------+
|bv2nCi5Qv5vroFiqK...|AEx2SYEUJmTxVVB18...|
|bv2nCi5Qv5vroFiqK...|VR6GpWIda3SfvPC-l...|
|bv2nCi5Qv5vroFiqK...|CKC0-MOWMqoeWf6s-...|
|bv2nCi5Qv5vroFiqK...|ACFtxLv8pGrrxMm6E...|
|bv2nCi5Qv5vroFiqK...|s2I_Ni76bjJNK9yG6...|
|_4iMDXbXZ1p1ONG29...|8QWPlVQ6D-OExqXoa...|
|u0LXt3Uea_GidxRW1...|9_CGhHMz8698M9-Pk...|
|u0LXt3Uea_GidxRW1...|gkCorLgPyQLsptTHa...|
|u0LXt3Uea_GidxRW1...|5r6-G9C4YLbC7Ziz5...|
|u0LXt3Uea_GidxRW1...|fDF_o2JPU8BR1Gya-...|
|u0LXt3Uea_GidxRW1...|z8oIoCT1cXz7gZP5G...|
|u0LXt3Uea_GidxRW1...|XWTPNfskXoUL-Lf32...|
|u0LXt3Uea_GidxRW1...|13nKUHH-uEUXVZylg...|
|u0LXt3Uea_GidxRW1...|RtUvSWO_UZ8V3Wpj0...|
|u0LXt3Uea_GidxRW1...|Aov96CM4FZAXeZvKt...|
|u0LXt3Uea_GidxRW1...|0W4lkclzZThpx3V65...|
|u0LXt3Uea_GidxRW1...|fdnNZMk1NP7ZhL-YM...|
|u0LXt3Uea_GidxRW1...|PFPUMF38-lraKzLcT...|
|u0LXt3Uea_GidxRW1...|oWTn2IzrprsRkPfUL...|
|u0LXt3Uea_GidxRW1...|zgQHtqX0gq
```

Please insert your code for `transitive_closure` below.

In [9]:
```
test_g_sdf = spark.read.option("header", "true").load('./data/test_g1.csv', format="csv")
```

In [10]:
```
test_g_sdf.show()
```

```
+---------+-------+
|from_node|to_node|
+---------+-------+
|        1|      2|
|        1|      3|
|        1|      4|
|        1|      5|
|        2|      3|
|        2|      5|
|        3|      2|
|        4|      5|
|        5|      1|
|        5|      6|
|        5|      7|
|        6|      7|
|        7|      6|
|        7|      2|
|        7|      7|
|        5|      4|
+---------+-------+
```



In [11]:
```
# TODO: write the transitive_closure function
# Will be worth 10 points in total

import pyspark.sql.functions as F


def transitive_closure(graph_sdf, origins_sdf, depth):
    # Build an undirected edge list: keep each edge, add its reverse, drop duplicates
    edges_sdf = graph_sdf.select('from_node', 'to_node')
    reversed_sdf = edges_sdf.select(F.col('to_node').alias('from_node'),
                                    F.col('from_node').alias('to_node'))
    undirected_sdf = edges_sdf.union(reversed_sdf).distinct().cache()

    # Initialization: the deduplicated origins are the first frontier and the reachable set
    frontier_sdf = origins_sdf.select('node').distinct().cache()
    reachable_sdf = frontier_sdf
    print('origins_sdf = ')
    origins_sdf.show()

    i = 0
    while (not sdf_is_empty(frontier_sdf)) and i < depth:
        # Expand one hop: every neighbor of a frontier node
        new_frontier = (undirected_sdf
                        .join(frontier_sdf.withColumnRenamed('node', 'from_node'), on='from_node')
                        .select(F.col('to_node').alias('node'))
                        .distinct())

        # Keep only nodes that have not been reached before
        frontier_sdf = new_frontier.join(reachable_sdf, on='node', how='left_anti').cache()

        # Update the reachable set
        reachable_sdf = reachable_sdf.union(frontier_sdf).cache()
        i += 1

    undirected_sdf.unpersist()
    reachable_sdf.show()
    return reachable_sdf.distinct()
```


### Step 1.4 Testing transitive closure

Compute a Spark DataFrame called `nodes_sdf` as follows:
* Select from the graph those `from_node` values matching "0", "1", "2", "3", or "4".
* Rename the column to `node`


In [12]:
```
# TODO: create nodes_sdf
# Worth 5 points

# YOUR CODE HERE
# from_node holds strings, so compare against string literals for exact matches
nodes_sdf = spark.sql("SELECT DISTINCT from_node AS node FROM graph WHERE from_node IN ('0', '1', '2', '3', '4')")
nodes_sdf.show()
```

```
+----+
|node|
+----+
|   3|
|   0|
|   1|
|   4|
|   2|
+----+
```



In [13]:
```
# You should only have one column
assert len(nodes_sdf.columns) == 1

print(nodes_sdf.count())
```

```
5
```


Compute a Spark DataFrame called `reachable_sdf` that holds the result of `transitive_closure(graph_sdf, nodes_sdf, 3)`.

In [14]:
```
# YOUR CODE HERE
reachable_sdf = transitive_closure(graph_sdf, nodes_sdf, 3)
```

```
origins_sdf = 
+----+
|node|
+----+
|   3|
|   0|
|   1|
|   4|
|   2|
+----+

+--------------------+
|                node|
+--------------------+
|                   3|
|                   0|
|                   1|
|                   4|
|                   2|
| but I've only be...|
| fired her becaus...|
| neck tattoo girl...|
| bizarre and beau...|
| and I finally ch...|
| I'm only covered...|
| fresh squeezed b...|
| but they'll stil...|
| When I need it c...|
| fine beer on tap...|
|                   7|
| right?  So we bo...|
| a reputation tha...|
| annoyed or what....|
| some 3.1 million...|
+--------------------+
only showing top 20 rows
```



In [15]:
```
## Test case that we get a result
assert reachable_sdf.take(1)[0].node is not None
```

Add and run two code cells that call `count()` and then `show()`, respectively, on `reachable_sdf`.

In [16]:
```
reachable_sdf.count()
```

```
1236
```

In [17]:
```
reachable_sdf.show(1000)
```

+--------------------+
|                node|
+--------------------+
| but I've only be...|
| fired her becaus...|
| neck tattoo girl...|
| I had no idea......|
| I think its quit...|
| bizarre and beau...|
| and I finally ch...|
| I'm only covered...|
| fresh squeezed b...|
| but they'll stil...|
| When I need it c...|
| fine beer on tap...|
|                   7|
| right?  So we bo...|
| but for God sake...|
| a reputation tha...|
| annoyed or what....|
| some 3.1 million...|
|until you walk a ...|
| even ones who we...|
| then how come th...|
| maybe you should...|
| Dutch Bros is go...|
| I want a glass o...|
| live music and a...|
|     sushi schemin'"|
| Bottle and a Bow...|
| my friends."" An...|
| the Foundation o...|
| they'll stay and...|
|              then."|
| shame on -- sham...|
| I think you'll l...|
| with locally sou...|
| great flavour an...|
| we found our par...|
|                  15|
|  # 13- the Origi...|
| I ate half of it...|
| invites them to ...|
| try their