Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE", as well as your name and collaborators below:

---

### Step 2.1 Initializing a Connection to Spark

We'll open a connection to Spark as follows. Note that Spark has multiple interfaces, as you will see if you look at sample code elsewhere. `SparkSession` is the “most modern” one and we’ll be using it for this course.  From `SparkSession`, you can load data into Spark DataFrames as well as `RDD`s.

In [1]:
# If you want to run from an environment outside of the Docker container you'll need to uncomment 
# and run this.  Otherwise you can skip through.
# ! pip install pyspark --user
# ! pip install seaborn --user
# ! pip install plotly --user
# ! pip install imageio --user
# ! pip install folium --user
# ! pip install heapq

import numpy as np
import pandas as pd

#misc
import gc
import time
import warnings


#viz
import matplotlib.pyplot as plt
import seaborn as sns 
import matplotlib.gridspec as gridspec 
import matplotlib.gridspec as gridspec 

# graph viz
import plotly.offline as pyo
from plotly.graph_objs import *
import plotly.graph_objs as go

#map section
import imageio
import folium
import folium.plugins as plugins
# from mpl_toolkits.basemap import Basemap


#graph section
import networkx as nx
import heapq  # for getting top n number of things from list,dict

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import pyspark.sql as sql

import os
# make sure pyspark tells workers to use python3 not 2 if both are installed
# os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
# os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/ipython3'

spark = SparkSession.builder.appName('Graphs-HW2').getOrCreate()
spark

### Step 2.2 Download data

The following code retrieves the Yelp dataset in a zipfile and decompresses it.  It will take quite a while - you may want to take a break while it runs.

In [5]:
# Based on https://stackoverflow.com/questions/9419162/download-returned-zip-file-from-url

link:https://drive.google.com/open?id=19C4_AQ8_hUOgxVcQznAYDoW3tiPZlE1f


### Step 2.3 Load Our Graph Datasets.

For this assignment, we’ll be looking at graph data (reviews, reviewers, businesses) downloaded from Yelp.

**A very brief review of graph theory**. Recall that a graph $G$ is composed of a set of vertices $V$ (also called nodes) and edges $E$ (sometimes called links).  Each vertex $v \in V$ has an identity (often represented in the real world as a string or numeric “node ID”).  Each edge $e \in E$ is a tuple $(v_i,...,v_j)$ where $v_i$ represents the source or origin of the edge, and $v_j$ represents the target or destination.  In the simplest case, the edge tuple above is simply the pair $(v_i,v_j)$ but in many cases we may have additional fields such as a label or a distance.  Recall also that graphs may be undirected or directed; in undirected graphs, all edges are symmetric whereas in directed graphs, they are not.  For instance, airline flights are directed, whereas Facebook friend relationships are undirected.

Let’s read our social graph data from Yelp, which forms a directed graph.  Here, the set of nodes is also not specified; the assumption is that the only nodes that matter are linked to other nodes, and thus their IDs will appear in the set of edges.  To load the file `input.txt` into a Spark DataFrame, you can use lines like the following.

```
# Read lines from the text file
input_sdf = spark.read.load('input.txt', format="text")
```

We’ll use the suffix `_sdf` to represent “Spark DataFrame,” much as we used `_df` to denote a Pandas DataFrame in Homework 1.  Load the various files from Yelp.

Your datasets should be named `yelp_business_sdf`, `yelp_business_attributes_sdf`, `yelp_business_horus_sdf`, `yelp_check_in_sdf`, `yelp_reviews_sdf`, and `yelp_users_sdf`.

In [3]:
# TODO: load Yelp datasets

# Worth 1 point per successful load, 5 additional points if valid schema

# YOUR CODE HERE
# raise NotImplementedError()
# Read yelp Business file
yelp_business_sdf=spark.read.format("csv").option("header","true").option("inferSchema","true")\
    .load("file:///home/ashrumochan/yelp_data/yelp_business.csv")
#Read yelp Business hours data from csv file
yelp_business_hours_sdf=spark.read.format("csv").option("header","true").option("inferSchema","true")\
                        .load("file:///home/ashrumochan/yelp_data/yelp_business_hours.csv")
    
yelp_reviews_sdf=spark.read.format("csv").option("header","true").option("inferSchema","true")\
 .load("file:///home/ashrumochan/yelp_data/yelp_review2.csv")

