# CS483 - Colab 2
## Link Analysis in Spark: PageRank and HITS

In [1]:
from IPython.display import Image

# Address of the animated mascot picture shown under the caption
MASCOT_URL = 'https://cdn.dribbble.com/users/222579/screenshots/1654898/stubby-ben-rex-roll.gif'

print("Colab 2 Mascot")
# Must stay the last expression of the cell so the notebook renders the image
Image(url=MASCOT_URL, width=150)

Colab 2 Mascot


### Setup

Let's set up Spark on your Colab environment.  Run the cell below!

In [2]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  libxtst6 openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 3 newly installed, 0 to remove and 49 not upgraded.
Need to get 39.6 MB of archives.
After this operation, 144 MB of additional disk space will be used.
Selecting previously unselected package libxtst6:amd64.
(Reading database ... 123622 files and directories currently installed.)
Preparing to unpack .../libxtst6_2%3a1.2.3-1build4_amd64.deb ...
Unpacking libxtst6:amd64 (2:1.2.3-1build4) ...
Selecting previously unselected package openjdk-8-jre-headless:amd64.
Preparing to unpack .../openjdk-8-jre-headless_8u422-b05-1~22.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u422-b05-1~22.04) ...
Sel

Now we authenticate a Google Drive client to download the file we will be processing in our Spark job.

**Make sure to follow the interactive instructions.**

If you executed the cells above, you should be able to see the datasets we will need for this Colab (`graph-small.txt` and `graph-full.txt`) under the "Files" tab on the left panel.

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Let's initialize the Spark context.

In [4]:
# Spark configuration: serve the Spark web UI on port 4050
conf = SparkConf().set("spark.ui.port", "4050")

# Create the context, or reuse the one that is already running. Constructing a
# second SparkContext in the same kernel raises an error, so getOrCreate keeps
# this cell safe to execute more than once.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
# The session attaches to the context obtained above
spark = SparkSession.builder.getOrCreate()

In [None]:
# Read graph-small.txt as an RDD of text lines
data = sc.textFile("graph-small.txt")

# Convert every line into a tuple of ints (source first, destination second)
# and drop repeated edges
edges = data.map(lambda text: tuple(int(token) for token in text.split())).distinct()

# Every node id that appears as either endpoint of an edge, de-duplicated
nodes = edges.flatMap(lambda pair: (pair[0], pair[1])).distinct()

# Number of distinct nodes in the graph
n = nodes.count()

# Function to initialize the ranks vector, storing ranks in memory as a dictionary
def initialize_ranks_in_memory(n):
    """Return the uniform starting rank vector as a plain dict.

    Keys are the integers 1..n (node ids are taken to be exactly that range)
    and every value is 1/n. For n == 0 the result is an empty dict.
    """
    starting_ranks = {}
    for node_id in range(1, n + 1):
        starting_ranks[node_id] = 1 / n
    return starting_ranks

# Build the adjacency list as an RDD of (node, [outgoing neighbors]) pairs.
# The same RDD is read by every PageRank iteration below, so mark it for
# caching instead of leaving it to be re-derived on each pass.
adj_list_rdd = edges.groupByKey().mapValues(list).cache()

# Define a function to distribute the rank of each node to its neighbors using RDD operations
def distribute_ranks_using_rdd(ranks_in_memory, adj_list_rdd, beta=0.8):
    """Compute the link-following part of one PageRank update.

    Every node listed in ``adj_list_rdd`` splits ``beta`` times its current rank
    evenly among its outgoing neighbors; the shares arriving at each node are
    then summed. The teleport term is NOT added here.

    Args:
        ranks_in_memory: dict mapping node id -> current rank. It must contain
            every key of ``adj_list_rdd``; a missing key fails the lookup.
        adj_list_rdd: RDD of (node, [neighbors]) pairs.
        beta: fraction of a node's rank that is passed along its links.

    Returns:
        dict mapping node id -> summed contribution. Nodes that receive nothing
        are absent from the result.
    """
    # Broadcast the in-memory rank vector to all worker nodes
    rank_broadcast = sc.broadcast(ranks_in_memory)
    try:
        # One (neighbor, share) pair per outgoing edge
        rank_contributions_rdd = adj_list_rdd.flatMap(lambda node_neighbors:
            [(neighbor, rank_broadcast.value[node_neighbors[0]] / len(node_neighbors[1]) * beta)
             for neighbor in node_neighbors[1]]
        )

        # Sum the rank contributions for each node
        rank_sums_rdd = rank_contributions_rdd.reduceByKey(lambda a, b: a + b)

        # collectAsMap is the action that actually runs the job; once it returns,
        # nothing refers to the broadcast any more.
        return rank_sums_rdd.collectAsMap()
    finally:
        # This function is called once per iteration and makes a new broadcast
        # each time; release it so old rank vectors do not pile up.
        rank_broadcast.destroy()

