In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark

In [18]:
import os

# JDK location used by Spark (path of the openjdk-8 package on amd64).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Store SPARK_HOME as an absolute path: a relative value is resolved against
# whatever the working directory happens to be when Spark is launched, so it
# would break after any later directory change.
os.environ["SPARK_HOME"] = os.path.abspath("spark-3.2.0-bin-hadoop3.2")

In [19]:
# Initialise findspark before any pyspark import. Presumably this uses the
# SPARK_HOME set earlier in the notebook to make the bundled pyspark
# importable -- confirm against the findspark documentation.
import findspark
findspark.init()

In [20]:
# Colab-only: mount Google Drive at /content/drive. The input file is read
# from /content/drive/MyDrive/ later in the notebook, so this must run first.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, size

In [22]:
# Create the notebook's Spark session, or reuse the active one if it exists.
spark = (
    SparkSession.builder
    .appName("PageRankAnalysis")
    .getOrCreate()
)

In [23]:
# Load the raw file as a one-column DataFrame: one row per line, column "value".
edge_list_path = '/content/drive/MyDrive/pagerank.txt'
data_df = spark.read.text(edge_list_path)

In [24]:
# Split the lines into (source_node, target_node) pairs.
# - trim() + splitting on ' +' (one or more spaces) tolerates leading/trailing
#   and repeated spaces, which a split on a single ' ' turns into empty tokens.
# - Lines with fewer than two tokens (e.g. blank lines) are dropped; otherwise
#   they would yield a NULL target that ends up as its own group downstream.
# - As before, only the first two tokens of a line are used.
edges_df = (
    data_df
    .selectExpr("split(trim(value), ' +') as parts")
    .where("size(parts) >= 2")
    .selectExpr("parts[0] as source", "parts[1] as target")
)


In [25]:
# 1. Python list: Output sorted (based on the node id) list of incoming edges
# Node ids are parsed as strings, so ordering on the raw column is
# lexicographic ('10' would sort before '2'). Order on the numeric value
# first; the string column is kept as a tie-breaker so ids that do not cast
# to a number (NULL after the cast) still get a deterministic order.
incoming_edges_list = (
    edges_df
    .groupBy("target")
    .agg(collect_list("source").alias("incoming_edges"))
    .orderBy(col("target").cast("long"), col("target"))
    .collect()
)


In [26]:
# 2. Python dictionary: Show only the nodes who have exactly 5 incoming edges
required_in_degree = 5  # used by both the filter and the dictionary values

# Count the edges per target directly instead of collecting every source into
# a list just to measure its length. The row count matches the list size as
# long as `source` is never NULL, which holds for values taken from a split
# of a text line.
in_degree_df = (
    edges_df
    .groupBy("target")
    .count()
    .withColumnRenamed("count", "incoming_count")
)
nodes_with_5_incoming_edges = (
    in_degree_df
    .filter(col("incoming_count") == required_in_degree)
    .select("target")
    .collect()
)
# Map each qualifying node id to its incoming-edge count.
nodes_with_5_incoming_edges_dict = {
    row["target"]: required_in_degree for row in nodes_with_5_incoming_edges
}

In [27]:
# 3. Python list: Show the incoming edges of the node '1'
# Keep only the edges that point at node '1' and collect their source column.
incoming_edges_of_node_1 = (
    edges_df
    .where(col("target") == '1')
    .select("source")
    .collect()
)


In [28]:
# Report the three results.
print("1. Sorted list of incoming edges:")
# The list is already ordered by the Spark query that produced it. Re-sorting
# the Row tuples in Python would compare node ids as plain strings and would
# raise TypeError if any target were None, so it is printed as collected.
print(incoming_edges_list)

print("\n2. Nodes with exactly 5 incoming edges (Python dictionary):")
print(nodes_with_5_incoming_edges_dict)

print("\n3. Incoming edges of node '1':")
# Unwrap the Row objects into a plain list of source ids.
print([row["source"] for row in incoming_edges_of_node_1])


1. Sorted list of incoming edges:
[Row(target='1', incoming_edges=['9', '4', '7', '1', '5']), Row(target='2', incoming_edges=['8', '7', '6', '1', '5']), Row(target='3', incoming_edges=['4', '1']), Row(target='4', incoming_edges=['9', '2', '6', '8', '1', '3', '6', '3']), Row(target='5', incoming_edges=['4', '5', '5', '2', '4']), Row(target='6', incoming_edges=['6', '9', '3', '5', '4', '8', '5', '6']), Row(target='7', incoming_edges=['7', '2', '2', '3', '6']), Row(target='8', incoming_edges=['5', '8', '3', '4', '1', '3', '9', '6']), Row(target='9', incoming_edges=['5', '9', '4', '9'])]

2. Nodes with exactly 5 incoming edges (Python dictionary):
{'7': 5, '5': 5, '1': 5, '2': 5}

3. Incoming edges of node '1':
['9', '4', '7', '1', '5']
