# Homework 3: Mining Data Streams
Homework Group 54: Xu Wang

## Introduction
In this homework I implemented the algrithm proposed in [TRIÈST: Counting Local and Global Triangles in Fully-Dynamic Streams with Fixed Memory Size](https://www.kdd.org/kdd2016/papers/files/rfp0465-de-stefaniA.pdf) to count global triangles in a given graph dataset. The dataset can be found via the link: [HEP-TH](https://snap.stanford.edu/data/ca-HepTh.html), which covers collaborations between authors papers submitted to High Energy Physics - Theory category from the e-print arXiv. In the dataset statistics we can see that the total number of triangles in this graph dataset is 28339. The goal of this homework is to generate an approximation of the number of triangles using two versions of the TRIEST algorithm that is based on reservoir sampling. 

In [71]:
import random
import time

In [72]:
# read in the dataset as a streaming graph
def undirected_edge(u,v): # convert every edge to undirected edge
    return (u,v) if u < v else (v,u)

def read_stream(filename):
    stream = set()
    with open(filename,'r') as f:
        for line in f:
            if line[0] == '#':
                continue
            u,v = line.split()
            if u != v:
                stream.add(undirected_edge(u, v))
    return stream

In [73]:
stream = read_stream("CA-HepTh.txt")
print('Total number of edges in the stream: ', len(stream))

Total number of edges in the stream:  25973


## TRIEST_BASE
Which works on insertion-only streams and uses standard reservior sampling to maintain the edge sample S.

In [74]:
class TRIEST_base():
    def __init__(self, M=100):
        self.edges = set() # maintain a sample of edges
        self.M = M # sample size
        self.global_counter = 0
        self.local_counter = {}

    def alg(self, stream):
        t = 0
        for edge in stream:
            t += 1
            if self.sample_edge(edge, t):
                self.edges.add(edge)
                self.update_counters(1, edge)
        eps = (t * (t - 1) * (t - 2)) / (self.M * (self.M - 1) * (self.M - 2))
        eps = max(eps, 1)
        estimated_triangles = self.global_counter * eps
        return estimated_triangles

    def sample_edge(self, edge, t):
        if t<=self.M:
            return True
        elif random.random() <= self.M/t: # reservoir sampling
            removed_edge = random.sample(self.edges, 1)[0]
            self.edges.remove(removed_edge)
            self.update_counters(-1, edge)
            return True 
        else:
            return False

    def update_counters(self, flag, edge):
        u, v = edge[0], edge[1]
        Nu = set()
        Nv = set()
        for e in self.edges:
            if e[0] == u and e[1] != v:
                Nu.add(e[1])
            if e[1] == u and e[0] != v:
                Nu.add(e[0])
            if e[0] == v and e[1] != u:
                Nv.add(e[1])
            if e[1] == v and e[0] != u:
                Nv.add(e[0])
        shared_neighbors = Nu.intersection(Nv)

        for c in shared_neighbors:
            self.global_counter += flag
            self.local_counter[c] = self.local_counter.get(c, 0) + flag
            self.local_counter[edge[0]] = self.local_counter.get(edge[0], 0) + flag
            self.local_counter[edge[1]] = self.local_counter.get(edge[1], 0) + flag

In [75]:
sample_size = 2000

In [76]:
t1 = time.time()
triest_base_true = TRIEST_base(len(stream))
true_triangles = triest_base_true.alg(stream)
run1 = time.time() - t1
t2 = time.time()

triest_base = TRIEST_base(sample_size)
estimated_triangles = triest_base.alg(stream)
run2 = time.time() - t2
print('---------------- Results with TRIEST_BASE ----------------')
print('Number of triangles with sample size {}: {} | Run time: {} seconds'.format(sample_size, estimated_triangles, run2))
print('Number of triangles with full sample size: {} | Run time: {} seconds'.format(true_triangles, run1))
print('Difference rate between the two estimates: ', abs(estimated_triangles - true_triangles)/true_triangles)

since Python 3.9 and will be removed in a subsequent version.
  removed_edge = random.sample(self.edges, 1)[0]


---------------- Results with TRIEST_BASE ----------------
Number of triangles with sample size 2000: 32897.97508378063 | Run time: 3.3732173442840576 seconds
Number of triangles with full sample size: 28339.0 | Run time: 48.15503692626953 seconds
Difference rate between the two estimates:  0.16087282839128508


## TRIEST_IMPR
With small modifications that result in higher-quality estimations. The changes are:   
1. update_counter is called unconditionally for each element on the stream, before the algorithm decides whether or not to insert the egde into S.  
2. never decrements the counters when an edge is removed from S.  
3. update_counter performs a weighted increase of the counters using $\eta_{t} = max(1, (t-1)(t-2)/M(M-1))$

In [77]:
class TRIEST_impr():
    def __init__(self, M=1000):
        self.edges = set() # maintain a sample of edges
        self.M = M # sample size
        self.global_counter = 0
        self.local_counter = {}

    def alg(self, stream):
        t = 0
        for edge in stream:
            t += 1
            self.update_counters(t, edge) # move the update_counter before the if block
            if self.sample_edge(edge, t):
                self.edges.add(edge)

        return self.global_counter # return the global counter, unmodified

    def sample_edge(self, edge, t):
        if t<=self.M:
            return True
        elif random.random() <= self.M/t: 
            removed_edge = random.sample(self.edges, 1)[0]
            self.edges.remove(removed_edge)
            # self.update_counters(-1, edge) # remove the update_counter so to never decrement the counters
            return True 
        else:
            return False

    def update_counters(self, t, edge):
        u, v = edge[0], edge[1]
        Nu = set()
        Nv = set()
        for e in self.edges:
            if e[0] == u and e[1] != v:
                Nu.add(e[1])
            if e[1] == u and e[0] != v:
                Nu.add(e[0])
            if e[0] == v and e[1] != u:
                Nv.add(e[1])
            if e[1] == v and e[0] != u:
                Nv.add(e[0])
        shared_neighbors = Nu.intersection(Nv)

        eta = (t-1)*(t-2)/(self.M*(self.M-1)) # perfrom a weighted increase on the counters
        eta = max(eta, 1) 

        for c in shared_neighbors: 
            self.global_counter += eta
            self.local_counter[c] = self.local_counter.get(c, 0) + eta
            self.local_counter[edge[0]] = self.local_counter.get(edge[0], 0) + eta
            self.local_counter[edge[1]] = self.local_counter.get(edge[1], 0) + eta

In [78]:
t1 = time.time()
triest_impr_true = TRIEST_impr(len(stream))
true_triangles = triest_impr_true.alg(stream)
run1 = time.time() - t1
t2 = time.time()
triest_impr = TRIEST_impr(sample_size)
estimated_triangles = triest_impr.alg(stream)
run2 = time.time() - t2
print('---------------- Results with TRIEST_IMPROVED ----------------')
print('Number of triangles with sample size {}: {} | Run time: {} seconds'.format(sample_size, estimated_triangles, run2))
print('Number of triangles with full sample size: {} | Run time: {} seconds'.format(true_triangles, run1))
print('Difference rate between the two estimates: ', abs(estimated_triangles - true_triangles)/true_triangles)

since Python 3.9 and will be removed in a subsequent version.
  removed_edge = random.sample(self.edges, 1)[0]


---------------- Results with TRIEST_IMPROVED ----------------
Number of triangles with sample size 2000: 26999.255763940964 | Run time: 14.219647407531738 seconds
Number of triangles with full sample size: 28339 | Run time: 48.179219007492065 seconds
Difference rate between the two estimates:  0.04727563555732509


## Optional task

1. What were the challenges you faced when implementing the algorithm?    
The first challenge is to find the shared neighborhood of the two vertices of one edge, which is used in the update_counter function. In the pseudocode it is just one line of mathematical notation but I find it hard to implement via code. It took me some time to figure out the data structure I should use.    
Another one is choosing the sample size. At first I only use 100 and the algorithm output nothing. I thought maybe there was something wrong with my implementation and I spent a lot of time checking the structure of the algorithm. It turned out that the sample size is too small and there is no shared neighborhood found in the sampled graph.

2. Can the algorithm be easily parallelized? If yes, how? If not, why? Explain.    
I think this algorithm is hard to be parallelized, or need other modifications. To parallelize this algorithm, the stream will be partitioned to different threads and each thread counts its triangles and then add up to be the total triangles in the dataset. However, take one triangle for example, if its three edges are unfortunately distributed into three different threads, then in each thread this triangle is incomplete thus will not be taken into consideration. But globally the triangle does exist. In this case the estimation is not accurate.

3. Does the algorithm work for unbounded graph streams? Explain.   
Yes, this algorithm is proposed to eastimate both global and local triangles in an infinite streaming graph. It only maintains a sample of edges with a fixed size M that is chosen by the user, so there won't be any concerns regarding running out of memory. The sample of edges is obtained via reservoir sampling, which in each time step t>M, replaces an uniformly-chosen edge in the sample with the coming new edge with probability M/t. In this way, even if the graph is unbounded, since the sample size is fixed, the algorithm will still work well.

4. Does the algorithm support edge deletions? If not, what modification would it need? Explain.   
No, for the two versions of the TRIEST algorithms(BASE and IMPROVED) implemented here, they work for insertion-only graphs. To support edge deletions, we could further adopt the TRIEST_FD proposed in the paper as well, which uses random pairing to extend reservoir sampling. The general idea is edge deletions will be "compensated" by future edge insertions and this algorithm keeps two counters to keep track of the number of uncompensated edge deletions involving an edge that was (resp. was not) in S at the time the deletion for that edge was on the stream.