In [24]:
import mmh3
import math
from bitarray import bitarray
import numpy as np
import pandas as pd
import random 
import string
from tqdm import tqdm

from utils import gen_uniq_seq

# Задание 1

Bloom-filter на одной хэщ-функции
C целыми числами скорость около 300 ит/сек ~18000 строк в минуту. Слишком долго ждать. 

In [18]:
class BloomFilterInt:
    '''
    Bloom-filter on Python integers
    '''
    def __init__(self,
                 n: int):
        self.n = n
        self.bit_array = 0

    def _hash(self, s):
        return mmh3.hash(s, 0) % self.n

    def put(self, s):
        self.bit_array |= 1 << self._hash(s)

    def get(self, s):
        return self.bit_array & (1 << self._hash(s)) != 0

    def size(self):
        return bin(self.bit_array).count('1')

Попробуем реализовать с помощью numpy

In [27]:
class BloomFilterNumpy:
    '''
    Bloom-filter using numpy bit array
    '''
    def __init__(self,
                 n: int):
        self.n = n
        self.bit_array = np.zeros(n, dtype=bool)  # Use a numpy array of booleans

    def _hash(self, s):
        # Generate a single hash value for the given input
        return mmh3.hash(s, 0) % self.n

    def put(self, s):
        # Set the bit corresponding to the hash value
        hash_value = self._hash(s)
        self.bit_array[hash_value] = True

    def get(self, s):
        # Check if the bit corresponding to the hash value is set
        hash_value = self._hash(s)
        return self.bit_array[hash_value]

    def size(self):
        # Count the number of set bits in the bit array
        return np.sum(self.bit_array)

In [4]:
bf_sizes = [8, 64, 1024, 65536, 16777216]
set_sizes = [5, 50, 500, 5000, 5000000]

In [8]:
for set_size in set_sizes:
    gen_uniq_seq(name = str(set_size),
                n_records = set_size)

0
0
0
0
0
1000000
2000000
3000000
4000000


In [32]:
result = []

Использовал tqdm для контроля за процессом - избыточно, можно было обойтись просто выводом в stdout

In [50]:
result_np = []

for bf_size in tqdm(bf_sizes,
                    total=len(set_sizes),
                    position=0,
                    leave=False,
                    desc='Iterating through bf sizes'):
    for set_size in tqdm(set_sizes,
                            total=len(set_sizes),
                            position=1,
                            leave=False,
                            desc='Iterating through set sizes'):

        bf_int = BloomFilterNumpy(n = bf_size)

        fp_count = 0

        with open(f'{set_size}') as file: 
            for line in file:
                if bf_int.get(line):
                    fp_count += 1
                bf_int.put(line)
            ones_count_int = bf_int.size()

        result_np.append(
            {
                'bf_size' : bf_size,
                'set_size' : set_size,
                'fp_count' : fp_count,
                'ones_count' : ones_count_int
            }
        )

                                                                         

In [51]:
pd.DataFrame(result_np)

Unnamed: 0,bf_size,set_size,fp_count,ones_count
0,8,5,0,5
1,8,50,42,8
2,8,500,492,8
3,8,5000,4992,8
4,8,5000000,4999992,8
5,64,5,0,5
6,64,50,15,35
7,64,500,436,64
8,64,5000,4936,64
9,64,5000000,4999936,64


Видим, что с увеличением `bf_size` уменьшается число фолс позитив и увеличивается ones count. Видим важность правильного подбора размера фильтра. 

# Задание 2

Оптимальное число хэш-функций k для данных

$$
    k = \ln(2) \cdot m/n
$$

In [52]:
class BloomFilterNumpy:
    def __init__(self,
                 n: int,
                 k: int):
        """
        Bloom filter with k hash funcs
        implemented with numpy
        """
        self.n = n
        self.k = k
        self.bit_array = np.zeros(n, dtype=bool)

    def _hashes(self, item):
        return [mmh3.hash(item, seed) % self.n for seed in range(self.k)]

    def put(self, item):
        for hash_value in self._hashes(item):
            self.bit_array[hash_value] = True

    def get(self, item):
        return all(self.bit_array[hash_value] for hash_value in self._hashes(item))

    def size(self):
        return np.sum(self.bit_array) / self.k

