# Assignment 02     
## CSCI E-108
### Steve Elston


In this assignment you will work with some basic streaming analytic algorithms. To avoid the complexities of installing and setting up a real streaming analytics platform, you will work with stream flow data loaded from local files. Specifically in this assignment you will:    
1. Create and apply code to perform basic stream queries.    
2. Using stream queries and plots, explore the stream data.    
3. Use moving windows to compute moving averages and sub-sample a stream.    
4. Use exponential decay filters to compute moving averages and sub-sample a stream.    
5. Work with a Bloom and quotient filters to filter for customer identifiers on a list.
6. Use the count-min-sketch algorithm to find counts of unique event identifiers.
7. Use the HyperLogLog algorithm to find the cardinality of events in a stream.  

## Overview 

The United States Geological Survey (USGS) maintains over 13,500 stream flow gauges in the United States. Measurements from most of these gauges are recorded every 15 min and uploaded every 4 hours are [available for download](https://waterdata.usgs.gov/nwis/rt). Stream flow data are used as inputs for complex water management tasks for purposes such as agriculture, residential use and conservation. For this assignment you will work with the time series of measurements for two stream flow gauges on tributaries of the the Columbia River in the US State of Washington.     

To get started, execute the code in the cell below to import the packages you will need. 

> **Note:** You must pip install four packages to run the code in this notebook. You can perform the installation of these packages by uncommenting the shell commands shown in the cell below. Alternatively, you can also Conda install these packages.    
> 1. [Mmh3](https://github.com/hajimes/mmh3)      
> 2. [Bitarray](https://github.com/ilanschnell/bitarray)
> 3. [PyProbables](https://pyprobables.readthedocs.io/en/latest/index.html)
> 4. [Datasketch](https://ekzhu.com/datasketch/index.html)
> 5. [DataSketches](https://apache.github.io/datasketches-python/5.2.0/index.html)
>
> The three packages, PyProbables, Datasketch and Apache DataSketches have considerable overlap in the algorithms available. Of the three, Apache DataSketches is considered most suitable for enterprise scale deployments. For example, [DataSketches is integrated with BigQuery](https://github.com/apache/datasketches-bigquery) and [Apache SPARK](https://github.com/apache/datasketches-spark). Unfortunately, the DataSketches' Python API is incomplete and poorly documented. Therefore, we will use several sketch packages in these exercises.     

In [None]:
#!pip install mmh3
#!pip install bitarray 
#!pip install pyprobables
#!pip install datasketch
#!pip install datasketches

In [None]:
import pandas as pd
import numpy as np
import numpy.random as nr
import mmh3
from bitarray import bitarray
from probables import QuotientFilter, HeavyHitters, CountMinSketch
from datasketch import HyperLogLog
from datasketches import req_floats_sketch, tdigest_float

import math
import random
from collections import deque
import matplotlib.pyplot as plt
from decimal import Decimal
import sys

%matplotlib inline

## Loading the Dataset  

The next step is to load the stream gauge data. The code in the cell below loads the time series data for the first gauge. This gauge is sited on the Okanogan river.  

The code in the cell below does the following:  
1. Loads the data from a .csv file. 
2. Converts the time stamp column to an index of the Pandas data frame. 
3. Assigns human-understandable names to the columns.  
4. Returns just the first 4 columns of the data frame. 

Execute this code and examine the results.

In [None]:
def read_index_series(file_name):  
    '''Function to read time series data from a file.
    Argument is the path and filename.
    Returns a data frame with the file contents'''
    df = pd.read_csv(file_name, sep='\t')
    df.index = df.datetime
    df.drop('datetime', axis=1, inplace=True)
    df = df.iloc[:,:4]
    df.columns = ['Agency', 'Site_number', 'Time_zone', 'Stream_flow']
    return df.iloc[:,:4]

Malott = read_index_series('../data/12447200_Okanogan_at_Malott.txt')
Malott

The other time series is for a gauge on the Yakima River. Execute the code in the cell below and examine the result. 

In [None]:
CleElm = read_index_series('../data/12479500_Yakima_At_CleElm.txt')
CleElm

Since we really only want to work with one data frame. The code in the cell below merges the two time series and sorts them into time index order. Execute this code and examine the result, paying attention to the site number and the datetime index.  

In [None]:
stream_flow = pd.concat([Malott,CleElm]).sort_index()
stream_flow

## Querying Stream Data

Common stream data operations are often formulated as queries on the stream data. Many streaming data platforms use extensions of SQL for these queries.   

To keep things simple in this assignment we use a basic query function. The function shown in the cell below supports queries on the time series. This function calls a function that returns time slices of the stream data frame.       

In [None]:
def slice_dataframe(df, start_time=None, end_time=None):
    """
    Function returns a time slice out of a stream of data

    Args:
        df: data frame containing the stream data
        start_time: the starting time of the slice, can be in datetime form or an integer
        end_time: the ending time of the slice, can be in datetime form or an integer
    """
    ## Set the start and end times to the extremes if not specified
    if start_time==None: start_time = df.index[0]
    if end_time==None: end_time = df.index[df.shape[0]-1]
   
    ## Test if index is a string datetime or an integer
    ## use iloc method if an integer.
    ## A slice over the time range is created based on the index type. 
    if isinstance(df,pd.DataFrame):
        if isinstance(start_time, str):
            df = df.loc[start_time:end_time,:]
        else:     
            df = df.iloc[start_time:end_time,:]
    else: # Must be a Pandas series 
        if isinstance(start_time, str):
            df = df.loc[start_time:end_time]
        else:     
            df = df.iloc[start_time:end_time]
    return df


def query_stream(df, Columns=None, site_numbers=None, start_time=None, end_time=None):    
    '''
    Function to query the stream gage time series data. 
    Args:    
        df = the data frame containing the data.  
        Columns = a list of columns to return.   
        site_numbers = a list of gage site numbers to query data. 
        start_time = the start time of the returned data as datatime string or integer index.   
        end_time = the end time of the returned data as datatime string or integer index.
    Returns: DataFramew or Pandas series with datatime index and columns sepecified over range of 
             datetime (index) specified. 
    '''
    ## First set values for arguments set to Null  
    if Columns==None: Columns = df.columns
    if site_numbers==None: site_numbers = df.Site_number.unique()
    
    ## Select the sites
    df = df.loc[df.Site_number.isin(site_numbers), Columns]

    ## A slice over the time range is created based on the arguments. 
    df = slice_dataframe(df, start_time=start_time, end_time=end_time)
    ## Return the results of the query
    return df

query_stream(stream_flow, Columns='Stream_flow', site_numbers=[12484500])

You can see the options to run `query_stream` function by executing the code in the cell below. 

An example of using the query function is shown in the cell below. Execute this code and examine the result. 

In [None]:
len(query_stream(stream_flow, Columns='Stream_flow', site_numbers=[12484500]))

> **Exercise 02-01:** Using the `query_stream` function, write and execute the code in the cell below to compute and display the mean `Stream_flow`for the month of April of 2020 of site 12484500. Use the [Pandas.DataFrame.mean](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.mean.html) method to compute the mean. Notice that using this approach we can compute most any statistic of interest on the query result.    

In [None]:
## Put your code below


## Plotting Streaming Data

Visualization is important tool in data exploration and discovery. Numerical stream data is ideal for visual exploration if it can be subsampled to manageable size.  

The function in the cell below creates a time series plot. The time index of the Pandas data frame is used to generate the x-axis values. Execute the code in this cell to load this function.

In [None]:
def plot_time_series(df, ax=None, ylabel='Stream flow', title=''): 
    if ax==None: fig, ax = plt.subplots(figsize=(20, 6))
    df.plot(ax=ax);
    ax.set_xlabel('Date');
    ax.set_ylabel(ylabel);
    ax.set_title(title)
    plt.show()
    return ax    

The code in the cell below creates time series plots of the stream flow data. The flow time series for two stream gauges queried as arguments to the plot function. Execute this code and examine the results. 

In [None]:
_=plot_time_series(query_stream(stream_flow, Columns='Stream_flow', site_numbers=[12447200]), title='Flow on Okanogan at Malott') 
_=plot_time_series(query_stream(stream_flow, Columns='Stream_flow', site_numbers=[12484500]), title='Flow on Yakima at Cle Elm')    

The time series of stream flow at both of these gauges is rather complex. Both rivers have several dams used to control the flow. The flow is optimized to conserve fisheries and to supply agriculture in the Columbia River Basin. Water in reservoirs accumulates in the spring as mountain snow melts. The water is then released throughout the spring and summer. 

But, what can we make of the noticeable spikes in flow, particularly for gauge $12484500$ on the Yakima River. Even with the control provided by dams and reservoirs spring and early summer storm events can cause temporary increases in water flow. These storms bring heavy, and often warm, rain. Flow in the rivers increases not only because of the rainfall, but also since warm rain accelerates snow melt in the higher elevations.   

> **Exercise 02-02:** The transitory flow events on the Yakima River warrant some further investigation. You now have the tools to query and plot the stream flow time series. Your goal is to determine if there are common properties (e.g. duration or amplitude) of these events. Plot the results of a query for stream flows on gauge $12484500$, Yakima River, from the 6th day of April to the 20th day of June, 2020. 

In [None]:
## Put your code below


> Discuss any common pattern in terms of approximately common high amplitudes or durations of these flow events you can see. Note, this question is a bit open ended.      

> **Answer:**               

## Applying Moving Window Filters

Moving window filters are a commonly used method to compute statistical samples from streaming data. 

Apply a moving window filter. 

> **Exercise 02-03:** You will complete and test the function in the cell below. The function queries a time series to create overlapping windows of a specified length and stride. For each window the mean of the stream flow is computed. The function returns a [Pandas Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html) object. The time index of the Series object is the mid-point index of the window used to compute the statistic. 
> 1. Define empty lists for the output values and the time index of the output values.
> 2. Use the `window_sample` function, including arguments `length` and `stride`, to create a list of the samples over the series of windows. Notice, that for real streaming data, one would work with the continuous stream to create the window samples. Here we use a the Python [yield](https://docs.python.org/3/reference/expressions.html#yield-expressions) as [generator](https://wiki.python.org/moin/Generators) to simulate the stream of (overlapping) window values. Use the default values for `length` and `stride`. The yield declaration allows the function to emit one output at a time.     
> 3. Use a `for` loop to create the overlapping moving window samples of the input. The `window_sample` function provided to create a list of window samples using the `length` and `stride` arguments. At each iteration, the window will advance by `stride` time steps.
> 4. In a loop over the window samples do the following:
>    - Compute the mean of the window sample and append it to the appropriate list.
>    - Find the midpoint time index of the window and append it to the appropriate list.   
> 6. Once the loop has terminated, use the [Pandas.Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html) constructor to instantiate the return series of the mean of the window samples and return this series.      

In [None]:
def window_sample(ts, 
                  length=16, 
                  stride=8): 
    for ix_end in range(length, len(ts), stride):
        ix_start = ix_end - length 
        yield slice_dataframe(ts, start_time=ix_start, end_time=ix_end)


def window_average(ts, length=16, stride=8, Columns='Stream_flow', site_numbers=[12484500]):
    half_length = int(length/2)-1

    ## Put your code below





> 3. Next you will test your function by completing and executing the code in the cell below. Use your `window_average` function to create a Pandas Series with 4-hour stream flow averages (length of 16 time steps), taken every 2 hours (stride of 8 time steps). Name your return Series `filtered_16. Compute and print the length of the mean filtered series. The code provided queries the data so that you are working with only values from the Yakima River gauge. Using flow rate values from only one gauge simplifies the bookkeeping for window sampling. 

In [None]:
## Query to create a series with only the Yakima River stream gauge data. 
Yakima = query_stream(stream_flow, Columns='Stream_flow', site_numbers=[12484500])

filtered_16 = window_average(Yakima)
print(filtered_16.head())
len(filtered_16)

> Notice how the length of the time series has been significantly reduced. Is the reduction in length of the time series consistent with a stride of 8 time steps?   
> **End of Exercise.**

> **Answer:**               

To examine the 4-hour moving average time series, the difference series, and the root mean squared error or difference between the two, execute the code in the cell below. 

In [None]:
_=plot_time_series(filtered_16, title='Flow on Yakima at Cle Elm. 4-hour average')

Notice the following about the results above. The moving average smoother has had only minimal effect on the stream flow time series. The difference series shows only small values with respect to the range of values in the filtered series. Further, the root mean square error (RMSE) is also small compared to the range of values of the filtered series.    

> **Exercise 02-4:** You will compute and display a time series using a longer, 1-day (96 time steps) moving window with a stride of 1/2 of a day (48 time steps). You will also perform evaluation of the resulting series. For this exercise, do the following:   
> 1. Use the `window_average` function you completed in the previous exercise to compute the smoothed series. 
> 4. Print the length of the resulting Pandas Series.
> 5. Plot the filtered (moving average smoothed) time series.

In [None]:
## Put your code below




> Answer the following questions:   
> 1. What is the data compression ratio with respect to the original stream flow series? Is this consistent with the stride of the window?   
> 2. Compare the plots of the results of the two moving window summaries. What are the obvious differences?
> 3. If the goal is to measure total volume of water passing the gauge on a daily and weekly basis, does the series from the longer filter contain sufficient detail? 

> **Answers:**     
> 1.             
> 2.          
> 3.                    

## Reservoir Sampling   

At large scale, one needs to randomly sample from streams. Some of the many examples where working with a random sample of the streams is required include the following.     
1. An Internet security system analyzes the IP address of of Internet traffic to detect denial of service attacks. The system needs to compute various statistics on the IP address pair (origin and destination) streams. But, given the massive volume of Internet traffic, it is impossible to store much history of these streams.    
2. A large e-commerce company must analyze the click streams on it many web sites. These analytics must consider every click on the sites, not just clicks leading to purchases. It is impractical economically to store long histories of these click streams.

Could one use Poisson sampling to reduce the volume of data used for analysis? Let's do some analysis. For basic Poisson sampling an observation is included in the sample with probability $p$. Once we have seen $i$ samples in a stream our sample will be of size $p\ i$. As time goes on the memory requirement, $m(t)$, continues to increase.   
$$m(t) = p\ i(t) \rightarrow \infty\ as\ t \rightarrow \infty$$
It is clear that Poisson sampling is not an option for infinite streams of data! We need another approach.   

Reservoir sampling maintains a buffer of constant size $k$. The first $k$ sample to arrived are placed in the buffer. When the $i$th sample arrives a random integer $r(i) \sim Unif(0,i)$ is generated. If $r(i) \le k$, the $r$th sample of the buffer is replaced by the new sample. The probability of the $i$th sample being included in the buffer is:   
$$p(s(i)) = \frac{k}{i+1}$$     

The code in the cell below computes and displays the size of the required memory and the probability of a sample being included are computed and displayed as a function of $i$. Execute this code and examine the result.    

In [None]:
N_samples = 1000
weight = 1.0
k=100
p=0.01

porbability =  lambda i: k * weight / (i + 1) if i > k  else 1
length = lambda i: k if i > k else i

i_vector = range(N_samples)
probabilities = [porbability(i) for i in i_vector]
lengths_resivoir = [length(i) for i in i_vector]
lengths_poisson = [i for i in i_vector]

_, ax = plt.subplots(1,2, figsize=(10,4))
ax[0].plot(i_vector, lengths_resivoir, label='Reservoir Sampling');
ax[0].plot(i_vector, lengths_poisson, label='Poisson Sampling');
ax[0].legend()
ax[0].set_xlabel('Number of observations seen');
ax[0].set_ylabel('Length of sample vector');
ax[0].set_title('Length of sample vs. samples observed');


ax[1].plot(i_vector, probabilities, label='Reservoir Sampling');
ax[1].plot(i_vector, [p]*len(i_vector), label='Poission Sampling');
ax[1].legend()
ax[1].set_yscale('log');
ax[1].set_xlabel('Number of observations seen');
ax[1].set_ylabel('Log probability of inclusion in buffer');
ax[1].set_title('Log probabiliy of inclusion vs. samples observed');

Examine these graphs and notice the following, noticing that the probability graph uses a log vertical scale:     
- The memory required for Poisson sampling grows linearly with the number of arriving samples.
- The memory required for reservoir sampling increases linearly until the buffer is filled and then is constant.
- The probability of a sample being included is constant for Poisson sampling.
- The probability of a sample being included is constant until the buffer fills and then decreases linearly (exponential on log scale).

The foregoing observations show that both sampling methods result in random samples. Reservoir sampling maintains a random sample with fixed memory per stream.      

### An Efficient Algorithm

The basic reservoir sampling algorithm is simple, but not very efficient. A random number needs to be generated for each arriving sample. Random number generation is an expensive process. This simple algorithm scales poorly large numbers of streams and high arrival rates.    

A better approach can be obtained by considering the [**geometric probability distribution**](https://en.wikipedia.org/wiki/Geometric_distribution) of the interval between samples. First we generate a random number with the required distribution: 

$$W = exp \Bigg( \frac{log \big(Unif(0,1) \big)}{k}   \Bigg)$$
Now the index of the next sample to be placed in the reservoir is sampled:   
$$i = i + floor \Bigg( log \Big( \frac{Unif(0,1) }{1-W)} \Big) \Bigg) + 1$$

Note that when using Python with zero-based indexing, the $+1$ term should be dropped.  

The last step is to find the index of the sample replaced in the reservoir from a uniform distribution, $r = Unif(1,k)$. Here $Unif(x,y)$ is the uniform distribution on the interval $[x,y]$ and $floor$ is a function that rounds down to the next lowest integer.       

Comparing the foregoing algorithm to the initial simple approach we see that the efficient algorithm needs to compute three random numbers each time a sample is added to the reservoir. However, the three random number need only be computed for samples that are actually placed in the reservoir. Given that the probability of a sample being placed in the reservoir decreases linearly with the number of samples seen in the stream, the number of random numbers generated continues to decrease as the sampling progresses.   

> **Exercise 2-5:** The code in the cell below implements an efficient version of the reservoir sampling algorithm by the following steps.
> 1. The reservoir is filled with the first $k$ samples from the stream.
> 2. For the rest of the stream an index for the next sample is computed by a random draw from a Geometric distribution.
> 3. Each time a sample is to be added to the reservoir a random draw of an index which determines which sample in the reservoir is replaced.
>
> Execute this function with the `print_test` and `print_updates` arguments set to `True`.          

In [None]:
def reservoir_sampling(stream, k, analytic_function=None, print_test=False, print_updates=False): 
    """
    Performs reservoir sampling on a stream of data.

    Args:
        stream: iterable data stream
        k: size of the reservoir
        analytic_function: A function that operates on the samples in the reservoir each time a sample is added
        print_test: produces voluminous test output when set to True. No analytic results are returned    
        print_updates: print number of buffer updates performed on a stream if set to True
    """
    reservoir_buffer = []
    out_list = []
    out_index = []
    n = len(stream)
    n_updates = 0

    ## First, fill the buffer with the first k observations 
    for i in range(k):
        reservoir_buffer.append(stream.iloc[i])
        if print_test: print(reservoir_buffer)
        if analytic_function != None: 
            n_updates += 1
            out_list.append(analytic_function(reservoir_buffer))
            out_index.append(i)

    ## Draw a random number from the distribution  
    W = math.exp(math.log(random.random())/k)    
    ## Loop over the remaining samples in the stream
    while i < n:
        i = i + math.floor(math.log(random.random())/math.log(1 - W)) # + 1
        if i < n:
            indx = random.randint(1, k) -1 
            reservoir_buffer[indx] = stream.iloc[i]
            W = W * math.exp(math.log(random.random())/k)  
            n_updates += 1
            if print_test: 
                print('For sample ' + str(i) + '  with buffer element replaced = ' + str(indx + 1))
                print(reservoir_buffer)
            if analytic_function != None: 
                out_list.append(analytic_function(reservoir_buffer))
                out_index.append(i)
    if print_updates: 
        print('Number of samples = '+ str(len(stream)))
        print('Number of buffer updates = ' + str(n_updates))
    if analytic_function != None: return pd.Series(out_list, index=out_index)

k = 8
Yakima = query_stream(stream_flow, Columns='Stream_flow', site_numbers=[12484500], start_time='2020-04-06 00:00', end_time='2020-04-07 11:59')
random.seed(345)
reservoir_sampling(Yakima, k, print_test=True, print_updates=True)

> Examine these results and answer the following questions:      
> 1. Do the indices of the samples selected appear to be a sequence with an increasing interval? Keep in mind that for small $n$ (number of samples seen) the Geometric distribution has a high probability of sampling a 0 interval.    
> 2. Do the samples replaced in the reservoir buffer appeared to have uniformly random indices?    
> 3. Compare the number of samples added to the reservoir buffer to the number of samples in the stream. What does this comparison tell you about the efficiency of this algorithm, which requires three random number draws, to the simple algorithm that requires a random number draw for each arriving sample?      

> **Answers:**       
> 1.        
> 2.            
> 3.               

> The code in the cell below does the following:
> 1. Performs a query for the entire Yakima river flow time series.
> 2. Performs reservoir sampling on the stream and computes and returns the mean of the sample each time a sample is added to the reservoir buffer.
>
> Execute this code, noting the number of samples taken and the execution time.  

In [None]:
k=64
Yakima = query_stream(stream_flow, Columns='Stream_flow', site_numbers=[12484500])
random.seed(121)
%time analytic_series = reservoir_sampling(Yakima, k, lambda x: np.mean(x), print_updates=True)

> The code in the cell below computes the mean statistic on the entire expanding window of the stream samples. Execute this code noting the execution time. 

In [None]:
def compute_full_analytic(stream, index, analytic_function):
    out_list = []
    for i in index:
        out_list.append(analytic_function(stream[:i]))
    return pd.Series(out_list, index = index)

%time full_analytic = compute_full_analytic(Yakima, analytic_series.index, lambda x: np.mean(x))

> Finally, to compare the mean statistic computed from the reservoir sample to the statistic computed with the cumulative sample, execute the code in the cell below.   

In [None]:
_,ax=plt.subplots(figsize=(8,6))
ax.scatter(analytic_series.index, analytic_series, label='Resivoir sample');
ax.plot(analytic_series.index, full_analytic, lw=2, c='orange', label='Full sample');
ax.set_title('Mean estimate vs. stream sample index');
ax.set_xlabel('Stream sample index');
ax.set_ylabel('Mean estimate from reservoir buffer');
ax.legend();

> Examine the foregoing results and answer these questions:       
> 4. How can you account for the for the difference in time required to compute the mean using reservoir sampling versus using the full samples in the expanding window?      
> 5. Compare the full sample mean estimates and the mean estimates from the reservoir samples shown in the plots. How would you characterize the reservoir sampling approximation, keeping in mind that we are using a very small buffer with $k=64$ and the values in the stream are not stationary (have a trend)?     

> **Answers:**    
> 4.             
> 5.              

## Exponential Decay Filters

The idea of using exponential smooth for time series analysis is an old one, dating at least to use by Weiner in the 1920s. The related idea of moving average filters was developed by Kolmogorov and Zurbenko in the 1940s. Exponential smoothers were used extensively in signal process in the 1940s. The general idea was expanded by Robert Goodell Brown (1956) and C.E. Holt (1957) and his student P.R. Winters (1960). The higher-order Holt-Winters model accounts for trend and seasonality of time series.

### Basic exponential Smoothing

Exponential smoothing uses a weighted sum of the current observation and the past smoothed value to compute a new smoothed value. This basic exponential smoothing relationship is shown below.  

$$
s_0 = x_0\\    
s_t = \alpha x_t + (1-\alpha) s_{t-1} = s_{t-1} + \alpha(x_t - s_{t-1}),\ t \gt 0
$$

The smoothing hyperparameter, $\alpha$, controls the trade-off between the last observation and the previous smoothed values. The possible values are in the range, $0 \le \alpha \le 1$. A large value of $\alpha$ puts more weight on the current observation. Whereas, a small value of $\alpha$ puts more weight on the smoothed history.      

How can we understand the exponential decay of the smoothed history of a time series? The smoothing hyperparameter, $\alpha$, an be expressed in terms of the decay constant, $\tau$ and time interval $\Delta T$ as shown below.  

$$
\alpha = 1 - e^{\big( \frac{- \Delta T}{\tau} \big)}
$$

From this relationship you can see that the influence of the smoothed history decays exponentially as $\delta T$ increases. The decay time increases as $\tau$ decreases.   

### Smoothing with higher-order terms   

The basic exponential smoothing algorithm is effective in many cases. However, the simple first order exponential smoothing method cannot accommodate time series with trend or seasonality. Higher order smoothing models are required.   

The **double exponential smoothing** or **Holt-Winters double exponential smoothing** algorithm is a second order smoothing method. Using two coupled difference equations a trend and non-seasonal component of the time series can be modeled. The model updates a smoothed measure of the non-seasonal component and the trend.   

The model is initialized with the values:   
$$
s_1 = x_1 \\
b_1 = x_2 - x_1
$$

At each time step the a pair of time difference equations are updated. The following relationships update the smoothed non-seasonal component, $s_t$, and the slope, $b_t$:   

$$
s_t = \alpha x_t + (1-\alpha) (s_{t-1} + b_{t-1}) \\
b_t = \beta(s_t - s_{t-1}) + (1 - \beta)b_{t-1}
$$

The smoothed non-seasonal component and smoothed slope can be used to compute a forecast. The relationship below computes the forecast $m$ time steps ahead.      

$$ F_{t+m} = s_t + m b_t $$   

What about seasonal components? A third-order difference relationship can updated a smoothed seasonal component, along with the smoothed non-seasonal and slope components. The details of this process are not discussed further here. The details are available elsewhere, including the [exponential smoothing Wikipedia page](https://en.wikipedia.org/wiki/Exponential_smoothing#:~:text=Exponential%20smoothing%20is%20a%20rule,exponentially%20decreasing%20weights%20over%20time.).  

### Example of Exponential Decay Filtering     

> **Exercise 02-6:** You will now create and test a single or simple exponentially weighted decay filter. This function will have a stride argument just as the window filter function. Your function, `exponential_smooth`, will have arguments of the time series, the exponential smoothing parameter and a stride. In this case the stride $=48$ and $\tau=24.0$. Your function will do the following:    
> 1. Initialize a list for the filtered samples with the first value of the input series. The samples list will contain the exponentially weighted smoothed samples. Make sure you save the first value in the list.   
>    a. Initialize empty lists for the output values and time index of the output values.   
>    b, Iterates over all the values of the time series starting with the second one. The first observation is the intial value. In the loop update the exponentially weighted smoothed values using the first order smoothing algorithm.
>    c. If the loop index modulo the stride $= 0$ then, do the following:     
>           - Append the smoothed sample value to the output list.       
>           - Append the time index of that sample to the index list.    
>    d. Create an output Pandas Series from the output list using the output index list as the index of the series.   
>    e. Return the Pandas Series.   
> 8. Compute and print the value of $\alpha$ given $\tau = 24.0$.   
> 9. Execute your function for site number $12484500$ and arguments of the computed $\alpha$ and stride $=48$, or 12 hours.  
> 10. Print the length of the resulting series.     
> 11. Print the value of $\alpha$, given the time step and value of $\tau$.    
> 13. Plot the smoothed series using the `plot_time_series` function.    

In [None]:
time_step = 15/60  # Step time in hours 
stride = 48 # Stride in samples
tau = 24.0 # Decay time in hours 
def compute_tau(alpha, time_step): 
    '''Compute the value of tau given the time step and alpha'''
    return -time_step / math.log(1 - alpha)
def compute_alpha(tau, time_step):  
    '''Compute the value of alpha given the time step and tau'''
    return 1 - math.exp(-time_step/tau)

def exponential_smooth(ts, alpha=0.01, stride=8):
    ## Put your code below  
    st = ts.iloc[0]
    idx = []
    out = []
    difference = []
    ## Put your code below
    

    
    out = pd.Series(out, index=idx)     
    return out      

alpha = compute_alpha(tau, time_step)
smoothed_24 = exponential_smooth(query_stream(stream_flow, Columns='Stream_flow', site_numbers=[12484500]), alpha=alpha, stride=stride)

print('The resulting series length = ' + str(len(smoothed_24)))
print('alpha = ' + str(round(alpha, 4)))
_=plot_time_series(smoothed_24, title='Flow on Yakima at Cle Elm, EWMA filter, alpha=' + str(round(alpha, 3)) + ', tau=' + str(tau))

> Provide short answers to the following questions:   
> 1. Are the number of smoothed samples correct for the stride of the exponential decay filter selected?     
> 2. How does the exponentially smoothed series compare to the original series in terms of frequency components?      
> 3. Given the value of $\tau$ (in hours) what does this tell you about the decay length and smoothing of the filter?     
> **End of exercise.** 

> **Answers:**    
> 1.           
> 2.      
> 3.           

> **Exercise 02-7:** A question we should ask is what happens if you increase the smoothing constant of the exponential decay filter? In other words, what is the effect of giving greater weight to the most recent values? To find out do the following:  
> 1. Compute a value of alpha given $\tau = 1.0$, one hour.     
> 2. Execute the `exponential_smooth` function with arguments of the computed value of $\alpha$ and `stride=4`.
> 3. Print the length of the resulting Pandas Series.
> 4. Print the value of $\alpha$ computed.
> 5. Plot the smoothed time series using the `plot_time_series` function.   

In [None]:
## Put your code below





> Compare the plot of this time series to the previous series less smoothing and provide short answers to the following questions.   
> 1. How do the lengths of the smoothed series compare and is this difference consistent with the stride lengths?    
> 2. What is the main difference you can see between these smoothed series in terms of frequency components and why is this expected given the decay times?     
> **End of exercise.**

> **Answers:**   
> 1.        
> 2.             

> **Exercise 02-8:** So far, we have picked a stride that is about 1/2 the decay time, which is a typical choice. The next question to ask is what is the effect of using a longer stride? A longer stride reduces the number of smoothed samples used for further processing. To find out, repeat the calculations and plotting of exercise 2-07, but with stride $=96$ and $\tau = 24.0$. 

In [None]:
tau = 24.0   
stride = 96

## Put your code below



print('Thw length of the resulting series = '  + str(len(smoothed_24_96)))
print('alpha = ' + str(round(alpha, 4)))
_=plot_time_series(smoothed_24_96, title='Flow on Yakima at Cle Elm, EWMA filter, alpha=0.01')

> Compare the result you just created with that from Exercise 2-7 and answer the following questions:
> 1. Is the filter series significantly different, or nearly the same as the series with shorter stride?
> 2. Why do you think foregoing outcome is expected?        
> **End of exercise.**

> **Answers:**           
> 1.         
> 2.              

## Filtering Discrete Events  

In many cases our goal is to filter discrete events based on some type of identifier. The identifier is a hash of nearly any hashable data type. Many examples of these data types include, string customers identifiers, numeric event identifiers, IP addresses, email addresses.  

One example of such a method is the [**Bloom filter**](https://en.wikipedia.org/wiki/Bloom_filter). A Bloom filter is quite efficient in terms of computing and memory.  However, the Bloom filter does not allow for deleting matches once they are added to the hash table. An alternative is a **[quotient filter](https://en.wikipedia.org/wiki/Quotient_filter)**. The quotient filter keeps a count of events for each hash. As a result, an event identifier can be removed from the table by decrementing the counts for the hashes. To perform this extra operation the quotient filter uses more memory and is a bit less computationally efficient.   

Both Bloom filters and quotient filters operate by the same principle. A hash table of key-values pairs is created. The keys are the hashes of the event identifiers. For a Bloom filter, the value is binary, the event is in the table or it is not. The quotient filter on the other hand, maintains a count of events in each bucket. The count can be incremented as new instances are encountered, or decremented when deleting events.     

In the following exercises, you will construct a Bloom filter using a bit array and the Python [mmh3 package](https://github.com/hajimes/mmh3?tab=readme-ov-file). The mmh3 package implements the widely used [MurmurHash (MurmurHash3)](https://en.wikipedia.org/wiki/MurmurHash) non-cryptographic hash algorithm.                      

### Instantiate the identifier lists      

To start the example, we generate lists of random customer identifiers (IDs) for customers and non-customers by the following steps.     
1. Initialize the arguments for the generate of the customer ID lists.  
2. Randomly generated lists of customer and non-customer ids are created. A multiplier is used to ensure that some IDs values are outside the range of the length of the hash table. The customer and non-customer identifiers are all integers.   
3. The non-customer list is filtered to ensure there are no identifiers common with the customer list.     
4. The lengths of the lists and a sample of the customer key values are printed.  

Execute the code in the cell below.

In [None]:
number_of_hashes = 1024
ID_multiplier = 39
number_customers = 100 
number_not_customers = 200

nr.seed(4565)
customers = [int(ID_multiplier * number_of_hashes * i) for i in nr.uniform(size=number_customers)]
not_customers = [int(ID_multiplier * number_of_hashes * i) for i in nr.uniform(size=number_not_customers)]

## Ensure there are no common ids between customers and non-customers
del_list = []
for i in range(len(not_customers)): 
    if not_customers[i] in customers: del_list.append(i)
for i in sorted(del_list, reverse=True):
    del not_customers[i]

print('Number of non-customers = ' + str(len(np.unique(not_customers))))
print('Number of customers = ' + str(len(np.unique(customers))))
print(customers[:20])

### Using the Murmur Hash

You will use the 32 bit Murmur hash function implemented in the Python mmh3 package to create a Bloom filter. First we will investigate some basic properties of the Murmur hash. Specifically, we will investigate how uniform the hash values computed are by the following steps:   
1. Created a hash table of length 1024. The table is filled with hashes computed from random integers. Notice that the `mmh3.hash` function only accepts a string argument, requiring the integer to be wrapped in the `str()` function.
2. A histogram and cumulative density plot of the hash values are displayed.
3. The number of hash collisions is computed and displayed.    

In [None]:
filter_length = 1024
number_samples = 128 
multiplier = 1019

## Compute random numbers and find the corresponding hash values   
samples = [int(multiplier * filter_length * i) for i in nr.uniform(size=number_samples)]
hash_list = [mmh3.hash(str(i)) % filter_length for i in samples]

## Histogram of the hash values
fig, ax = plt.subplots(figsize=(20, 6))
counts, bins, _ = ax.hist(hash_list, bins=filter_length);
ax.set_ylabel('Count of keys');
ax.set_xlabel('Hash value');
ax.set_title('Count of keys vs. hash values');
ax.set_xlim(0.0, filter_length)
plt.show();

## Cumulative density plot of the hash values
fig, ax = plt.subplots(figsize=(6,6))
ax.plot(range(len(counts)), np.cumsum(counts))
ax.plot([0.0,len(counts)], [0.0,len(hash_list)], color='red');
ax.set_ylabel('Cumulative Sum');
ax.set_xlabel('Hash index');
ax.set_title('Cumulative sum of hash values');
ax.set_xlim(0.0, filter_length)
plt.show();  

## Compute and display the number of hash collisions
print('Number of collisions = ' + str(int(sum([count - 1 if count>1 else 0 for count in counts]))))

> **Exercise 02-9:** Compare the plots shown above to the plots of the hash function using a prime number key created for Exercise 01-1. Is the Murmur hash function more or less optimal compared to the function tested in Exercise 1-1 and why?    

> **Answer:**             

### Create Bloom filter and insert function    

You will now instatiate a Bloom filter and create an insert function. As a first step, execute the code in the cell below to instantiate the Bloom filter as an empty bit array.     

In [None]:
filter_length = 1024
bloom_filter = bitarray([0]*filter_length)
bloom_filter

> **Exercise 2-10:** You will now complete and test a function to insert an event identifier into the Bloom filter array. To complete the code in the cell below do the following: 
> 1. Iterate over three hash seeds to create three independent hashes for each key, `x`. The seed is the second argument for the [mmh3.hash function](https://mmh3.readthedocs.io/en/latest/api.html) and is just an integer. Any three integers will work for this case.   
> 2. For each seed compute the hash value and take the modulo with the length of the Bloom filter.
> 3. Set the bit of the Bloom filter bit array indexed by the hash value to 1.
> 4. Return the Bloom filter array.        
> Execute your code to add the customers to the hash table. 

In [None]:
n_hashes = 3
def bloom_insert(x, bloom_filter, n_hashes=3):
    ## Put your code below
    

    
    return bloom_filter    

for customer in customers:  
    bloom_filter = bloom_insert(str(customer), bloom_filter, n_hashes=n_hashes)
bloom_filter    

> Notice that most bits in the Bloom filter array are still 0, and only a small fraction of the bits set to 1.
>
> Next, you will complete the `bloom_query` function in the cell below. Recall that a query returns True only if all bits indexed by the hash values are 1. To complete the function, do the following:     
> 1. Set a counter variable to 0.     
> 2. Iterate over the number of hashes.     
> 3. Compute the hash for the input value, `x`, and integer hash key.    
> 4. If the bit value indexed for by the key equals 1, add 1 to the counter value.      
> 5. If the counter value equals the number of hashes, return True, and if not, return False.     
> Now, execute the code in the cell below to test your function and find the number of false positives.    

In [None]:
def bloom_query(x, bloom_filter, n_hashes=3):   
    ## Put your code below  
    
    

false_positives = 0 
for customer in not_customers:  
    if bloom_query(str(customer), bloom_filter, n_hashes=n_hashes): false_positives += 1
print('Total number of false positives = ' + str(false_positives))        

> For a perfect (uniformly distributed) hash function the theoretical false positive rate can be computed by the following relationship:  > $$P(false\ positive) = \Big[1 - exp \big(\frac{- k n}{m} \big) \Big]^k$$    
> Where, 
> - $k = $ number of hash functions.    
> - $m = $ length of the hash table.   
> - $n = $ number of identifiers in the hash table.
>  
> Now answer the following questions:     
> 1. Given this result, what is the empirical false positive rate?     
> 2. How does the observed false positive rate compare to the theoretical rate? You should create a function to compute the theoretical false positive rate, which you can use for the next exercise. Name your function `false_positive_rate` with arguments, number of events, length of the bit array, and number of hashes.      

> **Answers:** 
> 1.        
> 2.         

In [None]:
def false_positive_rate(n, m, k):
    return (1 - math.exp(-k * n / m))**k 
false_positive_rate(number_customers, filter_length, n_hashes)

> Answer 2 Continued; 

> Next, execute the code in the cell below that uses your `false_positive_rate` function to plot log false positive rates for a number of filter configurations.

In [None]:
m_list = [2**12, 2**16, 2**24, 2**32]
k_list = [2**2, 2**4, 2**6, 2**8, 2**10]
n = 10e+7

fig, ax = plt.subplots(figsize=(8,8))
for k in k_list:    
    fp_list = [false_positive_rate(n, m, k) for m in m_list]
    ax.plot(m_list, fp_list, label=str(k) + ' hashes');
#    ax.plot(k_list, fp_list, label="{:.1E}".format(Decimal(m)) + ' bit array');
ax.set_yscale('log');
ax.set_ylabel('Log false positive rate');
ax.set_xlabel('Length of bit array');
ax.set_title('False postive rate vs. length of bit array for 10 million events');
ax.legend();

> Finally, answer these questions:    
> 3. What is the relationship between length of the bit array and false error rate, and why do you think this makes sense?     
> 4. How does increasing the number of hashes affect the false positive rates, and why do you think this makes sense?       
> 5. Describe how you can merge two Bloom filters.       
> **End of exercise.**

> **Answers:**     
> 3.          
> 4.        
> 5.             

## Applying the Quotient Filter

The quotient filter is an improvement on a Bloom filter, and uses a more compact hashing method. Like Bloom filters, quotient filters are not particularly difficult to implement. There are also several Python packages that support Bloom and quotient filters. For the following exercise you will use the [PyProbabilies](https://pyprobables.readthedocs.io/en/latest/index.html) package.     

> **Exercise 02-11:** In this exercise you will work with a quotient filter. To instantiate and add hashes for customer IDs do the following:   
> 1. Instantiate the quotient filter using the [QuotientFilter](https://pyprobables.readthedocs.io/en/latest/code.html#probables.QuotientFilter) function using default argument values. Name your filter object `quotient_filter`.      
> 2. To get a feel for the quotient filter created print the following attributes of the filter object, `quotient`, `remainder` and `num_elements`.     
> 3. Iterate over the customer list and add the customer identifier to the quotient filter using the `add` method. Note that the key argument must be of a string type.
> 4. Print the `elements_added` attribute of the quotient filter.
> Execute your code.   

In [None]:
## Put your code below.   




> Next, you will query the quotient filter with the identifiers in the not_customer list, by executing the code in the cell below.  

In [None]:
false_positives = 0 
for customer in not_customers:  
    if quotient_filter.check(str(customer)): false_positives += 1
print('Total number of false positives = ' + str(false_positives))        

> Compare the false positive rates of the Bloom filter example of Exercise 2-9 and the quotient filter. Can you see that one filter has an advantage for this case and why?  

> **Answer:**              

## Counting Events with 

Counting discrete events is a fundamental algorithm for stream processing. Naive algorithms require large amounts of memory to maintain a table of counts for each event identifier. Instead, we use sketch algorithms, in particular count-min-sketch.

[Zipf's law](https://en.wikipedia.org/wiki/Zipf%27s_law) is generally considered a model for activity of online user activity, sport event attendance, and other human activities. It is also thought that the frequency of occurrence of words in strings. One possible model for this behavior is the [Zipf-Mandelbrot distribution](https://en.wikipedia.org/wiki/Zipf%E2%80%93Mandelbrot_law), with the following **probability mass function**:   

$$f(k,N,\alpha,\beta)=\frac{1/(k + \beta)^\alpha}{H_{N,q,s}}$$
where:   
$k=$ rank in set of size $N$.       
$\alpha$ abd $\beta$ are parameters.     
$H_{N,q,s}=\sum^N_{k=1} 1/(k + \beta)^\alpha$

Execute the code in the cell below to sample from the Zipf-Mandelbrot distribution and display a density plot.  

In [None]:
alpha = 1.0
beta = 2.7
zipf = [1/(i + beta)**alpha for i in range(len(customers))]
zipf = np.divide(zipf, np.sum(zipf))

fig, ax = plt.subplots(figsize=(6,5))
ax.plot(range(len(customers)), zipf)
ax.set_ylabel('Probability density')
ax.set_xlabel('Rank')
ax.set_title('Density of Zipf distribution')

Notice the rapid decay of this distribution with event identifier. This rapid decay of event frequency is often observed in many real-world phenomenon.    

Next, execute the code in the cell below to generate a data set of identifiers drawn from the Zipf-Mandelbrot distribution of events just computed.  

In [None]:
n_samples = int(10e+5)
stream = np.random.choice(customers, size=n_samples, p=zipf)
stream[:200]

Notice the random nature of the identifiers in the first 200 events in the stream.

### The Count-Min-Sketch Algorithm 

The probabilistic count min sketch algorithm uses multiple hash functions in an efficient data structure to estimate 



> **Exercise 02-12:** You will now use the [probables.HeavyHitters](https://pyprobables.readthedocs.io/en/latest/code.html) function to find the 20 event identifiers with the highest event count. Do the following:
> 1. Compute the required width of the data structure to achieve an approximate 1% error rate using the following relationship:     
>    $$width = 2^{log_2(e/\epsilon)}$$       
>    To perform this calculation use the Python `math.log2` function and the `math.ceil` function to round up the next highest integer. Display the computed result
> 2. Compute the required depth to ensure a less than 1% probability, $\delta$, of breaching the error bound using the following relationship:
>    $$depth = ln(1/\delta)$$
>    Use the `math.ceil` function to round up to the next highest integer. Display the computed result.    
>
> Execute the code that creates a list of count-min-sketches using the [probables.CountMinSketch](https://pyprobables.readthedocs.io/en/latest/code.html#countminsketch) by these steps.
> - A function is instantiated to create a count-min-sketch from a 'stream' of event identifiers with the specified width and depth. The `add` method is used to add each single instances of each identifier in the stream to the sketch.
> - A list of count-min-sketches is created for 5 segments of the overall stream.
> - The first 20 identifiers and counts for the first of the sketches in the list is printed.       

In [None]:
error = 0.01
delta = 0.01
## Put your code below  



print(f"Sketch has width: {width}  and depth:{depth}")

def build_CountMinSketch(stream, width, depth): 
    cms = CountMinSketch(width=width, depth=depth)
    for customer in stream: 
        cms.add(str(customer), 1)
    return cms

sketch_list = []
stream_length = 20000
for end in range(20000, 120000, stream_length): 
    start = end - stream_length
    sketch_list.append(build_CountMinSketch(stream[start:end], width, depth))

for customer in list(stream)[:20]: 
    print(f'count for customer {customer}: {sketch_list[0].check(str(customer))}')

> Notice the variation in the number of events for these identifiers.  
>
> Merging is an essential operation for sketches. The PyProbables CountMinSketch uses the `join` method on pairs of sketches. In this case, merging the 5 sub-stream sketches creates a sketch for the entire stream length. The results for the first 50 identifiers is then printed.
>
> Execute this code.   

In [None]:
long_sketch = sketch_list[0]
for i in range(1,len(sketch_list)):
    long_sketch.join(sketch_list[i])

for customer in list(stream)[:20]: 
    print(f'count for customer {customer}: {long_sketch.check(str(customer))}')

> Examine the foregoing results and answer these questions.
> 1. Is the merged sketch the same size as the sub-stream sketches? Explain your reasoning.        
> 2. Compare the counts for the identifiers shown for the single sub-stream and the entire stream makes sense and how can you explain these differences?
> 3. Compute the required memory for the count-min-hash data structure, assuming 4-byte (32 bit) counters.      
> 4. How does the memory required for the count-min-hash data structure compare to using a simple linear (or flat) table for the $10^5$ event identifiers, assuming 4-byte (32 bit) counters? Keeping in mind that $10^5$ event identifiers, what does your comparison mean for the memory efficiency of the count-min-sketch algorithm?
> 5. If smaller values for $\epsilon$ and $\delta$ are selected, how is the size of the sketch changed and why?   

> **Answers:**
> 1.          
> 2.           
> 3.           
> 4.         
> 5.           

### The Heavy Hitters Algorithms

A variation on the count-min-sketch algorithm is used to find the **heavy hitters**, or the event ids that occur most frequently. This approach allows one to find the counts of a pre-determined number of common events, using the small memory footprint of the count-min-sketch algorithm, but at a cost of some additional computation for sorting and bookkeeping.  

> **Exercise 2-13:** You will now apply the heavy hitters algorithm to the event identifier stream. The code in the cell below does the following.  
> 1. Instantiates a HeavyHitters object with the arguments computed above and `num_hitters = 20`.    
> 2. Iterates over the events in the stream applying the `add` method for each event, with the num_els argument set to 1.      
> 3. Save the `heavy_hitters` attribute to a variable. The data structure for this attribute is a dictionary.
>
> Execute this code   

In [None]:
heavy_hitters = HeavyHitters(num_hitters=20, width=width, depth=depth)
for event in stream:   
    heavy_hitters.add(str(event), num_els=1)

heavy_hitters = heavy_hitters.heavy_hitters

> Now, execute the provided function, using the `heavy_hitters` sketch created above.   

In [None]:
def display_dist(hitters):
    heavy_hitters = np.flip(np.sort(list(hitters.values()))) 
    print('The ordered list of heavy hitters:')
    print(heavy_hitters)  

    fig, ax = plt.subplots(figsize=(5,4))
    ax.plot([x + 1 for x in range(len(heavy_hitters))], heavy_hitters)
    ax.set_ylabel('Count')
    ax.set_xlabel('Ordered event')
    ax.set_title('Ordered event frequencies')

display_dist(heavy_hitters)

> Does the distribution of event counts for the top 20 identifiers appear to follow the expected distribution?      

> **Answer:**               

## HyperLogLog Algorithm   

The HyperLogLog algorithm and its derivatives are fast and efficient methods for determining the cardinality of events in streams. The HyperLogLog uses the harmonic mean of a set of cardinality estimates. Each estimate is based on a hash-based sketch of cardinality.      

> **Exercise 02-14:** You will now work with the HyperLogLog algorithm using the Python [Datasketch package](https://ekzhu.com/datasketch/index.html). Do the following:      
> 1. Start with the typical value of $p=14$ (confusingly precision by the creators of the DataSketch package), resulting in the number of registers, $m = 2^{14}$.     
> 2. Compute and display the expected error rate, $\epsilon$, given the value of $p$.      
> 3. Instantiate a HyperLogLog object using the value of $p$ previously specified as the argument.      
> 4. Assuming that 1-byte registers are used for accumulating the counts, compute and display the size of the HyperLogLog data structure. You can find the size of this data structure using the Python `len` function.           

In [None]:
## Put your code below





> The HyperLogLog sketch will now be applied to 10 segments of the event stream by the following steps:     
> 1. A function is instantiated to create a HLL sketch from a stream of event identifiers.
> 2. The full stream is iterated over in 10. segments. A HLL sketch is appended to the list for each segment.
> 3. The estimated cardinality is printed.
>   
> Execute this code.   

In [None]:
def HLL_sketch(stream, p): 
    sketch = HyperLogLog(p) 
    for event in stream: 
        sketch.update(event)   
    return sketch

sketch_list = []
stream_length = 10000
for end in range(10000, 110000, stream_length): 
    start = end - stream_length
    sketch_list.append(HLL_sketch(stream[start:end], p))

print(f"The cardinality of the first stream segment: {round(sketch_list[0].count(), 0)}")

> The ability to merge HypterLogLog sketches is an important attribute for stream processing at a massive scale. HLL sketches of multiple streams can be computed in parallel and then merged. Alternatively, HLL sketches can be computed for short segments of a stream and then merged to form sketches of longer segments.
>
> The code in the cell below merges the HLL sketches for the 10 segments of the stream and then prints the results. Execute this code.     

In [None]:
long_HLL_sketch = sketch_list[0]
for i in range(1,len(sketch_list)):
    long_HLL_sketch = HyperLogLog.union(long_HLL_sketch, sketch_list[i])

print(f"The cardinality of the full stream: {round(long_HLL_sketch.count(), 0)}")

> Now, answer these questions:     
> 1. Based on the estimated error and the estimated cardinality value computed, is the error within the expected range for both the stream segment and the full stream?   
> 2. How much memory would be required if you needed to find the cardinality of $2^{14}=16384$ streams in a large scale application?
> 3. Explain why the merged sketch must be the same size as the sketches for the segments.      

> **Answers:**
> 1.             
> 2.    
> 3.         

## Quantile Estimation 

The empirical estimation of quantiles of unknown distributions is a fundamental problem in data mining. Quantiles estimates have many uses in data mining. One example is a median estimate for some time period of a stream. As another example, one can use extreme quantiles of time intervals of a stream to monitor for anomalies. There are many other applications. As yet another example, quantile estimation is used to evaluate performance of networks and server clusters as well as for cybersecurity monitoring.        

For the examples in this notebook we will apply two of the many possible algorithms quantile estimation sketch algorithms; the widely used [t-digest algorithm of Dunning and Ertl, 2019](https://arxiv.org/abs/1902.04023), and the [Relative Error Streaming Quantile algorithm of Cormode, et. al., 2019](https://arxiv.org/abs/2004.01668).    

These quantile estimation sketches are **mergeable**. As a result, one can construct a hierarchy of time intervals from shorter to longer.   

For the exercises in this notebook you will apply both of these algorithms to the one of the stream flow time series. You will apply this the quantile estimation sketches to 24 hour periods. These 24 hours sketches will be merged into 20 day intervals.    

### Quantile estimate with t-digest   

The [**t-digest sketch algorithm**}(https://arxiv.org/abs/1902.04023) is based on an heuristic model. Consequently, there are no know analytical error bounds on the performance of the t-digest algorithm. Empirical tests show that the t-digest algorithm reliably produces accurate results, particularly at extreme quantiles.          

In summary, the algorithm has two major components.   
1. A sampling scheme using **scale functions** to construct the sketch. The scale function favor samples at extreme values, thereby reducing the error of estimating extreme quantiles at the expense of accuracy of the mid-range quantiles. The scale function has one parameter, $\delta$, which controls the number of centroids. Larger $\delta$ leads to a larger sketch with more accurate quantile estimates.    
2. A complex interpolation scheme is used between the centroids to improve accuracy of the quantile estimates.  

> **Exercise 02-15:** You will now use a t-digest to find quantiles of a the stream flow time series using the [Apache DataSketches t-digest sketch](https://apache.github.io/datasketches-python/5.2.0/quantiles/tdigest.html). Now do the following:
> 1. Instantiate a `tdigest_float` object with argument `k=100`. Make sure you return an int type from your function. 
> 2. Iterate over the values in `stream_ts` applying the `update` method for each value
> 3. Print the quantile values for $q=0.001$ and $q=0.999$ using the `get_quantile` method.
>
> Execute your code      

In [None]:
stream_ts = query_stream(stream_flow, Columns='Stream_flow', site_numbers=[12484500])

## Put your code below 
## instantiate the t-digest 





> Notice the rage of values of these quantiles.
>  
> The code in the cell below does the following:      
> 1. The `t_build_digest` function adds the values to the sketch.     
> 2. The `merge_sketches` function iterates over a list of sketches and merges them. The function also returns an empty list to accumulate the next sketch list.     
> 3. The main function, `t_digests_example` does the following:       
>    a. Instantiates a generator for the data to efficiently iterate through windowed samples of the data.    
>    b. Iterate over the time windows and create a digest using the t_digest_build function and quantile estimate for each.     
>    c. If the number of windows = n_periods, then merge the digests and compute a quantile estimate for the longer period.     
> 4. The results are plotted for the quantiles specified.  
>
> Execute this code   

In [None]:
def plot_quantiles(q_time_series, long_q_time_series, q, period, ax):
    ax.plot(range(len(q_time_series)), q_time_series, label = 'weekly q = ' + str(q));
    x_vals = [x - period/2 for x in range(0, len(q_time_series), period)[1:]]
    ax.scatter(x_vals, long_q_time_series, label = 'monthly q = ' + str(q));

def plot_mulitple_quantiles_t(ts, q_list, k, period, num_samples):  
    """
    Function to plot quantiles for daily and 20 day periods for the time series, ts, from a list of quantile values, q_list
    """
    _, ax = plt.subplots(figsize=(8,4))
    for q in q_list:   
        q_time_series, long_q_time_series = t_digests_example(stream_ts, q, k, period, num_samples)
        plot_quantiles(q_time_series, long_q_time_series, q, period, ax)
    plt.legend()
    ax.set_xlabel('Days')
    ax.set_ylabel('Quantile')
    ax.set_title(f'Quatile vs. sample number')


def sketch_build(sketch, ts):
    for val in ts: sketch.update(val) 
    return sketch

def merge_sketches(sketch_list):
    long_sketch = sketch_list[0]
    for sketch in sketch_list[1:]:
        long_sketch.merge(sketch)
    sketch_list = []
    return long_sketch, sketch_list

def t_digests_example(window_samples, q, k, n_days, num_samples):
    # Instantiate the generator 
    window_samples = window_sample(stream_ts, 
                  length=num_samples, 
                  stride=num_samples)

    # Initialize the lists  
    digests = []
    q_time_series = []
    long_q_time_series = []
    
    # Build the daily digests and estimate the quantile
    day_count = 1  
    for window in window_samples: 
        t_digest = tdigest_float(k=k)
        temp_digest = sketch_build(t_digest, window)
        digests.append(temp_digest)
        q_time_series.append(temp_digest.get_quantile(q))
        # If we have enough time steps merge the digests and clear the digests list
        if day_count % n_periods == 0: 
            long_digest, digests = merge_sketches(digests)
            long_q_time_series.append(long_digest.get_quantile(q))
        day_count += 1
    return q_time_series, long_q_time_series

# Plot the results for multiple quantiles 
num_samples = 7*4*24
q=0.001
day_count = 1
n_periods = 4
q_list = [0.001, 0.999]
plot_mulitple_quantiles_t(stream_ts, q_list, k, n_periods, num_samples)

> Examine the results and answer the following questions.
> 1. As a result of the weekly window having only $7 \times 24 \times 4= 682$ samples, a small value of the hyperparamter K has been used. In one or two sentences describe the effect of this choice on the accuracy of the quantile estimates.
> 2. Careful examination of the difference between $q=0.001$ and $q=0.999$ daily quantiles displayed on the graph shows that the difference between these quantiles change over time. In one or a few sentences describe the relationship between these quantiles and the behavior of the stream flow.
> 3. Notice the monthly $q=0.001$ and $q=0.999$ quantiles. In one or a few sentences describe the relationship between the monthly and weekly quantiles and are this relationship expected and why.            

> **Answers:**
> 1.  
> 2.         
> 3.                  

### Quantile Estimation with REQ Sketch

The [Relative Error Streaming Quantiles (REQ)](https://arxiv.org/pdf/2004.01668) sketch provides a highly efficient algorithm with bounded error. The REQ algorithm is constructed from a stack of **compactors** with increasing buffer capacity. The buffers of length $B$ are sampled as follows:   
1. The smallest (or largest) $b/2$ values in the buffer are retained. This approach retains the most important samples for determining extreme quantiles are retained.
2. The largest (or smallest) $b/2$ values are divided into $k$ buckets of ascending (or descending) order. The buckets are then sampled from a deterministic [**Geometric Distribution**](https://en.wikipedia.org/wiki/Geometric_distribution). This approach again ensures accuracy for extreme quantiles while limiting the size of the sketch.

The **relative error**, $\epsilon$, of the quantile estimate, $\hat{R}(q)$, is measured with respect to the true quantile value, $R(q)$ is written:   

$$|\hat{R}(q) - R(q)| \le \epsilon R(q)$$

The parameter $k$ is computed for an given relative error, $\epsilon$, and a probability of exceeding the error bound, $\delta$, by the following relationship:   

$$k = 2 \Bigg[\frac{4}{\epsilon} \sqrt{\frac{log(1/\delta)}{log_2(\epsilon n)}} \Bigg]$$

The required buffer size is then:  

$$B = 2k\ log_2(n/k)$$

> **Exercise 02-16:** You will now use a REQ sketch to find quantiles of a the stream flow time series using the [Apache DataSketches Relative Error Quantile sketch](https://apache.github.io/datasketches-python/5.2.0/quantiles/req.html#). Now do the following:
> 1. Complete the code in the function to compute the parameter k. 
> 2. Instantiate a `req_sketch` object with argument `k=100`. 
> 3. Iterate over the values in `stream_ts` applying the `update` method for each value
> 4. Print the quantile values for $q=0.001$ and $q=0.999$ using the `get_quantile` method.
>
> Execute your code    

In [None]:
# Parameters to compute k
eps = 0.05
delta = 0.01    
n = 180*4*24
 
def compute_K(eps, delta, n):
    ## Put your code below
    
    

k = compute_K(eps, delta, n)
print(f"Value of k: {k}")

stream_ts = query_stream(stream_flow, Columns='Stream_flow', site_numbers=[12484500])

req_sketch = req_floats_sketch(k=k)
for val in stream_ts: 
    req_sketch.update(val)

print(f"Lower quantile: {req_sketch.get_quantile(0.001)}")
print(f"Upper quantile: {req_sketch.get_quantile(0.999)}")

> The code in the cell below executes and displays the results for weekly and monthly quantile estimates using the REQ algorithm. The organization of the code is nearly identical to the code used for the t-digest quantile analysis. Execute this code.     

In [None]:
def plot_mulitple_quantiles_req(ts, q_list, k, period, num_samples):  
    """
    Function to plot quantiles for daily and 20 day periods for the time series, ts, from a list of quantile values, q_list
   """
    _, ax = plt.subplots(figsize=(8,4))
    for q in q_list:   
        q_time_series, long_q_time_series = req_sketch_example(ts, q, k, period, num_samples)
        plot_quantiles(q_time_series, long_q_time_series, q, period, ax)
    plt.legend()
    ax.set_xlabel('Days')
    ax.set_ylabel('Quantile')
    ax.set_title(f'Quatile vs. sample number')    

        
def req_sketch_example(window_samples, q, k, n_periods, num_samples):
    window_samples = window_sample(stream_ts, 
                  length=num_samples, 
                  stride=num_samples)
    
    sketches = []
    q_time_series = []
    long_q_time_series = []
    
    day_count = 1
    for window in window_samples: 
        kll_sketch = req_floats_sketch(k)
        temp_sketch = sketch_build(kll_sketch, window)
        sketches.append(temp_sketch)
        q_time_series.append(kll_sketch.get_quantile(q))
        if day_count % n_periods == 0: 
            long_sketch, sketches = merge_sketches(sketches)
            long_q_time_series.append(long_sketch.get_quantile(q))
        day_count += 1
    return q_time_series, long_q_time_series

# Parameters to compute k
eps = 0.05
delta = 0.01    
num_samples = 7*4*24
 
k = compute_K(eps, delta, num_samples)
print(f"Value of k: {k}")

day_count = 1
n_periods = 4
q_list = [0.001, 0.999]
plot_mulitple_quantiles_req(stream_ts, q_list, k, n_periods, num_samples)

> The results of the two quantile estimation algorithms are quite similar, which is not terribly surprising since both algorithms use probabilistic sketches optimized to limit relative error. Answer the following questions:
> 1. The 4 sketches for both algorithms were merged in a single step. However, one may want to have quantile estimates for several periods. Explain how this can be done and give an example.   
> 2. Given the small number of samples in 7 days and given the available [empirical comparison](https://datasketches.apache.org/docs/tdigest/tdigest.html) between the two algorithms is any significant difference in relative error or bias expected and why?
> 3. Compare the two values of $k$ computed for the two examples using the REQ algorithm. Explain why they are different.    
> 4. How will k change if you reduce the value of $\epsilon$.    
> 5. For a much larger sample sizes, explain how the memory use and relative accuracy of the t-digest and REQ algorithm compare?          

> **Answers:**
> 1.        
> 2.       
> 3.      
> 4.        
> 5.           

#### Copyright, 2021, 2022, 2023, 2024, 2025, Stephen F Elston. All rights reserved. 