In [1]:
import dask.dataframe as ddf
import pandas as pd
import numpy as np

In [2]:
from dask.distributed import Client
client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:56013  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 8.50 GB


##### Rational

He feel like we need to justify why we have come up with the following has table:

We started looking into different hash functions, all of them resulting in different issues:

- Convert string into binary: We used the function ord() in python, but it was giving very bas results
- Convert letters into integers using ascii and convert each individual integer into its binary: We found this also to be problematic since most strings (except the number 0) would start with 1's. It also gave a very un-evenly distributed probabilities between 0 and 1.

We then finally decided to generate a binary value of 5 digits (32 possible combinations) and then also add the 4 remaining values using smaller binary values in order to keep the distribution uniform

In [3]:
def decimal_to_binary(integer, decimals=None):
    binary = ''
    i = integer
    while i >= 1:
        binary = str(int(i%2)) + binary
        i = i/2
    if decimals:
        binary = '0'*(decimals-len(binary)) + binary
    return binary if binary else '0'

def string_to_binary(string):
    
    binary = ''
    for letter in string:
        binary = binary + hash_dictionary[letter]
    return binary

In [4]:
def string_to_binary_df(df, column='User_ID'):
    
    df[column] = df.User_ID.apply(lambda x: string_to_binary(x))
    
    return df

In [20]:
dask_dataframe = ddf.from_pandas(users_df, npartitions=20)

In [21]:
%%time
users_df_binary = dask_dataframe.map_partitions(string_to_binary_df, meta=users_df).compute()

Wall time: 3.77 s


In [22]:
users_df_binary.head()

Unnamed: 0,User_ID,Binary
0,844082e02a27ddee8d99ea1af94a2969,1000010001000000100000101110000000101010001001...
1,ff96d6665b5c59d3a70bb8f2ba4f10be,1111111110010110110101100110011001011011010111...
2,b64a85884e2b159829331c19e05dbac9,1011011001001010100001011000100001001110001010...
3,1c8836719e84867c26ba2cfeb372c53d,0001110010001000001101100111000110011110100001...
4,b66f73ffd9008d9c99159e164261df51,1011011001101111011100111111111111011001000000...


### Split binary values into categories (m) based on a new hash function

In [67]:
def split_binary(df, m=64, column='Binary', max_zeros_per_bucket={}):
    """
    This function will take a df with binary values (binary values are the hashed unique user ids) 
    and splits them into a set of buckets (m). For each of these buckets the function will also compute 
    the maximum number of zero's at the end of each binary number. The function will then return this 
    list which will then be used to estimate the distinct count value
    
    df: Dataframe with a column of binary values
    m: Number of buckets
    """    
    len_buckets = int(np.log2(m))
    #max_zeros_per_bucket = get_bucket_groups(m)

    for index, user_id in df[[column]].iterrows():
        group = user_id[0][:len_buckets]
        num_final_zeros = len(user_id[0]) - len(user_id[0].rstrip('0'))
        try:
            max_zeros_per_bucket[group] = max(max_zeros_per_bucket[group], num_final_zeros)
        except:
            print(index)
    
    return max_zeros_per_bucket

In [6]:
def get_bucket_groups(m=64):
    """
    Function that will give us the initial binary values to bucket our hashed binary values
    """
    buckets = {}
    len_binary = int(np.log2(m))
    for i in range(m):
        bucket_binary = decimal_to_binary(i, decimals=len_binary)
        buckets[bucket_binary] = 0
    return buckets

In [7]:
def compute_alpha(m):
    """
    Function to compute the alpha value used correct a systematic multiplicative bias
    Instead of computing the integral we have taken the constant values from wikipedia
    """
    
    if m == 16:
        alpha = 0.673
    elif m == 32:
        alpha = 0.697
    elif m == 64:
        alpha = 0.709
    else:
        alpha = 0.7213 / (1+ (1.079/m))
    
    return alpha
    

In [26]:
%%time
max_per_bucket = split_binary(users_df_binary)

Wall time: 2.56 s


In [8]:
def hyperloglog_estimate(max_zeros):
    """
    Returns the estimated hyperloglog estimate and its estimated error
    
    max_zeros: A list with all the max values over which the harmonic mean will be applied
    """
    
    max_zeros = np.array(max_zeros)
    m = len(max_zeros)
    
    Z = float(2)**(-max_zeros)
    Z = 1 / (np.sum(Z))
    
    estimate = compute_alpha(m) * m**2 * Z
    
    error = 1.04 / np.sqrt(m)
    
    return estimate, error

In [9]:
asciiDict = {chr(i): i for i in range(97, 103)}
list_values = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'] + list(asciiDict.keys())
list_binary = list(get_bucket_groups(16).keys())

df_hash = pd.DataFrame(columns=['Original', 'Binary'])
df_hash['Original'] = list_values
df_hash['Binary'] = list_binary
df_hash['# 0'] = [i.count('0') for i in list_binary]
df_hash['# 1'] = [i.count('1') for i in list_binary]

