- [ ] Ler todos os arquivos
- [ ] Separar por municipios
- [ ] Aplicar funcao de limpeza
- [ ] Salvar no banco

In [None]:
PASTA = 'dados/raw/'

# Ler municipios

In [2]:
import duckdb
import pandas as pd

def ler_municipios(pasta_csvs):
    query = f"""
        SELECT DISTINCT estado, municipio
        FROM read_csv_auto('{pasta_csvs}/*.csv', union_by_name=true)
        WHERE estado IS NOT NULL AND municipio IS NOT NULL
        ORDER BY estado, municipio 
    """
    return duckdb.query(query).to_df()

In [3]:
municipios_df = ler_municipios(PASTA)

len(municipios_df)

5570

# Ler os .csvs filtrando por municipio e estado

In [4]:
import duckdb
import pandas as pd

def ler_csvs_filtrando_municipio(pasta_csvs, municipio, estado, nome_coluna_municipio='municipio', nome_coluna_estado='estado'):
    municipio_escaped = municipio.replace("'", "''")
    estado_escaped = estado.replace("'", "''")
    
    query = f"""
        SELECT *
        FROM read_csv_auto('{pasta_csvs}/*.csv', union_by_name=true)
        WHERE {nome_coluna_municipio} = '{municipio_escaped}' AND {nome_coluna_estado} = '{estado_escaped}'
    """
    return duckdb.query(query).to_df()

# Funçao de limpeza

In [5]:
def suavizar(df, window_size=3, threshold=2):
    """
    Smooth data by replacing outliers with previous day's casosNovos.
    municipio, estado must be unique.
    
    Parameters:
    - df: pandas DataFrame with columns 'date' and 'casosNovos'
    - window_size: number of days to consider in rolling window
    - threshold: number of standard deviations to use for outlier detection
    
    Returns:
    - DataFrame with smoothed values
    """
    # Make a copy to avoid modifying original data
    df_smoothed = df.copy()
    
    # Calculate rolling statistics
    rolling_mean = df['casosNovos'].rolling(window=window_size, center=True, min_periods=1).mean()
    rolling_std = df['casosNovos'].rolling(window=window_size, center=True, min_periods=1).std()
    
    # Identify outliers (values outside mean ± threshold*std)
    lower_bound = rolling_mean - threshold * rolling_std
    upper_bound = rolling_mean + threshold * rolling_std
    
    is_outlier = (df['casosNovos'] < lower_bound) | (df['casosNovos'] > upper_bound)
    
    # Replace outliers with previous day's value
    df_smoothed['casosNovos'] = df['casosNovos'].where(~is_outlier, df['casosNovos'].shift(1))
    
    # For the first row (no previous value), use the next value if available
    if is_outlier.iloc[0] and len(df) > 1:
        df_smoothed.iloc[0, df_smoothed.columns.get_loc('casosNovos')] = df['casosNovos'].iloc[1]
    
    return df_smoothed

# Recalcula casos acumulados

In [6]:
def recalcula_casos_acumulados(df):
  df['novos_casos_acumulados'] = df['casosNovos'].cumsum()
  return df

# Função de limpeza

In [7]:
def limpar(df):
  df = suavizar(df)
  df = recalcula_casos_acumulados(df)
  return df

# Função principal - sincrona e csv

## Função de logging

Funçao auxiliar

In [8]:
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        logging.FileHandler('logs.txt'),
        logging.StreamHandler()
    ]
)

In [9]:
from tqdm import tqdm

OUTPUT_FOLDER = 'saida/'

sucesso = 'sucesso.txt'
falha = 'falha.txt'

def rodar(df):
  for i in tqdm(range(len(df)), desc="Processando"):
    try:
      municipio = df.at[i, 'municipio']
      estado = df.at[i, 'estado']
      estado_municipio = f"{estado}_{municipio}"

      logging.info(f"Salvando {estado_municipio}")

      df = ler_csvs_filtrando_municipio(PASTA, municipio, estado)
      df = limpar(df)

      df.to_csv(f'saida/{estado_municipio}.csv', index=False)


      with open(sucesso, 'a', encoding='utf-8') as f:
        f.write(f"{estado_municipio}\n")

      logging.info(f"{estado}_{municipio} salvo")

    except Exception as e:
      with open(falha, 'a', encoding='utf-8') as f:
        f.write(f"{estado_municipio}\n")
      logging.error(f"{estado}_{municipio} não foi salvo")
      logging.error(e)

# rodar(municipios_df)

# Rodar paralelo e parquet

In [10]:
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

In [11]:
import duckdb