In [54]:
result_np_k = []

for k in [1, 2, 3, 4]:
    print(f'Experiment with {k} hash functions')
    for bf_size in tqdm(bf_sizes,
                        total=len(set_sizes),
                        position=0,
                        leave=False,
                        desc='Iterating through bf sizes'):
        for set_size in tqdm(set_sizes,
                                total=len(set_sizes),
                                position=1,
                                leave=False,
                                desc='Iterating through set sizes'):

            bf_int = BloomFilterNumpy(n = bf_size,
                                      k = k)

            fp_count = 0

            with open(f'{set_size}') as file: 
                for line in file:
                    if bf_int.get(line):
                        fp_count += 1
                    bf_int.put(line)
                ones_count_int = bf_int.size()

            result_np_k.append(
                {
                    'k' : k,
                    'bf_size' : bf_size,
                    'set_size' : set_size,
                    'fp_count' : fp_count,
                    'ones_count' : ones_count_int
                }
            )

Experiment with 1 hash functions


                                                                         

Experiment with 2 hash functions


                                                                         

Experiment with 3 hash functions


                                                                         

Experiment with 4 hash functions


                                                                         

In [56]:
k_df = pd.DataFrame(result_np_k)

Видим, что, например, с `k=2` fp_count снизился в разы для 

In [74]:
k_df[k_df['k'] == 2]

Unnamed: 0.1,Unnamed: 0,k,bf_size,set_size,fp_count,ones_count
25,25,2,8,5,0,4.0
26,26,2,8,50,44,4.0
27,27,2,8,500,494,4.0
28,28,2,8,5000,4994,4.0
29,29,2,8,5000000,4999994,4.0
30,30,2,64,5,0,5.0
31,31,2,64,50,15,25.0
32,32,2,64,500,454,32.0
33,33,2,64,5000,4952,32.0
34,34,2,64,5000000,4999955,32.0


Выведем общуюю статистику, чтобы увидеть, как изменяется число false positives и ones count. Для наглядности возьмем максимальный размер bf. 

In [86]:
# Calculate false positive rate
k_df['fp_rate'] = k_df['fp_count'] / k_df['set_size']

k_df[k_df['bf_size'] == 16777216]

Unnamed: 0.1,Unnamed: 0,k,bf_size,set_size,fp_count,ones_count,fp_rate
20,20,1,16777216,5,0,5.0,0.0
21,21,1,16777216,50,0,50.0,0.0
22,22,1,16777216,500,0,500.0,0.0
23,23,1,16777216,5000,0,5000.0,0.0
24,24,1,16777216,5000000,676830,4323170.0,0.135366
45,45,2,16777216,5,0,5.0,0.0
46,46,2,16777216,50,0,50.0,0.0
47,47,2,16777216,500,0,500.0,0.0
48,48,2,16777216,5000,0,4998.5,0.0
49,49,2,16777216,5000000,388002,3765852.0,0.0776


Видим, что с увеличением числа хэш-функций до 3 включительно число fp уменьшается, однако при 4 хэш-функциях оно больше, чем при 3. Слишком большое количество хэш-функций может привести к увеличению числа единиц в фильтре (битовые позиции становятся единицами), что увеличивает шансы на ложные срабатывания. 

# Задание 3

Реализуем Counting Bloom Filter на `k` хэш-функций.