yelp_business_attributes_sdf=spark.read.format("csv").option("header","true").option("inferSchema","true")\
.load("file:///home/ashrumochan/yelp_data/yelp_business_attributes.csv")

yelp_check_in_sdf=spark.read.format("csv").option("header","true").option("inferSchema","true")\
.load("file:///home/ashrumochan/yelp_data/yelp_checkin.csv")

yelp_users_sdf=spark.read.format("csv").option("header","true").option("inferSchema","true")\
    .load("file:///home/ashrumochan/yelp_data/yelp_user.csv")
    
yelp_tip_sdf=spark.read.format("csv").option("header","true").option("inferSchema","true")\
    .load("file:///home/ashrumochan/yelp_data/yelp_tip.csv")

In [4]:
yelp_business_sdf.show(10)

+--------------------+--------------------+------------+--------------------+--------------+-----+-----------+-------------+--------------+-----+------------+-------+--------------------+
|         business_id|                name|neighborhood|             address|          city|state|postal_code|     latitude|     longitude|stars|review_count|is_open|          categories|
+--------------------+--------------------+------------+--------------------+--------------+-----+-----------+-------------+--------------+-----+------------+-------+--------------------+
|FYWN1wneV18bWNgQj...|    Dental by Design|        null|4855 E Warner Rd,...|     Ahwatukee|   AZ|      85044|   33.3306902|  -111.9785992|  4.0|          22|      1|Dentists;General ...|
|He-G7vWjzVUysIKrf...| Stephen Szabo Salon|        null|  3101 Washington Rd|      McMurray|   PA|      15317|   40.2916853|   -80.1048999|  3.0|          11|      1|Hair Stylists;Hai...|
|KQPW8lFf1y5BT2Mxi...|Western Motor Veh...|        null|6025

In [None]:
yelp_reviews_sdf.show(10)

In [None]:
yelp_business_hours_sdf.show(10)

In [None]:
yelp_business_attributes_sdf.show(10)

In [None]:
yelp_check_in_sdf.show(10)

In [None]:
yelp_users_sdf.show(10)

In [6]:
yelp_reviews_sdf.dtypes

[('review_id', 'string'),
 ('user_id', 'string'),
 ('business_id', 'string'),
 ('stars', 'string'),
 ('date', 'string'),
 ('text', 'string'),
 ('useful', 'string'),
 ('funny', 'string'),
 ('cool', 'string')]

In [5]:
if yelp_reviews_sdf.dtypes[0][1] != 'string':
    raise ValueError('Unexpected datatype on ' + yelp_reviews.dtypes[0][0])
    


### Step 2.4 Simple Wrangling in Spark DataFrames

Currently, some of the data from the Yelp dataset is a bit ugly.

You should:

* Clean `yelp_business_hours` so `"None"` is replaced by a Spark `null`.
* Clean `yelp_users` so `"None"` is replaced by a Spark `null`.



First, create SQL tables for each of your Spark DataFrames.  Take the same names as you've used previously, except remove the `_sdf` suffix.

In [7]:
# TODO: save tables with names such as yelp_business, yelp_users

# Worth 5 points if done successfully

# YOUR CODE HERE
#raise NotImplementedError()
yelp_business_hours_sdf.registerTempTable("yelp_business_hours")
yelp_business_attributes_sdf.registerTempTable("yelp_business_attributes")
yelp_business_sdf.registerTempTable("yelp_business")
yelp_check_in_sdf.registerTempTable("yelp_check_in")
yelp_reviews_sdf.registerTempTable("yelp_reviews")
yelp_tip_sdf.registerTempTable("yelp_tip")
yelp_users_sdf.registerTempTable("yelp_users_sdf")

In [8]:
if spark.sql('SELECT COUNT(*) AS count FROM yelp_business').take(1)[0]['count'] != 174567:
    raise ValueError("Unexpected count or table not found")
    

Now you'll need to define a **user-defined function** `replace_none_with_null` to apply to your string fields.  It should take a string parameter and compare it with `None` and `Na`.  If there is a match to either, it should return the Python None value (which will become a Spark null), otherwise it shoudl return the value.

In [9]:
# Define your function here.
# Worth 7 points if done successfully

def replace_none_with_null(x):
    if x=="None" or x=="Na":
        return ''
    else:
        return x
def replace_none_with_nulls(x):
    return F.when(F.col(x) != "None", F.col(x)).otherwise("Null")

