# Homework 3: Mining Data Streams

### By Group 16

In this project, we implement a streaming graph processing algorithm from the paper Stefani, L. D., Epasto, A., Riondato, M., & Upfal, E. (2017). Triest: Counting local and global triangles in fully dynamic streams with fixed memory size. ACM Transactions on Knowledge Discovery from Data (TKDD), 11(4), 1-50 (link:http://www.kdd.org/kdd2016/papers/files/rfp0465-de-stefaniA.pdf). First the reservoir sampling algorithm described in the paper is implemented. Then, the streaming graph algorithms, Trièst-Base and Trièst-Base improved, from the paper that make use of the algorithm implemented in the previous step. 

In order to ensure that our implementation is correct, we test out implementation with the following datasets:

#### Notre Dame 
Nodes represent pages from University of Notre Dame (domain nd.edu) and directed edges represent hyperlinks between them. The data was collected in 1999 by Albert, Jeong and Barabasi.

| Dataset statistics               |                 |
|----------------------------------|-----------------|
| Nodes                            | 325729          |
| Edges                            | 1497134         |
| Nodes in largest WCC             | 325729 (1.000)  |
| Edges in largest WCC             | 1497134 (1.000) |
| Nodes in largest SCC             | 53968 (0.166)   |
| Edges in largest SCC             | 304685 (0.204)  |
| Average clustering coefficient   | 0.2346          |
| Number of triangles              | 8910005         |
| Fraction of closed triangles     | 0.03104         |
| Diameter (longest shortest path) | 46              |
| 90-percentile effective diameter | 9.4             |

In [1]:
import pandas as pd
data = pd.read_csv('web-NotreDame.txt', sep="\t", header=3)

In [2]:
# Selfloops
selfloops = [x for x in data.to_numpy() if x[0]==x[1]]

In [3]:
edges_no_selfloops = [frozenset(x) for x in data.to_numpy() if x[0]!=x[1]] # self-loops filtered out

In [4]:
data_edges = list(set(edges_no_selfloops)) # directed changed to undirected 

#### General Relativity and Quantum Cosmology 

Arxiv GR-QC (General Relativity and Quantum Cosmology) collaboration network is from the e-print arXiv and covers scientific collaborations between authors papers submitted to General Relativity and Quantum Cosmology category. If an author i co-authored a paper with author j, the graph contains a undirected edge from i to j. If the paper is co-authored by k authors this generates a completely connected (sub)graph on k nodes.

The data covers papers in the period from January 1993 to April 2003 (124 months). It begins within a few months of the inception of the arXiv, and thus represents essentially the complete history of its GR-QC section.

| Dataset statistics               |               |
|----------------------------------|---------------|
| Nodes                            | 5242          |
| Edges                            | 14496         |
| Nodes in largest WCC             | 4158 (0.793)  |
| Edges in largest WCC             | 13428 (0.926) |
| Nodes in largest SCC             | 4158 (0.793)  |
| Edges in largest SCC             | 13428 (0.926) |
| Average clustering coefficient   | 0.5296        |
| Number of triangles              | 48260         |
| Fraction of closed triangles     | 0.3619        |
| Diameter (longest shortest path) | 17            |
| 90-percentile effective diameter | 7.6           |

In [5]:
# Try on smaller dataset
df = pd.read_csv('CA-GrQc.txt', sep="\t", header=3)
edges_no_selfloops = [frozenset(x) for x in df.to_numpy() if x[0]!=x[1]] # self-loops filtered out
df_edges = list(set(edges_no_selfloops)) # directed changed to undirected 

### Triest-Base

In [6]:
import random

class triest:
    def __init__(self, M):
        """ Initialization
        M (int): max edges in edge sample S (at least 6)
        t (int): time t in the stream t>=0
        tau (int): global triangle counter
        localCounter (set): local counter for subset nodes u in V^t 
        S (set): edge sample up to M edges from stream
        """
        self.M = M
        self.t = 0
        self.tau = 0
        self.localCounter = {}
        self.S = set()
        
    def reservoir(self,e):
        """ Standard reservoir sampling
        e (frozenset): one undirected edge {a,b} from node a to node b
        Returns 
        bool: False if edge sample S is not modified, True otherwise 
        """ 
        if self.t <= self.M:
            return True;
        elif random.random() <= self.M / self.t:
            removeEdge = random.sample(self.S, 1)[0]  
            self.S.remove(removeEdge)
            self.updateCounters(0,removeEdge)
            return True
        return False

    def updateCounters(self,operation, newEdge):  
        """ Increment/decrement counters after each insertion/removal
        operation (int): 1 if insertion (+), 0 if removal (-)
        newEdge (list of one frozenset): edge to insert/remove
        Returns
        Updated counters
        """
        Nu = set() # Neighbourhood of u in G^s
        Nv = set() # Neighbourhood of v in G^s
        l = list(newEdge)   #to ensure order

        for x in self.S:   #for each set in S
            if l[0] in x:       #first element is u and second v
                temp = set(x)   #remove u from xES to get its neighbor
                temp.remove(l[0]) #cannot remove in frozenset
                Nu.add(temp.pop())  #add the one element there is in temp
                
            elif l[1] in x:  #no duplicates no loops, (not adding something to triangles)
                temp = set(x)
                temp.remove(l[1])
                Nv.add(temp.pop())
        
        Nuv = Nu.intersection(Nv) # intersection of neighbourhoods u and v in G^s
        for c in Nuv:
            if operation == 1:
                self.tau +=1
                self.localCounter[c] = self.localCounter.get(c,0) + 1 # by one rest by |Nuv|
                self.localCounter[l[0]] = self.localCounter.get(l[0],0) + 1
                self.localCounter[l[1]] = self.localCounter.get(l[1],0) + 1
            elif operation ==0:
                if self.tau>0:
                    self.tau-=1
                self.localCounter[c] = self.localCounter.get(c) - 1 # by one rest by |Nuv|
                self.localCounter[l[0]] = self.localCounter.get(l[0],0) - 1
                #print("deletion",l[0],self.localCounter)
                self.localCounter[l[1]] = self.localCounter.get(l[1],0) - 1
                #print("deletion",l[1],self.localCounter)
                if self.localCounter[l[0]] <= 0: 
                    del self.localCounter[l[0]]
                if self.localCounter[l[1]] <= 0:
                    del self.localCounter[l[1]]
                if self.localCounter[c] <= 0:
                    del self.localCounter[c]
                    
    def call(self, stream):
        """ Triest-base algo
        stream (list of frozensets): edge stream 
        Returns dictionary with:
            tau (int): global counter with key globalcount
            localCounter (set): local counter with key localcount
            estimation (int): estimation global triangle count with key estimation
        """
        #time_count = 0 # to check progress
        for ed in stream:
            #time_count +=1 # to check progress
            #print(f"{time_count}/{len(stream)}...") # to check progress
            self.t+=1
            if self.reservoir(ed): # if True, i.e. S will be modified
                self.updateCounters(1,ed) # update counters
                self.S.add(ed) # add edge to edge sample S
        epsilon = max(1, (self.t*(self.t-1)*(self.t*-2))/(self.M*(self.M-1)*(self.M*-2))) 
        estimation = epsilon*self.tau # estimation global triangle count
        return {"globalcount": self.tau, "localcount":self.localCounter, "estimation":estimation} 

In [7]:
# Test set with 5 nodes, 8 edges, 5 triangles
# 1-4-5, 1-4-2, 1-2-3, 1-2-5, 2-4-5
edges = [frozenset({1,2}),frozenset({1,3}),frozenset({1,5}),frozenset({1,4}),
         frozenset({2,3}),frozenset({2,5}),frozenset({2,4}),frozenset({4,5})]

In [8]:
result = triest(len(edges)).call(edges)
result

{'globalcount': 5,
 'localcount': {1: 4, 2: 4, 3: 1, 5: 3, 4: 3},
 'estimation': 5}

#### Number of triangles is 48260

In [9]:
globalcounts = []
estimations = []
times = []

In [10]:
import time
start_time = time.time()
result = triest(len(df_edges)).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))

