In [1]:
import uuid
import random
import mmh3
import math
from scipy.integrate import quad
import numpy as np

In [2]:
def gen_uniq_seq(name, n_records, n_extra_cols=0):
    with open(name, "wt") as f:
        for i in range(n_records):
            print(uuid.uuid4(), file=f, end="")
            for j in range(n_extra_cols):
                print(f",{uuid.uuid4()}", file=f, end="")
            print(file=f)


def gen_grouped_seq(name, pattern, *, n_extra_cols=0, to_shuffle=False):

    def gen():
        num = 0
        for n_keys, n_records in pattern:
            for i1 in range(n_keys):
                body = f"{i1 + num}:{uuid.uuid4()}"
                for i2 in range(n_records):
                    for j in range(n_extra_cols):
                        body += f",{uuid.uuid4()}"
                    yield body
            num += n_keys

    if to_shuffle:
        data = list(gen())
        random.shuffle(data)
        result = data
    else:
        result = gen()

    with open(name, "wt") as f:
        for v in result:
            print(v, file=f)


def random_merge(out_name, *in_names):
    fs = [open(fn, "rt") for fn in in_names]
    with open(out_name, "wt") as fout:
        while fs:
            f = random.choice(fs)
            s = f.readline()
            if not s:
                f.close()
                fs.remove(f)
                continue
            print(s, file=fout, end="")

Задание 1
Реализуем Bloom-Filter на одной хеш-функции

In [3]:
class BloomFilter:
    def __init__(self, n):
        self.k = 1
        self.size = n
        self.bit_array = 0

    def put(self, s):
        for i in range(self.k):
            index = mmh3.hash(s, i) % self.size
            self.bit_array |= (1 << index)

    def get(self, s):
        return all((self.bit_array & (1 << (mmh3.hash(s, i) % self.size))) != 0 for i in range(self.k))

    def get_size(self):
        return bin(self.bit_array).count('1') / self.k
    

Тестирование

In [4]:

bf_sizes = [8, 64, 1024, 64000]
set_sizes = [5, 50, 500, 5000]
k_values = [1]

for s in set_sizes:
    gen_uniq_seq("Size_" + str(s), s)

results = []

for k in k_values:
    print(f"\nРезультаты для k = {k}:")

    for i in bf_sizes:
        for j in set_sizes:
            bf = BloomFilter(i)
            fp_count = 0
            with open("Size_" + str(j), 'r') as file:
                for s in file:
                    result = bf.get(s.strip())
                    if result: 
                        fp_count += 1
                    bf.put(s.strip())

            ones_count = round(bf.get_size(), 2)

            results.append([k, i, j, fp_count, ones_count])

    print("bf_size  | set_size  | fp_count  | ones_count")
    print("---------|-----------|-----------|-----------")


    for k_val, bf_size, set_size, fp_count, ones_count in results:
        print(f"{bf_size:8} | {set_size:9} | {fp_count:9} | {ones_count:10}")

    results.clear()



Результаты для k = 1:
bf_size  | set_size  | fp_count  | ones_count
---------|-----------|-----------|-----------
       8 |         5 |         2 |        3.0
       8 |        50 |        42 |        8.0
       8 |       500 |       492 |        8.0
       8 |      5000 |      4992 |        8.0
      64 |         5 |         0 |        5.0
      64 |        50 |        14 |       36.0
      64 |       500 |       436 |       64.0
      64 |      5000 |      4936 |       64.0
    1024 |         5 |         0 |        5.0
    1024 |        50 |         1 |       49.0
    1024 |       500 |       103 |      397.0
    1024 |      5000 |      3994 |     1006.0
   64000 |         5 |         0 |        5.0
   64000 |        50 |         0 |       50.0
   64000 |       500 |         3 |      497.0
   64000 |      5000 |       188 |     4812.0


Результаты выглядят корректными: set_size = fp_count + ones_count, при увеличении размера словаря, вероятность коллизии уменьшается