In [22]:
class CountingBloomFilter:
    def __init__(self,
                 k: int,
                 n: int,
                 cap: int):
        """
        Parameters:
        k (int): Number of hash functions.
        n (int): Number of counters.
        cap (int): Number of bits per counter.
        """
        self.k = k
        self.n = n
        self.cap = cap

        # Calculate the number of counters that fit into a 64-bit integer
        counters_per_int = 64 // cap
        self.counters_per_int = counters_per_int

        # Calculate how many 64-bit integers we need
        num_ints = (n + counters_per_int - 1) // counters_per_int
        self.num_ints = num_ints

        # Initialize the bit array
        self.bit_array = np.zeros(num_ints, dtype=np.uint64)

    def _hashes(self, item):
        """Generate k hash values for the item using different seeds."""
        return [mmh3.hash(item, seed) % self.n for seed in range(self.k)]

    def _get_counter_index_and_offset(self, hash_value):
        """Calculate the index and bit offset for the given hash value."""
        int_index = hash_value // self.counters_per_int
        bit_offset = (hash_value % self.counters_per_int) * self.cap
        return int_index, bit_offset

    def put(self, item):
        """Insert an item into the Counting Bloom Filter."""
        for hash_value in self._hashes(item):
            int_index, bit_offset = self._get_counter_index_and_offset(hash_value)
            # Extract the counter
            mask = (1 << self.cap) - 1
            current_count = (self.bit_array[int_index] >> bit_offset) & mask
            # Increment the counter if it is not at its maximum
            if current_count < mask:
                self.bit_array[int_index] += (1 << bit_offset)

    def get(self, item):
        """Check if an item is in the Counting Bloom Filter."""
        for hash_value in self._hashes(item):
            int_index, bit_offset = self._get_counter_index_and_offset(hash_value)
            # Extract the counter
            mask = (1 << self.cap) - 1
            current_count = (self.bit_array[int_index] >> bit_offset) & mask
            if current_count == 0:
                return False
        return True

    def size(self):
        """Return the sum of all counters divided by k."""
        mask = (1 << self.cap) - 1
        total_count = 0
        for int_index in range(self.num_ints):
            value = self.bit_array[int_index]
            for _ in range(self.counters_per_int):
                total_count += value & mask
                value >>= self.cap
        return total_count / self.k

Низкий cap может привести к переполнению счетчиков, в то время как слишком высокий cap увеличивает использование памяти без необходимости.

Для разных задач оптимальные разные комбинации `cap`, `k`, `bf_size`

### Комбинации для экспериментов с Counting Bloom Filter

#### 1. Сценарий с высоким потоком данных
- **Параметры**: `cap = 2`, `k = 3`, `sample_size = 10,000`, `bf_size = 65,536 бит` (ближайшая степень двойки)
- **Описание**: Подходит для сценариев с высокой нагрузкой и частым добавлением/удалением элементов. Размер фильтра поддерживает низкую вероятность ложных срабатываний при большом ожидаемом количестве элементов.

#### 2. Сценарий с редкими изменениями
- **Параметры**: `cap = 5`, `k = 2`, `sample_size = 1,000`, `bf_size = 8,192 бит` (степень двойки)
- **Описание**: Для приложений, где элементы редко удаляются. Более высокий `cap` позволяет больше добавлений без переполнения, а меньший размер фильтра подходит для меньшего количества элементов.

#### 3. Баланс между точностью и использованием памяти
- **Параметры**: `cap = 3`, `k = 4`, `sample_size = 5,000`, `bf_size = 32,768 бит` (степень двойки)
- **Описание**: Баланс между использованием памяти и вероятностью ложных срабатываний, подходящий для средних по размеру наборов данных с необходимостью умеренной точности.

#### 4. Сценарий с высокой точностью
- **Параметры**: `cap = 4`, `k = 5`, `sample_size = 2,000`, `bf_size = 32,768 бит` (степень двойки)
- **Описание**: Максимизирует точность с большим количеством хэш-функций и умеренным `cap`, подходит для приложений, где важно минимизировать ложные срабатывания.

#### 5. Эксперимент с количеством битов счетчика, не делящимся на 64
- **Параметры**: `cap = 3`, `k = 3`, `sample_size = 3,000`, `bf_size = 16,384 бит` (степень двойки), **битов на счетчик**: 5
- **Описание**: Проверка нестандартной битности