--- 17.50273370742798 seconds ---
48260 48260


In [11]:
start_time = time.time()
result = triest(14000).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))

--- 18.85434579849243 seconds ---
43366 48021.07119654726


In [12]:
start_time = time.time()
result = triest(10000).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))

--- 22.559886932373047 seconds ---
15796 47998.329156771484


In [13]:
start_time = time.time()
result = triest(5000).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))

--- 11.07068681716919 seconds ---
2025 49230.862572012


In [14]:
start_time = time.time()
result = triest(1000).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))

--- 0.9893519878387451 seconds ---
12 36496.50865643243


In [15]:
start_time = time.time()
result = triest(500).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))

--- 0.302501916885376 seconds ---
2 48710.771072513024


In [16]:
start_time = time.time()
result = triest(100).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))

--- 0.025968551635742188 seconds ---
0 0.0


In [17]:
start_time = time.time()
result = triest(50).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))

--- 0.012964725494384766 seconds ---
0 0.0


In [18]:
start_time = time.time()
result = triest(10).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))

--- 0.006966829299926758 seconds ---
0 0.0


In [19]:
start_time = time.time()
result = triest(6).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))

--- 0.0069789886474609375 seconds ---
0 0.0


### Triest-Base improved
1. UpdateCounters is called unconditionally for each element on the stream, before the algorithm decides whether or not to insert the edge into S. 
2. trièst-impr never decrements the counters when an edge is removed from S. 
3. UpdateCounters performs a weighted increase of the counters using 
$η^{(t)} = max\{1,(t − 1)(t − 2)/(M(M − 1))\}$ as weight.

