In [1]:
#!/usr/bin/env python3
"""
🧪 Teste de Normalização - VERSÃO AMOSTRA
========================================

Testa a normalização em uma pequena amostra do dataset HuMob.
Perfeito para debugar e validar antes de processar os 162M de linhas!

Uso:
    python normalize_sample.py
"""

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc
from pathlib import Path
from tqdm import tqdm
import json
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import warnings
warnings.filterwarnings('ignore')

class HuMobNormalizerSample:
    """
    Versão de teste que normaliza apenas uma amostra do dataset.
    
    🧪 Use isso para:
    - Validar se o processo funciona
    - Debugar problemas de POIs
    - Ajustar hiperparâmetros
    - Testar mudanças rapidamente
    """
    
    def __init__(self, input_path: str, output_path: str, max_chunks: int = 10):
        self.input_path = Path(input_path)
        self.output_path = Path(output_path)
        self.max_chunks = max_chunks  # 🧪 LIMITA QUANTOS CHUNKS PROCESSAR
        
        # Scalers que serão salvos para uso posterior
        self.coords_scaler = MinMaxScaler(feature_range=(0, 1))
        self.poi_stats = {}  # Para log1p + normalização manual
        
        # Metadados para salvar junto
        self.normalization_info = {
            "version": "1.0-SAMPLE",
            "strategy": {
                "coordinates": "MinMaxScaler [0,1]",
                "days": "Linear [0,1]", 
                "timeslots": "Circular sin/cos",
                "pois": "log1p + per-column normalization",
                "cities": "Label encoding A=0, B=1, C=2, D=3"
            },
            "sample_info": {
                "max_chunks": max_chunks,
                "chunk_size": 50000,
                "estimated_rows": max_chunks * 50000
            }
        }
    
    def compute_statistics_sample(self, chunk_size: int = 50_000):
        """
        Calcula estatísticas apenas dos primeiros chunks (AMOSTRA).
        """
        print(f"🧪 Calculando estatísticas da AMOSTRA (primeiros {self.max_chunks} chunks)...")
        
        # Acumuladores para coordenadas
        all_coords = []
        
        # Acumuladores para POIs (por coluna) - COM LIMPEZA
        poi_sums = np.zeros(85)
        poi_counts = 0
        poi_max_vals = np.zeros(85)
        
        # Ranges de tempo
        max_day = 0
        
        # Contadores para debug
        problematic_chunks = 0
        total_problematic_pois = 0
        
        pf = pq.ParquetFile(self.input_path)
        print(f"📁 Dataset completo: {pf.metadata.num_rows:,} linhas")
        print(f"🧪 Processando amostra: ~{self.max_chunks * chunk_size:,} linhas ({self.max_chunks} chunks)")
        
        for i, batch in enumerate(tqdm(pf.iter_batches(batch_size=chunk_size), 
                                      total=self.max_chunks, desc="Stats da amostra")):
            
            # 🧪 PARA APÓS max_chunks
            if i >= self.max_chunks:
                break
            
            table = pa.Table.from_batches([batch], schema=pf.schema_arrow)
            
            # 1. COORDENADAS (como sempre)
            x_vals = table.column("x").to_numpy()
            y_vals = table.column("y").to_numpy()
            coords = np.column_stack([x_vals, y_vals])
            all_coords.append(coords)
            
            # 2. DIAS (como sempre)
            d_vals = table.column("d").to_numpy()
            max_day = max(max_day, d_vals.max())
            
            # 3. POIs COM DEBUG DETALHADO
            try:
                poi_lists = table.column("POI").to_pylist()
                
                # 🔍 DEBUG DETALHADO NOS PRIMEIROS CHUNKS
                if i < 3:  # Debug detalhado nos 3 primeiros
                    print(f"\n🔍 DEBUG Chunk {i}:")
                    sample_pois = poi_lists[:5]  # Primeiros 5 POIs do chunk
                    for j, poi in enumerate(sample_pois):
                        if poi is None:
                            print(f"   Row {j}: None")
                        elif not isinstance(poi, (list, tuple)):
                            print(f"   Row {j}: type={type(poi)}")
                        elif len(poi) != 85:
                            print(f"   Row {j}: len={len(poi)} (expected 85)")
                        else:
                            poi_arr = np.array(poi, dtype=np.float32)
                            if not np.isfinite(poi_arr).all():
                                print(f"   Row {j}: contains NaN/Inf")
                            else:
                                print(f"   Row {j}: OK, range=[{poi_arr.min():.1f}, {poi_arr.max():.1f}]")
                
                # Limpa POIs problemáticos
                cleaned_poi_lists = []
                chunk_problems = 0
                
                for poi_list in poi_lists:
                    if poi_list is None:
                        cleaned_poi_lists.append([0.0] * 85)
                        chunk_problems += 1
                    elif not isinstance(poi_list, (list, tuple)):
                        cleaned_poi_lists.append([0.0] * 85)
                        chunk_problems += 1
                    elif len(poi_list) != 85:
                        # Ajusta tamanho
                        if len(poi_list) < 85:
                            poi_fixed = list(poi_list) + [0.0] * (85 - len(poi_list))
                        else:
                            poi_fixed = list(poi_list)[:85]
                        
                        # Converte valores para float seguro
                        poi_cleaned = []
                        for val in poi_fixed:
                            try:
                                float_val = float(val) if val is not None else 0.0
                                if not np.isfinite(float_val) or float_val < 0:
                                    float_val = 0.0
                                poi_cleaned.append(float_val)
                            except (ValueError, TypeError):
                                poi_cleaned.append(0.0)
                        
                        cleaned_poi_lists.append(poi_cleaned)
                        chunk_problems += 1
                    else:
                        # Converte valores para float seguro
                        poi_cleaned = []
                        for val in poi_list:
                            try:
                                float_val = float(val) if val is not None else 0.0
                                if not np.isfinite(float_val) or float_val < 0:
                                    float_val = 0.0
                                poi_cleaned.append(float_val)
                            except (ValueError, TypeError):
                                poi_cleaned.append(0.0)
                        
                        cleaned_poi_lists.append(poi_cleaned)
                
                # Converte para array e processa
                poi_array = np.array(cleaned_poi_lists, dtype=np.float32)
                
                # Limpa qualquer NaN/Inf restante
                poi_array = np.nan_to_num(poi_array, nan=0.0, posinf=0.0, neginf=0.0)
                
                # Acumula estatísticas
                poi_sums += poi_array.sum(axis=0)
                poi_counts += poi_array.shape[0]
                poi_max_vals = np.maximum(poi_max_vals, poi_array.max(axis=0))
                
                # Conta problemas
                if chunk_problems > 0:
                    problematic_chunks += 1
                    total_problematic_pois += chunk_problems
                
            except Exception as e:
                print(f"⚠️ Erro no chunk {i}: {str(e)}")
                problematic_chunks += 1
        
        # Calcula scalers das coordenadas
        all_coords = np.vstack(all_coords)
        self.coords_scaler.fit(all_coords)
        
        # Salva estatísticas POI
        self.poi_stats = {
            'max_vals': poi_max_vals.tolist(),
            'mean_vals': (poi_sums / max(poi_counts, 1)).tolist()  # Evita divisão por zero
        }
        
        # Salva info de tempo
        self.max_day = max_day
        
        print(f"\n✅ Estatísticas da AMOSTRA calculadas:")
        print(f"   📍 Coordenadas: x[{all_coords[:,0].min():.0f}, {all_coords[:,0].max():.0f}], y[{all_coords[:,1].min():.0f}, {all_coords[:,1].max():.0f}] → [0,1]")
        print(f"   📅 Dias: [0, {max_day}]")
        print(f"   🏢 POIs: max_sum={poi_max_vals.max():.0f}, mean_sum={poi_sums.max()/max(poi_counts,1):.2f}")
        print(f"   📊 Linhas processadas: {poi_counts:,}")
        
        if problematic_chunks > 0 or total_problematic_pois > 0:
            print(f"   🔧 Limpeza POI: {problematic_chunks} chunks com problemas, {total_problematic_pois:,} POIs corrigidos")
        else:
            print(f"   ✅ POIs: Nenhum problema encontrado!")
    
    def normalize_coordinates(self, x_vals, y_vals):
        """Normaliza coordenadas para [0, 1]"""
        coords = np.column_stack([x_vals, y_vals])
        coords_norm = self.coords_scaler.transform(coords)
        return coords_norm[:, 0], coords_norm[:, 1]
    
    def normalize_time(self, d_vals, t_vals):
        """
        Normaliza tempo:
        - Dias: linear [0,1]  
        - Timeslots: circular (sin/cos)
        """
        # Dias: normalização linear
        d_norm = d_vals.astype(np.float32) / self.max_day
        
        # Timeslots: normalização circular (importante para horários!)
        t_sin = np.sin(2 * np.pi * t_vals / 48).astype(np.float32)
        t_cos = np.cos(2 * np.pi * t_vals / 48).astype(np.float32)
        
        return d_norm, t_sin, t_cos
    
    def normalize_pois_robust(self, poi_lists):
        """Normaliza POIs com estratégia robusta"""
        valid_poi_lists = []
        
        for poi_list in poi_lists:
            if poi_list is None or not isinstance(poi_list, (list, tuple)):
                valid_poi_lists.append([0.0] * 85)
                continue
            
            # Converte e ajusta tamanho
            if len(poi_list) < 85:
                poi_fixed = list(poi_list) + [0.0] * (85 - len(poi_list))
            elif len(poi_list) > 85:
                poi_fixed = list(poi_list)[:85]
            else:
                poi_fixed = list(poi_list)
            
            # Converte para float e limpa
            poi_cleaned = []
            for val in poi_fixed:
                try:
                    float_val = float(val) if val is not None else 0.0
                    if not np.isfinite(float_val):
                        float_val = 0.0
                    poi_cleaned.append(max(0.0, float_val))
                except (ValueError, TypeError):
                    poi_cleaned.append(0.0)
            
            valid_poi_lists.append(poi_cleaned)
        
        # Converte para array
        poi_array = np.array(valid_poi_lists, dtype=np.float32)
        
        # Aplica log1p e normalização
        poi_log = np.log1p(poi_array)
        
        max_vals = np.array(self.poi_stats['max_vals'])
        max_vals_log = np.log1p(max_vals)
        max_vals_log[max_vals_log == 0] = 1.0
        
        poi_normalized = poi_log / max_vals_log[np.newaxis, :]
        poi_normalized = np.clip(poi_normalized, 0.0, 1.0)
        
        return poi_normalized.tolist()
    
    def normalize_cities(self, city_vals):
        """Converte cidades para encoding numérico"""
        city_mapping = {'A': 0, 'B': 1, 'C': 2, 'D': 3}
        return np.array([city_mapping[c] for c in city_vals], dtype=np.int8)
    
    def process_and_save_sample(self, chunk_size: int = 50_000):
        """
        Processa e salva apenas a AMOSTRA.
        """
        print(f"🔄 Normalizando e salvando AMOSTRA...")
        
        pf = pq.ParquetFile(self.input_path)
        
        writer = None
        processed_rows = 0
        
        for i, batch in enumerate(tqdm(pf.iter_batches(batch_size=chunk_size), 
                                      total=self.max_chunks, desc="Normalizando amostra")):
            
            # 🧪 PARA APÓS max_chunks
            if i >= self.max_chunks:
                break
            
            table = pa.Table.from_batches([batch], schema=pf.schema_arrow)
            
            # Extrai dados originais
            uid_vals = table.column("uid").to_numpy()
            d_vals = table.column("d").to_numpy() 
            t_vals = table.column("t").to_numpy()
            x_vals = table.column("x").to_numpy()
            y_vals = table.column("y").to_numpy()
            city_vals = table.column("city").to_pylist()
            poi_lists = table.column("POI").to_pylist()
            
            # Aplica normalizações
            x_norm, y_norm = self.normalize_coordinates(x_vals, y_vals)
            d_norm, t_sin, t_cos = self.normalize_time(d_vals, t_vals)
            poi_norm = self.normalize_pois_robust(poi_lists)
            city_encoded = self.normalize_cities(city_vals)
            
            # Cria novo schema
            if writer is None:
                new_schema = pa.schema([
                    pa.field("uid", pa.int64()),
                    pa.field("d_orig", pa.uint8()),
                    pa.field("t_orig", pa.uint8()),
                    pa.field("d_norm", pa.float32()),
                    pa.field("t_sin", pa.float32()),
                    pa.field("t_cos", pa.float32()),
                    pa.field("x_norm", pa.float32()),
                    pa.field("y_norm", pa.float32()),
                    pa.field("city", pa.string()),
                    pa.field("city_encoded", pa.int8()),
                    pa.field("POI_norm", pa.list_(pa.float32()))
                ])
                
                # Adiciona metadados
                metadata = {
                    b"normalization_info": json.dumps(self.normalization_info).encode("utf-8"),
                    b"coords_scaler_min": json.dumps(self.coords_scaler.data_min_.tolist()).encode("utf-8"),
                    b"coords_scaler_scale": json.dumps(self.coords_scaler.scale_.tolist()).encode("utf-8"),
                    b"poi_stats": json.dumps(self.poi_stats).encode("utf-8"),
                    b"max_day": str(self.max_day).encode("utf-8")
                }
                new_schema = new_schema.with_metadata(metadata)
                
                writer = pq.ParquetWriter(self.output_path, new_schema, compression='snappy')
            
            # Cria nova tabela
            new_table = pa.table({
                "uid": uid_vals,
                "d_orig": d_vals,
                "t_orig": t_vals, 
                "d_norm": d_norm,
                "t_sin": t_sin,
                "t_cos": t_cos,
                "x_norm": x_norm,
                "y_norm": y_norm,
                "city": city_vals,
                "city_encoded": city_encoded,
                "POI_norm": poi_norm
            }, schema=new_schema)
            
            writer.write_table(new_table)
            processed_rows += len(uid_vals)
        
        if writer:
            writer.close()
            
        print(f"✅ AMOSTRA normalizada salva em: {self.output_path}")
        print(f"📊 Linhas processadas: {processed_rows:,}")
        print(f"💾 Tamanho do arquivo: {self.output_path.stat().st_size / (1024**2):.1f} MB")
    
    def run(self, chunk_size: int = 50_000):
        """Executa normalização da AMOSTRA"""
        print("🧪 Iniciando TESTE de normalização do dataset HuMob")
        print(f"📂 Input: {self.input_path}")
        print(f"📁 Output: {self.output_path}")
        print(f"🔬 Amostra: {self.max_chunks} chunks (~{self.max_chunks * chunk_size:,} linhas)")
        print("="*60)
        
        # Passo 1: Calcular estatísticas da amostra
        self.compute_statistics_sample(chunk_size)
        
        # Passo 2: Processar e salvar amostra
        self.process_and_save_sample(chunk_size)
        
        print("\n🧪 TESTE concluído com sucesso!")
        print("\n📋 Próximos passos:")
        print("   1. Verifique se o arquivo de teste está correto")
        print("   2. Se OK, execute a versão completa")
        print("   3. Se problemas, ajuste parâmetros e teste novamente")


