This notebook applies the first stream processing technique to the dataset: counting distinct elements.\
We will use the Flajolet-Martin (FM) algorithm to count the number of distinct actors in the actors.csv file.\
According to the dataset's Kaggle page, the actual number of unique names is 1 513 888. We will compare our results against this value.\
The algorithm is reimplemented from scratch in two versions: a simple one with a single hash function, and a refined one that uses multiple hash functions and aggregates their results.\
It can run on a randomly downsampled version of the dataset (10% of the rows, by setting `sampled = True`) or on the full dataset. The full dataset is the default, and the outputs below use it.\
We will compare the results with those of the successor of Flajolet-Martin, HyperLogLog (HLL), which we will import from a library.

In [1]:
# Import the needed modules
import csv
import hashlib
import random
import sys
import mmh3
import math

In [2]:
# Define the global variables
real_count = 1513888
sampled = False
sr = 0.1 if sampled else 1.0
phi = 0.77351

In [3]:
# Let's define a Flajolet-Martin class that encapsulates all the functions needed to perform the steps of the algorithm
class FlajoletMartinSimple:
    # The only thing we need to estimate the cardinality is the maximum number of trailing zeros (tail length) of the hash values
    def __init__(self):
        self.max_tail_length = 0

    # We need a hash function to hash the values; here we use SHA-256
    # The hash range must be much larger than the number of distinct elements; 2^256 possible values is more than enough
    def _hash(self, value):
        hash_value = hashlib.sha256(value.encode('utf8')).hexdigest()
        return int(hash_value, 16)

    # We need the tail length (number of trailing zeros) of the binary representation of the hash value
    # Counting leading zeros would also work for a uniform hash, but bin() drops leading zeros, so it would need a fixed-width representation
    def _tail_length(self, hash_value):
        binary_hash = bin(hash_value)[2:]  # Convert hash to binary string and remove '0b' prefix added automatically by the bin function
        # We want the number of zeros at the end of the binary representation
        return len(binary_hash) - len(binary_hash.rstrip('0'))

    # Each value of the stream is processed: compute its hash and its tail length, and update the maximum if needed
    def process(self, value):
        hash_value = self._hash(value)
        tail_length = self._tail_length(hash_value)
        self.max_tail_length = max(self.max_tail_length, tail_length)

    # According to the Flajolet-Martin algorithm, the cardinality is estimated as 2^max_tail_length
    def estimate_cardinality(self):
        return 2 ** self.max_tail_length
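As a side note, `_tail_length` can skip the string round-trip and use integer arithmetic instead. The sketch below uses the standard two's-complement trick `h & -h`, which isolates the lowest set bit. The notebook itself does not use this trick, and both helper names are hypothetical.

```python
import hashlib

def tail_length_str(h: int) -> int:
    # String-based version, as in FlajoletMartinSimple._tail_length
    binary = bin(h)[2:]
    return len(binary) - len(binary.rstrip('0'))

def tail_length_bits(h: int) -> int:
    # h & -h keeps only the lowest set bit of h (two's-complement trick),
    # so its bit_length minus one is the number of trailing zeros.
    # Assumes h > 0: a hash equal to 0 has no set bit (this would return -1).
    return (h & -h).bit_length() - 1

# Both versions agree on a few SHA-256 hashes
for name in ["Tom Hanks", "Meryl Streep", "Keanu Reeves"]:
    h = int(hashlib.sha256(name.encode('utf8')).hexdigest(), 16)
    assert tail_length_str(h) == tail_length_bits(h)

print(tail_length_bits(0b10100))  # 2
```

The integer version avoids building a 256-character string for every row. That string is part of the per-element cost when processing 5.5 million names.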

In [4]:
# Define a process_csv function that applies the algorithm to the stream of data
def process_csv(file, fm, sample_rate=1.0):
    with open(file, 'r', encoding='utf-8') as csvfile:
        # DictReader yields one dictionary per row and handles the header automatically
        csvreader = csv.DictReader(csvfile)
        counter = 0
        # We treat the file as a stream, processing each row independently
        for row in csvreader:
            # Keep each row with probability sample_rate (random.random() is in [0, 1))
            if random.random() < sample_rate:
                actor = row['name']
                fm.process(actor)
                # Only count the rows that were actually processed
                counter += 1

    return fm.estimate_cardinality(), counter

In [5]:
# Now we can use everything we defined to process our data
fm = FlajoletMartinSimple()
cardinality, count = process_csv('data/actors.csv', fm, sample_rate=sr)
print(f"Estimated number of unique names : {cardinality}")
print(f"Total number of names processed : {count}")
if not sampled:
    print(f"Real number of unique names : {real_count}")
    error = abs(cardinality - real_count) / real_count
    print(f"Relative error : {error:.2%}")

Estimated number of unique names : 1048576
Total number of names processed : 5523327
Real number of unique names : 1513888
Relative error : 30.74%


With the simple Flajolet-Martin algorithm, we can process all 5.5 million rows of the actors file in less than 30 seconds. The estimate has a relative error of about 30%.\
Looking more closely at the original paper, the FM algorithm applies a correction factor $\phi$ to account for a bias. The exact calculations are detailed in the paper, but the result is $\phi = 0.77351$, and the estimate becomes $2^r/\phi$, where $r$ is the maximum tail length. Dividing by $\phi \approx 0.77$ multiplies the estimate by about 1.29, an increase of roughly 30%, which is the same order of magnitude as our relative error. With the correction, the basic FM algorithm should produce much better results!
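We can check the correction by hand from the printed output. The raw estimate 1048576 is $2^{20}$, so $r = 20$ in the run above. The snippet below recomputes the corrected estimate and both relative errors from these numbers only.

```python
phi = 0.77351          # FM correction factor
r = 20                 # max tail length of the run above (2**20 = 1048576)
real_count = 1513888   # exact count from the Kaggle page

raw = 2 ** r
corrected = int(raw / phi)
print(raw, corrected)                                     # 1048576 1355607
print(f"{abs(raw - real_count) / real_count:.2%}")        # 30.74%
print(f"{abs(corrected - real_count) / real_count:.2%}")  # 10.46%
print(f"{1 / phi:.3f}")                                   # 1.293, the multiplicative increase
```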

In [6]:
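# Let's rerun the algorithm, this time applying the correction factor
# (with a fresh instance, so that the maxima from the previous run are not carried over)
fm = FlajoletMartinSimple()
cardinality, count = process_csv('data/actors.csv', fm, sample_rate=sr)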
cardinality = int(cardinality / phi)
print(f"Estimated number of unique names : {cardinality}")
print(f"Total number of names processed : {count}")
if not sampled:
    print(f"Real number of unique names : {real_count}")
    error = abs(cardinality - real_count) / real_count
    print(f"Relative error : {error:.2%}")

Estimated number of unique names : 1355607
Total number of names processed : 5523327
Real number of unique names : 1513888
Relative error : 10.46%


As expected, we get a much better result when we account for the correction factor (a relative error of 10% instead of 30%)!\
The results can still be improved, in particular by using several hash functions and aggregating their estimates.

Averaging the estimates is not a very good aggregation on its own, because the mean is very sensitive to outliers: a single hash function with an unusually long tail can inflate it by a large factor.\
The median is more robust, but it has a granularity problem. It is always one of the individual estimates, so it is always a value of the form $2^r$ (up to the correction factor).\
A better option is therefore a hybrid approach. For $k\times l$ hash functions, split the functions into $l$ groups of $k$ and run the algorithm with each function. Then take the median of the estimates within each group, and finally the mean of the group medians.
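To make the hybrid concrete, here is a minimal sketch of the aggregator on plain lists. The tail lengths are read back from the per-group estimates printed when the refined version runs on the full dataset with 5 groups of 7 hash functions: each printed value is $2^r/\phi$. The helper names are hypothetical.

```python
PHI = 0.77351  # correction factor used in the notebook

def upper_median(values):
    # Middle element of the sorted values, as in FlajoletMartinMultiple (sorted(...)[len // 2])
    s = sorted(values)
    return s[len(s) // 2]

def hybrid_estimate(tail_lengths_by_group, phi=PHI):
    # 1) corrected estimate 2^r / phi for each hash function
    # 2) median within each group, which discards outliers
    # 3) mean of the group medians, which restores some granularity
    medians = [upper_median([2 ** r / phi for r in group]) for group in tail_lengths_by_group]
    return int(sum(medians) / len(medians))

# Max tail lengths r of the 5 x 7 run, read back from its printed estimates 2^r / phi
groups = [
    [19, 20, 20, 20, 22, 24, 24],
    [18, 18, 19, 20, 20, 21, 22],
    [19, 19, 20, 20, 20, 24, 25],
    [18, 19, 19, 20, 21, 22, 24],
    [18, 19, 20, 21, 21, 24, 25],
]
print(hybrid_estimate(groups))  # 1626729
```

Four of the five group medians are $2^{20}/\phi$ and one is $2^{21}/\phi$. That single group pulls the mean up by exactly 20% compared with $2^{20}/\phi$.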

In [7]:
# Let's define a new class that will use multiple hash functions to improve the accuracy of the estimation
class FlajoletMartinMultiple:
    # This time, we use multiple hash functions and keep the max tail length for each of them
    # The hash functions are not different schemes, but the same scheme (MurmurHash3, see below) with different seeds
    def __init__(self, num_groups=4, hashes_per_group=5):
        self.num_groups = num_groups
        self.hashes_per_group = hashes_per_group
        self.num_hashes = num_groups * hashes_per_group
        self.max_tail_length = [[0] * hashes_per_group for _ in range(num_groups)]
        self.seeds = [random.randint(0, 2**32 - 1) for _ in range(self.num_hashes)]
        self.phi = phi  # Correction factor (this time we include it directly in the class)

    # We use MurmurHash3 (mmh3), a fast non-cryptographic hash that accepts a seed directly. This solves the performance problem
    # we had with SHA-256, where the seed had to be concatenated with the value before hashing
    def _hash(self, value, seed):
        # hash128 returns an unsigned 128-bit integer by default
        return mmh3.hash128(value, seed)

    # The tail length is computed as before
    def _tail_length(self, hash_value):
        binary_hash = bin(hash_value)[2:]
        return len(binary_hash) - len(binary_hash.rstrip('0'))

    # The process function is the same, except that each value is hashed num_hashes times (once per seed)
    def process(self, value):
        for group in range(self.num_groups):
            for i in range(self.hashes_per_group):
                # We use the index to get the right seed for the hash function (the seeds are kept in a one dimensional array for simplicity)
                index = group * self.hashes_per_group + i
                hash_value = self._hash(value, self.seeds[index])
                trailing_zeros = self._tail_length(hash_value)
                self.max_tail_length[group][i] = max(self.max_tail_length[group][i], trailing_zeros)

    # The cardinality is estimated as the mean, over the groups, of the median estimate within each group
    def estimate_cardinality(self):
        group_medians = []
        for group in self.max_tail_length:
            # Corrected estimate 2^r / phi for each hash function of the group
            sorted_array = sorted([2 ** r / self.phi for r in group])
            # Middle element (the upper median when the group size is even)
            median = sorted_array[len(sorted_array) // 2]
            group_medians.append(median)
            # Print the sorted estimates of each group for inspection
            print(sorted_array)
        # Print the group medians, then return their mean
        print(group_medians)
        estimate = sum(group_medians) / len(group_medians)
        return int(estimate)

In [8]:
# process_csv is unchanged; we only swap the estimator class (here 5 groups of 7 hash functions)
fm = FlajoletMartinMultiple(num_groups=5, hashes_per_group=7)
cardinality, count = process_csv('data/actors.csv', fm, sample_rate=sr)
print(f"Estimated number of unique names : {cardinality}")
print(f"Total number of names processed : {count}")
if not sampled:
    print(f"Real number of unique names : {real_count}")
    error = abs(cardinality - real_count) / real_count
    print(f"Relative error : {error:.2%}")

[677803.7775852929, 1355607.5551705859, 1355607.5551705859, 1355607.5551705859, 5422430.2206823435, 21689720.882729374, 21689720.882729374]
[338901.88879264647, 338901.88879264647, 677803.7775852929, 1355607.5551705859, 1355607.5551705859, 2711215.1103411717, 5422430.2206823435]
[677803.7775852929, 677803.7775852929, 1355607.5551705859, 1355607.5551705859, 1355607.5551705859, 21689720.882729374, 43379441.76545875]
[338901.88879264647, 677803.7775852929, 677803.7775852929, 1355607.5551705859, 2711215.1103411717, 5422430.2206823435, 21689720.882729374]
[338901.88879264647, 677803.7775852929, 1355607.5551705859, 2711215.1103411717, 2711215.1103411717, 21689720.882729374, 43379441.76545875]
[1355607.5551705859, 1355607.5551705859, 1355607.5551705859, 1355607.5551705859, 2711215.1103411717]
Estimated number of unique names : 1626729
Total number of names processed : 5523327
Real number of unique names : 1513888
Relative error : 7.45%


This time, as expected, the processing time for the full dataset is much longer, simply because each row is hashed $k\times l$ times instead of once! With the default of 4 groups of 5 hash functions, processing would take about 20 times longer. That can be a lot, but it is still reasonable given that the simple method takes less than 30 seconds.\
Switching from SHA-256 to the MurmurHash3 library (mmh3) made a big difference. Execution is much faster (about 4 minutes instead of about 10 with SHA-256 for 35 hashes), and the seed can be passed directly to the hash function instead of being concatenated with the value.\
The results are better, but they seem quite unstable: 7.45% relative error on this execution, but sometimes much more, since the seeds are drawn at random on each run. The single-hash version, on the other hand, is deterministic on the full dataset (unseeded SHA-256) and always gives the same 10.46%.\
Increasing the number of groups and hashes per group should reduce the variance, but it also increases the computation time considerably. At what point is it "enough" to actually observe a decrease in variance? Is that configuration still usable, or does it take hours to run?
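One way to start answering this question without running the full file for hours is a small experiment on synthetic data. The idea is to draw several independent seedings and measure how much the estimates spread for different numbers of groups and hashes per group.

This sketch is hypothetical. The names are synthetic and the sizes are arbitrary. It also swaps mmh3 for the standard library's keyed BLAKE2b (`hashlib.blake2b(data, digest_size=8, key=...)`) so that it has no third-party dependency. We make no claim about the numbers it prints.

```python
import hashlib
import random
import statistics

PHI = 0.77351  # FM correction factor, as in the notebook

def tail_length(h: int) -> int:
    # Number of trailing zeros of h (assumes h > 0)
    return (h & -h).bit_length() - 1

def fm_hybrid(items, num_groups, hashes_per_group, rng):
    """Median-within-groups, mean-across-groups FM estimate for one random seeding."""
    # Random 8-byte BLAKE2b keys stand in for the mmh3 seeds
    keys = [rng.getrandbits(64).to_bytes(8, "big") for _ in range(num_groups * hashes_per_group)]
    maxima = [0] * len(keys)
    for item in items:
        data = item.encode("utf8")
        for i, key in enumerate(keys):
            h = int.from_bytes(hashlib.blake2b(data, digest_size=8, key=key).digest(), "big")
            if h:  # skip the (astronomically unlikely) all-zero hash
                maxima[i] = max(maxima[i], tail_length(h))
    medians = []
    for g in range(num_groups):
        group = sorted(2 ** r / PHI for r in maxima[g * hashes_per_group:(g + 1) * hashes_per_group])
        medians.append(group[len(group) // 2])  # upper median, as in the notebook
    return sum(medians) / len(medians)

def relative_spread(n_distinct, num_groups, hashes_per_group, trials=5, seed=0):
    """Standard deviation of the estimates over `trials` seedings, relative to the true count."""
    rng = random.Random(seed)
    items = [f"name-{i}" for i in range(n_distinct)]  # synthetic, all distinct
    estimates = [fm_hybrid(items, num_groups, hashes_per_group, rng) for _ in range(trials)]
    return statistics.stdev(estimates) / n_distinct

for groups, per_group in [(1, 1), (2, 3), (4, 5)]:
    print(f"{groups} x {per_group}: relative spread {relative_spread(2_000, groups, per_group):.2f}")
```

With only 2 000 items and a handful of trials, this gives a rough idea at best. More items and more trials give a more reliable picture, at a proportional cost in time.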

For an exact count, we could use a set. It would be fast, but it would not scale: the set has to hold every distinct name seen so far. Its memory therefore grows linearly with the number of distinct elements and could exceed main memory for a very large dataset.\
We will now quickly compare this FM algorithm with its successor, HyperLogLog (HLL). This method will not be reimplemented from scratch but imported from a library, since the goal is only to compare it with FM.\
It is often stated that "The HyperLogLog algorithm is able to estimate cardinalities of > $10^9$ with a typical accuracy (standard error) of 2%, using 1.5 kB of memory". This is much better than the estimate we obtained, so let's see it in action.
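For reference, the exact set-based approach mentioned above takes only a few lines. Below is a minimal sketch. The helper name and the tiny in-memory CSV are hypothetical stand-ins for `data/actors.csv`.

```python
import csv
import io

def exact_distinct(csvfile, column='name'):
    # Exact count: every distinct value is stored in the set,
    # so memory grows linearly with the number of distinct names
    seen = set()
    for row in csv.DictReader(csvfile):
        seen.add(row[column])
    return len(seen)

# Tiny in-memory CSV standing in for data/actors.csv
sample = io.StringIO("name\nTom Hanks\nMeryl Streep\nTom Hanks\n")
print(exact_distinct(sample))  # 2
```

On the real file, this would be called as `exact_distinct(f)` with `f = open('data/actors.csv', encoding='utf-8')`. The set would then have to hold all of the roughly 1.5 million distinct names at once.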

In [9]:
import hyperloglog

def process_csv_hll(file, hll, sample_rate=1.0):
    # Same streaming loop as process_csv, but feeding an HLL counter
    with open(file, 'r', encoding='utf-8') as csvfile:
        csvreader = csv.DictReader(csvfile)
        counter = 0
        for row in csvreader:
            if random.random() < sample_rate:
                actor = row['name']
                hll.add(actor)
                counter += 1

    return len(hll), counter

# Initialize the HyperLogLog object
# The constructor takes the target error rate (relative standard error); here we ask for 2%
hll = hyperloglog.HyperLogLog(0.02)

# Process the data
cardinality, count = process_csv_hll('data/actors.csv', hll, sample_rate=sr)
print(f"Estimated number of unique names : {cardinality}")
print(f"Total number of names processed : {count}")
if not sampled:
    print(f"Real number of unique names : {real_count}")
    error = abs(cardinality - real_count) / real_count
    print(f"Relative error : {error:.2%}")

Estimated number of unique names : 1564700
Total number of names processed : 5523327
Real number of unique names : 1513888
Relative error : 3.36%


Since HLL lets us choose the target error rate, comparing accuracy alone is not very informative: we roughly know what to expect. We can, however, compare the execution time and memory usage of both algorithms to get an idea of how efficient the FM algorithm is.\
With the error rate set to 2% (a standard error, not a bound), we get a relative error of 3.36%, which is close enough. The execution takes less than 30 seconds, so HLL is much faster than the refined FM algorithm and yields better results.\
The memory usage of the FM algorithm is easy to compute. We maintain an array of (number of groups) × (hashes per group) integers, so the memory usage is groups × hashes × size(int).\
For the HLL algorithm it is a bit more delicate, because the memory usage depends on the requested precision. The algorithm maintains an array of registers, and higher precision means more registers and therefore more memory. This is still easy to compute, so we write a small function for it.

In [10]:
# Memory usage of the FM algorithm
# sys.getsizeof gives the size of a Python int object on this machine (object overhead included, not just the bits of the value)
mem = fm.num_groups * fm.hashes_per_group * sys.getsizeof(0)
print(f"Memory usage of the Flajolet-Martin algorithm : {mem} bytes")

def memory_usage_hll(hll):
    # The memory usage is the sum of the memory used by the registers
    # Each register stores the maximum rank (number of leading zeros + 1) of the hashes routed to it, so its size depends on
    # the hash size (for example, 256 bits for SHA-256) and on the parameters of the algorithm (how many bits index the registers)
    # The original paper uses a 32-bit hash function, whereas the library (HLL++) uses 64-bit hashes
    # We use the notation of the paper: m is the number of registers, b the number of bits indexing them, w = hash size - b
    # Since the number of registers is a power of 2, m = 2^b gives the number of index bits
    m = hll.m
    b = math.log2(m)
    w = 64 - b
    print(f"m = {m}, b = {b}, w = {w}")
    # The remaining w bits contain at most w leading zeros (when they are all zero),
    # so each register needs about log_2(w) bits to store its value
    # With m registers, the total is m * log_2(w) bits, or m * log_2(w) / 8 bytes
    # (a lower bound: a real implementation rounds up to whole bits, e.g. 6 bits per register here)
    return int(m * math.log2(w) / 8)

# Memory usage of the HyperLogLog algorithm
mem = memory_usage_hll(hll)
print(f"Memory usage of the HyperLogLog algorithm : {mem} bytes")

Memory usage of the Flajolet-Martin algorithm : 980 bytes
m = 4096, b = 12.0, w = 52.0
Memory usage of the HyperLogLog algorithm : 2918 bytes


Even though the HLL algorithm does not use much memory, the FM algorithm is even more economical here.\
The two figures are not computed the same way, though: FM counts Python int objects, while HLL counts bits per register. We also use relatively few hash functions for FM, which probably costs us some accuracy.\
Overall, the HLL algorithm provides better results in much less time, with a memory footprint that remains small, even if somewhat higher than FM's.
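To compare the two memory figures on the same basis, both can be counted at the bit level: the number of registers times the whole number of bits each register needs. The sketch below assumes 128-bit hashes for FM (as returned by `mmh3.hash128`) and 64-bit hashes for HLL (as stated in the comments of the memory cell). The helper names are hypothetical.

```python
import math

def fm_register_bits(num_registers, hash_bits=128):
    # A tail length of a non-zero hash_bits-bit hash lies in [0, hash_bits - 1]
    return num_registers * math.ceil(math.log2(hash_bits))

def hll_register_bits(m, hash_bits=64):
    b = int(math.log2(m))
    # A register holds a rank in [0, hash_bits - b + 1], i.e. hash_bits - b + 2 possible values
    return m * math.ceil(math.log2(hash_bits - b + 2))

print(fm_register_bits(5 * 7) / 8)    # 30.625 bytes for the 5 x 7 FM run
print(hll_register_bits(4096) / 8)    # 3072.0 bytes for the HLL counter with m = 4096
```

Counted this way, the gap is even wider than the printed figures suggest. FM needs 7 bits per register for its 35 registers. HLL needs 6 bits per register, but for 4096 registers.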

Mechanism of the HLL Algorithm

Registers:
The HLL algorithm uses a fixed array of registers to store information about the hashed values of the elements being counted.
The number of registers $m$ is a power of 2, and it determines the precision of the algorithm. Specifically, $m=2^b$, where $b$ is the number of bits used to index the registers.

Error Rate:
The standard error rate $\epsilon$ of the HLL algorithm is given by $\epsilon \approx 1.04/\sqrt{m}$.
Therefore, to achieve a desired error rate $\epsilon$, the number of registers must satisfy $m \geq (1.04/\epsilon)^2$, rounded up to the next power of two since $m=2^b$.
This relationship shows that increasing the number of registers decreases the error rate (i.e., improves accuracy).
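For $\epsilon = 2\%$, this gives $(1.04/0.02)^2 = 2704$, rounded up to $m = 2^{12} = 4096$ registers. That matches the `m = 4096` reported for the library's counter in the memory cell. Below is a minimal sketch. The helper name is hypothetical, and we assume, without having checked the library's source, that it rounds up the same way.

```python
import math

def registers_for_error(eps):
    # Smallest b such that m = 2^b >= (1.04 / eps)^2
    b = math.ceil(math.log2((1.04 / eps) ** 2))
    return b, 2 ** b

b, m = registers_for_error(0.02)
print(b, m)                           # 12 4096
print(f"{1.04 / math.sqrt(m):.5f}")   # 0.01625, the standard error actually obtained with m = 4096
```

Because of the rounding to a power of two, the standard error obtained is about 1.6%, slightly better than the 2% requested.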

Hashing:
Each incoming element is hashed to a large binary number. The first $b$ bits of the hash value determine which register to update.
The remaining bits are used to compute the rank of the element, which is the number of leading zeros plus one. The selected register keeps the maximum rank seen so far.

Estimation:
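The algorithm estimates the cardinality by combining the information stored in all registers $M[1], \dots, M[m]$. The raw estimate is a normalized harmonic mean of the values $2^{M[j]}$, namely $E = \alpha_m m^2 \left(\sum_{j=1}^{m} 2^{-M[j]}\right)^{-1}$, where $\alpha_m$ is a bias-correction constant. Small and large cardinalities are then handled with additional corrections, and HLL++ adds empirically determined bias correction.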
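To connect the four steps above (registers, error rate, hashing, estimation), here is a minimal from-scratch sketch of the original HLL. It covers the raw estimate plus the small-range linear-counting correction from the paper, with no HLL++ bias correction. The class name `TinyHLL` and the choice of the first 64 bits of SHA-1 as the hash are assumptions for illustration. This is not the `hyperloglog` library's code.

```python
import hashlib
import math

class TinyHLL:
    """Minimal HyperLogLog: raw estimate plus the small-range correction (no HLL++ bias tables)."""

    def __init__(self, b=12):
        if b < 7:
            raise ValueError("the alpha_m formula below assumes m = 2^b >= 128")
        self.b = b
        self.m = 1 << b                      # m = 2^b registers
        self.registers = [0] * self.m
        # alpha_m for m >= 128, as given in the original HLL paper
        self.alpha = 0.7213 / (1 + 1.079 / self.m)

    def add(self, value):
        # 64-bit hash: the first 8 bytes of SHA-1 (an arbitrary choice for this sketch)
        x = int.from_bytes(hashlib.sha1(value.encode('utf8')).digest()[:8], 'big')
        j = x >> (64 - self.b)                   # first b bits select the register
        rest = x & ((1 << (64 - self.b)) - 1)    # remaining 64 - b bits
        # Rank = position of the leftmost 1-bit in the remaining bits (leading zeros + 1)
        rank = (64 - self.b) - rest.bit_length() + 1
        self.registers[j] = max(self.registers[j], rank)

    def estimate(self):
        # Raw estimate E = alpha_m * m^2 / sum_j 2^(-M[j])
        z = sum(2.0 ** -r for r in self.registers)
        e = self.alpha * self.m * self.m / z
        empty = self.registers.count(0)
        if e <= 2.5 * self.m and empty:
            # Small-range correction: linear counting on the empty registers
            e = self.m * math.log(self.m / empty)
        return int(e)

sketch = TinyHLL(b=12)
for i in range(100_000):
    sketch.add(f"name-{i}")
print(sketch.estimate())  # expected to be within a few percent of 100000 (standard error ~1.6% for m = 4096)
```

The memory is the `registers` list: $m$ small integers, independent of how many elements are added. This is what lets HLL handle very large streams with a fixed footprint.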