In [1]:
from pysus.ftp.databases.sia import SIA
from pysus.ftp.databases.sih import SIH
from pysus.ftp.databases.cnes import CNES
from pysus.ftp.databases.sim import SIM
from pysus.ftp.databases.sinasc import SINASC
from pysus.ftp.databases.sinan import SINAN
import pandas as pd
import os

sia = SIA().load()
sih = SIH().load()
cnes = CNES().load()
sim = SIM().load()
sinasc = SINASC().load()
sinan = SINAN().load()

## tratamentos

#### alter_tables

In [None]:
## Alterar Tabelas com erro de Date
import psycopg2

def optimize_alter_table():
    # Configuração da conexão com o banco de dados
    conn = psycopg2.connect(
        dbname="datasus",
        user="webadmin",
        password="h532947h5g932h",
        host="10.100.60.19",
        port="5432"
    )
    cursor = conn.cursor()

    try:
        # Script SQL otimizado para executar alterações
        sql_script = """
        DO $$
        BEGIN
            -- Alterações na tabela sia_apac_medicamentos
            ALTER TABLE sia_apac_medicamentos ALTER COLUMN ap_dtaut TYPE DATE USING ap_dtfim::DATE;
            ALTER TABLE sia_apac_medicamentos ALTER COLUMN ap_dtfim TYPE DATE USING ap_dtfim::DATE;
            ALTER TABLE sia_apac_medicamentos ALTER COLUMN ap_dtinic TYPE DATE USING ap_dtfim::DATE;
            ALTER TABLE sia_apac_medicamentos ALTER COLUMN ap_dtocor TYPE DATE USING ap_dtfim::DATE;
            ALTER TABLE sia_apac_medicamentos ALTER COLUMN ap_dtsolic TYPE DATE USING ap_dtfim::DATE;

            -- Alterações na tabela sia_apac_tratamento_dialitico
            ALTER TABLE sia_apac_tratamento_dialitico ALTER COLUMN ap_dtaut TYPE DATE USING ap_dtfim::DATE;
            ALTER TABLE sia_apac_tratamento_dialitico ALTER COLUMN ap_dtfim TYPE DATE USING ap_dtfim::DATE;
            ALTER TABLE sia_apac_tratamento_dialitico ALTER COLUMN ap_dtinic TYPE DATE USING ap_dtfim::DATE;
            ALTER TABLE sia_apac_tratamento_dialitico ALTER COLUMN ap_dtocor TYPE DATE USING ap_dtfim::DATE;
            ALTER TABLE sia_apac_tratamento_dialitico ALTER COLUMN ap_dtsolic TYPE DATE USING ap_dtfim::DATE;

            -- Alterações na tabela sia_apac_laudos_diversos
            ALTER TABLE sia_apac_laudos_diversos ALTER COLUMN ap_dtsolic TYPE DATE USING ap_dtsolic::DATE;
            ALTER TABLE sia_apac_laudos_diversos ALTER COLUMN ap_dtaut TYPE DATE USING ap_dtaut::DATE;

            -- Alterações na tabela sia_boletim_producao_ambulatorial_individualizado
            ALTER TABLE sia_boletim_producao_ambulatorial_individualizado ALTER COLUMN dtnasc TYPE DATE USING ap_dtaut::DATE;
        END $$;
        """
        
        # Executar o script SQL
        cursor.execute(sql_script)
        conn.commit()
        print("Alterações aplicadas com sucesso!")

    except Exception as e:
        conn.rollback()
        print("Erro ao aplicar alterações:", e)

    finally:
        cursor.close()
        conn.close()

if __name__ == "__main__":
    optimize_alter_table()


#### generate_partition_sql

In [None]:
# Lista de tabelas e suas sequências associadas
particoes = [
    "sih_servicos_profissionais_temp_part_AC",
    "sih_servicos_profissionais_temp_part_AL",
    "sih_servicos_profissionais_temp_part_AM",
    "sih_servicos_profissionais_temp_part_AP",
    "sih_servicos_profissionais_temp_part_BA",
    "sih_servicos_profissionais_temp_part_CE",
    "sih_servicos_profissionais_temp_part_DF",
    "sih_servicos_profissionais_temp_part_ES",
    "sih_servicos_profissionais_temp_part_GO",
    "sih_servicos_profissionais_temp_part_MA",
    "sih_servicos_profissionais_temp_part_MG",
    "sih_servicos_profissionais_temp_part_MS",
    "sih_servicos_profissionais_temp_part_MT",
    "sih_servicos_profissionais_temp_part_PA",
    "sih_servicos_profissionais_temp_part_PB",
    "sih_servicos_profissionais_temp_part_PE",
    "sih_servicos_profissionais_temp_part_PI",
    "sih_servicos_profissionais_temp_part_PR",
    "sih_servicos_profissionais_temp_part_RJ",
    "sih_servicos_profissionais_temp_part_RN",
    "sih_servicos_profissionais_temp_part_RO",
    "sih_servicos_profissionais_temp_part_RR",
    "sih_servicos_profissionais_temp_part_RS",
    "sih_servicos_profissionais_temp_part_SC",
    "sih_servicos_profissionais_temp_part_SE",
    "sih_servicos_profissionais_temp_part_SP",
    "sih_servicos_profissionais_temp_part_TO"
]