def ler_csvs_filtrando_municipio(pasta_csvs, municipio, estado, nome_coluna_municipio='municipio', nome_coluna_estado='estado'):
    municipio_escaped = municipio.replace("'", "''")
    estado_escaped = estado.replace("'", "''")
    
    conn = duckdb.connect()
    query = f"""
        SELECT *
        FROM read_csv_auto('{pasta_csvs}/*.csv', union_by_name=true)
        WHERE {nome_coluna_municipio} = '{municipio_escaped}' 
          AND {nome_coluna_estado} = '{estado_escaped}'
    """
    try:
        return conn.execute(query).fetch_df()
    except Exception as e:
        raise e
    finally:
        conn.close()

In [12]:
from tqdm import tqdm

OUTPUT_FOLDER = 'saida/'

falha = 'falha_v2.txt'

def process_municipio(municipio_estado_tuple):
  municipio, estado = municipio_estado_tuple
  try:
    estado_municipio = f"{estado}_{municipio}"

    logging.info(f"Salvando {estado_municipio}")

    df = ler_csvs_filtrando_municipio(PASTA, municipio, estado)
    df = limpar(df)

    df.to_parquet(f'saida_v2/{estado_municipio}.parquet', index=False)
    # df.to_csv(f'saida_v2/{estado_municipio}.csv', index=False)

    logging.info(f"{estado}_{municipio} salvo")

  except Exception as e:
    with open(falha, 'a', encoding='utf-8') as f:
      f.write(f"{estado_municipio}\n")
    logging.error(f"{estado}_{municipio} não foi salvo")
    logging.error(e)

In [13]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def rodar_v2():
  # List all CSV files
  municipio_estado = [(row['municipio'], row['estado']) for _, row in municipios_df.iterrows()]

  # ThreadPool (better for IO-bound tasks like reading files)
  with ThreadPoolExecutor(max_workers=4) as executor:
      # results = list(executor.map(rodar_v2, municipio_estado))
      # Submit all tasks
      futures = [executor.submit(process_municipio, item) for item in municipio_estado]
    
      # Use tqdm to track completion
      results = []
      for future in tqdm(
          futures, 
          total=len(municipio_estado), 
          desc="Processing municipios"
      ):
          results.append(future.result())

# rodar_v2()

# Funcão Principal - RAM + 1 arquivo

In [14]:
df = pd.read_parquet('0.raw.parquet')
len(df)

10390399

In [15]:
from tqdm import tqdm

OUTPUT_FOLDER = 'saida/'

falha = 'falha.txt'

def rodar_ram(df, municipios_df):
  resultado_df = pd.DataFrame()

  for i in tqdm(range(len(municipios_df)), desc="Processando"):
    try:
      municipio = municipios_df.at[i, 'municipio']
      estado = municipios_df.at[i, 'estado']
      estado_municipio = f"{estado}_{municipio}"

      logging.info(f"Salvando {estado_municipio}")

      filtrado_df = df[(df['municipio'] == municipio) & (df['estado'] == estado)]

      limpo_df = limpar(filtrado_df)

      resultado_df = pd.concat([resultado_df, limpo_df])

      logging.info(f"{estado}_{municipio} salvo")

    except Exception as e:
      with open(falha, 'a', encoding='utf-8') as f:
        f.write(f"{estado_municipio}\n")
      logging.error(f"{estado}_{municipio} não foi salvo")
      logging.error(e)
  
  resultado_df.to_parquet('1.limpo.parquet', index=False)

rodar_ram(df, municipios_df)

Processando:   0%|          | 0/5570 [00:00<?, ?it/s]

2025-05-10 12:08:36 - INFO - Salvando AC_Acrelândia
2025-05-10 12:08:38 - INFO - AC_Acrelândia salvo
Processando:   0%|          | 1/5570 [00:01<2:15:10,  1.46s/it]2025-05-10 12:08:38 - INFO - Salvando AC_Assis Brasil
2025-05-10 12:08:39 - INFO - AC_Assis Brasil salvo
Processando:   0%|          | 2/5570 [00:02<2:06:40,  1.37s/it]2025-05-10 12:08:39 - INFO - Salvando AC_Brasiléia
2025-05-10 12:08:40 - INFO - AC_Brasiléia salvo
Processando:   0%|          | 3/5570 [00:04<2:06:25,  1.36s/it]2025-05-10 12:08:40 - INFO - Salvando AC_Bujari
2025-05-10 12:08:42 - INFO - AC_Bujari salvo
Processando:   0%|          | 4/5570 [00:05<2:09:03,  1.39s/it]2025-05-10 12:08:42 - INFO - Salvando AC_Capixaba
2025-05-10 12:08:43 - INFO - AC_Capixaba salvo
Processando:   0%|          | 5/5570 [00:06<2:05:41,  1.36s/it]2025-05-10 12:08:43 - INFO - Salvando AC_Cruzeiro do Sul
2025-05-10 12:08:45 - INFO - AC_Cruzeiro do Sul salvo
Processando:   0%|          | 6/5570 [00:08<2:04:53,  1.35s/it]2025-05-10 12:08