# Assignment 02     
## CSCI E-108
### Steve Elston


In this assignment you will work with some basic streaming analytic algorithms. To avoid the complexities of installing and setting up a real streaming analytics platform, you will work with stream flow data loaded from local files. Specifically in this assignment you will:    
1. Create and apply code to perform basic stream queries.    
2. Using stream queries and plots, explore the stream data.    
3. Use moving windows to compute moving averages and sub-sample a stream.    
4. Use exponential decay filters to compute moving averages and sub-sample a stream.    
5. Work with a Bloom and quotient filters to filter for customer identifiers on a list.
6. Use the count-min-sketch algorithm to find counts of unique event identifiers.
7. Use the HyperLogLog algorithm to find the cardinality of events in a stream.  

## Overview

The United States Geological Survey (USGS) maintains over 13,500 stream flow gages in the United States. Measurements from most of these gages are recoded every 15 min and uploaded every 4 hours are [available for download](https://waterdata.usgs.gov/nwis/rt). Stream flow data are used as inputs for complex water management tasks for purposes such as agriculture, residential use and conservation. For this assignment you will work with the time series of measurements for two stream flow gages on tributaries of the the Columbia River in the US State of Washington.     

To get started, execute the code in the cell below to import the packages you will need.

> **Note:** You will need to pip install four packages to run the code in this notebook. You can perform the installation of these packages by uncommenting the shell commands shown in the cell below. Alternatively, you can also Conda install these packages.    
> 1. [Mmh3](https://github.com/hajimes/mmh3)      
> 2. [Bitarray](https://github.com/ilanschnell/bitarray)
> 3. [PyProbables](https://pyprobables.readthedocs.io/en/latest/index.html)
> 4. [Datasketch](https://ekzhu.com/datasketch/index.html)

In [1]:
!pip install mmh3
!pip install bitarray
!pip install pyprobables
!pip install datasketch

Collecting mmh3
  Downloading mmh3-4.1.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (67 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/67.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.6/67.6 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mmh3
Successfully installed mmh3-4.1.0
Collecting bitarray
  Downloading bitarray-2.9.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (288 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m288.3/288.3 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: bitarray
Successfully installed bitarray-2.9.2
Collecting pyprobables
  Downloading pyprobables-0.6.0-py3-none-any.whl (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.0/40.0 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling col

In [2]:
import pandas as pd
import numpy as np
import numpy.random as nr
import mmh3
from bitarray import bitarray
from probables import QuotientFilter, CountMinSketch, HeavyHitters
from datasketch import HyperLogLog
import math
import matplotlib.pyplot as plt
from decimal import Decimal

%matplotlib inline

## Loading the Dataset  

The next step is to load the stream gage data. The code in the cell below loads the time series data for the first gage. This gage is sited on the Okanogan river.  

The code in the cell below does the following:  
1. Loads the data from a .csv file.
2. Converts the time stamp column to an index of the Pandas data frame.
3. Assigns human-understandable names to the columns.  
4. Returns just the first 4 columns of the data frame.

Execute this code and examine the results.

In [3]:
def read_index_series(file_name):
    '''Function to read time series data from a file.
    Argument is the path and filename.'''
    df = pd.read_csv(file_name, sep='\t')
    df.index = df.datetime
    df.drop('datetime', axis=1, inplace=True)
    df = df.iloc[:,:4]
    df.columns = ['Agency', 'Site_number', 'Time_zone', 'Stream_flow']
    return df.iloc[:,:4]

Malott = read_index_series('../data/12447200_Okanogan_at_Malott.txt')
Malott

FileNotFoundError: [Errno 2] No such file or directory: '../data/12447200_Okanogan_at_Malott.txt'

The other time series is for a gage on the Yakima River. Execute the code in the cell below and examine the result.

In [None]:
CleElm = read_index_series('../data/12479500_Yakima_At_CleElm.txt')
CleElm

Since we really only want to work with one data frame. The code in the cell below merges the two time series and sorts them into time index order. Execute this code and examine the result, paying attention to the site number and the datetime index.  

In [None]:
stream_flow = pd.concat([Malott,CleElm]).sort_index()
stream_flow

## Querying Stream Data

Common stream data operations are often formulated as queries on the stream data. Many streaming data platforms use extensions of SQL for these queries.   

To keep things simple in this assignment we will just use a basic query function. The function shown in the cell below supports queries on the time series.      

In [None]:
def query_stream(df, Columns=None, site_numbers=None, start_time=None, end_time=None):
    '''
    Function to query the stream gage time series data. The arguments are:
    df = the data frame containing the data.
    Columns = a list of columns to return.
    site_numbers = a list of gage site numbers to query data.
    start_time = the start time of the returned data as datatime string or integer index.
    end_time = the end time of the returned data as datatime string or integer index.
    '''
    ## First set values for arguments set to Null
    if Columns==None: Columns = df.columns
    if start_time==None: start_time = df.index[0]
    if end_time==None: end_time = df.index[df.shape[0]-1]
    if site_numbers==None: site_numbers = df.Site_number.unique()

    ## Select the sites
    df = df.loc[df.Site_number.isin(site_numbers), Columns]

    ## Test if index is a string datetime or an integer
    ## use iloc method if an integer.
    ## A slice over the time range is created based on the index type.
    if isinstance(df,pd.DataFrame):
        if isinstance(start_time, str):
            df = df.loc[start_time:end_time,:]
        else:
            df = df.iloc[start_time:end_time,:]
    else: # Must be a Pandas series
        if isinstance(start_time, str):
            df = df.loc[start_time:end_time]
        else:
            df = df.iloc[start_time:end_time]
    ## Return the results of the query
    return df

You can see the options to run `query_stream` function by executing the code in the cell below.

In [None]:
print(query_stream.__doc__)

An example of using the query function is shown in the cell below. Execute this code and examine the result.

In [None]:
len(query_stream(stream_flow, Columns='Stream_flow', site_numbers=[12484500]))

> **Exercise 02-01:** Using the `query_stream` function, write and execute the code in the cell below to compute and display the mean `Stream_flow`for the month of April of 2020 of site 12484500. Use the [Pandas.DataFrame.mean](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.mean.html) method to compute the mean. Notice that using this approach we can compute most any statistic of interest on the query result.    

In [None]:
## Put your code below


## Plotting Streaming Data

Visualization is important tool in data exploration and discovery. Numerical stream data is ideal for visual exploration if it can be subsampled to manageable size.  

The function in the cell below creates a time series plot. The time index of the Pandas data frame is used to generate the x-axis values. Execute the code in this cell to load this function.

In [None]:
def plot_time_series(df, ax=None, ylabel='Stream flow', title=''):
    if ax==None: fig, ax = plt.subplots(figsize=(20, 6))
    df.plot(ax=ax);
    ax.set_xlabel('Date');
    ax.set_ylabel(ylabel);
    ax.set_title(title)
    plt.show()
    return ax

The code in the cell below creates time series plots of the stream flow data. The flow time series for two stream gages queried as arguments to the plot function. Execute this code and examine the results.

In [None]:
_=plot_time_series(query_stream(stream_flow, Columns='Stream_flow', site_numbers=[12447200]), title='Flow on Okanogan at Malott')
_=plot_time_series(query_stream(stream_flow, Columns='Stream_flow', site_numbers=[12484500]), title='Flow on Yakima at Cle Elm')

The time series of stream flow at both of these gages is rather complex. Both rivers have several dams used to control the flow. The flow is optimized to conserve fisheries and to supply agriculture in the Columbia River Basin. Water in reservoirs accumulates in the spring as mountain snow melts. The water is then released throughout the spring and summer.

But, what can we make of the noticeable spikes in flow, particularly for gage $12484500$ on the Yakima River. Even with the control provided by dams and reservoirs spring and early summer storm events can cause temporary increases in water flow. These storms bring heavy, and often warm, rain. Flow in the rivers increases not only because of the rainfall, but also since warm rain accelerates snow melt in the higher elevations.   

> **Exercise 02-02:** The transitory flow events on the Yakima River warrant some further investigation. You now have the tools to query and plot the stream flow time series. Your goal is to determine if there are common properties (e.g. duration or amplitude) of these events. Plot the results of a query for stream flows on gage $12484500$, Yakima River, from the 6th day of April to the 20th day of June, 2020.

In [None]:
## Put your code below


> Discuss any common pattern in terms of approximately common high amplitudes or durations of these flow events you can see. Note, this question is a bit open ended.      

> **Answer:**     

## Applying Moving Window Filters

Moving window filters are a commonly used method to compute statistical samples from streaming data.

Apply a moving window filter.

> **Exercise 02-03:** You will complete and test the function in the cell below. The function queries a time series to create overlapping windows of a specified length and stride. For each window the mean of the stream flow is computed. The function returns a [Pandas Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html) object. The time index of the Series object is the mid-point index of the window used to compute the statistic.
> 1. Use a `for` loop to create the overlapping moving window samples of the input. The window will use the `length` and `stride` arguments to the function. At each iteration, the window will advance by `stride` time steps. See the [tutorial on the Python range function](https://www.w3schools.com/python/ref_func_range.asp) for help.    
>   - Query the input stream data for the stream flow values in the window. The `query_stream` function accepts integer indices for the `start_time` and `end_time` arguments. Make sure the these indicies are within the range of the original time series.  
>   - Append the mean of the stream flow values in the window to the `out` list.    
>   - Append the difference between the sample at index `half_length` and the sample mean to the `difference` list.     
>   - Append the time index of the midpoint of the window to the `idx` list.  
> 2. Once the loop has terminated, use the [Pandas.Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html) constructor to instantiate the return series of both the mean filtered list and the differences list. Use the `idx` list as the idex for each of these series.   
> 3. Return both the moving average series and the difference series.      

In [None]:
def window_average(ts, length=16, stride=8, Columns='Stream_flow', site_numbers=[12484500]):
    half_length = int(length/2)-1
    idx = []
    out = []
    difference = []
    ## Put your code below








    return out, difference

Yakima = query_stream(stream_flow, site_numbers=[12484500])

filtered_12, Yakima_difference_12 = window_average(Yakima)

> 3. Next you will test your function by completing and executing the code in the cell below. Use your `window_average` function to create a Pandas Series with 4-hour stream flow averages (length of 16 time steps), taken every 2 hours (stride of 8 time steps). Name your return Series `filtered_12 and difference series `Yakima_difference_12`. Compute and print the length of the mean filtered series. The code provided queries the data so that you are working with only values from the Yakima River gage. Using flow rate values from only one gage simplifies the bookkeeping for window sampling.

In [None]:
## Query to create a series with only the Yakima River stream gage data.
Yakima = query_stream(stream_flow, site_numbers=[12484500])

filtered_12, Yakima_difference_12 = window_average(Yakima)
len(filtered_12)

> Notice how the length of the time series has been significantly reduced. Is the reduction in length of the time series consistent with a stride of 8 time steps?   
> **End of Exercise.**

> **Answer:**      

To examine the 4-hour moving average time series, the difference series, and the root mean squared error or difference between the two, execute the code in the cell below.

In [None]:
_=plot_time_series(filtered_12, title='Flow on Yakima at Cle Elm. 4-hour average')
_=plot_time_series(Yakima_difference_12, title='Difference with original time series')
print('RMS error = ' + str(np.std(Yakima_difference_12)))

Notice the following about the results above. The moving average smoother has had only minimal effect on the stream flow time series. The difference series shows only small values with respect to the range of values in the filtered series. Further, the root mean square (RMS) error is also small compared to the range of values of the filtered series.    

> **Exercise 02-4:** You will compute and display a time series using a longer, 1-day (96 time steps) moving window with a stride of 1/2 of a day (48 time steps). You will also perform evaluation of the resulting series. For this exercise, do the following:   
> 1. Query the Yakima River stream gage data with the longer time window and stride.
> 2. Use te `window_average` function you completed in te pervios exerciese to compute the smoothed series and the difference series.
> 4. Print the length of the resulting Pandas Series.
> 5. Plot the filtered (moving average smoothed) time series.
> 6. Plot the difference series.     
> 7. Compute and display the RMSE between the smoothed series and the original series.      

In [None]:
## Put your code below



_=plot_time_series(filtered_96, title='Flow on Yakima at Cle Elm, 1-day average')
_=plot_time_series(Yakima_difference_96, title='Difference with original time series')
print('RMS error = ' + str(np.std(Yakima_difference_96)))

> Answer the following questions:   
> 1. What is the data compression ratio with respect to the original stream flow series? Is this consistent with the stride of the window?   
> 2. Compare the plots of the results of the two moving window summaries. What are the obvious differences?
> 3. How do the errors compare between the two moving window summaries, and why is this expected?       
> 4. If the goal is to measure total volume of water passing the gage on a daily and weekly basis, does the series from the longer filter contain sufficient detail?

> **Answers:**     
> 1.        
> 2.      
> 3.         
> 4.     

## Exponential Decay Filters

The idea of using exponential smooth for time series analysis is an old one, dating at least to use by Weiner in the 1920s. The related idea of moving average filters was developed by Kolmogorov and Zurbenko in the 1940s. Exponential smoothers were used extensively in signal process in the 1940s. The general idea was expanded by Robert Goodell Brown (1956) and C.E. Holt (1957) and his student P.R. Winters (1960). The higher-order Holt-Winters model accounts for trend and seasonality of time series.

### Basic exponential Smoothing

Exponential smoothing uses a weighted sum of the current observation and the past smoothed value to compute a new smoothed value. This basic exponential smoothing relationship is shown below.  

$$
s_0 = x_0\\    
s_t = \alpha x_t + (1-\alpha) s_{t-1} = s_{t-1} + \alpha(x_t - s_{t-1}),\ t \gt 0
$$

The smoothing hyperparameter, $\alpha$, controls the trade-off between the last observation and the previous smoothed values. The possible values are in the range, $0 \le \alpha \le 1$. A large value of $\alpha$ puts more weight on the current observation. Whereas, a small value of $\alpha$ puts more weight on the smoothed history.      

How can we understand the exponential decay of the smoothed history of a time series? The smoothing hyperparameter, $\alpha$, an be expressed in terms of the decay constant, $\tau$ and time interval $\Delta T$ as shown below.  

$$
\alpha = 1 - e^{\big( \frac{- \Delta T}{\tau} \big)}
$$

From this relationship you can see that the influence of the smoothed history decays exponentially as $\delta T$ increases. The decay time increases as $\tau$ decreases.   

### Smoothing with higher-order terms   

The basic exponential smoothing algorithm is effective in many cases. However, the simple first order exponential smoothing method cannot accommodate time series with trend or seasonality. Higher order smoothing models are required.   

The **double exponential smoothing** or **Holt-Winters double exponential smoothing** algorithm is a second order smoothing method. Using two coupled difference equations a trend and non-seasonal component of the time series can be modeled. The model updates a smoothed measure of the non-seasonal component and the trend.   

The model is initialized with the values:   
$$
s_1 = x_1 \\
b_1 = x_2 - x_1
$$

At each time step the a pair of time difference equations are updated. The following relationships update the smoothed non-seasonal component, $s_t$, and the slope, $b_t$:   

$$
s_t = \alpha x_t + (1-\alpha) (s_{t-1} + b_{t-1}) \\
b_t = \beta(s_t - s_{t-1}) + (1 - \beta)b_{t-1}
$$

The smoothed non-seasonal component and smoothed slope can be used to compute a forecast. The relationship below computes the forecast $m$ time steps ahead.      

$$ F_{t+m} = s_t + m b_t $$   

What about seasonal components? A third-order difference relationship can updated a smoothed seasonal component, along with the smoothed non-seasonal and slope components. The details of this process are not discussed further here. The details are available elsewhere, including the [exponential smoothing Wikipedia page](https://en.wikipedia.org/wiki/Exponential_smoothing#:~:text=Exponential%20smoothing%20is%20a%20rule,exponentially%20decreasing%20weights%20over%20time.).  

### Example of Exponential Decay Filtering     

> **Exercise 02-5:** You will now create and test a single or simple exponentially weighted decay filter. This function will have a stride argument just as the window filter function. Your function, `exponential_smooth`, will have arguments of the time series, the exponential smoothing parameter and a stride. Your function will do the following:    
> 1. Initialize a list for the filtered samples with the first value of the input series. The samples list will contain the exponentially weighted smoothed samples. Make sure you save the first value in the list.   
>    a. Initialize empty lists for the output values, differences, and time index of the output values.   
>    b, Iterates over all the values of the time series starting with the second one. In this case a query is not used since for a live stream the exponential decay filter is updated each time a value arrives. In the loop update the exponentially weighted smoothed values using the first order smoothing algorithm.
>    c. If the loop index modulo the stride $= 0$ then, do the following:     
>           - Append the smothed sample value to the output list.     
>           - Compute and append the difference between the sample value at the index of the update and the updated smoothed value.     
>           - Append the time index of that sample to the index list.    
>    d. Create an output Pandas Series from the output list and the difference list using the output index list as the index of both series.   
>    e. Return both Pandas Series.   
> 8. Compute the value of $\alpha$ given $\tau = 24.0$.   
> 9. Execute your function for site number $12484500$ and arguments of the computed $\alpha$ and stride $=48$, or 12 hours.  
> 10. Print the length of the resulting series.     
> 11. Print the value of $\alpha$, given the time step and value of $\tau$.    
> 12. Pint the RMSE between the smothed series and the original values.    
> 13. Plot the smothed series and the difference series using the `plot_time_series` function.    

In [None]:
time_step = 15/60  # Step time in hours
stride = 48 # Stride in samples
tau = 24.0 # Decay time in hours
def compute_tau(alpha, time_step):
    '''Compute the value of tau given the time step and alpha'''
    return -time_step / math.log(1 - alpha)
def compute_alpha(tau, time_step):
    '''Compute the value of alpha given the time step and tau'''
    return 1 - math.exp(-time_step/tau)

def exponential_smooth(ts, alpha=0.01, stride=8):
    ## Put your code below









    return out, difference

alpha = compute_alpha(tau, time_step)
smoothed_24, difference_24 = exponential_smooth(query_stream(stream_flow, Columns='Stream_flow', site_numbers=[12484500]), alpha=alpha, stride=stride)

print(len(smoothed_24))
print('alpha = ' + str(round(alpha, 4)))
_=plot_time_series(smoothed_24, title='Flow on Yakima at Cle Elm, EWMA filter, alpha=' + str(round(alpha, 3)) + ', tau=' + str(tau))
_=plot_time_series(difference_24, title='Difference with original time series')
print('RMS error = ' + str(np.std(difference_24)))

> Provide short answers to the following questions:   
> 1. Are the number of smoothed samples correct for the stride of the exponential decay filter selected?     
> 2. How does the exponentailly smoothed series compare to the original series in terms of frequency components?      
> 3. Given the value of $\tau$ (in hours) what does this tell you about the decay length and smoothing of the filter?     
> 4. Examine the plot of the difference series. What do the frequency components of this difference series tell you about the smoothing?     
> 5. Given the range of values in the difference series and the RMSE is it reasonable to use the smoothed series if we are primarily interested in daily averages?          
> **End of exercise.**

> **Answers:**    
> 1.          
> 2.      
> 3.      
> 4.     
> 5.          

> **Exercise 02-6:** A question we should ask is what happens if you increase the smoothing constant of the exponential decay filter? In other words, what is the effect of giving greater weight to the most recent values? To find out do the following:  
> 1. Compute a value of alpha given $\tau = 1.0$, one hour.     
> 2. Execute the `exponential_smooth` function with arguments of the computed value of $\alpha$ and `stride=4`.
> 3. Print the length of the resulting Pandas Series.
> 4. Print the value of $\alpha$ computed.
> 5. Compute and print the RMSE of the difference series.   
> 6. Plot the smoothed time series using the `plot_time_series` function.   
> 7. Plot the difference series using the `plot_time_series` function.

In [None]:
## Put your code below




print(len(smoothed_01))
print('alpha = ' + str(round(alpha, 4)))
_=plot_time_series(smoothed_01, title='Flow on Yakima at Cle Elm, EWMA filter, alpha=' + str(round(alpha, 3)) + ', tau=' + str(tau))
_=plot_time_series(difference_01, title='Difference with original time series')
print('RMS error = ' + str(np.std(difference_01)))

> Compare the plot of this time series to the previous series less smoothing and provide short answers to the following questions.   
> 1. How do the lengths of the smoothed series compare and is this differnce consistent with the stride lengths?    
> 2. What is the main difference you can see between these smoothed series in terms of frequency components and why is this expected given the decay times?     
> 3. How can you compare the difference series in terms of frequency components and why is this expected given the decay times?
> 4. How do the difference values and RMSE values compare between two values of $\tau$ and why is ths expected?    
> **End of exercise.**

> **Answers:**   
> 1.      
> 2.     
> 3.     
> 4.     

> **Exercise 02-7:** So far, we have picked a stride that is about 1/2 the decay time, which is a typical choice. The next question to ask is what is the effect of using a much longer stride? A longer stride reduces the number of smoothed samples used for further processing. To find out repeate the calculations and plotting of exercise 2-5, but with stride $=12$.

In [None]:
## Put your code below



print(len(smoothed_24_96))
print('alpha = ' + str(round(alpha, 4)))
_=plot_time_series(smoothed_24_96, title='Flow on Yakima at Cle Elm, EWMA filter, alpha=0.01')
_=plot_time_series(difference_24_96, title='Difference with original time series')
print('RMS error = ' + str(np.std(difference_24_96)))

> Compare the result you just created with that from Exercise 2-5 and answer the following questions:
> 1. Is the error series or RMSE significantly different, or nearly the same?
> 2. Why do you think foregoing outcome is expected?        
> **End of exercise.**

> **Answers:**           
> 1.      
> 2.    

## Filtering Discrete Events  

In many cases our goal is to filter discreete events based on some type of identifier. The identifier is a hash of nearly any hashable data type. Many examples of these data types include, string customers identifiers, numeric event identifiers, IP addresses, email addresses.  

One example of such a method is the [**Bloom filter**](https://en.wikipedia.org/wiki/Bloom_filter). A Bloom filter is quite efficient in terms of computing and memory.  However, the Bloom filter does not allow for deleting matches once they are added to the hash table. An alternative is a **[quotient filter](https://en.wikipedia.org/wiki/Quotient_filter)**. The quotient filter keeps a count of events for each hash. As a result, an event identifier can be removed from the table by decrementing the counts for the hashes. To perform this extra operation the quotient filter uses more memory and is a bit less computationally efficient.   

Both Bloom filters and quotient filters operate by the same principle. A hash table of key-values pairs is created. The keys are the hashes of the event identifiers. For a Bloom filter, the value is binary, the event is in the table or it is not. The quotient filter on the other hand, maintains a count of events in each bucket. The count can be incremented as new instances are encountered, or decrimented to delete events.     

In the following exercises, you will construct a Bloom filter using a bit array and the Python [mmh3 package](https://github.com/hajimes/mmh3?tab=readme-ov-file). The mmh3 package implements the widely used [MurmurHash (MurmurHash3)](https://en.wikipedia.org/wiki/MurmurHash) non-cryptographic hash algorithm.                      

### Instantiate the identifier lists      

To start the example, lists of identifiers are created. There is one list of customer identifiers and one list of non-customers identifiers. Our goal is to construct a hash filter that will differentiate customers from non-customers.

The code in the cell below does the following:   
1. Defines the size of the hash table (dictionary) and a set of 3 even binary number hash keys. These three hash keys are used for inserts, deletes and look-ups in this example.     
2. Randomly generated lists of customer and non-customer ids are created. A multiplier is used to ensure that some IDs values are outside the range of the length of the hash table. The customer and non-customer identifiers are all integers.   
3. The non-customer list is filtered to ensure there are no identifiers common with the customer list.     
4. The lengths of the lists and a sample of the customer key values are printed.  

Execute the code in the cell below.

In [None]:
number_of_hashes = 1024
ID_multiplier = 39
number_customers = 100
number_not_customers = 200

nr.seed(4565)
customers = [int(ID_multiplier * number_of_hashes * i) for i in nr.uniform(size=number_customers)]
not_customers = [int(ID_multiplier * number_of_hashes * i) for i in nr.uniform(size=number_not_customers)]

## Ensure there are no common ids between customers and non-customers
del_list = []
for i in range(len(not_customers)):
    if not_customers[i] in customers: del_list.append(i)
for i in sorted(del_list, reverse=True):
    del not_customers[i]

## instantiate the empty hash dictionary
hash_dict = dict([(i,0) for i in range(number_of_hashes)])

print('Number of non-customers = ' + str(len(np.unique(not_customers))))
print('Number of customers = ' + str(len(np.unique(customers))))
print(customers[:20])

### Using the Murmur Hash

You will use the 32 bit Murmur hash function implemented in the Python mmh3 package to create a Bloom fliter. First we will investigate some basic properties of the Murmur hash. Specifically, we will investigate how uniform the hash values computed are by the following steps:   
1. Created a hash table of length 1024. The table is filled with hashes computed from random integers. Notice that the `mmh3.hash` function only accepts a string argument, requiring the integer to be wrapped in the `str()` function.
2. A histogram and cummulative density plot of the hash values are displayed.
3. The number of hash collisions is computed and displayed.    

In [None]:
filter_length = 1024
number_samples = 128
multiplier = 1019

## Compute random numbers and find the corresponding hash values
samples = [int(multiplier * filter_length * i) for i in nr.uniform(size=number_samples)]
hash_list = [mmh3.hash(str(i)) % filter_length for i in samples]

## Histogram of the hash values
fig, ax = plt.subplots(figsize=(20, 6))
counts, bins, _ = ax.hist(hash_list, bins=filter_length);
ax.set_ylabel('Count of keys');
ax.set_xlabel('Hash value');
ax.set_title('Count of keys vs. hash values');
ax.set_xlim(0.0, filter_length)
plt.show();

## Cumulative density plot of the hash values
fig, ax = plt.subplots(figsize=(6,6))
ax.plot(range(len(counts)), np.cumsum(counts))
ax.plot([0.0,len(counts)], [0.0,len(hash_list)], color='red');
ax.set_ylabel('Cumulative Sum');
ax.set_xlabel('Hash index');
ax.set_title('Cumulative sum of hash values');
ax.set_xlim(0.0, filter_length)
plt.show();

## Compute and display the number of hash collisions
print('Number of collisions = ' + str(int(sum([count - 1 if count>1 else 0 for count in counts]))))

> **Exercise 02-8:** Compare the plots shown above to the plots of the hash function using a prime number key created for Exercise 01-1. Is the Murmur hash function more or less optimal compared to the function tested in Exercise 01-1 and why?    

> **Answer:**     

### Create Bloom filter and insert function    

You will now instatiate a Bloom filter and create an insert function. As a first step, execute the code in the cell beolow to instantiate the Bloom filter as an empty bit array.     

In [None]:
filter_length = 1024

bloom_filter = bitarray(filter_length)
bloom_filter

> **Exercise 2-9:** You will now complete and test a function to insert an event identifier into the Bloom filter array. To complete the code in the cell below do the following:
> 1. Iterate over the hash keys. The key for mmh3 is just an integer.   
> 2. For the value, `x`, and integer key compute the hash. Make sure you take the modulo with the length of the Bloom filter.
> 3. Set the bit indexed by the hash value to 1.
> 4. Return the Bloom filter array.        
> Execute your code to add the customers to the hash table.

In [None]:
n_hashes = 3
def bloom_insert(x, bloom_filter, n_hashes=3):
    ## Put your code below



    return bloom_filter

for customer in customers:
    bloom_filter = bloom_insert(str(customer), bloom_filter, n_hashes=n_hashes)
bloom_filter

> Notice that most bits in the Bloom filter array are still 0, and only a small fraction of the bits set to 1.
>
> Next, you will complete the `bloom_query` function in the cell below. Recall that a query returns True only if all bits indexed by the hash values are 1. To complete the function, do the following:     
> 1. Set a counter variable to 0.     
> 2. Iterate over the number of hashes.     
> 3. Compute the hash for the input value, `x`, and integer hash key.    
> 4. If the bit value indexed for by the key equals 1, add 1 to the counter value.      
> 5. If the counter value equals the number of hashes, return True, and if not, return False.     
> Now, execute the code in the cell below to test your function and find the number of false positives.    

In [None]:
def bloom_query(x, bloom_filter, n_hashes=3):
    ## Put your code below




false_positives = 0
for customer in not_customers:
    if bloom_query(str(customer), bloom_filter, n_hashes=n_hashes): false_positives += 1
print('Total number of false positives = ' + str(false_positives))

> For a perfect (uniformly distributed) hash function the theoretical false positive rate can be computed by the following relationship:  > $$P(false\ positive) = \Big[1 - exp \big(\frac{- k n}{m} \big) \Big]^k$$    
> Where,
> - $k = $ number of hash functions.    
> - $m = $ length of the hash table.   
> - $n = $ number of identifiers in the hash table.
>  
> Now answer the following questions:     
> 1. Given this result, what is the emperical false positive rate?     
> 2. How does the observed false positive rate compare to the theoretical rate? You should create a function to compute the theoretical false positive rate, which you can use for the next exercise. Name your function `false_positive_rate` with arguments, number of events, length of the bit array, and number of hashes.      

> **Answers:**
> 1.      
> 2.      

In [None]:
def false_positive_rate(n, m, k):
    return (1 - math.exp(-k * n / m))**k
false_positive_rate(number_customers, filter_length, n_hashes)

> Answer 2 Continued;      

In [None]:
#m_list = [1e+14, 1e+1, 1e+16]
m_list = [2**12, 2**16, 2**24, 2**32]
k_list = [3, 5, 9, 15]
n = 10e+7

fig, ax = plt.subplots(figsize=(8,8))
for k in k_list:
    fp_list = [false_positive_rate(n, m, k) for m in m_list]
    ax.plot(m_list, fp_list, label=str(k) + ' hashes');
#    ax.plot(k_list, fp_list, label="{:.1E}".format(Decimal(m)) + ' bit array');
ax.set_yscale('log');
ax.set_ylabel('Log false positive rate');
ax.set_xlabel('Length of bit array');
ax.set_title('False postive rate vs. length of bit array for 10 million events');
ax.legend();

> Finally, answer these questions:    
> 3. What is the relationship between length of the bit array and false error rate, and why do you think this makes sense?     
> 4. How does increasing the number of hashes affect the false positive rates, and why do you think this makes sense?       
> **End of exercise.**

> **Answers:**     
> 3.      
> 4.     

## Applying the Quotient Filter

The quotient filter is an improvement on a Bloom filter, and uses a more compact hashing method. Like Bloom filters, quotient filters are not particularly difficult to implement. There are also several Python packages that support Bloom and quotient filters. For the following exercise you will use the [PyProbabilies](https://pyprobables.readthedocs.io/en/latest/index.html) package.     

> **Exercise 02-10:** In this exercise you will work with a quotient filter. To instantiate and add hashes for customer IDs do the following:   
> 1. Instantiate the quotient filter using the [QuotientFilter](https://pyprobables.readthedocs.io/en/latest/code.html#probables.QuotientFilter) funciton using default argument values. Name your filter object `quotient_filter`.      
> 2. To get a feel for the quotient filter created print the following attributes of the filter object, `quotient`, `remainder` and `num_elements`.     
> 3. Iterate over the customer list and add the customer identifier to the quotient filter using the `add` method. Note that the key argument must be of a string type.
> 4. Print the `elements_added` attribute of the quotient filter.
> Execute your code.   

In [None]:
## Put your code below.





for customer in customers:
    quotient_filter.add(str(customer))

print('Elements added = ' + str(quotient_filter.elements_added))

> Next, you will query the quotient filter with the identifiers in the not_customer list, by executing the code in the cell below.  

In [None]:
false_positives = 0
for customer in not_customers:
    if quotient_filter.check(str(customer)): false_positives += 1
print('Total number of false positives = ' + str(false_positives))

> Compare the false positive rates of the Bloom filter example of Exercise 2-9 and the quotient filter. Can you see that one filter has an advantage for this case and why?  

> **Answer:**     

## Count Min Sketch

The probabilistic count min sketch algorithm uses multiple hash functions in an efficient data structure to estimate

[Zipf's law](https://en.wikipedia.org/wiki/Zipf%27s_law) is generally considered a model for activity of online user activity, spot event attendance, and other human activities. One possible model for this behavior is the [Zipf-Mandelbrot distribution](https://en.wikipedia.org/wiki/Zipf%E2%80%93Mandelbrot_law), with the following **probability mass function**:   

$$f(k,N,\alpha,\beta)=\frac{1/(k + \beta)^\alpha}{H_{N,q,s}}$$
where:   
$k=$ rank in set of size $N$.       
$\alpha$ abd $\beta$ are parameters.     
$H_{N,q,s}=\sum^N_{k=1} 1/(k + \beta)^\alpha$



In [None]:
alpha = 1.0
beta = 2.7
zipf = [1/(i + beta)**alpha for i in range(len(customers))]
zipf = np.divide(zipf, np.sum(zipf))

fig, ax = plt.subplots(figsize=(6,5))
ax.plot(range(len(customers)), zipf)
ax.set_ylabel('Probability density')
ax.set_xlabel('Rank')
ax.set_title('Density of Zipf distribution')

Notice the rapid decay of this distribution with event identifier. This rapid decay of event frequency is often observed in many real-world phenomenon.    

Next, execute the code in the cell below to generate a data set using the Zipf-Mandelbrot distribution of events just computed.  

In [None]:
n_samples = int(10e+5)
stream = np.random.choice(customers, size=n_samples, p=zipf)
stream[:200]

Notice the random nautre of the indentifiers in the first 200 events in the stream.  

### Applying the Heavy Hitters Algorithm

A variation on the count-min-sketch algorithm is used to find the **heavy hitters**, or the event ids that occur most frequently. This approach allows one to find the counts of a pre-determined number of common events, using the small memory footprint of the count-min-sketch algorithm, but at a cost of some additional compuation for sorting and bookkeeping.  

> **Exercise 02-11:** You will now use the [probables.HeavyHitters](https://pyprobables.readthedocs.io/en/latest/code.html) function to find the 20 event identifiers with the highest event count. Do the following:
> 1. Conmpute the required width of the data structure to achieve an approximate 1% error rate using the following relationship:     
>    $$width = 2^{log_2(e/\epsilon)}$$       
>    To perform this calculation use the Python `math.log2` functiion and the `math.ceil` function to round up the next highest integer. Display the computed result
> 3. Compute the required depth to ensure a less than 1% probability, $\delta$, of breaching the error bound using the following relationship:
>    $$depth = ln(1/\delta)$$
>    Use the `math.ceil` function to round up to the next highest integer. Display the computed result.    
> 5. Instantiate a HeavyHitters object with the arguments computed above and `num_hitters = 20`.
> 6. Iterative over the events in the stream applying the `add` method for each event, with the num_els argument set to 1. Make sure you coerce the integer identifier to a string.  
> 7. Save the `heavy_hitters` attribute to a variable. The data structure for this attribute is a dictionary.         

In [None]:
## Put your code below



heavy_hitters = heavy_hitters.heavy_hitters

> 4. Execute the provided function, using the `heavy_hitters` attribute variable you have saved.   

In [None]:
def display_dist(hitters):
    heavy_hitters = np.flip(np.sort(list(hitters.values())))
    print('The ordered list of heavy hitters:')
    print(heavy_hitters)

    fig, ax = plt.subplots(figsize=(5,4))
    ax.plot([x + 1 for x in range(len(heavy_hitters))], heavy_hitters)
    ax.set_ylabel('Count')
    ax.set_xlabel('Ordered event')
    ax.set_title('Ordered event frequencies')

display_dist(heavy_hitters)

> Now answer the following questions.
> 1. Does the distribution of event counts for the top 20 identifiers appear to follopw the expected distribution.      
> 2. Compute the required memory for the count-min-hash data structure, assuming 4-byte (32 bit) counters.      
> 3. How does the memory required for the count-min-hash data structure compare to using a simple linear (or flat) table for the $10^5$ event identifiers, assuming 4-byte (32 bit) counters? Keeping in mind that $10^5$ event identifiers, what does your comparison mean for the memory efficiency of the count-min-sketch algorithm?       

> **Answers:**
> 1.      
> 2.      
> 3.     

## HyperLogLog Algorithm   

The HyperLogLog algorithm and its derivatives are fast and efficient methods for determining the cardinality of events in streams. The HyperLogLog uses the harmonic mean of a set of cardinality estimates. Each estimate is based on a hash-based sketch of cardinality.      

> **Exercise 02-12:** You will now work with the HyperLogLog algorithym using the Python [Datasketch package](https://ekzhu.com/datasketch/index.html). Do the following:      
> 1. Start with the typical value of $p=14$ (confunsingly precision by the creaters of the DataSketch package), resulting in the numnber of registers, $m = 2^{14}$.     
> 2. Compute and display the execpected error rate, $\epsilon$, given the value of $p$.      
> 3. Instantiate a HyperLogLog object using the value of $p$ perviously specified as the argument.      
> 4. Assuming that 1-byte registers are used for accoumulating the counts, compute and display the size of the HyperLogLog data structure. You can find the size of this data structure using the Python `len` function.       
> 5. Iterate over all the events in the stream applying the `update` method to the HyperLogLog object.      
> 6. Display the `count` attribute of the HyperLogLog object, which contains the estimated cardinality of the stream.                 

In [None]:
## Put your code below







> Now, answer these questions:     
> 1. Based on the estimated error and the estimted cardinality value computed, is the error within the expected range?   
> 2. How much memory would be required if you needed to find the cardinality of $2^{14}=16384$ streams in a large scale application?      

> **Answers:**
> 1.      
> 2.    

#### Copyright, 2021, 2022, 2023, 2024, Stephen F Elston. All rights reserved.