Skip to content

Commit

Permalink
Merge branch 'main' into correcoes_etl_cnes
Browse files Browse the repository at this point in the history
  • Loading branch information
waltmatheus committed May 10, 2023
2 parents 11eaa55 + d88ab80 commit 4a21b17
Show file tree
Hide file tree
Showing 15 changed files with 1,747 additions and 775 deletions.
4 changes: 0 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ FROM base AS dependencias-sistema

# Instalar Geckodriver e Firefox-ESR
RUN apt-get install -yqq curl
RUN curl -s -L \
https://github.com/mozilla/geckodriver/releases/download/v0.31.0/geckodriver-v0.31.0-linux64.tar.gz \
| tar xz -C /usr/local/bin/
RUN apt-get install -yqq firefox-esr:amd64=91.13.0esr-1~deb11u1

# instalar dependências
RUN apt-get install -yqq git build-essential libssl-dev libffi-dev python3-dev cargo
Expand Down
1,778 changes: 1,058 additions & 720 deletions poetry.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ authors = ["ImpulsoGov <contato@impulsogov.org>"]

python = ">=3.10,<3.11"
cryptography = ">=38.0.3"
pandas = "1.4.2"
"psycopg2-binary" = "^2.9.3"
scipy = "~1.8.1"
frozendict = "2.1.1"
Expand All @@ -37,7 +36,15 @@ uuid6 = "^2022.6.25"
lxml = "^4.9.1"
xlrd = "^2.0.1"
beautifulsoup4 = "^4.11.2"
pandas = "1.4.3"
pyreaddbc = "1.0.0"
dbfread = "2.0.7"

[tool.poetry.group.prefect]
optional = true

[tool.poetry.group.prefect.dependencies]
prefect = ">=2.10.8, <3.0"

[tool.poetry.group.dev.dependencies]

Expand All @@ -58,13 +65,6 @@ safety = "1.10.3"
wemake-python-styleguide="0.0.1"
pytest-testmon = "^1.2.3"

[tool.poetry.group.prefect]
optional = true

[tool.poetry.group.prefect.dependencies]
prefect = ">=2.6.7, <3.0"


[tool.poetry.extras]

analise = [
Expand Down
90 changes: 49 additions & 41 deletions src/impulsoetl/scripts/saude_mental.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
from impulsoetl.siasus.raas_ps import obter_raas_ps
from impulsoetl.sihsus.aih_rd import obter_aih_rd
from impulsoetl.sinan.violencia import obter_agravos_violencia
from impulsoetl.sisab.producao import obter_relatorio_producao
from impulsoetl.sisab.relatorio_producao_resolutividade_por_condicao.principal import obter_relatorio_resolutividade_por_condicao
from impulsoetl.sisab.relatorio_tipo_equipe_por_tipo_producao.principal import obter_relatorio_tipo_equipe_por_producao

agendamentos = tabelas["configuracoes.capturas_agendamentos"]
capturas_historico = tabelas["configuracoes.capturas_historico"]
Expand All @@ -39,28 +40,16 @@
validate_parameters=False,
)
def resolutividade_aps_por_condicao(
teste: bool = False,
teste:bool = False,
) -> None:
"""Desfechos dos atendimentos individuais da APS por condição avaliada.
Argumentos:
sessao: objeto [`sqlalchemy.orm.session.Session`][] que permite
acessar a base de dados da ImpulsoGov.
teste: Indica se as modificações devem ser de fato escritas no banco de
dados (`False`, padrão). Caso seja `True`, as modificações são
adicionadas à uma transação, e podem ser revertidas com uma chamada
posterior ao método [`Session.rollback()`][] da sessão gerada com o
SQLAlchemy.

[`sqlalchemy.orm.session.Session`]: https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.Session
"""
habilitar_suporte_loguru()
logger.info(
"Capturando dados de resolutividade da APS (desfechos de atendimentos "
+ "individuais) por condição de saúde avaliada.",
)

