In [1]:
# 2021.12.04 / KwankiAhn / How to set up pyspark on Jupyter

# install java (https://m.blog.naver.com/opusk/220985259485)
# !sudo apt-get install default-jdk

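# Install Apache Spark (release list: https://spark.apache.org/downloads.html)
# !wget https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
# (wget https://archive.apache.org/dist/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz)
# (wget https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz) / EMR release 6.0.0
# !tar -xzf spark-3.2.0-bin-hadoop3.2.tgz
# !sudo mv spark-3.2.0-bin-hadoop3.2 /opt/spark-3.2.0
# !sudo ln -s /opt/spark-3.2.0 /opt/spark
# NOTE: `!export ...` runs in a throwaway subshell and does not change the kernel's
#       environment. Put the two lines below in your shell profile, or set
#       os.environ["SPARK_HOME"] from Python instead.
# export SPARK_HOME=/opt/spark
# export PATH=$SPARK_HOME/bin:$PATH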

# install pyspark
# !pip install pyspark
# !pip install findspark

In [2]:
import os
import sys

# Candidate Spark installations on this machine; the last entry (3.2.0) is the
# one this notebook runs against.
spark_home = ('/opt/spark-2.3.0', '/opt/spark-2.4.4', '/opt/spark-3.2.0')[-1]
os.environ["SPARK_HOME"] = spark_home
sys.path.insert(0, spark_home)

# findspark.init() is called before the pyspark import so that it can set up
# the import path for the Spark installation selected via SPARK_HOME.
import findspark
findspark.init()

# 2021.12.04 / KwankiAhn / How to use graphframes in pyspark
# 1. .config('spark.jars.packages', <coords>) plays the role of
#    `pyspark --packages <coords>`
#    (e.g. graphframes:graphframes:0.6.0-spark2.3-s_2.11 on Spark 2.3).
# 2. The graphframes build is sensitive to the Spark version!
#  a. https://spark-packages.org/package/graphframes/graphframes
from pyspark.sql import SparkSession

# Build (or reuse) the session; the graphframes coordinates below are the
# Spark 3.2 / Scala 2.12 build, matching the SPARK_HOME chosen above.
spark = (
    SparkSession.builder
    .appName("GraphFramePractice1")
    .config("spark.some.config.option", "some-value")
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12")
    .getOrCreate()
)



:: loading settings :: url = jar:file:/opt/spark-3.2.0/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jupyter/.ivy2/cache
The jars for the packages stored in: /home/jupyter/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-49fa0a35-9428-4915-8ada-fe411ce45060;1.0
	confs: [default]
	found graphframes#graphframes;0.8.2-spark3.2-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 191ms :: artifacts dl 11ms
	:: modules in use:
	graphframes#graphframes;0.8.2-spark3.2-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	------------------------------

In [3]:
# tutorial guide : https://graphframes.github.io/graphframes/docs/_site/user-guide.html

In [None]:
# Build a small social graph by hand and wrap it in a GraphFrame.
#
# Fix: the original cell referenced `sqlContext` and `GraphFrame`, neither of
# which is defined or imported in this notebook, so it failed with NameError on
# a fresh kernel. The DataFrames are now created from the `spark` session built
# earlier in the notebook, and GraphFrame is imported here.
from graphframes import GraphFrame

# Vertex DataFrame: one row per person; the "id" column is the vertex key.
v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
], ["id", "name", "age"])
# Edge DataFrame: one row per directed edge; "src"/"dst" hold vertex ids.
e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
], ["src", "dst", "relationship"])
# Create a GraphFrame from the vertex and edge DataFrames.
g = GraphFrame(v, e)

In [16]:
from graphframes.examples import Graphs

# Load the "friends" example graph that ships with graphframes and show
# its two underlying DataFrames.
g = Graphs(spark).friends()
print("vertices")
g.vertices.show()
print("edges")
g.edges.show()

# In-degree per vertex: a DataFrame with columns "id" and "inDegree".
vertexInDegrees = g.inDegrees
print("vertexInDegrees")
vertexInDegrees.show()

# Youngest user's age, computed on the vertex DataFrame
# (a global aggregate, hence the empty groupBy()).
youngest_age = g.vertices.groupBy().min("age")
youngest_age.show()

# Number of "follow" relationships, computed on the edge DataFrame.
follow_edges = g.edges.filter("relationship = 'follow'")
numFollows = follow_edges.count()
print(f"numFollows: {numFollows}")

vertices
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  a|  Alice| 34|
|  b|    Bob| 36|
|  c|Charlie| 30|
|  d|  David| 29|
|  e| Esther| 32|
|  f|  Fanny| 36|
+---+-------+---+

edges
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
+---+---+------------+

vertexInDegrees
+---+--------+
| id|inDegree|
+---+--------+
|  b|       2|
|  c|       2|
|  f|       1|
|  d|       1|
|  a|       1|
+---+--------+

+--------+
|min(age)|
+--------+
|      29|
+--------+

numFollows: 4


In [18]:
from graphframes.examples import Graphs

# Start again from the bundled example graph.
g = Graphs(spark).friends()

# Motif: two vertices a and b joined by an edge in each direction
# (e: a -> b and e2: b -> a).
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
print("motifs")
motifs.show()

# The motif result is a DataFrame, so it can be narrowed with ordinary filters;
# here only matches whose vertex b is older than 30 are kept.
print("motifs query : b.age > 30")
motifs_b_over_30 = motifs.filter("b.age > 30")
motifs_b_over_30.show()

motifs
+----------------+--------------+----------------+--------------+
|               a|             e|               b|            e2|
+----------------+--------------+----------------+--------------+
|{c, Charlie, 30}|{c, b, follow}|    {b, Bob, 36}|{b, c, follow}|
|    {b, Bob, 36}|{b, c, follow}|{c, Charlie, 30}|{c, b, follow}|
+----------------+--------------+----------------+--------------+

motifs query : b.age > 30
+----------------+--------------+------------+--------------+
|               a|             e|           b|            e2|
+----------------+--------------+------------+--------------+
|{c, Charlie, 30}|{c, b, follow}|{b, Bob, 36}|{b, c, follow}|
+----------------+--------------+------------+--------------+



In [26]:
from functools import reduce

from pyspark.sql.functions import col, lit, when
from pyspark.sql.types import IntegerType

# Motif: a chain of four vertices a -> b -> c -> d joined by the edges ab, bc, cd.
chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
print("chain4")
chain4.show()


# Query on sequence, with state (cnt)
#  (a) State update: how the running count changes for the next motif element.
def sumFriends(cnt, relationship):
    """Return a Column equal to cnt + 1 where relationship is "friend", else cnt."""
    return when(relationship == "friend", cnt + 1).otherwise(cnt)


def _count_edge(running_cnt, edge_name):
    """Fold step: add the named edge column's relationship to the running count."""
    return sumFriends(running_cnt, col(edge_name).relationship)


#  (b) Fold the update over the three edges of the motif, starting from 0,
#      to get a Column counting the "friend" edges in each chain.
condition = reduce(_count_edge, ["ab", "bc", "cd"], lit(0))

#  (c) Keep only the chains in which at least 2 of the 3 edges are "friend".
chainWith2Friends2 = chain4.where(condition >= 2)
print("chainWith2Friends2")
chainWith2Friends2.show()

chain4
+----------------+--------------+----------------+--------------+----------------+--------------+----------------+
|               a|            ab|               b|            bc|               c|            cd|               d|
+----------------+--------------+----------------+--------------+----------------+--------------+----------------+
| {e, Esther, 32}|{e, d, friend}|  {d, David, 29}|{d, a, friend}|  {a, Alice, 34}|{a, b, friend}|    {b, Bob, 36}|
| {e, Esther, 32}|{e, f, follow}|  {f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}|{c, b, follow}|    {b, Bob, 36}|
|  {a, Alice, 34}|{a, b, friend}|    {b, Bob, 36}|{b, c, follow}|{c, Charlie, 30}|{c, b, follow}|    {b, Bob, 36}|
|{c, Charlie, 30}|{c, b, follow}|    {b, Bob, 36}|{b, c, follow}|{c, Charlie, 30}|{c, b, follow}|    {b, Bob, 36}|
|    {b, Bob, 36}|{b, c, follow}|{c, Charlie, 30}|{c, b, follow}|    {b, Bob, 36}|{b, c, follow}|{c, Charlie, 30}|
|  {f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}|{c, b, follow}|    {b, 

In [24]:
# Quick illustration of reduce: it folds the list left to right,
# i.e. ("ab" + "bc") + "cd".
reduce(lambda acc, piece: acc + piece, ["ab", "bc", "cd"])

'abbccd'