In [217]:
import math
import random
from collections import deque

In [234]:
# Configuration: tune this value before running the cells below.
# The setup cell derives the maximum number of buckets kept per bucket size
# from it as ceil(1 / error_rate), with a minimum of 2.
error_rate = 0.05  # error multiplier in range (0, 0.5]

In [235]:
# Generate a random stream of binary data.
def generate_random_stream(length):
    """Lazily produce ``length`` random booleans.

    Every value is drawn independently: a draw from ``random.random()`` that
    is at least 0.9 becomes True, so roughly 10% of the stream is True and
    roughly 90% is False.
    """
    threshold = 0.9
    for _position in range(length):
        draw = random.random()
        # The comparison already evaluates to a plain bool.
        yield draw >= threshold
        
# Example usage (update() is defined in a later cell):
#     for element in generate_random_stream(1000):
#         update(element)

In [236]:
N = 1000000  # size of sliding window (number of most recent elements); a non-negative int

if N < 0:
    raise ValueError("N must be non-negative")

# Maximum number of buckets kept per bucket size before the two oldest are
# merged into one bucket of twice the size; never fewer than 2.
r = math.ceil(1/error_rate)
r = max(r, 2)

# Highest bucket level needed: ceil(log2(N)). For N >= 1 this equals
# (N - 1).bit_length(), which is exact integer arithmetic and avoids the
# floating-point rounding of log(N)/log(2) at exact powers of two.
if N == 0:
    max_index = -1  # empty window: no queues at all
else:
    max_index = (N - 1).bit_length()

# queues[i] stores the timestamps of the buckets of size 2**i, newest on the
# left and oldest on the right.
queues = [deque() for _ in range(max_index + 1)]

timestamp = 0  # current stream position, wrapped modulo 2*N by update()
oldest_bucket_timestamp = -1  # No bucket so far

In [237]:
# move sliding window by one new element in a stream, updating bucket structure
def update(element):
    """Advance the sliding window by one stream element and maintain the buckets.

    The global ``timestamp`` is incremented (wrapping modulo ``2 * N``), the
    oldest bucket is dropped once it is ``N`` or more positions old, and, when
    ``element`` is a one, a new size-1 bucket is inserted and overfull levels
    are merged upwards.

    :param element: the newest stream element; any truthy value (``True``,
        ``1``) counts as a one and any falsy value (``False``, ``0``) as a zero.
    """
    global timestamp
    global oldest_bucket_timestamp
    global queues

    # A zero-length window holds nothing; this also avoids the modulo by zero below.
    if N == 0:
        return

    timestamp = (timestamp + 1) % (2 * N)

    # Check whether the oldest bucket has fallen out of the window.
    if (oldest_bucket_timestamp >= 0 and (timestamp - oldest_bucket_timestamp) % (2 * N) >= N):
        # The oldest bucket is the right-most entry of the highest non-empty level.
        for queue in reversed(queues):
            if queue:
                queue.pop()
                break
        # Recompute the oldest bucket timestamp (-1 when no bucket is left).
        oldest_bucket_timestamp = -1
        for queue in reversed(queues):
            if queue:
                oldest_bucket_timestamp = queue[-1]
                break

    # Nothing else to do for a zero. Truthiness is used rather than an identity
    # test so that an integer 0 is not mistaken for a one.
    if not element:
        return

    # A one arrived: it starts as a new bucket of size 1 stamped with "now".
    carry_over = timestamp  # timestamp of the new/merged bucket to insert

    if oldest_bucket_timestamp == -1:
        oldest_bucket_timestamp = timestamp

    for queue in queues:
        queue.appendleft(carry_over)
        if len(queue) <= r:  # this level is still within its bucket limit
            break

        # Too many buckets of this size: remove the two oldest, which become
        # one bucket of twice the size on the next level.
        last = queue.pop()
        second_last = queue.pop()

        # The merged bucket keeps the more recent of the two timestamps.
        carry_over = second_last

        # If the bucket that disappeared was the oldest overall, the merged
        # bucket takes over that role.
        if last == oldest_bucket_timestamp:
            oldest_bucket_timestamp = second_last

