<a href="https://colab.research.google.com/github/YaswanthAd/Spark_GraphX/blob/main/GraphFrames.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1><center>Apache Spark GraphFrames</center></h1>

Let's run GraphFrame code on Google Colab and see if it's faster than Databricks.


### Installing Spark

Install Dependencies:


1.   Java 8
2.   Apache Spark with hadoop and
3.   Findspark (used to locate the spark in the system)


In [1]:
!rm -rf spark-3.1.1-bin-hadoop3.2

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!wget -q --show-progress http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
#!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark pyspark
#!pip -q install findspark pyspark graphframes

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


Set Environment Variables:

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [4]:
!ls

sample_data


In [5]:
!pip show pyspark

Name: pyspark
Version: 3.5.0
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/lib/python3.10/dist-packages
Requires: py4j
Required-by: 


### Installing GraphFrames

In [6]:
!pip install graphframes

Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl (18 kB)
Collecting nose (from graphframes)
  Downloading nose-1.3.7-py3-none-any.whl (154 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.7/154.7 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: nose, graphframes
Successfully installed graphframes-0.6 nose-1.3.7


In [7]:
!python -V

Python 3.10.12


In [8]:
!curl -L -o "/usr/local/lib/python3.10/dist-packages/pyspark/jars/graphframes-0.8.2-spark3.3.2-s_2.11.jar" https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.1-s_2.12/graphframes-0.8.2-spark3.1-s_2.12.jar

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  242k  100  242k    0     0   724k      0 --:--:-- --:--:-- --:--:--  726k


In [9]:
!ls /usr/local/lib/python3.10/dist-packages/pyspark/jars/

activation-1.1.1.jar
aircompressor-0.25.jar
algebra_2.12-2.0.1.jar
annotations-17.0.0.jar
antlr4-runtime-4.9.3.jar
antlr-runtime-3.5.2.jar
aopalliance-repackaged-2.6.1.jar
arpack-3.0.3.jar
arpack_combined_all-0.1.jar
arrow-format-12.0.1.jar
arrow-memory-core-12.0.1.jar
arrow-memory-netty-12.0.1.jar
arrow-vector-12.0.1.jar
audience-annotations-0.5.0.jar
avro-1.11.2.jar
avro-ipc-1.11.2.jar
avro-mapred-1.11.2.jar
blas-3.0.3.jar
bonecp-0.8.0.RELEASE.jar
breeze_2.12-2.1.0.jar
breeze-macros_2.12-2.1.0.jar
cats-kernel_2.12-2.1.1.jar
chill_2.12-0.10.0.jar
chill-java-0.10.0.jar
commons-cli-1.5.0.jar
commons-codec-1.16.0.jar
commons-collections-3.2.2.jar
commons-collections4-4.4.jar
commons-compiler-3.1.9.jar
commons-compress-1.23.0.jar
commons-crypto-1.1.0.jar
commons-dbcp-1.4.jar
commons-io-2.13.0.jar
commons-lang-2.6.jar
commons-lang3-3.12.0.jar
commons-logging-1.1.3.jar
commons-math3-3.6.1.jar
commons-pool-1.5.4.jar
commons-text-1.10.0.jar
compress-lzf-1.1.2.jar
curator-client-2.13.0.jar
cur

### Starting Spark with Libraries Loaded

In [10]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars", "/usr/local/lib/python3.9/dist-packages/pyspark/jars/graphframes-0.8.2-spark3.3.2-s_2.11.jar") \
    .getOrCreate()

spark.conf.set("spark.sql.repl.eagerEval.enabled", True)  # Property used to format output tables better\


### Example Dataset

In [11]:
v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
], ["id", "name", "age"])
# Edge DataFrame
e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
], ["src", "dst", "relationship"])
# Create a GraphFram

In [12]:
from graphframes import *
from graphframes import GraphFrame

In [13]:
print('PySpark Version :'+spark.version)
print('PySpark Version :'+spark.sparkContext.version)


PySpark Version :3.5.0
PySpark Version :3.5.0


In [14]:
g = GraphFrame(v, e)



In [15]:
g.vertices.count()

7

In [16]:
g.edges.count()

8

In [17]:
type(g.vertices)

pyspark.sql.dataframe.DataFrame

In [18]:
g.vertices.select("name").show()

+-------+
|   name|
+-------+
|  Alice|
|    Bob|
|Charlie|
|  David|
| Esther|
|  Fanny|
|  Gabby|
+-------+



In [19]:
g.edges.select("relationship").distinct().show()

+------------+
|relationship|
+------------+
|      friend|
|      follow|
+------------+



In [20]:
g.edges.select("src").distinct().show()

+---+
|src|
+---+
|  f|
|  c|
|  b|
|  a|
|  e|
|  d|
+---+



In [21]:
g.bfs("name = 'Alice'", "name = 'David'").show()



+--------------+--------------+---------------+--------------+--------------+
|          from|            e0|             v1|            e1|            to|
+--------------+--------------+---------------+--------------+--------------+
|{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}|{d, David, 29}|
+--------------+--------------+---------------+--------------+--------------+



