### Data Mining Assignment #3

#### Group 27: Max Beinhauer, Davis Siemens
#### Dataset: https://snap.stanford.edu/data/ego-Twitter.html

In [1]:
import random
from pyspark.sql import SparkSession
from graphframes import GraphFrame
from pyspark.sql import Row
from collections import defaultdict

FILE_PATH = "data/twitter_combined.txt"

In [13]:
# Spark stop in case error occurs during execution
# spark.stop()

### Sanity checks on graph

In [3]:
spark = (
    SparkSession.builder
    .appName("Assignment2")
    .config(
        "spark.jars.packages",
        "io.graphframes:graphframes-spark4_2.13:0.10.0"
    )
    .getOrCreate()
)


# Read the graph file
graph_rdd = spark.sparkContext.textFile(FILE_PATH)

# Show first few lines
print("Sample lines from the graph file:")
print(graph_rdd.take(5))

# Create edges RDD (u, v)
edges = graph_rdd.map(lambda line: tuple(map(int, line.split()))) 

# Make edges undirected by sorting endpoints, e.g. (3, 10) and (10, 3) -> (3, 10)
undirected_edges = edges.map(lambda e: (min(e[0], e[1]), max(e[0], e[1])))

# Remove duplicate undirected edges
undirected_edges_df = undirected_edges.distinct().map(
    lambda e: Row(src=e[0], dst=e[1])
)

print("Distinct undirected edges:", undirected_edges_df.count())

# Create vertices DataFrame
vertices = (
    undirected_edges_df
    .flatMap(lambda edge: edge)
    .distinct()
    .map(lambda vid: Row(id=vid))
    .toDF()
)

# Create edges DataFrame
edges_df = undirected_edges_df.map(lambda e: Row(src=e[0], dst=e[1])).toDF()

# Build GraphFrame
graph = GraphFrame(vertices, edges_df)

# Show stats

print("Expected Vertices: 81306 - Actual Vertices: ", graph.vertices.count())
print("Edges:", graph.edges.count())

:: loading settings :: url = jar:file:/Users/davis/VSCode/Data%20Mining/data_mining_assignment_3/.venv/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/davis/.ivy2.5.2/cache
The jars for the packages stored in: /Users/davis/.ivy2.5.2/jars
io.graphframes#graphframes-spark4_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0fab54fa-1a31-48e9-aed3-8c4b72c862c5;1.0
	confs: [default]
	found io.graphframes#graphframes-spark4_2.13;0.10.0 in central
	found io.graphframes#graphframes-graphx-spark4_2.13;0.10.0 in central
:: resolution report :: resolve 109ms :: artifacts dl 5ms
	:: modules in use:
	io.graphframes#graphframes-graphx-spark4_2.13;0.10.0 from central in [default]
	io.graphframes#graphframes-spark4_2.13;0.10.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            

Sample lines from the graph file:


                                                                                

['214328887 34428380', '17116707 28465635', '380580781 18996905', '221036078 153460275', '107830991 17868918']


                                                                                

Distinct undirected edges: 1342310


                                                                                

Expected Vertices: 81306 - Actual Vertices:  81306


[Stage 16:>                                                         (0 + 2) / 2]

Edges: 1342310


                                                                                

### Part 1: Reservoir Sampling 

In [None]:
# Iimpleemnt reservoir sampling
def reservoir_sample_rdd(rdd, k):
    """
    Reservoir sampling of size k over an RDD.
    Iterates once over the RDD using toLocalIterator.
    """
    sample = []
    t = 0
    # Stores only one partition in memory at a time
    # Run time: O(n) 
    # Memory: O(k) 
    for item in rdd.toLocalIterator():
        t += 1
        if t <= k:
            sample.append(item)
        else:
            # P(j <= k) = k / t  
            j = random.randint(1, t)
            if j <= k:
                sample[j - 1] = item
    return sample

In [None]:
# Test reservoir sampling

# Read the graph file again
edge_rdd = spark.sparkContext.textFile(FILE_PATH).map(lambda line: tuple(map(int, line.split())))
# Assign a unique ID to each edge to verify sampling correctness
edge_rdd = edge_rdd.zipWithIndex().map(lambda x: (x[1], x[0][0], x[0][1]))

k = 100

reservoir_sample = reservoir_sample_rdd(edge_rdd, k)
print(f"Reservoir Sample of size {k}:")
for edge in reservoir_sample[:5]:
    # Print first 5 sampled edges and their IDs
    print(f"Edge ID: {edge[0]}, Vertices: ({edge[1]}, {edge[2]})")