In [238]:
# Feed a single one into the stream and display the queues: the new bucket's
# timestamp should appear in the first queue, with all other queues empty.
update(1)
queues

[deque([1]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([]),
 deque([])]

In [239]:
# Process a stream of 2 million random elements (twice the window size N),
# then display the bucket timestamps held in each queue.
stream = generate_random_stream(2000000)        
for element in stream:
     update(element)
queues

[deque([1999997,
        1999994,
        1999957,
        1999956,
        1999945,
        1999935,
        1999929,
        1999903,
        1999895,
        1999889,
        1999867,
        1999839,
        1999836,
        1999833,
        1999828,
        1999825,
        1999817,
        1999795,
        1999757]),
 deque([1999754,
        1999735,
        1999723,
        1999698,
        1999679,
        1999649,
        1999632,
        1999625,
        1999604,
        1999597,
        1999578,
        1999557,
        1999532,
        1999500,
        1999478,
        1999467,
        1999431,
        1999396,
        1999390]),
 deque([1999376,
        1999322,
        1999282,
        1999239,
        1999179,
        1999090,
        1999056,
        1999020,
        1998975,
        1998932,
        1998899,
        1998857,
        1998844,
        1998825,
        1998790,
        1998750,
        1998717,
        1998677,
        1998648,
        1998615]),
 deque([

In [240]:
def get_count(k):
    """Return an estimate of the number of ones in the last ``k`` stream elements.

    Every bucket whose timestamp lies within the last ``k`` positions
    contributes its full size (``2**i`` for a bucket stored in ``queues[i]``).
    Half the size of the largest contributing bucket is then subtracted
    (integer division), the usual DGIM correction for the oldest bucket, which
    may extend past the start of the queried range.

    :param k: number of most recent elements to query. ``update`` discards
        buckets once they are ``N`` or more positions old, so values above
        ``N`` cannot see further back than the window.
    :return: the estimated count as an ``int``.
    """
    result = 0
    max_value = 0  # size of the largest bucket counted so far
    power_of_two = 1  # size of the buckets on the current level
    for queue in queues:
        for element in queue:
            # Age of the bucket in stream positions; timestamps wrap modulo 2*N.
            delta = timestamp - element
            if delta < 0:
                delta += 2*N
            # The newest element has age 0, so the last k elements are exactly
            # the ages 0 .. k-1; a bucket of age k is already outside.
            if delta < k:
                max_value = power_of_two
                result += power_of_two
        power_of_two = power_of_two << 1

    # Floor division gives the same value on Python 2 and Python 3
    # (a size-1 bucket is kept whole).
    result -= max_value // 2
    return int(result)

In [241]:
# Feed 20,000 more random elements, keeping them in a list so that the exact
# counts can be compared with the DGIM estimates.
stream = list(generate_random_stream(20000))
for element in stream:
    update(element)

for i in (10, 100, 1000, 10000):
    x_true = sum(stream[-i:])  # exact number of ones among the last i elements
    x = get_count(i)  # DGIM estimate for the same range
    # Widen the estimate by the configured relative error on both sides.
    xmax = math.ceil(x + x*error_rate)
    xmin = math.floor(x - x*error_rate)
    # A parenthesised single-argument print behaves the same on Python 2 and 3.
    print("DGIM estimated sum over last %d elements is %.0f-%.0f (true: %d)" % (i, xmin, xmax, x_true))

DGIM estimated sum over last 10 elements is 0-2 (true: 1)
DGIM estimated sum over last 100 elements is 12-14 (true: 13)
DGIM estimated sum over last 1000 elements is 107-119 (true: 114)
DGIM estimated sum over last 10000 elements is 926-1024 (true: 970)
