# **Raw Processing - Health Insurance Market Data**

*   **Turma:** eEDB-015/2025-1 - Projeto Integrador
*   **Grupo:** H

### Imports

In [None]:
import os
import re
import csv
import hashlib
import traceback
import gc
from typing import Dict, List, Optional, Union
import pandas as pd
import numpy as np
from pathlib import Path
import argparse
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
import json
from jsonschema import validate
from datetime import datetime
import boto3
import logging
from io import BytesIO

### Constantes

In [None]:
# Logger configuration: INFO level, "timestamp - level - message" line format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()

# Module-level S3 client used by the S3 helper functions in this file.
s3 = boto3.client('s3')

# Constants (placeholders that must be replaced before running).
INPUT_BUCKET = 'your-input-bucket-name' # CHANGE ME!
OUTPUT_BUCKET = 'your-output-bucket-name' # CHANGE ME!
ZIP_FILE_KEY = 'path/to/your/zipfile.zip' # CHANGE ME!

# Years to process.
# NOTE(review): not referenced by any function visible in this file — confirm
# whether a per-year filter is still planned.
years_to_process = ['2014', '2015', '2016']

### Funções Auxiliares

In [None]:
def normalize_file_name(file_name: str) -> str:
    """Strip the ``_PUF`` marker and whatever follows it on the same line.

    Args:
        file_name: File name (or path) to normalize.

    Returns:
        ``file_name`` with every ``_PUF...`` run removed; names without the
        marker are returned unchanged. Note that a trailing extension after
        ``_PUF`` is removed as well.
    """
    puf_suffix = re.compile(r"_PUF.*")
    return puf_suffix.sub("", file_name)

def ensure_s3_directory(bucket: str, path: str) -> None:
    """Create an empty "directory" marker object in S3 (best effort).

    The marker key is ``path`` with trailing slashes collapsed to exactly one.
    Any failure is logged and swallowed; nothing is raised to the caller.

    Args:
        bucket: Target bucket name.
        path: Prefix that should appear as a directory.
    """
    try:
        marker_key = path.rstrip('/') + '/'
        s3.put_object(Bucket=bucket, Key=marker_key)
        logger.info(f"Diretório S3 criado: s3://{bucket}/{path}")
    except Exception as e:
        logger.error(f"Erro ao criar diretório S3: {str(e)}")

def read_csv_from_s3(bucket: str, key: str, chunksize: Optional[int] = None) -> Union[pd.DataFrame, pd.io.parsers.TextFileReader]:
    """Download a CSV object from S3 and parse it with pandas.

    The whole object body is read into memory first; ``chunksize`` only
    controls how pandas iterates over that in-memory buffer.

    Args:
        bucket: Bucket holding the object.
        key: Object key of the CSV file.
        chunksize: Rows per chunk; ``None`` parses the file in one go.

    Returns:
        A DataFrame when ``chunksize`` is ``None``, otherwise a chunk iterator.

    Raises:
        Exception: Any download or parse error is logged and re-raised.
    """
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        payload = BytesIO(response['Body'].read())
        return pd.read_csv(payload, chunksize=chunksize)
    except Exception as e:
        logger.error(f"Erro ao ler CSV do S3: {str(e)}")
        raise

def validate_data(df: pd.DataFrame) -> Dict:
    """Summarize row count, column names and per-column null percentages.

    Args:
        df: Frame to inspect.

    Returns:
        A dict with three keys:
        ``"total_rows"`` (int), ``"columns"`` (list of column labels) and
        ``"null_percentages"`` (column label -> percentage of null values,
        in the same order as ``"columns"``).
    """
    total_rows = len(df)
    # Null counting does not depend on dtype, so one pass over the whole frame
    # is enough and keeps the result aligned with the column order.
    null_counts = df.isnull().sum()

    if total_rows:
        null_percentages = (null_counts / total_rows * 100).to_dict()
    else:
        # An empty frame has no nulls; avoid 0 / 0, which would yield NaN.
        null_percentages = {column: 0.0 for column in null_counts.index}

    return {
        "total_rows": total_rows,
        "columns": df.columns.tolist(),
        "null_percentages": null_percentages,
    }

def is_csv_file(filename: str) -> bool:
    """Tell whether ``filename`` ends with ``.csv``, ignoring letter case."""
    lowered = filename.lower()
    return lowered.endswith('.csv')

