In [70]:
 pip install mmh3

Note: you may need to restart the kernel to use updated packages.


In [71]:
import mmh3
import math
import numpy as np
import pandas as pd
from collections import defaultdict

# Задание 1

Реализуем Bloom-Filter на одной хеш-функции. Будем использовать битовую арифметику на целых числах.</b>

In [3]:
class BloomFilter:
    def __init__(self, n):
        self.n = n
        self.bit_array = 0
        self.bit_size = 0

    def put(self, s):
        # Генерируем хеш и устанавливаем соответствующий бит в 1
        hash_value = mmh3.hash(s) % self.n
        bit_value = 1 << hash_value

        if (self.bit_array & bit_value) == 0:
            self.bit_size += 1
        self.bit_array |= bit_value

    def get(self, s):
        # Проверяем, установлен ли соответствующий бит
        hash_value = mmh3.hash(s) % self.n
        return (self.bit_array & (1 << hash_value)) != 0

    def size(self):
        return self.bit_size

Проведем эксперименты:

In [4]:
bf_sizes = [8, 64, 1024, 64 * 1024, 16 * 1024 * 1024]  # размеры Bloom-фильтра
set_sizes = [5, 50, 500, 5000, 5000000]  # размеры наборов уникальных строк

results = []

for bf_size in bf_sizes:
    for set_size in set_sizes:
        bf = BloomFilter(bf_size)
        unique_strings = [f"string_{i}" for i in range(set_size)]
        fp_count = 0

        for s in unique_strings:
            if bf.get(s):
                fp_count += 1
            bf.put(s)

        ones_count = bf.size()
        results.append((bf_size, set_size, fp_count, ones_count))

df = pd.DataFrame(results)

In [12]:
df

Unnamed: 0,0,1,2,3
0,8,5,2,3
1,8,50,42,8
2,8,500,492,8
3,8,5000,4992,8
4,8,5000000,4999992,8
5,64,5,0,5
6,64,50,14,36
7,64,500,436,64
8,64,5000,4936,64
9,64,5000000,4999936,64


# Задание 2

Реализуем Bloom Filter на *к* хеш-функций.

In [74]:
class BloomFilter_k:
    def __init__(self, k, n):
        self.k = k  # Количество хеш-функций
        self.n = n
        self.bit_array = 0
        self.bit_size = 0

    def put(self, s):
        for i in range(self.k):
            hash_value = mmh3.hash(s, i) % self.n
            bit_value = 1 << hash_value

            if (self.bit_array & bit_value) == 0:
                self.bit_size += 1
            self.bit_array |= bit_value

    def get(self, s):
        for i in range(self.k):
            hash_value = mmh3.hash(s, i) % self.n
            if (self.bit_array & (1 << hash_value)) == 0:
                return False  # Если хотя бы один бит не установлен, возвращаем False
        return True

    def size(self):
        return self.bit_size / self.k

Проведем эксперименты:

In [77]:
bf_sizes = [8, 64, 1024, 64 * 1024, 16 * 1024 * 1024]
set_sizes = [5, 50, 500, 5000, 500000]
k_values = [1, 2, 3, 4]

results = []

for bf_size in bf_sizes:
    for set_size in set_sizes:
        for k in k_values:
            bf = BloomFilter_k(k, bf_size)
            fp_count = 0

            unique_strings = [f"str_{i}" for i in range(set_size)]

            for s in unique_strings:
                if bf.get(s):
                    fp_count += 1
                bf.put(s)

            results.append((bf_size, set_size, k, fp_count, bf.size()))

df1 = pd.DataFrame(results)

In [78]:
pd.set_option('display.max_rows', None)

In [79]:
df1.columns = ['bf_size', 'set_size', 'k', 'fp_count', 'size_k']
df1['size_k'] = df1['size_k'].astype(int)

df1

Unnamed: 0,bf_size,set_size,k,fp_count,size_k
0,8,5,1,1,4
1,8,5,2,1,2
2,8,5,3,1,2
3,8,5,4,1,2
4,8,50,1,42,8
5,8,50,2,43,4
6,8,50,3,44,2
7,8,50,4,46,2
8,8,500,1,492,8
9,8,500,2,493,4


# Задание 3

Реализуем Counting Bloom Filter на *к* хеш-функций.</br>
*сap* - количество битов, отведенных на счетчик (1 даст по сути Bloom Filter)

In [29]:
class CountingBloomFilter:
    def __init__(self, k, n, cap):
        self.k = k
        self.n = n
        self.cap = cap
        self.counters = np.zeros(n, dtype=np.uint8)

    def put(self, s):
        for i in range(self.k):
            hash_value = mmh3.hash(s, i) % self.n
            if self.counters[hash_value] < self.cap:
                self.counters[hash_value] += 1

    def get(self, s):
        for i in range(self.k):
            hash_value = mmh3.hash(s, i) % self.n
            if self.counters[hash_value] == 0:
                return False
        return True

    def size(self):
        return np.sum(self.counters) / self.k