Задание 2
Реализуем Bloom Filter на к хеш-функций

In [60]:
class BloomFilter:
    def __init__(self, k, n):
        self.k = k
        self.size = n
        self.bit_array = 0

    def put(self, s):
        for i in range(self.k):
            index = mmh3.hash(s, i) % self.size
            self.bit_array |= (1 << index)

    def get(self, s):
        return all((self.bit_array & (1 << (mmh3.hash(s, i) % self.size))) != 0 for i in range(self.k))

    def get_size(self):
        return bin(self.bit_array).count('1') / self.k

In [61]:
bf_sizes = [8, 64, 1024, 64000]
set_sizes = [5, 50, 500, 5000]
k_values = [1, 2, 3, 4]

results = []

for k in k_values:
    print(f"\nРезультаты для k = {k}:")

    for i in bf_sizes:
        for j in set_sizes:
            bf = BloomFilter(k, i)
            fp_count = 0
            with open("Size_" + str(j), 'r') as file:
                for s in file:
                    result = bf.get(s.strip())
                    if result: 
                        fp_count += 1
                    bf.put(s.strip())

            ones_count = round(bf.get_size(), 2)

            results.append([k, i, j, fp_count, ones_count])

    print("bf_size  | set_size  | fp_count  | ones_count")
    print("---------|-----------|-----------|-----------")


    for k_val, bf_size, set_size, fp_count, ones_count in results:
        print(f"{bf_size:8} | {set_size:9} | {fp_count:9} | {ones_count:10}")

    results.clear()



Результаты для k = 1:
bf_size  | set_size  | fp_count  | ones_count
---------|-----------|-----------|-----------
       8 |         5 |         2 |        3.0
       8 |        50 |        42 |        8.0
       8 |       500 |       492 |        8.0
       8 |      5000 |      4992 |        8.0
      64 |         5 |         0 |        5.0
      64 |        50 |        14 |       36.0
      64 |       500 |       436 |       64.0
      64 |      5000 |      4936 |       64.0
    1024 |         5 |         0 |        5.0
    1024 |        50 |         1 |       49.0
    1024 |       500 |       103 |      397.0
    1024 |      5000 |      3994 |     1006.0
   64000 |         5 |         0 |        5.0
   64000 |        50 |         0 |       50.0
   64000 |       500 |         3 |      497.0
   64000 |      5000 |       188 |     4812.0

Результаты для k = 2:
bf_size  | set_size  | fp_count  | ones_count
---------|-----------|-----------|-----------
       8 |         5 |         2 |

Результаты выглядят корректными:
1) для 1 хеш функции результат совпадает с предыдущим пунктом
2) Вероятность коллизии зависит не только  от размера словаря, но и колчества хеш-функций

Задание 3 Реализуем Counting Bloom Filter на к хеш-функций.

