In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

# Build (or reuse) the Spark session for this notebook. The builder chain is
# wrapped in parentheses so each option sits on its own line without
# backslash continuations.
spark_session = (
    SparkSession.builder
    # Standalone cluster master and the application name shown in its UI.
    .master("spark://192.168.2.70:7077")
    .appName("TianruZ_lecture2_GraphX")
    # Dynamic allocation: executors idle for 30s are handed back.
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    # Upper bound on the total cores this application may take.
    .config("spark.cores.max", 4)
    # GraphFrames is resolved as a Spark package when the session starts.
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12")
    .getOrCreate()
)

# SQLContext wrapper around the session's SparkContext; later cells use it to
# create the vertex and edge DataFrames.
sqlContext = SQLContext(spark_session.sparkContext)

Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars


:: loading settings :: url = jar:file:/home/ubuntu/spark-3.2.3-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1007f98e-7a10-4dfb-a301-399e1310ce55;1.0
	confs: [default]
	found graphframes#graphframes;0.8.2-spark3.2-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 495ms :: artifacts dl 4ms
	:: modules in use:
	graphframes#graphframes;0.8.2-spark3.2-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-1007f98e-7a10-4dfb-a3

In [2]:
# Display the SQLContext created in the first cell to confirm it exists.
sqlContext

<pyspark.sql.context.SQLContext at 0x7f038dfb3f10>

In [3]:
# Import GraphFrame explicitly rather than with a wildcard, so it is obvious
# where the name comes from and nothing else leaks into the notebook namespace.
# GraphFrame is the only graphframes name this notebook uses.
# NOTE(review): this import sits after the Spark session is created, presumably
# because the package configured via spark.jars.packages provides the Python
# module. If the import fails, install the Python package into the kernel
# environment (e.g. `%pip install graphframes`) - confirm for your setup.
from graphframes import GraphFrame

Let's create an example social network:

![Social network graph diagram](https://databricks.com/wp-content/uploads/2016/03/social-network-graph-diagram.png)

In [4]:
# Vertex DataFrame: one row per person, keyed by the "id" column.
vertices = sqlContext.createDataFrame(
    data=[
        ("a", "Alice", 34),
        ("b", "Bob", 36),
        ("c", "Charlie", 30),
        ("d", "David", 29),
        ("e", "Esther", 32),
        ("f", "Fanny", 36),
        ("g", "Gabby", 60),
    ],
    schema=["id", "name", "age"],
)

# Edge DataFrame: one row per directed relationship, with "src" and "dst"
# holding vertex ids and "relationship" labelling the edge.
edges = sqlContext.createDataFrame(
    data=[
        ("a", "b", "friend"),
        ("b", "c", "follow"),
        ("c", "b", "follow"),
        ("f", "c", "follow"),
        ("e", "f", "follow"),
        ("e", "d", "friend"),
        ("d", "a", "friend"),
        ("a", "e", "friend"),
    ],
    schema=["src", "dst", "relationship"],
)

# Combine both DataFrames into a GraphFrame and print its summary.
g = GraphFrame(vertices, edges)
print(g)

GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])


In [5]:
# Basic graph and DataFrame queries.
# Show, in order: the vertices, the edges, and the incoming, outgoing and
# total degree of the vertices. Each view is looked up and shown one at a
# time, in the same order as listed.
for view_name in ("vertices", "edges", "inDegrees", "outDegrees", "degrees"):
    getattr(g, view_name).show()

                                                                                

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  a|  Alice| 34|
|  b|    Bob| 36|
|  c|Charlie| 30|
|  d|  David| 29|
|  e| Esther| 32|
|  f|  Fanny| 36|
|  g|  Gabby| 60|
+---+-------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
+---+---+------------+

+---+--------+
| id|inDegree|
+---+--------+
|  c|       2|
|  b|       2|
|  f|       1|
|  e|       1|
|  d|       1|
|  a|       1|
+---+--------+

+---+---------+
| id|outDegree|
+---+---------+
|  f|        1|
|  c|        1|
|  b|        1|
|  a|        2|
|  e|        2|
|  d|        1|
+---+---------+

+---+------+
| id|degree|
+---+------+
|  f|     2|
|  c|     3|
|  b|     3|
|  a|     3|
|  e|     3|
|  d|     2|
+---+------+



In [6]:
# The vertices of a GraphFrame are an ordinary DataFrame, so they can be
# aggregated directly. Here: the age of the youngest person in the graph.
youngest = g.vertices.agg({"age": "min"})
youngest.show()

# The edges are a DataFrame too. Here: how many 'follow' relationships exist.
follow_edges = g.edges.where("relationship = 'follow'")
numFollows = follow_edges.count()
print("The number of follow edges is", numFollows)

+--------+
|min(age)|
+--------+
|      29|
+--------+

The number of follow edges is 4


In [7]:
# Motif finding
#
# A motif is a structural pattern over vertices and edges. The pattern below
# matches every pair of vertices that is connected by edges in both
# directions. The result is a DataFrame whose columns are named after the
# keys used in the pattern (a, e, b, e2).
# See the GraphFrames User Guide for the full motif syntax.
motifs = g.find(
    "(a)-[e]->(b); (b)-[e2]->(a)"
)
motifs.show()