# Start from the uniform vector: every node id in 1..n holds rank 1/n
ranks = initialize_ranks_in_memory(n)

# beta is the share of rank that follows links; the remaining (1 - beta)
# is divided equally among the n nodes as the teleport term
beta = 0.8
teleport_value = (1 - beta) / n

# Apply the rank update 40 times
for i in range(40):
    print("Iteration: ", i + 1)
    # Link-following contributions for this round, returned as a plain dict
    rank_contributions = distribute_ranks_using_rdd(ranks, adj_list_rdd, beta)

    # Rebuild the vector over node ids 1..n; a node that received nothing
    # ends up with the teleport share only
    updated = {}
    for node_id in range(1, n + 1):
        updated[node_id] = rank_contributions.get(node_id, 0) + teleport_value
    ranks = updated

# Order the (node, score) pairs from highest score to lowest
final_ranks_sorted = list(ranks.items())
final_ranks_sorted.sort(key=lambda entry: entry[1], reverse=True)

# First five and last five entries of that ordering
top_5 = final_ranks_sorted[:5]
bottom_5 = final_ranks_sorted[-5:]

# Report both groups, each under its own heading
for heading, entries in (
    ("Top 5 nodes by PageRank score:", top_5),
    ("Bottom 5 nodes by PageRank score:", bottom_5),
):
    print(heading)
    for node, score in entries:
        print(f"Node {node}, Score: {score}")

Iteration:  1
Iteration:  2
Iteration:  3
Iteration:  4
Iteration:  5
Iteration:  6
Iteration:  7
Iteration:  8
Iteration:  9
Iteration:  10
Iteration:  11
Iteration:  12
Iteration:  13
Iteration:  14
Iteration:  15
Iteration:  16
Iteration:  17
Iteration:  18
Iteration:  19
Iteration:  20
Iteration:  21
Iteration:  22
Iteration:  23
Iteration:  24
Iteration:  25
Iteration:  26
Iteration:  27
Iteration:  28
Iteration:  29
Iteration:  30
Iteration:  31
Iteration:  32
Iteration:  33
Iteration:  34
Iteration:  35
Iteration:  36
Iteration:  37
Iteration:  38
Iteration:  39
Iteration:  40
Top 5 nodes by PageRank score:
Node 53, Score: 0.03573120223267161
Node 14, Score: 0.034170906972591376
Node 40, Score: 0.03363008718974389
Node 1, Score: 0.030005979479788617
Node 27, Score: 0.029720144201405386
Bottom 5 nodes by PageRank score:
Node 89, Score: 0.003922466019802269
Node 37, Score: 0.003808204291611451
Node 81, Score: 0.0036953517493609916
Node 59, Score: 0.0036698606601272845
Node 85, Sco

In [None]:
# Read graph-full.txt as an RDD of text lines
data = sc.textFile("graph-full.txt")

# Convert every line into a tuple of ints (source first, destination second)
# and drop repeated edges
edges = data.map(lambda text: tuple(int(token) for token in text.split())).distinct()

# Every node id that appears as either endpoint of an edge, de-duplicated
nodes = edges.flatMap(lambda pair: (pair[0], pair[1])).distinct()

# Number of distinct nodes in the graph
n = nodes.count()

# Function to initialize the ranks vector, storing ranks in memory as a dictionary
def initialize_ranks_in_memory(n):
    """Return the uniform starting rank vector as a plain dict.

    Keys are the integers 1..n (node ids are taken to be exactly that range)
    and every value is 1/n. For n == 0 the result is an empty dict.
    """
    starting_ranks = {}
    for node_id in range(1, n + 1):
        starting_ranks[node_id] = 1 / n
    return starting_ranks

