## 1. Librería

*   Frecuencia aproximada de un ítem:
    La librería permite consultar cuántas veces aparece un ítem en el flujo de datos, sin necesitar almacenar cada aparición exactamente. Para esta tarea se utilizará la estructura de Count-Min Sketch, que estima la frecuencia con un margen de error controlado y poco uso de memoria.

*   Elementos más frecuentes (Top-k):
    La librería permite identificar los ítems más repetidos dentro del flujo, para conocer las tendencias. Para esta tarea se utilizará la estructura de Space-Saving, que mantiene un resumen compacto de los ítems más frecuentes y sus estimaciones.

*   Estimación de momentos:
    La librería permite calcular métricas estadísticas como la varianza de las frecuencias, para analizar la dispersión o desigualdad en los datos. Para esta tarea se utilizará la estructura de AMS Sketch (Alon-Matias-Szegedy), que ofrece estimaciones eficientes y con alta precisión.
    
*   Número de elementos únicos:
    La librería permite estimar la cantidad de ítems distintos que han aparecido en el flujo, sin necesidad de almacenarlos todos. Para esta tarea se utilizará la estructura de HyperLogLog++, que ofrece una estimación precisa de la cardinalidad con muy poco uso de memoria.

*   Agregaciones sobre ventanas deslizantes:
    La librería permite hacer consultas sobre los últimos n ítems del flujo, como la suma o promedio en la ventana más reciente. Para esta tarea se utilizará la estructura de Sliding Window Aggregation, que actualiza estos valores de forma dinámica.



## 2. Estructuras Implementadas

### 2.1. Count-Min Sketch
Permite estimar la frecuencia de cualquier clave en el flujo de datos usando múltiples funciones de hash.

Cada vez que se recibe una clave, se calcula un índice para cada fila del sketch y se incrementa el contador correspondiente.  
Esto garantiza que, aunque algunos contadores puedan sobrestimar, la estimación final se aproxima bien gracias a usar el mínimo entre las filas.

Complejidad:
* La inserción y la estimación dependen del número de funciones de hash:  $
\boxed{O(\text{número de hashes})}
$










In [None]:
import mmh3
import sys
from array import array
from math import ceil, e, log
from typing import Union

class CountMinSketch:

    def __init__(
        self,
        epsilon: float,
        delta: float,
        conservative: bool = False,
    ):
        # Validar rangos de parámetros
        if not (0 < epsilon < 1):
            raise ValueError(f"epsilon debe estar en (0,1), pero vino {epsilon}")
        if not (0 < delta < 1):
            raise ValueError(f"delta debe estar en (0,1), pero vino {delta}")

        self.epsilon = epsilon
        self.delta = delta
        self.conservative = conservative

        self.w = ceil(e / epsilon)
        self.d = ceil(log(1 / delta))
        self.total = 0

        self.table = [array('L', [0] * self.w) for _ in range(self.d)]

        self.seeds = [((i * 0xFBA4C795 + 1) & 0xFFFFFFFF) for i in range(self.d)]

    def update(self, key: Union[str, bytes], count: int = 1) -> None:
        if count < 1:
            raise ValueError(f"count debe ser >=1, pero vino {count}")

        indices = [mmh3.hash(str(key), seed) % self.w for seed in self.seeds]

        if self.conservative:
            valores = [self.table[i][idx] for i, idx in enumerate(indices)]
            minimo = min(valores)
            for i, idx in enumerate(indices):
                if self.table[i][idx] == minimo:
                    self.table[i][idx] += count
        else:
            for i, idx in enumerate(indices):
                self.table[i][idx] += count

        self.total += count

    def estimate(self, key: Union[str, bytes]) -> int:
        """
        devuelve la estimación de frecuencia de key
        """
        return min(
            self.table[i][mmh3.hash(key, seed) % self.w]
            for i, seed in enumerate(self.seeds)
        )

    def reset(self):
        self.total = 0
        for i in range(self.d):
            for j in range(self.w):
                self.table[i][j] = 0

    def __len__(self) -> int:
        return self.total

    def get_memory_usage(self):
        size = sys.getsizeof(self.table)
        for row in self.table:
            size += sys.getsizeof(row)
        size += sys.getsizeof(self.seeds)
        return size

### 2.2. Space-Saving (Stream Summary)
Es una estructura diseñada para encontrar los elementos más frecuentes (top-k) en un flujo de datos con memoria limitad