In [7]:
class CountingBloomFilter:
    def __init__(self, k, n, cap):
        self.k = k
        self.n = n
        self.cap = cap
        self.array = np.zeros((n // cap + 1, cap), dtype=np.uint32)
        self.t = list(range(k)) 

    def put(self, item):
        for t in self.t:
            index = mmh3.hash(item, t) % (self.n // self.cap)
            pos = t % self.cap
            if self.array[index][pos] < self.cap: 
                self.array[index][pos] += 1

    def get(self, item):
        for seed in self.t:
            index = mmh3.hash(item, seed) % (self.n // self.cap)
            if self.array[index][seed % self.cap] == 0:
                return False
        return True

    def get_size(self):

        return np.sum(self.array)/k

Проверим реализацию

In [8]:
bf_sizes = [1024, 8192, 524288]
set_sizes = [5, 50, 5000]
k_values = [1, 2, 3]
cap_values = [1, 5, 10]  

result = []
for k in k_values:
    print(f"\nРезультаты для k = {k}:")    
    for cap in cap_values:
        for n in bf_sizes:
            for size in set_sizes:
                bf = CountingBloomFilter(k, n, cap)
                fp_count = 0
                with open("Size_" + str(size), 'r') as file:
                    for s in file:
                        if bf.get(s):
                            fp_count += 1
                        bf.put(s)

                result.append([cap, n, size, fp_count, np.round(bf.get_size(),2)])  
    
    print("bf_size  | set_size  | cap    | fp_count  | ones_count")
    print("---------|-----------|--------|-----------|-----------")

    for cap_val, bf_size, set_size, fp_count, ones_count in result:  
        print(f"{bf_size:8} | {set_size:9} | {cap_val:6} | {fp_count:9} | {ones_count:10}")

    result.clear() 



Результаты для k = 1:
bf_size  | set_size  | cap    | fp_count  | ones_count
---------|-----------|--------|-----------|-----------
    1024 |         5 |      1 |         0 |        5.0
    1024 |        50 |      1 |         2 |       48.0
    1024 |      5000 |      1 |      3984 |     1016.0
    8192 |         5 |      1 |         0 |        5.0
    8192 |        50 |      1 |         0 |       50.0
    8192 |      5000 |      1 |      1265 |     3735.0
  524288 |         5 |      1 |         0 |        5.0
  524288 |        50 |      1 |         0 |       50.0
  524288 |      5000 |      1 |        22 |     4978.0
    1024 |         5 |      5 |         0 |        5.0
    1024 |        50 |      5 |         4 |       50.0
    1024 |      5000 |      5 |      4796 |     1020.0
    8192 |         5 |      5 |         0 |        5.0
    8192 |        50 |      5 |         3 |       50.0
    8192 |      5000 |      5 |      3449 |     4756.0
  524288 |         5 |      5 |         0 

При cap = 1 результат совпадает с предыдущими реализациями. Вероятность коллизии зависит не только  от размера словаря( и cap соответственно), но и колчества хеш-функций

Задание 4

Реализуем HyperLogLog

In [43]:
class HyperLogLog:
    def __init__(self, b):
        self.b = b
        self.m = 1 << b
        self.M = [0] * self.m

    def put(self, s):
        
        x = mmh3.hash(s, signed=False)
        x_bin = bin(x)[2:].zfill(32)
        j = int(x_bin[:self.b], 2)
        w = x_bin[self.b:]

        r = next((i + 1 for i, bit in enumerate(w) if bit == '1'), len(w) + 1)
        
        self.M[j] = max(self.M[j], r)

    def est_size(self):
        z = sum(2 ** (-m) for m in self.M)
        Z = 1 / z
        
        alpha_m = 0.7213 / (1 + 1.079 / self.m) if self.m >= 128 else \
                   (self.m * quad(lambda x: math.log2((2+x)/(1+x))**self.m, 0, float('+inf'))[0]) ** (-1)
        
        E = alpha_m * self.m**2 * Z
        
        if E <= 2.5 * self.m:
            V = sum(1 for m in self.M if m == 0)
            if V > 0:
                E = self.m * math.log(self.m / V)
        elif E > (1/30) * (1 << 32):
            E = -(1 << 32) * math.log(1 - E / (1 << 32))
        
        return round(E)


In [44]:
b = [1, 2, 8]
for i in b:
        hyper_log_log = HyperLogLog(i)
        with open("1_vers",'r') as file:
            for s in file:
                result = s.strip().split(':')[0]
                hyper_log_log.put(result)
        est_size = hyper_log_log.est_size()
        error = np.round(abs(est_size - 3000) / 3000, 2)
        print("b  | est_size  | error ")
        print("---|-----------|-------")
        print(f"{i:2} | {est_size:9} | {error:7} ")
        print("---|-----------|-------")

b  | est_size  | error 
---|-----------|-------
 1 |       479 |    0.84 
---|-----------|-------
b  | est_size  | error 
---|-----------|-------
 2 |       793 |    0.74 
---|-----------|-------
b  | est_size  | error 
---|-----------|-------
 8 |      1060 |    0.65 
---|-----------|-------


Сгенерируем 2 случая: уникальный набор и неуникальный

In [45]:
gen_grouped_seq ("1_vers",  [(1000, 1), (1, 2000)])
gen_uniq_seq ("2_vers",  3000)

In [46]:
b = [1, 2, 8]
for i in b:
        hyper_log_log = HyperLogLog(i)
        with open("2_vers",'r') as file:
            for s in file:
                result = s.strip().split(':')[0]
                hyper_log_log.put(result)
        est_size = hyper_log_log.est_size()
        error = np.round(abs(est_size - 3000) / 3000, 2)
        print("b  | est_size  | error ")
        print("---|-----------|-------")
        print(f"{i:2} | {est_size:9} | {error:7} ")
        print("---|-----------|-------")

b  | est_size  | error 
---|-----------|-------
 1 |     10229 |    2.41 
---|-----------|-------
b  | est_size  | error 
---|-----------|-------
 2 |     10737 |    2.58 
---|-----------|-------
b  | est_size  | error 
---|-----------|-------
 8 |      2902 |    0.03 
---|-----------|-------


Наблюдаем, что из-за дубликатов общее количество элементов может быть меньше ожидаемого, что приводит к снижению точности подсчета. При увеличении b увеличивается точность предсказания.

Задание 5

Реализуем функцию для поиска частных ключей. В нашей задаче возьмем как предельный случай k = 10 миллиардов / 60000

In [47]:
class Counter:
    def __init__(self, k: int, threshold: int):
        self.k = k
        self.threshold = threshold
        self.items = {}
        self.processed_count = 0
        self.frequent_items = {}

    def add_one(self, item: str):
        self.processed_count += 1

        if item in self.items:
            self.items[item] += 1
            if self.items[item] > self.threshold:
                self.frequent_items[item] = self.items[item]
        else:
            if len(self.items) < self.k - 1:
                self.items[item] = 1
            else:
                items_copy = self.items.copy()
                i = 0
                while i < len(items_copy):
                    key, value = list(items_copy.items())[i]
                    items_copy[key] = max(value - 1, 0)
                    if items_copy[key] == 0:
                        del items_copy[key]
                        i -= 1
                    i += 1
                self.items = items_copy

    def add(self, items: list):
        for item in items:
            self.add_one(item)

    def get(self, key: str):
        return self.items.get(key, 0)

    def get_frequent_items(self):
        return self.frequent_items

def find_keys(filename: str, k: int, threshold: int):
    h = Counter(k, threshold)

    with open(filename, 'r') as f:
        for line in f:
            items = line.split()
            h.add(items)

    return h.get_frequent_items()

Сгенерируем тестовые файлы

In [48]:
gen_grouped_seq ("1_csv",  [(2000, 1), (6, 500)])
gen_grouped_seq ("2_csv",  [(3000, 1), (3, 10000)] )

Соберем тяжелые ключи из каждого файла

In [49]:
keys_1 = find_keys("1_csv", 10, 400)
keys_1

{'2000:df55c821-8bc9-4cbd-aedb-a017fb207c24': 500,
 '2001:6c0e5f23-66e1-4741-a223-b0a3acee2423': 500,
 '2002:f0604c9e-8b1c-40e6-899f-50649d20706e': 500,
 '2003:4e44b9c9-d373-4081-952e-9b734ccf490a': 500,
 '2004:f372f192-cb78-46ff-83f6-3e032a136e2a': 500,
 '2005:87b5cd26-54a8-4ae7-9f89-61f91014b648': 500}

In [50]:
keys_2 = find_keys("2_csv", 10, 400)
list(keys_2.keys())

['3000:4b6f0e7e-3337-42fe-9067-91c357b85d45',
 '3001:8b1900b2-864d-4074-bd66-4a6c0f4a3dba',
 '3002:b9e06965-bc38-4620-82bb-e2e74ac4e35f']

Если у двух таблиц будет общий ключ и в каждой таблице по этому ключу будет более 60000 записей - то будут проблемы. Найдем пересечение ( если такое найдется, то исключим записи по этому ключу из джойна)

In [51]:
conflicting_keys  = [key for key in keys_1 if key in keys_2]
conflicting_keys

[]

In [52]:
conflicting_keys  = [key for key in keys_1 if key in keys_1]
conflicting_keys

['2000:df55c821-8bc9-4cbd-aedb-a017fb207c24',
 '2001:6c0e5f23-66e1-4741-a223-b0a3acee2423',
 '2002:f0604c9e-8b1c-40e6-899f-50649d20706e',
 '2003:4e44b9c9-d373-4081-952e-9b734ccf490a',
 '2004:f372f192-cb78-46ff-83f6-3e032a136e2a',
 '2005:87b5cd26-54a8-4ae7-9f89-61f91014b648']

Задание 6

Оценим количество потенциальных ключей в каждом из файлов

In [189]:
gen_grouped_seq ("6_1",  [(70000, 1), (5, 10000)] , to_shuffle= True)
gen_uniq_seq("6_2", 10000)

In [190]:
def split_file():
    with open("6_1", 'r') as file:
        content = file.read()
    half_size = len(content) // 2
    first_half = content[:half_size]
    second_half = content[half_size:]
    with open("6_3", 'w') as new_file:
        new_file.write(first_half)
split_file()

In [191]:
b = [16]
for i in b:
        hyper_log_log = HyperLogLog(i)
        with open("6_1",'r') as file:
            for s in file:
                result = s.strip().split(':')[0]
                hyper_log_log.put(result)
print("количество ключей", hyper_log_log.est_size())

количество ключей 69851


In [192]:
b = [16]
for i in b:
        hyper_log_log = HyperLogLog(i)
        with open("6_2",'r') as file:
            for s in file:
                result = s.strip().split(':')[0]
                hyper_log_log.put(result)
print("количество ключей", hyper_log_log.est_size())

количество ключей 9945


In [181]:
b = [16]
for i in b:
        hyper_log_log = HyperLogLog(i)
        with open("6_3",'r') as file:
            for s in file:
                result = s.strip().split(':')[0]
                hyper_log_log.put(result)
        est_size = hyper_log_log.est_size()
print("количество ключей", hyper_log_log.est_size())

количество ключей 24986


Если в результате суммарное количество уникальных ключей будет безопасно для полного джойна, то можно остановиться на этом моменте.

Найдем оптимальные парметры для фильтра ( сразу заложим по объемам 2-х ключей, предполагая что вдруг они все уникальные будут...)

In [182]:
def optimal_size(n, p):
    return -int(n * math.log(p) / (math.log(2) ** 2))

def optimal_hashes(m, n):
    return max(1, int((m / n) * math.log(2)))

In [204]:
optimal_size(100000, 0.01)

958505

In [205]:
optimal_hashes (958505, 100000)

6

Заполним наш фильтр, применим его к другому файлу

In [206]:
bf = BloomFilter(6, 958505)
with open("6_1", 'r') as file:
    for s in file:
        bf.put(s.strip())
ones_count = round(bf.get_size(), 2)
ones_count

56689.67

In [207]:
with open("6_3", 'r') as file:
    for s in file:
        bf.put(s.strip())
ones_count = round(bf.get_size(), 2)
ones_count

56690.5

Здесь можем наблюдать, что т.к один был образован из другого у нас практически не поменялся размер

In [210]:
bf = BloomFilter(6, 958505)
with open("6_1", 'r') as file:
    for s in file:
        bf.put(s.strip())
ones_count = round(bf.get_size(), 2)
ones_count

56689.67

In [209]:
with open("6_2", 'r') as file:
    for s in file:
        bf.put(s.strip())
ones_count = round(bf.get_size(), 2)
ones_count

57136.0

Здесь уже поймали уникальные значения