# Build the adjacency list as an RDD of (node, [outgoing neighbors]) pairs.
# The same RDD is read by every PageRank iteration below, so mark it for
# caching instead of leaving it to be re-derived on each pass.
adj_list_rdd = edges.groupByKey().mapValues(list).cache()

# Define a function to distribute the rank of each node to its neighbors using RDD operations
def distribute_ranks_using_rdd(ranks_in_memory, adj_list_rdd, beta=0.8):
    """Compute the link-following part of one PageRank update.

    Every node listed in ``adj_list_rdd`` splits ``beta`` times its current rank
    evenly among its outgoing neighbors; the shares arriving at each node are
    then summed. The teleport term is NOT added here.

    Args:
        ranks_in_memory: dict mapping node id -> current rank. It must contain
            every key of ``adj_list_rdd``; a missing key fails the lookup.
        adj_list_rdd: RDD of (node, [neighbors]) pairs.
        beta: fraction of a node's rank that is passed along its links.

    Returns:
        dict mapping node id -> summed contribution. Nodes that receive nothing
        are absent from the result.
    """
    # Broadcast the in-memory rank vector to all worker nodes
    rank_broadcast = sc.broadcast(ranks_in_memory)
    try:
        # One (neighbor, share) pair per outgoing edge
        rank_contributions_rdd = adj_list_rdd.flatMap(lambda node_neighbors:
            [(neighbor, rank_broadcast.value[node_neighbors[0]] / len(node_neighbors[1]) * beta)
             for neighbor in node_neighbors[1]]
        )

        # Sum the rank contributions for each node
        rank_sums_rdd = rank_contributions_rdd.reduceByKey(lambda a, b: a + b)

        # collectAsMap is the action that actually runs the job; once it returns,
        # nothing refers to the broadcast any more.
        return rank_sums_rdd.collectAsMap()
    finally:
        # This function is called once per iteration and makes a new broadcast
        # each time; release it so old rank vectors do not pile up.
        rank_broadcast.destroy()

# Start from the uniform vector: every node id in 1..n holds rank 1/n
ranks = initialize_ranks_in_memory(n)

# beta is the share of rank that follows links; the remaining (1 - beta)
# is divided equally among the n nodes as the teleport term
beta = 0.8
teleport_value = (1 - beta) / n

# Apply the rank update 40 times
for i in range(40):
    print("Iteration: ", i + 1)
    # Link-following contributions for this round, returned as a plain dict
    rank_contributions = distribute_ranks_using_rdd(ranks, adj_list_rdd, beta)

    # Rebuild the vector over node ids 1..n; a node that received nothing
    # ends up with the teleport share only
    updated = {}
    for node_id in range(1, n + 1):
        updated[node_id] = rank_contributions.get(node_id, 0) + teleport_value
    ranks = updated

# Order the (node, score) pairs from highest score to lowest
final_ranks_sorted = list(ranks.items())
final_ranks_sorted.sort(key=lambda entry: entry[1], reverse=True)

# First five and last five entries of that ordering
top_5 = final_ranks_sorted[:5]
bottom_5 = final_ranks_sorted[-5:]

# Report both groups, each under its own heading
for heading, entries in (
    ("Top 5 nodes by PageRank score:", top_5),
    ("Bottom 5 nodes by PageRank score:", bottom_5),
):
    print(heading)
    for node, score in entries:
        print(f"Node {node}, Score: {score}")

Iteration:  1
Iteration:  2
Iteration:  3
Iteration:  4
Iteration:  5
Iteration:  6
Iteration:  7
Iteration:  8
Iteration:  9
Iteration:  10
Iteration:  11
Iteration:  12
Iteration:  13
Iteration:  14
Iteration:  15
Iteration:  16
Iteration:  17
Iteration:  18
Iteration:  19
Iteration:  20
Iteration:  21
Iteration:  22
Iteration:  23
Iteration:  24
Iteration:  25
Iteration:  26
Iteration:  27
Iteration:  28
Iteration:  29
Iteration:  30
Iteration:  31
Iteration:  32
Iteration:  33
Iteration:  34
Iteration:  35
Iteration:  36
Iteration:  37
Iteration:  38
Iteration:  39
Iteration:  40
Top 5 nodes by PageRank score:
Node 263, Score: 0.002020291181518219
Node 537, Score: 0.00194334157145315
Node 965, Score: 0.0019254478071662627
Node 243, Score: 0.001852634016241731
Node 285, Score: 0.0018273721700645142
Bottom 5 nodes by PageRank score:
Node 408, Score: 0.00038779848719291705
Node 424, Score: 0.00035481538649301454
Node 62, Score: 0.00035314810510596274
Node 93, Score: 0.0003513568937516