hash_dictionary = dict(zip(df_hash.Original, df_hash.Binary))

In [62]:
def hyperloglog(path_df='data/hash.txt', num_substreams=64, hash_dict=hash_dictionary, chunksize=1000000):
    """
    This function takes as input a txt file and estimates the length of unique values using the hyperloglog using a given
    number of substreams (number of substreams over which the harmonic mean will be computed).
    
    It is also necesary to provide the used hashing table (maps hexadecimal values into binary).
    
    Assumptions:
    - The unique values are in hexadecimal form
    - Unique values are randomly and uniformly distributed
    
    """
    
    # 
    max_zeros_per_bucket = get_bucket_groups(num_substreams)
    
    count = 0
    for users_df in pd.read_csv(path_df, sep=" ", header=None, chunksize=chunksize):
        print(count)
        print('============')
        count+=1
        users_df.columns = ["User_ID"]
        
        dask_dataframe = ddf.from_pandas(users_df, npartitions=20)
        users_df_binary = dask_dataframe.map_partitions(string_to_binary_df, meta=users_df).compute()
        
        max_zeros_per_bucket = split_binary(users_df_binary, m=num_substreams, 
                                            column='User_ID', max_zeros_per_bucket=max_zeros_per_bucket)
        
    print('LOOP HAS FINISHED. WE WILL NOW COMPUTE THE ESTIMATED DISTINCT COUNT VALUE')
    
    return hyperloglog_estimate(list(max_zeros_per_bucket.values()))
    

In [20]:
hyperloglog()

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
LOOP HAS FINISHED. WE WILL NOW COMPUTE THE ESTIMATED DISTINCT COUNT VALUE


(65092998.003772885, 0.13)

In [14]:
hyperloglog_estimate(list(max_zeros_per_bucket.values()))

(65092998.003772885, 0.13)

In [63]:
def hyperloglog_test(df, num_substreams=64, hash_dict=hash_dictionary):
    """
    This function takes as input a txt file and estimates the length of unique values using the hyperloglog using a given
    number of substreams (number of substreams over which the harmonic mean will be computed).
    
    It is also necesary to provide the used hashing table (maps hexadecimal values into binary).
    
    Assumptions:
    - The unique values are in hexadecimal form
    - Unique values are randomly and uniformly distributed
    
    """

    max_zeros_per_bucket = get_bucket_groups(num_substreams)
    
    max_zeros_per_bucket = split_binary(users_df_binary, m=num_substreams, column='User_ID', max_zeros_per_bucket=max_zeros_per_bucket)

    return hyperloglog_estimate(list(max_zeros_per_bucket.values()))

In [69]:
n = 100000

df = pd.read_csv('data/hash.txt', sep=" ", header=None, nrows=n)
df.columns=['User_ID']
dask_dataframe = ddf.from_pandas(df, npartitions=20)
users_df_binary = dask_dataframe.map_partitions(string_to_binary_df, meta=df).compute()

x = hyperloglog_test(users_df_binary, 4096)

In [72]:
E = x[0]

In [None]:
if E <= (2**32)/30

In [73]:
(2**32)/30

143165576.53333333

In [74]:
E

50262.114437117954

In [75]:
users_df_binary

Unnamed: 0,User_ID
0,1000010001000000100000101110000000101010001001...
1,1111111110010110110101100110011001011011010111...
2,1011011001001010100001011000100001001110001010...
3,0001110010001000001101100111000110011110100001...
4,1011011001101111011100111111111111011001000000...
...,...
99995,1011110111110101010101010001010000001111101111...
99996,0101010100001001001001010000010111011100101011...
99997,1111110110001111011000001011101111010110000110...
99998,0100101011101010011000101100010001110111010000...


In [78]:
len(max_zeros_per_bucket.values())

4096

In [80]:
df

Unnamed: 0,User_ID
0,844082e02a27ddee8d99ea1af94a2969
1,ff96d6665b5c59d3a70bb8f2ba4f10be
2,b64a85884e2b159829331c19e05dbac9
3,1c8836719e84867c26ba2cfeb372c53d
4,b66f73ffd9008d9c99159e164261df51
...,...
99995,bdf555140fbd464a1700dd3604fe73b1
99996,55092505dcac086871b895e35ac1cc81
99997,fd8f60bbd61988ce78de240240bda5a3
99998,4aea62c4774099e8bec57e80e59c74f1


In [89]:
x = get_bucket_groups(16)

In [92]:
users_df_binary.iloc[2].values

array(['10110110010010101000010110001000010011100010101100010101100110000010100100110011000111000001100111100000010111011011101011001001'],
      dtype=object)

In [90]:
x['1111'] = max(x['1111'], 1)

In [91]:
x

{'0000': 0,
 '0001': 0,
 '0010': 0,
 '0011': 0,
 '0100': 0,
 '0101': 0,
 '0110': 0,
 '0111': 0,
 '1000': 0,
 '1001': 0,
 '1010': 0,
 '1011': 0,
 '1100': 0,
 '1101': 0,
 '1110': 0,
 '1111': 1}