In [1]:
import numpy as np

Helper classes for computing Greenwald-Khanna (GK) quantile sketches, used by Lin et al.'s n-of-N algorithm.
Taken from: https://github.com/DataDog/sketches-py/tree/v0.1

In [2]:
# Accuracy parameter used by GKArray when the caller supplies no eps
# or one outside the open interval (0, 1).
DEFAULT_EPS = 0.01


class UnequalEpsilonException(Exception):
    """Signals an attempt to merge two sketches built with different epsilons."""

In [3]:
class Entry(object):
    """One tuple of a GK summary: a stored value plus its rank bookkeeping."""

    def __init__(self, val, g, delta):
        # val:   the value that covers the phi quantile range; always one of
        #        the original input entries
        # g:     number of positions covered by the value,
        #        g(i) = r_min(i) - r_min(i-1)
        # delta: rank uncertainty of the value, delta = r_max(i) - r_min(i)
        self.val, self.g, self.delta = val, g, delta

    def __repr__(self):
        fields = (self.val, self.g, self.delta)
        return 'Entry(val={}, g={}, delta={})'.format(*fields)

In [4]:
class GKArray(object):
    """Quantile sketch holding a compressed list of ``Entry`` tuples.

    Newly added values are first collected in the ``incoming`` buffer and are
    folded into ``entries`` by ``merge_compress``. The sketch also tracks the
    exact count, sum, minimum and maximum of everything added so far.
    """

    def __init__(self, eps=None):
        """Create an empty sketch.

        Parameters:
            eps: accuracy parameter; must satisfy 0 < eps < 1. ``None`` or an
                 out-of-range value falls back to ``DEFAULT_EPS``.
        """
        if eps is None or eps <= 0 or eps >= 1:
            self.eps = DEFAULT_EPS
        else:
            self.eps = eps
        # compressed summary, ordered by value
        self.entries = []
        # raw values added since the last merge_compress
        self.incoming = []
        self._min = float('+inf')
        self._max = float('-inf')
        self._count = 0
        self._sum = 0

    def __repr__(self):
        return "entries: {}, incoming: {}, count: {}, min: {}, max: {}, sum: {}\n".format(
            self.entries, self.incoming, self._count, self._min, self._max, self._sum)

    @property
    def name(self):
        """Fixed identifier of this sketch type."""
        return 'GKArray'

    @property
    def num_values(self):
        """Number of values added so far (including merged sketches)."""
        return self._count

    @property
    def avg(self):
        """Exact mean of the added values.

        Raises ZeroDivisionError when the sketch is empty.
        """
        return float(self._sum) / self._count

    @property
    def sum(self):
        """Exact sum of the added values."""
        return self._sum

    def size(self):
        """Return the number of stored entries, flushing the incoming buffer first."""
        if len(self.incoming) > 0:
            self.merge_compress()
        return len(self.entries)

    def add(self, val):
        """ Add a value to the sketch.

        The value is buffered in ``incoming``; the buffer is merged into
        ``entries`` every ``int(1 / eps) + 1`` additions.
        """
        self.incoming.append(val)
        self._count += 1
        self._sum += val
        if val < self._min:
            self._min = val
        if val > self._max:
            self._max = val
        if self._count % (int(1.0 / self.eps) + 1) == 0:
            self.merge_compress()

    def merge_compress(self, entries=None):
        """ Merge the given entry list into self.entries as well as compressing any values in
        self.incoming buffer.

        Parameters:
            entries: optional list of Entry to merge in; the given entries are
                     copied, never modified. Defaults to no extra entries.
        """
        # A None sentinel avoids sharing one mutable default list between calls.
        if entries is None:
            entries = []

        # An entry may be absorbed by its successor while the combined
        # g + delta stays at or below this threshold.
        removal_threshold = np.floor(2.0 * self.eps * (self._count - 1))
        incoming = [Entry(val, 1, 0) for val in self.incoming] + [Entry(e.val, e.g, e.delta) for e in entries]
        incoming = sorted(incoming, key=lambda x: x.val)

        merged = []
        # i walks the sorted incoming entries, j walks the existing entries
        i, j = 0, 0
        while i < len(incoming) or j < len(self.entries):
            if i == len(incoming):
                # done with incoming; now only considering entries
                if j + 1 < len(self.entries) and \
                        self.entries[j].g + self.entries[j + 1].g + self.entries[j + 1].delta <= removal_threshold:
                    # drop entry j by folding its g into the next entry
                    self.entries[j + 1].g += self.entries[j].g
                else:
                    merged.append(self.entries[j])
                j += 1
            elif j == len(self.entries):
                # done with entries; now only considering incoming
                if i + 1 < len(incoming) and \
                        incoming[i].g + incoming[i + 1].g + incoming[i + 1].delta <= removal_threshold:
                    incoming[i + 1].g += incoming[i].g
                else:
                    merged.append(incoming[i])
                i += 1
            elif incoming[i].val < self.entries[j].val:
                # incoming value sorts before the current existing entry
                if incoming[i].g + self.entries[j].g + self.entries[j].delta <= removal_threshold:
                    self.entries[j].g += incoming[i].g
                else:
                    # kept as its own entry; its delta is derived from the
                    # existing entry that follows it
                    incoming[i].delta = self.entries[j].g + self.entries[j].delta - incoming[i].g
                    merged.append(incoming[i])
                i += 1
            else:
                if j + 1 < len(self.entries) and \
                        self.entries[j].g + self.entries[j + 1].g + self.entries[j + 1].delta <= removal_threshold:
                    self.entries[j + 1].g += self.entries[j].g
                else:
                    merged.append(self.entries[j])
                j += 1

        self.entries = merged
        self.incoming = []

    def merge(self, sketch):
        """ Merge another GKArray into the current. The two sketches should have the same
        epsilon value.

        Note that the incoming buffer of ``sketch`` is flushed (via its own
        ``merge_compress``) when both sketches are non-empty.

        Parameters:
            sketch: GKArray to merge into this one

        Raises:
            UnequalEpsilonException: if the two sketches have different eps.
        """
        if self.eps != sketch.eps:
            raise UnequalEpsilonException("Cannot merge two GKArrays with different epsilon values")

        if sketch._count == 0:
            return

        if self._count == 0:
            # nothing of our own yet: take a copy of the other sketch's state
            self.entries = [Entry(e.val, e.g, e.delta) for e in sketch.entries]
            self.incoming = sketch.incoming[:]
            self._min = sketch._min
            self._max = sketch._max
            self._count = sketch._count
            self._sum = sketch._sum
            return

        # Re-express the other sketch as delta-free entries before merging.
        entries = []
        spread = int(sketch.eps * (sketch._count - 1))
        sketch.merge_compress()
        g = sketch.entries[0].g + sketch.entries[0].delta - spread - 1
        if g > 0:
            entries.append(Entry(sketch._min, g, 0))
        for i in range(len(sketch.entries) - 1):
            g = sketch.entries[i + 1].g + sketch.entries[i + 1].delta - sketch.entries[i].delta
            if g > 0:
                entries.append(Entry(sketch.entries[i].val, g, 0))
        g = spread + 1 - sketch.entries[len(sketch.entries) - 1].delta
        if g > 0:
            entries.append(Entry(sketch.entries[len(sketch.entries) - 1].val, g, 0))

        self._count += sketch._count
        self._sum += sketch._sum
        self._min = min(self._min, sketch._min)
        self._max = max(self._max, sketch._max)

        self.merge_compress(entries)

    def quantile(self, q):
        """ Return an epsilon-approximate element at quantile q.

        Returns ``np.nan`` when q is outside [0, 1] or the sketch is empty.

        Parameters:
            q: quantile to query for
               0 <= q <= 1
        """
        if q < 0 or q > 1 or self._count == 0:
            return np.nan

        if len(self.incoming) > 0:
            self.merge_compress()

        rank = int(q * (self._count - 1) + 1)
        spread = int(self.eps * (self._count - 1))
        g_sum = 0.0
        i = 0
        # stop at the first entry whose g_sum + delta exceeds rank + spread
        while i < len(self.entries):
            g_sum += self.entries[i].g
            if g_sum + self.entries[i].delta > rank + spread:
                break
            i += 1
        if i == 0:
            return self._min

        return self.entries[i - 1].val

