# Homework 3 - Mining Data Streams

*Group 6: Andrea Camilloni, Yutong Jiang*

TRIEST (De Stefani et al., 2016) presents a suite of fixed memory size algorithms
to estimate triangle counts in a streaming graph consisting of streaming edges.

In [203]:
from collections import defaultdict
import random

#link to data: https://snap.stanford.edu/data/ca-HepTh.html
filename = 'CA-HepTh.txt'

data = []

with open(filename) as f:
        for line in f:
            if line.startswith('#'):
                continue
        
            u,v = list(map(int,line.strip().split()))
            if u!=v:
                data.append(frozenset((u,v))) # to remove duplicates
            #data.append((u,v))

data = list(frozenset(data)) # to remove duplicates

### 1. Reservoir sampling

The reservoir sampling technique works in the following way: 
* if t ≤ M, the edge is inserted into the subgraph S, 
* otherwise, if t > M, a random edge from S is sampled with a uniform probability $M/t$ and removed from the subgraph S.

The reservoir sampling technique is implemented in TRIEST-BASE by the method sampleedge().

TRIEST-BASE keeps also a counter for the sampled edge, which is decreased if removed.
The method sampleedge() returns True if the considered edge must be inserted into the subgraph, and False otherwise

### 2. TRIEST 
TRIEST implements: 
* a global counter to estimate the global number of triangles,
* a set of local counters to estumete the local number of triangles. 

For an edge $e_t = (u,v)$, the method update_counters() computes the set of common neighborhood of
u and v in S and for each of them, updates the global and local
counters of u, v and c (depending on the selected operator, + or -).

Local counters which value is updated to 0 are popped, in order to reduce the space complexity. 

### Code implemented by Andrea

#### Triest Base

In [204]:
class TriestBase:
    def __init__(self,M):
        self.M = M # M edges to keep
        self.t = 0 # time 
        self.tau = 0 # global counter
        self.G = defaultdict(set) # set of neighbors
        self.local_cnt = defaultdict(int) # local count
        self.S = [] # edge sample

    def sampleedge(self,u,v):
        if self.t <= self.M:
            return True
        elif self.flip_biased_coin():
            u_dash, v_dash = self.S.pop(random.randint(0,len(self.S)-1))
            self.update_counters(u_dash,v_dash,'-')
            return True
        return False

    def update_counters(self,u,v,op):
        common_neighborhood = self.G[u].intersection(self.G[v])
        
        if not common_neighborhood:
            return

        for c in common_neighborhood:

            if op == '+':
                self.tau += 1
                self.local_cnt[c] += 1
                self.local_cnt[u] += 1
                self.local_cnt[v] += 1

            elif op == '-':
                self.tau -= 1
                self.local_cnt[c] -= 1

                if self.local_cnt[c] == 0:
                    self.local_cnt.pop(c)
                
                self.local_cnt[u] -= 1            
                
                if self.local_cnt[u] == 0:
                    self.local_cnt.pop(u)
                
                self.local_cnt[v] -= 1
                
                if self.local_cnt[v] == 0:
                    self.local_cnt.pop(v)

    def flip_biased_coin(self):
        head_prob = random.random()
        if head_prob <= self.M/self.t:
            return True
        else:
            return False

    def return_counters(self):
        estimate = max(1, (self.t * (self.t - 1) * (self.t - 2))/(self.M * (self.M - 1) * (self.M - 2)))

        global_estimate = int(estimate * self.tau)

        for key in self.local_cnt:
            self.local_cnt[key] = int(self.local_cnt[key] * estimate)

        return {'global':global_estimate,'local':self.local_cnt}

    def run(self,u,v):
        self.t += 1
        if self.sampleedge(u,v):
            self.S.append((u,v))
            self.update_counters(u,v,'+')
            self.G[u].add(v)
            self.G[v].add(u)


In [205]:
M = len(data)
model = TriestBase(M)

for u,v in data:
    model.run(u,v)

res = model.return_counters()
# Number of triangles in the graph
print('Number of triangles in the graph: ',res['global'])


Number of triangles in the graph:  28339


In [206]:
M = 15000
model = TriestBase(M)

for u,v in data:
    model.run(u,v)

res = model.return_counters()
# Number of triangles in the graph
print('Number of triangles in the graph: ',res['global'])

Number of triangles in the graph:  28238


In [207]:
M = 10000
model = TriestBase(M)

for u,v in data:
    model.run(u,v)

res = model.return_counters()
# Number of triangles in the graph
print('Number of triangles in the graph: ',res['global'])

