# DGIM Algorithm

El algoritmo DGIM es un algoritmo diseñado para estimar el número de unos en una secuencia de bits en un periodo de tiempo fijo y utilizando espacio limitado. Aquí te explico paso a paso qué hace este algoritmo:

## - Inicialización:

El constructor __init__ toma dos parámetros, N y error_rate. N representa el tamaño de la ventana sobre la cual se quiere realizar la estimación, y error_rate es la tasa de error permitida en la estimación.

Se calcula _r, que representa el número máximo de buckets del mismo tamaño. Esta es una medida que influye en la precisión de la estimación.

Se crea una estructura de datos para almacenar los buckets. La estructura es una lista de colas (_queues). Cada cola representa un bucket de tamaño 2^i. La estructura está diseñada de manera que facilita saber cuántos cubos del mismo tamaño hay y actualizar una cola de cubos.

Se inicializan algunas variables, como _timestamp (una marca de tiempo actual) y _oldest_bucket_timestamp (la marca de tiempo del bucket más antiguo).

## - Actualización:

El método update recibe un elemento booleano (elt) y actualiza la estructura de datos en consecuencia.
Se incrementa _timestamp para mantener un seguimiento del tiempo.
Se verifica si se debe eliminar el bucket más antiguo (mediante _drop_oldest_bucket).
Si el elemento es True, se actualiza la estructura de datos fusionando los buckets según el algoritmo DGIM.

## - Estimación:

El método get_count devuelve una estimación del número de unos en los últimos N elementos del flujo.
Itera sobre las colas, calculando el número estimado de unos en función de la longitud de la cola y la potencia de dos correspondiente. Luego, resta una corrección para evitar contar duplicados.
La estimación se basa en el hecho de que los buckets más recientes tienen más peso en la estimación.

## - Funciones auxiliares:

_is_bucket_too_old verifica si un bucket es demasiado antiguo y debe eliminarse.
nb_buckets devuelve el número total de buckets en la estructura de datos.
_drop_oldest_bucket elimina el bucket más antiguo.


In [2]:
import math
from collections import deque