def generate_version_hash(df: pd.DataFrame) -> str:
    """Return one MD5 hex digest summarizing the whole frame.

    Each row (index included, per pandas' default) is first hashed with
    ``pd.util.hash_pandas_object``; the resulting array of row hashes is then
    fed to MD5.

    Args:
        df: Frame to fingerprint.

    Returns:
        32-character hexadecimal digest.
    """
    row_hashes = pd.util.hash_pandas_object(df)
    digest = hashlib.md5()
    digest.update(row_hashes.values)
    return digest.hexdigest()

### Funções Auxiliares - Específicas para Camada **RAW**

In [None]:
def _load_existing_records(table_path: str, table_name: str) -> pd.DataFrame:
    """Load every parquet object stored under ``table_path`` in OUTPUT_BUCKET.

    Best effort: any failure is logged and an empty frame is returned, so the
    caller treats the table as having no previous data.
    """
    try:
        frames = []
        # Paginate: a single list_objects_v2 call returns at most 1000 keys.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=OUTPUT_BUCKET, Prefix=table_path):
            for obj in page.get('Contents', []):
                if not obj['Key'].endswith('.parquet'):
                    # Skips the "directory" marker and any non-parquet object.
                    continue
                body = s3.get_object(Bucket=OUTPUT_BUCKET, Key=obj['Key'])['Body'].read()
                frames.append(pd.read_parquet(BytesIO(body)))
        # pd.concat([]) raises ValueError; a prefix holding only the directory
        # marker simply means "no previous data".
        if frames:
            return pd.concat(frames, ignore_index=True)
    except Exception as e:
        logger.error(f"Erro ao ler dados existentes para {table_name}: {str(e)}")
    return pd.DataFrame()


def _select_new_records(chunk: pd.DataFrame, existing_df: pd.DataFrame, compare_columns: List[str]) -> pd.DataFrame:
    """Return the rows of ``chunk`` that are unknown to ``existing_df`` or newer."""
    merged = chunk.merge(
        existing_df[compare_columns + ['partitionDate']],
        on=compare_columns,
        how='left',
        suffixes=('', '_existing'),
    )
    # Keep rows with no match, or whose partition is more recent than the
    # stored one (partition dates are "YYYYMMDD" strings, so they sort by date).
    is_new = merged['partitionDate_existing'].isnull() | (
        merged['partitionDate'] > merged['partitionDate_existing']
    )
    return merged.loc[is_new, chunk.columns]