# Função para gerar SQL dinamicamente
def gerar_sql(particoes):
    sql_comandos = []
    for particao in particoes:
        # Gerar o nome da sequência baseado na partição
        sequencia = f"{particao.lower()}_id_seq"
        
        # Gerar SQL para associar a sequência à coluna `id`
        sql = f"""
        -- Configurar sequência para a partição {particao}
        ALTER SEQUENCE {sequencia}
        OWNED BY "{particao}".id;
        """
        sql_comandos.append(sql.strip())
    return sql_comandos

# Gerar os comandos SQL
comandos_sql = gerar_sql(particoes)

# Salvar os comandos em um arquivo ou exibir no console
with open("configurar_sequencias.sql", "w") as file:
    for comando in comandos_sql:
        file.write(comando + "\n\n")

# Exibir os comandos gerados
for comando in comandos_sql:
    print(comando)
    print("-" * 80)


#### generate_table_columns

In [None]:
import psycopg2
import json

def fetch_table_columns():
    # Configuração da conexão com o banco de dados
    conn = psycopg2.connect(
        dbname="datasus",
        user="webadmin",
        password="h532947h5g932h",
        host="10.100.60.19",
        port="5432"
    )
    cursor = conn.cursor()
    try:
        # Consulta para buscar tabelas que começam com 'sih' e suas colunas, incluindo o tamanho para tipos de caracteres
        query = """
        SELECT table_name, column_name, data_type, character_maximum_length
        FROM information_schema.columns
        WHERE table_schema = 'public'
          AND table_name LIKE 'new%'
        ORDER BY table_name, column_name;
        """
        cursor.execute(query)
        results = cursor.fetchall()

        # Estrutura do JSON similar a group_info
        tables = {}
        for table_name, column_name, data_type, char_max_length in results:
            if table_name not in tables:
                tables[table_name] = {"tabela": table_name, "colunas": {}}
            
            # Formatação do tipo com tamanho, se aplicável
            tipo_formatado = format_data_type(data_type, char_max_length)
            
            tables[table_name]["colunas"][column_name.lower()] = tipo_formatado

        # Salvar em arquivo JSON
        with open("sih_tables_columns.json", "w", encoding='utf-8') as json_file:
            json.dump(tables, json_file, indent=4, ensure_ascii=False)
        
        print("Arquivo 'sia_tables_columns.json' gerado com sucesso.")

    except Exception as e:
        print("Erro ao executar o script:", e)
    
    finally:
        cursor.close()
        conn.close()

def format_data_type(data_type, char_max_length):
    """
    Formata o tipo de dado incluindo o tamanho para tipos de caracteres.
    """
    data_type_upper = data_type.upper()
    if data_type_upper in ["CHARACTER VARYING", "VARCHAR"]:
        if char_max_length:
            return f"VARCHAR({char_max_length})"
        else:
            return "VARCHAR"
    elif data_type_upper == "CHARACTER":
        if char_max_length:
            return f"CHAR({char_max_length})"
        else:
            return "CHAR"
    elif data_type_upper == "USER-DEFINED":
        # Trata tipos definidos pelo usuário, se houver
        return "USER_DEFINED"
    else:
        # Para outros tipos, retorna apenas o tipo em maiúsculas
        return data_type_upper

if __name__ == "__main__":
    fetch_table_columns()


Arquivo 'sia_tables_columns.json' gerado com sucesso.


#### gerar_sql_correcao_sia

In [None]:
import re
import os