def check_normalized_sample(file_path: str, n_samples: int = 1000):
    """Verifica se a amostra foi normalizada corretamente."""
    if not Path(file_path).exists():
        print(f"❌ Arquivo não encontrado: {file_path}")
        return
    
    print(f"🔍 Verificando AMOSTRA normalizada em: {file_path}")
    
    # Lê amostra
    df = pq.read_table(file_path).slice(0, n_samples).to_pandas()
    
    print(f"\n📊 Estatísticas da amostra ({len(df):,} linhas):")
    print(f"   📍 Coordenadas:")
    print(f"      x_norm: [{df['x_norm'].min():.3f}, {df['x_norm'].max():.3f}] (deve ser [0,1])")
    print(f"      y_norm: [{df['y_norm'].min():.3f}, {df['y_norm'].max():.3f}] (deve ser [0,1])")
    
    print(f"   📅 Tempo:")
    print(f"      d_norm: [{df['d_norm'].min():.3f}, {df['d_norm'].max():.3f}] (deve ser [0,1])")
    print(f"      t_sin: [{df['t_sin'].min():.3f}, {df['t_sin'].max():.3f}] (deve ser [-1,1])")
    print(f"      t_cos: [{df['t_cos'].min():.3f}, {df['t_cos'].max():.3f}] (deve ser [-1,1])")
    
    print(f"   🏢 POIs:")
    poi_sample = np.array(df['POI_norm'].iloc[0])
    print(f"      Exemplo: [{poi_sample.min():.3f}, {poi_sample.max():.3f}] (deve ser [0,1])")
    print(f"      Dimensão: {len(poi_sample)} (deve ser 85)")
    
    # Verifica NaNs
    has_nan = df.isnull().any().any()
    print(f"   🧪 Valores NaN: {'❌ Encontrados!' if has_nan else '✅ Nenhum'}")
    
    print(f"   🏙️ Cidades:")
    print(f"      Encoded: {sorted(df['city_encoded'].unique())} (deve ser [0,1,2,3])")
    print(f"      Original: {sorted(df['city'].unique())}")