def process_and_save_file(bucket: str, key: str, table_name: str) -> None:
    """Ingest one CSV object from S3 into the RAW layer as parquet files.

    The CSV is read in chunks. Each chunk is stamped with ``ingestDate``,
    ``partitionDate`` and ``version``, compared against the data already
    stored for the table, de-duplicated on the business columns (everything
    except the three metadata columns, keeping the earliest ``ingestDate``)
    and written to ``s3://OUTPUT_BUCKET/<table_name>/<table_name>_<n>.parquet``.

    Errors are logged (with traceback) and swallowed so that one bad file does
    not abort a batch.

    Args:
        bucket: Bucket holding the source CSV.
        key: Object key of the source CSV. Hidden files (key or base name
            starting with ``.``) and non-CSV keys are skipped with a warning.
        table_name: Destination table name. When empty, it is derived from the
            base name of ``key`` via ``normalize_file_name``.

    NOTE(review): every chunk file contains the full accumulated table, not
    just that chunk's rows, so the stored files overlap heavily. That layout
    is kept here because existing readers de-duplicate on load — confirm
    before changing it.
    NOTE(review): ``version`` is hashed after ``ingestDate`` is added, so it
    changes on every run even for identical content — confirm intent.
    """
    try:
        logger.info(f"Começando o processamento do arquivo: {key}")
        base_name = os.path.basename(key)
        # Check the base name too: a hidden file inside a sub-prefix
        # (e.g. "2014/.tmp.csv") does not make the full key start with ".".
        if key.startswith('.') or base_name.startswith('.') or not is_csv_file(key):
            logger.warning(f"Ignorando arquivo não CSV ou oculto: {key}")
            return

        # Honor the caller-supplied table name; only derive one from the file
        # name when none was given (previously the argument was overwritten).
        table_name = table_name or normalize_file_name(base_name)

        # Chunked read to bound the size of each processing step.
        chunk_size = 100000  # Tune as needed
        chunks = read_csv_from_s3(bucket, key, chunksize=chunk_size)

        table_path = f"{table_name}/"
        ensure_s3_directory(OUTPUT_BUCKET, table_path)

        # Load previously stored data once. After each chunk the in-memory
        # frame is replaced by what was just written, which is a superset of
        # it, so re-downloading the whole table per chunk is unnecessary.
        existing_df = _load_existing_records(table_path, table_name)

        # Columns used for comparison (ingestDate, partitionDate, version excluded).
        compare_columns = None
        new_records_total = 0

        for processed_chunks, chunk in enumerate(chunks, start=1):
            logger.info(f"Processando chunk {processed_chunks} para {table_name}")

            if chunk.empty:
                continue

            chunk = chunk.infer_objects()
            # Sample the clock once so both date columns agree.
            ingest_ts = pd.Timestamp.now()
            chunk['ingestDate'] = ingest_ts
            chunk['partitionDate'] = ingest_ts.strftime("%Y%m%d")
            chunk['version'] = generate_version_hash(chunk)

            if compare_columns is None:
                compare_columns = [col for col in chunk.columns if col not in ["ingestDate", "partitionDate", "version"]]

            if existing_df.empty:
                new_records = chunk
                combined_df = chunk
            else:
                new_records = _select_new_records(chunk, existing_df, compare_columns)
                # Merge stored rows with the new, unique ones.
                combined_df = pd.concat([existing_df, new_records], ignore_index=True)

            # Drop duplicates, keeping the oldest record.
            final_df = combined_df.sort_values('ingestDate').drop_duplicates(subset=compare_columns, keep='first')

            new_records_total += len(new_records)

            # Validate the data
            validation_results = validate_data(final_df)
            logger.info(f"Validação para {table_name}: {validation_results}")

            # Save the data to S3
            buffer = BytesIO()
            final_df.to_parquet(buffer, engine='pyarrow', compression='snappy', index=False)
            buffer.seek(0)
            s3.put_object(Bucket=OUTPUT_BUCKET, Key=f"{table_path}{table_name}_{processed_chunks}.parquet", Body=buffer.getvalue())
            logger.info(f"Chunk {processed_chunks} processado e salvo para a tabela {table_name}")

            # The frame just written becomes the baseline for the next chunk.
            existing_df = final_df

            # Release per-chunk intermediates before the next iteration.
            del chunk, combined_df, new_records, final_df, buffer
            gc.collect()

        logger.info(f"Processamento concluído para {key}")
        logger.info(f"Total de novos registros para {table_name}: {new_records_total}")

    except Exception as e:
        logger.error(f"Erro ao processar {key}: {str(e)}")
        logger.error(traceback.format_exc())

In [None]:
def process_zip_file():
    """Download the configured ZIP from S3 and ingest every CSV it contains.

    The archive at ``s3://INPUT_BUCKET/ZIP_FILE_KEY`` is read into memory.
    Each CSV member is uploaded to ``INPUT_BUCKET`` under its member name
    (so it can be read back by key) and then handed to
    ``process_and_save_file`` with a ``tb_<normalized lower-case name>``
    table name. Directory entries, hidden files and non-CSV members are
    skipped.

    Errors are logged (with traceback) and swallowed; nothing is raised.
    """
    # Local import: zipfile is only needed by this function.
    import zipfile

    try:
        zip_obj = s3.get_object(Bucket=INPUT_BUCKET, Key=ZIP_FILE_KEY)
        zip_content = BytesIO(zip_obj['Body'].read())

        with zipfile.ZipFile(zip_content) as archive:
            for file_name in archive.namelist():
                base_name = os.path.basename(file_name)
                # Directory entries have an empty base name; dot-files are
                # hidden/auxiliary entries that should not become tables.
                if not base_name or base_name.startswith('.') or not is_csv_file(file_name):
                    continue

                # process_and_save_file reads its input from S3 by key, so the
                # extracted member has to exist as an object first.
                s3.put_object(Bucket=INPUT_BUCKET, Key=file_name, Body=archive.read(file_name))

                table_name = f"tb_{normalize_file_name(base_name).lower()}"
                process_and_save_file(INPUT_BUCKET, file_name, table_name)

    except Exception as e:
        logger.error(f"Erro ao processar o arquivo ZIP: {str(e)}")
        logger.error(traceback.format_exc())

### Run

In [None]:
# Script entry point: run the ZIP ingestion only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    process_zip_file()