In [None]:
import sys
import pandas as pd
import numpy as np
import boto3
import re
from io import BytesIO
from awsglue.utils import getResolvedOptions

# S3 bucket that holds both the input CSV objects and the Parquet output.
BUCKET_NAME = 'bucket-challenge-03'
# Key prefix that is listed to discover the input ``.csv`` objects.
PREFIX_RAW = 'data-input/dados/'
# Key prefix under which each processed ``.parquet`` file is written.
PREFIX_TRUSTED = 'data-output/'

# Module-level S3 client, used to list the objects under the input prefix.
s3_client = boto3.client('s3')

In [None]:
def listar_arquivos_s3(bucket, prefix):
    """Return the ``s3://`` URIs of every ``.csv`` object under ``prefix``.

    The listing is paginated: a single ``list_objects_v2`` response carries
    at most 1000 keys, so reading only the first response would silently
    drop files when the prefix holds more objects than that.

    Args:
        bucket: Name of the S3 bucket to list.
        prefix: Key prefix used to restrict the listing.

    Returns:
        A list of ``s3://{bucket}/{key}`` strings, one per key ending in
        ``.csv``, in the order the service returns them. Empty when nothing
        matches.
    """
    paginator = s3_client.get_paginator('list_objects_v2')
    arquivos = []
    for pagina in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # 'Contents' is absent from a page that holds no objects.
        for obj in pagina.get('Contents', []):
            key = obj['Key']
            if key.endswith('.csv'):
                arquivos.append(f"s3://{bucket}/{key}")
    return arquivos

# Coded survey columns that survive the initial filter. Any other column whose
# name matches _PADRAO_VDF is discarded by processar().
_COLUNAS_ESPECIFICAS = frozenset([
    'A004', 'A005',
    'B0011', 'B0012', 'B0013', 'B0015', 'B0018', 'B00111', 'B00112',
    'B0014', 'B0016', 'B0017', 'B0019', 'B00110', 'B00113',
    'B002',
    'B0031', 'B0032', 'B0033', 'B0034', 'B0035', 'B0036', 'B0037',
    'B0041', 'B0042', 'B0043', 'B0044', 'B0045', 'B0046',
    'B009B', 'B009D', 'B009F',
    'B0101', 'B0102', 'B0103', 'B0104', 'B0105', 'B0106',
    'B011',
    'C001', 'C002',
    'C01012', 'C01022', 'C011A12', 'C011A22',
    'C012', 'C013',
    'D0013', 'D0023', 'D0033', 'D0043', 'D0053', 'D0063', 'D0073',
    'E001',
    'F001',
    'F002A1', 'F002A2', 'F002A3', 'F002A4', 'F002A5',
])

# Matches coded column names: one of the listed prefix letters followed by a
# digit or an upper-case letter.
_PADRAO_VDF = re.compile(r'^(V|D|F|A|C|B|E)[0-9A-Z]')

# Columns that are never discarded by the pattern filter.
_COLUNAS_PROTEGIDAS = ('Ano', 'CAPITAL')

# Output column name -> source columns that are collapsed into it.
_GRUPOS = {
    "B1_Sintomas_Principais": ['B0011', 'B0012', 'B0013', 'B0015', 'B0018', 'B00111'],
    "B1_Outros_Sintomas": ['B0014', 'B0016', 'B0017', 'B0019', 'B00110', 'B00112', 'B00113'],
    "B3_Isolamento": ['B0031'],
    "B3_Buscou_Orientacao": ['B0032', 'B0034', 'B0035', 'B0036'],
    "B3_Automedicacao": ['B0033'],
    "B3_Outra": ['B0037'],
    "B4_Publico": ['B0041', 'B0042', 'B0043'],
    "B4_Privado": ['B0044', 'B0045', 'B0046'],
    "B9": ['B009B', 'B009D', 'B009F'],
    "B10": ['B0101', 'B0102', 'B0103', 'B0104', 'B0105', 'B0106'],
    "C10": ['C01012', 'C01022'],
    "C11": ['C011A12', 'C011A22'],
    "D1": ['D0013', 'D0023', 'D0033', 'D0043', 'D0053', 'D0063', 'D0073'],
    "F2A": ['F002A1', 'F002A2', 'F002A3', 'F002A4', 'F002A5'],
}

# Groups that are summed row-wise; every other group becomes a 1/2 flag.
_GRUPOS_SOMA = frozenset(["C10", "C11", "D1"])


