# Initialization

In [1]:
import nltk
from nltk.corpus import words
import numpy as np

SEED = 42
NUM_ELEMENTS = 10000  # 100k elements in the stream
NUM_UNIQUE = 1000  # 10k unique elements
NUM_RUNS = 25
NUM_SKETCHES = 10

# Set the seed for reproducibility
np.random.seed(SEED)

nltk.download('words')

word_list = words.words()

# Sample 10k distinct elements
unique_words = set(word_list)
distinct_elements = list(unique_words)[:NUM_UNIQUE]

# Generate the stream
stream = np.random.choice(distinct_elements, size=NUM_ELEMENTS)

# Shuffle the stream
np.random.shuffle(stream)

stream = np.array(stream)

print(f"Stream length: {len(stream)}")
print(f"Number of unique elements: {len(np.unique(stream))}")
print(f"First 10 elements of the stream: {stream[:10]}")

# Generate sketches
sketch_sizes = np.logspace(1, np.log10(NUM_ELEMENTS), num=NUM_SKETCHES, base=10, dtype=int)
print(f"Sketch sizes: {sketch_sizes}")


Stream length: 10000
Number of unique elements: 1000
First 10 elements of the stream: ['rabbitmouth' 'stageworthy' 'amidrazone' 'shockedness' 'ostealgia'
 'huaco' 'morose' 'polyandry' 'lonesome' 'pedagoguery']
Sketch sizes: [   10    21    46   100   215   464  1000  2154  4641 10000]


[nltk_data] Downloading package words to
[nltk_data]     C:\Users\galno\AppData\Roaming\nltk_data...
[nltk_data]   Package words is already up-to-date!


In [2]:
def hash_fn(element, seed=SEED):
    """
    Hash function that maps an element to a random number between 0 and 1.
    """
    return (hash(f"{element}{seed}") % 2**32) / 2**32

## Flajolet Martin

### Flajolet Martin - Alpha Version

In [3]:
def flajolet_martin(stream: np.ndarray) -> float:
    """
    Flajolet-Martin algorithm for counting distinct elements in a stream.
    :param stream: numpy array of elements
    :return: estimated number of distinct elements
    """
    X = 1  # Initialize the maximum possible hash value
    for element in stream:
        X = min(X, hash_fn(element))
    return 1 / X

print(f"Estimated number of unique elements: {flajolet_martin(stream[:10000])}")

Estimated number of unique elements: 2065.225708048431


### Flajolet Martin - Beta Version

In [4]:
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

def process_chunk(chunk: np.ndarray, seed: int) -> float:
    """
    Process a chunk of the stream.
    :param chunk: numpy array of elements
    :param seed: seed for the hash function
    :return: estimated number of distinct elements
    """
    X = 1
    for element in chunk:
        X = min(X, hash_fn(element, seed))
    return X

def flajolet_martin_beta(stream: np.ndarray, s: int, seeds: Optional[np.ndarray] = None) -> float:
    """
    Flajolet-Martin Beta algorithm for counting distinct elements in a stream.
    :param stream: numpy array of elements
    :param s: number of FM estimators to calculate
    :param seeds: numpy array of seeds for the hash functions
    :return: estimated number of distinct elements
    """
    seeds = range(s) if seeds is None else seeds
    with ThreadPoolExecutor() as executor:
        Z = list(executor.map(lambda i: process_chunk(stream, seeds[i]), range(s)))
    
    return 1 / np.mean(Z) - 1

### Flajolet Martin - Final Version

In [5]:
from concurrent.futures import ThreadPoolExecutor
import numpy as np

def flajolet_martin_final(stream: np.ndarray, t: int, s: int) -> float:
    """
    Flajolet-Martin Final algorithm that uses t instances of Flajolet-Martin Beta.
    :param stream: numpy array of elements
    :param t: number of Flajolet-Martin Beta instances to run
    :param s: number of FM estimators per Flajolet-Martin Beta instance
    :return: median of the distinct element estimates
    """
    seeds = np.arange(1, t * s + 1)
    
    with ThreadPoolExecutor() as executor:
        estimates = list(executor.map(
            lambda idx: flajolet_martin_beta(stream, s, seeds[idx * s: (idx + 1) * s]),
            range(t)
        ))
    
    return np.median(estimates)

Run 25 times for each sketch size