In [96]:
def cap_experiment(cap: int,
                   k: int,
                   bf_size: int,
                   set_size: int) -> tuple[int, float]:
    """
    Runs experiment with cap bloom filter
    """
    gen_uniq_seq(f'cap_{set_size}.csv', 5000)
    counting_bloom_filter = CountingBloomFilter(cap=cap,
                                                k=k,
                                                n=bf_size)
    fp_count = 0

    with open(f'cap_{set_size}.csv') as file: 
        for line in file:
            if counting_bloom_filter.get(line):
                fp_count += 1
            counting_bloom_filter.put(line)
        ones_count = counting_bloom_filter.size()

    return fp_count, ones_count


In [97]:
def run_cap_experiments():
    """
    Defines experiments with counting Bloom Filter
    """
    experiments = [
        {"cap": 2, "k": 3, "bf_size": 65536, "set_size": 10000},
        {"cap": 5, "k": 2, "bf_size": 8192, "set_size": 1000},
        {"cap": 3, "k": 4, "bf_size": 32768, "set_size": 5000},
        {"cap": 4, "k": 5, "bf_size": 32768, "set_size": 2000},
        {"cap": 3, "k": 3, "bf_size": 16384, "set_size": 3000}  # 5-bit counter not directly specified in parameters
    ]

    results = []

    for exp in experiments:
        fp_count, ones_count = cap_experiment(
            cap=exp["cap"],
            k=exp["k"],
            bf_size=exp["bf_size"],
            set_size=exp["set_size"]
        )
        results.append({
            "cap": exp["cap"],
            "k": exp["k"],
            "bf_size": exp["bf_size"],
            "set_size": exp["set_size"],
            "fp_count": fp_count,
            "ones_count": ones_count
        })

    for result in results:
        print(f"Experiment: cap={result['cap']}, k={result['k']}, bf_size={result['bf_size']}, set_size={result['set_size']}")
        print(f"  False Positives: {result['fp_count']}, Ones in Filter: {result['ones_count']}")

run_cap_experiments()

0
0
0
0
0
Experiment: cap=2, k=3, bf_size=65536, set_size=10000
  False Positives: 6, Ones in Filter: 4999.0
Experiment: cap=5, k=2, bf_size=8192, set_size=1000
  False Positives: 1109, Ones in Filter: 5000.0
Experiment: cap=3, k=4, bf_size=32768, set_size=5000
  False Positives: 58, Ones in Filter: 5000.0
Experiment: cap=4, k=5, bf_size=32768, set_size=2000
  False Positives: 55, Ones in Filter: 5000.0
Experiment: cap=3, k=3, bf_size=16384, set_size=3000
  False Positives: 338, Ones in Filter: 5000.0


1. Большой размер фильтра (эксперимнет 1) значительно снижает количество ложных срабатываний, даже при низкой емкости счетчиков.

2. Маленький размер фильтра и высокое значение емкости счетчиков (эксперимент 2) приводят к высокому количеству ложных срабатываний из-за насыщения фильтра.

3. Сбалансированный размер фильтра и количество хэш-функций (эксперимент 3 и 4) обеспечивают умеренное количество ложных срабатываний, но увеличение числа хэш-функций выше определенного уровня не дает значительных улучшений.

4. Недостаточный размер фильтра (эксперимент 5) при умеренном количестве хэш-функций и емкости счетчиков увеличивает вероятность ложных срабатываний.

# Задание 4

Реализация HyperLogLog

Кажется, что аргументы инициализации k, n избыточны - ведь в общем случае hyperloglog принимает параметр точности b, который определяет число регистров. 

```
m, where m = 2^b
```

