# Coursework 3 Streaming Algorithm

## **Task1：DGIM**

DGIM is an efficient algorithm in processing large streams. When it's infeasible to store the flowing binary stream, DGIM can estimate the number of 1-bits in the window. In this coding, you're given the stream_data_dgim.txt (binary stream), and you need to implement the DGIM algorithm to count the number of 1-bits. Write code below.

### 1. Set the window size to 1000, and count the number of 1-bits in the current window.

In [1]:
import math
import time
from collections import deque

filename = './stream_data_dgim.txt'

class Bucket():
    def __init__(self, timestamp, size):
        self.timestamp = timestamp
        self.size = size

class DGIM():
    def __init__(self, window_size):
        self.window_size = window_size
        self.timestamp = 0
        self.buckets = deque()
    
    def update(self, bit):
        self.timestamp += 1
        
        if self.buckets and self.buckets[0].timestamp <= (self.timestamp - self.window_size):
            self.buckets.popleft()

        if bit == 0:
            return
        
        self.buckets.append(Bucket(self.timestamp, 1))
        self.merge()

    def index(self):
        if len(self.buckets) < 3:
            return

        for i in range(len(self.buckets) - 1, 1, -1):
            curr_size = self.buckets[i].size
            if (curr_size == self.buckets[i - 1].size
                    and curr_size == self.buckets[i - 2].size):
                return i

    def merge(self):
        i = self.index()

        while i is not None:
            self.buckets[i - 1].size += self.buckets[i - 2].size
            del self.buckets[i - 2]

            i = self.index()

    def count(self, k):
        total_size = 0
        last_bucket_size = 0
        for i in range(len(self.buckets) - 1, -1, -1):
            if self.buckets[i].timestamp <= self.timestamp - k:
                break

            total_size += self.buckets[i].size
            last_bucket_size = self.buckets[i].size

        return total_size - int(math.ceil(last_bucket_size / 2.0))

def tic():
    global _start_time
    _start_time = time.perf_counter()


def toc():
    return 1000 * (time.perf_counter() - _start_time)

with open (filename, 'r') as f:
    stream = list(map(int, f.read().split()))

dgim = DGIM(1000)

for bit in stream:
    dgim.update(bit)

tic()

print("The number of 1-bits in the current window:", dgim.count(1000))
print("DGIM query time: %.2f ms" % toc())

The number of 1-bits in the current window: 508
DGIM query time: 0.17 ms


### 2. With the window size 1000, count the number of 1-bits in the last 500 and 200 bits of the bitstream.

In [2]:
print("The number of 1-bits in the last 500 bits of the bitstream:", dgim.count(500))

print("The number of 1-bits in the last 200 bits of the bitstream:", dgim.count(200))

The number of 1-bits in the last 500 bits of the bitstream: 220
The number of 1-bits in the last 200 bits of the bitstream: 76


### 3. Write a function that accurately counts the number of 1-bits in the current window. Caculate the accuracy of your own DGIM algorithm and compare the running time difference.

In [3]:
tic()

print("The accurate number of 1-bits in the current window:", abs(sum(stream[-1000:])))
print("Accurate counter time: %.2f ms" % toc())
print("Rel. err. %.2f%%" % (100 * abs(sum(stream[-1000:]) - dgim.count(1000)) / sum(stream[-1000:])))

The accurate number of 1-bits in the current window: 391
Accurate counter time: 0.44 ms
Rel. err. 29.92%


## **Task2: Bloom Filter**

A Bloom filter is a space-efficient probabilistic data structure. Here the task is to implement a bloom filter by yourself. 

### Data loading:

From the NLTK (Natural Language ToolKit) library, we import a large list of English dictionary words, commonly used by the very first spell-checking programs in Unix-like operating systems.

In [4]:
import nltk
from nltk.corpus import words
nltk.download('words')
word_list = words.words()