[Stage 32:>                                                         (0 + 1) / 1]

Reservoir Sample of size 100:
Edge ID: 1024097, Vertices: (321436616, 166405793)
Edge ID: 204423, Vertices: (477088040, 382641626)
Edge ID: 1720076, Vertices: (18753000, 43355400)
Edge ID: 1160398, Vertices: (220729949, 221582075)
Edge ID: 1247865, Vertices: (350322420, 92319025)


### Part 2: TRIÈST

This code implements TRIÈST-IMPR framework. For more details, refer to the [paper](https://www.kdd.org/kdd2016/papers/files/rfp0465-de-stefaniA.pdf).

Reference:
P. Parchas, F. Petroni, A. Rebai, F. Silvestri, and M. Vassilvitskii, 
"TRIÈST: Counting Local and Global Triangles in Fully-Dynamic Streams with Fixed Memory Size," 
in Proceedings of the 22nd ACM SIGKDD International Conference on Knowledge Discovery and Data Mining, 2016.


In [4]:
class TriestImpr:

    def __init__(self, M):
        self.M = M                      # reservoir size
        self.t = 0                      # number of processed edges
        self.S = []                     # reservoir of sampled edges
        self.neighbors = defaultdict(set)  # adjacency structure of sampled graph
        self.tau = 0.0                  # global triangle estimate
        self.tau_local = defaultdict(float)  # per-node triangle estimates

    # Count sampled triangles for edge (u, v)
    def count_sampled_triangles(self, u, v):
        # use set intersection to count common neighbors
        return len(self.neighbors[u].intersection(self.neighbors[v]))

    # Update eeight factor
    def weight(self):
        if self.t <= self.M:
            return 1.0
        # weight factor
        return ((self.t - 1) * (self.t - 2)) / (self.M * (self.M - 1))

    # Update triangle counters
    def update_counters(self, u, v, c):
        if c == 0:
            return

        w = self.weight()

        # global update
        self.tau += w * c

        # local updates
        common = self.neighbors[u].intersection(self.neighbors[v])
        for wnode in common:
            self.tau_local[wnode] += w

        self.tau_local[u] += w * c
        self.tau_local[v] += w * c

    # Manage lists
    def add_edge_to_sample(self, u, v):
        self.neighbors[u].add(v)
        self.neighbors[v].add(u)

    def remove_edge_from_sample(self, u, v):
        self.neighbors[u].discard(v)
        self.neighbors[v].discard(u)

    # Reservoir sampling step
    def reservoir_step(self, u, v):
        """
        Insert (u, v) into sample with reservoir logic.
        Returns True if edge was stored.
        """
        if self.t <= self.M:
            # Reservoir not full: always insert
            self.S.append((u, v))
            self.add_edge_to_sample(u, v)
            return True

        # Reservoir full: do probabilistic replacement
        r = random.randint(1, self.t)
        if r <= self.M:   # accept this new edge
            idx = random.randint(0, self.M - 1)
            old_u, old_v = self.S[idx]

            # remove old edge
            self.remove_edge_from_sample(old_u, old_v)

            # insert new edge
            self.S[idx] = (u, v)
            self.add_edge_to_sample(u, v)
            return True

        return False

    # Main process step
    def process_edge(self, u, v):
        """
        Process a single edge (u, v) in the stream.
        """
        self.t += 1

        # 1. Count sampled triangles for this edge
        c = self.count_sampled_triangles(u, v)

        # 2. Update global & local triangle counters
        self.update_counters(u, v, c)

        # 3. Apply reservoir sampling
        self.reservoir_step(u, v)

    # get functions
    def get_global_estimate(self):
        return self.tau

    def get_local_estimate(self, u):
        return self.tau_local[u]

In [3]:
# Read the file and process edges
with open(FILE_PATH, 'r') as file:
    edges = set()
    for line in file:
        u, v = map(int, line.split())
        # Make edges undirected by sorting endpoints
        edges.add((min(u, v), max(u, v)))

# Initialize TRIEST with a reservoir size of 200
triest = TriestImpr(M=200)

# Process each edge
for u, v in edges:
    triest.process_edge(u, v)

# Print the estimated number of triangles
print("Estimated triangles:", triest.get_global_estimate())


Estimated triangles: 0.0


Challenges: 
- parallization, 
- Runntime (1.4M nodes)