operacao_id = "bdbeb1c4-bdc6-432f-a3b4-b6ca306e32c9"
operacao_id = "0644acff-4642-75e1-b559-6193f928cb16"
with Sessao() as sessao:
agendamentos_resolutividade_por_condicao = (
sessao.query(agendamentos)
Expand All @@ -69,21 +58,30 @@ def resolutividade_aps_por_condicao(
)

for agendamento in agendamentos_resolutividade_por_condicao:
obter_relatorio_producao(
tabela_destino=agendamento.tabela_destino,
variaveis=("Conduta", "Problema/Condição Avaliada"),
unidades_geograficas_ids=[agendamento.unidade_geografica_id],
unidade_geografica_tipo="Municípios",
ano=agendamento.periodo_data_inicio.year,
mes=agendamento.periodo_data_inicio.month,
atualizar_captura=False,
sessao=sessao,
teste=teste,
obter_relatorio_resolutividade_por_condicao(
sessao = sessao,
teste = teste,
tabela_destino = agendamento.tabela_destino,
periodo_id = agendamento.periodo_id,
unidade_geografica_id = agendamento.unidade_geografica_id,
unidade_geografica_id_sus= agendamento.unidade_geografica_id_sus,
periodo_competencia = agendamento.periodo_data_inicio,
)
logger.info("Registrando captura bem-sucedida...")
requisicao_inserir_historico = capturas_historico.insert(
{
"operacao_id": operacao_id,
"periodo_id": agendamento.periodo_id,
"unidade_geografica_id": agendamento.unidade_geografica_id,
}
)
if teste: # evitar rodar muitas iterações
conector = sessao.connection()
conector.execute(requisicao_inserir_historico)
if teste:
sessao.rollback()
break
sessao.commit()
logger.info("OK.")


@flow(
Expand All @@ -101,8 +99,9 @@ def resolutividade_aps_por_condicao(
validate_parameters=False,
)
def tipo_equipe_por_tipo_producao(
teste: bool = False,
teste:bool = True,
) -> None:

"""Número de contatos assistenciais na APS por tipo de produção e equipe.
Argumentos:
Expand All @@ -116,36 +115,45 @@ def tipo_equipe_por_tipo_producao(
[`sqlalchemy.orm.session.Session`]: https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.Session
"""

habilitar_suporte_loguru()
logger.info(
"Capturando dados de atendimentos individuais) por condição de saúde avaliada.",
)
variaveis = ("Tipo de Equipe", "Tipo de Produção")
operacao_id = "0f397c27-db38-4fd9-b097-3a9e25138b4c"

operacao_id = '0644be06-c2f5-75ef-9c31-8027d0a6f166'
with Sessao() as sessao:
agendamentos_producao_por_equipe = (
sessao.query(agendamentos)
.filter(agendamentos.c.operacao_id == operacao_id)
.all()
)

for agendamento in agendamentos_producao_por_equipe:
obter_relatorio_producao(
sessao=sessao,
tabela_destino=agendamento.tabela_destino,
variaveis=variaveis,
unidades_geograficas_ids=[agendamento.unidade_geografica_id],
unidade_geografica_tipo=agendamento.unidade_geografica_tipo,
ano=agendamento.periodo_data_inicio.year,
mes=agendamento.periodo_data_inicio.month,
tipo_producao=None,
atualizar_captura=False,
teste=teste,
obter_relatorio_tipo_equipe_por_producao(
sessao = sessao,
teste = teste,
tabela_destino = agendamento.tabela_destino,
periodo_id = agendamento.periodo_id,
unidade_geografica_id = agendamento.unidade_geografica_id,
unidade_geografica_id_sus= agendamento.unidade_geografica_id_sus,
periodo_competencia = agendamento.periodo_data_inicio,
)
logger.info("Registrando captura bem-sucedida...")
requisicao_inserir_historico = capturas_historico.insert(
{
"operacao_id": operacao_id,
"periodo_id": agendamento.periodo_id,
"unidade_geografica_id": agendamento.unidade_geografica_id,
}
)
conector = sessao.connection()
conector.execute(requisicao_inserir_historico)
if teste:
sessao.rollback()
break
sessao.commit()

logger.info("OK.")

@flow(
name="Rodar Agendamentos de Arquivos de Disseminação da RAAS-PS",
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import pandas as pd
from prefect import task
from sqlalchemy import delete
from sqlalchemy.orm import Query, Session

from impulsoetl.bd import tabelas
from impulsoetl.loggers import habilitar_suporte_loguru, logger
from impulsoetl.utilitarios.bd import carregar_dataframe


def obter_lista_registros_inseridos(
sessao: Session,
tabela_destino: str,
) -> Query:
"""Obtém lista de registro da períodos que já constam na tabela.
Argumentos:
sessao: objeto [`sqlalchemy.orm.session.Session`][] que permite
acessar a base de dados da ImpulsoGov.
tabela_destino: Tabela que irá acondicionar os dados.
Retorna:
Lista de períodos que já constam na tabela destino
[`sqlalchemy.orm.session.Session`]: https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.Session
"""

tabela = tabelas[tabela_destino]
registros = sessao.query(tabela.c.periodo_id, tabela.c.unidade_geografica_id).distinct(
tabela.c.periodo_id, tabela.c.unidade_geografica_id
)

logger.info("Leitura dos períodos inseridos no banco Impulso OK!")
return registros


def carregar_dados(
sessao: Session,
df_tratado: pd.DataFrame,
tabela_destino: str,
periodo_id: str,
unidade_geografica_id: str,
) -> int:
"""Carrega os dados de um arquivo validação do portal SISAB no BD da Impulso.
Argumentos:
sessao: objeto [`sqlalchemy.orm.session.Session`][] que permite
acessar a base de dados da ImpulsoGov.
df_tratado: objeto [`pandas.DataFrame`][] contendo os
dados a serem carregados na tabela de destino, já no formato
utilizado pelo banco de dados da ImpulsoGov.
Retorna:
Código de saída do processo de carregamento. Se o carregamento
for bem sucedido, o código de saída será `0`.
[`sqlalchemy.orm.session.Session`]: https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.Session
[`pandas.D"""
habilitar_suporte_loguru()
logger.info("Excluíndo registros se houver atualização retroativa...")
tabela_relatorio_producao = tabelas[tabela_destino]
registros_inseridos = obter_lista_registros_inseridos(
sessao, tabela_destino
)

if any(
[registro.periodo_id == periodo_id for registro in registros_inseridos]
):
limpar = (
delete(tabela_relatorio_producao)
.where(tabela_relatorio_producao.c.periodo_id == periodo_id)
.where(tabela_relatorio_producao.c.unidade_geografica_id == unidade_geografica_id)
)
logger.debug(limpar)
sessao.execute(limpar)

logger.info("Carregando dados em tabela...")
carregar_dataframe(
sessao=sessao, df=df_tratado, tabela_destino=tabela_destino
)

logger.info(
"Carregamento concluído para a tabela `{tabela_nome}`: "
+ "adicionadas {linhas_adicionadas} novas linhas.",
tabela_nome=tabela_destino,
linhas_adicionadas=len(df_tratado),
)

return 0
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# SPDX-FileCopyrightText: 2021, 2022 ImpulsoGov <contato@impulsogov.org>
#
# SPDX-License-Identifier: MIT

"""Extrai dados de produção da APS a partir do SISAB."""

import warnings

warnings.filterwarnings("ignore")

import pandas as pd

from datetime import date
from prefect import task

from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import extrair_producao_por_municipio
from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import transformar_producao_por_municipio


from impulsoetl.loggers import logger
"""
@task(
name= "Extrair Dados de Produção da APS por Tipo de Condição Avaliada e Desfecho",
description=(
"Extrai os dados do relatório de produção da Atenção Primária à Saúde,"
+ "por problema/condição avaliada e desfecho a partir do portal público"
+ "do Sistema de Informação em Saúde para a Atenção Básica do SUS."
),
tags=["aps", "sisab", "producao", "extracao"],
retries=2,
retry_delay_seconds=120,
)"""
def extrair_relatorio(
periodo_competencia: date)-> pd.DataFrame():
"""
Extrai relatório de produção por problema/condição avaliada e conduta a partir da página do SISAB
Argumentos:
periodo_data_inicio: Data da competência
Retorna:
Objeto [`pandas.DataFrame`] com os dados extraídos.
"""
df_consolidado = pd.DataFrame()

try:
df_parcial = extrair_producao_por_municipio(
tipo_producao="Atendimento Individual",
competencias=[periodo_competencia],
selecoes_adicionais={
"Problema/Condição Avaliada": "Selecionar Todos",
"Conduta":"Selecionar Todos",
},
).pipe(transformar_producao_por_municipio)

df_consolidado = df_consolidado.append(df_parcial)

except Exception as e:
logger.error(e)
pass

return df_consolidado

Loading

0 comments on commit 4a21b17

Please sign in to comment.