[nltk_data] Downloading package words to
[nltk_data]     C:\Users\xxhyy\AppData\Roaming\nltk_data...
[nltk_data]   Package words is already up-to-date!


Then we load another dataset from the NLTK Corpora collection: movie_reviews.

The movie reviews are categorized between positive and negative, so we construct a list of words (usually called bag of words) for each category.

In [5]:
from nltk.corpus import movie_reviews
nltk.download('movie_reviews')

neg_reviews = []
pos_reviews = []

for fileid in movie_reviews.fileids('neg'):
    neg_reviews.extend(movie_reviews.words(fileid))
for fileid in movie_reviews.fileids('pos'):
    pos_reviews.extend(movie_reviews.words(fileid))

[nltk_data] Downloading package movie_reviews to
[nltk_data]     C:\Users\xxhyy\AppData\Roaming\nltk_data...
[nltk_data]   Package movie_reviews is already up-to-date!


Here we get a data stream (word_list) and 2 query lists (neg_reviews and pos_reviews).

### 1. Write a function that accurately determines whether each word in neg_reviews and pos_reviews belongs to word_list.

In [6]:
def determine(needles, haystack):
    hashed = set(needles)
    return list(map(hashed.__contains__, haystack))

word_pos = determine(word_list, pos_reviews)
word_neg = determine(word_list, neg_reviews)

print("Postive word number:", len(word_pos))
print("Negative word number:", len(word_neg))

Postive word number: 832564
Negative word number: 751256


 ### 2. Implement the bloom filter by yourself and add all words in word_list in your bloom filter. Compare the running time difference between linear search on a list and multiple hash computations in a Bloom filter.

In [7]:
import numpy
import hashlib
import random
random.seed(9527)


str = "QWERTYUIOP[]ASDFGHJKL;ZXCVBNM,./123456789"


class Bloom(object):
    def __init__(self, needles, m, k):
        super().__init__()
        salts = []
        self.k = k
        self.m = m
        for i in range(k):
            q = list(str)
            random.shuffle(q)
            salts.append(''.join(q[:16]))
        self.salts = salts
        self.hashalg = hashlib.sha1
        khash = numpy.zeros([m], dtype=numpy.bool_)
        for needle in needles:
            for i in range(k):
                khash[self.myhash(salts[i], needle, m)] = True
        self.khash = khash

    def myhash(self, salt, needle, modp):
        return int.from_bytes(self.hashalg((salt + needle).encode()).digest(), 'little') % modp

    def query(self, haystack_single):
        for i in range(self.k):
            if not self.khash[self.myhash(self.salts[i], haystack_single, self.m)]:
                return False
        return True


bloom = Bloom(word_list, 1000, 4)
tic()
for i in range(200):
    pos_reviews[i] in word_list
print("Scan: %.2f ms" % toc())
tic()
for i in range(200):
    bloom.query(pos_reviews[i])
print("Bloom: %.2f ms" % toc())

Scan: 328.67 ms
Bloom: 1.29 ms


### 3. Use different bit array length ‘m’ and number of hash functions ‘k’ to implement the bloom filter algorithm. Then compare the impact of different m and k on the false positive rate.

In [8]:
def bloom_test():
    k = 2

    for lg10m in range(5, 8):
        bloom = Bloom(word_list, 10 ** lg10m, k)
        fp = 0
        neg = 0

        for i, rev in enumerate(pos_reviews):
            fp += (not word_pos[i]) and bloom.query(rev)
            neg += not word_pos[i]
        for i, rev in enumerate(neg_reviews):
            fp += (not word_neg[i]) and bloom.query(rev)
            neg += not word_neg[i]

        print("m =", bloom.m, "k =", k, "fp = %.2f%%" % (100 * fp / neg))

    m = 10 ** 6

    for k in [1, 2, 3, 4, 6, 9]:
        bloom = Bloom(word_list, m, k)
        fp = 0
        neg = 0

        for i, rev in enumerate(pos_reviews):
            fp += (not word_pos[i]) and bloom.query(rev)
            neg += not word_pos[i]
        for i, rev in enumerate(neg_reviews):
            fp += (not word_neg[i]) and bloom.query(rev)
            neg += not word_neg[i]
            
        print("m =", bloom.m, "k =", k, "fp = %.2f%%" % (100 * fp / neg))