In [10]:
if replace_none_with_null('None'):
    raise ValueError('Your function does not work')
    
if not replace_none_with_null('x'):
    raise ValueError('Your function does not work')     



# Wrap the Python code in a Spark UDF

# We're providing this since it's basically a template

from pyspark.sql.functions import udf
import pyspark.sql.types

spark.udf.register("replace_none_with_null", replace_none_with_null)

spark_replace_none_with_null = udf(replace_none_with_null, pyspark.sql.types.StringType())

Now use `replace_with_none` in SQL, or `spark_replace_none_with_null` if you prefer Pandas-style Spark statements, to clean the data as described above.

In [11]:
# TODO: Clean yelp_business_hours_sdf

# Worth 5 points for each of the two tables specified in 2.4 instructions, if done successfully

# YOUR CODE HERE
spark_replace_none_with_null = F.udf(replace_none_with_null, sql.types.StringType())
yelp_business_hours_sdf1 = yelp_business_hours_sdf.rdd.map(lambda x: replace_none_with_null(x))
yelp_business_hours_sdf=yelp_business_hours_sdf1.toDF()
columns = yelp_business_hours_sdf.columns
for col1 in columns:
    yelp_business_hours_sdf = yelp_business_hours_sdf.withColumn(col1, replace_none_with_nulls(col1))
    
yelp_business_hours_sdf.registerTempTable("yelp_business_hours")

In [12]:
yelp_business_hours_sdf.show()
    
    

+--------------------+----------+----------+----------+----------+----------+----------+---------+
|         business_id|    monday|   tuesday| wednesday|  thursday|    friday|  saturday|   sunday|
+--------------------+----------+----------+----------+----------+----------+----------+---------+
|FYWN1wneV18bWNgQj...| 7:30-17:0| 7:30-17:0| 7:30-17:0| 7:30-17:0| 7:30-17:0|      Null|     Null|
|He-G7vWjzVUysIKrf...|  9:0-20:0|  9:0-20:0|  9:0-20:0|  9:0-20:0|  9:0-16:0|  8:0-16:0|     Null|
|KQPW8lFf1y5BT2Mxi...|      Null|      Null|      Null|      Null|      Null|      Null|     Null|
|8DShNS-LuFqpEWIp0...| 10:0-21:0| 10:0-21:0| 10:0-21:0| 10:0-21:0| 10:0-21:0| 10:0-21:0|11:0-19:0|
|PfOCPjBrlQAnz__NX...|  11:0-1:0|  11:0-1:0|  11:0-1:0|  11:0-1:0|  11:0-1:0|  11:0-2:0| 11:0-0:0|
|o9eMRCWt5PkpLDE0g...|  18:0-0:0|  18:0-0:0|  18:0-0:0|  18:0-0:0|  18:0-0:0|  18:0-0:0|     Null|
|kCoE3jvEtg6UVz5SO...|  8:0-17:0|  8:0-17:0|  8:0-17:0|  8:0-17:0|  8:0-17:0|      Null|     Null|
|OD2hnuuTJ

In [20]:
if spark.sql('select count(*) as count from yelp_business_hours where wednesday=\'None\'').take(1)[0]['count'] > 0:
    raise ValueError('Did not successfully clean business hours')
    

In [22]:
# TODO: Clean yelp_users, which has schema
# |user_id|   name|review_count|yelping_since|             friends|useful|funny|cool|fans|elite|average_stars|compliment_hot
# |compliment_more|compliment_profile|compliment_cute|compliment_list|compliment_note|compliment_plain|compliment_cool
# |compliment_funny|compliment_writer|compliment_photos|
# YOUR CODE HERE
#raise NotImplementedError()
columns2=yelp_users_sdf.columns
columns2
for col2 in columns2:
    yelp_users_sdf = yelp_users_sdf.withColumn(col2, replace_none_with_nulls(col2))
    
yelp_users_sdf.registerTempTable("yelp_users")

In [23]:
if spark.sql('select count(*) as count from yelp_users where elite=\'None\'').take(1)[0]['count'] > 0:
    raise ValueError('Did not successfully clean users')
    

### Step 2.5 Simple Analytics on the Data

In this section, we shall be executing Spark operations on the data given. Beyond simply executing the queries, you may try using `.explain()` method to see more about the query execution. Also, please read the data description prior to attempting the following questions to understand the data.

