# Data Stream Mining

Find three examples in Python for Data Mining streams (Bloom filters, etc) and
compare their efficiency.

For the data stream, we will generate 100 random numbers between 1 and 100

In [27]:
import random


def getStream():
    items = []
    for x in range(100):
        items.append(round(random.random() * 100))
    return items

## Reservoir Sampling

Source: [GeeksforGeeks: Reservoir Sampling](https://www.geeksforgeeks.org/reservoir-sampling)

Reservoir Sampling is a technique which handler very big samples of data, that
usually stretch across long time and are so large that won't fit into computer
memory. The idea is taking the data in chunks of _n_ items and sampling _k_ items
from each chunk randomly.

Reservoir Sampling is performance-efficient and crude, but it serves it purpose
of keeping up with a generic sample from the stream.

In [28]:


# A utility function to print an array.
def printArray(stream, n):
    for i in range(n):
        print(stream[i], end=" ")
    print()

# A function to randomly select k items from stream[0..n-1].
def selectKItems(stream, n, k):
        i = 0

        # reservoir[] is the output array. Initialize it with
        # first k elements from stream[]
        reservoir = [0] * k
        for i in range(k):
            reservoir[i] = stream[i]

        # Iterate from the (k+1)th element to nth element
        while(i < n):
            # Pick a random index
            # from 0 to i.
            j = random.randrange(i + 1)

            # If the randomly picked index is smaller than k,
            # then replace the element present at the index
            # with new element from stream
            if j < k:
                reservoir[j] = stream[i]
            i += 1

        print("Following are 'k' - randomly selected items:")
        printArray(reservoir, k)

stream = getStream()
n = len(stream)
k = 5
selectKItems(stream, n, k);

Following are 'k' - randomly selected items:
95 73 49 8 12 


## Bloom Filter

Source: [Devopedia: Bloom Filter](https://devopedia.org/bloom-filter)

Bloom filter is used to compress the data records into a hash table. It then
stores much less space than the original items, and it much faster to determine
if an item is in that hash table or not, in a probabilistic fashion (result is
not 100% guaranteed). This is usable in sampling large data streams, where 100%
precision is not required.

The Bloom Filter has a little different purpose than the Reservoir Sampling.
Sometimes they are used together (Sampling to get a n updated sample, and the
Bloom Filter to detect that the sample does not have repeated items).

Bloom Filter is very size-efficient. Form mere bits of data it can detect if an
entry is present in a sample with very high probability. It is especially useful
in the area of data layers and IoT, where costly data requests can be mitigated
by fast and inexpensive checks in the bit-string for whether such requests are
necessary. Sometimes a mere boolean response is enough (in some preliminary
security response inside an IoT device, for example).

In [29]:
# Source: https://www.kdnuggets.com/2016/08/gentle-introduction-bloom-filter.html
# Accessed 2020-05-11

from bitarray import bitarray
import mmh3

class BloomFilter(set):

    def __init__(self, size, hash_count):
        super(BloomFilter, self).__init__()
        self.bit_array = bitarray(size)
        self.bit_array.setall(0)
        self.size = size
        self.hash_count = hash_count

    def __len__(self):
        return self.size

    def __iter__(self):
        return iter(self.bit_array)

    def add(self, item):
        for ii in range(self.hash_count):
            index = mmh3.hash(item, ii) % self.size
            self.bit_array[index] = 1

        return self

    def __contains__(self, item):
        out = True
        for ii in range(self.hash_count):
            index = mmh3.hash(item, ii) % self.size
            if self.bit_array[index] == 0:
                out = False

        return out

def main():
    bloom = BloomFilter(100, 10)
    animals = ['dog', 'cat', 'giraffe', 'fly', 'mosquito', 'horse', 'eagle',
               'bird', 'bison', 'boar', 'butterfly', 'ant', 'anaconda', 'bear',
               'chicken', 'dolphin', 'donkey', 'crow', 'crocodile']
    # First insertion of animals into the bloom filter
    for animal in animals:
        bloom.add(animal)

    # Membership existence for already inserted animals
    # There should not be any false negatives
    for animal in animals:
        if animal in bloom:
            print('{} is in bloom filter as expected'.format(animal))
        else:
            print('Something is terribly went wrong for {}'.format(animal))
            print('FALSE NEGATIVE!')

    # Membership existence for not inserted animals
    # There could be false positives
    other_animals = ['badger', 'cow', 'pig', 'sheep', 'bee', 'wolf', 'fox',
                     'whale', 'shark', 'fish', 'turkey', 'duck', 'dove',
                     'deer', 'elephant', 'frog', 'falcon', 'goat', 'gorilla',
                     'hawk' ]
    for other_animal in other_animals:
        if other_animal in bloom:
            print('{} is not in the bloom, but a false positive'.format(other_animal))
        else:
            print('{} is not in the bloom filter as expected'.format(other_animal))

main()

dog is in bloom filter as expected
cat is in bloom filter as expected
giraffe is in bloom filter as expected
fly is in bloom filter as expected
mosquito is in bloom filter as expected
horse is in bloom filter as expected
eagle is in bloom filter as expected
bird is in bloom filter as expected
bison is in bloom filter as expected
boar is in bloom filter as expected
butterfly is in bloom filter as expected
ant is in bloom filter as expected
anaconda is in bloom filter as expected
bear is in bloom filter as expected
chicken is in bloom filter as expected
dolphin is in bloom filter as expected
donkey is in bloom filter as expected
crow is in bloom filter as expected
crocodile is in bloom filter as expected
badger is not in the bloom filter as expected
cow is not in the bloom filter as expected
pig is not in the bloom filter as expected
sheep is not in the bloom, but a false positive
bee is not in the bloom filter as expected
wolf is not in the bloom filter as expected
fox is not in the blo

## Count-Min Sketch

Sources:
- [Big Data and Sketchy algorithms](https://towardsdatascience.com/big-data-and-sketchy-algorithms-3e4d850e1a82)
- [Count Min Sketch in Pythonic Art](https://mohammedzu.medium.com/count-min-sketch-in-pythonic-art-66eca72666c6)

The Count-Min Sketch algorithm also uses hashed values and bitwise operations,
but it uses them differently than the Bloom algorithm. Count-Min creates a table
of "buckets" and increments the bits when the sample values repeat themselves.
Thus, it uses hashing and bitwise operators to no only keep a bit-encoded
representation of the values, but also to measure their frequency. Thanks to
hash representations, it allows to handle the data that it would otherwise be
hard or impossible to store in memory directly.

This technique allows finding the most common elements in the data stream without
using too much memory. However, while memory-efficient, this approach is highly
computation-heavy. In the example below, it takes half a dozen of seconds for
each of the three iterations.

In [30]:
#
# Rabbi Zidni Ilma
# "Simple Count Min Sketch in Pythonic Art"
# - Part II of the 2020 Weekend coding series on Probabilistic Data Structures.
# - Count Min Sketch is a space efficient, constant time Probabilistic data structure that
# -     guarantees zero under-counting and tunable probability over-counting of events in a stream.
# - This example use case illustrates counting the views for movies in a click stream.
# MZ: mohammedzu@gmail.com
# MZ: 20200119 - Initial
#

import random
import math
from collections import Counter


class CountMinSketch:

    def __init__(self, m, k=3):
        self.size = self.nearest_prime(m)
        self.min_sketch = [[0] * self.size] * k
        self.hash_fns = k
        self.salts = random.sample(range(1, self.size), self.hash_fns)
        # print(f'Salts: {self.salts}')

    def __len__(self):
        return self.size * self.hash_fns

    def increment_counters(self, key):
        for _i, hash_i in enumerate(self.get_hashes(key)):
            self.min_sketch[_i][hash_i] += 1
        return self

    def get_min_count(self, key):
        _all_count = [self.min_sketch[_i][hash_i]
                      for _i, hash_i in enumerate(self.get_hashes(key))]
        return min(_all_count), _all_count

    def get_hashes(self, key):
        return [self.hash_fn(key=key, size=self.size,
                             salt=self.salts[i])
                for i in range(self.hash_fns)]

    @classmethod
    def hash_fn(cls, key, size=1009, salt=1):
        """
        Simple hashing using Horner's rule
        """
        hx = int()
        for c in key:
            hx = ((hx * 31) + ord(c) + salt) % size
        return hx

    @classmethod
    def nearest_prime(cls, n):
        for num in range(n + 1 if n % 2 == 0 else n, (2 * n) + 1, 2):
            for divisor in range(3, math.ceil(math.sqrt(num)), 2):
                if num % divisor == 0:
                    break
            else:
                return num
        return n


def test_driver(data, n_trials, n_hfn, filter_size_multiplier=0.1, max_count_multiplier=1.0):

    total_true_count = 0
    _max_count = int(max_count_multiplier * 1000)
    random_click_stream_len = len(data) * _max_count
    _filter_size = int(random_click_stream_len * filter_size_multiplier)

    for trial in range(n_trials):

        over_counted = {}
        true_count = Counter()

        print(f'\nTrial: {trial + 1}/{n_trials} [Initializing...]')
        cms = CountMinSketch(_filter_size, n_hfn)
        print(f'Filter Size: {len(cms)}'
              f' Hash Functions: {cms.hash_fns}'
              f' Unique Events: {len(data)}'
              f'\nLoose Limit for Events: {_max_count}')  # For testing here

        print(f'Generating Random Click Stream and counting... Please wait')
        random.shuffle(data)
        for _ in range(random_click_stream_len):
            _random_key = data[random.randint(0, random.randint(0, len(data) - 1))]
            cms.increment_counters(_random_key)
            true_count.update([_random_key])
        print(f'Stream Size: {sum(true_count.values())} ')  # Same as random_click_stream_len

        # # Warning: Un-commenting this will be noisy!
        # print(f'Click Stream True Count: {true_count}')

        print('Validating Counter Sketch...')
        for _key in data:
            tc, (mc, _counter_vec) = true_count.get(_key, 0), cms.get_min_count(_key)
            # # Warning: Un-commenting this will be noisy!
            # print(f'"{_key}" '
            #       f'True Count: {tc} '
            #       f'Min Count: {mc} '
            #       f'Count Vector: {_counter_vec}',
            #       end=' || ')
            if mc > tc:
                over_counted[_key] = {'True Count': tc, 'Min Count': mc,
                                      'Count Vector': _counter_vec}

        print(f'Over-counted ({len(over_counted)}/{len(data)}): {over_counted}')
        total_true_count += len(over_counted)

    print(f'\nAccuracy over {n_trials} Trials: {1 - (total_true_count / (len(data) * n_trials))}')


if __name__ == '__main__':
    # Test data of random sci-fi movies! Geeky list... :)
    all_movies = ['The Terminator (1984)', 'The Matrix (1999)', 'The Matrix Reloaded (2003)',
                  'The Matrix Revolutions (2003)', 'Tron (1982)', 'TRON: Legacy (2010)', 'Stargate (1994)',
                  'Coherence (2013)', 'The Chronicles of Riddick (2004)', 'Riddick (2013)', 'Pitch Black (2000)',
                  'Star Wars (1977)', 'The Empire Strikes Back (1980)', 'Return of the Jedi (1983)',
                  'Sunshine (2007)', 'Cowboy Bebop: The Movie (2001)', 'Westworld (1973)',
                  'Perfect Sense (2011)', 'The Martian (2015)', 'Close Encounters of the Third Kind (1977)',
                  'I Origins (2014)', "The World's End (2013)", 'The Mist (2007)', 'Robocop (1987)',
                  'Frequently Asked Questions About Time Travel (2009)', 'The Animatrix (2003)',
                  'The Girl Who Leapt Through Time (2006)', 'Gattaca (1997)', 'The Thing (1982)',
                  'The Truman Show (1998)', 'The Iron Giant (1999)', 'Eternal Sunshine of the Spotless Mind (2004)',
                  'Donnie Darko (2001)', 'V for Vendetta (2005)', 'Cloud Atlas (2012)', 'K-PAX (2001)', 'Cube (1997)',
                  'Back to the Future (1985)', 'Back to the Future Part II (1989)',
                  'Back to the Future Part III (1990)', 'Predator (1987)', 'Jurassic Park (1993)',
                  'Alien (1979)', 'Planet of the Apes (1968)', 'Rise of the Planet of the Apes (2011)',
                  'The Adjustment Bureau (2011)', 'I, Robot (2004)', 'X-Men (2000)',
                  'E.T. the Extra-Terrestrial (1982)', 'WALL·E (2008)', 'The Fifth Element (1997)',
                  'Ghostbusters (1984)', 'District 9 (2009)', 'Terminator 2: Judgment Day (1991)',
                  'The Man From Earth (2007)', 'Inception (2010)', 'Twelve Monkeys (1995)',
                  'Nineteen Eighty-Four (1984)', 'Spectral (2016)', 'Arrival (2016)', 'Dune (1984)',
                  'Blade Runner (1982)', 'Ghost in the Shell (1995)', 'Flatliners (1990)', 'Wargames (1983)',
                  '10 Cloverfield Lane (2016)', 'Robot & Frank (2012)', 'Hardcore Henry (2015)',
                  'Sound of My Voice (2011)', 'Her (2013)', 'Deadpool (2016)', 'Star Wars: The Force Awakens (2015)',
                  'Safety Not Guaranteed (2012)', 'The Lobster (2015)', 'Dredd (2012)', 'Circle (2015)',
                  'Time Lapse (2014)', 'A Scanner Darkly (2006)', 'Transfer (2010)', 'Sleeper (1973)',
                  'The Jacket (2005)', 'Monsters (2010)', 'Cargo (2009)', 'Attack the Block (2011)',
                  'Lockout (2012)', 'Mr. Nobody (2009)', 'Another Earth (2011)',
                  "Bill & Ted's Excellent Adventure (1989)", '2001: A Space Odyssey (1968)',
                  'Déjà Vu (2006)', 'Predestination (2014)', 'The Signal (2014)', 'Signs (2002)',
                  'Event Horizon (1997)', 'Source Code (2011)', 'Edge of Tomorrow (2014)', 'Priest (2011)',
                  'The Abyss (1989)', 'Cloverfield (2008)', 'The Fountain (2006)', 'Brazil (1985)',
                  'Snowpiercer (2013)', 'Pandorum (2009)', 'The Thirteenth Floor (1999)', 'Existenz (1999)',
                  'Mad Max 2: The Road Warrior (1981)', 'Primer (2004)', 'A.I. Artificial Intelligence (2001)',
                  'Oblivion (2013)', 'Akira (1988)', 'Total Recall (1990)', 'Next (2007)', 'Planet Terror (2007)',
                  'Mars Attacks! (1996)', 'Demolition Man (1993)', "Ender's Game (2013)", 'Contact (1997)',
                  'The Time Machine (2002)', 'Spy Kids (2001)', 'The Butterfly Effect (2004)',
                  'War of the Worlds (2005)', 'S1m0ne (2002)', 'Looper (2012)', 'Dark City (1998)',
                  'Galaxy Quest (1999)', 'Starship Troopers (1997)', 'Critters (1986)', 'Resident Evil (2002)',
                  'Transformers (2007)', 'Men in Black (1997)', 'Men in Black II (2002)',
                  'Terminator 3: Rise of the Machines (2003)', 'Phenomenon (1996)', 'Idiocracy (2006)',
                  'Watchmen (2009)', 'Children of Men (2006)', 'Paycheck (2003)', 'Independence Day (1996)',
                  'Bicentennial Man (1999)', 'Chronicle (2012)', 'Serenity (2005)',
                  'Frequency (2000)', 'Aliens (1986)', 'In Time (2011)', 'Paul (2011)', 'Gamer (2009)',
                  'Limitless (2011)', 'Minority Report (2002)', 'The Island (2005)',
                  'The Day the Earth Stood Still (2008)', 'The Happening (2008)', 'Knowing (2009)',
                  'Equilibrium (2002)', 'I Am Legend (2007)', 'Timecrimes (2007)', 'Triangle (2009)',
                  'Star Trek (1966)', 'Star Trek: The Animated Series', 'Star Trek: The Motion Picture (1979)',
                  'Star Trek II: The Wrath of Khan (1982)', 'Star Trek: The Search for Spock',
                  'Star Trek IV: The Voyage Home (1986)', 'Star Trek the Next Generation',
                  'Star Trek VI: the Undiscovered Country', 'Star Trek: Deep Space Nine',
                  'Star Trek: Generations (1994)', 'Star Trek: Voyager', 'Star Trek: First Contact (1996)',
                  'Star Trek: Insurrection (1998)', 'Star Trek: Enterprise', 'Star Trek: Nemesis (2002)',
                  'Star Trek (2009)', 'Star Trek Into Darkness (2013)']

    # Tunable Parameters, Play around with these.
    number_of_trials = 3
    number_of_hash_functions_k = 3
    _max_count_multiplier = 1.295
    _filter_size_multiplier = 0.0163

    test_driver(data=all_movies,
                filter_size_multiplier=_filter_size_multiplier,
                n_trials=number_of_trials,
                n_hfn=number_of_hash_functions_k,
                max_count_multiplier=_max_count_multiplier)


Trial: 1/3 [Initializing...]
Filter Size: 10977 Hash Functions: 3 Unique Events: 173
Loose Limit for Events: 1295
Generating Random Click Stream and counting... Please wait
Stream Size: 224035 
Validating Counter Sketch...
Over-counted (1/173): {'Cargo (2009)': {'True Count': 1183, 'Min Count': 1536, 'Count Vector': [1536, 2758, 2523]}}

Trial: 2/3 [Initializing...]
Filter Size: 10977 Hash Functions: 3 Unique Events: 173
Loose Limit for Events: 1295
Generating Random Click Stream and counting... Please wait
Stream Size: 224035 
Validating Counter Sketch...
Over-counted (0/173): {}

Trial: 3/3 [Initializing...]
Filter Size: 10977 Hash Functions: 3 Unique Events: 173
Loose Limit for Events: 1295
Generating Random Click Stream and counting... Please wait
Stream Size: 224035 
Validating Counter Sketch...
Over-counted (0/173): {}

Accuracy over 3 Trials: 0.9980732177263969