In [32]:
class HyperLogLog:
    def __init__(self, b: int):
        self.b = b
        self.m = 1 << b  # m = 2^b
        self.registers = [0] * self.m
        self.alpha_m = self.get_alpha_m(self.m)

    def get_alpha_m(self, m):
        # Constants for different m values
        if m == 16:
            return 0.673
        elif m == 32:
            return 0.697
        elif m == 64:
            return 0.709
        else:
            return 0.7213 / (1 + 1.079 / m)

    def hash(self, value):
        # Hash the value using MurmurHash and convert to a binary string
        return mmh3.hash(value, signed=False)

    def rho(self, w):
        # Position of the leftmost 1-bit in the binary representation of w
        # Add 1 because we want the position starting from 1, not 0
        return len(w) - len(w.lstrip('0')) + 1

    def put(self, item):
        x = self.hash(item)
        # Convert the hash to binary and ensure it has enough bits
        x_bin = bin(x)[2:].zfill(32)  

        # Use the first b bits for the register index
        j = int(x_bin[:self.b], 2)
        
        # Use the remaining bits to calculate the rank (rho)
        w = x_bin[self.b:]
        self.registers[j] = max(self.registers[j], self.rho(w))

    def est_size(self):
        # Calculate the harmonic mean of 2^-M[j]
        Z = sum(2.0 ** -reg for reg in self.registers)
        E = self.alpha_m * self.m * self.m / Z

        # Apply small range correction
        if E <= 2.5 * self.m:
            V = self.registers.count(0)
            if V > 0:
                E = self.m * math.log(self.m / V)

        # Apply large range correction
        elif E > (1 / 30.0) * (1 << 32):
            E = -(1 << 32) * math.log(1 - E / (1 << 32))

        return E

Планируем эксперименты с генерацией повторяющихся значений с использованием предоставленной функции `gen_grouped_seq`, также рассчитываем относительную ошибку

In [31]:
from utils import gen_grouped_seq
from typing import List

def run_experiment(pattern: List,
                   filename: str,
                   true_size: int,
                   b: int):
    # Generate the dataset
    gen_grouped_seq(filename, pattern)

    # Initialize HyperLogLog
    hll = HyperLogLog(b=b)

    # Read the dataset and insert keys into HyperLogLog
    with open(filename, "r") as f:
        for line in f:
            key = line.strip().split(':')[0]
            hll.put(key)

    # Estimate the size
    estimated_size = hll.est_size()
    
    # Calculate relative error
    relative_error = abs(estimated_size - true_size) / true_size

    print(f"Dataset: {filename}")
    print(f"True size: {true_size}")
    print(f"Estimated size: {estimated_size}")
    print(f"Relative error: {relative_error:.4%}\n")

# Run experiments with different sizes and patterns
run_experiment(pattern=[(500, 1), (10, 100)], filename="grouped_seq_500.txt", true_size=510, b=14)
run_experiment(pattern=[(40000, 1), (100, 100)], filename="grouped_seq_50000.txt", true_size=40100, b=14)
run_experiment(pattern=[(4000000, 1), (1000, 1000)], filename="grouped_seq_5000000.txt", true_size=4001000, b=18)

Dataset: grouped_seq_500.txt
True size: 510
Estimated size: 510.8829835851813
Relative error: 0.1731%

Dataset: grouped_seq_50000.txt
True size: 40100
Estimated size: 41545.498112794034
Relative error: 3.6047%

Dataset: grouped_seq_5000000.txt
True size: 4001000
Estimated size: 3995660.9960047817
Relative error: 0.1334%



Как можем видеть, относительная ошибка невысока. Для размера даннных 50 тыс. ошибка выше. Попробуем увеличить параметр точности с 14 до 18 

In [95]:
run_experiment(pattern=[(40000, 1), (100, 100)], filename="grouped_seq_50000.txt", true_size=40100, b=18)


Dataset: grouped_seq_50000.txt
True size: 40100
Estimated size: 40086.54171819347
Relative error: 0.0336%



Ошибка снизилась

# Задание 5

Васе подарили два csv-файла. Каждый из них содержит по 10 миллиардов строк.

Вася хочет сделать JOIN по их первым колонкам, но опасается, что по некоторым ключам будет слишком много записей и JOIN будет работать слишком долго.

Вася разбирается в том, как делать JOIN и он даже прибросил, что пороговым значением будет 60000. То есть если у двух таблиц будет общий ключ и в каждой таблице по этому ключу будет более 60000 записей - то будут проблемы.

