
# HyperLogLog++ Implementation in Python
This part of the notebook contains an implementation of the HyperLogLog++ algorithm, which is a probabilistic data structure used for efficient cardinality estimation (i.e., estimating the number of distinct elements in a dataset).
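
Every update starts by hashing the element and splitting the hash into a register index and a rank. The sketch below shows that step on its own. The helper names `hash64` and `index_and_rank`, and the use of BLAKE2b over `repr(value)` as a 64-bit hash, are assumptions for illustration, not part of the notebook's code.

```python
import hashlib


def hash64(value):
    """Deterministic 64-bit hash of repr(value) (assumed choice: BLAKE2b, 8-byte digest)."""
    digest = hashlib.blake2b(repr(value).encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "big")


def index_and_rank(h, p):
    """Split a 64-bit hash into a register index (low p bits) and a rank."""
    index = h & ((1 << p) - 1)   # low p bits pick one of 2**p registers
    w = h >> p                   # the remaining 64 - p bits
    if w == 0:
        return index, 64 - p + 1  # every remaining bit is zero
    rank = (w & -w).bit_length()   # 1 + number of trailing zero bits of w
    return index, rank
```

Counting trailing zeros of the remaining bits, as the `_rho` methods in this notebook do with `(value & -value).bit_length()`, gives the same geometric distribution of ranks as counting leading zeros when the hash is uniform.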

## Key Features:
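1. **Sparse Representation:** While few registers are occupied, only the non-zero registers are stored (in a dictionary), which saves memory at small cardinalities.
2. **Dense Mode:** Once more than m/16 registers are occupied, the sketch switches to a fixed-size array of m registers.
3. **Merge Capability:** Two sketches are combined by taking the register-wise maximum, which yields a sketch of the union of the two datasets.
4. **Small-Range Correction:** When the raw estimate is small (at most 5m/2) and some registers are still zero, the estimate falls back to linear counting.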
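
The estimator behind these features can be sketched as a standalone function. It computes the raw HyperLogLog estimate `alpha_m * m**2 / sum(2**-M[j])` over all m registers, zeros included. When that value is at most `5m/2`, it falls back to linear counting, `m * ln(m / V)`, where `V` is the number of zero registers. This is the small-range correction used by the code in this notebook. The full HyperLogLog++ algorithm additionally applies empirically measured bias-correction tables, which are not shown here. The names `alpha` and `hll_estimate` are illustrative.

```python
import math


def alpha(m):
    """Bias-correction constant alpha_m of HyperLogLog (same values as the notebook)."""
    if m == 16:
        return 0.673
    if m == 32:
        return 0.697
    if m == 64:
        return 0.709
    return 0.7213 / (1 + 1.079 / m)


def hll_estimate(registers):
    """Raw HyperLogLog estimate with the linear-counting fallback for small cardinalities."""
    m = len(registers)
    z = sum(2.0 ** -r for r in registers)  # zero registers contribute 2**0 == 1 each
    raw = alpha(m) * m * m / z
    zeros = registers.count(0)
    if raw <= 2.5 * m and zeros > 0:
        return m * math.log(m / zeros)     # linear counting
    return raw
```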

### Requirements:
The code uses the standard-library modules `math` and `collections` (its `defaultdict` holds the sparse map) together with the third-party `numpy` package for the dense register array.

Below is the Python implementation of HyperLogLog++:




In [None]:
import hashlib
import math
import numpy as np
from collections import defaultdict

class HyperLogLogPlusPlus:
    def __init__(self, num_registers):
        """
        Initialize the HyperLogLog++ structure.
        num_registers: Number of registers (must be a positive power of 2).
        """
        if num_registers <= 0 or num_registers & (num_registers - 1) != 0:
            raise ValueError("num_registers must be a positive power of 2.")

        self.m = num_registers
        self.logM = int(math.log2(num_registers))
        self.sparse = True  # Start in sparse mode
        self.sparse_map = defaultdict(int)  # Sparse representation: register index -> rank
        self.registers = np.zeros(self.m, dtype=np.uint8)  # Dense registers
        self.sparse_threshold = self.m / 16  # Threshold for switching to dense mode

    def _hash(self, value):
        """
        Return a deterministic 64-bit hash of the input.
        Python's built-in hash() is unsuitable here: it is the identity on small
        non-negative integers and is randomized per process for strings.
        """
        digest = hashlib.blake2b(repr(value).encode("utf-8"), digest_size=8).digest()
        return int.from_bytes(digest, "big")

    def _rho(self, value):
        """
        Return the rank: 1 + the number of trailing zero bits of the remaining
        (64 - logM) hash bits.
        """
        if value == 0:
            return 64 - self.logM + 1  # All remaining bits are zero
        return (value & -value).bit_length()

    def _switch_to_dense(self):
        """
        Switch from sparse to dense mode.
        """
        for key, value in self.sparse_map.items():
            index = key & (self.m - 1)
            rank = value
            self.registers[index] = max(self.registers[index], rank)
        self.sparse_map.clear()
        self.sparse = False

    def add(self, value):
        """
        Add a new element to the sketch.
        """
        hash_value = self._hash(value)
        j = hash_value & (self.m - 1)  # Low logM bits select the register
        w = hash_value >> self.logM    # Remaining bits determine the rank
        r = self._rho(w)

        if self.sparse:
            self.sparse_map[j] = max(self.sparse_map[j], r)
            if len(self.sparse_map) > self.sparse_threshold:
                self._switch_to_dense()
        else:
            self.registers[j] = max(self.registers[j], r)

    def estimate(self):
        """
        Estimate the cardinality.
        """
        if self.sparse:
            # Registers missing from the sparse map are zero and contribute 2**0 = 1 each.
            V = self.m - len(self.sparse_map)
            Z = sum(2.0 ** -r for r in self.sparse_map.values()) + V
        else:
            # Cast before negating: -r on a uint8 value would wrap around.
            Z = float(np.sum(np.power(2.0, -self.registers.astype(np.int64))))
            V = int(np.count_nonzero(self.registers == 0))
        E = self.alpha(self.m) * self.m ** 2 / Z

        # Small range correction (linear counting)
        if E <= 5 / 2 * self.m and V > 0:
            return self.m * math.log(self.m / V)

        # No large range correction: with a 64-bit hash, HyperLogLog++ drops it because
        # hash collisions are negligible at practical cardinalities.
        return E

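    def _dense_registers(self):
        """
        Return a dense copy of the registers without changing this sketch's mode.
        """
        if not self.sparse:
            return self.registers.copy()
        dense = np.zeros(self.m, dtype=np.uint8)
        for index, rank in self.sparse_map.items():
            dense[index] = rank
        return dense

    def merge(self, other):
        """
        Merge another HyperLogLog++ sketch with this one into a new sketch
        (register-wise maximum). Neither input sketch is modified.
        """
        if self.m != other.m:
            raise ValueError("Number of registers must match.")

        merged = HyperLogLogPlusPlus(self.m)

        if self.sparse and other.sparse:
            for key in set(self.sparse_map) | set(other.sparse_map):
                merged.sparse_map[key] = max(self.sparse_map.get(key, 0), other.sparse_map.get(key, 0))
            # The union can occupy more registers than either input.
            if len(merged.sparse_map) > merged.sparse_threshold:
                merged._switch_to_dense()
        else:
            merged.registers = np.maximum(self._dense_registers(), other._dense_registers())
            merged.sparse = False

        return merged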

    @staticmethod
    def alpha(m):
        """
        Return the alpha constant for the given number of registers.
        """
        if m == 16:
            return 0.673
        elif m == 32:
            return 0.697
        elif m == 64:
            return 0.709
        return 0.7213 / (1 + 1.079 / m)

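    def export_registers(self):
        """
        Export the registers as a list of m integers (zeros included) in either mode.
        """
        if self.sparse:
            dense = [0] * self.m
            for index, rank in self.sparse_map.items():
                dense[index] = rank
            return dense
        return self.registers.tolist()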


# **HyperLogLogLog Implementation for Cardinality Estimation**

This section provides an implementation of the **HyperLogLogLog** algorithm, an extension of the **HyperLogLog** data structure for estimating the number of distinct elements (cardinality) in a data stream. HyperLogLogLog compresses the HyperLogLog registers by storing most of them as small offsets from a common base value and keeping the few that do not fit in a sparse map. This reduces memory usage without changing the estimate.

## **Key Features**

### **Efficient Memory Usage**
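- Uses a **sparse representation** for outlier registers whose values fall outside the window from the base `B` to `B + 2**m_bits - 1`.
- Stores every other register as an **offset from the base value** `B`, which needs only `m_bits` bits (3 by default) instead of the 6 bits a HyperLogLog register needs with a 64-bit hash.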
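
The base-plus-offset encoding can be illustrated with plain lists. In this sketch, `base` plays the role of the notebook's `B`, `offset_bits` that of `m_bits`, and the exception dictionary that of `S`. The function names `encode` and `decode` are hypothetical. A register whose value lies in the window from `base` to `base + 2**offset_bits - 1` is stored as a small offset, and every other register is kept explicitly.

```python
def encode(values, base, offset_bits):
    """Split register values into small offsets from `base` plus a sparse exception map."""
    max_offset = (1 << offset_bits) - 1
    offsets = [0] * len(values)
    exceptions = {}
    for j, r in enumerate(values):
        if base <= r <= base + max_offset:
            offsets[j] = r - base   # fits in offset_bits bits
        else:
            exceptions[j] = r       # stored with its full value
    return offsets, exceptions


def decode(offsets, exceptions, base):
    """Recover the true register values; exceptions take precedence over offsets."""
    return [exceptions.get(j, base + o) for j, o in enumerate(offsets)]
```

For example, with base 5 and 3-bit offsets, the values `[5, 6, 7, 12, 0, 9]` become the offsets `[0, 1, 2, 7, 0, 4]` plus the single exception `{4: 0}`.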

### **Mergeability**
- The sketch can be merged across distributed systems, making it scalable for large datasets and distributed processing.
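
Mergeability follows from the fact that each register holds the maximum rank seen for its index. As a result, the register-wise maximum of two sketches equals the sketch of the union of their inputs. The self-contained check below demonstrates this with plain register lists. `registers_for` and `merge` are hypothetical helpers, and the BLAKE2b-based 64-bit hash is an assumption. For HyperLogLogLog, the maximum has to be taken over decoded register values, because two sketches may use different bases.

```python
import hashlib


def registers_for(items, p):
    """Build a plain list of 2**p HyperLogLog registers (illustrative helper)."""
    regs = [0] * (1 << p)
    for item in items:
        digest = hashlib.blake2b(repr(item).encode("utf-8"), digest_size=8).digest()
        h = int.from_bytes(digest, "big")
        j, w = h & ((1 << p) - 1), h >> p
        rank = (w & -w).bit_length() if w else 64 - p + 1
        regs[j] = max(regs[j], rank)
    return regs


def merge(a, b):
    """Register-wise maximum of two register lists of equal length."""
    return [max(x, y) for x, y in zip(a, b)]


# Overlapping inputs: the merged sketch equals the sketch of the union.
a = registers_for(range(0, 1000), p=6)
b = registers_for(range(500, 1500), p=6)
assert merge(a, b) == registers_for(range(0, 1500), p=6)
```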

### **Estimation Accuracy**
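- Because the compression is lossless, the estimate is the same as that of a plain HyperLogLog with the same registers, so the usual HyperLogLog error bounds carry over.
- Uses the standard HyperLogLog estimator (the harmonic mean of `2**-r` over all registers, scaled by the bias-correction constant alpha_m), with linear counting for small cardinalities.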

### **Amortized Constant Updates**
- The HyperLogLogLog design targets amortized constant-time updates: most updates touch a single register, and the more expensive compression step runs only occasionally.
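
Compression amounts to choosing a new base that minimizes the number of exceptions. Below is a minimal sketch of that search. It assumes the same candidate range as `_compress_full` in the notebook (the current base up to `max_offset` above it) but counts exceptions over all register values. `best_base` is a hypothetical name.

```python
def best_base(values, current_base, offset_bits):
    """Pick the base in [current_base, current_base + max_offset] with the fewest exceptions."""
    max_offset = (1 << offset_bits) - 1

    def exceptions(base):
        # Registers outside [base, base + max_offset] need an explicit sparse entry.
        return sum(1 for r in values if r < base or r > base + max_offset)

    best, best_count = current_base, exceptions(current_base)
    for base in range(current_base + 1, current_base + max_offset + 1):
        count = exceptions(base)
        if count < best_count:  # strict comparison: ties keep the smaller base
            best, best_count = base, count
    return best
```

For the values `[5, 6, 7, 12, 9, 10, 11, 13]` with a current base of 0 and 3-bit offsets, base 0 leaves five exceptions, while base 5 leaves only one (the value 13). The search therefore returns 5.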


In [None]:
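import hashlib
import math
import numpy as np

class HyperLogLogLog:
    HYPERLOGLOGLOG_COMPRESS_WHEN_ALWAYS = 0x1
    HYPERLOGLOGLOG_COMPRESS_WHEN_APPEND = 0x2
    HYPERLOGLOGLOG_COMPRESS_TYPE_FULL = 0x4
    HYPERLOGLOGLOG_COMPRESS_TYPE_INCREASE = 0x8
    HYPERLOGLOGLOG_COMPRESS_BOTTOM = 0x10
    HYPERLOGLOGLOG_COMPRESS_DEFAULT = HYPERLOGLOGLOG_COMPRESS_WHEN_ALWAYS | HYPERLOGLOGLOG_COMPRESS_TYPE_FULL

    def __init__(self, m, m_bits=3, flags=None):
        if m <= 0 or m & (m - 1) != 0:
            raise ValueError("m must be a positive power of 2.")

        self.m = m
        self.logM = int(math.log2(m))
        self.m_bits = m_bits                  # Bits per dense offset
        self.s_bits = int(math.log2(64))      # Bits per sparse value
        self.flags = flags or self.HYPERLOGLOGLOG_COMPRESS_DEFAULT
        self.M = np.zeros(m, dtype=np.uint8)  # Dense offsets from the base B
        self.S = {}                           # Sparse exceptions: index -> full register value
        self.B = 0                            # Base value
        self.lower_bound = 0
        self.min_value_count = m
        self.max_offset = (1 << m_bits) - 1
        self.compress_count = 0
        self.rebase_count = 0

    def _hash(self, value):
        # Deterministic 64-bit hash; Python's hash() is the identity on small
        # non-negative ints and is randomized per process for strings.
        digest = hashlib.blake2b(repr(value).encode("utf-8"), digest_size=8).digest()
        return int.from_bytes(digest, "big")

    def _rho(self, value):
        # 1 + number of trailing zero bits of the remaining (64 - logM) hash bits
        if value == 0:
            return 64 - self.logM + 1
        return (value & -value).bit_length()

    def _register(self, j):
        """True value of register j."""
        return int(self.S.get(j, int(self.M[j]) + self.B))

    def _values(self):
        """True values of all registers as an int64 array."""
        values = self.M.astype(np.int64) + self.B
        for j, r in self.S.items():
            values[j] = r
        return values

    def _count_outside(self, values, base):
        """Number of registers that would need a sparse entry under `base`."""
        return int(np.count_nonzero((values < base) | (values > base + self.max_offset)))

    def add(self, value):
        hashed = self._hash(value)
        j = hashed & (self.m - 1)
        w = hashed >> self.logM
        r = self._rho(w)
        self._add_jr(j, r)

    def _add_jr(self, j, r):
        """Raise register j to r if r exceeds its current value."""
        if r <= self._register(j):
            return
        if self.B <= r <= self.B + self.max_offset:
            self.S.pop(j, None)
            self.M[j] = r - self.B
        else:
            appended = j not in self.S
            self.S[j] = r
            # Simplifying assumption: compress whenever a value lands in S (WHEN_ALWAYS)
            # or only when S gains a new key (WHEN_APPEND).
            if self.flags & self.HYPERLOGLOGLOG_COMPRESS_WHEN_ALWAYS or (
                appended and self.flags & self.HYPERLOGLOGLOG_COMPRESS_WHEN_APPEND
            ):
                self._compress()

    def _compress(self):
        self.compress_count += 1
        if self.flags & self.HYPERLOGLOGLOG_COMPRESS_TYPE_FULL:
            self._compress_full()
        elif self.flags & self.HYPERLOGLOGLOG_COMPRESS_TYPE_INCREASE:
            self._compress_increase()

    def _compress_full(self):
        # Count exceptions over all registers (dense and sparse) for each candidate base.
        values = self._values()
        best_base = self.B
        best_ns = self._count_outside(values, self.B)
        for base in range(self.B + 1, self.B + self.max_offset + 1):
            ns = self._count_outside(values, base)
            if ns < best_ns:
                best_ns, best_base = ns, base

        if best_base != self.B:
            self._rebase(best_base)

    def _compress_increase(self):
        # Assumed simplification: only consider raising the base by one.
        values = self._values()
        if self._count_outside(values, self.B + 1) < self._count_outside(values, self.B):
            self._rebase(self.B + 1)

    def _rebase(self, new_base):
        values = self._values()
        self.S = {}
        for j, r in enumerate(values.tolist()):
            if new_base <= r <= new_base + self.max_offset:
                self.M[j] = r - new_base
            else:
                self.M[j] = 0
                self.S[j] = r

        self.B = new_base
        self.rebase_count += 1

    def estimate(self):
        values = self._values()
        V = int(np.count_nonzero(values == 0))
        Z = float(np.sum(np.power(2.0, -values)))  # Sum of 2**-r over all registers
        E = self.alpha(self.m) * self.m ** 2 / Z

        if E <= 5 / 2 * self.m and V > 0:
            return self.m * math.log(self.m / V)  # Small range correction (linear counting)
        # No large range correction: the hash is 64 bits wide.
        return E

    def merge(self, other):
        if self.m != other.m:
            raise ValueError("Register count mismatch.")

        result = HyperLogLogLog(self.m, self.m_bits, self.flags)
        result.B = max(self.B, other.B)

        # Register-wise maximum of the decoded values, re-encoded against result.B
        values = np.maximum(self._values(), other._values())
        for j, r in enumerate(values.tolist()):
            if result.B <= r <= result.B + result.max_offset:
                result.M[j] = r - result.B
            else:
                result.S[j] = r

        result._compress()
        return result

    @staticmethod
    def alpha(m):
        if m == 16:
            return 0.673
        elif m == 32:
            return 0.697
        elif m == 64:
            return 0.709
        return 0.7213 / (1 + 1.079 / m)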



# Random Data Generator for Testing and Benchmarking

This section provides utility functions that generate random data for testing and benchmarking the sketches, for example as input streams of integers or strings.

## Key Features

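### Generate 64-bit Unsigned Integers:
- `generate_uint64(n)` draws `n` random 64-bit unsigned integers with `random.getrandbits(64)`.
- Writes them to standard output as raw binary, 8 bytes per value in big-endian (network) byte order.
- Logs the generation and writing times to standard error.

### Generate Alphanumeric Strings:
- `generate_str(n, length)` creates `n` random strings of `length` characters drawn from `a-z`, `A-Z`, and `0-9`.
- Writes them to standard output as text, one string per line, and logs the timings to standard error.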
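
The binary format written by `generate_uint64` can be read back with `struct.iter_unpack`. The sketch below writes to an in-memory `io.BytesIO` buffer instead of `sys.stdout.buffer` so that it runs inside a notebook. `write_uint64` and `read_uint64` are hypothetical helper names.

```python
import io
import random
import struct


def write_uint64(values, stream):
    """Write unsigned 64-bit integers as 8-byte big-endian (network byte order) records."""
    for v in values:
        stream.write(struct.pack(">Q", v))


def read_uint64(stream):
    """Read the records back; the data length must be a multiple of 8 bytes."""
    return [v for (v,) in struct.iter_unpack(">Q", stream.read())]


values = [random.getrandbits(64) for _ in range(5)]
buffer = io.BytesIO()
write_uint64(values, buffer)
buffer.seek(0)
assert read_uint64(buffer) == values
```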


In [None]:
import random
import struct
import time
import math
import sys

def generate_uint64(n):
    """Generate random 64-bit unsigned integers."""
    start_time = time.time()
    data = [random.getrandbits(64) for _ in range(n)]
    elapsed = time.time() - start_time
    sys.stderr.write(f"Data generation took {elapsed:.3f} seconds\n")

    start_time = time.time()
    for value in data:
        sys.stdout.buffer.write(struct.pack(">Q", value))  # Write in network byte order
    elapsed = time.time() - start_time
    sys.stderr.write(f"Data writing took {elapsed:.3f} seconds\n")


def generate_str(n, length):
    """Generate random alphanumeric strings."""
    chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    start_time = time.time()
    data = [''.join(random.choice(chars) for _ in range(length)) for _ in range(n)]
    elapsed = time.time() - start_time
    sys.stderr.write(f"Data generation took {elapsed:.3f} seconds\n")

    start_time = time.time()
    for value in data:
        sys.stdout.write(value + "\n")
    elapsed = time.time() - start_time
    sys.stderr.write(f"Data writing took {elapsed:.3f} seconds\n")

# Unit Tests for HyperLogLogLog

This section contains unit tests for the `HyperLogLogLog` class, written with Python's `unittest` framework. They check that estimates fall within a tolerance of the true count, that merging two sketches works, that invalid register counts and mismatched sizes raise `ValueError`, and that a sketch's register state can be transferred to a new sketch without changing the estimate.
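
HyperLogLog's relative standard error of about 1.04/sqrt(m) is a useful guide when choosing tolerances for these tests. The helpers below (the names are illustrative) convert a relative tolerance into a number of standard errors. With m = 16 the standard error is about 26%, so a 10% band covers well under one standard error, and a test using it would fail frequently.

```python
import math


def relative_standard_error(m):
    """Approximate relative standard error of HyperLogLog with m registers."""
    return 1.04 / math.sqrt(m)


def sigmas_covered(tolerance, m):
    """How many standard errors a relative tolerance such as 0.10 spans."""
    return tolerance / relative_standard_error(m)
```

For m = 4096 the standard error drops to about 1.6%, and the same 10% band spans roughly six standard errors.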


In [None]:
import unittest


class TestHyperLogLogLog(unittest.TestCase):

    def test_hyperlogloglog_estimate(self):
        # With m = 16 the standard error is about 26%, far wider than a 10% band;
        # m = 4096 gives a standard error of about 1.6%.
        hlll = HyperLogLogLog(4096)
        self.assertEqual(hlll.m, 4096)  # Example size check

        elements = list(range(1000))
        for element in elements:
            hlll.add(element)

        estimate = hlll.estimate()
        self.assertGreaterEqual(estimate, len(elements) * 0.9)  # Allow 10% error margin
        self.assertLessEqual(estimate, len(elements) * 1.1)

    def test_hyperlogloglog_merge(self):
        hlll1 = HyperLogLogLog(4096)
        hlll2 = HyperLogLogLog(4096)
        hlll_union = HyperLogLogLog(4096)
        common_elements = list(range(500))
        unique_hlll1 = [500 + i for i in range(500)]
        unique_hlll2 = [1000 + i for i in range(500)]

        for element in common_elements + unique_hlll1:
            hlll1.add(element)
        for element in common_elements + unique_hlll2:
            hlll2.add(element)
        for element in common_elements + unique_hlll1 + unique_hlll2:
            hlll_union.add(element)

        hlll_merged = hlll1.merge(hlll2)
        # Merging is exact: it must match a sketch built directly from the union...
        self.assertEqual(hlll_merged.estimate(), hlll_union.estimate())
        # ...while the estimate itself only approximates the true count.
        true_count = len(set(common_elements + unique_hlll1 + unique_hlll2))
        self.assertAlmostEqual(hlll_merged.estimate(), true_count, delta=0.1 * true_count)

    def test_invalid_register_size(self):
        with self.assertRaises(ValueError):
            HyperLogLogLog(15)  # Not a power of 2

    def test_merge_incompatible_sizes(self):
        hlll1 = HyperLogLogLog(16)
        hlll2 = HyperLogLogLog(32)
        with self.assertRaises(ValueError):
            hlll1.merge(hlll2)

    def test_copy_register_state(self):
        hlll = HyperLogLogLog(16)
        for element in range(1000):
            hlll.add(element)

        # Transfer the dense offsets M, the sparse map S, and the base B to a fresh sketch.
        hlll_new = HyperLogLogLog(16)
        hlll_new.M = hlll.M.copy()
        hlll_new.S = dict(hlll.S)
        hlll_new.B = hlll.B

        self.assertEqual(hlll.estimate(), hlll_new.estimate())


if __name__ == "__main__":
    # exit=False keeps the notebook kernel running after the tests finish
    unittest.main(argv=["ignored"], exit=False)



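The next cell is an experiment harness for benchmarking several cardinality estimation algorithms (`hyperloglog`, `hyperlogloglog`, and `hyperloglogplusplus`) over a grid of modes, data types, register counts `m`, and cardinalities `n`. It assigns a reproducible seed to every configuration and repetition, writes one CSV file per configuration under `results/`, and concatenates them into `Combined.csv`. In its current form, `run_instance` records placeholder random values instead of running the sketches, so these files are not real measurements.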
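
The size of the experiment grid follows directly from its constants. The snippet below copies the `NS` and `MS` definitions from the next cell and multiplies their lengths by the number of modes (2), algorithms (3), and data types (2). It is a standalone check, not part of the harness.

```python
import math

MAX_LOG_N = 10

# n values: powers of 2 interleaved with sqrt(2) times powers of 2
NS = [
    (1 << i // 2) if i % 2 == 0 else int(round(math.sqrt(2) * (1 << i // 2)))
    for i in range(8, 2 * MAX_LOG_N + 1)
]
# m values: 16, 32, ..., 262144
MS = [1 << i for i in range(4, 19)]

num_modes, num_algorithms, num_datatypes = 2, 3, 2
num_instances = num_modes * num_algorithms * num_datatypes * len(MS) * len(NS)

print(NS)             # [16, 23, 32, 45, 64, 91, 128, 181, 256, 362, 512, 724, 1024]
print(num_instances)  # 2340
```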

In [None]:
import random
import math
import sys
import os
import pandas as pd

# log2 of the largest dataset size: n goes up to 2**MAX_LOG_N
MAX_LOG_N = 10

# Dataset sizes (n values): powers of 2 interleaved with sqrt(2) times powers of 2 (16, 23, 32, 45, ..., 1024)
NS = [
    (1 << i // 2) if i % 2 == 0 else int(round(math.sqrt(2) * (1 << i // 2)))
    for i in range(8, 2 * MAX_LOG_N + 1)
]

# Register counts (m values) in powers of 2, ranging from 16 to 262144
MS = [1 << i for i in range(4, 19)]

# Number of repetitions for each experiment configuration
NUM_REPS = 10

# Supported data types for the experiments
DATATYPES = ['str', 'uint64']

# List of algorithms being tested
ALGORITHMS = [
    'hyperloglog',       # Standard HyperLogLog algorithm
    'hyperlogloglog',
    'hyperloglogplusplus',
]

# Modes of operation for the experiments
MODES = [
    'query',  # Perform query operations on data
    'merge'   # Test merging datasets (e.g., combining sketches)
]

# Length of random strings used in experiments involving string data types
RANDOM_STRING_LENGTH = 8

# Initial random seed for reproducibility, given as a hexadecimal value
INITIAL_SEED = 0x11e3ea10  # Equivalent to 300149264 in decimal


def instance_filename_stub(instance):
    return f"results/{instance['mode']}_{instance['algo']}" + \
           f"_{instance['dt']}_{instance['m']}_{instance['n']}"


def populate_seeds(initial_seed):
    global RANDOM_NUMBER_SEEDS
    RANDOM_NUMBER_SEEDS = dict()
    seed = initial_seed
    for ds in DATATYPES:
        RANDOM_NUMBER_SEEDS[ds] = dict()
        for m in MS:
            RANDOM_NUMBER_SEEDS[ds][m] = dict()
            for n in NS:
                RANDOM_NUMBER_SEEDS[ds][m][n] = dict()
                for rep in range(NUM_REPS):
                    RANDOM_NUMBER_SEEDS[ds][m][n][rep] = seed
                    seed += 1


def filter_instances(mode_filter, algo_filter, dt_filter, m_filter, n_filter):
    instances = []
    for mode in MODES:
        if mode_filter is not None and mode != mode_filter:
            continue
        for algo in ALGORITHMS:
            if algo_filter is not None and algo != algo_filter:
                continue
            for dt in DATATYPES:
                if dt_filter is not None and dt != dt_filter:
                    continue
                for m in MS:
                    if m_filter is not None and m_filter != m:
                        continue
                    for n in NS:
                        if n_filter is not None and n_filter != n:
                            continue
                        instance = {
                            'mode': mode,
                            'algo': algo,
                            'dt': dt,
                            'm': m,
                            'n': n
                        }
                        instances.append(instance)
    return instances


def run_instance(instance):
    mode = instance['mode']
    algo = instance['algo']
    dt = instance['dt']
    m = instance['m']
    n = instance['n']

    sys.stderr.write(f"Running {mode} {algo} {dt} {m} {n}\n")
    filename_stub = instance_filename_stub(instance)
    csv_filename = filename_stub + '.csv'

    # Ensure the results directory exists
    os.makedirs(os.path.dirname(csv_filename), exist_ok=True)

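    results = []
    for rep in range(NUM_REPS):
        seed = RANDOM_NUMBER_SEEDS[dt][m][n][rep]
        rng = random.Random(seed)  # Per-repetition generator, so reruns give identical values
        # Placeholder values: the actual experiment logic would go here
        results.append({
            **instance,  # Keep mode/algo/dt/m/n so the combined CSV stays interpretable
            'rep': rep,
            'seed': seed,
            'time': rng.random(),  # Simulated time
            'estimate': rng.random(),  # Simulated estimate
            'bitsize': rng.randint(10, 100),  # Simulated bitsize
            'compressCount': rng.randint(1, 10),
            'rebaseCount': rng.randint(1, 10)
        })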

    df = pd.DataFrame(results)
    df.to_csv(csv_filename, index=False)

    return df


def combine_results():
    result_dir = "results"
    all_files = [os.path.join(result_dir, f) for f in os.listdir(result_dir) if f.endswith('.csv')]
    if not all_files:
        sys.stderr.write("No result files found for combining.\n")
        return

    combined_df = pd.concat((pd.read_csv(f) for f in all_files), ignore_index=True)
    combined_df.to_csv("Combined.csv", index=False)


def main():
    mode_filter = None
    algo_filter = None
    dt_filter = None
    m_filter = None
    n_filter = None

    sys.stderr.write('Populating random number seeds...\n')
    populate_seeds(INITIAL_SEED)
    sys.stderr.write('Done\n')

    sys.stderr.write('Filtering instances...\n')
    instances = filter_instances(mode_filter, algo_filter, dt_filter, m_filter, n_filter)
    sys.stderr.write(f'{len(instances)} instances found\n')

    for instance in instances:
        run_instance(instance)

    combine_results()


if __name__ == "__main__":
    main()


Populating random number seeds...
Done
Filtering instances...
2340 instances found
Running query hyperloglog str 16 16
Running query hyperloglog str 16 23
Running query hyperloglog str 16 32
Running query hyperloglog str 16 45
Running query hyperloglog str 16 64
Running query hyperloglog str 16 91
Running query hyperloglog str 16 128
Running query hyperloglog str 16 181
Running query hyperloglog str 16 256
Running query hyperloglog str 16 362
Running query hyperloglog str 16 512
Running query hyperloglog str 16 724
Running query hyperloglog str 16 1024
Running query hyperloglog str 32 16
Running query hyperloglog str 32 23
Running query hyperloglog str 32 32
Running query hyperloglog str 32 45
Running query hyperloglog str 32 64
Running query hyperloglog str 32 91
Running query hyperloglog str 32 128
Running query hyperloglog str 32 181
Running query hyperloglog str 32 256
Running query hyperloglog str 32 362
Running query hyperloglog str 32 512
Running query hyperloglog str 32 724
Run

KeyboardInterrupt: 

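The code below is provided only to check the HyperLogLog++ implementation. It loads a CSV file into BigQuery and estimates a column's cardinality with `APPROX_COUNT_DISTINCT`. I compared the values obtained this way with the estimates from my implementation.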

In [None]:
from google.cloud import bigquery

# Initialize BigQuery client
client = bigquery.Client()

# Step 1: Upload Dataset to BigQuery
def upload_dataset_to_bigquery(dataset_id, table_id, file_path):
    """
    Uploads a dataset to BigQuery from a CSV file.
    """
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        autodetect=True,
    )

    with open(file_path, 'rb') as source_file:
        load_job = client.load_table_from_file(source_file, f'{dataset_id}.{table_id}', job_config=job_config)

    load_job.result()  # Wait for the job to complete
    print(f"Loaded {load_job.output_rows} rows into {dataset_id}.{table_id}.")

# Step 2: Query Dataset for Cardinality Estimation
def estimate_cardinality(project_id, dataset_id, table_id, column_name):
    """
    Estimates cardinality using BigQuery's APPROX_COUNT_DISTINCT function.
    """
    query = f"""
    SELECT APPROX_COUNT_DISTINCT({column_name}) AS estimated_cardinality
    FROM `{project_id}.{dataset_id}.{table_id}`;
    """

    query_job = client.query(query)  # Run the query
    results = query_job.result()

    for row in results:
        print(f"Estimated Cardinality: {row.estimated_cardinality}")
        return row.estimated_cardinality  # The query returns a single row