class Dgim(object):
    def __init__(self, N, error_rate=0.5):
        self.N = N
        if not (0 < error_rate <= 1):
            error_msg = ("Invalid value for error_rate: {}. "
                         "Error rate should be in (0, 1].".format(error_rate))
            raise ValueError(error_msg)
        self.error_rate = error_rate
        #El número máximo de buckets del mismo tamaño
        self._r = math.ceil(1/error_rate)
        self._r = max(self._r, 2)
        '''
        - La estructura de datos para almacenar los buckets es una matriz de colas.
        - Queue[i] almacena la marca de tiempo del bucket de tamaño 2^i en orden descendente.
        - Esta estructura facilita:
             - Saber cuántos buckets del mismo tamaño hay
             - Actualizar una cola de buckets
        '''
        self._queues = []
        if N == 0:
            max_index = -1
        else:
            max_index = int(math.ceil(math.log(N)/math.log(2)))
        self._queues = [deque() for _ in range(max_index + 1)]
        self._timestamp = 0
        self._oldest_bucket_timestamp = -1

    '''
    - Verifica si N (el tamaño de la ventana) es cero. Si es cero, no tiene sentido procesar la actualización, así que simplemente retorna sin hacer nada.
    - Incrementa la marca de tiempo (_timestamp) en 1 y aplica un módulo para asegurarse de que esté en el rango de 0 a 2*N - 1.
    Esto mantiene un seguimiento del tiempo de manera circular en una ventana de tamaño 2*N.
    - Verifica si hay un bucket más antiguo (_oldest_bucket_timestamp está definido y es mayor o igual a cero) y si ese bucket es demasiado antiguo
    según la función _is_bucket_too_old. Si es así, se llama a _drop_oldest_bucket para eliminar el bucket más antiguo.
    - Si elt no es True (es decir, si es False), simplemente retorna sin realizar más acciones, ya que no hay nada que agregar a la estructura para los ceros.
    - Se inicializa carry_over con la marca de tiempo actual.
    - Si el bucket más antiguo aún no se ha establecido (_oldest_bucket_timestamp es -1), se establece como la marca de tiempo actual.
    - Se agrega la marca de tiempo actual al comienzo de la cola (appendleft(carry_over)).
    - Se verifica si la longitud de la cola es menor o igual a _r (el número máximo de buckets del mismo tamaño permitido).
      Si es así, se sale del bucle para pasar a la siguiente cola.
    - Si la longitud de la cola supera _r, se extraen los dos últimos elementos de la cola (last y second_last).
      Luego, se fusionan estos dos buckets, y se actualiza carry_over con el segundo desde el final (second_last).
    - Si el último elemento extraído es igual a la marca de tiempo del bucket más antiguo (last == self._oldest_bucket_timestamp),
      se actualiza _oldest_bucket_timestamp con la segunda marca de tiempo desde el final (self._oldest_bucket_timestamp = second_last).
    '''
    def update(self, elt):
        # Actualiza el flujo con un elemento. (:param elt: el último elemento del flujo, :type elt: bool)
        if self.N == 0:
            return
        self._timestamp = (self._timestamp + 1) % (2 * self.N)
        # Comprobar si se debe eliminar el bucket más antiguo
        if (self._oldest_bucket_timestamp >= 0 and
                self._is_bucket_too_old(self._oldest_bucket_timestamp)):
            self._drop_oldest_bucket()
        if elt is not True:
            return
        carry_over = self._timestamp
        if self._oldest_bucket_timestamp == -1:
            self._oldest_bucket_timestamp = self._timestamp
        for queue in self._queues:
            queue.appendleft(carry_over)
            if len(queue) <= self._r:
                break
            last = queue.pop()
            second_last = queue.pop()
            # Fusionar los dos últimos buckets.
            carry_over = second_last
            if last == self._oldest_bucket_timestamp:
                self._oldest_bucket_timestamp = second_last

    '''
    - result se inicializa en 0. Esta variable almacenará el resultado final de la estimación.
    - max_value se inicializa en 0. Se utilizará para realizar un ajuste en la estimación.
    - power_of_two se inicializa en 1. Se utiliza para calcular la contribución de cada cola a la estimación.
    - queue_length obtiene la longitud de la cola actual.
    - Si la longitud de la cola es mayor que 0, significa que hay buckets en la cola, y se realiza lo siguiente:
          - max_value se actualiza con el valor actual de power_of_two.
            Esto se hace para realizar un seguimiento del valor máximo de power_of_two que ha contribuido a la estimación.
          - Se suma a result el producto de la longitud de la cola y power_of_two. Esto representa la contribución de la cola actual a la estimación.
          - Se duplica power_of_two multiplicándolo por 2.
            Esto es necesario porque cada cola representa buckets de tamaño 2^i, y power_of_two se utiliza para calcular esa potencia de dos.
    - Se resta a result la mitad de max_value redondeada hacia abajo. Este ajuste se realiza para evitar contar duplicados en la estimación.
    '''

    def get_count(self):
        # Devuelve una estimación del número de "True" en los últimos N elementos del flujo.
        result = 0
        max_value = 0
        power_of_two = 1
        for queue in self._queues:
            queue_length = len(queue)
            if queue_length > 0:
                max_value = power_of_two
                result += queue_length * power_of_two
            power_of_two = power_of_two << 1
        result -= math.floor(max_value/2)
        return int(result)

    '''
    - Resta bucket_timestamp de la marca de tiempo actual _timestamp. Esto da como resultado el tiempo transcurrido desde la marca de tiempo del bucket hasta el momento actual.
    - Aplica un módulo al tiempo transcurrido utilizando 2 * self.N como el divisor.
      Esto se hace para manejar el almacenamiento circular de buckets, ya que los buckets se almacenan en módulo 2 * N.
    - Compara el resultado obtenido con self.N. Si el tiempo transcurrido (módulo 2 * self.N) es mayor o igual a self.N,
      entonces el bucket se considera demasiado antiguo y el método devuelve True. En este caso, el bucket debe ser eliminado.
    '''

    def _is_bucket_too_old(self, bucket_timestamp):
        # Los buckets se almacenan en módulo 2 * N
        return (self._timestamp - bucket_timestamp) % (2 * self.N) >= self.N

    @property
    def nb_buckets(self):
        # Devuelve el número de buckets.
        result = 0
        for queue in self._queues:
            result += len(queue)
        return result

    '''
    - Itera sobre las colas en orden inverso (reversed(self._queues)), comenzando desde la cola más grande (la que representa buckets de mayor tamaño).
    - Para cada cola, verifica si la longitud de la cola es mayor que 0 (len(queue) > 0). Si es así, significa que hay un bucket en esa cola,
      así que se ejecutan las siguientes acciones:
          - Se utiliza pop() para eliminar el último elemento de la cola, que representa el bucket más antiguo.
          - Se rompe el bucle, ya que solo necesitas eliminar el bucket más antiguo una vez.
    - Después de eliminar el bucket más antiguo, se establece _oldest_bucket_timestamp en -1. Esto indica que no hay bucket más antiguo definido en este momento.
    - Luego, se realiza otra iteración sobre las colas en orden inverso para encontrar la nueva marca de tiempo del bucket más antiguo.
      Para cada cola, se verifica si la longitud de la cola es mayor que 0. Si es así, se actualiza _oldest_bucket_timestamp con la marca de tiempo del
      último elemento de la cola (queue[-1]), que representa la marca de tiempo del nuevo bucket más antiguo.
    '''

    def _drop_oldest_bucket(self):
        for queue in reversed(self._queues):
            if len(queue) > 0:
                queue.pop()
                break
        self._oldest_bucket_timestamp = -1
        for queue in reversed(self._queues):
            if len(queue) > 0:
                self._oldest_bucket_timestamp = queue[-1]
                break