Вася попробовал посчитать наивным скриптом через Counter, но не хватило памяти.

Напишите Васе скрипт. который посчитает и памяти которому хватит. Прочитать файлы несколько раз можно, но чем меньше - тем лучше. Два прохода по каждому файлу должно точно хватать. Но в некоторых случаях может хватить и меньшего. И этими случаями лучше воспользоваться.



## Сгенерируем файлы

Файлы с 10 млрд строк будут занимать очень много места ~1 TB, если генерировать с помощью предоставленных функций. 

Немного изменим генерацию ключей (уберем генерацию uuid для ключа), чтобы получать одинаковые ключи в обоих файлах.

In [10]:
import uuid
import random

# Modify the key generation to ensure overlap
def gen_grouped_seq_fixed_keys(name: str,
                               pattern: List,
                               *,
                               n_extra_cols: int = 0,
                               to_shuffle: bool = False):
    '''
    Generates keys without uuid
    '''
    def gen():
        num = 0
        for n_keys, n_records in pattern:
            for i1 in range(n_keys):
                # Fixed key for ensuring overlap
                body = f"key{i1 + num}"
                for i2 in range(n_records):
                    for j in range(n_extra_cols):
                        body += f",{uuid.uuid4()}"
                    yield body
            num += n_keys

    if to_shuffle:
        data = list(gen())
        random.shuffle(data)
        result = data
    else:
        result = gen()

    with open(name, "wt") as f:
        for v in result:
            print(v, file=f)


Сгенерируем файлы с паттерном, чтобы первые 10 ключей превышали заданный порог в `60 000`. Записей с такими ключами будет `700 000`, остальные 50 ключей будут иметь по `30 000` записей на каждый

In [11]:
from utils import gen_grouped_seq
import os

pattern = [(10, 70000), (50, 30000)]  # 10 keys with 70,000 records each, 50 keys with 30,000 records
gen_grouped_seq_fixed_keys("file1.csv", pattern, to_shuffle=True)
gen_grouped_seq_fixed_keys("file2.csv", pattern, to_shuffle=True)

Используем подход с контролем памяти - контролем размера `chunk_size` словаря. Когда словарь `key_counts` превышает установленный размер `chunk_size`, значения счетчиков записываются во временный файл. 

Читаем файл в два прохода: в первом считаем ключи и сохраняем во временные файлы, во втором - аггрегируем значения из временных файлов. 

In [13]:
import csv
from collections import defaultdict
import os
import tempfile

def count_keys(file_path: str,
               threshold: int) -> dict:
    """
    Counts keys and saves it into temp files,
    if memory threshold `chunk_size` was exceeded
    """
    temp_dir = tempfile.mkdtemp()
    chunk_size = 60000  # Memory limit
    key_counts = defaultdict(int)
    temp_files = []

    with open(file_path, newline='') as csvfile:
        reader = csv.reader(csvfile)
        for row in reader:
            key = row[0]
            key_counts[key] += 1
            
            if len(key_counts) >= chunk_size:
                temp_file = os.path.join(temp_dir, f'temp_{len(temp_files)}.csv')
                with open(temp_file, 'w', newline='') as tempf:
                    writer = csv.writer(tempf)
                    for k, count in key_counts.items():
                        writer.writerow([k, count])
                temp_files.append(temp_file)
                key_counts.clear()
    
    if key_counts:
        temp_file = os.path.join(temp_dir, f'temp_{len(temp_files)}.csv')
        with open(temp_file, 'w', newline='') as tempf:
            writer = csv.writer(tempf)
            for k, count in key_counts.items():
                writer.writerow([k, count])
        temp_files.append(temp_file)
    
    # Aggregating values from temp files
    final_counts = defaultdict(int)
    for temp_file in temp_files:
        with open(temp_file, newline='') as tempf:
            reader = csv.reader(tempf)
            for row in reader:
                key, count = row
                final_counts[key] += int(count)
        os.remove(temp_file)
    
    os.rmdir(temp_dir)

    # Return keys which exceed the threshold
    return {k for k, count in final_counts.items() if count > threshold}

