In [1]:
# 1. Data Loading & Preprocessing (Spark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, least, when, collect_set, explode, size, min as spark_min, count, array_intersect, avg, sum as spark_sum
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("Assignment2") \
    .getOrCreate()

# Read as DataFrame (skip commented lines starting with #)
edges = (
    spark.read.option("comment", "#")
    .option("delimiter", "\t")
    .csv("../Wiki-Vote.txt")
    .select(
        col("_c0").cast("int").alias("src"),
        col("_c1").cast("int").alias("dst")
    )
)

# Get unique vertices
vertices = edges.select("src").union(edges.select("dst")).distinct().withColumnRenamed("src", "id") 

In [2]:
# 2. Basic Graph Statistics
num_nodes = vertices.count()
num_edges = edges.count()

print(f"Number of nodes in graph: {num_nodes}")
print(f"Number of edges in graph: {num_edges}")

Number of nodes in graph: 7115
Number of edges in graph: 103689


In [3]:
# 3. Weakly Connected Components (WCC)

# --- Make edges undirected ---
undirected_edges = edges.select("src", "dst").union(edges.select(col("dst").alias("src"), col("src").alias("dst"))).distinct()

# --- Initialize each node's component as itself ---
components = vertices.withColumn("component", col("id"))

# --- Iteratively propagate component IDs ---
changed = True
iteration = 0
while changed:
    iteration += 1
    # Propagate component IDs via edges
    propagated = undirected_edges.join(components, undirected_edges.src == components.id, "left") \
                                    .select(col("dst").alias("id"), col("component"))
    
    # Get the minimum component ID for each node (either itself or neighbor)
    new_components = components.union(propagated) \
                                .groupBy("id") \
                                .agg(spark_min("component").alias("component"))
    
    # Check for convergence
    diff = components.join(new_components, "id") \
                        .filter(components.component != new_components.component)
    
    changed = diff.count() > 0
    components = new_components

# --- Compute WCC sizes ---
component_sizes = components.groupBy("component").agg(
    F.countDistinct("id").alias("num_nodes")
)

# --- Assign component to each edge (both endpoints) ---
edges_with_comp = undirected_edges \
    .join(components.withColumnRenamed("id", "src"), on="src") \
    .withColumnRenamed("component", "src_comp") \
    .join(components.withColumnRenamed("id", "dst").withColumnRenamed("component", "dst_comp"), on="dst")

# Keep edges where both endpoints are in the same component
edges_same_comp = edges_with_comp.filter(col("src_comp") == col("dst_comp"))

# --- Count edges per component ---
edges_per_comp = edges_same_comp.groupBy("src_comp").agg(
    F.count("*").alias("num_edges")
).withColumnRenamed("src_comp", "component")

# --- Combine node and edge stats ---
wcc_stats = component_sizes.join(edges_per_comp, "component", "left").fillna(0)

# --- Find largest WCC ---
largest_wcc = wcc_stats.orderBy(col("num_nodes").desc()).limit(1).collect()[0]

print("\n===== Largest WCC =====")
print(f"Component ID: {largest_wcc['component']}")
print(f"Number of Nodes: {largest_wcc['num_nodes']}")
print(f"Number of Edges: {largest_wcc['num_edges']}")




===== Largest WCC =====
Component ID: 3
Number of Nodes: 7066
Number of Edges: 201472


In [4]:
# ------------------ SCC computation (Python) ------------------
def kosaraju(vertices, edges):
    # Collect vertices and edges to Python
    vert_list = [int(r.id) for r in vertices.collect()]
    edge_list = [(int(r.src), int(r.dst)) for r in edges.collect()]

    # Build adjacency lists
    out_adj = {v: [] for v in vert_list}
    in_adj = {v: [] for v in vert_list}
    for src, dst in edge_list:
        out_adj[src].append(dst)
        in_adj[dst].append(src)

    # Step 1: Reverse DFS to compute finishing order
    visited = set()
    order = []

    def dfs_rev(u):
        stack = [(u, 0)]
        visited.add(u)
        while stack:
            node, idx = stack[-1]
            nbrs = in_adj.get(node, [])
            if idx < len(nbrs):
                nxt = nbrs[idx]
                stack[-1] = (node, idx + 1)
                if nxt not in visited:
                    visited.add(nxt)
                    stack.append((nxt, 0))
            else:
                order.append(node)
                stack.pop()

    for v in vert_list:
        if v not in visited:
            dfs_rev(v)

    # Step 2: Forward DFS to assign components
    comp_dict = {}
    cid = 0

    def dfs_fwd(u, cid):
        stack = [u]
        comp_dict[u] = cid
        while stack:
            node = stack.pop()
            for nxt in out_adj.get(node, []):
                if nxt not in comp_dict:
                    comp_dict[nxt] = cid
                    stack.append(nxt)

    for v in reversed(order):
        if v not in comp_dict:
            cid += 1
            dfs_fwd(v, cid)

    return comp_dict, edge_list

# Compute SCCs
comp_dict, edge_list = kosaraju(vertices, edges)

# ------------------ Analyze SCCs ------------------
from collections import Counter

# Count sizes of each component
comp_sizes = Counter(comp_dict.values())
largest_scc_id, largest_scc_nodes = max(comp_sizes.items(), key=lambda x: x[1])

# Count edges internal to largest SCC
largest_scc_set = set(v for v, cid in comp_dict.items() if cid == largest_scc_id)
largest_scc_edges_count = sum(1 for src, dst in edge_list if src in largest_scc_set and dst in largest_scc_set)