In [22]:
g.bfs("name = 'Esther'", "age  < 32", \
      edgeFilter= "relationship != 'friend'", maxPathLength=4)

from,e0,v1,e1,to
"{e, Esther, 32}","{e, f, follow}","{f, Fanny, 36}","{f, c, follow}","{c, Charlie, 30}"


In [23]:

# Search from "Esther" for users of age < 32.
paths = g.bfs("name = 'Esther'", "age < 32")
paths.show()

# Specify edge filters or max path lengths.
g.bfs("name = 'Esther'", "age < 32",\
  edgeFilter="relationship != 'friend'", maxPathLength=3)

+---------------+--------------+--------------+
|           from|            e0|            to|
+---------------+--------------+--------------+
|{e, Esther, 32}|{e, d, friend}|{d, David, 29}|
+---------------+--------------+--------------+



from,e0,v1,e1,to
"{e, Esther, 32}","{e, f, follow}","{f, Fanny, 36}","{f, c, follow}","{c, Charlie, 30}"


In [24]:
results = g.shortestPaths(landmarks=["a", "d"])
results.select("id", "distances").show()

+---+----------------+
| id|       distances|
+---+----------------+
|  a|{d -> 2, a -> 0}|
|  b|              {}|
|  c|              {}|
|  d|{d -> 0, a -> 1}|
|  e|{d -> 1, a -> 2}|
|  f|              {}|
|  g|              {}|
+---+----------------+



In [41]:
g.bfs("name = 'Alice'", "name = 'Charlie'", edgeFilter=None, maxPathLength=2)

from,e0,v1,e1,to
"{a, Alice, 34}","{a, b, friend}","{b, Bob, 36}","{b, c, follow}","{c, Charlie, 30}"


In [44]:
#shortest path between two nodes
g.bfs(fromExpr="name = 'Alice'", toExpr= "age > 20 and name!= 'Alice'", \
      edgeFilter = None, maxPathLength=3)



from,e0,to
"{a, Alice, 34}","{a, b, friend}","{b, Bob, 36}"
"{a, Alice, 34}","{a, e, friend}","{e, Esther, 32}"


In [26]:
g1 = g.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()

In [27]:
g1.vertices.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  b|   Bob| 36|
|  a| Alice| 34|
|  e|Esther| 32|
+---+------+---+



In [28]:
g.inDegrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|  c|       2|
|  b|       2|
|  f|       1|
|  e|       1|
|  d|       1|
|  a|       1|
+---+--------+



In [29]:
g.outDegrees.show()

+---+---------+
| id|outDegree|
+---+---------+
|  f|        1|
|  c|        1|
|  b|        1|
|  a|        2|
|  e|        2|
|  d|        1|
+---+---------+



In [31]:
results = g.pageRank(resetProbability=0.15, tol = 0.1 )



In [32]:
g1.edges.show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  a|  e|      friend|
+---+---+------------+



In [33]:
sc = spark.sparkContext
sc.setCheckpointDir("/tmp")

result = g.connectedComponents() #sets of nodes, where atleast one component from each to the other(undirected edge); g has no connections with other that is reason g has 6
result.select("id", "component").orderBy("component").show()

+---+---------+
| id|component|
+---+---------+
|  a|        0|
|  d|        0|
|  b|        0|
|  e|        0|
|  c|        0|
|  f|        0|
|  g|        6|
+---+---------+



In [34]:
sc = spark.sparkContext
sc.setCheckpointDir("/tmp")

result = g.stronglyConnectedComponents(maxIter=10) #sets of nodes; there is a direction connection between a&d, d&e, a&e, strongly connected there is a way to go from a to e, e to d, and d to a
result.select("id", "component").orderBy("component").show()

+---+---------+
| id|component|
+---+---------+
|  a|        0|
|  d|        0|
|  e|        0|
|  b|        1|
|  c|        1|
|  f|        5|
|  g|        6|
+---+---------+



In [35]:


sc.setCheckpointDir("/tmp")
# Run PageRank until convergence to tolerance "tol".
results = g.pageRank(resetProbability=0.15, tol=0.01)
# Display resulting pageranks and final edge weights
# Note that the displayed pagerank may be truncated, e.g., missing the E notation.
# In Spark 1.5+, you can use show(truncate=False) to avoid truncation.
results.vertices.select("id", "pagerank").show()
results.edges.select("src", "dst", "weight").show()

+---+-------------------+
| id|           pagerank|
+---+-------------------+
|  a|0.44910633706538744|
|  b|  2.655507832863289|
|  c| 2.6878300011606218|
|  d| 0.3283606792049851|
|  e|0.37085233187676075|
|  f| 0.3283606792049851|
|  g| 0.1799821386239711|
+---+-------------------+

+---+---+------+
|src|dst|weight|
+---+---+------+
|  a|  b|   0.5|
|  a|  e|   0.5|
|  b|  c|   1.0|
|  c|  b|   1.0|
|  d|  a|   1.0|
|  e|  d|   0.5|
|  e|  f|   0.5|
|  f|  c|   1.0|
+---+---+------+



