From 49740df573aa88e02ed1ca06610b7a0a300230a0 Mon Sep 17 00:00:00 2001 From: Bernardo Chrispim Baron Date: Fri, 18 Aug 2023 14:21:50 -0300 Subject: [PATCH] =?UTF-8?q?Implementa=20sem=C3=A1foros=20de=20escrita=20(#?= =?UTF-8?q?68)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Corrige função de extração dos profissionais * Corrige função de tratamento dos profissionais de saúde * Adiciona bloco try except na função extrair_profissionais_com_ine * Inclui bloco try/except para o erro json.JSONDecodeError * Inclui condições para identificar os horários dos estabelecimentos que tem funcionamento indicado como 'Sempre Aberto' * Inclui exceção para considerar a ausência de profisisonais em determinada competência durant ea extração * Corrige função para tratar tipos das colunas de data * Inclui tasks e flow do prefect * Refatora função que extrai os profissionais com INE * ETL dos profissinais de saúde com INE * Inclui função para extração dos profissionais com INE * Cria ETL para os profissionais totais * Cria função para extração dos profisisonais totais a partir da página do CNES * Implementa fluxos do ETL SCNES * Adiona novos argumentos de funções * Exclui módulo de carregamento * Exclui módulo de modelos para tabelas * Exclui etapas de tratamento e inclui novas funções * Adiona funções e refine construção do DataFrame * Adiciona argumentos à função principal * Exclui módulo de verificação de dados * Formata código baseado no padrão de esilo PEP8 * Comenta funções que utilizam a biblioteca dbfread * Atualiza biblioteca prefect para versão anterior * Salva bloco de infra do Docker * Adiciona params p/ salvar bloco Docker * Salva bloco de registro DockerHub * Exclui módulos que estão desuso na pipeline devido a redunção de uso de funções e de etapas de verificação de dados que não garantem integridade dos dados a serem inseridos * Adiciona método que registra o erro gerado e grava no banco de dados * Adiciona bloco try/except, novos argumentos de função e módulos e bibliotecas a serem importados * Retira bloco try/except, corrige argumentos de funções * Retira decorador de task do prefect corrigindo problemas de orquestração de fluxos * Adiona novos argumentos as funções de inicialização de ETLs puxando novas variaveis dos agendmaentos para realizar tratamento dos erros * Trata valores nulos ignorando na conversão tipos a atribuindo valor None * Refatora funções de tratamento para simplificação de etapas * Funções de extração e tratamento dos Relatórios de produção do SISAB * Extração e tratamento p/ bloco 06 do painel * Primeiras funções para extração dos dados para atualização do painel do AGP * Cria funções de ETL para os dados que irão compor o painel de produtividade do AGP * Adiciona função para o ETL dos dados de produção do AGP no script geral * Deleta arquivo de teste * Cria arquivo init.py * Adiciona tasks do prefeito na função principal * Especifica tipo de erro no módulo try/except * Corrige argumentos e chamada de funções * refactor: :recycle: Aumenta timeouts de ETLs de SM (#66) Aumenta timeout_seconds para capturas de BPA-i, AIH e Procedimentos Ambulatoriais para evitar [Errno 110] Connection timed out\n') * fix: :ambulance: Pula checagem de tamanho Comenta fora o trecho da função `extrair_dbc_lotes()` que checa se o tamanho do arquivo baixado é o mesmo que o do arquivo no servidor FTP, já que não tem havido mais downloads corrompidos desde o uso do ftplib, e a checagem de tamanho no servidor às vezes gera falha em alguns arquivos do FTP do DataSUS. * fix: :arrow_up: Corrige erro de instalação do pyyaml Atualiza dependências para que a versão mais recente (>= 6.0.0) do pacote pyyaml seja instalada, evitando erro pela falta de suporte à PEP 517 * feat: :card_file_box: Adiciona suporte a semáforos para escrita no banco de bancos * test: :white_check_mark: Adiciona testes para semáforos * feat: :card_file_box: Adiciona exemplo de uso de semáforo em script de ETL * Corrige função de extração dos profissionais * Corrige função de tratamento dos profissionais de saúde * Adiciona bloco try except na função extrair_profissionais_com_ine * Corrige função para tratar tipos das colunas de data * Inclui tasks e flow do prefect * Refatora função que extrai os profissionais com INE * ETL dos profissinais de saúde com INE * refactor: :recycle: Aumenta timeouts de ETLs de SM (#66) Aumenta timeout_seconds para capturas de BPA-i, AIH e Procedimentos Ambulatoriais para evitar [Errno 110] Connection timed out\n') * fix: :ambulance: Pula checagem de tamanho Comenta fora o trecho da função `extrair_dbc_lotes()` que checa se o tamanho do arquivo baixado é o mesmo que o do arquivo no servidor FTP, já que não tem havido mais downloads corrompidos desde o uso do ftplib, e a checagem de tamanho no servidor às vezes gera falha em alguns arquivos do FTP do DataSUS. * fix: :arrow_up: Corrige erro de instalação do pyyaml Atualiza dependências para que a versão mais recente (>= 6.0.0) do pacote pyyaml seja instalada, evitando erro pela falta de suporte à PEP 517 * feat: :card_file_box: Adiciona suporte a semáforos para escrita no banco de bancos * test: :white_check_mark: Adiciona testes para semáforos * feat: :card_file_box: Adiciona exemplo de uso de semáforo em script de ETL * feat: :card_file_box: Adiciona semáforos em ETLs de SM Adiciona semáforos em ETLs de SM * feat: :card_file_box: Adiciona semáforos em ETLs gerais relevantes para SM Adiciona semáforos em ETLs gerais relevantes para SM e corrige bug de typo no script saude_mental * fix: :bug: Corrige erro de identação em ETL de CEPs * fix: :bug: Define default para argumento de unidade geográfica em função Define default para argumento de unidade geográfica na função 'insere_erro_database' --------- Co-authored-by: maira Co-authored-by: WaltMath Co-authored-by: Bruna Fernandes <108029366+fernandesbruna@users.noreply.github.com> Co-authored-by: fernandesbruna --- src/impulsoetl/scripts/geral.py | 159 +++++++++++++++++++++ src/impulsoetl/scripts/saude_mental.py | 179 +++++++++++++++++++++++- src/impulsoetl/sisab/excecoes.py | 7 +- src/impulsoetl/utilitarios/semaforos.py | 119 ++++++++++++++++ tests/utilitarios/teste_semaforos.py | 165 ++++++++++++++++++++++ 5 files changed, 621 insertions(+), 8 deletions(-) create mode 100644 src/impulsoetl/utilitarios/semaforos.py create mode 100644 tests/utilitarios/teste_semaforos.py diff --git a/src/impulsoetl/scripts/geral.py b/src/impulsoetl/scripts/geral.py index 27a82bb..6465b29 100755 --- a/src/impulsoetl/scripts/geral.py +++ b/src/impulsoetl/scripts/geral.py @@ -25,6 +25,12 @@ from impulsoetl.scnes.estabelecimentos_profissionais_com_ine.principal import obter_profissionais_cnes_com_ine from impulsoetl.scnes.estabelecimentos_profissionais_totais.principal import obter_profissionais_cnes_totais from impulsoetl.sisab.relatorio_producao_profissional_conduta_tipo_atendimento.principal import relatorio_profissional_conduta_atendimento +from impulsoetl.utilitarios.semaforos import ( + bloquear_escrita, + checar_escrita_liberada, + EscritaBloqueadaExcecao, + liberar_escrita, +) agendamentos = tabelas["configuracoes.capturas_agendamentos"] @@ -57,7 +63,24 @@ def habilitacoes_disseminacao( .filter(agendamentos.c.operacao_id == operacao_id) .all() ) + for agendamento in agendamentos_habilitacoes: + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.warning("Pulando...") + continue + bloquear_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) obter_habilitacoes( sessao=sessao, uf_sigla=agendamento.uf_sigla, @@ -84,6 +107,12 @@ def habilitacoes_disseminacao( sessao.rollback() break sessao.commit() + liberar_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) logger.info("OK.") @@ -115,6 +144,22 @@ def vinculos_disseminacao( .all() ) for agendamento in agendamentos_vinculos: + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.warning("Pulando...") + continue + bloquear_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) obter_vinculos( sessao=sessao, uf_sigla=agendamento.uf_sigla, @@ -141,6 +186,12 @@ def vinculos_disseminacao( sessao.rollback() break sessao.commit() + liberar_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) logger.info("OK.") @@ -172,6 +223,22 @@ def obitos_disseminacao( .all() ) for agendamento in agendamentos_do: + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.warning("Pulando...") + continue + bloquear_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) obter_do( sessao=sessao, uf_sigla=agendamento.uf_sigla, @@ -200,6 +267,12 @@ def obitos_disseminacao( conector = sessao.connection() conector.execute(requisicao_inserir_historico) sessao.commit() + liberar_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) logger.info("OK.") @@ -232,6 +305,23 @@ def ceps(teste: bool = False) -> None: ceps_pendentes_query = ceps_pendentes_query.limit(10) ceps_pendentes = ceps_pendentes_query.all() + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.warning("Pulando...") + continue + bloquear_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + obter_cep(sessao=sessao, ceps_pendentes=ceps_pendentes, teste=teste) @flow( @@ -268,6 +358,23 @@ def cnes_estabelecimentos_identificados(teste: bool = False,)-> None: codigo_sus_municipio = agendamento.unidade_geografica_id_sus periodo_data_inicio = agendamento.periodo_data_inicio + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.warning("Pulando...") + continue + bloquear_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + obter_informacoes_estabelecimentos_identificados( sessao=sessao, tabela_destino=tabela_destino, @@ -293,6 +400,12 @@ def cnes_estabelecimentos_identificados(teste: bool = False,)-> None: conector = sessao.connection() conector.execute(requisicao_inserir_historico) sessao.commit() + liberar_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) logger.info("OK.") @flow( @@ -444,6 +557,23 @@ def cnes_estabelecimentos_horarios(teste: bool = True,)-> None: codigo_sus_municipio = agendamento.unidade_geografica_id_sus periodo_data_inicio = agendamento.periodo_data_inicio + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.warning("Pulando...") + continue + bloquear_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + obter_horarios_estabelecimentos( sessao=sessao, tabela_destino=tabela_destino, @@ -469,6 +599,12 @@ def cnes_estabelecimentos_horarios(teste: bool = True,)-> None: conector = sessao.connection() conector.execute(requisicao_inserir_historico) sessao.commit() + liberar_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) logger.info("OK.") @@ -504,6 +640,23 @@ def cnes_profissionais_totais( codigo_sus_municipio = agendamento.unidade_geografica_id_sus periodo_data_inicio = agendamento.periodo_data_inicio + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.warning("Pulando...") + continue + bloquear_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + obter_profissionais_cnes_totais( sessao=sessao, tabela_destino=tabela_destino, @@ -529,6 +682,12 @@ def cnes_profissionais_totais( conector = sessao.connection() conector.execute(requisicao_inserir_historico) sessao.commit() + liberar_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) logger.info("OK.") diff --git a/src/impulsoetl/scripts/saude_mental.py b/src/impulsoetl/scripts/saude_mental.py index 1d85a04..5502052 100755 --- a/src/impulsoetl/scripts/saude_mental.py +++ b/src/impulsoetl/scripts/saude_mental.py @@ -20,6 +20,12 @@ from impulsoetl.sinan.violencia import obter_agravos_violencia from impulsoetl.sisab.relatorio_producao_resolutividade_por_condicao.principal import obter_relatorio_resolutividade_por_condicao from impulsoetl.sisab.relatorio_tipo_equipe_por_tipo_producao.principal import obter_relatorio_tipo_equipe_por_producao +from impulsoetl.utilitarios.semaforos import ( + bloquear_escrita, + checar_escrita_liberada, + EscritaBloqueadaExcecao, + liberar_escrita, +) agendamentos = tabelas["configuracoes.capturas_agendamentos"] capturas_historico = tabelas["configuracoes.capturas_historico"] @@ -58,6 +64,22 @@ def resolutividade_aps_por_condicao( ) for agendamento in agendamentos_resolutividade_por_condicao: + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.warning("Pulando...") + continue + bloquear_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) obter_relatorio_resolutividade_por_condicao( sessao = sessao, tabela_destino = agendamento.tabela_destino, @@ -76,10 +98,17 @@ def resolutividade_aps_por_condicao( ) conector = sessao.connection() conector.execute(requisicao_inserir_historico) + if teste: sessao.rollback() break sessao.commit() + liberar_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) logger.info("OK.") @@ -129,6 +158,22 @@ def tipo_equipe_por_tipo_producao( ) for agendamento in agendamentos_producao_por_equipe: + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.warning("Pulando...") + continue + bloquear_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) obter_relatorio_tipo_equipe_por_producao( sessao = sessao, tabela_destino = agendamento.tabela_destino, @@ -147,11 +192,31 @@ def tipo_equipe_por_tipo_producao( ) conector = sessao.connection() conector.execute(requisicao_inserir_historico) + if teste: sessao.rollback() break - sessao.commit() - logger.info("OK.") + + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.error("Revertendo alterações realizadas...") + sessao.rollback() + else: + sessao.commit() + liberar_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + logger.info("OK.") + @flow( name="Rodar Agendamentos de Arquivos de Disseminação da RAAS-PS", @@ -184,6 +249,22 @@ def raas_disseminacao( .all() ) for agendamento in agendamentos_raas: + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.warning("Pulando...") + continue + bloquear_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) obter_raas_ps( sessao=sessao, uf_sigla=agendamento.uf_sigla, @@ -211,6 +292,12 @@ def raas_disseminacao( conector = sessao.connection() conector.execute(requisicao_inserir_historico) sessao.commit() + liberar_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) logger.info("OK.") @@ -246,6 +333,22 @@ def bpa_i_disseminacao( .all() ) for agendamento in agendamentos_bpa_i: + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.warning("Pulando...") + continue + bloquear_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) obter_bpa_i( sessao=sessao, uf_sigla=agendamento.uf_sigla, @@ -273,6 +376,12 @@ def bpa_i_disseminacao( conector = sessao.connection() conector.execute(requisicao_inserir_historico) sessao.commit() + liberar_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) logger.info("OK.") @@ -310,6 +419,22 @@ def procedimentos_disseminacao( .all() ) for agendamento in agendamentos_pa: + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.warning("Pulando...") + continue + bloquear_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) obter_pa( sessao=sessao, uf_sigla=agendamento.uf_sigla, @@ -337,6 +462,12 @@ def procedimentos_disseminacao( conector = sessao.connection() conector.execute(requisicao_inserir_historico) sessao.commit() + liberar_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) logger.info("OK.") @@ -369,6 +500,22 @@ def aih_reduzida_disseminacao( .all() ) for agendamento in agendamentos_aih_rd: + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.warning("Pulando...") + continue + bloquear_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) obter_aih_rd( sessao=sessao, uf_sigla=agendamento.uf_sigla, @@ -395,6 +542,12 @@ def aih_reduzida_disseminacao( conector = sessao.connection() conector.execute(requisicao_inserir_historico) sessao.commit() + liberar_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) logger.info("OK.") @@ -430,6 +583,22 @@ def agravos_violencia( .all() ) for agendamento in agendamentos_agravos_violencia: + try: + checar_escrita_liberada( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) + except EscritaBloqueadaExcecao: + logger.warning("Pulando...") + continue + bloquear_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) obter_agravos_violencia( sessao=sessao, periodo_id=agendamento.periodo_id, @@ -457,4 +626,10 @@ def agravos_violencia( conector = sessao.connection() conector.execute(requisicao_inserir_historico) sessao.commit() + liberar_escrita( + sessao=sessao, + tabela_destino=agendamento.tabela_destino, + unidade_geografica_id=agendamento.unidade_geografica_id, + periodo_id=agendamento.periodo_id, + ) logger.info("OK.") \ No newline at end of file diff --git a/src/impulsoetl/sisab/excecoes.py b/src/impulsoetl/sisab/excecoes.py index bea9d5d..d560530 100644 --- a/src/impulsoetl/sisab/excecoes.py +++ b/src/impulsoetl/sisab/excecoes.py @@ -30,12 +30,7 @@ class SisabErroCompetenciaInexistente: def __init__(self,exception): self.exception = exception - def insere_erro_database(self,sessao,traceback_str,operacao_id,periodo_id,unidade_geografica_id): - - if unidade_geografica_id is None: - unidade_geografica_id = '28de805e-5bdc-49c3-863c-2cf87f95e371' - else: - unidade_geografica_id = unidade_geografica_id + def insere_erro_database(self,sessao,traceback_str,operacao_id,periodo_id,unidade_geografica_id="28de805e-5bdc-49c3-863c-2cf87f95e371"): tabela_destino = tabelas["configuracoes.capturas_erros_etl"] requisicao_inserir_historico = insert(tabela_destino).values(operacao_id=operacao_id,periodo_id=periodo_id,unidade_geografica_id=unidade_geografica_id,erro_mensagem=self.exception,erro_traceback=str(traceback_str)) diff --git a/src/impulsoetl/utilitarios/semaforos.py b/src/impulsoetl/utilitarios/semaforos.py new file mode 100644 index 0000000..2679a8f --- /dev/null +++ b/src/impulsoetl/utilitarios/semaforos.py @@ -0,0 +1,119 @@ +# SPDX-FileCopyrightText: 2023 ImpulsoGov +# +# SPDX-License-Identifier: MIT + + +"""Define funções para restringir e liberar a escrita no banco de dados.""" + + +import socket + +from sqlalchemy.orm.session import Session + +from impulsoetl.bd import tabelas +from impulsoetl.loggers import logger + + +tabela_semaforos = tabelas["configuracoes.capturas_semaforos"] + + +class EscritaBloqueadaExcecao(BlockingIOError): + """Exceção relativa à tentativa de escrita em uma tabela bloqueada.""" + + pass + + +def bloquear_escrita( + sessao: Session, + tabela_destino: str, + unidade_geografica_id: str, + periodo_id: str, +) -> int: + logger.info( + "Bloqueando escrita simultânea para a tabela `{tabela_destino}` e " + + "identificadores de unidade geográfica {unidade_geografica_id} e " + + "de período `{periodo_id}`.", + tabela_destino=tabela_destino, + unidade_geografica_id=unidade_geografica_id, + periodo_id=periodo_id, + ) + cliente_nome = socket.gethostname() + cliente_ipv4 = socket.gethostbyname(cliente_nome) + + requisicao_bloqueio = tabela_semaforos.insert().values([{ + "tabela_destino": tabela_destino, + "unidade_geografica_id": unidade_geografica_id, + "periodo_id": periodo_id, + "cliente_nome": cliente_nome, + "cliente_ipv4": cliente_ipv4, + }]) + sessao.execute(requisicao_bloqueio) + sessao.commit() + return 0 + + +def checar_escrita_liberada( + sessao: Session, + tabela_destino: str, + unidade_geografica_id: str, + periodo_id: str, +) -> int: + logger.info( + "Checando estatus de liberação da escrita para a tabela " + "`{tabela_destino}` e identificadores de unidade geográfica " + + "{unidade_geografica_id} e de período `{periodo_id}`.", + tabela_destino=tabela_destino, + unidade_geografica_id=unidade_geografica_id, + periodo_id=periodo_id, + ) + bloqueios = ( + sessao.query(tabela_semaforos) + .filter(tabela_semaforos.c.tabela_destino == tabela_destino) + .filter( + tabela_semaforos.c.unidade_geografica_id == unidade_geografica_id, + ) + .filter(tabela_semaforos.c.periodo_id == periodo_id) + .order_by(tabela_semaforos.c.data_inicio.asc()) + ) + + if len(bloqueios.all()) != 0: + logger.error( + "Outro processo já está escrevendo na tabela `{tabela_destino}` " + "para a unidade geográfica `{unidade_geografica_id}` e " + "período `{periodo_id}` (bloqueio ativo desde " + "{data_inicio:%d/%m/%Y às %H:%M}, por `{cliente_nome}`).", + tabela_destino=tabela_destino, + unidade_geografica_id=unidade_geografica_id, + periodo_id=periodo_id, + data_inicio=bloqueios.first()["data_inicio"], + cliente_nome=bloqueios.first()["cliente_nome"], + ) + raise EscritaBloqueadaExcecao + return 0 + + +def liberar_escrita( + sessao: Session, + tabela_destino: str, + unidade_geografica_id: str, + periodo_id: str, +) -> int: + logger.info( + "Liberando escrita para a tabela `{tabela_destino}` e " + + "identificadores de unidade geográfica {unidade_geografica_id} e " + + "de período `{periodo_id}`.", + tabela_destino=tabela_destino, + unidade_geografica_id=unidade_geografica_id, + periodo_id=periodo_id, + ) + requisicao_liberacao = ( + sessao.query(tabela_semaforos) + .filter(tabela_semaforos.c.tabela_destino == tabela_destino) + .filter( + tabela_semaforos.c.unidade_geografica_id == unidade_geografica_id, + ) + .filter(tabela_semaforos.c.periodo_id == periodo_id) + ) + requisicao_liberacao.delete() + sessao.commit() + return 0 diff --git a/tests/utilitarios/teste_semaforos.py b/tests/utilitarios/teste_semaforos.py new file mode 100644 index 0000000..10bfa7c --- /dev/null +++ b/tests/utilitarios/teste_semaforos.py @@ -0,0 +1,165 @@ +# SPDX-FileCopyrightText: 2023 ImpulsoGov +# +# SPDX-License-Identifier: MIT + + +"""Testes das funções para restringir e liberar a escrita no banco de dados.""" + + +import pytest + +from sqlalchemy.orm.session import Session + +from impulsoetl.utilitarios.semaforos import ( + bloquear_escrita, + checar_escrita_liberada, + liberar_escrita, + EscritaBloqueadaExcecao, + tabela_semaforos, +) + + +@pytest.fixture() +def tabela_teste(sessao: Session): + return "tabela_teste" + + +@pytest.fixture() +def limpar_testes(sessao: Session, tabela_teste: str): + try: + yield + finally: + sessao.query(tabela_semaforos).filter( + tabela_semaforos.c.tabela_destino == tabela_teste, + ).delete() + sessao.commit() + + +@pytest.mark.parametrize( + "unidade_geografica_id,periodo_id", + [( + "e8cb5dcc-46d4-45af-a237-4ab683b8ce8e", + "9883e787-10c9-4de8-af11-9de1df09543b", + )], +) +def teste_liberar_escrita( + sessao: Session, + tabela_teste: str, + unidade_geografica_id: str, + periodo_id: str, + limpar_testes: None, +): + + # Insere uma entrada de bloqueio para excluí-la posteriormente + sessao.execute( + """ + INSERT INTO configuracoes.capturas_semaforos ( + tabela_destino, + unidade_geografica_id, + periodo_id, + cliente_nome, + cliente_ipv4 + ) + VALUES ( + :tabela_destino, + :unidade_geografica_id, + :periodo_id, + :cliente_nome, + :cliente_ipv4 + ) + """, + { + "tabela_destino": tabela_teste, + "unidade_geografica_id": unidade_geografica_id, + "periodo_id": periodo_id, + "cliente_nome": "cliente_teste", + "cliente_ipv4": "127.0.0.1", + } + ) + + # Executa a operação de liberação + liberar_escrita(sessao, tabela_teste, unidade_geografica_id, periodo_id) + + # Verifica se a entrada de bloqueio foi removida do banco de dados + result = sessao.execute( + "SELECT * FROM configuracoes.capturas_semaforos", + ).fetchall() + assert len(result) == 0 + + +@pytest.mark.parametrize( + "unidade_geografica_id,periodo_id", + [( + "e8cb5dcc-46d4-45af-a237-4ab683b8ce8e", + "9883e787-10c9-4de8-af11-9de1df09543b", + )], +) +def teste_bloquear_escrita( + sessao: Session, + tabela_teste: str, + unidade_geografica_id: str, + periodo_id: str, + limpar_testes: None, +): + # Executa a operação de bloqueio + bloquear_escrita(sessao, tabela_teste, unidade_geografica_id, periodo_id) + + # Verifica se a entrada de bloqueio está no banco de dados + result = sessao.execute( + "SELECT * FROM configuracoes.capturas_semaforos", + ).fetchall() + + assert len(result) == 1 + assert result[0]["tabela_destino"] == tabela_teste + assert str(result[0]["unidade_geografica_id"]) == unidade_geografica_id + assert str(result[0]["periodo_id"]) == periodo_id + + +@pytest.mark.parametrize( + "unidade_geografica_id,periodo_id", + [( + "e8cb5dcc-46d4-45af-a237-4ab683b8ce8e", + "9883e787-10c9-4de8-af11-9de1df09543b", + )], +) +def teste_checar_escrita_liberada( + sessao: Session, + tabela_teste: str, + unidade_geografica_id: str, + periodo_id: str, + limpar_testes: None, +): + + # Verifica se checar_escrita_liberada gera a exceção apropriada quando a entrada está presente + sessao.execute( + """ + INSERT INTO configuracoes.capturas_semaforos ( + tabela_destino, + unidade_geografica_id, + periodo_id, + cliente_nome, + cliente_ipv4 + ) + VALUES ( + :tabela_destino, + :unidade_geografica_id, + :periodo_id, + :cliente_nome, + :cliente_ipv4 + ) + """, + { + "tabela_destino": tabela_teste, + "unidade_geografica_id": unidade_geografica_id, + "periodo_id": periodo_id, + "cliente_nome": "cliente_teste", + "cliente_ipv4": "127.0.0.1", + }, + ) + with pytest.raises(EscritaBloqueadaExcecao): + checar_escrita_liberada( + sessao, + tabela_teste, + unidade_geografica_id, + periodo_id, + )