#### 2.5.1 Businesses with the best average review

Compute, stored in `best_average_sdf`, the list of names of businesses based on their average review score (review stars), in decreasing order, sorted lexicographically (in increasing order) by name if they have the same score.  Output the number of reviews also.  Call the columns `name`, `avg_rating`, and `count`.

In [24]:
yelp_reviews_sdf.columns, yelp_business_sdf

(['review_id',
  'user_id',
  'business_id',
  'stars',
  'date',
  'text',
  'useful',
  'funny',
  'cool'],
 DataFrame[business_id: string, name: string, neighborhood: string, address: string, city: string, state: string, postal_code: string, latitude: double, longitude: double, stars: double, review_count: int, is_open: int, categories: string])

In [25]:
# TODO: Businesses with best average review

# Worth 5 points if done successfully

# YOUR CODE HERE

best_average_sdf=spark.sql("select u.name, round(avg(r.stars)) as avg_Rating ,count(review_id) as count \
                                from yelp_reviews r join yelp_business u\
                           on r.business_id=u.business_id\
                           group by u.name order by avg_Rating desc")
# sqlContext.sql("""SELECT DISTINCT customer.name AS name FROM purchase JOIN book ON purchase.isbn = book.isbn \
# JOIN customer ON customer.cid = purchase.cid WHERE customer.name != \
# 'Harry Smith' AND purchase.isbn IN (SELECT purchase.\
# isbn FROM customer JOIN purchase ON customer.cid = purchase.cid WHERE customer.name = 'Harry Smith')""")

In [26]:
row = best_average_sdf.take(10)

In [27]:
avg_review=spark.sql("select round(avg(stars))  avg_review from yelp_reviews")


avg_list=[(i.avg_review) for i in avg_review.select('avg_review').collect()]

#### 2.5.2 Users who are more negative than average

Find the users whose average review is below the *average of the per-user* average reviews.  Think about how to factor that into steps!

* Compute the (floating-point) variable `overall_avg` as the average of the users' average reviews. (You might compute this in a Spark DataFrame first).
* Then output `negative_users_sdf` as the users whose average rating is below that.  This Spark dataframe should have `name` and `avg_rating` and should be sorted first (from lowest to highest) by average rating, then lexicographically (in ascending order) by name.  You should drop cases where the name is null.


In [28]:
# TODO: compute overall_avg as a VALUE, and negative_users_sdf as a Spark dataframe
# Worth 5 points each

# YOUR CODE HERE
#raise NotImplementedError(),
avg_review=spark.sql("select round(avg(stars))  avg_review from yelp_reviews")
avg_list=[(i.avg_review) for i in avg_review.select('avg_review').collect()]
overall_avg=" ".join(str(x) for x in avg_list)
a=44
negative_users_sdf=spark.sql("select u.name, r.stars as below_avg_rating  from yelp_users u join yelp_reviews r \
                              on u.user_id=r.user_id where r.stars < {}  \
                              ORDER BY r.stars asc ".format(overall_avg) ).dropna()


In [None]:
if not overall_avg:
    raise ValueError('Forgot to compute the overall average')
    

In [29]:
negative_users_sdf.show(10)


+-----------+----------------+
|       name|below_avg_rating|
+-----------+----------------+
|      Jason|               1|
|      Luann|               1|
|Christopher|               1|
|       Russ|               1|
|   Brittany|               1|
|       Zach|               1|
|         ab|               1|
|      Scott|               1|
|         ab|               1|
|     Ashley|               1|
+-----------+----------------+
only showing top 10 rows



#### 2.5.4 Cities by number of businesses

Find the top 10 cities by number of (Yelp-listed) businesses.

This time, use the `take()` function to create a *list* of the top 10 cities (as Rows).  Call this list `top10_cities` and make sure it includes city `name` and `num_restaurants`.

In [30]:
# TODO: cities by number of businesses
# Worth 5 points

# YOUR CODE HERE
yelp_business_sdf.columns

#raise NotImplementedError()

top10_cities= spark.sql("select u.city as City, count(*) as num_restaurants from yelp_business u\
                            GROUP BY city").take(10)


In [31]:
if len(top10_cities) != 10:
    raise ValueError('Not top10!')


# Step 3. Computing Simple Graph Centrality