Number of triangles in the graph:  26672


In [208]:
M = 5000
model = TriestBase(M)

for u,v in data:
    model.run(u,v)

res = model.return_counters()
# Number of triangles in the graph
print('Number of triangles in the graph: ',res['global'])

Number of triangles in the graph:  22017


TRIEST-IMPROVED, never decreases the counters, but instead it only increases them of a factor $\eta_t$, 

$\eta_t = max\{1,\frac{(t−1)(t−2)}{M(M −1)}\}$

#### Triest Improved

In [209]:
class TriestImpr(TriestBase):
    def __init__(self,M):
        super().__init__(M)

    def sampleedge(self,u,v):
        if self.t <= self.M:
            return True
        elif self.flip_biased_coin():
            u_dash, v_dash = self.S.pop(random.randint(0,len(self.S)-1))
            #self.update_counters(u_dash,v_dash,'-')
            return True
        return False

    def update_counters(self,u,v,op):
        common_neighborhood = self.G[u].intersection(self.G[v])
        
        if not common_neighborhood:
            return

        for c in common_neighborhood:
            #no decrease
            eta = max(1, (self.t - 1) * (self.t - 2)/(self.M * (self.M - 1)))
            self.tau += eta
            self.local_cnt[c] += eta
            self.local_cnt[u] += eta
            self.local_cnt[v] += eta


    def run(self,u,v):
        self.t += 1
        self.update_counters(u,v,'+')
        if self.sampleedge(u,v):
            self.S.append((u,v))
            #self.update_counters(u,v,'+')
            self.G[u].add(v)
            self.G[v].add(u)
    

In [210]:
M = len(data)
model = TriestImpr(M)

for u,v in data:
    model.run(u,v)

res = model.return_counters()
res['global']

28339

In [211]:
M = 15000
model = TriestImpr(M)

for u,v in data:
    model.run(u,v)

res = model.return_counters()
res['global']

246387

In [212]:
M = 10000
model = TriestImpr(M)

for u,v in data:
    model.run(u,v)

res = model.return_counters()
res['global']

1375236

### Code implemented by Yutong

In [213]:
dataset = set()
with open('ca-HepTh.txt') as f:
    for idx, data in enumerate(f):
        # There are some heads contained in the dataset
        if idx < 4:
            continue
        split_data = data.strip().split()
        # Here is to remove self-loop
        if split_data[0] != split_data[1]:
            # set and frozenset could remove duplicate
            dataset.add(frozenset(split_data))

dataset = list(dataset)

#### Triest Base

In [214]:
from collections import defaultdict
import random

class triest_base():
    def __init__(self, M):
        # M stands for the maximum edge sample
        # t stands for the time point
        # tau stands for the total global triangles count
        # G stands for the graph
        # localtri stands for the local count
        # S stands for the edge
        self.M = M
        self.t = 0
        self.tau = 0
        self.G = defaultdict(set)
        self.local_cnt = defaultdict(int)
        self.S = []

    def sampleedge(self):
        ''' 
        Here is the implementation of reservoir
        type of edge should be set
        '''
        if self.t <= self.M:
            return True
        elif random.random() <= self.M/self.t:
            chosen_idx = random.randint(0, self.M - 1)

            # the edge in S and node in graph should be removed
            # remove edge
            removed_edge = self.S.pop(chosen_idx)
            # remove node
            u, v = removed_edge
            self.G[u].remove(v)
            self.G[v].remove(u)

            self.updatecounters('sub', removed_edge)
            return True
        return False
    
    def updatecounters(self, operation, operated_edge):
        u, v = operated_edge
        N_Su = self.G[u]
        N_Sv = self.G[v]
        inter_node = N_Su & N_Sv

        for c in inter_node:
            if operation == 'add':
                self.tau += 1
                self.local_cnt[c] += 1
                self.local_cnt[u] += 1
                self.local_cnt[v] += 1
            else:
                self.tau -= 1
                self.local_cnt[c] += 1
                self.local_cnt[u] -= 1
                self.local_cnt[v] -= 1
        # print(self.local_cnt)
        
    def find_eps(self):
        eps = max(1, (self.t * (self.t - 1) * (self.t - 2)/(self.M * (self.M - 1) * (self.M - 2))))
        return eps
    
    def forward(self, dataset):
        for data in dataset:
            self.t += 1
            if self.sampleedge():
                self.S.append(data)
                u, v = data
                self.G[u].add(v)
                self.G[v].add(u)
                self.updatecounters('add', data)
        eps = self.find_eps()
        estimator = eps * self.tau
        return int(estimator), self.tau, self.local_cnt