def _calc_grupo(df, cols):
    """Return 1 where any of ``cols`` equals 1 (as int, float or '1'), else 2."""
    if not cols:
        # None of the source columns exist: the whole column is the scalar 2.
        return 2
    return np.where(df[cols].isin([1, 1.0, '1']).any(axis=1), 1, 2)


def _calc_soma(df, cols):
    """Return the row-wise sum of ``cols``, counting missing values as 0."""
    return df[cols].fillna(0).sum(axis=1)


def processar(df_original):
    """Filter, one-hot encode and aggregate the columns of one input frame.

    Steps, in order:

    1. Drop every column whose name matches the coded-column pattern unless
       it is listed in ``_COLUNAS_ESPECIFICAS`` or is ``Ano`` / ``CAPITAL``.
    2. Replace ``Ano`` with one ``Ano_<value>`` 0/1 indicator column per
       distinct value found in this frame.
    3. Collapse each group in ``_GRUPOS`` into a single column: a row-wise
       sum for ``C10``, ``C11`` and ``D1``; otherwise a flag that is 1 when
       any source column equals 1 and 2 when none does.
    4. Drop the source columns of the groups.

    The input frame is not modified: ``DataFrame.drop`` returns a new frame,
    and only that frame receives the added columns.

    Args:
        df_original: DataFrame with string column names and an ``Ano`` column.

    Returns:
        A new DataFrame holding the untouched columns, the ``Ano_*``
        indicators and one column per group, in that order.

    Raises:
        KeyError: If ``df_original`` has no ``Ano`` column.
        ValueError: If an ``Ano`` value cannot be converted with ``int()``
            (for example a missing value).
    """
    colunas_para_remover = [
        c for c in df_original.columns
        if _PADRAO_VDF.match(c)
        and c not in _COLUNAS_PROTEGIDAS
        and c not in _COLUNAS_ESPECIFICAS
    ]
    df = df_original.drop(columns=colunas_para_remover)

    # One indicator column per distinct year present in this frame.
    for ano in df["Ano"].unique():
        df[f"Ano_{int(ano)}"] = (df["Ano"] == ano).astype(int)
    df = df.drop(columns=["Ano"])

    for nome, cols in _GRUPOS.items():
        # A file may lack some source columns; use only the ones present.
        cols_validas = [c for c in cols if c in df.columns]
        if nome in _GRUPOS_SOMA:
            df[nome] = _calc_soma(df, cols_validas)
        else:
            df[nome] = _calc_grupo(df, cols_validas)

    colunas_drop = [
        c for cols in _GRUPOS.values() for c in cols if c in df.columns
    ]
    return df.drop(columns=colunas_drop)

def main():
    """Convert every input CSV under ``PREFIX_RAW`` to a processed Parquet file.

    Lists the ``.csv`` objects, then for each one reads it into a DataFrame,
    runs ``processar`` on it and writes the result to
    ``s3://{BUCKET_NAME}/{PREFIX_TRUSTED}<name>.parquet``. A failure on one
    file is printed and the loop moves on to the next file.
    """
    # Imported here because gc is not among the imports at the top of this
    # file; without it, gc.collect() raised NameError after every successful
    # save and the file was then reported as failed.
    import gc

    arquivos = listar_arquivos_s3(BUCKET_NAME, PREFIX_RAW)
    total = len(arquivos)

    print(f"Total de arquivos encontrados: {total}")

    for posicao, arquivo_entrada in enumerate(arquivos, start=1):
        nome_arquivo_limpo = arquivo_entrada.split('/')[-1].replace('.csv', '')
        print(f"[{posicao}/{total}] Processando: {nome_arquivo_limpo} ...")

        try:
            df = pd.read_csv(arquivo_entrada, storage_options={'anon': False}, low_memory=False)
            df_proc = processar(df)

            caminho_saida = f"s3://{BUCKET_NAME}/{PREFIX_TRUSTED}{nome_arquivo_limpo}.parquet"
            df_proc.to_parquet(caminho_saida, index=False)
            print(f"   -> Salvo em: {caminho_saida}")

            # Release both frames before the next file is loaded to keep
            # peak memory down.
            del df
            del df_proc
            gc.collect()

        except Exception as e:
            # Deliberate best effort: one bad file must not stop the batch.
            print(f"Erro ao processar {arquivo_entrada}: {str(e)}")

# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()