* Cuando agregamos un nuevo elemento
Si ya está en el resumen: incrementa su contador.

* Si no está y hay espacio: lo añade con contador 1.

* Si no está y no hay espacio: reemplaza el elemento con menor frecuencia, sumando ese contador + 1 para capturar el posible error.

De esta forma, siempre se priorizan los elementos más frecuentes en el resumen.+

Complejidad

* Inserción: 𝑂
(
𝑘
)


In [None]:
from structures.base import StreamEstimator

class SpaceSaving(StreamEstimator):
    """
    estructura para top-k elementos más frecuentes
    """

    def __init__(self, k: int = 10):
        if k < 1:
            raise ValueError("k debe ser al menos 1")
        self.k = k
        self.counters = {}

    def update(self, item, count=1):

        if item in self.counters:
            freq, err = self.counters[item]
            self.counters[item] = (freq + count, err)
        elif len(self.counters) < self.k:
            self.counters[item] = (count, 0)
        else:
            min_item = min(self.counters, key=lambda x: self.counters[x][0])
            min_count, min_error = self.counters[min_item]

            del self.counters[min_item]
            self.counters[item] = (min_count + count, min_count)

    def estimate(self, item):
        return self.counters.get(item, (0, 0))[0]

    def consultar_top_k(self, k=None):
        k = k or self.k
        return sorted(self.counters.items(), key=lambda x: -x[1][0])[:k]

    def reset(self):
        self.counters = {}

    def __repr__(self):
        return f"<SpaceSaving k={self.k} counters={len(self.counters)}>"

    def get_memory_usage(self):
        import sys
        size = sys.getsizeof(self.counters)
        for k, (c, e) in self.counters.items():
            size += sys.getsizeof(k) + sys.getsizeof(c) + sys.getsizeof(e)
        return size

### 2.3. AMS Sketch
Estima el segundo momento o varianza de frecuencias de un flujo de datos.
#### Función signo
Para cada clave ingresada se obtiene un signo
$g(x) =
\begin{cases}
+1 & \text{con probabilidad } 0.5 \\
-1 & \text{con probabilidad } 0.5
\end{cases}$
Luego, se actuliza un contador
$Z \leftarrow Z + g(x)$

#### Estimador con un contador

Luego de procesar el flujo, se calcula:

$\hat{F}_2 = Z^2$

Este valor es un estimador insesgado de $ F_2 $, , pero puede tener varianza alta.

#### Estimador con varios contadores
Para reducir la varianza, se utilizan varios contadores independientes $(Z_1, Z_2, \ldots, Z_r )$, cada uno con su propia función de signo:

$
\hat{F}_2 = \frac{1}{r} \sum_{i=1}^{r} Z_i^2
$

Esto asegura un estimador más estable y con menor error relativo.

##### Complejidad  
* Tanto para la inserción como para la estimación, la complejidad es proporcional al número de contadores que usemos.  
$
\boxed{O(\text{n})}
$


In [None]:
import mmh3
from random import randint
from math import sqrt
from structures.base import StreamEstimator

class AMSSketch(StreamEstimator):
    """
    Estima el segundo momento de un flujo de datos
    """

    def __init__(self, num_projections: int = 10, seed: int = None):
        """
        num_projections es el número de contadores
        """
        self.num_projections = num_projections
        self.seed = seed or randint(0, 1 << 30)
        self.counters = [0] * num_projections

    def _sign_hash(self, item, i):
        """
        genera un signo +/-1 para el ítem para cada contador i
        """
        combined_seed = self.seed + i
        h = mmh3.hash(str(item), combined_seed, signed=True)
        return 1 if (h & 1) == 0 else -1

    def update(self, item, count=1):
        for i in range(self.num_projections):
            sign = self._sign_hash(item, i)
            self.counters[i] += sign * count

    def estimate(self, item=None):
        """
        estimación del segundo momento
        """
        estimates = [(c ** 2) for c in self.counters]
        return sum(estimates) / self.num_projections

    def reset(self):
        self.counters = [0] * self.num_projections

    def __repr__(self):
        return f"<AMSSketch projections={self.num_projections} seed={self.seed}>"

    def get_memory_usage(self):
        import sys
        size = sys.getsizeof(self.counters)
        for c in self.counters:
            size += sys.getsizeof(c)
        return size