def extrair_tabela_coluna(linha_contexto):
    """
    Extrai o nome da tabela e da coluna a partir de uma linha de contexto.

    Exemplo de linha de contexto:
    CONTEXT:  COPY sia_apac_laudos_diversos, line 1, column ap_dtaut: "2009-10-01"
    """
    regex = r'COPY\s+(\w+),\s+line\s+\d+,\s+column\s+(\w+):\s+".+"'
    match = re.search(regex, linha_contexto)
    if match:
        tabela = match.group(1)
        coluna = match.group(2)
        return tabela, coluna
    return None, None

def gerar_comandos_sql(pares_tabela_coluna):
    """
    Gera comandos SQL para alterar o tipo de colunas para DATE.

    :param pares_tabela_coluna: Set contendo tuplas (tabela, coluna)
    :return: Lista de strings com comandos SQL
    """
    comandos = []
    for tabela, coluna in pares_tabela_coluna:
        comando = f'ALTER TABLE {tabela} ALTER COLUMN {coluna} TYPE DATE USING {coluna}::DATE;'
        comandos.append(comando)
    return comandos

def processar_log(caminho_log):
    """
    Processa o arquivo de log para extrair pares tabela-coluna de erros críticos.

    :param caminho_log: Caminho para o arquivo de log
    :return: Set contendo tuplas (tabela, coluna)
    """
    pares_tabela_coluna = set()
    
    # Verifica se o arquivo existe
    if not os.path.isfile(caminho_log):
        print(f"Erro: O arquivo {caminho_log} não foi encontrado.")
        return pares_tabela_coluna
    
    with open(caminho_log, 'r', encoding='utf-8') as arquivo:
        linhas = arquivo.readlines()
    
    for i, linha in enumerate(linhas):
        if 'CRITICAL' in linha:
            # Procura pela linha CONTEXT na mesma linha ou nas próximas 2 linhas
            contexto = None
            # Verifica até 2 linhas à frente para encontrar o CONTEXT
            for j in range(1, 3):
                if i + j < len(linhas):
                    linha_proxima = linhas[i + j].strip()
                    if linha_proxima.startswith('CONTEXT:'):
                        contexto = linha_proxima
                        break
            if contexto:
                tabela, coluna = extrair_tabela_coluna(contexto)
                if tabela and coluna:
                    pares_tabela_coluna.add((tabela, coluna))
    
    return pares_tabela_coluna

def main():
    caminho_log = './logs/upload_sia_2959293.log'  # Substitua pelo caminho correto do seu arquivo de log
    pares = processar_log(caminho_log)
    
    if not pares:
        print("Nenhum erro crítico relevante encontrado no log.")
        return
    
    comandos_sql = gerar_comandos_sql(pares)
    
    # Exibe os comandos gerados
    print("Comandos SQL para alterar colunas para tipo DATE:\n")
    for comando in comandos_sql:
        print(comando)
    
    # Opcional: Salva os comandos em um arquivo SQL
    caminho_saida = 'alterar_colunas_para_date.sql'
    with open(caminho_saida, 'w', encoding='utf-8') as f:
        for comando in comandos_sql:
            f.write(comando + '\n')
    
    print(f"\nOs comandos SQL foram salvos no arquivo: {caminho_saida}")

if __name__ == "__main__":
    main()


#### gerar_tabelas_sql

In [7]:
import os
import json
import re
import unicodedata

def normalizar_nome(nome):
    """
    Normaliza o nome:
    - Converte para minúsculas.
    - Remove acentos e caracteres especiais.
    - Substitui caracteres não alfanuméricos por sublinhado '_'.
    - Remove sublinhados extras no início e no fim.
    
    Args:
        nome (str): Nome a ser normalizado.
    
    Returns:
        str: Nome normalizado.
    """
    nome = nome.lower()
    nome = unicodedata.normalize('NFKD', nome).encode('ASCII', 'ignore').decode('ASCII')
    nome = re.sub(r'\W+', '_', nome)
    nome = nome.strip('_')
    return nome