In [20]:
import random

class triestImpr:
    def __init__(self, M):
        """ Initialization
        M (int): max edges in edge sample S (at least 6)
        t (int): time t in the stream t>=0
        tau (int): global triangle counter
        localCounter (set): local counter for subset nodes u in V^t 
        S (set): edge sample up to M edges from stream
        """
        self.M = M
        self.t = 0
        self.tau = 0
        self.localCounter = {}
        self.S = set()
        
    def reservoir(self,e):
        """ Standard reservoir sampling
        e (frozenset): one undirected edge {a,b} from node a to node b
        Returns 
        bool: False if edge sample S is not modified, True otherwise 
        """ 
        if self.t <= self.M:
            return True;
        elif random.random() <= self.M / self.t:
            removeEdge = random.sample(self.S, 1)[0]  
            self.S.remove(removeEdge)
            # IMPROVEMENT 2: updateCounters function removed
            return True
        return False

    def updateCounters(self, newEdge):  # IMPROVEMENT 3: removed operation
        """ Increment/decrement counters after each insertion/removal
        newEdge (list of one frozenset): edge to insert/remove
        Returns
        Updated counters
        """
        Nu = set()
        Nv = set()
        l = list(newEdge)  

        for x in self.S:   
            if l[0] in x:       
                temp = set(x)   
                temp.remove(l[0]) 
                Nu.add(temp.pop())  
            elif l[1] in x:  
                temp = set(x)
                temp.remove(l[1])
                Nv.add(temp.pop())
                
        Nuv = Nu.intersection(Nv)
        # IMPROVEMENT 3: replace 1 with weighted increase of counters
        weight =  max(1,((self.t - 1)*(self.t - 2))/(self.M*(self.M - 1)))
        for c in Nuv:
            self.tau +=weight
            self.localCounter[c] = self.localCounter.get(c,0) + weight  
            self.localCounter[l[0]] = self.localCounter.get(l[0],0) + weight
            self.localCounter[l[1]] = self.localCounter.get(l[1],0) + weight

                    
    def call(self, stream):
        """ Triest-base algo
        stream (list of frozensets): edge stream 
        Returns dictionary with:
            tau (int): global counter with key globalcount
            localCounter (set): local counter with key localcount
            estimation (int): estimation global triangle count with key estimation
        """
        #time_count = 0 # to check progress
        for ed in stream:
            #time_count +=1 # to check progress
            #print(f"{time_count}/{len(stream)}...") # to check progress
            self.t+=1
            self.updateCounters(ed) # IMPROVEMENT 1: Moved to before if-block
            if self.reservoir(ed):
                self.S.add(ed)
        epsilon = max(1, (self.t*(self.t-1)*(self.t*-2))/(self.M*(self.M-1)*(self.M*-2))) 
        estimation = epsilon*self.tau # estimation global triangle count
        return {"globalcount": self.tau, "localcount":self.localCounter, "estimation":estimation} 

In [21]:
result = triestImpr(len(edges)).call(edges)
result

{'globalcount': 5,
 'localcount': {1: 4, 2: 4, 3: 1, 5: 3, 4: 3},
 'estimation': 5}

#### Triangle count is 48260

In [22]:
globalcounts_impr = []
estimations_impr = []
times_impr = []

In [23]:
import time
start_time = time.time()
result = triestImpr(len(df_edges)).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))

--- 16.790833234786987 seconds ---
48260 48260


In [24]:
start_time = time.time()
result = triestImpr(14000).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))

--- 17.066330432891846 seconds ---
48254.20062461474 53433.98984750957