### 2.4. HyperLogLog++
Cada vez que se recibe un nuevo elemento, se calcula un hash uniforme de 64 bits.  
Se usan los primeros bits para decidir qué registro actualizar y el resto para determinar cuántos ceros consecutivos hay.  
Se actualiza el registro correspondiente con la mayor cantidad de ceros observada.
Combina todos los registros para dar una aproximación muy precisa del número de elementos distintos en el flujo, usando técnicas de corrección.

* Complejidad:
Tanto la actualización como la estimación final dependen del número de registros (buckets):  
$
\boxed{O(\text{número de buckets})}
$

In [None]:
import mmh3
import math
from structures.base import StreamEstimator

class HyperLogLogPlusPlus(StreamEstimator):

    def __init__(self, b: int = 12):
        if not (4 <= b <= 16):
            raise ValueError("b debe estar en [4,16]")
        self.b = b
        self.m = 1 << b
        self.alpha = 0.7213 / (1 + 1.079 / self.m)
        self._w_bits = 64 - b

        self.sparse = {}
        self.dense = None

    def _rho(self, w: int) -> int:
        if w == 0:
            return self._w_bits + 1
        return (self._w_bits - w.bit_length()) + 1

    def _hash(self, value: str) -> int:
        return mmh3.hash64(value.encode('utf-8'), signed=False)[0]

    def update(self, item, count=1):
        x = self._hash(str(item))
        idx = x >> self._w_bits
        w = x & ((1 << self._w_bits) - 1)
        rho = self._rho(w)

        if self.dense is not None:
            self.dense[idx] = max(self.dense[idx], rho)
        else:
            if idx not in self.sparse or rho > self.sparse[idx]:
                self.sparse[idx] = rho
            if len(self.sparse) > (self.m / 2):
                self._convert_to_dense()

    def _convert_to_dense(self):
        self.dense = [0] * self.m
        for idx, val in self.sparse.items():
            self.dense[idx] = val
        self.sparse = None

    def estimate(self, item=None) -> float:
        if self.dense is not None:
            registers = self.dense
        else:
            V = self.m - len(self.sparse)
            if V > 0:
                return self.m * math.log(self.m / V)
            registers = [0] * self.m
            for idx, val in self.sparse.items():
                registers[idx] = val

        Z = 1.0 / sum(2.0 ** -r for r in registers)
        E = self.alpha * self.m * self.m * Z

        if E > (1/30) * (1 << 64):
            E = - (1 << 64) * math.log(1 - E / (1 << 64))

        return int(E)

    def reset(self):
        self.sparse = {}
        self.dense = None

    def get_memory_usage(self):
        import sys
        size = sys.getsizeof(self)
        if self.dense is not None:
            size += sys.getsizeof(self.dense)
        else:
            size += sys.getsizeof(self.sparse)
            for k, v in self.sparse.items():
                size += sys.getsizeof(k) + sys.getsizeof(v)
        return size


### 2.5. Sliding Window Aggregation
Permite realizar consultas agregadas (por ejemplo, suma, promedio) en una ventana deslizante de tamaño fijo.


Usa una estructura como una deque para almacenar solo los últimos N elementos.


* Mantiene los últimos N elementos en un deque (doble cola eficiente para inserciones y eliminaciones en ambos extremos).
* A medida que llegan nuevos elementos, añade el nuevo elemento al final de la ventana y elimina el m+as antiguo.
* Calcula la suma acumulada y, a partir de esta, el promedio de los últimos elementos.
* Permite consultar métricas básicas de la ventana: suma, promedio, tamaño actual, máximo y mínimo.




Complejidad
* Inserción y eliminación (por deque): 𝑂
(
1
)

* Cálculo de promedio (usando suma acumulada): 𝑂
(
1
)

* Consulta de máximo y mínimo:𝑂
(
𝑁
)


In [None]:
from collections import deque
from structures.base import StreamEstimator

class SlidingWindowAggregator(StreamEstimator):

    def __init__(self, window_size: int = 100):
        if window_size < 1:
            raise ValueError("window_size debe ser >= 1")
        self.window_size = window_size
        self.window = deque()
        self.sum = 0

    def update(self, item, count=1):
        for _ in range(count):
            self.window.append(item)
            self.sum += item
            if len(self.window) > self.window_size:
                removed = self.window.popleft()
                self.sum -= removed

    def estimate(self, item=None):
        if not self.window:
            return 0
        return self.sum / len(self.window)

    def consultar_ventana_deslizante(self):
        return {
            "suma": self.sum,
            "promedio": self.sum / len(self.window) if self.window else 0,
            "size_actual": len(self.window),
            "maximo": max(self.window) if self.window else None,
            "minimo": min(self.window) if self.window else None
        }

    def reset(self):
        self.window.clear()
        self.sum = 0

    def __repr__(self):
        return f"<SlidingWindowAggregator window_size={self.window_size} current_size={len(self.window)}>"

    def get_memory_usage(self):
        import sys
        size = sys.getsizeof(self.window)
        for item in self.window:
            size += sys.getsizeof(item)
        size += sys.getsizeof(self.sum)
        return size