# ------------------ Print results ------------------
print("\n===== Largest SCC =====")
print(f"  • Component ID: {largest_scc_id}")
print(f"  • Nodes: {largest_scc_nodes}")
print(f"  • Edges (internal): {largest_scc_edges_count}")
print(f"  • Fraction of Nodes: {largest_scc_nodes / num_nodes:.4f}")
print(f"  • Fraction of Edges: {largest_scc_edges_count / num_edges:.4f}")

# ------------------ Optional: Create DataFrame of largest SCC vertices ------------------
largest_scc_vertices_df = spark.createDataFrame(
    [(v,) for v in largest_scc_set],
    schema=["nid"]
)


===== Largest SCC =====
  • Component ID: 1954
  • Nodes: 1300
  • Edges (internal): 39456
  • Fraction of Nodes: 0.1827
  • Fraction of Edges: 0.3805


In [5]:
# 5. Clustering Metrics & Triangles
edges = edges.filter(F.col("src") != F.col("dst"))

# Undirected edges (both directions)
edges_undir = (
    edges.select("src", "dst")
    .union(edges.select(F.col("dst").alias("src"), F.col("src").alias("dst")))
    .distinct()
)

num_nodes = edges_undir.select("src").union(edges_undir.select("dst")).distinct().count()
num_edges = edges.count()

# ---------------- Step 1: Create adjacency lists ----------------
adjacency = (
    edges_undir.groupBy("src")
    .agg(F.collect_set("dst").alias("nbrs"))
    .withColumnRenamed("src", "vertex")
)

# ---------------- Step 2: Canonicalize edges (u < v) ----------------
edges_canon = (
    edges_undir
    .withColumn("u", F.least(F.col("src"), F.col("dst")))
    .withColumn("v", F.greatest(F.col("src"), F.col("dst")))
    .select("u", "v")
    .distinct()
)

# ---------------- Step 3: For each edge (u,v), find shared neighbors ----------------
adj1 = adjacency.alias("a1")
adj2 = adjacency.alias("a2")

common_neighbors = (
    edges_canon
    .join(adj1, F.col("u") == F.col("a1.vertex"))
    .join(adj2, F.col("v") == F.col("a2.vertex"))
    .select(
        F.col("u"),
        F.col("v"),
        F.size(F.array_intersect(F.col("a1.nbrs"), F.col("a2.nbrs"))).alias("tri_shared")
    )
)

# ---------------- Step 4: Each triangle is shared by 3 edges → divide by 3 ----------------
total_triangles = common_neighbors.agg(F.sum("tri_shared")).first()[0] / 3.0

# ---------------- Step 5: Per-vertex triangle counts ----------------
triangles_per_vertex = (
    common_neighbors
    .select(F.col("u").alias("vertex"), F.col("tri_shared"))
    .union(common_neighbors.select(F.col("v").alias("vertex"), F.col("tri_shared")))
    .groupBy("vertex")
    .agg(F.sum("tri_shared").alias("triangles"))
)

# ---------------- Step 6: Compute degree per vertex ----------------
degree_df = (
    edges_undir.groupBy("src")
    .agg(F.countDistinct("dst").alias("degree"))
    .withColumnRenamed("src", "vertex")
)

# ---------------- Step 7: Combine, compute clustering coefficient ----------------
clustering_df = (
    degree_df.join(triangles_per_vertex, "vertex", "left")
    .fillna(0, subset=["triangles"])
    .withColumn("possible_triplets", (F.col("degree") * (F.col("degree") - 1)) / 2)
    .withColumn(
        "clustering_coeff",
        F.when(F.col("possible_triplets") > 0,
               F.col("triangles") / F.col("possible_triplets")
        ).otherwise(0.0)
    )
)

# ---------------- Step 8: Global stats ----------------
total_triplets = clustering_df.agg(F.sum("possible_triplets")).first()[0]
fraction_closed = total_triangles / total_triplets if total_triplets > 0 else 0.0
avg_clustering = clustering_df.agg(F.avg("clustering_coeff")).first()[0]/2

# ---------------- Print results ----------------
print(f"Total triangles : {int(total_triangles)}")
print(f"Fraction of closed triplets: {fraction_closed:.6f}")
print(f"Average clustering coefficient: {avg_clustering:.6f}")


Total triangles : 608389
Fraction of closed triplets: 0.041826
Average clustering coefficient: 0.140898


In [6]:
# 6. Distance-based Metrics
from collections import deque
import numpy as np

# ------------------ Collect graph to Python ------------------
edge_list = [(int(r.src), int(r.dst)) for r in edges_undir.collect()]
vert_list = [int(r.id) for r in vertices.collect()]

# ------------------ Build adjacency list ------------------
adj = {v: [] for v in vert_list}
for src, dst in edge_list:
    adj[src].append(dst)

# ------------------ BFS function ------------------
def bfs_shortest_paths(start, adj):
    visited = {start: 0}  # node -> distance from start
    q = deque([start])
    while q:
        u = q.popleft()
        for v in adj.get(u, []):
            if v not in visited:
                visited[v] = visited[u] + 1
                q.append(v)
    return visited  # distances from start to reachable nodes

# ------------------ Compute all-pairs distances ------------------
all_distances = []

for i, node in enumerate(vert_list):
    dist_map = bfs_shortest_paths(node, adj)
    all_distances.extend(dist_map.values())

all_distances = [d for d in all_distances if d > 0]  # ignore self-distances

# ------------------ Diameter and Effective Diameter ------------------
diameter = max(all_distances)
effective_diameter = int(np.percentile(all_distances, 90))

print(f"\nGraph Diameter: {diameter}")
print(f"Effective Diameter (90th percentile): {effective_diameter}")


Graph Diameter: 7
Effective Diameter (90th percentile): 4