In [25]:
start_time = time.time()
result = triestImpr(10000).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))

--- 15.357870101928711 seconds ---
48397.14687246792 147061.41971596156


In [26]:
start_time = time.time()
result = triestImpr(5000).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))

--- 10.133779287338257 seconds ---
47844.28368401679 1163168.0764958945


In [27]:
start_time = time.time()
result = triestImpr(1000).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))

--- 2.28937029838562 seconds ---
51432.644966966975 156425997.68834385


In [28]:
start_time = time.time()
result = triestImpr(500).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))

--- 1.1865403652191162 seconds ---
48307.790813627245 1176554869.6707222


In [29]:
start_time = time.time()
result = triestImpr(100).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))

--- 0.2579233646392822 seconds ---
18829.42383838384 57787964804.974556


In [30]:
start_time = time.time()
result = triestImpr(50).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))

--- 0.133652925491333 seconds ---
125601.74775510204 3115266155736.678


In [31]:
start_time = time.time()
result = triestImpr(10).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))

--- 0.04886817932128906 seconds ---
0 0.0


In [32]:
start_time = time.time()
result = triestImpr(6).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))

--- 0.03594064712524414 seconds ---
0 0.0


### Comparison of Triest-Base and Improved Version

In [33]:
algotype = ["triest-base"]*len(times)
Ms = [len(df_edges),14000,10000,5000,1000,500,100,50,10,6]
triest_base = pd.DataFrame(
    {'times': times,
     'estimations': estimations,
     'globalcounts': globalcounts,
     'type': algotype,
     'M': Ms
    })

In [34]:
algotype = ["triest_impr"]*len(times_impr)
triest_impr = pd.DataFrame(
    {'times': times_impr,
     'estimations': estimations_impr,
     'globalcounts': globalcounts_impr,
     'type': algotype,
     'M': Ms
    })

In [35]:
df_results = triest_base.append(triest_impr)
df_results

Unnamed: 0,times,estimations,globalcounts,type,M
0,17.50373,48260.0,48260.0,triest-base,14484
1,18.85435,48021.07,43366.0,triest-base,14000
2,22.56088,47998.33,15796.0,triest-base,10000
3,11.07069,49230.86,2025.0,triest-base,5000
4,0.99035,36496.51,12.0,triest-base,1000
5,0.3025,48710.77,2.0,triest-base,500
6,0.02597,0.0,0.0,triest-base,100
7,0.01396,0.0,0.0,triest-base,50
8,0.00697,0.0,0.0,triest-base,10
9,0.00698,0.0,0.0,triest-base,6


In [41]:
import plotly.express as px

fig = px.scatter(df_results, x="M", y="globalcounts", color="type",
                title="Global Triangle Count for Triest-Base and Improved Version Based on M")
fig.add_hline(y = 48260)
fig.show()

In [43]:
import plotly.express as px

fig = px.scatter(df_results, x="M", y="estimations", color="type",
                title="Triangle Count Estimation for Triest-Base and Improved Version Based on M")
fig.add_hline(y = 48260)
fig.show()

In [44]:
fig = px.line(df_results, x="M", y="times", color="type",
             title="Runtime for Triest-Base and Improved Version Based on M")
fig.show()

## Optional task for extra bonus

1. **What were the challenges you have faced when implementing the algorithm?**


2. **Can the algorithm be easily parallelized? If yes, how? If not, why? Explain.**

It is possible to parallelize the algorithm. This can be done by partitioning the set into subsets. Another option is to parallelize the counters using Spark since associative and commutative properties are applied to addition and substraction. The associative property states that you can re-group numbers and you will get the same answer and the commutative property states that you can move numbers around and still arrive at the same answer.

3. **Does the algorithm work for unbounded graph streams? Explain.**

The algorithm works with time $t$, where $t \geq 0$. More specifically, "when queried at the end of time $t$, trièst-base returns $ξ^{(t)} τ^{(t)}$ as the estimation for the global triangle count". So, when we have an unbounded graph stream, the algorithm can retun an estimation for the global triangle count i.e. we estimate the total global triangle counts so far in the stream.

4. **Does the algorithm support edge deletions? If not, what modification would it need? Explain.**

No, but the fully dynamic algorithm proposed in the same paper called Trièst-FD is able to: "It is based on random pairing, a sampling scheme that extends reservoir sampling and can handle deletions. The idea behind the RP scheme is that edge deletions seen on the stream will be compensated by future edge insertions".