The study of networks has proposed a wide variety of measures for measuring importance of nodes.  A popular metric that is easy to compute is the degree centrality.  The degree centrality of a node is simply the number of connections to the node.  In a directed graph such as ours, you will want to compute both the indegree centrality (number of nodes with edges coming to this node) and outdegree centrality (number of nodes with edges coming from this node).

## 3.1 Generate user-business graph

For this step, we want to construct a *directed* graph with edges from users to business for every interaction (review, in our case). To do this, we will use the `yelp_reviews_sdf` dataframe and extract the `user_id` as `from_node` and `business_id` as `to_node` into a different dataframe called `review_graph_sdf`.  Finally, include the `stars` field but call it `score`.  Also make sure it is available as a table in SQL called `review_graph`.

Some of the values may be null; remove these for `user_id` or `business_id`.

In [32]:
# Create review graph SDF from yelp_reviews
# Worth 5 points

# YOUR CODE HERE
#raise NotImplementedError()
review_graph_sdf=spark.sql("select user_id as from_node,business_id as to_node,\
                            stars as score from yelp_reviews")
review_graph_sdf=review_graph_sdf.dropna(subset=['from_node','to_node'])
review_graph_sdf.registerTempTable("review_graph")


In [33]:
# Display the count of number of edges
review_graph_sdf.count()

### BEGIN HIDDEN TEST
if review_graph_sdf.count() != 5273700:
    raise ValueError('Unexpected graph size')
### END HIDDEN TEST

## 3.2 Businesses with highest indegree

Find, in decreasing order, the businesses with the most (highest number of) reviews, using either `review_graph_sdf` (or its SQL version) or `yelp_reviews` as well as `yelp_business`.  The dataframe should have the fields `name`,`count`, and `rating` (average `score`).  Assign these to a new dataframe `most_reviewed_sdf`.

In [34]:
# TODO: most_reviewed_sdf
# Worth 5 points

# YOUR CODE HERE
#raise NotImplementedError()

most_reviewed_sdf=spark.sql("select u.name as Name, count(*) as count,\
                                round(avg(r.stars)) as Rating from yelp_business u join yelp_reviews r \
                                on u.business_id=r.business_id GROUP by Name\
                                ORDER BY count desc")

In [35]:
most_reviewed_sdf.show(10)

+--------------------+-----+------+
|                Name|count|Rating|
+--------------------+-----+------+
|           Starbucks|19518|   3.0|
|  Hash House A Go Go| 9840|   4.0|
|          McDonald's| 9415|   2.0|
|Chipotle Mexican ...| 7980|   3.0|
|        Mon Ami Gabi| 7362|   4.0|
|    Bacchanal Buffet| 7006|   4.0|
|  Buffalo Wild Wings| 5998|   3.0|
|        Wicked Spoon| 5951|   4.0|
|     In-N-Out Burger| 5598|   4.0|
| Gordon Ramsay BurGR| 5448|   4.0|
+--------------------+-----+------+
only showing top 10 rows



## 3.2 Outdegree

Next get a list of users whose vertices have the highest outdegree, i.e., they write the most reviews.  Return this in a dataframe `prolific_reviewers_sdf` with fields `name` and `num_reviews`.

In [36]:
# TODO: reviews who created most reviews (beware duplicate names)
# Worth 5 points

# YOUR CODE HERE

#raise NotImplementedError()
prolific_reviewers_sdf=spark.sql("Select u.name AS name, count(r.stars) as num_reviews from yelp_users u \
                                join yelp_reviews r on u.user_id=r.user_id\
                                 GROUP BY name order by num_reviews desc ")

In [38]:
top10_reviewers = prolific_reviewers_sdf.take(10)

[Row(name='Michael', num_reviews=45190),
 Row(name='David', num_reviews=44056),
 Row(name='John', num_reviews=43566),
 Row(name='Jennifer', num_reviews=42405),
 Row(name='Chris', num_reviews=41125),
 Row(name='Mike', num_reviews=38512),
 Row(name='Jessica', num_reviews=31774),
 Row(name='Sarah', num_reviews=31733),
 Row(name='Michelle', num_reviews=29978),
 Row(name='Lisa', num_reviews=27100)]

Note that the indegree also gives you the most reviewed restaurants and the outdegree gives you the information about the users who write the most reviews.

For the advanced part of this Homework, we'll consider more complex measures than indegree / outdegree.  For now let's move on to the more general problem of graph traversal.

# Step 4. “Traversing” a Graph

