# Mining Data Streams

In this lab, we are supposed to pick a dataset and a paper. 

## Dataset
We use the [Google Webgraph dataset](https://snap.stanford.edu/data/web-Google.html). Potential caveat that it contains *directed* edges. 

| **Property** | **Value** |
| --- | --- |
| Nodes | 875713          |
| Edges | 5105039         |
| Nodes in largest WCC | 855802 (0.977)  |
| Edges in largest WCC | 5066842 (0.993) |
| Nodes in largest SCC | 434818 (0.497)  |
| Edges in largest SCC | 3419124 (0.670) |
| Average clustering coefficient   | 0.5143          |
| Number of triangles              | 13391903        |
| Fraction of closed triangles     | 0.01911         |
| Diameter (longest shortest path) | 21              |
| 90-percentile effective diameter | 8.1             |

## Paper
* ~~We chose the algorithm in the paper [A Space-Efficient Streaming Algorithm for Estimating Transitivity and Triangle Counts Using the Birthday Paradox](https://arxiv.org/pdf/1212.2264.pdf) by M. Jha, C. Seshadhri, and A. Pinar.~~
* Second thought, how about no. Terrible psuedocode made us turn to [TRIÈST: Counting Local and Global Triangles in
Fully-Dynamic Streams with Fixed Memory Size](https://www.kdd.org/kdd2016/papers/files/rfp0465-de-stefaniA.pdf)

### How to run this Notebook with Docker
If you want to run this Notebook with Docker, the Jupyter team has several images over at ... . In the [documentation](https://jupyter-docker-stacks.readthedocs.io/en/latest/index.html) they show an example:

```bash
docker run -p 10000:8888 jupyter/pyspark-notebook:latest
```

> Visiting `http://<hostname>:10000/?token=<token>` in a browser loads JupyterLab, where `<hostname> = 0.0.0.0` and token is generated in the terminal.

In [23]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("lab3").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark

In [26]:
# read a csv file with pyspark
rdd = spark.read.option("delimiter", "\t").csv("../../data/web-Google.txt", header=True, inferSchema=True)

In [27]:
# show df schema
rdd.show(5)

+----------+--------+
|FromNodeId|ToNodeId|
+----------+--------+
|         0|   11342|
|         0|  824020|
|         0|  867923|
|         0|  891835|
|     11342|       0|
+----------+--------+
only showing top 5 rows



I don't know how to simulate a stream properly, so I'll convert the rdd to a Panda's dataframe, and iterate over each row 🤷🏻‍♂️

In [30]:
import pandas as pd
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = rdd.toPandas()
stream = []
for index, row in df.iterrows():
    edge = (row[0], row[1])
    stream.append(edge)

Parameters in the paper:

* Our algorithms keep an edge sample $\mathcal{S}$ of up to $M$ edges from the stream (as contrary to sampling with a probability $p$)

* Trièst algorithms keep counters to compute the estimations of the global and local number of triangles. They always keep one global counter $\tau$ for the estimation of the global number of triangles.

* For any t ≥ 0, let $G^S = (V^S, E^S)$ be the subgraph of $G(t)$ containing all and only the edges in the currentsample $\mathcal{S}$. We denote with $\mathcal{N}^S_u$ the neighborhood of $u$ in $G^S: \mathcal{N}^S_u = \{v ∈ V^{(t)}: (u, v) ∈ \mathcal{S}\}$ and with $\mathcal{N}^\mathcal{S}_{u, v} = \mathcal{N}^\mathcal{S}_u \cap \mathcal{N}^\mathcal{S}_v$ the shared neighborhood of $u$ and $v$ in $G^\mathcal{S}$

In [251]:
import numpy as np
from random import sample
from collections import defaultdict
from operator import add, sub

class TriestBase:
    # set up nasty globals >_<
    def __init__(self, stream, M=6):
        self.stream = stream
        # defaultdict avoids missing key errors, and with 'int' it defaults to 0
        self.tau = defaultdict(int)
        self.S = set()
        self.t = 0
        self.M = M

    def run(self):
        for edge in self.stream:
            self.t+=1
            if self.sample_edge(edge):
                self.S = self.S ^ {edge}
                self.update_counters(operator=add, edge=edge) 

    def sample_edge(self, edge):
        if self.t <= self.M:
            return True
        elif self.flip_biased_coin(self.M/self.t):
            # pick a random edge from S
            random_edge = sample(self.S, 1) # set of tuples require sample?
            # remove it from S
            self.S = self.S - {random_edge[0]}
            self.update_counters(operator=sub, edge=random_edge[0])
            return True
        else:
            return False

    def flip_biased_coin(self, p):
        # TODO is olikheten correct or should it be '>'?
        return np.random.rand() < p

    def update_counters(self, operator, edge):
        u, v = edge
        # TODO is this really right?
        neighborhood = set(self.neighbors(u)) & set(self.neighbors(v))
        for c in neighborhood:
            # bug here, the operator is malfunctioning???
            self.tau['global'] = operator(self.tau['global'], 1)
            self.tau[c] = operator(self.tau[c], 1)
            self.tau[u] = operator(self.tau[u], 1)
            self.tau[v] = operator(self.tau[v], 1)

    def neighbors(self, vertex):
        # if this is a bottle neck, it could be optimized by array indexing
        for u, v in self.S:
            if u != vertex:
                continue
            else:
                yield v

In [194]:
from random import sample
test_set = {(1,2), (3,4), (5,6)}
sample(test_set, 1)

since Python 3.9 and will be removed in a subsequent version.
  sample(test_set, 1)


[(1, 2)]

In [254]:
tb = TriestBase(stream, M=100)
tb.run()

since Python 3.9 and will be removed in a subsequent version.
  random_edge = sample(self.S, 1) # set of tuples require sample?


In [260]:
sum = 0
for v in tb.tau.values():
    sum += v
sum

-152

In [228]:
from collections import defaultdict
from operator import add, sub   

d = defaultdict(int)
f = add
f(d['a'], 1)

f = sub
print(f(d['b'], 1))
f(d['b'], 1)
f(d['b'], 1)

d['b']

-1


0

In [185]:
add(1,1)

2

In [125]:
test_graph = {(0,1), (0,2), (1,2), (3,4)}
b = (1,3)

test_graph ^ {b}

{(0, 1), (0, 2), (1, 2), (1, 3), (3, 4)}