+----------------+--------------+----------------+--------------+
|               a|             e|               b|            e2|
+----------------+--------------+----------------+--------------+
|{c, Charlie, 30}|{c, b, follow}|    {b, Bob, 36}|{b, c, follow}|
|    {b, Bob, 36}|{b, c, follow}|{c, Charlie, 30}|{c, b, follow}|
+----------------+--------------+----------------+--------------+



In [8]:
# The motif result is a plain DataFrame, so it can be refined further:
# keep only the matches where vertex "b" is older than 30.
filtered = motifs.where("b.age > 30")
filtered.show()

+----------------+--------------+------------+--------------+
|               a|             e|           b|            e2|
+----------------+--------------+------------+--------------+
|{c, Charlie, 30}|{c, b, follow}|{b, Bob, 36}|{b, c, follow}|
+----------------+--------------+------------+--------------+



In [9]:
# Subgraphs
#
# GraphFrames can build a subgraph by filtering edges and vertices, and the
# filters compose. This one keeps only 'friend' edges and people older than
# 30, then drops every vertex left without an edge.
g2 = (
    g.filterEdges("relationship = 'friend'")
    .filterVertices("age > 30")
    .dropIsolatedVertices()
)

g2.vertices.show()
g2.edges.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  b|   Bob| 36|
|  a| Alice| 34|
|  e|Esther| 32|
+---+------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  a|  e|      friend|
+---+---+------------+



In [10]:
# Standard graph algorithms
# GraphFrames comes with a number of standard graph algorithms built in:

# Breadth-first search (BFS)
# Connected components
# Strongly connected components
# Label Propagation Algorithm (LPA)
# PageRank (regular and personalized)
# Shortest paths
# Triangle count

# See the GraphFrames user guide for more: https://graphframes.github.io/graphframes/docs/_site/user-guide.html

In [11]:
# Breadth-first search (BFS)
#
# Start from the vertex named "Esther" and search for users younger than 32.
paths = g.bfs(fromExpr="name = 'Esther'", toExpr="age < 32")
paths.show()

+---------------+--------------+--------------+
|           from|            e0|            to|
+---------------+--------------+--------------+
|{e, Esther, 32}|{e, d, friend}|{d, David, 29}|
+---------------+--------------+--------------+



In [12]:
# Shortest paths
#
# For every vertex, compute the shortest-path distance to each landmark
# vertex. Landmarks are given by vertex id.
results = g.shortestPaths(
    landmarks=["a", "d"],
)
results.show()

[Stage 174:>                                                        (0 + 1) / 1]

+---+-------+---+----------------+
| id|   name|age|       distances|
+---+-------+---+----------------+
|  g|  Gabby| 60|              {}|
|  f|  Fanny| 36|              {}|
|  e| Esther| 32|{a -> 2, d -> 1}|
|  d|  David| 29|{a -> 1, d -> 0}|
|  c|Charlie| 30|              {}|
|  b|    Bob| 36|              {}|
|  a|  Alice| 34|{a -> 0, d -> 2}|
+---+-------+---+----------------+



                                                                                

In [13]:
# Triangle count
#
# Count, for each vertex, the triangles that pass through it; the count is
# returned in a "count" column alongside the vertex attributes.
results = g.triangleCount()
results.show()

+-----+---+-------+---+
|count| id|   name|age|
+-----+---+-------+---+
|    0|  c|Charlie| 30|
|    0|  b|    Bob| 36|
|    1|  a|  Alice| 34|
|    0|  g|  Gabby| 60|
|    0|  f|  Fanny| 36|
|    1|  e| Esther| 32|
|    1|  d|  David| 29|
+-----+---+-------+---+



In [14]:
# PageRank
#
# Rank vertices by importance based on their connections. This call passes a
# tolerance (tol); to run a fixed number of iterations instead, pass
# maxIter (e.g. maxIter=10) in place of tol.
results = g.pageRank(
    resetProbability=0.15,
    tol=0.01,
)

# The returned graph carries a "pagerank" column on the vertices and a
# "weight" column on the edges.
results.vertices.show()
results.edges.show()

+---+-------+---+-------------------+
| id|   name|age|           pagerank|
+---+-------+---+-------------------+
|  g|  Gabby| 60| 0.1799821386239711|
|  f|  Fanny| 36| 0.3283606792049851|
|  e| Esther| 32|0.37085233187676075|
|  d|  David| 29| 0.3283606792049851|
|  c|Charlie| 30| 2.6878300011606218|
|  b|    Bob| 36|  2.655507832863289|
|  a|  Alice| 34|0.44910633706538744|
+---+-------+---+-------------------+

+---+---+------------+------+
|src|dst|relationship|weight|
+---+---+------------+------+
|  f|  c|      follow|   1.0|
|  e|  f|      follow|   0.5|
|  e|  d|      friend|   0.5|
|  d|  a|      friend|   1.0|
|  c|  b|      follow|   1.0|
|  b|  c|      follow|   1.0|
|  a|  e|      friend|   0.5|
|  a|  b|      friend|   0.5|
+---+---+------------+------+



In [15]:
# Stop the Spark session created in the first cell now that the notebook is done.
spark_session.stop()