## CountMin Sketch

### Zhe HUANG

In [20]:
import random
import math

In [21]:
class CountMinSketch:

    def __init__(self, epsilon, delta, p):
        self.epsilon = epsilon
        self.delta = delta
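        # p is the modulus of the hash family and should be a large prime;
        # the class does not check this (122354367, the value passed in the
        # test cells, is divisible by 3 and therefore not actually prime)
        self.p = p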

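        # Table dimensions: d rows (one hash function per row), w counters per row.
        # NOTE: the usual count-min formulation takes w = ceil(e / epsilon) and
        # d = ceil(ln(1 / delta)); here the two roles are swapped, so delta
        # controls the row width and epsilon the number of hash functions.
        self.d = int(math.ceil(math.e / epsilon))
        self.w = int(math.ceil(math.log(1 / delta)))

        # d × w table of counters, all initially zero
        self.table = [[0] * self.w for _ in range(self.d)]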

        # hash_params holds one (a, b) pair per hash function,
        # defining h_i(x) = ((a * hash(x) + b) mod p) mod w.
        # a is drawn from [1, p - 1] so that no row degenerates to a constant hash.
        self.hash_params = [(random.randrange(1, p), random.randrange(p)) for _ in range(self.d)]
    

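    def _bucket(self, i, x):
        # Column index of x in row i: ((a * hash(x) + b) mod p) mod w.
        # Python's built-in hash() is salted per process for str,
        # so bucket positions differ from one run to the next.
        a, b = self.hash_params[i]
        return ((a * hash(x) + b) % self.p) % self.w


    def update(self, x):
        # Increment the counter of x in every row
        for i in range(self.d):
            self.table[i][self._bucket(i, x)] += 1


    def query(self, x):
        # Collisions can only inflate a counter, so the smallest of the
        # d counters is the tightest estimate of the true count
        return min(self.table[i][self._bucket(i, x)] for i in range(self.d))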

In [48]:
def test(epsilon, delta, p):
    cms = CountMinSketch(epsilon, delta, p)
    for word in words:
        cms.update(word)

    for word in unique_words:
        print('{0}: {1}'.format(word, cms.query(word)))

In [45]:
# Named words / unique_words rather than list / set to avoid shadowing the built-ins
words = ['Elephant', 'Rainbow', 'Butterfly', 'Sunflower', 'Crocodile',
         'Waterfall', 'Snowflake', 'Raspberry', 'Telephone', 'Butterfly']

unique_words = set(words)

In [49]:
test(epsilon=0.1, delta=0.01, p=122354367)

Butterfly: 2
Elephant: 1
Waterfall: 1
Telephone: 1
Raspberry: 1
Rainbow: 1
Sunflower: 1
Crocodile: 1
Snowflake: 1


In [50]:
test(epsilon=0.1, delta=0.3, p=122354367)

Butterfly: 3
Elephant: 2
Waterfall: 2
Telephone: 2
Raspberry: 2
Rainbow: 3
Sunflower: 3
Crocodile: 1
Snowflake: 2


In [51]:
test(epsilon=0.1, delta=0.4, p=122354367)

Butterfly: 10
Elephant: 10
Waterfall: 10
Telephone: 10
Raspberry: 10
Rainbow: 10
Sunflower: 10
Crocodile: 10
Snowflake: 10


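Conclusion:

Increasing delta does not add hash functions. In this implementation the number of hash functions is fixed by epsilon (d = ⌈e/ε⌉ = 28 for ε = 0.1), while delta sets the number of counters per row, w = ⌈ln(1/δ)⌉: 5 for δ = 0.01, 2 for δ = 0.3, and 1 for δ = 0.4. A larger delta therefore leaves fewer counters per row, causes more collisions, and makes the sketch less accurate.

A count-min sketch never underestimates: every estimate is at least the true count, and collisions can only inflate it. This matches the runs above. With δ = 0.01 every estimate is exact, with δ = 0.3 all but one are too high, and with δ = 0.4 all ten updates land in the single counter of each row, so every query returns 10. In the standard formulation the roles of the two parameters are reversed: the width is ⌈e/ε⌉ and the number of hash functions is ⌈ln(1/δ)⌉, so a larger δ means fewer hash functions and a higher probability that an estimate exceeds the true count by more than ε times the stream length.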
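The effect of delta on the table size can be checked directly. The following is a minimal sketch, not part of the original notebook; the helper names `notebook_dimensions` and `standard_dimensions` are hypothetical. The first mirrors the formulas used in the `CountMinSketch` class above, the second uses the usual textbook assignment (depth from delta, width from epsilon).

```python
import math

def notebook_dimensions(epsilon, delta):
    """(d, w) as computed by the CountMinSketch class in this notebook."""
    d = math.ceil(math.e / epsilon)      # number of hash functions (rows)
    w = math.ceil(math.log(1 / delta))   # counters per row
    return d, w

def standard_dimensions(epsilon, delta):
    """(d, w) in the usual formulation: depth from delta, width from epsilon."""
    d = math.ceil(math.log(1 / delta))   # number of hash functions (rows)
    w = math.ceil(math.e / epsilon)      # counters per row
    return d, w

for delta in (0.01, 0.3, 0.4):
    print(delta, notebook_dimensions(0.1, delta), standard_dimensions(0.1, delta))
```

For ε = 0.1 the notebook's formulas give 28 rows of 5, 2, and 1 counters respectively, whereas the standard assignment gives 5, 2, and 1 rows of 28 counters each.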

## Extra  

1. Explain why a count-min sketch cannot be implemented directly in Spark Streaming. 

<font color = orange> 
Spark Streaming uses a micro-batch model: incoming data is split into small batches, and each batch is processed as an independent job at a discrete time interval. 
A count-min sketch, by contrast, is a single table of counters that must be updated for every incoming element and kept for the whole stream. Ordinary (stateless) transformations see only the current batch, and the batch itself is partitioned across workers, so there is no one table that persists between batches and that every element can update.
</font>

2. What would you need to implement it?

<font color = orange> 
To implement a count-min sketch in Spark Streaming, I would keep the sketch as state that is carried over and updated incrementally from one batch to the next. 
This can be done with stateful operations such as mapWithState or updateStateByKey, which maintain a running state across batches. 
Specifically, the state would hold the count-min sketch matrix, and each incoming element would increment its counters in the matrix according to its hash values. All workers must use the same hash functions, so that partial tables built on different partitions can be combined by adding them cell by cell.
</font>
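The idea of carrying the sketch across batches can be illustrated without Spark. The following is a plain-Python sketch under stated assumptions: the names `sketch_batch`, `merge`, `bucket`, and `query` are hypothetical, the dimensions and the prime 2**31 - 1 are chosen only for illustration, and `zlib.crc32` replaces `hash()` so that every worker would compute the same bucket for the same string. It relies on the fact that two count-min tables built with the same hash functions can be combined by cell-wise addition.

```python
import random
import zlib

D, W, P = 5, 28, 2147483647  # depth, width, and the Mersenne prime 2**31 - 1

# One (a, b) pair per row, shared by every batch so that tables are mergeable.
rng = random.Random(0)
HASH_PARAMS = [(rng.randrange(1, P), rng.randrange(P)) for _ in range(D)]

def bucket(i, x):
    # crc32 is stable across processes, unlike the salted built-in hash() of str.
    a, b = HASH_PARAMS[i]
    return ((a * zlib.crc32(x.encode("utf-8")) + b) % P) % W

def sketch_batch(batch):
    """Build a fresh D x W count-min table for one micro-batch."""
    table = [[0] * W for _ in range(D)]
    for x in batch:
        for i in range(D):
            table[i][bucket(i, x)] += 1
    return table

def merge(state, batch_table):
    """Cell-wise sum: the table of the two underlying streams concatenated."""
    return [[s + t for s, t in zip(state_row, batch_row)]
            for state_row, batch_row in zip(state, batch_table)]

def query(table, x):
    return min(table[i][bucket(i, x)] for i in range(D))

batches = [["Elephant", "Butterfly"], ["Rainbow", "Butterfly"], ["Butterfly"]]
state = [[0] * W for _ in range(D)]  # running state carried across batches
for batch in batches:
    state = merge(state, sketch_batch(batch))

print(query(state, "Butterfly"))  # at least 3, the true count
```

In a real job, `merge` would play the role of the state-update function handed to a stateful operation such as updateStateByKey; that wiring is not shown here.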