def find_common_heavy_keys(file1, file2, threshold):
    # Get 'exceeding' keys from each file
    heavy_keys_file1 = count_keys(file1, threshold)
    heavy_keys_file2 = count_keys(file2, threshold)
    
    # Find intersection - common keys
    common_heavy_keys = heavy_keys_file1.intersection(heavy_keys_file2)
    
    return common_heavy_keys


file1 = 'file1.csv'
file2 = 'file2.csv'
threshold = 60000

common_keys = find_common_heavy_keys(file1, file2, threshold)
print(f"Keys exceeding {threshold} occurrences in both files: {common_keys}")


Keys exceeding 60000 occurrences in both files: {'key2', 'key7', 'key8', 'key3', 'key5', 'key6', 'key1', 'key9', 'key4', 'key0'}


Ожидаемо десять ключей, которые в файлах идут первыми, превышают установленный порог. Что мы и наблюдаем в выводе программы

# Задание 6 

Для решения этой задачи мы можем использовать комбинацию Counting Bloom Filter и HyperLogLog. Эти структуры данных помогут нам оценить размер потенциального JOIN-а между двумя CSV-файлами, эффективно управляя памятью.

### Объяснение решения:
**Counting Bloom Filter**:

Используется для проверки наличия ключей из одного файла в другом.
В отличие от стандартного Bloom Filter, Counting Bloom Filter позволяет не только проверять наличие, но и учитывать количество вхождений, что полезно для нашего случая.
Он поможет с высокой вероятностью определить, пересекаются ли ключи из двух файлов.

**HyperLogLog**:

Используется для оценки количества уникальных ключей в каждом файле.
Это дает нам приблизительное представление о количестве уникальных ключей, что полезно для понимания масштабов пересечения.


Алгоритм: 

- Первый проход по файлам:

Создаем HyperLogLog для каждого файла, чтобы оценить количество уникальных ключей.
Создаем Counting Bloom Filter для отслеживания встреченных ключей.

- Второй проход (при необходимости):

Если количество уникальных ключей в обоих файлах не превышает 1 миллион, мы можем точно подсчитать пересечение, используя обычные множества.
Если количество уникальных ключей велико, и мы не можем точно подсчитать пересечение, используем Counting Bloom Filter для оценки количества общих ключей.

**Вывод результатов**:

Если количество общих ключей превышает 10 миллионов, мы можем просто сообщить, что размер JOIN-а велик.
В других случаях даем разумно точную оценку на основе данных из Counting Bloom Filter и HyperLogLog.

> [!NOTE] Для тестирования решения константы из задания были снижены до `100 000` (с 10 млн)

Подбираем следующие параметры для тестового прогона: 

```
HyperLogLog(b=14)
CountingBloomFilter(k=4, n=1000000, cap=4)
```

In [67]:
from typing import Callable

def process_file(file_path: str,
                 bloom_filter: Callable,
                 hll: Callable) -> None:
    with open(file_path, newline='') as csvfile:
        reader = csv.reader(csvfile)
        for row in reader:
            key = row[0].strip()
            hll.put(key)
            bloom_filter.put(key)