if __name__ == "__main__":
    # Configurações para TESTE
    INPUT_FILE = "humob_all_cities_dpsk.parquet"
    OUTPUT_FILE = "humob_sample_normalized.parquet"  # 🧪 Arquivo de teste
    MAX_CHUNKS = 20  # 🧪 Apenas 20 chunks = ~1M linhas (2 minutos)
    
    # Verifica se arquivo de entrada existe
    if not Path(INPUT_FILE).exists():
        print(f"❌ Arquivo não encontrado: {INPUT_FILE}")
        print("📝 Certifique-se de que o arquivo foi gerado pelo data_loader.ipynb")
        exit(1)
    
    # Executa normalização de TESTE
    normalizer = HuMobNormalizerSample(INPUT_FILE, OUTPUT_FILE, max_chunks=MAX_CHUNKS)
    normalizer.run(chunk_size=50_000)
    
    # Verifica resultado
    print("\n" + "="*60)
    check_normalized_sample(OUTPUT_FILE, n_samples=5000)
    
    print(f"\n🧪 TESTE COMPLETO!")
    print(f"   📄 Arquivo de teste: {OUTPUT_FILE}")
    print(f"   📊 ~{MAX_CHUNKS * 50000:,} linhas processadas")
    print(f"\n✅ Se tudo estiver OK, execute a versão completa:")
    print(f"   python normalize_humob_data_fixed.py")

