-
Notifications
You must be signed in to change notification settings - Fork 289
/
hyperloglog.py
351 lines (302 loc) · 12.1 KB
/
hyperloglog.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
import struct, copy
import numpy as np
import warnings
try:
from .hyperloglog_const import _thresholds, _raw_estimate, _bias
except ImportError:
# For Python 2
from hyperloglog_const import _thresholds, _raw_estimate, _bias
from datasketch.hashfunc import sha1_hash32, sha1_hash64
# Get the number of bits starting from the first non-zero bit to the right
_bit_length = lambda bits : bits.bit_length()
# For < Python 2.7
if not hasattr(int, 'bit_length'):
_bit_length = lambda bits : len(bin(bits)) - 2 if bits > 0 else 0
class HyperLogLog(object):
'''
The HyperLogLog data sketch for estimating
cardinality of very large dataset in a single pass.
The original HyperLogLog is described `here
<http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf>`_.
This HyperLogLog implementation is based on:
https://github.com/svpcom/hyperloglog
Args:
p (int, optional): The precision parameter. It is ignored if
the `reg` is given.
reg (numpy.array, optional): The internal state.
This argument is for initializing the HyperLogLog from
an existing one.
hashfunc (optional): The hash function used by this MinHash.
It takes the input passed to the `update` method and
returns an integer that can be encoded with 32 bits.
The default hash function is based on SHA1 from hashlib_.
hashobj (**deprecated**): This argument is deprecated since version
1.4.0. It is a no-op and has been replaced by `hashfunc`.
'''
__slots__ = ('p', 'm', 'reg', 'alpha', 'max_rank', 'hashfunc')
# The range of the hash values used for HyperLogLog
_hash_range_bit = 32
_hash_range_byte = 4
def _get_alpha(self, p):
if not (4 <= p <= 16):
raise ValueError("p=%d should be in range [4 : 16]" % p)
if p == 4:
return 0.673
if p == 5:
return 0.697
if p == 6:
return 0.709
return 0.7213 / (1.0 + 1.079 / (1 << p))
def __init__(self, p=8, reg=None, hashfunc=sha1_hash32, hashobj=None):
if reg is None:
self.p = p
self.m = 1 << p
self.reg = np.zeros((self.m,), dtype=np.int8)
else:
# Check if the register has the correct type
if not isinstance(reg, np.ndarray):
raise ValueError("The imported register must be a numpy.ndarray.")
# We have to check if the imported register has the correct length.
self.m = reg.size
self.p = _bit_length(self.m) - 1
if 1 << self.p != self.m:
raise ValueError("The imported register has \
incorrect size. Expect a power of 2.")
# Generally we trust the user to import register that contains
# reasonable counter values, so we don't check for every values.
self.reg = reg
# Check the hash function.
if not callable(hashfunc):
raise ValueError("The hashfunc must be a callable.")
# Check for use of hashobj and issue warning.
if hashobj is not None:
warnings.warn("hashobj is deprecated, use hashfunc instead.",
DeprecationWarning)
self.hashfunc = hashfunc
# Common settings
self.alpha = self._get_alpha(self.p)
self.max_rank = self._hash_range_bit - self.p
def update(self, b):
'''
Update the HyperLogLog with a new data value in bytes.
The value will be hashed using the hash function specified by
the `hashfunc` argument in the constructor.
Args:
b: The value to be hashed using the hash function specified.
Example:
To update with a new string value (using the default SHA1 hash
function, which requires bytes as input):
.. code-block:: python
hll = HyperLogLog()
hll.update("new value".encode('utf-8'))
We can also use a different hash function, for example, `pyfarmhash`:
.. code-block:: python
import farmhash
def _hash_32(b):
return farmhash.hash32(b)
hll = HyperLogLog(hashfunc=_hash_32)
hll.update("new value")
'''
# Digest the hash object to get the hash value
hv = self.hashfunc(b)
# Get the index of the register using the first p bits of the hash
reg_index = hv & (self.m - 1)
# Get the rest of the hash
bits = hv >> self.p
# Update the register
self.reg[reg_index] = max(self.reg[reg_index], self._get_rank(bits))
def count(self):
'''
Estimate the cardinality of the data values seen so far.
Returns:
int: The estimated cardinality.
'''
# Use HyperLogLog estimation function
e = self.alpha * float(self.m ** 2) / np.sum(2.0**(-self.reg))
# Small range correction
small_range_threshold = (5.0 / 2.0) * self.m
if abs(e-small_range_threshold)/small_range_threshold < 0.15:
warnings.warn(("Warning: estimate is close to error correction threshold. "
+"Output may not satisfy HyperLogLog accuracy guarantee."))
if e <= small_range_threshold:
num_zero = self.m - np.count_nonzero(self.reg)
return self._linearcounting(num_zero)
# Normal range, no correction
if e <= (1.0 / 30.0) * (1 << 32):
return e
# Large range correction
return self._largerange_correction(e)
def merge(self, other):
'''
Merge the other HyperLogLog with this one, making this the union of the
two.
Args:
other (datasketch.HyperLogLog):
'''
if self.m != other.m or self.p != other.p:
raise ValueError("Cannot merge HyperLogLog with different\
precisions.")
self.reg = np.maximum(self.reg, other.reg)
def digest(self):
'''
Returns:
numpy.array: The current internal state.
'''
return copy.copy(self.reg)
def copy(self):
'''
Create a copy of the current HyperLogLog by exporting its state.
Returns:
datasketch.HyperLogLog:
'''
return self.__class__(reg=self.digest(), hashfunc=self.hashfunc)
def is_empty(self):
'''
Returns:
bool: True if the current HyperLogLog is empty - at the state of just
initialized.
'''
if np.any(self.reg):
return False
return True
def clear(self):
'''
Reset the current HyperLogLog to empty.
'''
self.reg = np.zeros((self.m,), dtype=np.int8)
def __len__(self):
'''
Returns:
int: Get the size of the HyperLogLog as the size of
`reg`.
'''
return len(self.reg)
def __eq__(self, other):
'''
Check equivalence between two HyperLogLogs
Args:
other (datasketch.HyperLogLog):
Returns:
bool: True if both have the same internal state.
'''
return type(self) is type(other) and \
self.p == other.p and \
self.m == other.m and \
np.array_equal(self.reg, other.reg)
def _get_rank(self, bits):
rank = self.max_rank - _bit_length(bits) + 1
if rank <= 0:
raise ValueError("Hash value overflow, maximum size is %d\
bits" % self.max_rank)
return rank
def _linearcounting(self, num_zero):
return self.m * np.log(self.m / float(num_zero))
def _largerange_correction(self, e):
return - (1 << 32) * np.log(1.0 - e / (1 << 32))
@classmethod
def union(cls, *hyperloglogs):
if len(hyperloglogs) < 2:
raise ValueError("Cannot union less than 2 HyperLogLog\
sketches")
m = hyperloglogs[0].m
if not all(h.m == m for h in hyperloglogs):
raise ValueError("Cannot union HyperLogLog sketches with\
different precisions")
reg = np.maximum.reduce([h.reg for h in hyperloglogs])
h = cls(reg=reg)
return h
def bytesize(self):
# Since p is no larger than 64, use 8 bits
p_size = struct.calcsize('B')
# Each register value is no larger than 64, use 8 bits
# TODO: is there a way to use 5 bits instead of 8 bits
# to store integer in Python?
reg_val_size = struct.calcsize('B')
return p_size + reg_val_size * self.m
def serialize(self, buf):
if len(buf) < self.bytesize():
raise ValueError("The buffer does not have enough space\
for holding this HyperLogLog.")
fmt = 'B%dB' % self.m
struct.pack_into(fmt, buf, 0, self.p, *self.reg)
@classmethod
def deserialize(cls, buf):
size = struct.calcsize('B')
try:
p = struct.unpack_from('B', buf, 0)[0]
except TypeError:
p = struct.unpack_from('B', buffer(buf), 0)[0]
h = cls(p)
offset = size
try:
h.reg = np.array(struct.unpack_from('%dB' % h.m,
buf, offset), dtype=np.int8)
except TypeError:
h.reg = np.array(struct.unpack_from('%dB' % h.m,
buffer(buf), offset), dtype=np.int8)
return h
def __getstate__(self):
buf = bytearray(self.bytesize())
self.serialize(buf)
return buf
def __setstate__(self, buf):
size = struct.calcsize('B')
try:
p = struct.unpack_from('B', buf, 0)[0]
except TypeError:
p = struct.unpack_from('B', buffer(buf), 0)[0]
self.__init__(p=p)
offset = size
try:
self.reg = np.array(struct.unpack_from('%dB' % self.m,
buf, offset), dtype=np.int8)
except TypeError:
self.reg = np.array(struct.unpack_from('%dB' % self.m,
buffer(buf), offset), dtype=np.int8)
class HyperLogLogPlusPlus(HyperLogLog):
'''
HyperLogLog++ is an enhanced HyperLogLog `from Google
<http://research.google.com/pubs/pub40671.html>`_.
Main changes from the original HyperLogLog:
1. Use 64 bits instead of 32 bits for hash function
2. A new small-cardinality estimation scheme
3. Sparse representation (not implemented here)
Args:
p (int, optional): The precision parameter. It is ignored if
the `reg` is given.
reg (numpy.array, optional): The internal state.
This argument is for initializing the HyperLogLog from
an existing one.
hashfunc (optional): The hash function used by this MinHash.
It takes the input passed to the `update` method and
returns an integer that can be encoded with 64 bits.
The default hash function is based on SHA1 from hashlib_.
hashobj (**deprecated**): This argument is deprecated since version
1.4.0. It is a no-op and has been replaced by `hashfunc`.
'''
_hash_range_bit = 64
_hash_range_byte = 8
def __init__(self, p=8, reg=None, hashfunc=sha1_hash64,
hashobj=None):
super(HyperLogLogPlusPlus, self).__init__(p=p, reg=reg,
hashfunc=hashfunc, hashobj=hashobj)
def _get_threshold(self, p):
return _thresholds[p - 4]
def _estimate_bias(self, e, p):
bias_vector = _bias[p - 4]
estimate_vector = _raw_estimate[p - 4]
nearest_neighbors = np.argsort((e - estimate_vector)**2)[:6]
return np.mean(bias_vector[nearest_neighbors])
def count(self):
num_zero = self.m - np.count_nonzero(self.reg)
if num_zero > 0:
# linear counting
lc = self._linearcounting(num_zero)
if lc <= self._get_threshold(self.p):
return lc
# Use HyperLogLog estimation function
e = self.alpha * float(self.m ** 2) / np.sum(2.0**(-self.reg))
if e <= 5 * self.m:
return e - self._estimate_bias(e, self.p)
else:
return e