# Homework 3: Mining Data Streams
#### Authors: Sherly Sherly and Anna Martignano

## Task
You are to study and implement a streaming graph processing algorithm described in one of the above papers of your choice. To accomplish the task, perform the following two steps:

1. Implement the reservoir sampling or the Flajolet-Martin algorithm used in the graph algorithm presented in the paper you have selected;
2. Implement the streaming graph algorithm presented in the paper that makes use of the algorithm implemented in the first step.

## Reservoir sampling in TRIEST-BASE
The trièst-base algorithm uses standard reservoir sampling to maintain the edge sample S:
- If t ≤ M, then the edge e_t = (u, v) on the stream at time t is deterministically inserted in S.
- If t > M, trièst-base flips a biased coin with heads probability M/t. If the outcome is heads, it chooses an edge (w, z) ∈ S uniformly at random, removes (w, z) from S, and inserts (u, v) in S. Otherwise, S is not modified.
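
The two rules above can be tried in isolation before wiring them into the triangle counter. The following is a minimal sketch of standard reservoir sampling over an arbitrary iterable; the function name `reservoir_sample` and the optional `rng` parameter are illustrative assumptions and are not part of the notebook's classes.

```python
import random

def reservoir_sample(stream, M, rng=None):
    """Keep a uniform sample of at most M items from an iterable stream.

    Hypothetical helper for illustration; the notebook embeds the same
    logic in Graph.sampleEdge.
    """
    if rng is None:
        rng = random.Random()
    sample = []
    for t, item in enumerate(stream, start=1):
        if t <= M:
            # The first M items are inserted deterministically.
            sample.append(item)
        elif rng.random() < M / t:
            # Heads with probability M/t: overwrite a uniformly chosen slot,
            # which evicts one sampled item and inserts the new one.
            sample[rng.randrange(M)] = item
    return sample

# Toy stream of 10,000 distinct edges, sample size 100.
edges = [(i, i + 1) for i in range(10_000)]
S = reservoir_sample(edges, M=100, rng=random.Random(0))
print(len(S))
```

Once the stream is longer than M, the sample size stays fixed at M, which is the behaviour visible in the `Samples` column of the runs below.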

        Input: Insertion-only edge stream Σ, integer M ≥ 6
        S ← ∅, t ← 0, τ ← 0
        for each element (+, (u, v)) from Σ do
            t ← t + 1
            if SampleEdge((u, v), t) then
                S ← S ∪ {(u, v)}
                UpdateCounters(+, (u, v))

        function SampleEdge((u, v), t)
            if t ≤ M then
                return True
            else if FlipBiasedCoin(M/t) = heads then
                (u', v') ← random edge from S
                S ← S \ {(u', v')}
                UpdateCounters(−, (u',v'))
                return True
            return False

        function UpdateCounters(•, (u, v))
            N^S_{u,v} ← N^S_u ∩ N^S_v
            for all c ∈ N^S_{u,v} do
                τ ← τ • 1
                τ_c ← τ_c • 1
                τ_u ← τ_u • 1
                τ_v ← τ_v • 1
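
To see why intersecting the two neighbourhoods counts triangles, the counter update can be run on a tiny graph without any sampling. This is a sketch under the assumption that every edge is kept; `update_counters`, `adj` and `tau_local` are hypothetical names chosen for this example.

```python
from collections import defaultdict

def update_counters(adj, tau_local, op, u, v):
    """Apply op (+1 or -1) once per common neighbour of u and v.

    Returns the change in the global triangle counter.
    """
    common = adj.get(u, set()) & adj.get(v, set())
    for c in common:
        # Each common neighbour c closes one triangle {u, v, c}.
        tau_local[c] += op
        tau_local[u] += op
        tau_local[v] += op
    return op * len(common)

adj = defaultdict(set)        # node -> set of neighbours in the sample
tau_local = defaultdict(int)  # node -> local triangle count
tau = 0                       # global triangle count

for u, v in [(1, 2), (2, 3), (1, 3), (3, 4), (1, 4)]:
    adj[u].add(v)
    adj[v].add(u)
    tau += update_counters(adj, tau_local, +1, u, v)

print(tau, dict(tau_local))
```

The toy graph contains the triangles {1, 2, 3} and {1, 3, 4}, so the global counter ends at 2, while nodes 1 and 3 each take part in both triangles.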


    

In [1]:
from collections import defaultdict
import random
import operator

class Graph:
    def __init__(self, M):
        self.M = M
        self.t = 0
        self.sampleSet = set()
        self.dictNeighbor = {}
        self.tau = defaultdict(lambda:0)
        self.TAU = 0
        self.epsilon = 0
        self.globalCounter = 0
        
        
    def addEdge(self, u, v):
        self.sampleSet.add((u,v))
            
        # Record the edge in both adjacency sets, creating them on first use.
        self.dictNeighbor.setdefault(u, set()).add(v)
        self.dictNeighbor.setdefault(v, set()).add(u)
    
    def removeEdge(self, w, x):
        self.sampleSet.remove((w,x))
            
        self.dictNeighbor[w].remove(x)
        self.dictNeighbor[x].remove(w)
            
        if(len(self.dictNeighbor[w]) == 0):
            del self.dictNeighbor[w]
        if(len(self.dictNeighbor[x]) == 0):
            del self.dictNeighbor[x]
                
    def sampleEdge(self):
        if(self.t <= self.M):
            return True
        else:
            flipCoin = random.random()
            if(flipCoin < self.M/float(self.t)):
                # Evict a uniformly random edge to make room for the new one.
                w, x = random.choice(tuple(self.sampleSet))
                self.removeEdge(w,x)
                self.updateCounters("-",w,x)
                return True
        return False
    
    def updateCounters(self,operation,u,v):
        if(u in self.dictNeighbor.keys() and v in self.dictNeighbor.keys()):
            NSu = self.dictNeighbor[u]
            NSv = self.dictNeighbor[v]

            NSuv = NSu & NSv

            ops = { "+": operator.add, "-": operator.sub }        
            if(len(NSuv) > 0):
                for c in NSuv:
                    self.tau[c] = ops[operation](self.tau[c],1)
                    self.tau[u] = ops[operation](self.tau[u],1)
                    self.tau[v] = ops[operation](self.tau[v],1)
                    self.TAU = ops[operation](self.TAU,1)
                    
                    if(self.tau[c] == 0):
                        del self.tau[c]
                    if(self.tau[u] == 0):
                        del self.tau[u]
                    if(self.tau[v] == 0):
                        del self.tau[v]
            
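            # Scale the in-sample count by max(1, t(t-1)(t-2) / (M(M-1)(M-2)))
            # to estimate the number of triangles in the whole stream so far.
            self.epsilon = max(1, (self.t*(self.t-1)*(self.t-2))/(self.M*(self.M-1)*(self.M-2)))
            self.globalCounter = int(self.epsilon * self.TAU)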
                
    def launchTriest(self,filepath):
        file = open(filepath,'r')
        for line in file:
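            # Parse node ids as integers before ordering them; min/max on the
            # raw strings would compare lexicographically ("10" < "9").
            nodes = [int(x) for x in line.split()]
            u = min(nodes)
            v = max(nodes)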
            
            if((u,v) in self.sampleSet or u == v):
                continue
            
            self.t += 1
            
            if(self.sampleEdge()):
                self.addEdge(u,v)
                self.updateCounters("+",u,v)
            
            if(self.t%100==0):
                print("Iteration {:5d}|Global TAU {:5d}|Global Counter{:5d}|Samples {:5d}".format(self.t, self.TAU, self.globalCounter, len(self.sampleSet)))
        file.close() 

In [2]:
triestBase = Graph(1000)
triestBase.launchTriest('maayan-faa.tar/maayan-faa.tar')

Iteration   100|Global TAU     2|Global Counter    2|Samples   100
Iteration   200|Global TAU     7|Global Counter    7|Samples   200
Iteration   300|Global TAU    22|Global Counter   22|Samples   300
Iteration   400|Global TAU    29|Global Counter   29|Samples   400
Iteration   500|Global TAU    34|Global Counter   34|Samples   500
Iteration   600|Global TAU    39|Global Counter   39|Samples   600
Iteration   700|Global TAU    51|Global Counter   51|Samples   700
Iteration   800|Global TAU    72|Global Counter   72|Samples   800
Iteration   900|Global TAU    90|Global Counter   90|Samples   900
Iteration  1000|Global TAU   121|Global Counter  121|Samples  1000
Iteration  1100|Global TAU   108|Global Counter  143|Samples  1000
Iteration  1200|Global TAU   102|Global Counter  176|Samples  1000
Iteration  1300|Global TAU    96|Global Counter  211|Samples  1000
Iteration  1400|Global TAU    78|Global Counter  214|Samples  1000
Iteration  1500|Global TAU    70|Global Counter  236|Samples  
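
The `Global Counter` column in the log above is `Global TAU` multiplied by the scaling factor that `updateCounters` stores in `epsilon`, truncated to an integer. The sketch below recomputes that estimate from a few of the logged `(t, TAU)` pairs; the function names `xi` and `estimate` are assumptions made for this example.

```python
def xi(t, M):
    """Scaling factor max(1, t(t-1)(t-2) / (M(M-1)(M-2)))."""
    return max(1.0, t * (t - 1) * (t - 2) / (M * (M - 1) * (M - 2)))

def estimate(tau, t, M):
    """Estimated global triangle count, truncated as in the notebook."""
    return int(xi(t, M) * tau)

# (t, Global TAU) pairs copied from the log above, with M = 1000.
logged = [(1000, 121), (1100, 108), (1200, 102), (1300, 96), (1400, 78)]
for t, tau in logged:
    print(t, tau, estimate(tau, t, 1000))
```

While t ≤ M the factor is 1 and the estimate equals the in-sample count; afterwards the in-sample count can shrink as edges are evicted while the scaled estimate keeps growing.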

## Reservoir sampling in TRIEST-IMPR
        Input: Insertion-only edge stream Σ, integer M ≥ 6
        S ← ∅, t ← 0, τ ← 0
        for each element (+, (u, v)) from Σ do
            t ← t + 1
            UpdateCounters((u, v))
            if SampleEdge((u, v), t) then
                S ← S ∪ {(u, v)}

        function SampleEdge((u, v), t)
            if t ≤ M then
                return True
            else if FlipBiasedCoin(M/t) = heads then
                (u', v') ← random edge from S
                S ← S \ {(u', v')}
                return True
            return False

        function UpdateCounters((u, v))
            N^S_{u,v} ← N^S_u ∩ N^S_v
            η(t) ← max{1, (t − 1)(t − 2)/(M(M − 1))}
            for all c ∈ N^S_{u,v} do
                τ ← τ + η(t)
                τ_c ← τ_c + η(t)
                τ_u ← τ_u + η(t)
                τ_v ← τ_v + η(t)
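
The weight η(t) is the only new quantity compared with the base variant, so it helps to evaluate it on its own. This is a small sketch with a hypothetical helper `eta`; it returns the real-valued weight from the pseudocode, whereas the notebook cell below truncates it with `int()`.

```python
def eta(t, M):
    """Weight added per triangle closed by the edge arriving at time t."""
    return max(1.0, (t - 1) * (t - 2) / (M * (M - 1)))

M = 1000
for t in (500, 1000, 1500, 2000):
    w = eta(t, M)
    # int(w) is the truncated weight used by GraphImpr.updateCounters.
    print(t, round(w, 3), int(w))
```

For t ≤ M the weight is 1, so the counter matches an exact count on the sample. At t = 2000 with M = 1000 the real-valued weight is 3.998 while the truncated one is 3, so the integer version tends to undercount once t exceeds M.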

In [3]:
from collections import defaultdict
import random
import operator

class GraphImpr:
    def __init__(self, M):
        self.M = M
        self.t = 0
        self.sampleSet = set()
        self.dictNeighbor = {}
        self.tau = defaultdict(lambda:0)
        self.TAU = 0
        
        
    def addEdge(self, u, v):
        self.sampleSet.add((u,v))
            
        # Record the edge in both adjacency sets, creating them on first use.
        self.dictNeighbor.setdefault(u, set()).add(v)
        self.dictNeighbor.setdefault(v, set()).add(u)
    
    def removeEdge(self, w, x):
        self.sampleSet.remove((w,x))
            
        self.dictNeighbor[w].remove(x)
        self.dictNeighbor[x].remove(w)
            
        if(len(self.dictNeighbor[w]) == 0):
            del self.dictNeighbor[w]
        if(len(self.dictNeighbor[x]) == 0):
            del self.dictNeighbor[x]
                
    def sampleEdge(self):
        if(self.t <= self.M):
            return True
        else:
            flipCoin = random.random()
            if(flipCoin < self.M/float(self.t)):
                # Evict a uniformly random edge; counters are never decremented here.
                w, x = random.choice(tuple(self.sampleSet))
                self.removeEdge(w,x)
                return True
        return False
    
    def updateCounters(self,u,v):
        if(u in self.dictNeighbor.keys() and v in self.dictNeighbor.keys()):
            NSu = self.dictNeighbor[u]
            NSv = self.dictNeighbor[v]

            NSuv = NSu & NSv
            
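            # eta(t) = max(1, (t-1)(t-2) / (M(M-1))). The int() call truncates the
            # weight so TAU stays an integer for the {:5d} print format; the
            # pseudocode above uses the real-valued ratio.
            eta = max(1, int((self.t-1)*(self.t-2)/(self.M*(self.M-1))) )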
      
            if(len(NSuv) > 0):
                for c in NSuv:
                    self.tau[c] += eta
                    self.tau[u] += eta
                    self.tau[v] += eta
                    self.TAU += eta
                
                    # Counters only grow in this variant (eta >= 1), so no
                    # cleanup of zero-valued entries is needed.
                
    def launchTriest(self,filepath):
        file = open(filepath,'r')
        for line in file:
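            # Parse node ids as integers before ordering them; min/max on the
            # raw strings would compare lexicographically ("10" < "9").
            nodes = [int(x) for x in line.split()]
            u = min(nodes)
            v = max(nodes)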
            
            if((u,v) in self.sampleSet or u == v):
                continue
            
            self.t += 1
            
            self.updateCounters(u,v)
            
            if(self.sampleEdge()):
                self.addEdge(u,v)
            
            if(self.t%100==0):
                print("Iteration {:5d}|Global TAU {:5d}|Samples {:5d}".format(self.t, self.TAU, len(self.sampleSet)))
        file.close() 

In [4]:
triestImpr = GraphImpr(1000)
triestImpr.launchTriest('maayan-faa.tar/maayan-faa.tar')

Iteration   100|Global TAU     2|Samples   100
Iteration   200|Global TAU     7|Samples   200
Iteration   300|Global TAU    22|Samples   300
Iteration   400|Global TAU    29|Samples   400
Iteration   500|Global TAU    34|Samples   500
Iteration   600|Global TAU    39|Samples   600
Iteration   700|Global TAU    51|Samples   700
Iteration   800|Global TAU    72|Samples   800
Iteration   900|Global TAU    90|Samples   900
Iteration  1000|Global TAU   121|Samples  1000
Iteration  1100|Global TAU   129|Samples  1000
Iteration  1200|Global TAU   150|Samples  1000
Iteration  1300|Global TAU   162|Samples  1000
Iteration  1400|Global TAU   171|Samples  1000
Iteration  1500|Global TAU   191|Samples  1000
Iteration  1600|Global TAU   199|Samples  1000
Iteration  1700|Global TAU   209|Samples  1000
Iteration  1800|Global TAU   214|Samples  1000
Iteration  1900|Global TAU   223|Samples  1000
Iteration  2000|Global TAU   241|Samples  1000
Iteration  2100|Global TAU   261|Samples  1000
Iteration  22