In [4]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install pyspark==3.0.0
!pip install graphframes



# Step 2: Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.10/dist-packages/pyspark"

Collecting pyspark==3.0.0
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.7/204.7 MB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9 (from pyspark==3.0.0)
  Downloading py4j-0.10.9-py2.py3-none-any.whl.metadata (1.3 kB)
Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m13.8 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044158 sha256=44f30e78c09d2d8eb718c8318cc500eedb075ddcd402571195762188556d532c
  Stored in directory: /root/.cache/pip/wheels/b1/bb/8b/ca24d3f756f2ed967225b0871898869db676eb5846df5adc56
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attem



In [5]:
from pyspark.sql import SparkSession
from graphframes import GraphFrame
from pyspark.sql.functions import desc

# Initialize Spark session
spark = SparkSession.builder.appName("Graph Analytics") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

# Create vertices DataFrame
vertices = spark.createDataFrame([
    ("Alice", 45),
    ("Jacob", 43),
    ("Roy", 21),
    ("Ryan", 49),
    ("Emily", 24),
    ("Sheldon", 52)
], ["id", "age"])

# Create edges DataFrame
edges = spark.createDataFrame([
    ("Sheldon", "Alice", "Sister"),
    ("Alice", "Jacob", "Husband"),
    ("Emily", "Jacob", "Father"),
    ("Ryan", "Alice", "Friend"),
    ("Alice", "Emily", "Daughter"),
    ("Alice", "Roy", "Son"),
    ("Jacob", "Roy", "Son")
], ["src", "dst", "relation"])

# Create the GraphFrame
g = GraphFrame(vertices, edges)

# 1. Group and count the edges, then order by count
print("Grouping and ordering edges:")
g.edges.groupBy("src", "dst").count().orderBy(desc("count")).show(5)

# 2. Filter the GraphFrame by source or destination and order
print("Filtering edges where src or dst is 'Alice':")
g.edges.where("src = 'Alice' OR dst = 'Alice'").groupBy("src", "dst").count().orderBy(desc("count")).show(5)

# 3. Create a subgraph where Alice is either the source or destination
print("Creating a subgraph where 'Alice' is involved:")
subgraph_edges = g.edges.where("src = 'Alice' OR dst = 'Alice'")
subgraph = GraphFrame(g.vertices, subgraph_edges)
subgraph.edges.show()

# 4. Find motifs (e.g., Alice and another person connected by a relation)
print("Finding motifs:")
motifs = g.find("(a)-[ab]->(b)")
motifs.show()

# Filtering the motifs for specific relation types
print("Filtering motifs with relation 'Son' or 'Husband':")
motifs_filtered = g.find("(a)-[ab]->(b)").filter("ab.relation = 'Son' OR ab.relation = 'Husband'")
motifs_filtered.show()

# 5. Compute PageRank
print("Computing PageRank:")
page_rank = g.pageRank(resetProbability=0.15, maxIter=5)
page_rank.vertices.orderBy(desc("pagerank")).show(5)

# 6. Compute in-degrees and out-degrees
print("In-degree of vertices:")
in_degree = g.inDegrees
in_degree.orderBy(desc("inDegree")).show(5)

print("Out-degree of vertices:")
out_degree = g.outDegrees
out_degree.orderBy(desc("outDegree")).show(5)

# 7. Compute connected components and strongly connected components
# print("Computing connected components:")
# spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
# connected_components = g.connectedComponents()
# connected_components.show()

print("Computing strongly connected components:")
strongly_connected_components = g.stronglyConnectedComponents(maxIter=5)
strongly_connected_components.show()

# 8. Perform BFS to search paths within the graph
print("Performing BFS from 'Alice' to 'Jacob':")
bfs_result = g.bfs(fromExpr="id = 'Alice'", toExpr="id = 'Jacob'", maxPathLength=2)
bfs_result.show()

# Stop the Spark session after completion
spark.stop()

Py4JJavaError: An error occurred while calling o111.loadClass.
: java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