### STREAM ESTIMATOR

In [None]:
from structures.count_min_sketch import CountMinSketch
from structures.hyperloglogpp import HyperLogLogPlusPlus
from structures.stream_summary import SpaceSaving
from structures.ams_sketch import AMSSketch
from structures.sliding_window import SlidingWindowAggregator

class StreamEstimatorManager:
    """
    Coordinador de sketches para procesamiento de streams con memoria limitada.
    """

    def __init__(self, **kwargs):
        """
        inicializa todos los sketches internos con parámetros opcionales
        """
        self.cms = CountMinSketch(
            epsilon=kwargs.get("cms_epsilon", 0.01),
            delta=kwargs.get("cms_delta", 0.01)
        )
        self.hll = HyperLogLogPlusPlus(
            b=kwargs.get("hll_b", 12)
        )
        self.ss = SpaceSaving(
            k=kwargs.get("ss_k", 10)
        )
        self.ams = AMSSketch(
            num_projections=kwargs.get("ams_projections", 10)
        )
        self.sliding = SlidingWindowAggregator(
            window_size=kwargs.get("window_size", 100)
        )

    def update(self, item, count=1):

        self.cms.update(item, count)

        self.hll.update(item)

        self.ss.update(item, count)

        self.ams.update(item, count)

        try:
            num_item = int(item)
            self.sliding.update(num_item, count)
        except ValueError:
            pass

    def consultar_frecuencia(self, item):
        return self.cms.estimate(item)

    def consultar_unicidad(self):
        return self.hll.estimate()

    def consultar_top_k(self, k=10):
        return self.ss.consultar_top_k(k)

    def consultar_varianza(self):
        return self.ams.estimate()


    def consultar_ventana_deslizante(self):
        return self.sliding.consultar_ventana_deslizante()

    def reset(self):

        self.cms.reset()
        self.hll.reset()
        self.ss.reset()
        self.ams.reset()
        self.sliding.reset()

    def get_memory_usage(self):
        return {
            "CountMinSketch": self.cms.get_memory_usage(),
            "HyperLogLog++": self.hll.get_memory_usage(),
            "SpaceSaving": self.ss.get_memory_usage(),
            "AMSSketch": self.ams.get_memory_usage(),
            "SlidingWindowAggregator": self.sliding.get_memory_usage()
        }

#### Driver

In [None]:
import random
from structures.stream_estimator_manager import StreamEstimatorManager

def main():
    manager = StreamEstimatorManager(
        epsilon=0.01,
        delta=0.01,
        k=10,
        window_size=1000,
        num_hashes=5
    )

    NUM_ITEMS = 1_000_000
    print("Insertar 1 millón de claves")

    # Generar claves al azar
    keys = [str(random.randint(1, 10_000)) for _ in range(NUM_ITEMS)]

    # Insertar claves
    for key in keys:
        manager.update(key)

    print("Consultas a la librería")

    # Frecuencia aproximada
    test_key = keys[0]
    freq = manager.consultar_frecuencia(test_key)
    print(f"Frecuencia aproximada de '{test_key}': {freq}")

    # Estimación de elementos únicos
    uniq = manager.consultar_unicidad()
    print(f"Número de elementos únicos aproximado: {uniq}")

    # Top-k
    top_k = manager.consultar_top_k(10)
    print("Top 10 elementos más frecuentes:")
    for item, count in top_k:
        print(f"  {item}: {count}")

    # Estimación del segundo momento (varianza)
    varianza = manager.consultar_varianza()
    print(f"Segundo momento (varianza): {varianza}")

    # Estadísticas de la ventana deslizante
    ventana = manager.consultar_ventana_deslizante()
    print("Ventana deslizante:")
    for k, v in ventana.items():
        print(f"  {k}: {v}")

if __name__ == "__main__":
    main()