In [6]:
# HITS (hubs and authorities) on graph-small.txt.
#
# With L[i][j] = 1 whenever the graph has an edge i -> j, each iteration does
#   a = L^T h   then scales a so its largest entry is 1
#   h = L a     then scales h so its largest entry is 1
# starting from h = 1 for every node.

# Load the data from graph-small.txt
data = sc.textFile("graph-small.txt")

# Parse every line into a (source, destination) pair; distinct() collapses repeated edges
edges = data.map(lambda line: tuple(map(int, line.split()))).distinct()

# Get the list of nodes by extracting all the unique source and destination nodes
nodes = edges.flatMap(lambda edge: [edge[0], edge[1]]).distinct()

# Number of nodes in the graph (not needed by the iteration itself; kept for inspection)
n = nodes.count()

# Initialize hub scores with all 1s
hubbiness = nodes.map(lambda node: (node, 1.0))

# Adjacency list: (source, [destinations])
adj_list = edges.groupByKey().mapValues(list)

# Row view of L, (source, [destinations]), and of L^T, (destination, [sources]).
# Both are read on every iteration, so they are cached.
link_matrix = adj_list.cache()
link_matrix_T = edges.map(lambda edge: (edge[1], edge[0])).groupByKey().mapValues(list).cache()


def _propagate_scores(score_vector, neighbor_lists):
    """Give each node the sum of its listed neighbors' scores, scaled to max 1.

    Args:
        score_vector: RDD of (node, score) pairs.
        neighbor_lists: RDD of (node, [neighbors]) pairs.

    Returns:
        RDD of (node, scaled_score). A node appears only if at least one of
        its neighbors has an entry in ``score_vector``.
    """
    # Re-key every (node, neighbor) relation by the NEIGHBOR, so the join below
    # attaches the neighbor's score - the quantity being summed - rather than
    # the node's own score.
    by_neighbor = neighbor_lists.flatMap(
        lambda entry: [(neighbor, entry[0]) for neighbor in entry[1]]
    )
    # (neighbor, (node, neighbor_score)) -> (node, neighbor_score)
    contributions = by_neighbor.join(score_vector).map(lambda joined: joined[1])
    totals = contributions.reduceByKey(lambda a, b: a + b)
    # RDD.max() is used instead of the builtin: the notebook's star import from
    # pyspark.sql.functions shadows the builtin max.
    largest = totals.values().max()
    return totals.mapValues(lambda value: value / largest)


# Function to compute authority vector from hubbiness
def compute_authority(hubbiness_vector, link_matrix_T):
    """a = L^T h: authority of j is the summed hubbiness of the sources linking to j.

    ``link_matrix_T`` holds (destination, [sources]) pairs. The result is
    scaled so that the largest authority equals 1.
    """
    return _propagate_scores(hubbiness_vector, link_matrix_T)


# Function to compute hubbiness vector from authority
def compute_hubbiness(authority_vector, link_matrix):
    """h = L a: hubbiness of i is the summed authority of the destinations i links to.

    ``link_matrix`` holds (source, [destinations]) pairs. The result is
    scaled so that the largest hubbiness equals 1.
    """
    return _propagate_scores(authority_vector, link_matrix)


# Iterate to compute hubbiness and authority scores
for i in range(40):
    print("Iteration: ", i + 1)
    # Compute the authority vector from the current hubbiness vector
    authority = compute_authority(hubbiness, link_matrix_T)

    # Compute the hubbiness vector from the current authority vector
    hubbiness = compute_hubbiness(authority, link_matrix)

# Collect the final vectors. Each variable now really holds what its name says,
# so no swapping is needed afterwards.
final_hubbiness = hubbiness.collect()
final_authority = authority.collect()

