## Final Project - Yelp Data Analysis
### Big Data Machine Learning - PySpark

Members:

**1. Rolamjaya Hotmartua**

**2. YiChin Tzou**

**3. Zoey Chen**

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import warnings
warnings.filterwarnings("ignore")

In [2]:
from pyspark.context import SparkContext
from graphframes import *

In [3]:
#create Spark session
spark = SparkSession.builder.appName('YelpBigDataML').config("spark.jars.packages", "graphframes:graphframes:0.8.1-spark2.4-s_2.11").getOrCreate()

#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '5g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'), ('spark.driver.memory','8g')])

#print spark configuration settings
spark.sparkContext.getConf().getAll()

[('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.submit.pyFiles',
  '/home/jovyan/.ivy2/jars/graphframes_graphframes-0.8.1-spark2.4-s_2.11.jar,/home/jovyan/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar'),
 ('spark.executor.id', 'driver'),
 ('spark.executor.memory', '5g'),
 ('s

### 1) Data Exploration

#### a. Read the TOP 2000 users json files

In [43]:
df_user_2000 = spark.read\
    .json("df_users_2000-part-00000-26a96dae-e5e4-44b5-a005-1aa3bae7d700-c000.json" )

In [44]:
df_user_2000_1 = spark.read\
    .json("df_users_2000-part-00001-26a96dae-e5e4-44b5-a005-1aa3bae7d700-c000.json" )

In [45]:
df_user_2000_2 = spark.read\
    .json("df_users_2000-part-00002-26a96dae-e5e4-44b5-a005-1aa3bae7d700-c000.json" )

In [46]:
df_user_2000_3 = spark.read\
    .json("df_users_2000-part-00003-26a96dae-e5e4-44b5-a005-1aa3bae7d700-c000.json" )

In [47]:
df_user_2000_4 = spark.read\
    .json("df_users_2000-part-00004-26a96dae-e5e4-44b5-a005-1aa3bae7d700-c000.json" )

In [48]:
df_user_2000_5 = spark.read\
    .json("df_users_2000-part-00005-26a96dae-e5e4-44b5-a005-1aa3bae7d700-c000.json" )

In [49]:
df_user_2000 = df_user_2000.union(df_user_2000_1).union(df_user_2000_2).union(df_user_2000_3).union(df_user_2000_4).union(df_user_2000_5)

In [50]:
df_user_2000.count()

2000

In [51]:
df_user_2000.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- count: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



In [52]:
df_user_2000.show(10)

+-------------+---------------+---------------+----------------+--------------+---------------+---------------+---------------+-----------------+----------------+------------------+-----------------+-----+-----+--------------------+----+--------------------+-----+-------+------------+------+--------------------+-------------------+
|average_stars|compliment_cool|compliment_cute|compliment_funny|compliment_hot|compliment_list|compliment_more|compliment_note|compliment_photos|compliment_plain|compliment_profile|compliment_writer| cool|count|               elite|fans|             friends|funny|   name|review_count|useful|             user_id|      yelping_since|
+-------------+---------------+---------------+----------------+--------------+---------------+---------------+---------------+-----------------+----------------+------------------+-----------------+-----+-----+--------------------+----+--------------------+-----+-------+------------+------+--------------------+-------------------

### Create Graph

In [53]:
from pyspark.sql.functions import split, explode, col,trim,lit,size
#Create dataframes for vertices and edges 
cols_to_drop = ["friends"]
df_user_2000 = df_user_2000.withColumn("profile_response", col("compliment_cool")+col("compliment_cute")+col("compliment_funny")+\
                                                    col("compliment_hot")+col("compliment_list")+col("compliment_more")+\
                                                    col("compliment_note")+col("compliment_photos")+col("compliment_plain")+\
                                                    col("compliment_profile")+col("compliment_writer"))\
                            .drop("compliment_cool","compliment_cute","compliment_funny",\
                                    "compliment_hot","compliment_list","compliment_more",\
                                    "compliment_note","compliment_photos","compliment_plain",\
                                    "compliment_profile","compliment_writer")
df_user_2000 = df_user_2000.withColumn("response", col("cool") + col("funny") + col("useful")).drop("cool","funny","useful")
df_user_2000 = df_user_2000.withColumn("friends_array", split(col("friends"), ", ")).withColumn("count_friends", size(col("friends_array")))
vertices = df_user_2000.drop(*cols_to_drop)
vertices = vertices.withColumnRenamed("count", "profile_count")

edges_friends = df_user_2000.select(["user_id","friends"]) 

df_expand = edges_friends.withColumn("friends", split(col("friends"), ","))  # split the friends column by comma
df_expand = df_expand.select(col("*"), explode(col("friends")).alias("friend"))  # explode the friend column

# remove leading and trailing whitespace from "my_column"
df_expand = df_expand.withColumn("friend", trim(df_expand["friend"]))

df_expand = df_expand.withColumn("relationship", lit("friends"))

cols_to_drop = ["friends"]
df_expand = df_expand.drop(*cols_to_drop)

edges = df_expand

In [55]:
vertices = vertices.drop("friends_array")

In [57]:
vertices = vertices.drop("elite")

In [60]:
from pyspark.sql.functions import year
vertices = vertices.withColumn("since_rev", col("yelping_since").cast("date")).drop("yelping_since")\
                    .withColumn("since_year", year(col("since_rev"))).drop("since_rev")

In [61]:
vertices.show(5)

+-------------+-------------+----+-------+------------+--------------------+----------------+--------+-------------+----------+
|average_stars|profile_count|fans|   name|review_count|             user_id|profile_response|response|count_friends|since_year|
+-------------+-------------+----+-------+------------+--------------------+----------------+--------+-------------+----------+
|         2.95|          300|  35|  Robyn|         518|IpLRJY4CP3fXtlEd8...|             164|    2123|          605|      2009|
|         4.17|          203| 804| Monica|        1282|RgDVC3ZUBqpEe6Y1k...|            9347|   33794|         4689|      2009|
|          4.1|          353| 944|    Joi|        2009|0G-QF457q_0Z_jKqh...|           17136|   61571|         6234|      2008|
|         3.67|          117|  11|Melissa|         159|8zkkuk8KOoNojZhTx...|              57|     384|          226|      2010|
|         3.79|          197|  36|Matthew|         593|Q5jOFJYhIsN8ouJ1r...|             400|    2025|  

In [62]:
edges.count()

711209

In [63]:
edges.show(5)

+--------------------+--------------------+------------+
|             user_id|              friend|relationship|
+--------------------+--------------------+------------+
|IpLRJY4CP3fXtlEd8...|hdwDo7CLh9aN_9Pck...|     friends|
|IpLRJY4CP3fXtlEd8...|ci-mepWGgsgGT3sFI...|     friends|
|IpLRJY4CP3fXtlEd8...|aQcdY0ILoJ8K7_RqM...|     friends|
|IpLRJY4CP3fXtlEd8...|s3AFxEpI_VV8-CUZf...|     friends|
|IpLRJY4CP3fXtlEd8...|jUoT_ixOdsLUzaUYM...|     friends|
+--------------------+--------------------+------------+
only showing top 5 rows



In [64]:
distinct_names = edges.select("friend").distinct()

In [65]:
distinct_names.count()

341621

In [66]:
vertices_new = distinct_names.join(vertices, vertices.user_id == distinct_names.friend, "left")

In [73]:
vertices_new = vertices_new.withColumnRenamed("user_id", "id")
edges = edges.withColumnRenamed("user_id", "src")
edges = edges.withColumnRenamed("friend", "dst")
edges = edges.withColumnRenamed("relationship", "type")
g = GraphFrame(vertices_new, edges)

### Explore Graph

In [75]:
## Check the number of edges
g.edges.show(5)

+--------------------+--------------------+-------+
|                 src|                 dst|   type|
+--------------------+--------------------+-------+
|IpLRJY4CP3fXtlEd8...|hdwDo7CLh9aN_9Pck...|friends|
|IpLRJY4CP3fXtlEd8...|ci-mepWGgsgGT3sFI...|friends|
|IpLRJY4CP3fXtlEd8...|aQcdY0ILoJ8K7_RqM...|friends|
|IpLRJY4CP3fXtlEd8...|s3AFxEpI_VV8-CUZf...|friends|
|IpLRJY4CP3fXtlEd8...|jUoT_ixOdsLUzaUYM...|friends|
+--------------------+--------------------+-------+
only showing top 5 rows



In [76]:
## Check the number of edges of each vertex
g.degrees.show(5)

+--------------------+------+
|                  id|degree|
+--------------------+------+
|ZFxU8-XX9R2Oe1Ugt...|    15|
|PSIVYM2UTI-cB6BgK...|     1|
|YVkppuzTLw3zvVC_v...|    11|
|O43tL4--1RlSe-0TD...|     2|
|sZdy2AQD0gn7RMAJc...|     2|
+--------------------+------+
only showing top 5 rows



In [69]:
#The incoming degree of the vertices
g.inDegrees.show(5)

+--------------------+--------+
|                  id|inDegree|
+--------------------+--------+
|ZFxU8-XX9R2Oe1Ugt...|      15|
|PSIVYM2UTI-cB6BgK...|       1|
|YVkppuzTLw3zvVC_v...|      11|
|O43tL4--1RlSe-0TD...|       2|
|sZdy2AQD0gn7RMAJc...|       2|
+--------------------+--------+
only showing top 5 rows



In [70]:
#The outgoing degree of the vertices
g.outDegrees.show(5)

+--------------------+---------+
|                  id|outDegree|
+--------------------+---------+
|FlXBpK_YZxLo27jcM...|      604|
|dioXbYFdMCyE7zCK9...|      116|
|ET3TgSSQ3shm2mj5v...|      198|
|s7cUp9ma9h9FYN-fa...|      284|
|-3s52C4zL_DHRK0UL...|       18|
+--------------------+---------+
only showing top 5 rows



### Connected Components

A connected component of a graph is a subgraph in which any two vertices are connected to each other by one or more edges, and which is connected to no additional vertices in the supergraph. Connected components detection can be interesting for clustering, but also to make your computations more efficient. 

Practically, GraphFrames requires you to set a directory where it can save checkpoints. Create such a folder in your working directory and drop the following line (where graphframes_cps is your new folder) in Jupyter to set the checkpoint directory.

The number of connected components for directed and undirected graphs are different.

Our mini-graph has two connected components, which are described for each vertex in the component column.

In [77]:
## Display all connected components
sc = spark.sparkContext
sc.setCheckpointDir('graphframes_cps')
g.connectedComponents().show(5)

+--------------------+-------------+-------------+----+--------+------------+--------------------+----------------+--------+-------------+----------+---------+
|              friend|average_stars|profile_count|fans|    name|review_count|                  id|profile_response|response|count_friends|since_year|component|
+--------------------+-------------+-------------+----+--------+------------+--------------------+----------------+--------+-------------+----------+---------+
|dioXbYFdMCyE7zCK9...|         3.71|          133|  23| Michael|         242|dioXbYFdMCyE7zCK9...|             189|     820|          116|      2008|        0|
|FlXBpK_YZxLo27jcM...|         4.02|          489| 102| Mallory|        1158|FlXBpK_YZxLo27jcM...|             731|    7274|          604|      2011|        0|
|jxac2pQOPQvYY32Zk...|         3.92|          177|  10|Clarissa|         362|jxac2pQOPQvYY32Zk...|              42|     687|           53|      2008|        0|
|CDxlgCoUoNRV3VMkY...|         3.64|    

In [78]:
result = g.stronglyConnectedComponents(maxIter=10)

In [81]:
result.select("id", "component").filter(col("component") != 0).show()

+---+---------+
| id|component|
+---+---------+
+---+---------+



### Filter Graph

A GraphFrame itself can’t be filtered, but DataFrames deducted from a Graph can. Consequently, the filter-function (or any other function) can be used just as you would use it with DataFrames. Pay attention to the use of quotation marks in filter criteria.

In [80]:
## Filter vertices to show people older than 30 yrs
g.vertices.filter("count_friends > 1000").show(5)

+--------------------+-------------+-------------+----+-------+------------+--------------------+----------------+--------+-------------+----------+
|              friend|average_stars|profile_count|fans|   name|review_count|                  id|profile_response|response|count_friends|since_year|
+--------------------+-------------+-------------+----+-------+------------+--------------------+----------------+--------+-------------+----------+
|417svAEVHreK6c3SK...|         4.21|          125| 103|   Bill|         620|417svAEVHreK6c3SK...|             976|   13496|         1643|      2015|
|pou3BbKsIozfH50rx...|         4.28|          898| 914|  Brett|        2812|pou3BbKsIozfH50rx...|            8839|   46463|         4024|      2012|
|cCeCQxErE9ToaQQBE...|         3.97|          212| 181|Abigail|         595|cCeCQxErE9ToaQQBE...|             777|    5222|         1065|      2009|
|BmVwbsL8l0imz4slo...|         4.39|          187| 713| Morris|         261|BmVwbsL8l0imz4slo...|         

In [82]:
## Filter and sort vertices with degree >=2
g.inDegrees.filter("inDegree >= 2").sort("inDegree", ascending=False).show(5)

+--------------------+--------+
|                  id|inDegree|
+--------------------+--------+
|ET8n-r7glWYqZhuR6...|     669|
|Oi1qbcz2m2SnwUezt...|     650|
|BmVwbsL8l0imz4slo...|     351|
|eJnwyot0rLyWN84Dw...|     349|
|dqucQwA-CHWhUczwq...|     346|
+--------------------+--------+
only showing top 5 rows



### Motif Finding

Finding motifs helps to execute queries to discover structural patterns in graphs. 

The following example will search for pairs of vertices a,b connected by edge e and pairs of vertices b,c connected by edge e2. It will return a DataFrame of all such structures in the graph, with columns for each of the named elements (vertices or edges) in the motif.

In [83]:
g.find("(a)-[e]->(b); (b)-[e2]->(a)").show(5,truncate = False)

+-----------------------------------------------------------------------------------------------------+---------------------------------------------------------+-----------------------------------------------------------------------------------------------------+---------------------------------------------------------+
|a                                                                                                    |e                                                        |b                                                                                                    |e2                                                       |
+-----------------------------------------------------------------------------------------------------+---------------------------------------------------------+-----------------------------------------------------------------------------------------------------+---------------------------------------------------------+
|{FlXBpK_YZxLo27jcMdII1w, 4.02, 48

### TriangleCount 

TriangleCount counts the number of triangles passing through each vertex in this graph. A triangle can be defined as a group of three vertices that is interrelated, i.e. a has an edge to b, b has an edge to c, and c has an edge to a. 

Triangles are used for various tasks for real‐life networks, including community discovery, link prediction, and spam filtering.

<img width="30%" src="https://miro.medium.com/max/791/0*MGZV-XXQl9YN_fwD.png"/>

In [86]:
g.triangleCount().filter(col("count")!= 0).show(5)

+-----+--------------------+-------------+-------------+----+--------+------------+--------------------+----------------+--------+-------------+----------+
|count|              friend|average_stars|profile_count|fans|    name|review_count|                  id|profile_response|response|count_friends|since_year|
+-----+--------------------+-------------+-------------+----+--------+------------+--------------------+----------------+--------+-------------+----------+
|  582|dioXbYFdMCyE7zCK9...|         3.71|          133|  23| Michael|         242|dioXbYFdMCyE7zCK9...|             189|     820|          116|      2008|
| 1562|FlXBpK_YZxLo27jcM...|         4.02|          489| 102| Mallory|        1158|FlXBpK_YZxLo27jcM...|             731|    7274|          604|      2011|
|   16|jxac2pQOPQvYY32Zk...|         3.92|          177|  10|Clarissa|         362|jxac2pQOPQvYY32Zk...|              42|     687|           53|      2008|
|  270|CDxlgCoUoNRV3VMkY...|         3.64|          185|  33| Su

### PageRank

Identify important vertices in a graph based on connections.

In [87]:
pr = g.pageRank(resetProbability=0.15, tol=0.01)## look at the pagerank score for every vertex

S = pr.vertices.orderBy('pagerank',ascending=False)

In [131]:
A = S.select(["name","since_year","average_stars","response","profile_count","count_friends", "pagerank"])
A.show(10)

+---------+----------+-------------+--------+-------------+-------------+------------------+
|     name|since_year|average_stars|response|profile_count|count_friends|          pagerank|
+---------+----------+-------------+--------+-------------+-------------+------------------+
| Michelle|      2008|         4.05|   55120|         1144|         5958|19.833261744321955|
|   Steven|      2010|         3.62|   66519|          477|        10072|17.922276178908504|
|   Morris|      2012|         4.39|   15651|          187|         3572|10.267693130330548|
|      Dee|      2010|         4.13|    9444|          361|         1448| 9.381879740046559|
|    Bruce|      2008|         3.77|    6307|          297|         1546| 9.227369028077154|
| Kimberly|      2013|         3.52|    1385|          183|         1063| 8.748330850125807|
|     Chad|      2015|          3.7|   34449|          137|         4732| 8.302947040298289|
|Gabriella|      2016|         4.33|    3895|          119|         20

In [93]:
## look at the weight of every edge
pr.edges.show(10)

+--------------------+--------------------+-------+--------------------+
|                 src|                 dst|   type|              weight|
+--------------------+--------------------+-------+--------------------+
|UlWDGR0QrBbdFsVY6...|-3s52C4zL_DHRK0UL...|friends|  0.0053475935828877|
|djxnI8Ux8ZYQJhiOQ...|-3s52C4zL_DHRK0UL...|friends|0.009345794392523364|
|h1KzT38UnfzR7maSi...|-3s52C4zL_DHRK0UL...|friends|  0.3333333333333333|
|BmVwbsL8l0imz4slo...|-7qUbOVWJp2NT3f_T...|friends|0.002849002849002849|
|2HA_Ub0s2qACtlOkr...|-7qUbOVWJp2NT3f_T...|friends|               0.125|
|6ObFF8-uKnOAlXuSH...|-7qUbOVWJp2NT3f_T...|friends|0.007874015748031496|
|_6j6uCisr8DNewdBl...|-7qUbOVWJp2NT3f_T...|friends| 0.02127659574468085|
|Js1FsZ6oP_9tO5tbJ...|-7qUbOVWJp2NT3f_T...|friends|0.043478260869565216|
|d6zIVWiJyPB6PZuAx...|-7qUbOVWJp2NT3f_T...|friends|0.034482758620689655|
|D2IUOetOVfjAkmohD...|-7qUbOVWJp2NT3f_T...|friends| 0.09090909090909091|
+--------------------+--------------------+-------+

### Degree Centrality

Identify important vertices in a graph based on connections.

In [116]:
from pyspark.sql.functions import desc

# Compute degree centrality
degree = g.degrees
degree = degree.selectExpr("id", "degree AS degree_centrality")
degree = degree.sort(desc("degree_centrality"))

In [117]:
joined_degree = degree.join(vertices, vertices.user_id == degree.id, "left")

In [118]:
A = joined_degree.select(["name","since_year","average_stars","response","fans","count_friends", "degree_centrality"])
A.show(10)

+--------+----------+-------------+--------+----+-------------+-----------------+
|    name|since_year|average_stars|response|fans|count_friends|degree_centrality|
+--------+----------+-------------+--------+----+-------------+-----------------+
|  Steven|      2010|         3.62|   66519| 739|        10072|            10722|
|    Abby|      2008|         4.15|   68127|1806|         8858|             8965|
|    Niki|      2014|         4.59|   22375|1746|         6896|             7075|
|Brittany|      2011|         4.33|   38591|2086|         6660|             6854|
|Michelle|      2008|         4.05|   55120|1353|         5958|             6627|
| Michael|      2010|         4.43|   49580|1251|         6481|             6595|
|     Joi|      2008|          4.1|   61571| 944|         6234|             6386|
|    John|      2008|         3.75|   69937|1309|         6232|             6349|
| Christy|      2011|         4.19|   96429| 740|         5522|             5716|
|    Nate|      

## Label Propagation

Run static Label Propagation Algorithm for detecting communities in networks.

In [104]:
result = g.labelPropagation(maxIter=5)
result.show(5)

+--------------------+-------------+-------------+----+--------+------------+--------------------+----------------+--------+-------------+----------+-------------+
|              friend|average_stars|profile_count|fans|    name|review_count|                  id|profile_response|response|count_friends|since_year|        label|
+--------------------+-------------+-------------+----+--------+------------+--------------------+----------------+--------+-------------+----------+-------------+
|dioXbYFdMCyE7zCK9...|         3.71|          133|  23| Michael|         242|dioXbYFdMCyE7zCK9...|             189|     820|          116|      2008|1030792151045|
|FlXBpK_YZxLo27jcM...|         4.02|          489| 102| Mallory|        1158|FlXBpK_YZxLo27jcM...|             731|    7274|          604|      2011|1030792151045|
|jxac2pQOPQvYY32Zk...|         3.92|          177|  10|Clarissa|         362|jxac2pQOPQvYY32Zk...|              42|     687|           53|      2008|1030792151045|
|CDxlgCoUoNRV3VM

In [96]:
label_counts = result.groupby(col("label")).count()

In [97]:
label_counts.show()

+-------------+-----+
|        label|count|
+-------------+-----+
| 970662608897|    2|
| 721554505733|   86|
|1030792151045|  708|
| 936302870539|    5|
|1022202216450|   46|
| 953482739716|  182|
| 472446402566|  133|
| 609885356039|    7|
| 326417514504|   27|
|1005022347270|   20|
| 704374636547|   10|
| 944892805126|  328|
|1047972020234|  111|
| 635655159814|   22|
| 558345748482|    2|
|1039382085644|  197|
|1709396983817|   48|
| 730144440327|    1|
| 627065225216|    1|
| 601295421445|    2|
+-------------+-----+



In [99]:
g.vertices.count()

341621

In [105]:
result.filter(col())

1938

In [103]:
result.count()

1938