def estimate_join_size(file1, file2):
    # Initialize HyperLogLog for estimating unique keys
    hll1 = HyperLogLog(b=14)
    hll2 = HyperLogLog(b=14)
    
    # Initialize Counting Bloom Filters
    bloom_filter1 = CountingBloomFilter(k=4, n=1000000, cap=4)
    bloom_filter2 = CountingBloomFilter(k=4, n=1000000, cap=4)
    
    # Process both files
    process_file(file1, bloom_filter1, hll1)
    process_file(file2, bloom_filter2, hll2)
    
    # Estimate unique counts
    unique_count1 = hll1.est_size()
    unique_count2 = hll2.est_size()
    
    # If both have less than 1 million unique keys, calculate exact intersection
    if unique_count1 <= 10_000 and unique_count2 <= 10_000:
        keys1 = set()
        keys2 = set()
        with open(file1, newline='') as csvfile:
            reader = csv.reader(csvfile)
            for row in reader:
                keys1.add(row[0].strip())
        
        with open(file2, newline='') as csvfile:
            reader = csv.reader(csvfile)
            for row in reader:
                keys2.add(row[0].strip())
        
        exact_intersection = keys1.intersection(keys2)
        print(f"Exact intersection: {len(exact_intersection)}")
        return len(exact_intersection)
    
    # Estimate intersection size using Bloom Filters
    intersection_estimate = 0
    with open(file1, newline='') as csvfile:
        reader = csv.reader(csvfile)
        for row in reader:
            key = row[0].strip()
            if bloom_filter2.get(key):
                intersection_estimate += 1

    # Determine if the estimated intersection size suggests a large join
    if intersection_estimate > 100_000:
        return "> 100,000 (large join)"
    
    print(f"Estimated join size: {intersection_estimate}")
    return f"Estimated join size: {intersection_estimate}"

In [71]:
def gen_shared_keys(file1_path: str,
                    file2_path: str,
                    shared_keys: List,
                    unique_keys1: int,
                    unique_keys2: int) -> None:
    """Generate two files with shared keys and unique keys."""
    with open(file1_path, "w") as f1, open(file2_path, "w") as f2:
        for key in shared_keys:
            f1.write(f"{key}\n")
            f2.write(f"{key}\n")

        for _ in range(unique_keys1):
            f1.write(f"{uuid.uuid4()}\n")

        for _ in range(unique_keys2):
            f2.write(f"{uuid.uuid4()}\n")

def run_experiments():
    # Experiment 1: Files with less than 1 million unique keys for exact intersection
    gen_uniq_seq("file1_exact.csv", 5000)
    gen_uniq_seq("file2_exact.csv", 4500)
    print("Experiment 1: Exact Intersection Test")
    print(estimate_join_size("file1_exact.csv", "file2_exact.csv"))

    # Experiment 2: Non-intersecting sets with high confidence of zero intersection
    gen_uniq_seq("file1_non_intersect.csv", 100000)
    gen_uniq_seq("file2_non_intersect.csv", 101000)
    print("Experiment 2: Non-Intersecting Sets Test")
    print(estimate_join_size("file1_non_intersect.csv", "file2_non_intersect.csv"))

    # Experiment 3: Large join size detection (threshold 100,000)
    shared_keys_large = [str(uuid.uuid4()) for _ in range(105_000)]
    gen_shared_keys("file1_large_join.csv", "file2_large_join.csv", shared_keys_large, 50_000, 50_000)
    print("Experiment 3: Large Join Detection Test")
    print(estimate_join_size("file1_large_join.csv", "file2_large_join.csv"))

    # Experiment 4: Reasonably accurate estimation for moderate intersection
    shared_keys_moderate = [str(uuid.uuid4()) for _ in range(40_000)]
    gen_shared_keys("file1_moderate.csv", "file2_moderate.csv", shared_keys_moderate, 40_000, 40_000)
    print("Experiment 4: Moderate Intersection Estimation Test")
    print(estimate_join_size("file1_moderate.csv", "file2_moderate.csv"))

    # Clean up generated files
    os.remove("file1_exact.csv")
    os.remove("file2_exact.csv")
    os.remove("file1_non_intersect.csv")
    os.remove("file2_non_intersect.csv")
    os.remove("file1_large_join.csv")
    os.remove("file2_large_join.csv")
    os.remove("file1_moderate.csv")
    os.remove("file2_moderate.csv")

In [72]:
run_experiments()

0
0
Experiment 1: Exact Intersection Test
Exact intersection: 0
0
0
0
Experiment 2: Non-Intersecting Sets Test
Estimated join size: 1208
Estimated join size: 1208
Experiment 3: Large Join Detection Test
> 100,000 (large join)
Experiment 4: Moderate Intersection Estimation Test
Estimated join size: 40242
Estimated join size: 40242


Как мы видим, ошибка <2%