Попробуем подобрать интересные комбинации значений.
*   размер фильтра 1024 и 65536.
*   размер уникальных строк 500 и 5000.
*   количество хеш-функций 1, 2, 3, 4.
*   cap = 3 (каждый счетчик может хранить значения от 0 до 7).



Проведем эксперименты:

In [70]:
bf_sizes = [1024, 64 * 1024]
set_sizes = [500, 5000]
k_values = [1, 2, 3, 4]
cap_values = [3]

results = []

for bf_size in bf_sizes:
    for set_size in set_sizes:
        for k in k_values:
            for cap in cap_values:
                bf = CountingBloomFilter(k, bf_size, cap)
                fp_count = 0

                unique_strings = [f"str_{i}" for i in range(set_size)]

                for s in unique_strings:
                    if bf.get(s):
                        fp_count += 1
                    bf.put(s)

                ones_count = bf.size()
                results.append((bf_size, set_size, k, cap, fp_count, ones_count))

df2 = pd.DataFrame(results, columns=['bf_size', 'set_size', 'k', 'cap', 'fp_count', 'ones_count'])

In [71]:
df2['ones_count'] = df2['ones_count'].astype(int)
df2

Unnamed: 0,bf_size,set_size,k,cap,fp_count,ones_count
0,1024,500,1,3,109,498
1,1024,500,2,3,74,492
2,1024,500,3,3,86,474
3,1024,500,4,3,92,451
4,1024,5000,1,3,3982,2873
5,1024,5000,2,3,4229,1532
6,1024,5000,3,3,4377,1023
7,1024,5000,4,3,4465,768
8,65536,500,1,3,2,500
9,65536,500,2,3,0,500


При размере 1024 и количестве уникальных строк 500, использование более одного хеша (k=2, 3, 4) показывает, что количество ложных срабатываний (fp_count) уменьшается с увеличением числа хешей. При увеличении размера уникальных строк до 5000 заметно увеличивается количество ложных срабатываний. </br>
С увеличением размера фильтра до 65536, количество ложных срабатываний снижается, следовательно, увеличение фильтра снижает вероятность коллизий. </br>
Увеличение числа хеш-функций позволяет более точно распределять данные по фильтру и снижает вероятность ложных срабатываний.
Например, при n=1024 и set_size=500 с 4 хеш-функциями число ложных срабатываний снижается до 92, в то время как при 1 хеш-функции оно составляет 109.

# Задание 4

Реализуем HyperLogLog.

In [50]:
class HyperLogLog:
    def __init__(self, b):
        self.b = b
        self.m = 1 << b
        self.registers = np.zeros(self.m, dtype=int)

    def put(self, s):
        hash_value = mmh3.hash(s, seed=0)
        index = hash_value & (self.m - 1)
        w = hash_value >> self.b
        self.registers[index] = max(self.registers[index], self._rho(w))

    def _rho(self, w):
        return (w & -w).bit_length()

    def est_size(self):
        alpha_m = 0.7213 / (1 + 1.079 / self.m)
        Z = 1 / sum([2 ** -float(reg) for reg in self.registers])

        E = alpha_m * self.m * self.m * Z

        if E <= (5.0 / 2.0) * self.m:
            V = np.count_nonzero(self.registers == 0)  # Количество пустых регистров
            if V > 0:
                E = self.m * np.log(self.m / V)  # Корректируем оценку
        elif E > (1 << 32):  # Если больше 2^32
            E = -(2 ** 32) * np.log(1 - E / (2 ** 32))  # Используем другую формулу

        return int(E)

In [60]:
b_values = [4, 8, 12, 16]
set_sizes = [500, 5000, 5000000]

results = []

for b in b_values:
    for set_size in set_sizes:
        hll = HyperLogLog(b)

        unique_strings = [f"str_{i}" for i in range(set_size)]

        for s in unique_strings:
            hll.put(s)

        estimated_size = hll.est_size()
        results.append((b, set_size, estimated_size, abs(len(unique_strings) - estimated_size)))

df3 = pd.DataFrame(results, columns=['b', 'set_size', 'estimated_size', 'error'])

In [61]:
df3

Unnamed: 0,b,set_size,estimated_size,error
0,4,500,451,49
1,4,5000,5059,59
2,4,5000000,3609751,1390249
3,8,500,540,40
4,8,5000,4420,580
5,8,5000000,4874121,125879
6,12,500,499,1
7,12,5000,5076,76
8,12,5000000,5097571,97571
9,16,500,499,1