In [5]:
class Bucket(object):
    """A timestamped GKArray sketch seeded with a single element."""

    def __init__(self, timestamp, element, eps):
        # Build and seed the sketch first, then attach it to the bucket.
        seeded = GKArray(eps)
        seeded.add(element)

        self.timestamp = timestamp
        # Counter starts at zero; the seeding element is not counted here.
        self.N = 0
        self.sketch = seeded


In [73]:
import math
from copy import deepcopy


class nN(object):
    """Multi-level list of sketch buckets over a stream of elements.

    Every added element opens a new bucket at level 0 and is also fed to all
    older, still-live buckets. Buckets whose counter ``N`` has reached the
    configured ``N`` are dropped.

    NOTE(review): this appears to implement the n-of-N sliding-window scheme
    of Lin et al.; ``query`` currently only prints intermediate results.
    """

    def __init__(self, eps, l, N):
        """
        Parameters:
            eps: epsilon handed to every bucket's sketch (also used by ``lift``)
            l: controls the per-level bucket limit, which is ``1 / l + 1``
            N: a bucket is discarded once its own ``N`` counter reaches this value
        """
        self.eps = eps
        self.l = l
        # buckets_list[k] holds the buckets of level k, oldest first
        self.buckets_list = [[]]
        self.timestamp = 0
        self.levels = 1
        self.N = N

    def add(self, element):
        """Insert ``element``: open a new bucket, rebalance levels, expire and update buckets."""
        self.buckets_list[0].append(Bucket(self.timestamp, element, self.eps))

        # shuffling: an over-full level gives up its two oldest buckets; the
        # oldest moves one level up, the second oldest is discarded
        max_per_level = (1 / self.l) + 1
        for level_index in range(self.levels):
            level = self.buckets_list[level_index]
            if len(level) > max_per_level:
                promoted = level.pop(0)
                level.pop(0)
                if level_index + 1 == self.levels:
                    self.buckets_list.append([])
                    self.levels += 1
                self.buckets_list[level_index + 1].append(promoted)

        # check expired buckets
        for level_index in range(self.levels):
            level = self.buckets_list[level_index]
            survivors = []
            for bucket in level:
                if self.N <= bucket.N:
                    # expired: simply not carried over
                    continue
                # maintain sketches; the bucket created in this call already
                # contains the element
                if self.timestamp != bucket.timestamp:
                    bucket.sketch.add(element)
                    bucket.N += 1
                survivors.append(bucket)
            # Rebuild the level instead of popping by saved index: popping in
            # ascending order shifts later indexes and removes the wrong bucket
            # when more than one bucket expires at once.
            level[:] = survivors

        self.timestamp += 1

    def lift(self, sketch: "GKArray", N: int):
        """Return a deep copy of ``sketch`` with ``floor(0.5 * eps * N)`` added to every entry's delta.

        The sketch passed in is left unchanged.
        """
        lifted = deepcopy(sketch)
        extra_delta = math.floor(0.5 * self.eps * N)
        for entry in lifted.entries:
            entry.delta += extra_delta
        return lifted

    def query(self, n):
        """Print the bucket lists and, from the highest level down, the lifted
        sketch of every bucket whose ``N`` is at most ``n``.

        Pending values in a bucket's sketch are compressed first. Nothing is returned.
        """
        print(self.buckets_list)

        for level in range(self.levels, 0, -1):
            print(level)
            for bucket in self.buckets_list[level - 1]:
                if len(bucket.sketch.incoming) > 0:
                    bucket.sketch.merge_compress()
                if bucket.N <= n:
                    S_lift = self.lift(bucket.sketch, n)
                    print(S_lift)


In [74]:
# Small demo: eps=0.1, l=0.5, N=4, fed the values 1, 2, 3 twice over.
nn = nN(0.1, 0.5, 4)
for value in (1, 2, 3, 1, 2, 3):
    nn.add(value)


In [75]:
# Print the bucket lists and the lifted sketch of every bucket whose N is <= 4.
nn.query(4)

[[<__main__.Bucket object at 0x000001F05C5F0400>, <__main__.Bucket object at 0x000001F05C5F0E80>], [<__main__.Bucket object at 0x000001F05C5F0160>]]
2
entries: [Entry(val=1, g=1, delta=0), Entry(val=2, g=1, delta=0), Entry(val=3, g=1, delta=0), Entry(val=3, g=1, delta=0)], incoming: [], count: 4, min: 1, max: 3, sum: 9

1
entries: [Entry(val=2, g=1, delta=0), Entry(val=3, g=1, delta=0)], incoming: [], count: 2, min: 2, max: 3, sum: 5

entries: [Entry(val=3, g=1, delta=0)], incoming: [], count: 1, min: 3, max: 3, sum: 3



[[1]]