In [None]:
def process_sketch(sketch_size, s, t):
    fm_results = []
    fm_beta_results = []
    fm_final_results = []
    
    for _ in range(NUM_RUNS):
        # Generate sketch
        indices = np.random.choice(len(stream), size=sketch_size, replace=False)
        sketch = stream[indices]
        
        # Calculate FM, FM-beta, and FM-final estimates
        fm_estimate = flajolet_martin(sketch)
        fm_beta_estimate = flajolet_martin_beta(sketch, s)
        fm_final_estimate = flajolet_martin_final(sketch, t, s)
        
        fm_results.append(fm_estimate)
        fm_beta_results.append(fm_beta_estimate)
        fm_final_results.append(fm_final_estimate)
    
    print(f"Sketch size: {sketch_size}, FM estimate: {np.mean(fm_results):.2f}, FM-beta estimate: {np.mean(fm_beta_results):.2f}, FM-final estimate: {np.mean(fm_final_results):.2f}")
    return (fm_results, fm_beta_results, fm_final_results)

fm_results, fm_beta_results, fm_final_results = {}, {}, {}

epsilon = 0.1
delta = 0.05  # Confidence level of 95%

t = int(np.ceil(np.log(1 / delta) / epsilon**2))
print(f"Number of FM-beta estimators: {t}")

s = int(np.ceil(1 / (epsilon**2 * delta)))
print(f"Number of FM estimators per FM-beta estimator: {s}")

true_unique = len(np.unique(stream))
print(f"True number of unique elements: {true_unique}")

with ThreadPoolExecutor() as executor:
    futures = {sketch_size: executor.submit(process_sketch, sketch_size, s, t) for sketch_size in sketch_sizes}
    
    for sketch_size, future in futures.items():
        sz_fm_results, sz_fm_beta_results, sz_fm_final_results = future.result()
        
        fm_results[sketch_size] = sz_fm_results
        fm_beta_results[sketch_size] = sz_fm_beta_results
        fm_final_results[sketch_size] = sz_fm_final_results


Number of FM-beta estimators: 300
Number of FM estimators per FM-beta estimator: 2000
True number of unique elements: 1000


In [6]:
import matplotlib.pyplot as plt

def calculate_metrics(estimates: dict) -> dict:
    """
    Calculate the accuracy, bias, and normalized variance of the estimates.
    :param estimates: dictionary of estimates
    :return: dictionary of metrics
    """
    metrics = {}
    for sketch_size, size_estimates in estimates.items():
        metrics[sketch_size] = {
            "accuracy": np.mean([max(0, 1 - abs(est - true_unique) / true_unique) for est in size_estimates]),
            "bias": np.mean([est - true_unique for est in size_estimates]),
            "normalized_variance": np.mean([((est - true_unique) / true_unique) for est in size_estimates])
        }
    return metrics

fm_metrics = calculate_metrics(fm_results)
fm_beta_metrics = calculate_metrics(fm_beta_results)
fm_final_metrics = calculate_metrics(fm_final_results)

# Create a figure and axis
fig, ax = plt.subplots(3, 1, figsize=(10, 15))

# Plot the accuracy
ax[0].plot(sketch_sizes, [fm_metrics[sketch_size]["accuracy"] for sketch_size in sketch_sizes], label="FM")
ax[0].plot(sketch_sizes, [fm_beta_metrics[sketch_size]["accuracy"] for sketch_size in sketch_sizes], label="FM-beta")
ax[0].plot(sketch_sizes, [fm_final_metrics[sketch_size]["accuracy"] for sketch_size in sketch_sizes], label="FM-final")
ax[0].set_xscale("log")
ax[0].set_xlabel("Sketch size")
ax[0].set_ylabel("Accuracy")
ax[0].set_title("Accuracy")
ax[0].legend()

# Plot the bias
ax[1].plot(sketch_sizes, [fm_metrics[sketch_size]["bias"] for sketch_size in sketch_sizes], label="FM")
ax[1].plot(sketch_sizes, [fm_beta_metrics[sketch_size]["bias"] for sketch_size in sketch_sizes], label="FM-beta")
ax[1].plot(sketch_sizes, [fm_final_metrics[sketch_size]["bias"] for sketch_size in sketch_sizes], label="FM-final")
ax[1].set_xscale("log")
ax[1].set_xlabel("Sketch size")
ax[1].set_ylabel("Bias")
ax[1].set_title("Bias")
ax[1].legend()

# Plot the normalized variance
ax[2].plot(sketch_sizes, [fm_metrics[sketch_size]["normalized_variance"] for sketch_size in sketch_sizes], label="FM")
ax[2].plot(sketch_sizes, [fm_beta_metrics[sketch_size]["normalized_variance"] for sketch_size in sketch_sizes], label="FM-beta")
ax[2].plot(sketch_sizes, [fm_final_metrics[sketch_size]["normalized_variance"] for sketch_size in sketch_sizes], label="FM-final")
ax[2].set_xscale("log")
ax[2].set_xlabel("Sketch size")
ax[2].set_ylabel("Normalized variance")
ax[2].set_title("Normalized variance")
ax[2].legend()

plt.tight_layout()
plt.show()

NameError: name 'fm_results' is not defined