# Sort hubbiness and authority by score, highest first
final_hubbiness_sorted = sorted(final_hubbiness, key=lambda x: x[1], reverse=True)
final_authority_sorted = sorted(final_authority, key=lambda x: x[1], reverse=True)

# Get the top 5 and bottom 5 nodes by hubbiness and authority scores
top_5_hubbiness = final_hubbiness_sorted[:5]
bottom_5_hubbiness = final_hubbiness_sorted[-5:]
top_5_authority = final_authority_sorted[:5]
bottom_5_authority = final_authority_sorted[-5:]

# Print the results
for heading, entries in (
    ("Top 5 nodes by Hubbiness score:", top_5_hubbiness),
    ("Bottom 5 nodes by Hubbiness score:", bottom_5_hubbiness),
    ("Top 5 nodes by Authority score:", top_5_authority),
    ("Bottom 5 nodes by Authority score:", bottom_5_authority),
):
    print(heading)
    for node, score in entries:
        print(f"Node {node}, Score: {score}")

Iteration:  1
Iteration:  2
Iteration:  3
Iteration:  4
Iteration:  5
Iteration:  6
Iteration:  7
Iteration:  8
Iteration:  9
Iteration:  10
Iteration:  11
Iteration:  12
Iteration:  13
Iteration:  14
Iteration:  15
Iteration:  16
Iteration:  17
Iteration:  18
Iteration:  19
Iteration:  20
Iteration:  21
Iteration:  22
Iteration:  23
Iteration:  24
Iteration:  25
Iteration:  26
Iteration:  27
Iteration:  28
Iteration:  29
Iteration:  30
Iteration:  31
Iteration:  32
Iteration:  33
Iteration:  34
Iteration:  35
Iteration:  36
Iteration:  37
Iteration:  38
Iteration:  39
Iteration:  40
Top 5 nodes by Hubbiness score:
Node 59, Score: 1.0
Node 39, Score: 0.9810799133868425
Node 22, Score: 0.9741107079593092
Node 11, Score: 0.9574282616181096
Node 58, Score: 0.9574262000187687
Bottom 5 nodes by Hubbiness score:
Node 53, Score: 0.23548212611307842
Node 95, Score: 0.2297612686427331
Node 15, Score: 0.22106736398255405
Node 35, Score: 0.21233808216249786
Node 9, Score: 0.20936882949300997
Top 

In [7]:
# HITS (hubs and authorities) on graph-full.txt.
#
# With L[i][j] = 1 whenever the graph has an edge i -> j, each iteration does
#   a = L^T h   then scales a so its largest entry is 1
#   h = L a     then scales h so its largest entry is 1
# starting from h = 1 for every node.

# Load the data from graph-full.txt
data = sc.textFile("graph-full.txt")

# Parse every line into a (source, destination) pair; distinct() collapses repeated edges
edges = data.map(lambda line: tuple(map(int, line.split()))).distinct()

# Get the list of nodes by extracting all the unique source and destination nodes
nodes = edges.flatMap(lambda edge: [edge[0], edge[1]]).distinct()

# Number of nodes in the graph (not needed by the iteration itself; kept for inspection)
n = nodes.count()

# Initialize hub scores with all 1s
hubbiness = nodes.map(lambda node: (node, 1.0))

# Adjacency list: (source, [destinations])
adj_list = edges.groupByKey().mapValues(list)

# Row view of L, (source, [destinations]), and of L^T, (destination, [sources]).
# Both are read on every iteration, so they are cached.
link_matrix = adj_list.cache()
link_matrix_T = edges.map(lambda edge: (edge[1], edge[0])).groupByKey().mapValues(list).cache()


