Skip to content

Commit

Permalink
Implementa semáforos de escrita (#68)
Browse files Browse the repository at this point in the history
* Corrige função de extração dos profissionais

* Corrige função de tratamento dos profissionais de saúde

* Adiciona bloco try except na função extrair_profissionais_com_ine

* Inclui bloco try/except para o erro json.JSONDecodeError

* Inclui condições para identificar os horários dos estabelecimentos que tem funcionamento indicado como 'Sempre Aberto'

* Inclui exceção para considerar a ausência de profisisonais em determinada competência durant ea extração

* Corrige função para tratar tipos das colunas de data

* Inclui tasks e flow do prefect

* Refatora função que extrai os profissionais com INE

* ETL dos profissinais de saúde com INE

* Inclui função para extração dos profissionais com INE

* Cria ETL para os profissionais totais

* Cria função para extração dos profisisonais totais a partir da página do CNES

* Implementa fluxos do ETL SCNES

* Adiona novos argumentos de funções

* Exclui módulo de carregamento

* Exclui módulo de modelos para tabelas

* Exclui etapas de tratamento e inclui novas funções

* Adiona funções e refine construção do DataFrame

* Adiciona argumentos à função principal

* Exclui módulo de verificação de dados

* Formata código baseado no padrão de esilo  PEP8

* Comenta funções que utilizam a biblioteca dbfread

* Atualiza biblioteca prefect para versão anterior

* Salva bloco de infra do Docker

* Adiciona params p/ salvar bloco Docker

* Salva bloco de registro DockerHub

* Exclui módulos que estão desuso na pipeline devido a redunção de uso de funções e de etapas de verificação de dados que não garantem integridade dos dados a serem inseridos

* Adiciona método que registra o erro gerado e grava no banco de dados

* Adiciona bloco try/except, novos argumentos de função e módulos e bibliotecas a serem importados

* Retira bloco try/except, corrige argumentos de funções

* Retira decorador de task do prefect corrigindo problemas de orquestração de fluxos

* Adiona novos argumentos as funções de inicialização de ETLs puxando novas variaveis dos agendmaentos para realizar tratamento dos erros

* Trata valores nulos ignorando na conversão tipos a atribuindo valor None

* Refatora funções de tratamento para simplificação de etapas

* Funções de extração e tratamento dos Relatórios de produção do SISAB

* Extração e tratamento p/ bloco 06 do painel

* Primeiras funções para extração dos dados para atualização do painel do AGP

* Cria funções de ETL para os dados que irão compor o painel de produtividade do AGP

* Adiciona função para o ETL dos dados de produção do AGP no script geral

* Deleta arquivo de teste

* Cria arquivo init.py

* Adiciona tasks do prefeito na função principal

* Especifica tipo de erro no módulo try/except

* Corrige argumentos e chamada de funções

* refactor: ♻️ Aumenta timeouts de ETLs de SM (#66)

Aumenta timeout_seconds para capturas de BPA-i, AIH e Procedimentos Ambulatoriais para evitar [Errno 110] Connection timed out\n')

* fix: 🚑 Pula checagem de tamanho

Comenta fora o trecho da função `extrair_dbc_lotes()` que checa se o tamanho do arquivo baixado é o mesmo que o do arquivo no servidor FTP, já que não tem havido mais downloads corrompidos desde o uso do ftplib, e a checagem de tamanho no servidor às vezes gera falha em alguns arquivos do FTP do DataSUS.

* fix: ⬆️ Corrige erro de instalação do pyyaml

Atualiza dependências para que a versão mais recente (>= 6.0.0) do pacote pyyaml seja instalada, evitando erro pela falta de suporte à PEP 517

* feat: 🗃️ Adiciona suporte a semáforos para escrita no banco de bancos

* test: ✅ Adiciona testes para semáforos

* feat: 🗃️ Adiciona exemplo de uso de semáforo em script de ETL

* Corrige função de extração dos profissionais

* Corrige função de tratamento dos profissionais de saúde

* Adiciona bloco try except na função extrair_profissionais_com_ine

* Corrige função para tratar tipos das colunas de data

* Inclui tasks e flow do prefect

* Refatora função que extrai os profissionais com INE

* ETL dos profissinais de saúde com INE

* refactor: ♻️ Aumenta timeouts de ETLs de SM (#66)

Aumenta timeout_seconds para capturas de BPA-i, AIH e Procedimentos Ambulatoriais para evitar [Errno 110] Connection timed out\n')

* fix: 🚑 Pula checagem de tamanho

Comenta fora o trecho da função `extrair_dbc_lotes()` que checa se o tamanho do arquivo baixado é o mesmo que o do arquivo no servidor FTP, já que não tem havido mais downloads corrompidos desde o uso do ftplib, e a checagem de tamanho no servidor às vezes gera falha em alguns arquivos do FTP do DataSUS.

* fix: ⬆️ Corrige erro de instalação do pyyaml

Atualiza dependências para que a versão mais recente (>= 6.0.0) do pacote pyyaml seja instalada, evitando erro pela falta de suporte à PEP 517

* feat: 🗃️ Adiciona suporte a semáforos para escrita no banco de bancos

* test: ✅ Adiciona testes para semáforos

* feat: 🗃️ Adiciona exemplo de uso de semáforo em script de ETL

* feat: 🗃️ Adiciona semáforos em ETLs de SM

Adiciona semáforos em ETLs de SM

* feat: 🗃️ Adiciona semáforos em ETLs gerais relevantes para SM

Adiciona semáforos em ETLs gerais relevantes para SM e corrige bug de typo no script saude_mental

* fix: 🐛 Corrige erro de identação em ETL de CEPs

* fix: 🐛 Define default para argumento de unidade geográfica em função

Define default para argumento de unidade geográfica na função 'insere_erro_database'

---------

Co-authored-by: maira <maaottoni@gmail.com>
Co-authored-by: WaltMath <wmatheus.97@gmail.com>
Co-authored-by: Bruna Fernandes <108029366+fernandesbruna@users.noreply.github.com>
Co-authored-by: fernandesbruna <brunadesfernandes@gmail.com>
  • Loading branch information
5 people committed Aug 18, 2023
1 parent 4c9ac20 commit 49740df
Show file tree
Hide file tree
Showing 5 changed files with 621 additions and 8 deletions.
159 changes: 159 additions & 0 deletions src/impulsoetl/scripts/geral.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
from impulsoetl.scnes.estabelecimentos_profissionais_com_ine.principal import obter_profissionais_cnes_com_ine
from impulsoetl.scnes.estabelecimentos_profissionais_totais.principal import obter_profissionais_cnes_totais
from impulsoetl.sisab.relatorio_producao_profissional_conduta_tipo_atendimento.principal import relatorio_profissional_conduta_atendimento
from impulsoetl.utilitarios.semaforos import (
bloquear_escrita,
checar_escrita_liberada,
EscritaBloqueadaExcecao,
liberar_escrita,
)


agendamentos = tabelas["configuracoes.capturas_agendamentos"]
Expand Down Expand Up @@ -57,7 +63,24 @@ def habilitacoes_disseminacao(
.filter(agendamentos.c.operacao_id == operacao_id)
.all()
)

for agendamento in agendamentos_habilitacoes:
try:
checar_escrita_liberada(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
except EscritaBloqueadaExcecao:
logger.warning("Pulando...")
continue
bloquear_escrita(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
obter_habilitacoes(
sessao=sessao,
uf_sigla=agendamento.uf_sigla,
Expand All @@ -84,6 +107,12 @@ def habilitacoes_disseminacao(
sessao.rollback()
break
sessao.commit()
liberar_escrita(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
logger.info("OK.")


Expand Down Expand Up @@ -115,6 +144,22 @@ def vinculos_disseminacao(
.all()
)
for agendamento in agendamentos_vinculos:
try:
checar_escrita_liberada(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
except EscritaBloqueadaExcecao:
logger.warning("Pulando...")
continue
bloquear_escrita(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
obter_vinculos(
sessao=sessao,
uf_sigla=agendamento.uf_sigla,
Expand All @@ -141,6 +186,12 @@ def vinculos_disseminacao(
sessao.rollback()
break
sessao.commit()
liberar_escrita(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
logger.info("OK.")


Expand Down Expand Up @@ -172,6 +223,22 @@ def obitos_disseminacao(
.all()
)
for agendamento in agendamentos_do:
try:
checar_escrita_liberada(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
except EscritaBloqueadaExcecao:
logger.warning("Pulando...")
continue
bloquear_escrita(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
obter_do(
sessao=sessao,
uf_sigla=agendamento.uf_sigla,
Expand Down Expand Up @@ -200,6 +267,12 @@ def obitos_disseminacao(
conector = sessao.connection()
conector.execute(requisicao_inserir_historico)
sessao.commit()
liberar_escrita(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
logger.info("OK.")


Expand Down Expand Up @@ -232,6 +305,23 @@ def ceps(teste: bool = False) -> None:
ceps_pendentes_query = ceps_pendentes_query.limit(10)
ceps_pendentes = ceps_pendentes_query.all()

try:
checar_escrita_liberada(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
except EscritaBloqueadaExcecao:
logger.warning("Pulando...")
continue
bloquear_escrita(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)

obter_cep(sessao=sessao, ceps_pendentes=ceps_pendentes, teste=teste)

@flow(
Expand Down Expand Up @@ -268,6 +358,23 @@ def cnes_estabelecimentos_identificados(teste: bool = False,)-> None:
codigo_sus_municipio = agendamento.unidade_geografica_id_sus
periodo_data_inicio = agendamento.periodo_data_inicio

try:
checar_escrita_liberada(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
except EscritaBloqueadaExcecao:
logger.warning("Pulando...")
continue
bloquear_escrita(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)

obter_informacoes_estabelecimentos_identificados(
sessao=sessao,
tabela_destino=tabela_destino,
Expand All @@ -293,6 +400,12 @@ def cnes_estabelecimentos_identificados(teste: bool = False,)-> None:
conector = sessao.connection()
conector.execute(requisicao_inserir_historico)
sessao.commit()
liberar_escrita(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
logger.info("OK.")

@flow(
Expand Down Expand Up @@ -444,6 +557,23 @@ def cnes_estabelecimentos_horarios(teste: bool = True,)-> None:
codigo_sus_municipio = agendamento.unidade_geografica_id_sus
periodo_data_inicio = agendamento.periodo_data_inicio

try:
checar_escrita_liberada(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
except EscritaBloqueadaExcecao:
logger.warning("Pulando...")
continue
bloquear_escrita(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)

obter_horarios_estabelecimentos(
sessao=sessao,
tabela_destino=tabela_destino,
Expand All @@ -469,6 +599,12 @@ def cnes_estabelecimentos_horarios(teste: bool = True,)-> None:
conector = sessao.connection()
conector.execute(requisicao_inserir_historico)
sessao.commit()
liberar_escrita(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
logger.info("OK.")


Expand Down Expand Up @@ -504,6 +640,23 @@ def cnes_profissionais_totais(
codigo_sus_municipio = agendamento.unidade_geografica_id_sus
periodo_data_inicio = agendamento.periodo_data_inicio

try:
checar_escrita_liberada(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
except EscritaBloqueadaExcecao:
logger.warning("Pulando...")
continue
bloquear_escrita(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)

obter_profissionais_cnes_totais(
sessao=sessao,
tabela_destino=tabela_destino,
Expand All @@ -529,6 +682,12 @@ def cnes_profissionais_totais(
conector = sessao.connection()
conector.execute(requisicao_inserir_historico)
sessao.commit()
liberar_escrita(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
unidade_geografica_id=agendamento.unidade_geografica_id,
periodo_id=agendamento.periodo_id,
)
logger.info("OK.")


Expand Down
Loading

0 comments on commit 49740df

Please sign in to comment.