In [1]:
import numpy as np
from typing import Dict, List, Tuple, Optional, Iterator
import logging
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import threading
import queue
from abc import ABC, abstractmethod
import hashlib
import psutil

In [2]:
# Configuração de logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class ProcessingMetrics:
    """Classe para rastreamento de métricas de processamento"""
    def __init__(self, processing_time: float, memory_usage: float,
                 throughput: float, error_rate: float, batch_size: int):
        self.processing_time = processing_time
        self.memory_usage = memory_usage
        self.throughput = throughput
        self.error_rate = error_rate
        self.batch_size = batch_size

class DataChunk:
    """Classe para gerenciamento eficiente de chunks de dados"""
    def __init__(self, data: np.ndarray, metadata: Dict):
        self._data = data
        self._view = None
        self._metadata = metadata
        self._hash = None

    @property
    def data(self) -> np.ndarray:
        """Retorna view dos dados para economia de memória"""
        if self._view is None:
            self._view = self._data.view()
        return self._view

    @property
    def hash(self) -> str:
        """Calcula hash do chunk para verificação de integridade"""
        if self._hash is None:
            self._hash = hashlib.md5(self._data.tobytes()).hexdigest()
        return self._hash

    def process_inplace(self, func) -> None:
        """Processa dados in-place para economia de memória"""
        func(self._data)
        self._view = None  # Invalida view atual
        self._hash = None  # Invalida hash atual

class DataProcessor(ABC):
    """Classe base abstrata para processadores de dados"""
    @abstractmethod
    def process(self, chunk: DataChunk) -> DataChunk:
        pass

class StreamProcessor:
    """
    Processador de streams de dados otimizado para grandes volumes
    """
    def __init__(self,
                 chunk_size: int = 1000,
                 max_workers: int = 4,
                 buffer_size: int = 1000):
        self.chunk_size = chunk_size
        self.max_workers = max_workers
        self.buffer_size = buffer_size
        self.metrics = []
        self.processing_queue = queue.Queue(maxsize=buffer_size)
        self.result_queue = queue.Queue(maxsize=buffer_size)
        self._stop_event = threading.Event()

    def _initialize_workers(self) -> None:
        """Inicializa pool de workers para processamento paralelo"""
        self.thread_pool = ThreadPoolExecutor(max_workers=self.max_workers)
        self.process_pool = ProcessPoolExecutor(max_workers=self.max_workers)

    def _create_optimized_chunk(self,
                              data: np.ndarray,
                              metadata: Optional[Dict] = None) -> DataChunk:
        """Cria chunk otimizado com metadata"""
        if metadata is None:
            metadata = {}
        return DataChunk(data, metadata)

    def _process_chunk_with_metrics(self,
                                  chunk: DataChunk,
                                  processor: DataProcessor) -> Tuple[DataChunk, ProcessingMetrics]:
        """Processa chunk individual com métricas"""
        start_time = time.time()
        start_memory = self._get_memory_usage()

        try:
            result = processor.process(chunk)

            end_time = time.time()
            end_memory = self._get_memory_usage()
            processing_time = end_time - start_time
            memory_delta = end_memory - start_memory
            throughput = len(chunk.data) / processing_time

            metrics = ProcessingMetrics(
                processing_time=processing_time,
                memory_usage=memory_delta,
                throughput=throughput,
                error_rate=0.0,
                batch_size=len(chunk.data)
            )

            return result, metrics

        except Exception as e:
            logger.error(f"Erro no processamento: {str(e)}")
            raise

    def _optimize_memory_usage(self, chunk: DataChunk) -> DataChunk:
        """Otimiza uso de memória do chunk"""
        if not chunk.data.flags.owndata:
            return chunk

        optimized_data = np.ascontiguousarray(chunk.data)
        return self._create_optimized_chunk(optimized_data, chunk._metadata)

    def process_stream(self,
                      data_generator: Iterator[np.ndarray],
                      processor: DataProcessor) -> Iterator[DataChunk]:
        """Processa stream de dados de forma otimizada"""
        self._initialize_workers()

        def producer():
            try:
                for data_batch in data_generator:
                    if self._stop_event.is_set():
                        break
                    chunk = self._create_optimized_chunk(data_batch)
                    chunk = self._optimize_memory_usage(chunk)
                    self.processing_queue.put(chunk)
            finally:
                self.processing_queue.put(None)

        def consumer():
            while True:
                chunk = self.processing_queue.get()
                if chunk is None:
                    self.result_queue.put(None)
                    break

                try:
                    result, metrics = self._process_chunk_with_metrics(chunk, processor)
                    self.metrics.append(metrics)
                    self.result_queue.put(result)
                except Exception as e:
                    logger.error(f"Erro no consumidor: {str(e)}")
                    self._stop_event.set()
                    raise
                finally:
                    self.processing_queue.task_done()

        producer_thread = threading.Thread(target=producer)
        consumer_thread = threading.Thread(target=consumer)

        producer_thread.start()
        consumer_thread.start()

        while True:
            result = self.result_queue.get()
            if result is None:
                break
            yield result
            self.result_queue.task_done()

        producer_thread.join()
        consumer_thread.join()
        self.thread_pool.shutdown()
        self.process_pool.shutdown()

    def get_processing_metrics(self) -> List[ProcessingMetrics]:
        """Retorna métricas de processamento"""
        return self.metrics

    @staticmethod
    def _get_memory_usage() -> float:
        """Obtém uso atual de memória"""
        process = psutil.Process()
        return process.memory_info().rss / 1024 / 1024  # MB