🧪 Iniciando TESTE de normalização do dataset HuMob
📂 Input: humob_all_cities_dpsk.parquet
📁 Output: humob_sample_normalized.parquet
🔬 Amostra: 20 chunks (~1,000,000 linhas)
🧪 Calculando estatísticas da AMOSTRA (primeiros 20 chunks)...
📁 Dataset completo: 162,785,736 linhas
🧪 Processando amostra: ~1,000,000 linhas (20 chunks)


Stats da amostra:   0%|          | 0/20 [00:00<?, ?it/s]


🔍 DEBUG Chunk 0:
   Row 0: OK, range=[0.0, 19.0]
   Row 1: OK, range=[0.0, 19.0]
   Row 2: OK, range=[0.0, 6.0]
   Row 3: OK, range=[0.0, 6.0]
   Row 4: OK, range=[0.0, 8.0]


Stats da amostra:   5%|▌         | 1/20 [00:05<01:49,  5.79s/it]


🔍 DEBUG Chunk 1:
   Row 0: OK, range=[0.0, 3.0]
   Row 1: OK, range=[0.0, 3.0]
   Row 2: OK, range=[0.0, 4.0]
   Row 3: OK, range=[0.0, 8.0]
   Row 4: OK, range=[0.0, 0.0]


Stats da amostra:  10%|█         | 2/20 [00:11<01:43,  5.76s/it]