In [36]:
results = g.shortestPaths(landmarks=["a", "d"])
results.select("id", "distances").show()

+---+----------------+
| id|       distances|
+---+----------------+
|  a|{d -> 2, a -> 0}|
|  b|              {}|
|  c|              {}|
|  d|{d -> 0, a -> 1}|
|  e|{d -> 1, a -> 2}|
|  f|              {}|
|  g|              {}|
+---+----------------+



In [37]:
results = g.triangleCount()
results.select("id", "count").show()

+---+-----+
| id|count|
+---+-----+
|  c|    0|
|  b|    0|
|  a|    1|
|  g|    0|
|  f|    0|
|  e|    1|
|  d|    1|
+---+-----+



In [45]:
flights = spark.read.option("header", "true").csv("/content/sample_data/T_T100D_SEGMENT_US_CARRIER_ONLY.csv")

In [48]:
airports = flights.select("ORIGIN_AIRPORT_ID", "ORIGIN").toDF("id","name").distinct()

In [49]:
display(airports)

id,name
11086,CNO
13230,MDT
14006,PAH
15153,TCL
12635,KCG
12916,LCI
13342,MKE
11559,ENN
12365,IPT
14122,PIT


In [50]:
airports.show()

+-----+----+
|   id|name|
+-----+----+
|11086| CNO|
|13230| MDT|
|14006| PAH|
|15153| TCL|
|12635| KCG|
|12916| LCI|
|13342| MKE|
|11559| ENN|
|12365| IPT|
|14122| PIT|
|14512| RFD|
|14543| RKS|
|15171| TEK|
|10747| BRO|
|12184| HOM|
|14130| PKA|
|15012| STG|
|10396| ATK|
|10800| BUR|
|12223| HTS|
+-----+----+
only showing top 20 rows



In [51]:
airports = flights.select("ORIGIN_AIRPORT_ID", "ORIGIN").toDF("id","name").distinct()

In [52]:
airportEdges = flights.select("ORIGIN", "DEST").toDF("src","dst")

In [53]:
airportEdges.show()

+---+---+
|src|dst|
+---+---+
|01A|A43|
|06A|A43|
|06A|A43|
|1G4|BLD|
|1G4|BLD|
|1G4|BLD|
|A01|FAI|
|A04|A43|
|A06|OME|
|A20|ANC|
|A20|SCC|
|A27|FAI|
|A29|A43|
|A29|A43|
|A43|01A|
|A43|06A|
|A43|06A|
|A43|A04|
|A43|A29|
|A43|A29|
+---+---+
only showing top 20 rows



In [54]:
airportGraph = GraphFrame(airports, airportEdges)



In [55]:
airportGraph.vertices.count()

908

In [66]:
airportGraph.vertices.show()

+-----+----+
|   id|name|
+-----+----+
|11086| CNO|
|13230| MDT|
|14006| PAH|
|15153| TCL|
|12635| KCG|
|12916| LCI|
|13342| MKE|
|11559| ENN|
|12365| IPT|
|14122| PIT|
|14512| RFD|
|14543| RKS|
|15171| TEK|
|10747| BRO|
|12184| HOM|
|14130| PKA|
|15012| STG|
|10396| ATK|
|10800| BUR|
|12223| HTS|
+-----+----+
only showing top 20 rows



In [56]:
airportGraph.edges.count()

34053

In [58]:
airportGraph.inDegrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|LEB|       6|
|VWD|       1|
|HYL|       5|
|KEB|       5|
|BGM|       2|
|DLG|      45|
|TYE|       5|
|OLF|       2|
|ELI|      12|
|INL|       2|
|PSE|       7|
|MSY|     257|
|PPG|       2|
|BNF|       1|
|GEG|      82|
|KNW|      10|
|HVR|       1|
|FOK|      12|
|CDW|       8|
|DRT|       5|
+---+--------+
only showing top 20 rows



In [60]:
airportGraph.outDegrees.show()



+---+---------+
| id|outDegree|
+---+---------+
|BGM|        2|
|HYL|        3|
|KEB|        5|
|LEB|        6|
|VWD|        1|
|DLG|       52|
|ELI|       15|
|INL|        2|
|OLF|        1|
|PSE|        6|
|TYE|        5|
|BNF|        1|
|MSY|      256|
|PPG|        2|
|CDW|        4|
|DRT|        5|
|FOK|        8|
|GEG|       79|
|HVR|        1|
|KNW|       11|
+---+---------+
only showing top 20 rows



In [73]:
airportGraph.cache()

GraphFrame(v:[id: string, name: string], e:[src: string, dst: string])

In [75]:
bfs_result = airportGraph.bfs("id = 'DFW'", "id = 'MCO'")

In [78]:
bfs_result.show()


+---+----+
| id|name|
+---+----+
+---+----+



In [72]:
airportGraph.edges.where("src = 'DFW' and dest = 'MCO'").groupBy("src", "dst").count()

src,dst,count
DFW,MCO,12


In [None]:
result.show()