For our next tasks, we will be “walking” the graph and making connections.


## 4.1 Distributed Breadth-First Search
A search algorithm typically starts at a node or set of nodes, and “explores” or “walks” for some number of steps to find a match or a set of matches.

Let’s implement a distributed version of a popular algorithm, breadth-first-search (BFS).  This algorithm is given a graph `G`, a set of origin nodes `N`, and a depth `d`.  In each iteration or round up to depth `d`, it explores the set of all new nodes directly connected to the nodes it already has seen, before going on to the nodes another “hop” away.  If we do this correctly, we will explore the graph in a way that (1) avoids getting caught in cycles or loops, and (2) visits each node in the fewest number of “hops” from the origin.  BFS is commonly used in tasks such as friend recommendation in social networks.

**How does distributed BFS in Spark work**?  Let’s start with a brief sketch of standard BFS.  During exploration “rounds”, we can divide the graph into three categories:

1. *unexplored nodes*.  These are nodes we have not yet visited.  You don’t necessarily need to track these separately from the graph.
2. *visited nodes*.  We have already reached these nodes in a previous “round”.
3. *frontier nodes*.  These are nodes we have visited in this round.  We have not yet checked whether they have out-edges connecting to unexplored nodes.

We can illustrate these with a figure and an example.

![Graph traversal](https://drive.google.com/uc?export=view&id=1I2Kc3uQcDlp7RsDqRQAfQAvS3F_VcJpA)

Let’s look at the figure, which shows a digraph.  The green node A represents the origin.

* In the first round, the origin A is the sole frontier node.  We find all nodes reachable directly from A, namely B-F; then we remove all nodes we have already visited (there are none) or that are in the frontier (the node A itself).  This leaves the blue nodes B-F, which are all reachable in (at most) 1 hop from A.
* In the second round, we move A to the visited set and B-F to the frontier.  Now we explore all nodes connected directly to frontier nodes, namely A (from B), F (from E), and the red nodes G-L.  We eliminate the nodes already contained in the frontier and visited sets from the next round’s frontier set, leaving the red nodes only.
* In the third round, we will move B-F to the visited set, G-L to the frontier set, and explore the next round of neighbors N-V.  This process continues up to some maximum depth (or until there are no more unexplored nodes).

Assume we create data structures (we can make them DataFrames) for the visited and frontier nodes.  Consider (1) how to initialize the different sets at the start of computation (note: unexplored nodes are already in the graph), and (2) how to use the graph edges and the existing data structures to update state for the next iteration “round”.

You might possibly have seen how to create a breadth-first-search algorithm in a single-CPU programming language, using a queue to capture the frontier nodes. With Spark we don’t need a queue -- we just need the three sets above.

### 4.1.1 Breadth-First Search Algorithm

Create a function `spark_bfs(G, origins, max_depth)` that takes a Spark DataFrame with a graph G (following the schema for `review_graph_sdf` described above, but to be treated as an **undirected graph**), a Python list-of-dictionaries `origins` of the form 

```
[{‘node’: nid1}, 
 {‘node’: nid2}, 
 …]
```

and a nonnegative integer “exploration depth” `max_depth` (to only run BFS on a tractable portion of the graph).  The `max_depth` will be the maximum number of edge traversals (e.g., the origin is at `max_depth=0`, one hop from the origin is `max_depth=1`, etc.  The function should return a DataFrame containing pairs of the form (node, distance), where the distance is depth at which $n$ was *first* encountered (i.e., the shortest-path distance from the origin nodes).  Note that the origin nodes should also be returned in this Spark DataFrame (with depth 0)!  

You can create a new Spark DataFrame with an integer `node` column from the above list of maps `origins`, as follows. This will give you a DataFrame of the nodes to start the BFS at

```
schema = StructType([
            StructField("node", StringType(), True)
        ])

    my_sdf = spark.createDataFrame(my_list_of_maps, schema)
```

In this algorithm, be careful in each iteration to keep only the nodes with their shortest distances (you may need to do aggregation or item removal).  You should accumulate all nodes at distances 0, 1, ..., `max_depth`.

In [39]:
# TODO: iterative search over undirected graph
# Worth 5 points directly, but will be needed later

def spark_bfs(G, origins, max_depth):
    schema = StructType([
                StructField("node", StringType(), True),
                StructField("distance", IntegerType(), False)
            ])
    
    return G

# YOUR CODE HERE
#raise NotImplementedError()


In [40]:
spark_bfs(review_graph_sdf,1,2)

In [None]:
raise NotImplementedError()

In [41]:
orig = [{'node': 'bv2nCi5Qv5vroFiqKGopiw'}]

count= spark_bfs(review_graph_sdf, orig, 3).count()


AttributeError: 'NoneType' object has no attribute 'count'

## Step 4.2. Restaurant Recommendation

Now create a function `friend_rec` that takes in two arguments: the graph_sdf and the ID of a user, `me`.  Using `spark_bfs` it should recommend restaurants that are highly popular among the reviewers who reviewed the same restaurants as `me`.

Then, take the review_graph, filter it to only consider 3-star reviews or above, and run `friend_rec` over the results and the user with ID `bv2nCi5Qv5vroFiqKGopiw`.  In the Spark dataframe `rec_rest_sdf`, give us the `name`s of the most-highly recommended restaurants, sorted primarily in descending order of count, and then secondarily by lexicographic order of name.

In [None]:
# TODO: restaurant recommendation using spark_bfs
# Worth 5 points

# YOUR CODE HERE
raise NotImplementedError()



In [None]:
rec_rest = rec_rest_sdf.take(10)

## Step 4.3. Friend Visualization

#### 4.3.1: Loading data subsets
A closer look at the `yelp_user` dataframe tells us that there is an attribute called `friends` that we can use in order to construct an undirected friend graph.  For this part of the assignment we'll go back to Pandas -- not Spark -- DataFrames.

We will work with the first 200 entries from the `yelp_user` data file and visualize these users' friends.

Read the first 200 entries of the `yelp_user.csv` file into a pandas dataframe called `user_200` (Remember: You can pass `nrows` as an option to the `pd.read_csv()`)

We’ll subsequently make use of the `networkx` graph visualization tool, which lets us see what the graph actually looks like.


In [None]:
# TODO: read 200 entries
# Worth 5 points

# YOUR CODE HERE
raise NotImplementedError()

In [None]:
user_200


#### Step 4.3.2: Select users with at least one friend

In this part select the friends from `user_200` where the value is not `None`. The `friends` column is a string with comma-separated `user_id`s as values. We will make use of `lambda` functions in order to extract the different `user_id` from this comma separated string.

Select **only the users who have at least one friend**. That is, the `friends` column does not have the value "None".

In [None]:
# TODO: find users with friends
# Worth 2 points

# YOUR CODE HERE
raise NotImplementedError()

In [None]:
user_200


#### Step 4.3.3: Extracting friend as list

Pandas dataframe supports the use of `df.apply()` which can take a function as a parameter.

Example use of `lambda` function with `.apply()`

`df['col_2'] = df['col_1'].apply(lambda x: x+10)`

The above statement will create a column 'col_2' in df with values of df['col_1'] + 10

For the next step, make use of `lambda` functions to `split` the value of the `friends` column by `,` and apply this to create a new column called `list_friends`.

In [None]:
# TODO: friend lists
# Worth 5 points

# YOUR CODE HERE
raise NotImplementedError()

In [None]:
user_200[['name','list_friends']]


#### Step 4.3.4: Obtaining Friend lists

We are only interest in the `user_id` and `list_friends` columns.  Select these into a dataframe called `subset_users`.

The `.stack()` option allows you to "unnest" the items in the list, associating them with the corresponding value of the index.  

In our instance, we would like to `set_index()` on `user_id` so the user ID is the index.  Then if we `.apply()` the `stack()` operation on the `list_friends` column:

```
result_df = df.set_index(['col_1'])['col_2'].apply(pd.Series).stack()
```

We should get each element in the list associated with the appropriate `user_id`.  Performing `df.reset_index()` on `result_df` will give us `friend_data` in an edge table.  Rename the columns of `friend_data` to 'source', 'level_1', and 'target'.

In [None]:
# TODO: projection to users and lists of friends, stacked
# Worth 3 points
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
friend_data.head()


#### Step 4.3.5: Visualization using Networkx

In this step we will use the `networkx` library to visualize.


You can create the graph from the networkx function `from_pandas_edgelist` and the `friend_data`. 


In [None]:
# TODO: networkx graph ready to draw
# Worth 3 points

import networkx as nx
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
%matplotlib inline
nx.draw(graph)