class OptimizedDataTransformer:
    """Implementa transformações otimizadas em dados"""
    def __init__(self):
        self._cached_views = {}

    def transform_batch(self,
                       data: np.ndarray,
                       operations: List[str]) -> np.ndarray:
        """Aplica sequência de transformações em batch de dados"""
        current = data.view()

        for op in operations:
            if op == 'normalize':
                mean = current.mean()
                std = current.std()
                result = current.copy()
                for idx in np.ndindex(current.shape):
                    result.itemset(idx, (current.item(idx) - mean) / std)
                current = result

            elif op == 'filter_outliers':
                q1, q3 = np.percentile(current, [25, 75])
                iqr = q3 - q1
                mask = ((current >= (q1 - 1.5 * iqr)) &
                       (current <= (q3 + 1.5 * iqr)))
                current = current[mask].copy()

            elif op == 'fill_missing':
                if np.any(np.isnan(current)):
                    temp = current.copy()
                    temp.fill(np.nanmean(current))
                    current = temp

        return current

    def apply_rolling_operation(self,
                              data: np.ndarray,
                              window_size: int,
                              operation: str) -> np.ndarray:
        """Aplica operação em janela móvel de forma otimizada"""
        result = np.zeros(len(data) - window_size + 1)

        for i in range(len(result)):
            window = data[i:i + window_size].view()

            if operation == 'mean':
                result[i] = window.mean()
            elif operation == 'std':
                result[i] = window.std()
            elif operation == 'sum':
                result[i] = window.sum()

        return result

class SampleProcessor(DataProcessor):
    """Processador de exemplo para demonstração"""
    def process(self, chunk: DataChunk) -> DataChunk:
        transformer = OptimizedDataTransformer()
        processed_data = transformer.transform_batch(
            chunk.data,
            operations=['normalize', 'filter_outliers', 'fill_missing']
        )
        return DataChunk(processed_data, chunk._metadata)

def generate_sample_data(n_samples: int = 1000000):
    """Gera dados de exemplo para processamento"""
    for _ in range(0, n_samples, 1000):
        yield np.random.randn(1000)

if __name__ == "__main__":
    processor = StreamProcessor(chunk_size=1000, max_workers=4)
    sample_processor = SampleProcessor()

    logger.info("Iniciando processamento...")
    start_time = time.time()

    processed_chunks = []
    for chunk in processor.process_stream(
        generate_sample_data(1000000),
        sample_processor
    ):
        processed_chunks.append(chunk)

    total_time = time.time() - start_time

    metrics = processor.get_processing_metrics()
    avg_throughput = np.mean([m.throughput for m in metrics])
    avg_memory = np.mean([m.memory_usage for m in metrics])

    logger.info(f"Processamento concluído em {total_time:.2f} segundos")
    logger.info(f"Throughput médio: {avg_throughput:.2f} registros/s")
    logger.info(f"Uso médio de memória: {avg_memory:.2f} MB")

2024-12-20 13:51:25,241 - __main__ - INFO - Iniciando processamento...
2024-12-20 13:51:26,904 - __main__ - INFO - Processamento concluído em 1.66 segundos
2024-12-20 13:51:26,906 - __main__ - INFO - Throughput médio: 1065799.10 registros/s
2024-12-20 13:51:26,907 - __main__ - INFO - Uso médio de memória: 0.01 MB