In [None]:
dgim = Dgim(12)
stream = iter([
    False, False, True, False, True, True, True, False, True,
    True, False, False, True, False, True, True, False
])
for elt in stream:
    dgim.update(elt)
print("Número estimado de unos en la última ventana:", dgim.get_count())

Número estimado de unos en la última ventana: 7


In [None]:
# Ejemplo con una ventana más grande
dgim = Dgim(24)
stream = iter([True, False, True, True, False] * 10)  # Repetición de patrón
for elt in stream:
    dgim.update(elt)
print("Número estimado de unos en la última ventana:", dgim.get_count())


Número estimado de unos en la última ventana: 12


In [None]:
# Ejemplo con una ventana más pequeña
dgim = Dgim(6)
stream = iter([True, False, True, False, True])
for elt in stream:
    dgim.update(elt)
print("Número estimado de unos en la última ventana:", dgim.get_count())

Número estimado de unos en la última ventana: 2


In [None]:
# Ejemplo con una tasa de error más baja
dgim = Dgim(12, error_rate=0.2)
stream = iter([True, False, True, False, True, True, True, False, True, True] * 5)
for elt in stream:
    dgim.update(elt)
print("Número estimado de unos en la última ventana:", dgim.get_count())

Número estimado de unos en la última ventana: 8


In [None]:
# Ejemplo con un stream de solo ceros
dgim = Dgim(10)
for elt in itertools.repeat(False, 1000):
    dgim.update(elt)
print("Número estimado de unos en la última ventana:", dgim.get_count())

Número estimado de unos en la última ventana: 0


In [None]:
# Ejemplo con un stream de solo unos
dgim = Dgim(10)
for elt in itertools.repeat(True, 1000):
    dgim.update(elt)
print("Número estimado de unos en la última ventana:", dgim.get_count())

Número estimado de unos en la última ventana: 10


In [None]:
# Ejemplo del capitulo 4 de "Mining of Massing Datasets" (http://infolab.stanford.edu/~ullman/mmds/ch4.pdf)
crt_timestamp = 65
queues = [
    deque([crt_timestamp - 1, crt_timestamp - 2]),
    deque([crt_timestamp - 4]),
    deque([crt_timestamp - 8]),
    deque()
]

dgim = Dgim(10)
dgim._timestamp = crt_timestamp
dgim._queues = queues
print("Número estimado de unos en la última ventana:", dgim.get_count())

Número estimado de unos en la última ventana: 6


In [None]:
# Ejemplo con un stream vacío
dgim = Dgim(10)
print("Número estimado de unos en la última ventana:", dgim.get_count())

Número estimado de unos en la última ventana: 0