Как видно из результатов, при увеличении размера множества с 500 до 5000000, оценка оставалась достаточно близкой к реальному значению. </br>
Параметр b напрямую влияет на точность оценки. С увеличением до 16 наблюдается значительное уменьшение ошибки. </br>
В большинстве случаев, даже при высоком значении set_size, ошибка принимает адекватные значения, что свидетельствует о стабильности и эффективности алгоритма.

# Задание 5

In [80]:
import csv
from collections import defaultdict

def misra_gries(filepath, filelen, threshold):
    k = int(filelen // threshold)
    counters = {}

    with open(filepath, 'r') as f:
        reader = csv.reader(f)
        for row in reader:
            key = row[0]
            if key in counters:
                counters[key] += 1
            elif len(counters) < k:
                counters[key] = 1
            else:
                remove_keys = []
                for candidate in counters:
                    counters[candidate] -= 1
                    if counters[candidate] == 0:
                        remove_keys.append(candidate)
                for candidate in remove_keys:
                    del counters[candidate]
    
    return set(counters.keys())

def count_exact(filepath, candidates):
    counts = defaultdict(int)
    with open(filepath, 'r') as f:
        reader = csv.reader(f)
        for row in reader:
            key = row[0]
            if key in candidates:
                counts[key] += 1
    return counts

def find_heavy_hitters(file1, file2, filelen, threshold):
    candidates = misra_gries(file1, filelen, threshold)
    
    counts2 = count_exact(file2, candidates)
    filtered = {key for key in candidates if counts2[key] >= threshold}
    
    if not filtered:
        return []

    counts1 = count_exact(file1, filtered)
    heavy = [key for key in filtered if counts1[key] >= threshold]
    return heavy


In [81]:
heavy_keys = find_heavy_hitters("data/file1.csv", "data/file2.csv", 200_000_000, 60000)
print(f"Количество проблемных ключей: {len(heavy_keys)}")

Количество проблемных ключей: 54


In [60]:
import csv
import os
import random
import tempfile

def generate_shuffled_large_csv(file_path, total_rows, common_keys, max_dups_per_key, is_first_file, num_shards=100):
    key_prefix = "A" if is_first_file else "B"
    common_prefix = "common"

    print(f"Начинаем генерацию в шардированные файлы.")

    temp_dir = tempfile.mkdtemp()
    shard_paths = [os.path.join(temp_dir, f"shard_{i:03d}.csv") for i in range(num_shards)]
    shard_files = [open(path, "w", newline='') for path in shard_paths]
    shard_writers = [csv.writer(f) for f in shard_files]

    rows_written = 0

    for i in range(common_keys):
        key = f"{common_prefix}_{i:07d}"
        count = random.randint(1, max_dups_per_key)
        for _ in range(count):
            shard = random.randint(0, num_shards - 1)
            shard_writers[shard].writerow([key])
        rows_written += count

    remaining = total_rows - rows_written
    group_id = 0
    while remaining > 0:
        key = f"{key_prefix}_{group_id:012d}"
        repeats = random.randint(1, max_dups_per_key)
        if repeats > remaining:
            repeats = remaining
        for _ in range(repeats):
            shard = random.randint(0, num_shards - 1)
            shard_writers[shard].writerow([key])
        remaining -= repeats
        rows_written += repeats
        group_id += 1

    for f in shard_files:
        f.close()

    print(f"Генерация завершена. Начинаем объединение шардов с перемешкой.")

    with open(file_path, "w", newline='') as out_file:
        writer = csv.writer(out_file)
        shard_indices = list(range(num_shards))
        random.shuffle(shard_indices)
        for i in shard_indices:
            with open(shard_paths[i], "r") as shard_file:
                lines = shard_file.readlines()
                random.shuffle(lines)
                out_file.writelines(lines)

    for path in shard_paths:
        os.remove(path)
    os.rmdir(temp_dir)

    print(f"Готово! Файл сохранён: {file_path}")

In [58]:
generate_shuffled_large_csv("data/file1.csv", 200_000_000, 1000, 80000, True, 1000)

Начинаем генерацию в шардированные файлы.
[data/file1.csv] Сгенерировано 200,000,000 строк...
Генерация завершена. Начинаем объединение шардов с перемешкой.
Готово! Файл сохранён: data/file1.csv


In [62]:
generate_shuffled_large_csv("data/file2.csv", 200_000_000, 1000, 80000, False, 1000)

Начинаем генерацию в шардированные файлы.
Генерация завершена. Начинаем объединение шардов с перемешкой.
Готово! Файл сохранён: data/file2.csv
