<img src="data/images/lecture-notebook-header.png" />

# Data Stream Mining: Sampling

Sampling over large data streams refers to the process of selecting a representative subset of data from an ongoing or continuous stream of data. It is a technique used to address the challenges posed by the high velocity and volume of data in streaming environments.

In the context of data streams, traditional sampling techniques may not be directly applicable due to the continuous nature of the data and the potentially infinite size of the stream. Sampling over large data streams typically involves selecting a fraction of the incoming data points based on certain sampling strategies to approximate the characteristics of the entire stream.

The main goals of sampling over large data streams are:

* **Efficiency:** Sampling reduces the computational and storage requirements by processing only a subset of the data instead of the entire stream. This is important in scenarios where processing the entire stream in real-time is infeasible due to resource constraints.

* **Scalability:** By sampling a representative subset of the data, algorithms and models can be developed and updated on smaller data volumes, enabling scalability and real-time analysis of the stream.

* **Generalization:** The sampled subset should accurately represent the underlying data distribution, allowing for generalizations and inferences to be made about the entire data stream.

Sampling strategies for large data streams typically employ techniques such as reservoir sampling, random sampling, or time-based sampling. Reservoir sampling maintains a fixed-size sample that is updated as new data arrives. Random sampling selects data points randomly based on a predetermined probability distribution. Time-based sampling selects data points at regular time intervals.
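Time-based sampling is not covered further in this notebook, so here is a minimal sketch of the idea. It assumes that stream items arrive as `(timestamp, value)` pairs in non-decreasing timestamp order; the function name `time_based_sample` and the toy stream are made up for illustration.

```python
def time_based_sample(stream, interval):
    """Keep an item only if at least `interval` time units have passed
    since the last kept item."""
    sample = []
    next_slot = None  # earliest timestamp at which the next item is accepted
    for ts, value in stream:
        if next_slot is None or ts >= next_slot:
            sample.append((ts, value))
            # Accept nothing else until `interval` time units have passed
            next_slot = ts + interval
    return sample


# Hypothetical stream: one item per second for 20 seconds
toy_stream = [(t, 'item-{}'.format(t)) for t in range(20)]
time_sample = time_based_sample(toy_stream, interval=5)

print(time_sample)
```

Note that the sample size of this strategy depends on the duration of the stream, not on the number of items it contains.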

## Setting up the Notebook

### Make all Required Imports

In [None]:
import numpy as np
import pandas as pd

import hashlib
import random

---

## Sampling with a Given Probability

The most basic way of sampling is to keep each item in a stream only with a given probability, say 10%, to reduce the data volume. However, we saw an example in the lecture where simply ignoring 90% of the stream items leads to wrong results, depending on the analysis we are interested in.

### Example Dataset

In the following, we use a publicly available [Apache access log provided by NASA](http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html). To keep things simple and the file size small, the file contains only the IP address and the timestamp of each request.

In [None]:
df_log = pd.read_csv('data/datasets/nasa-apache-logs/ip-ts-only-nasa-access.log')

df_log.head()

In [None]:
print('Number of log entries: {}'.format(df_log.shape[0]))
print('Number of unique IP addresses: {}'.format(len(set(df_log['ip'].to_list()))))

**Side note:** Here we use a pandas DataFrame mainly to display some example rows and to calculate basic statistics. Below, we read the file again line by line, since looping over all rows with `df.iterrows()` performs very poorly.

### Straightforward Case: Sampling by Position

If we were only interested in the number of requests per day, we could indeed just consider a random subset of all incoming data items. This is almost trivial to implement using any random number generator.

In [None]:
SAMPLE = []
probability = 0.1

with open('data/datasets/nasa-apache-logs/ip-ts-only-nasa-access.log') as f:
    for line in f:
        # Keep each log entry independently with the given probability
        if np.random.rand() < probability:
            SAMPLE.append(line)

print('Number of items in SAMPLE: {}'.format(len(SAMPLE)))

Unsurprisingly, the number of sampled items is roughly 10% of the original number of log entries.
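Because each item is kept with a known probability, counts computed on the sample can be scaled back up by dividing by that probability. The following is a minimal sketch of this idea on a synthetic stand-in for the log; the helper `bernoulli_sample` and the toy stream are assumptions for illustration, and the random generator is seeded only to make the output reproducible.

```python
import random


def bernoulli_sample(stream, probability, rng):
    """Yield each item of `stream` independently with the given probability."""
    for item in stream:
        if rng.random() < probability:
            yield item


rng = random.Random(42)        # seeded only to make the sketch reproducible
toy_stream = range(100_000)    # hypothetical stand-in for the log entries
p = 0.1

toy_sample = list(bernoulli_sample(toy_stream, p, rng))
# Scale the sample count back up to estimate the number of items in the stream
estimated_total = len(toy_sample) / p

print('Sample size: {}'.format(len(toy_sample)))
print('Estimated stream length: {:.0f}'.format(estimated_total))
```

The same scaling works for counts per day, as long as the quantity of interest is a simple count of items.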

### Sampling Based on Keys

For many types of analyses or questions we want to answer, a (completely) random subset of items will generally not yield correct results (here: correct = being a good estimate). For example, if we want to know how often the average user -- identified by the IP address -- is accessing the server per day, we have to sample by IP address.

#### Hash Function

In the lecture, we saw that a practical way to sample based on a key (i.e., an attribute of the data items) is to use a hash function that maps each key to one of `b` buckets numbered `0..(b-1)`, and then to accept only items whose key falls into a bucket with a number less than `a`. With this, we keep all items of roughly a fraction `a/b` of the keys.

The method below defines a hash function that maps any string to an integer from `0..(num_buckets-1)`. We ignore the details here, and there are probably better hash functions, but an in-depth discussion about what makes a hash function "good" is beyond our scope here.

In [None]:
def h(s, num_buckets):
    # Interpret the SHA-1 digest of the string as an integer and map it to a bucket
    return int(hashlib.sha1(s.encode("utf-8")).hexdigest(), 16) % num_buckets

We can test the hash function for a couple of random strings by hashing them into 10 buckets.

In [None]:
for s in ['apple', 'pear', 'banana', 'lemon', 'durian']:
    print('The hash value for {} is: {}'.format(s, h(s, 10)))
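A quick way to build some trust in the hash function is to check that it spreads keys roughly evenly over the buckets. The sketch below does this for 10,000 synthetic keys; the helper `bucket_of` repeats the construction of `h()` so that the block is self-contained, and the `user-...` keys are made up for illustration.

```python
import hashlib
from collections import Counter


def bucket_of(s, num_buckets):
    # Same construction as h(): SHA-1 digest interpreted as an integer
    return int(hashlib.sha1(s.encode('utf-8')).hexdigest(), 16) % num_buckets


# Hypothetical keys; any large set of distinct strings would do
keys = ['user-{}'.format(i) for i in range(10_000)]
bucket_counts = Counter(bucket_of(k, 10) for k in keys)

for bucket in range(10):
    print('Bucket {}: {} keys'.format(bucket, bucket_counts[bucket]))
```

Each bucket should receive close to 1,000 keys, which is what makes `a/b` a good approximation of the fraction of keys that end up in the sample.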

#### Creating the Sample

By choosing the values for `a` and `b` accordingly, we can specify the probability that an IP address is accepted into the sample rather than discarded. For example, if we again go for a probability of 10%, we can set `a=1` and `b=10`.

In [None]:
a, b = 1, 10

With the hash function in place, we can now create the sample in a straightforward way.

In [None]:
SAMPLE = []

with open('data/datasets/nasa-apache-logs/ip-ts-only-nasa-access.log') as f:
    for line in f:
        ip, ts = line.split(',')

        if h(ip, b) < a:
            # NOTE: We just add the key (IP address) to the sample here to make it easier to look at the result
            # In practice, you obviously add the complete item (i.e., row) to the sample
            SAMPLE.append(ip)

In [None]:
print('Number of log entries in SAMPLE: {}'.format(len(SAMPLE)))
print('Number of unique IP addresses in SAMPLE: {}'.format(len(set(SAMPLE))))

When looking at the sample, we can see that the number of items is no longer necessarily close to 10% of the number of all log entries, since some IP addresses send many more requests than others. Instead, the number of unique IP addresses in the sample is about 10% of the number of unique IP addresses in the original dataset, which is what we are interested in.
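To see why the choice between the two strategies matters, the following sketch compares them on a synthetic stream in which every user sends exactly 10 requests, so the true average number of requests per user is known. All names and the generated IP addresses are hypothetical; `bucket_of` repeats the construction of `h()` to keep the block self-contained.

```python
import hashlib
import random
from collections import Counter


def bucket_of(s, num_buckets):
    return int(hashlib.sha1(s.encode('utf-8')).hexdigest(), 16) % num_buckets


def avg_requests_per_user(sample):
    # Number of sampled requests divided by the number of distinct users in the sample
    return len(sample) / len(Counter(sample))


rng = random.Random(0)  # seeded only to make the sketch reproducible

# Hypothetical stream: 2,000 users with exactly 10 requests each, shuffled
toy_stream = ['10.0.{}.{}'.format(i // 250, i % 250) for i in range(2000)] * 10
rng.shuffle(toy_stream)

# (1) Sampling by position: keep each request with probability 10%
by_position = [ip for ip in toy_stream if rng.random() < 0.1]
# (2) Sampling by key: keep all requests of about 10% of the users
by_key = [ip for ip in toy_stream if bucket_of(ip, 10) < 1]

print('True average:        10.00')
print('Sampled by position: {:.2f}'.format(avg_requests_per_user(by_position)))
print('Sampled by key:      {:.2f}'.format(avg_requests_per_user(by_key)))
```

Sampling by position sees only a few requests of each user and therefore badly underestimates the average, whereas sampling by key keeps the complete request history of every sampled user.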

---

## Sampling with a Fixed Size (Reservoir Sampling)

Depending on the dynamics of the data stream, sampling based on a given probability can yield samples of unpredictable size -- although this can be addressed to some extent by dynamically adjusting the values `a` and `b`. However, in many cases the sample must not exceed a fixed maximum size, e.g., due to memory limitations, particularly on resource-constrained devices.
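One way to adjust `a` dynamically is to start by accepting every key and to lower the threshold whenever the sample exceeds its budget, evicting all items whose key no longer falls below the threshold. The following is only a sketch of this idea under simple assumptions (a fixed `b`, a list-based sample that is rebuilt on every eviction); the function `bounded_key_sample` and the toy stream are hypothetical.

```python
import hashlib


def bucket_of(s, num_buckets):
    return int(hashlib.sha1(s.encode('utf-8')).hexdigest(), 16) % num_buckets


def bounded_key_sample(stream, num_buckets, max_items):
    """Key-based sampling that lowers the threshold when the sample grows too large."""
    threshold = num_buckets  # plays the role of `a`; start by accepting every key
    sample = []
    for key in stream:
        if bucket_of(key, num_buckets) < threshold:
            sample.append(key)
        # Over budget: shrink the accepted bucket range and evict keys outside it
        while len(sample) > max_items and threshold > 0:
            threshold -= 1
            sample = [k for k in sample if bucket_of(k, num_buckets) < threshold]
    return sample, threshold


# Hypothetical stream: 500 distinct keys with 4 items each
toy_stream = ['user-{}'.format(i % 500) for i in range(2000)]
toy_sample, final_threshold = bounded_key_sample(toy_stream, num_buckets=100, max_items=200)

print('Items in sample: {}'.format(len(toy_sample)))
print('Final threshold a: {} (out of b = 100)'.format(final_threshold))
```

Since the threshold only ever decreases, a key that is still in the sample at the end was never evicted, so the sample contains all of its items.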

Reservoir sampling is a technique for choosing a simple random sample, without replacement, of `B` items from a population of unknown size. The size of the population is not known to the algorithm and is typically too large for all items to fit into main memory. Throughout the notebook, we set `B=10`. Feel free to change that.

In [None]:
B = 10

### Creating an Example Stream

The following method creates a simple example stream containing letters as items of the stream, similar to the example used in the lecture. The method takes a list of integers as input, where the first integer is the number of As in the stream, the second the number of Bs, and so on.

In [None]:
def generate_stream(counts):
    stream = []
    for idx, cnt in enumerate(counts):
        stream.extend([chr(idx+65)]*cnt)
    # Shuffle the list of letters
    random.shuffle(stream)
    # Return shuffled list of letters
    return stream

The following example creates a stream that contains 10 As, 5 Bs and 2 Cs.

In [None]:
stream = generate_stream([10, 5, 2])

print(stream)

For the sample, we create a larger stream to make it a bit more interesting.

In [None]:
stream = generate_stream([160, 80, 40, 20, 10, 5])

### Sampling the Whole Stream

We first pretend that we have complete access to all items in the stream. This allows us to easily create samples by using for example the [`np.random.choice`](https://numpy.org/doc/stable/reference/random/generated/numpy.random.choice.html) method.

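The following loop generates 5 samples based on the complete stream. We pass `replace=False` so that, as in Reservoir Sampling, no stream item can be picked more than once. We only do this to compare it to the sample generated by Reservoir Sampling.

In [None]:
for _ in range(5):
    print(np.random.choice(stream, size=B, replace=False).tolist())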

Unsurprisingly, most items in the samples are As or Bs since those are the most frequent items in the stream.

### Reservoir Sampling

We start by creating our empty reservoir, which is simply a list of size `B` here. We use `_` to indicate empty buckets in the reservoir. This is an arbitrary choice, but it makes printing the results more convenient; see below.

In [None]:
RESERVOIR = ['_']*B

print(RESERVOIR)

#### Handle New Item

The method `handle_new_item()` receives the latest item from the stream and decides whether to add it to the reservoir or to discard it. The parameter `t` is the zero-based position of the item in the stream, so the item is the `(t+1)`-th item seen. To ensure that every item seen so far is in the reservoir with the same probability of `B/(t+1)`, the new item must be accepted with exactly that probability.

The method implements this probability by generating a random integer between `0` and `t` (inclusive). If this number is smaller than the number of buckets `B`, we replace the item at the respective position with the new item.

The method returns a message for each decision taken. Again, this is only for printing the results.

In [None]:
def handle_new_item(t, item):
    
    # If the reservoir is not full yet, just add the item
    if t < B:
        RESERVOIR[t] = item
        return 'Item {} added to reservoir'.format(item)
    
    # Get a random integer between 0 and t (inclusive); np.random.randint excludes the upper bound
    pos = np.random.randint(0, t + 1)

    # If the random number is a valid index of the reservoir, replace item at this index
    if pos < len(RESERVOIR):
        RESERVOIR[pos] = item
        return '{} < {} => RESERVOIR[{}] replaced with item {}'.format(pos, len(RESERVOIR), pos, item)
    else:
        return '{} >= {} => item {} discarded'.format(pos, len(RESERVOIR), item)
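Why this acceptance rule keeps the sample uniform can be checked with a short induction. This is the standard argument for reservoir sampling, written here with $n$ as the number of items seen so far (that is, `t+1` for a zero-based position `t`). After the first $B$ items, every item is in the reservoir with probability $B/B = 1$. Now assume that each of the first $n \ge B$ items is in the reservoir with probability $B/n$. The next item is accepted with probability $B/(n+1)$ and then overwrites one of the $B$ slots chosen uniformly at random, so an item already in the reservoir survives this step with probability

$$
1 - \frac{B}{n+1} \cdot \frac{1}{B} = \frac{n}{n+1}.
$$

Its probability of being in the reservoir after $n+1$ items is therefore

$$
\frac{B}{n} \cdot \frac{n}{n+1} = \frac{B}{n+1},
$$

which equals the acceptance probability of the new item, so all $n+1$ items are again equally likely to be in the sample.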

#### Creating the Sample

Using the method `handle_new_item()` we can go over the stream to create the sample.

In [None]:
RESERVOIR = ['_']*B

for t, item in enumerate(stream):
    msg = handle_new_item(t, item)
    print('{} {}'.format(RESERVOIR, msg))

The results should be rather self-explanatory:

* As long as the reservoir is not full, we just add a new incoming item.
* The more items we have seen, i.e., the larger the value of `t`, the less likely we are to add the new item by replacing an existing item in the reservoir. This is because the acceptance probability (roughly `B/t`) decreases over time.
* Multiple runs of the loop above will of course yield different samples, but all are arguably similar to the samples above that were generated based on the complete stream.
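The claim that every item ends up in the reservoir with the same probability can also be checked empirically. The sketch below is a self-contained variant of the algorithm (commonly known as Algorithm R) that counts items starting from 1; the function `reservoir_sample`, the stream of integers and the number of runs are assumptions chosen for illustration.

```python
import random
from collections import Counter


def reservoir_sample(stream, size, rng):
    """Reservoir sampling: uniform random sample of `size` items from `stream`."""
    reservoir = []
    for n, item in enumerate(stream, start=1):  # n = number of items seen so far
        if n <= size:
            reservoir.append(item)
        else:
            pos = rng.randrange(n)              # uniform integer in 0..n-1
            if pos < size:                      # happens with probability size/n
                reservoir[pos] = item
    return reservoir


rng = random.Random(1)  # seeded only to make the sketch reproducible
num_items, size, num_runs = 50, 10, 20_000

# Count how often each item of the stream 0..49 ends up in the final reservoir
inclusion = Counter()
for _ in range(num_runs):
    inclusion.update(reservoir_sample(range(num_items), size, rng))

frequencies = [inclusion[i] / num_runs for i in range(num_items)]
print('Expected inclusion frequency: {:.3f}'.format(size / num_items))
print('Observed min/max: {:.3f} / {:.3f}'.format(min(frequencies), max(frequencies)))
```

Early and late items of the stream should show nearly the same inclusion frequency, close to `size / num_items`.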

## Summary

Data sampling in data streams refers to the process of selecting a subset of data from a continuous and potentially infinite stream of data. It involves extracting a representative sample that approximates the characteristics of the entire stream. Sampling in data streams offers several benefits and presents unique challenges.

One of the main benefits of data sampling in data streams is improved efficiency. Sampling allows for processing and analysis of a fraction of the data, reducing computational and storage requirements. This is particularly important in scenarios where processing the entire data stream in real-time is impractical or infeasible due to the high velocity and volume of the data. Sampling enables real-time analysis and scalability in resource-constrained environments.

Another benefit is the ability to make inferences and generalizations about the entire data stream based on the sampled subset. By selecting a representative sample, statistical and machine learning techniques can be applied to the sample to gain insights, build models, and make predictions about the overall data stream. Sampling provides a practical way to extract meaningful information from large and continuous data streams.

However, data sampling in data streams also presents challenges. One major challenge is ensuring the representativeness and accuracy of the selected sample. The sample may not perfectly capture the underlying distribution of the entire data stream, leading to potential bias or loss of information. Careful consideration should be given to selecting an appropriate sampling strategy and determining the sample size to mitigate these challenges.

Additionally, data streams are dynamic, and the data distribution may change over time, a phenomenon known as concept drift. Concept drift makes it harder to keep the sampled data relevant and valid. Adaptive sampling techniques that dynamically adjust the sampling strategy or update the sample as the data distribution evolves are often necessary to address this challenge.

In summary, data sampling in data streams offers efficiency and scalability benefits, allowing for real-time analysis of large volumes of data. It enables inferences and generalizations about the data stream based on the representative sample. However, challenges such as maintaining representativeness, addressing bias, and handling concept drift must be carefully addressed to ensure the reliability and effectiveness of the sampled data in capturing the characteristics of the continuous data stream.