In [None]:
# Ejemplo con un tamaño de ventana 0
dgim = Dgim(0)
stream = iter([True, False, False, True])
for elt in stream:
    dgim.update(elt)
print("Número estimado de unos en la última ventana:", dgim.get_count())

Número estimado de unos en la última ventana: 0


In [None]:
# Ejemplo con un tamaño de ventana 1
dgim = Dgim(1)
dgim.update(True)
print("Número estimado de unos en la última ventana:", dgim.get_count())
dgim.update(False)
print("Número estimado de unos en la última ventana:", dgim.get_count())

Número estimado de unos en la última ventana: 1
Número estimado de unos en la última ventana: 0


In [None]:
# Ejemplo con un tamaño de ventana 2
dgim = Dgim(2)
dgim.update(True)
print("Número estimado de unos en la última ventana:", dgim.get_count())
dgim.update(True)
print("Número estimado de unos en la última ventana:", dgim.get_count())
dgim.update(True)
print("Número estimado de unos en la última ventana:", dgim.get_count())

Número estimado de unos en la última ventana: 1
Número estimado de unos en la última ventana: 2
Número estimado de unos en la última ventana: 2


In [None]:
crt_timestamp = 65
queues = [
    deque([crt_timestamp - 1, crt_timestamp - 2]),
    deque([crt_timestamp - 4]),
    deque()
]
dgim = Dgim(6)
dgim._timestamp = crt_timestamp
dgim._queues = queues
dgim._oldest_bucket_timestamp = crt_timestamp - 4
print("Número estimado de unos en la última ventana:", dgim.get_count())
dgim.update(0)
print("Número estimado de unos en la última ventana:", dgim.get_count())
dgim.update(0)
print("Número estimado de unos en la última ventana:", dgim.get_count())

Número estimado de unos en la última ventana: 3
Número estimado de unos en la última ventana: 3
Número estimado de unos en la última ventana: 2


In [3]:
# Ejemplo con posibilidad de elegir el tamaño del stream y de la ventana
import random

stream = []
def generate_random_stream(length):
    for _ in range(length):
        stream.append(bool(random.randint(0, 1)))

length = int(input("Ingrese el tamaño de stream deseado: "))
N = int(input("Ingrese el tamaño de ventana deseado: "))

generate_random_stream(length)
stream = iter(stream)
dgim = Dgim(N)

for elt in stream:
    dgim.update(elt)
print("Número estimado de unos en la última ventana:", dgim.get_count())


Ingrese el tamaño de stream deseado: 2000
Ingrese el tamaño de ventana deseado: 20
Número estimado de unos en la última ventana: 9


In [None]:
# Clase para testear la computación del _r, valores del error incorrectos y la verificación de buckets demasiado antiguos
import unittest
import itertools
from collections import deque

class TestDgim(unittest.TestCase):
    def test_r_computation(self):
        dgim = Dgim(10, 0.5)
        self.assertEqual(2, dgim._r)
        dgim = Dgim(10, 0.1)
        self.assertEqual(10, dgim._r)

    def test_invalid_error_rates(self):
        self.assertRaises(ValueError, Dgim, 10, 0)
        self.assertRaises(ValueError, Dgim, 10, -0.1)
        self.assertRaises(ValueError, Dgim, 10, 1.1)

    def test_is_bucket_too_old(self):
        dgim = Dgim(10)
        dgim._timestamp = 15
        self.assertFalse(dgim._is_bucket_too_old(6))
        self.assertTrue(dgim._is_bucket_too_old(5))
        self.assertTrue(dgim._is_bucket_too_old(16))
        dgim._timestamp = 5
        self.assertFalse(dgim._is_bucket_too_old(16))
        self.assertTrue(dgim._is_bucket_too_old(15))

In [None]:
suite = unittest.TestLoader().loadTestsFromTestCase(TestDgim)

runner = unittest.TextTestRunner()
result = runner.run(suite)

print("\nResultados de las pruebas:")
print(f"Errores: {len(result.errors)}")
print(f"Failures: {len(result.failures)}")
print(f"Pruebas ejecutadas: {result.testsRun}")

...
----------------------------------------------------------------------
Ran 3 tests in 0.008s

OK



Resultados de las pruebas:
Errores: 0
Failures: 0
Pruebas ejecutadas: 3