def _propagate_scores(score_vector, neighbor_lists):
    """Give each node the sum of its listed neighbors' scores, scaled to max 1.

    Args:
        score_vector: RDD of (node, score) pairs.
        neighbor_lists: RDD of (node, [neighbors]) pairs.

    Returns:
        RDD of (node, scaled_score). A node appears only if at least one of
        its neighbors has an entry in ``score_vector``.
    """
    # Re-key every (node, neighbor) relation by the NEIGHBOR, so the join below
    # attaches the neighbor's score - the quantity being summed - rather than
    # the node's own score.
    by_neighbor = neighbor_lists.flatMap(
        lambda entry: [(neighbor, entry[0]) for neighbor in entry[1]]
    )
    # (neighbor, (node, neighbor_score)) -> (node, neighbor_score)
    contributions = by_neighbor.join(score_vector).map(lambda joined: joined[1])
    totals = contributions.reduceByKey(lambda a, b: a + b)
    # RDD.max() is used instead of the builtin: the notebook's star import from
    # pyspark.sql.functions shadows the builtin max.
    largest = totals.values().max()
    return totals.mapValues(lambda value: value / largest)


# Function to compute authority vector from hubbiness
def compute_authority(hubbiness_vector, link_matrix_T):
    """a = L^T h: authority of j is the summed hubbiness of the sources linking to j.

    ``link_matrix_T`` holds (destination, [sources]) pairs. The result is
    scaled so that the largest authority equals 1.
    """
    return _propagate_scores(hubbiness_vector, link_matrix_T)


# Function to compute hubbiness vector from authority
def compute_hubbiness(authority_vector, link_matrix):
    """h = L a: hubbiness of i is the summed authority of the destinations i links to.

    ``link_matrix`` holds (source, [destinations]) pairs. The result is
    scaled so that the largest hubbiness equals 1.
    """
    return _propagate_scores(authority_vector, link_matrix)


# Iterate to compute hubbiness and authority scores
for i in range(40):
    print("Iteration: ", i + 1)
    # Compute the authority vector from the current hubbiness vector
    authority = compute_authority(hubbiness, link_matrix_T)

    # Compute the hubbiness vector from the current authority vector
    hubbiness = compute_hubbiness(authority, link_matrix)

# Collect the final vectors. Each variable now really holds what its name says,
# so no swapping is needed afterwards.
final_hubbiness = hubbiness.collect()
final_authority = authority.collect()

# Sort hubbiness and authority by score, highest first
final_hubbiness_sorted = sorted(final_hubbiness, key=lambda x: x[1], reverse=True)
final_authority_sorted = sorted(final_authority, key=lambda x: x[1], reverse=True)

# Get the top 5 and bottom 5 nodes by hubbiness and authority scores
top_5_hubbiness = final_hubbiness_sorted[:5]
bottom_5_hubbiness = final_hubbiness_sorted[-5:]
top_5_authority = final_authority_sorted[:5]
bottom_5_authority = final_authority_sorted[-5:]

# Print the results
for heading, entries in (
    ("Top 5 nodes by Hubbiness score:", top_5_hubbiness),
    ("Bottom 5 nodes by Hubbiness score:", bottom_5_hubbiness),
    ("Top 5 nodes by Authority score:", top_5_authority),
    ("Bottom 5 nodes by Authority score:", bottom_5_authority),
):
    print(heading)
    for node, score in entries:
        print(f"Node {node}, Score: {score}")

Iteration:  1
Iteration:  2
Iteration:  3
Iteration:  4
Iteration:  5
Iteration:  6
Iteration:  7
Iteration:  8
Iteration:  9
Iteration:  10
Iteration:  11
Iteration:  12
Iteration:  13
Iteration:  14
Iteration:  15
Iteration:  16
Iteration:  17
Iteration:  18
Iteration:  19
Iteration:  20
Iteration:  21
Iteration:  22
Iteration:  23
Iteration:  24
Iteration:  25
Iteration:  26
Iteration:  27
Iteration:  28
Iteration:  29
Iteration:  30
Iteration:  31
Iteration:  32
Iteration:  33
Iteration:  34
Iteration:  35
Iteration:  36
Iteration:  37
Iteration:  38
Iteration:  39
Iteration:  40
Top 5 nodes by Hubbiness score:
Node 840, Score: 1.0
Node 155, Score: 0.9499618624906543
Node 234, Score: 0.8986645288972261
Node 389, Score: 0.8634171101843793
Node 472, Score: 0.8632841092495216
Bottom 5 nodes by Hubbiness score:
Node 889, Score: 0.07678413939216452
Node 539, Score: 0.06602659373418493
Node 141, Score: 0.06453117646225177
Node 835, Score: 0.05779059354433016
Node 23, Score: 0.04206685489