In [215]:
M = len(dataset)
estimator, global_cnt, local_dict = triest_base(M).forward(dataset)
estimator

28339

In [216]:
M = 15000
estimator, global_cnt, local_dict = triest_base(M).forward(dataset)
estimator

28140

In [217]:
M = 10000
estimator, global_cnt, local_dict = triest_base(M).forward(dataset)
estimator

30702

In [218]:
M = 5000
estimator, global_cnt, local_dict = triest_base(M).forward(dataset)
estimator

23700

#### Triest improved

In [219]:
from collections import defaultdict
import random

class triest_impr():
    def __init__(self, M):
        # M stands for the maximum edge sample
        # t stands for the time point
        # tau stands for the total global triangles count
        # G stands for the graph
        # localtri stands for the local count
        # S stands for the edge
        self.M = M
        self.t = 0
        self.tau = 0
        self.G = defaultdict(set)
        self.local_cnt = defaultdict(int)
        self.S = []

    def sampleedge(self):
        ''' 
        Here is the implementation of reservoir
        type of edge should be set
        '''
        if self.t <= self.M:
            return True
        elif random.random() <= self.M/self.t:
            chosen_idx = random.randint(0, self.M - 1)

            # the edge in S and node in graph should be removed
            # remove edge
            removed_edge = self.S.pop(chosen_idx)
            # remove node
            u, v = removed_edge
            self.G[u].remove(v)
            self.G[v].remove(u)

            # This update has been removed
            #self.updatecounters('sub', removed_edge)
            return True
        return False
    
    def updatecounters(self, operation, operated_edge):
        u, v = operated_edge
        N_Su = self.G[u]
        N_Sv = self.G[v]
        inter_node = N_Su & N_Sv

        eta = max(1, (self.t - 1)*(self.t - 2)/(self.M * (self.M - 1)))
        for c in inter_node:
            #if operation == 'add':
            self.tau += eta
            self.local_cnt[c] += eta
            self.local_cnt[u] += eta
            self.local_cnt[v] += eta
            #else:
                # As there is no substract anymore, so this part has no effect in improved algorithm
                #self.tau -= eta
                #self.local_cnt[c] += eta
                #self.local_cnt[u] -= eta
                #self.local_cnt[v] -= eta
        # print(self.local_cnt)
        
    def find_eps(self):
        eps = max(1, (self.t * (self.t - 1) * (self.t - 2)/(self.M * (self.M - 1) * (self.M - 2))))
        return eps
     
    def forward(self, dataset):
        for data in dataset:
            self.t += 1
            if self.sampleedge():
                self.S.append(data)
                u, v = data
                self.G[u].add(v)
                self.G[v].add(u)
                self.updatecounters('add', data)
        eps = self.find_eps()
        estimator = eps * self.tau
        return int(estimator), self.tau, self.local_cnt

In [220]:
M = len(dataset)
estimator, global_cnt, local_dict = triest_impr(M).forward(dataset)
estimator

28339

### 3.Bonus part

* What were the challenges you faced when implementing the algorithm?

    The main challenge faced was to re-implement suitable data structures for the algorithm. Implementing the algorithm was not that hard since we had just to follow the steps presented in the work of De Stefani et al.. 
    

* Can the algorithm be easily parallelized? If yes, how? If not, why? Explain.

    Implementing data parallelism would require to keep counters synchronized between each node as each count depends on the previsous samples, but this would be unfeasible. 

* Does the algorithm work for unbounded graph streams? Explain.

    The algorithm works on unbounded data streams since it's keeping a fixed memory footprint M. Keeping a fixed memory M avoids memory issues, i.e. running out of memory. And by setting a threshold on t, we could terminate the program and return the estimations based on the previsous samples. 

* Does the algorithm support edge deletions? If not, what modification would it need? Explain.

    Both the TRIÈST algorithms don't support edge deletions, but in (De Stefani et al., 2016) they presented a version of TRIÈST working with both insertions and deletions of edges. TRIÈST-FD is a fully-dynamic version of TRIÈST, which relies on the concept of Random Pairing (RP). 
    The idea is that edge deletions will be compensated by
    further edge insertions seen later in the stream. The way it keeps track
    is by having 2 additional counters $d_i$ and $d_o$ to keep track of uncompensated edge deletions.


### Reference

De Stefani, A. Epasto, M. Riondato, and E. Upfal. TRIEST: Counting `
local and global triangles in fully-dynamic streams with fixed memory size.
pages 825–834, 08 2016.