def mapear_tipo_postgres(tipo):
    """
    Mapeia o tipo de dado para o tipo correspondente no PostgreSQL.
    
    Args:
        tipo (str): Tipo de dado original.
    
    Returns:
        str: Tipo de dado mapeado para PostgreSQL.
    """
    # Remover espaços extras e converter para maiúsculas
    tipo = tipo.strip().upper()
    
    # Dicionário de mapeamento básico
    mapeamento = {
        'SMALLINT': 'SMALLINT',
        'INTEGER': 'INTEGER',
        'BIGINT': 'BIGINT',
        'NUMERIC': 'NUMERIC',
        'BOOLEAN': 'BOOLEAN',
        'DATE': 'DATE',
        'TIMESTAMP': 'TIMESTAMP',
        'TEXT': 'TEXT',
        'SERIAL': 'SERIAL',
        'VARCHAR': 'VARCHAR',
        'CHAR': 'CHAR'
    }
    
    # Verificar se o tipo é NUMERIC com precisão
    if tipo.startswith('NUMERIC'):
        return tipo  # Mantém como está (e.g., NUMERIC(10,2))
    
    # Verificar se é CHAR(n) ou VARCHAR(n)
    match = re.match(r'(CHAR|VARCHAR)\((\d+)\)', tipo)
    if match:
        return f"{match.group(1)}({match.group(2)})"
    
    # Se o tipo estiver no mapeamento, retorna o mapeamento
    if tipo in mapeamento:
        return mapeamento[tipo]
    else:
        # Caso contrário, retorna TEXT como padrão para tipos desconhecidos
        print(f"Aviso: Tipo de dado desconhecido '{tipo}'. Usando 'TEXT' como padrão.")
        return 'TEXT'

def gerar_queries_criacao_tabelas(grupos_info, nome_arquivo_sql='criar_tabelas.sql'):
    """
    Gera queries SQL para criar tabelas com base no dicionário GRUPOS_INFO.
    
    Args:
        grupos_info (dict): Dicionário contendo informações dos grupos e suas tabelas.
        nome_arquivo_sql (str): Nome do arquivo .sql onde as queries serão salvas (opcional).
    
    Returns:
        str: String contendo todas as queries SQL de criação de tabelas.
    """
    queries = ""
    for grupo, info in grupos_info.items():
        tabela = info.get('tabela')
        colunas = info.get('colunas', {})
        
        if not tabela:
            print(f"Aviso: O grupo '{grupo}' não possui um nome de tabela definido.")
            continue
        
        # Iniciar a query de criação da tabela
        query = f"CREATE TABLE IF NOT EXISTS {tabela} (\n"
        
        # Adicionar a coluna id como PRIMARY KEY com sequência automática
        query += "    id SERIAL PRIMARY KEY,\n"
        
        colunas_definicoes = []
        for coluna, tipo in colunas.items():
            coluna_norm = normalizar_nome(coluna)
            tipo_postgres = mapear_tipo_postgres(tipo)
            colunas_definicoes.append(f"    {coluna_norm} {tipo_postgres}")
        
        # Unir todas as definições de colunas com vírgulas
        query += ",\n".join(colunas_definicoes)
        query += "\n);\n\n"
        
        # Adicionar a query ao conjunto total
        queries += query
    
    # Salvar as queries em um arquivo .sql
    try:
        with open(nome_arquivo_sql, 'w', encoding='utf-8') as f:
            f.write(queries)
        print(f"Todas as queries foram geradas e salvas em '{nome_arquivo_sql}'.")
    except Exception as e:
        print(f"Erro ao salvar o arquivo '{nome_arquivo_sql}': {e}")
    
    return queries

def main():
    """
    Função principal que coordena a geração das queries SQL a partir do arquivo grupos_info.json.
    """
    # Caminho para o arquivo grupos_info.json
    caminho_grupos_info = "../grupos_info.json"
    
    # Verificar se o arquivo grupos_info.json existe
    if not os.path.exists(caminho_grupos_info):
        print(f"Erro: O arquivo '{caminho_grupos_info}' não foi encontrado.")
        return
    
    # Carregar o conteúdo de grupos_info.json
    try:
        with open(caminho_grupos_info, 'r', encoding='utf-8') as f:
            grupos_info = json.load(f)
    except json.JSONDecodeError as e:
        print(f"Erro ao decodificar JSON no arquivo '{caminho_grupos_info}': {e}")
        return
    except Exception as e:
        print(f"Erro ao ler o arquivo '{caminho_grupos_info}': {e}")
        return
    
    # Verificar se GRUPOS_INFO está no formato correto
    if not isinstance(grupos_info, dict):
        print(f"Erro: O conteúdo de '{caminho_grupos_info}' não está no formato esperado (dicionário).")
        return
    
    # Gerar as queries de criação de tabelas
    print("Gerando queries SQL de criação de tabelas a partir de 'grupos_info.json'...")
    gerar_queries_criacao_tabelas(grupos_info)

if __name__ == "__main__":
    main()


Gerando queries SQL de criação de tabelas a partir de 'grupos_info.json'...
Todas as queries foram geradas e salvas em 'criar_tabelas.sql'.