🔍 DEBUG Chunk 2:
   Row 0: OK, range=[0.0, 8.0]
   Row 1: OK, range=[0.0, 14.0]
   Row 2: OK, range=[0.0, 14.0]
   Row 3: OK, range=[0.0, 3.0]
   Row 4: OK, range=[0.0, 1.0]


Stats da amostra: 100%|██████████| 20/20 [01:53<00:00,  5.68s/it]



✅ Estatísticas da AMOSTRA calculadas:
   📍 Coordenadas: x[1, 200], y[1, 200] → [0,1]
   📅 Dias: [0, 74]
   🏢 POIs: max_sum=236, mean_sum=6.07
   📊 Linhas processadas: 1,000,000
   ✅ POIs: Nenhum problema encontrado!
🔄 Normalizando e salvando AMOSTRA...


Normalizando amostra: 100%|██████████| 20/20 [02:07<00:00,  6.38s/it]


✅ AMOSTRA normalizada salva em: humob_sample_normalized.parquet
📊 Linhas processadas: 1,000,000
💾 Tamanho do arquivo: 27.7 MB

🧪 TESTE concluído com sucesso!

📋 Próximos passos:
   1. Verifique se o arquivo de teste está correto
   2. Se OK, execute a versão completa
   3. Se problemas, ajuste parâmetros e teste novamente

🔍 Verificando AMOSTRA normalizada em: humob_sample_normalized.parquet

📊 Estatísticas da amostra (5,000 linhas):
   📍 Coordenadas:
      x_norm: [0.226, 0.824] (deve ser [0,1])
      y_norm: [0.040, 0.719] (deve ser [0,1])
   📅 Tempo:
      d_norm: [0.000, 1.000] (deve ser [0,1])
      t_sin: [-1.000, 1.000] (deve ser [-1,1])
      t_cos: [-1.000, 1.000] (deve ser [-1,1])
   🏢 POIs:
      Exemplo: [0.000, 0.756] (deve ser [0,1])
      Dimensão: 85 (deve ser 85)
   🧪 Valores NaN: ✅ Nenhum
   🏙️ Cidades:
      Encoded: [np.int8(0)] (deve ser [0,1,2,3])
      Original: ['A']

🧪 TESTE COMPLETO!
   📄 Arquivo de teste: humob_sample_normalized.parquet
   📊 ~1,000,000 linhas