bloom_test()

m = 100000 k = 2 fp = 98.57%
m = 1000000 k = 2 fp = 46.19%
m = 10000000 k = 2 fp = 0.11%
m = 1000000 k = 1 fp = 26.60%
m = 1000000 k = 2 fp = 8.52%
m = 1000000 k = 3 fp = 7.24%
m = 1000000 k = 4 fp = 20.03%
m = 1000000 k = 6 fp = 16.00%
m = 1000000 k = 9 fp = 57.07%


## **Task3: Statistics Estimation**

Here we use the query stream (neg_reviews) from task 2 to estimate 1) the number of distinct words appeared, and 2)the surprise number of the stream.

### 1. 	Write a function that accurately counts the occurrence times of each word in neg_reviews.

In [9]:
from collections import Counter

occurrence_times = Counter(neg_reviews)
print("The occurrence times of each word in neg_reviews:", len(occurrence_times))

The occurrence times of each word in neg_reviews: 28480


### 2. Implement the Flajolet-Martin alg. to estimate the number of distinct words occurred. Try multiple hash functions to improve the estimate.

In [10]:
def flajolet_martin():
    hasher = Bloom([], 1, 40)
    modp = 2 ** 30
    est = []

    for i in range(8):
        r = 0
        salt = hasher.salts[i]
        hash_fn = hasher.myhash

        for word in neg_reviews:
            mh = hash_fn(salt, word, modp)
            r = max(r, len(bin(mh)) - len(bin(mh).rstrip('0')))
        est.append(r)
        des = []
        for k in range(5):
            if len(est[k::5]):
                des.append(numpy.mean(2 ** numpy.array(est[k::5])))
                
        if i % 2 == 1:
            print("n_hash =", i + 1, "est =", numpy.median(des))

flajolet_martin()

n_hash = 2 est = 1052672.0
n_hash = 4 est = 73728.0
n_hash = 6 est = 36864.0
n_hash = 8 est = 36864.0


### 3.Estimate the surpise number with limited memory to store words.

In [11]:
from collections import defaultdict

def dp(memory_limit=100):
    n = len(neg_reviews)
    dp = numpy.zeros(n, dtype=numpy.int64)
    last_map = dict()

    for i in reversed(range(n)):
        dp[i] = 1
        if neg_reviews[i] in last_map:
            dp[i] += dp[last_map[neg_reviews[i]]]
        last_map[neg_reviews[i]] = i

    return (dp[numpy.random.randint(n, size=[memory_limit])].mean() * 2 - 1) * n


def surprise_number(memory_limit=100):
    c = 0
    n = len(neg_reviews)
    track = defaultdict(int)
    ts = sorted(numpy.random.randint(n, size=[memory_limit]))[::-1]

    for t, w in enumerate(neg_reviews):
        while len(ts) and t == ts[-1]:
            track[w] += 1
            ts.pop()
        if w in track:
            c += track[w]

    return (2 * c - 1) * n / memory_limit


print("Ground truth:", numpy.sum(numpy.array(list(occurrence_times.values())).astype(numpy.float32) ** 2))

for lim in [1000, 3000, 10000, 100000]:
    print("Estimation of surprise number with", lim, ":", surprise_number(lim))


Ground truth: 5896116000.0
Estimation of surprise number with 1000 : 5760093859.96
Estimation of surprise number with 3000 : 5998618641.634666
Estimation of surprise number with 10000 : 5917395973.148
Estimation of surprise number with 100000 : 5905360423.81208
