From d46df9cc97d1829026a27719353da252b7f0feb6 Mon Sep 17 00:00:00 2001 From: WaltMath Date: Tue, 9 May 2023 21:52:14 -0300 Subject: [PATCH 1/8] =?UTF-8?q?Adiona=20novos=20argumentos=20de=20fun?= =?UTF-8?q?=C3=A7=C3=B5es?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sisab/cadastros_individuais/principal.py | 59 +++++++++++-------- 1 file changed, 36 insertions(+), 23 deletions(-) diff --git a/src/impulsoetl/sisab/cadastros_individuais/principal.py b/src/impulsoetl/sisab/cadastros_individuais/principal.py index b5a8708..96aebc5 100644 --- a/src/impulsoetl/sisab/cadastros_individuais/principal.py +++ b/src/impulsoetl/sisab/cadastros_individuais/principal.py @@ -4,23 +4,19 @@ from datetime import date +import time from prefect import flow from sqlalchemy.orm import Session from impulsoetl import __VERSION__ +from impulsoetl.bd import Sessao from impulsoetl.loggers import habilitar_suporte_loguru, logger -from impulsoetl.sisab.cadastros_individuais.carregamento import ( - carregar_cadastros, -) from impulsoetl.sisab.cadastros_individuais.extracao import ( extrair_cadastros_individuais, ) -from impulsoetl.sisab.cadastros_individuais.tratamento import tratamento_dados -from impulsoetl.sisab.cadastros_individuais.verificacao import ( - verificar_cadastros_individuais, -) - +from impulsoetl.sisab.cadastros_individuais.tratamento import tratar_dados +from impulsoetl.utilitarios.bd import carregar_dataframe @flow( name="Obter Cadastros Individuais", @@ -37,9 +33,11 @@ def obter_cadastros_individuais( sessao: Session, visao_equipe: str, - periodo: date, - com_ponderacao: list[bool] = [True, False], - teste: bool = True, + periodo_data: date, + periodo_id: str, + periodo_codigo: str, + tabela_destino: str, + com_ponderacao: list[bool] = [False, True] ) -> None: """Extrai, transforma e carrega dados de cadastros de equipes pelo SISAB. @@ -63,24 +61,39 @@ def obter_cadastros_individuais( """ habilitar_suporte_loguru() + tempo_inicio_etl = time.time() for status_ponderacao in com_ponderacao: - df = extrair_cadastros_individuais( + + logger.info("Iniciando extração dos dados...") + df_extraido = extrair_cadastros_individuais( visao_equipe=visao_equipe, com_ponderacao=status_ponderacao, - competencia=periodo, + competencia=periodo_data, ) logger.info("Extração dos dados realizada...") - df_tratado = tratamento_dados( + + logger.info("Iniciando tratamento dos dados...") + df_tratado = tratar_dados( sessao=sessao, - dados_sisab_cadastros=df, + df_extraido=df_extraido, com_ponderacao=status_ponderacao, - periodo=periodo, + periodo_id=periodo_id, + periodo_codigo=periodo_codigo, ) - verificar_cadastros_individuais(df=df, df_tratado=df_tratado) - carregar_cadastros( - sessao=sessao, - cadastros_transformada=df_tratado, - visao_equipe=visao_equipe, + logger.info("Tratamento dos dados realizada...") + + logger.info("Iniciando carga dos dados no banco...") + carregar_dataframe( + sessao=sessao, df=df_tratado, tabela_destino=tabela_destino ) - if not teste: - sessao.commit() + logger.info("Carga dos dados no banco realizada...") + + tempo_final_etl = time.time() - tempo_inicio_etl + logger.info( + "Terminou ETL para `{visao_equipe}` " + + "da comepetência`{periodo_codigo}` " + + "em {tempo_final_etl}.", + tabela_nome=tabela_destino, + periodo_codigo=periodo_codigo, + tempo_final_etl=tempo_final_etl + ) From a9ecd8072cc68c92cef61107f198fb1e5a49590d Mon Sep 17 00:00:00 2001 From: WaltMath Date: Tue, 9 May 2023 21:52:39 -0300 Subject: [PATCH 2/8] =?UTF-8?q?Exclui=20m=C3=B3dulo=20de=20carregamento?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cadastros_individuais/carregamento.py | 69 ------------------- 1 file changed, 69 deletions(-) delete mode 100644 src/impulsoetl/sisab/cadastros_individuais/carregamento.py diff --git a/src/impulsoetl/sisab/cadastros_individuais/carregamento.py b/src/impulsoetl/sisab/cadastros_individuais/carregamento.py deleted file mode 100644 index 8d99036..0000000 --- a/src/impulsoetl/sisab/cadastros_individuais/carregamento.py +++ /dev/null @@ -1,69 +0,0 @@ -# SPDX-FileCopyrightText: 2022 ImpulsoGov -# -# SPDX-License-Identifier: MIT - - -import json - -import pandas as pd -from prefect import task -from sqlalchemy.orm import Session - -from impulsoetl.loggers import habilitar_suporte_loguru, logger -from impulsoetl.sisab.cadastros_individuais.modelos import ( - cadastros_equipe_homologadas, - cadastros_equipe_validas, - cadastros_todas_equipes, -) - - -@task( - name="Carregar Cadastros Individuais", - description=( - "Carrega os dados de cadastros individuais extraídos e transformados " - + "a partir do portal público do Sistema de Informação em Saúde para " - + "a Atenção Básica do SUS." - ), - tags=["aps", "sisab", "cadastros_individuais", "carregamento"], - retries=0, - retry_delay_seconds=None, -) -def carregar_cadastros( - sessao: Session, cadastros_transformada: pd.DataFrame, visao_equipe: str -) -> int: - habilitar_suporte_loguru() - - registros = json.loads( - cadastros_transformada.to_json( - orient="records", - date_format="iso", - ) - ) - - if visao_equipe == "equipes-validas": - requisicao_insercao = cadastros_equipe_validas.insert().values( - registros - ) - sufixo_tabela = "equipe_validas" - elif visao_equipe == "equipes-homologadas": - requisicao_insercao = cadastros_equipe_homologadas.insert().values( - registros - ) - sufixo_tabela = "equipe_homologadas" - else: - requisicao_insercao = cadastros_todas_equipes.insert().values( - registros - ) - sufixo_tabela = "equipe_todas" - - conector = sessao.connection() - conector.execute(requisicao_insercao) - - logger.info( - "Carregamento concluído para a tabela `{tabela_nome}`: " - + "adicionadas {linhas_adicionadas} novas linhas.", - tabela_nome=f"dados_publicos._sisab_cadastros_municipios_{sufixo_tabela}", - linhas_adicionadas=len(cadastros_transformada), - ) - - return 0 From a44d1a70c2d9ae23d7483b1883c3b00b18171391 Mon Sep 17 00:00:00 2001 From: WaltMath Date: Tue, 9 May 2023 21:52:59 -0300 Subject: [PATCH 3/8] =?UTF-8?q?Exclui=20m=C3=B3dulo=20de=20modelos=20para?= =?UTF-8?q?=20tabelas?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sisab/cadastros_individuais/modelos.py | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100644 src/impulsoetl/sisab/cadastros_individuais/modelos.py diff --git a/src/impulsoetl/sisab/cadastros_individuais/modelos.py b/src/impulsoetl/sisab/cadastros_individuais/modelos.py deleted file mode 100644 index 2184f85..0000000 --- a/src/impulsoetl/sisab/cadastros_individuais/modelos.py +++ /dev/null @@ -1,18 +0,0 @@ -# SPDX-FileCopyrightText: 2021, 2022 ImpulsoGov -# -# SPDX-License-Identifier: MIT - - -"""Declara representações das tabelas relativas ao SISAB.""" - -from impulsoetl.bd import tabelas - -cadastros_equipe_validas = tabelas[ - "dados_publicos.sisab_cadastros_municipios_equipe_validas" -] -cadastros_equipe_homologadas = tabelas[ - "dados_publicos.sisab_cadastros_municipios_equipe_homologadas" -] -cadastros_todas_equipes = tabelas[ - "dados_publicos.sisab_cadastros_municipios_equipe_todas" -] From 1577ae7bdb724d25c030895ae2bc76c307c91905 Mon Sep 17 00:00:00 2001 From: WaltMath Date: Tue, 9 May 2023 21:54:21 -0300 Subject: [PATCH 4/8] =?UTF-8?q?Exclui=20etapas=20de=20tratamento=20e=20inc?= =?UTF-8?q?lui=20novas=20fun=C3=A7=C3=B5es?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sisab/cadastros_individuais/tratamento.py | 212 ++++++++++-------- 1 file changed, 119 insertions(+), 93 deletions(-) diff --git a/src/impulsoetl/sisab/cadastros_individuais/tratamento.py b/src/impulsoetl/sisab/cadastros_individuais/tratamento.py index 7e9e575..2bcc6ab 100644 --- a/src/impulsoetl/sisab/cadastros_individuais/tratamento.py +++ b/src/impulsoetl/sisab/cadastros_individuais/tratamento.py @@ -2,18 +2,93 @@ # # SPDX-License-Identifier: MIT +from typing import Final -from datetime import date, datetime - +from frozendict import frozendict import pandas as pd from prefect import task from sqlalchemy.orm import Session -from uuid6 import uuid7 -from impulsoetl.comum.datas import periodo_por_codigo, periodo_por_data from impulsoetl.comum.geografias import id_sus_para_id_impulso -from impulsoetl.loggers import habilitar_suporte_loguru +from impulsoetl.loggers import logger,habilitar_suporte_loguru + +CADASTROS_COLUNAS_TIPOS: Final[frozendict] = frozendict( + { + "municipio_id_sus": str, + "periodo_id": str, + "cnes_id": str, + "cnes_nome": str, + "equipe_id_ine": str, + "quantidade": "int64", + "periodo_codigo": str, + "criterio_pontuacao": bool, + "unidade_geografica_id": str, + } +) + +CADASTROS_COLUNAS: Final[dict[str, str]] = { + "IBGE": "municipio_id_sus", + "CNES": "cnes_id", + "Estabelecimento": "cnes_nome", + "INE": "equipe_id_ine", +} + +def renomear_colunas( + df_extraido:pd.DataFrame, + ): + return df_extraido.rename(columns=CADASTROS_COLUNAS).rename(columns={df_extraido.columns[7]: 'quantidade'}) + +def excluir_colunas( + df_tratado:pd.DataFrame, + ): + + return (df_tratado.drop( + ["Uf", "Municipio","Sigla da equipe", "Unnamed: 8"], + axis=1 + ).dropna()) + + +def garantir_tipos_colunas( + df_tratado:pd.DataFrame, + ): + + return df_tratado.astype(CADASTROS_COLUNAS_TIPOS) + +def definir_coluna_criterio_pontuacao( + df_tratado:pd.DataFrame, + com_ponderacao:bool, + ): + + return df_tratado.insert(1, "criterio_pontuacao", com_ponderacao, allow_duplicates = True) +def definir_coluna_periodo_codigo( + df_tratado:pd.DataFrame, + periodo_codigo:str, + ): + + return df_tratado.insert(1, "periodo_codigo", periodo_codigo, allow_duplicates = True) + +def definir_coluna_periodo_id( + df_tratado:pd.DataFrame, + periodo_id:str, + ): + + return df_tratado.insert(1, "periodo_id", periodo_id, allow_duplicates = True) + +def definir_coluna_unidade_geografica_id( + df_tratado:pd.DataFrame, + sessao:Session, + ): + + df_tratado["unidade_geografica_id"] = df_tratado[ + "municipio_id_sus" + ].apply( + lambda municipio_id_sus: id_sus_para_id_impulso( + sessao=sessao, + id_sus=municipio_id_sus, + ) + ) + return df_tratado @task( name="Transformar Cadastros Individuais", @@ -26,98 +101,49 @@ retries=0, retry_delay_seconds=None, ) -def tratamento_dados( + +def tratar_dados( sessao: Session, - dados_sisab_cadastros: pd.DataFrame, + df_extraido: pd.DataFrame, com_ponderacao: bool, - periodo: date, + periodo_id:str, + periodo_codigo:str, ) -> pd.DataFrame: + """Inclui todas etapas de transformação dos dados de cadastros de equipes pelo SISAB. + + Argumentos: + sessao: objeto [`sqlalchemy.orm.session.Session`][] que permite + acessar a base de dados da ImpulsoGov. + df_extraido: DataFrame do relatŕorio extraído no SISAB. + visao_equipe: Indica a situação da equipe considerada para a contagem + dos cadastros. + com_ponderacao: Lista de booleanos indicando quais tipos de população + devem ser filtradas no cadastro - onde `True` indica apenas as + populações com critério de ponderação e `False` indica todos os + cadastros. Por padrão, o valor é `[True, False]`, indicando que + ambas as possibilidades são extraídas do SISAB e carregadas para a + mesma tabela de destino. + periodo_id: Identificador único do período referente ao mês/ano de disponibilização do relatório. + periodo_codigo: Código do período referente ao mês/ano de disponibilização do relatório. + """ + habilitar_suporte_loguru() - tabela_consolidada = pd.DataFrame( - columns=[ - "id", - "municipio_id_sus", - "periodo_id", - "periodo_codigo", - "cnes_id", - "cnes_nome", - "equipe_id_ine", - "quantidade", - "criterio_pontuacao", - "criacao_data", - "atualizacao_data", - ] - ) + + logger.info("Renomeando colunas da tabela...") + print(df_extraido) + df_tratado = renomear_colunas(df_extraido=df_extraido) - periodo_cod = periodo_por_data(sessao=sessao, data=periodo) - tabela_consolidada[ - [ - "municipio_id_sus", - "cnes_id", - "cnes_nome", - "equipe_id_ine", - "quantidade", - ] - ] = dados_sisab_cadastros.loc[ - :, ["IBGE", "CNES", "Nome UBS", "INE", "quantidade"] - ] - tabela_consolidada["criterio_pontuacao"] = com_ponderacao - tabela_consolidada["periodo_codigo"] = periodo_cod[3] - tabela_consolidada.reset_index(drop=True, inplace=True) - tabela_consolidada["id"] = tabela_consolidada.apply( - lambda row: uuid7(), axis=1 - ) - tabela_consolidada["criacao_data"] = datetime.now().strftime( - "%Y-%m-%d %H:%M:%S" - ) - tabela_consolidada["atualizacao_data"] = datetime.now().strftime( - "%Y-%m-%d %H:%M:%S" - ) + logger.info("Exluindo colunas não úteis da tabela...") + df_tratado = excluir_colunas(df_tratado=df_tratado) - periodo_obj = periodo_por_codigo(sessao=sessao, codigo=periodo_cod[3]) - tabela_consolidada["periodo_id"] = periodo_obj.id - tabela_consolidada["unidade_geografica_id"] = tabela_consolidada[ - "municipio_id_sus" - ].apply( - lambda municipio_id_sus: id_sus_para_id_impulso( - sessao=sessao, - id_sus=municipio_id_sus, - ) - ) + logger.info("Ennquicimento de tabela com novas colunas...") + definir_coluna_criterio_pontuacao(df_tratado=df_tratado,com_ponderacao=com_ponderacao) + definir_coluna_periodo_codigo(df_tratado=df_tratado,periodo_codigo=periodo_codigo) + definir_coluna_periodo_id(df_tratado=df_tratado,periodo_id=periodo_id) + definir_coluna_unidade_geografica_id(df_tratado=df_tratado,sessao=sessao) - tabela_consolidada["id"] = tabela_consolidada["id"].astype("string") - tabela_consolidada["municipio_id_sus"] = tabela_consolidada[ - "municipio_id_sus" - ].astype("string") - tabela_consolidada["periodo_id"] = tabela_consolidada["periodo_id"].astype( - "string" - ) - tabela_consolidada["periodo_codigo"] = tabela_consolidada[ - "periodo_codigo" - ].astype("string") - tabela_consolidada["cnes_id"] = tabela_consolidada["cnes_id"].astype( - "string" - ) - tabela_consolidada["cnes_nome"] = tabela_consolidada["cnes_nome"].astype( - "string" - ) - tabela_consolidada["unidade_geografica_id"] = tabela_consolidada[ - "unidade_geografica_id" - ].astype("string") - tabela_consolidada["equipe_id_ine"] = tabela_consolidada[ - "equipe_id_ine" - ].astype("string") - tabela_consolidada["quantidade"] = tabela_consolidada["quantidade"].astype( - int - ) - tabela_consolidada["criterio_pontuacao"] = tabela_consolidada[ - "criterio_pontuacao" - ].astype(bool) - tabela_consolidada["criacao_data"] = tabela_consolidada[ - "criacao_data" - ].astype("string") - tabela_consolidada["atualizacao_data"] = tabela_consolidada[ - "atualizacao_data" - ].astype("string") - - return tabela_consolidada + logger.info("Garantindo tipagem dos dados...") + garantir_tipos_colunas(df_tratado=df_tratado) + + df_tratado.reset_index(drop=True, inplace=True) + return df_tratado From 4dcc59cc02a101d676f389d2124cebc01f655042 Mon Sep 17 00:00:00 2001 From: WaltMath Date: Tue, 9 May 2023 21:55:15 -0300 Subject: [PATCH 5/8] =?UTF-8?q?Adiona=20fun=C3=A7=C3=B5es=20e=20refine=20c?= =?UTF-8?q?onstru=C3=A7=C3=A3o=20do=20DataFrame?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sisab/cadastros_individuais/extracao.py | 167 +++++++++--------- 1 file changed, 83 insertions(+), 84 deletions(-) diff --git a/src/impulsoetl/sisab/cadastros_individuais/extracao.py b/src/impulsoetl/sisab/cadastros_individuais/extracao.py index 3203c1e..4298d35 100644 --- a/src/impulsoetl/sisab/cadastros_individuais/extracao.py +++ b/src/impulsoetl/sisab/cadastros_individuais/extracao.py @@ -12,7 +12,7 @@ import requests from prefect import task -from impulsoetl.loggers import habilitar_suporte_loguru +from impulsoetl.loggers import logger,habilitar_suporte_loguru from impulsoetl.sisab.parametros_requisicao import head VISOES_EQUIPE_CODIGOS: Final[dict[str, str]] = { @@ -21,33 +21,51 @@ "equipes-validas": "|HM|NC|AQ|", } +def escapar_texto(visao_equipe:str): + return ( + urllib.parse.quote( + VISOES_EQUIPE_CODIGOS[visao_equipe], + ) + ) + +def adiciona_parametro_ponderacao(com_ponderacao:bool,payload): + if com_ponderacao: + payload = payload + ("&beneficiarios=on") + else: + payload + + return payload -def _extrair_cadastros_individuais( + +def extrair_requisicao( visao_equipe: str, com_ponderacao: bool, competencia: date, ) -> str: url = ( - "https://sisab.saude.gov.br/paginas/acessoRestrito/relatorio/federal" - + "/indicadores/indicadorCadastro.xhtml" + "https://sisab.saude.gov.br/paginas/acessoRestrito/relatorio/federal/indicadores/indicadorCadastro.xhtml" ) - hd = head(url) - vs = hd[1] + + parametros_requisicaco = head(url) + headers = parametros_requisicaco[0] + view_state = parametros_requisicaco[1] + + visao_equipe_codigos = escapar_texto(visao_equipe=visao_equipe) + ponderacao = "&beneficiarios=on" if com_ponderacao else "" - visao_equipe_codigos = urllib.parse.quote( - VISOES_EQUIPE_CODIGOS[visao_equipe], - ) - headers = hd[0] payload = ( "j_idt44=j_idt44&selectLinha=cnes_ine&opacao-capitacao=" + visao_equipe_codigos + ponderacao + "&competencia={:%Y%m}".format(competencia) + "&javax.faces.ViewState=" - + vs - + "&j_idt83=j_idt83" + + view_state + + "&j_idt85=j_idt85" ) + + payload = adiciona_parametro_ponderacao(com_ponderacao=com_ponderacao,payload=payload) + response = requests.request( "POST", url, @@ -55,8 +73,26 @@ def _extrair_cadastros_individuais( data=payload, timeout=120, ) + return response.text +def definir_posicao_cabecalho( + visao_equipe:str, + com_ponderacao:bool, + ): + + if not com_ponderacao: + if visao_equipe == "todas-equipes": + header = 6 + else: + header = 7 + else: + if visao_equipe == "todas-equipes": + header = 7 + else: + header = 8 + + return header @task( name="Extrair Cadastros Individuais", @@ -68,87 +104,50 @@ def _extrair_cadastros_individuais( retries=2, retry_delay_seconds=120, ) + def extrair_cadastros_individuais( visao_equipe: str, com_ponderacao: bool, competencia: date, ) -> pd.DataFrame: + """Extrai relatório de Cadastros Individuais do SISAB. + + Argumentos: + visao_equipe: Indica a situação da equipe considerada para a contagem + dos cadastros. + periodo: Referente ao mês/ano de disponibilização do relatório. + com_ponderacao: Lista de booleanos indicando quais tipos de população + devem ser filtradas no cadastro - onde `True` indica apenas as + populações com critério de ponderação e `False` indica todos os + cadastros. Por padrão, o valor é `[True, False]`, indicando que + ambas as possibilidades são extraídas do SISAB e carregadas para a + mesma tabela de destino. + + Retorna: + Um objeto `pandas.DataFrame` com dados capturados pela requisição. + """ + habilitar_suporte_loguru() - resposta = _extrair_cadastros_individuais( + resposta = extrair_requisicao( visao_equipe=visao_equipe, com_ponderacao=com_ponderacao, competencia=competencia, ) + header = definir_posicao_cabecalho(visao_equipe=visao_equipe,com_ponderacao=com_ponderacao) + try: + df_extraido = pd.read_csv( + StringIO(resposta), + delimiter=";", + header=header, + encoding="ISO-8859-1", + engine="python", + skipfooter=4, + thousands=".", + dtype="object", + ) + return df_extraido - df = pd.read_csv( - StringIO(resposta), - delimiter="\t", - header=None, - engine="python", - ) - - if not com_ponderacao: - if visao_equipe == "todas-equipes": - dados = df.iloc[8:-4] - df = pd.DataFrame(data=dados) - df = df[0].str.split(";", expand=True) - df.columns = [ - "Uf", - "IBGE", - "Municipio", - "CNES", - "Nome UBS", - "INE", - "Sigla", - "quantidade", - "Parametro", - ] - else: - dados = df.iloc[9:-4] - df = pd.DataFrame(data=dados) - df = df[0].str.split(";", expand=True) - df.columns = [ - "Uf", - "IBGE", - "Municipio", - "CNES", - "Nome UBS", - "INE", - "Sigla", - "quantidade", - "Parametro", - "Coluna", - ] - else: - if visao_equipe == "todas-equipes": - dados = df.iloc[9:-4] - df = pd.DataFrame(data=dados) - df = df[0].str.split(";", expand=True) - df.columns = [ - "Uf", - "IBGE", - "Municipio", - "CNES", - "Nome UBS", - "INE", - "Sigla", - "quantidade", - "Coluna", - ] - else: - dados = df.iloc[10:-4] - df = pd.DataFrame(data=dados) - df = df[0].str.split(";", expand=True) - df.columns = [ - "Uf", - "IBGE", - "Municipio", - "CNES", - "Nome UBS", - "INE", - "Sigla", - "quantidade", - "Coluna", - ] - return df + except pd.errors.ParserError: + logger.error("Data da competência do relatório não está disponível") + From 2c56d1b4f330273c11c878e75652eeef05cbed22 Mon Sep 17 00:00:00 2001 From: WaltMath Date: Tue, 9 May 2023 21:56:48 -0300 Subject: [PATCH 6/8] =?UTF-8?q?Adiciona=20argumentos=20=C3=A0=20fun=C3=A7?= =?UTF-8?q?=C3=A3o=20principal?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/impulsoetl/scripts/impulso_previne.py | 34 ++++++++++------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/src/impulsoetl/scripts/impulso_previne.py b/src/impulsoetl/scripts/impulso_previne.py index 93bd1a2..beb2be3 100644 --- a/src/impulsoetl/scripts/impulso_previne.py +++ b/src/impulsoetl/scripts/impulso_previne.py @@ -15,10 +15,6 @@ from impulsoetl.egestor.relatorio_financiamento.principal import ( obter_relatorio_financiamento, ) -""" -from impulsoetl.egestor.relatorio_financiamento.principal import ( - obter_relatorio_financiamento, -) from impulsoetl.sisab.cadastros_individuais import obter_cadastros_individuais from impulsoetl.sisab.indicadores_municipios.principal import ( obter_indicadores_desempenho, @@ -27,14 +23,13 @@ from impulsoetl.sisab.relatorio_validacao_producao.principal import ( obter_validacao_producao, ) -""" from impulsoetl.sisab.relatorio_saude_producao.principal import obter_relatorio_producao_por_profissionais_reduzidos from impulsoetl.sisab.relatorio_saude_producao.principal_outros import obter_relatorio_producao_por_profissionais_outros agendamentos = tabelas["configuracoes.capturas_agendamentos"] capturas_historico = tabelas["configuracoes.capturas_historico"] -""" + @flow( name="Rodar Agendamentos de Cadastros das Equipes Válidas", description=( @@ -65,12 +60,14 @@ def cadastros_municipios_equipe_validas( ) for agendamento in agendamentos_cadastros: - periodo = agendamento.periodo_data_inicio + obter_cadastros_individuais( sessao=sessao, visao_equipe=visao_equipe, - periodo=periodo, - teste=teste, + periodo_data=agendamento.periodo_data_inicio, + periodo_id=agendamento.periodo_id, + periodo_codigo=agendamento.periodo_codigo, + tabela_destino=agendamento.tabela_destino, ) if teste: sessao.rollback() @@ -125,12 +122,13 @@ def cadastros_municipios_equipe_homologada( ) for agendamento in agendamentos_cadastros: - periodo = agendamento.periodo_data_inicio obter_cadastros_individuais( sessao=sessao, visao_equipe=visao_equipe, - periodo=periodo, - teste=teste, + periodo_data=agendamento.periodo_data_inicio, + periodo_id=agendamento.periodo_id, + periodo_codigo=agendamento.periodo_codigo, + tabela_destino=agendamento.tabela_destino, ) if teste: sessao.rollback() @@ -185,12 +183,13 @@ def cadastros_municipios_equipe_todas( ) for agendamento in agendamentos_cadastros: - periodo = agendamento.periodo_data_inicio obter_cadastros_individuais( sessao=sessao, visao_equipe=visao_equipe, - periodo=periodo, - teste=teste, + periodo_data=agendamento.periodo_data_inicio, + periodo_id=agendamento.periodo_id, + periodo_codigo=agendamento.periodo_codigo, + tabela_destino=agendamento.tabela_destino, ) if teste: sessao.rollback() @@ -798,7 +797,7 @@ def egestor_financiamento( conector.execute(requisicao_inserir_historico) sessao.commit() logger.info("OK.") -""" + @flow( name=("Rodar Agendamentos do Relatório de Produção do SISAB - Profissionais Reduzidos"), description=( @@ -857,9 +856,7 @@ def relatorio_producao_saude_profissionais_reduzidos( logger.info("OK.") -#relatorio_producao_saude_profissionais_reduzidos() -""" @flow( name=("Rodar Agendamentos do Relatório de Produção do SISAB - Profissionais Outros"), description=( @@ -917,4 +914,3 @@ def relatorio_producao_saude_profissionais_outros( logger.info("OK.") -relatorio_producao_saude_profissionais_outros()""" \ No newline at end of file From b5cd4f57bb86ca019c06743adcdeb31b1a973dc4 Mon Sep 17 00:00:00 2001 From: WaltMath Date: Wed, 10 May 2023 07:36:02 -0300 Subject: [PATCH 7/8] =?UTF-8?q?Exclui=20m=C3=B3dulo=20de=20verifica=C3=A7?= =?UTF-8?q?=C3=A3o=20de=20dados?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cadastros_individuais/verificacao.py | 102 ------------------ 1 file changed, 102 deletions(-) delete mode 100644 src/impulsoetl/sisab/cadastros_individuais/verificacao.py diff --git a/src/impulsoetl/sisab/cadastros_individuais/verificacao.py b/src/impulsoetl/sisab/cadastros_individuais/verificacao.py deleted file mode 100644 index 4f2a2a9..0000000 --- a/src/impulsoetl/sisab/cadastros_individuais/verificacao.py +++ /dev/null @@ -1,102 +0,0 @@ -# SPDX-FileCopyrightText: 2022 ImpulsoGov -# -# SPDX-License-Identifier: MIT - - -import pandas as pd -from prefect import task - -from impulsoetl.loggers import habilitar_suporte_loguru - - -def verificar_qtd_municipios( - df: pd.DataFrame, - df_tratado: pd.DataFrame, -) -> bool: - """Verifica se a quantidade de municípios é superior a 5000.""" - return df["IBGE"].nunique() > 5000 - - -def verificar_diferenca_ctg_municpios( - df: pd.DataFrame, - df_tratado: pd.DataFrame, -) -> bool: - """Verifica se há diferença na contagem de municípios.""" - return ( - df["IBGE"].nunique() - df_tratado["municipio_id_sus"].nunique() - ) == 0 - - -def verificar_diferenca_mun_betim( - df: pd.DataFrame, - df_tratado: pd.DataFrame, -) -> bool: - """Verifica se há diferença na soma de cadastros de equipes em Betim-MG.""" - return ( - df.query("IBGE == '310670'")["quantidade"].astype(int).sum() - - df_tratado.query("municipio_id_sus == '310670'")["quantidade"].sum() - ) == 0 - - -def verificar_qtd_uf(df: pd.DataFrame, df_tratado: pd.DataFrame) -> bool: - """Verifica se a quantidade de unidades federativas é igual a 26.""" - return df_tratado["unidade_geografica_id"].nunique() >= 26 - - -def verificar_diferenca_qtd_cadastros( - df: pd.DataFrame, - df_tratado: pd.DataFrame, -) -> int: - """Verifica se há diferença na soma de cadastros.""" - return ( - df["quantidade"].astype(int).sum() - df_tratado["quantidade"].sum() - ) == 0 - - -def verificar_diferenca_ctg_cnes( - df: pd.DataFrame, - df_tratado: pd.DataFrame, -) -> int: - """Verifica se há diferença na contagem de estabelecimentos.""" - return df["CNES"].nunique() - df_tratado["cnes_id"].nunique() == 0 - - -def verificar_diferenca_ctg_ine( - df: pd.DataFrame, - df_tratado: pd.DataFrame, -) -> int: - """Verifica se há diferença na contagem de equipes.""" - return df["INE"].nunique() - df_tratado["ine_id"].nunique() == 0 - - -@task( - name="Validar Cadastros Individuais", - description=( - "Valida os dados de cadastros individuais extraídos e transformados a " - + "partir do portal público do Sistema de Informação em Saúde para a " - + "Atenção Básica do SUS." - ), - tags=["aps", "sisab", "cadastros_individuais", "validacao"], - retries=0, - retry_delay_seconds=None, -) -def verificar_cadastros_individuais( - df: pd.DataFrame, - df_tratado: pd.DataFrame, -) -> None: - """Testa qualidade dos dados tratados de cadastros individuais. - - Exceções: - Levanta um erro da classe [`AssertionError`][] quando uma das condições - testadas não é considerada válida. - - [`AssertionError`]: https://docs.python.org/3/library/exceptions.html#AssertionError - """ - habilitar_suporte_loguru() - assert verificar_qtd_municipios(df, df_tratado) - assert verificar_diferenca_ctg_municpios(df, df_tratado) - assert verificar_diferenca_mun_betim(df, df_tratado) - assert verificar_qtd_uf(df, df_tratado) - assert verificar_diferenca_qtd_cadastros(df, df_tratado) - assert verificar_diferenca_ctg_cnes(df, df_tratado) - assert verificar_diferenca_ctg_ine(df, df_tratado) From 4077c56f4d39d011ecab5d5e11dac5619ec09ee1 Mon Sep 17 00:00:00 2001 From: WaltMath Date: Wed, 10 May 2023 07:39:51 -0300 Subject: [PATCH 8/8] =?UTF-8?q?Formata=20c=C3=B3digo=20baseado=20no=20padr?= =?UTF-8?q?=C3=A3o=20de=20esilo=20=20PEP8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/impulsoetl/scripts/impulso_previne.py | 38 ++++---- .../sisab/cadastros_individuais/extracao.py | 44 ++++----- .../sisab/cadastros_individuais/principal.py | 9 +- .../sisab/cadastros_individuais/tratamento.py | 93 ++++++++++--------- 4 files changed, 99 insertions(+), 85 deletions(-) diff --git a/src/impulsoetl/scripts/impulso_previne.py b/src/impulsoetl/scripts/impulso_previne.py index beb2be3..7037dba 100644 --- a/src/impulsoetl/scripts/impulso_previne.py +++ b/src/impulsoetl/scripts/impulso_previne.py @@ -11,20 +11,24 @@ from impulsoetl import __VERSION__ from impulsoetl.bd import Sessao, tabelas -from impulsoetl.loggers import habilitar_suporte_loguru, logger from impulsoetl.egestor.relatorio_financiamento.principal import ( obter_relatorio_financiamento, ) +from impulsoetl.loggers import habilitar_suporte_loguru, logger from impulsoetl.sisab.cadastros_individuais import obter_cadastros_individuais from impulsoetl.sisab.indicadores_municipios.principal import ( obter_indicadores_desempenho, ) from impulsoetl.sisab.parametros_cadastro.principal import obter_parametros +from impulsoetl.sisab.relatorio_saude_producao.principal import ( + obter_relatorio_producao_por_profissionais_reduzidos, +) +from impulsoetl.sisab.relatorio_saude_producao.principal_outros import ( + obter_relatorio_producao_por_profissionais_outros, +) from impulsoetl.sisab.relatorio_validacao_producao.principal import ( obter_validacao_producao, ) -from impulsoetl.sisab.relatorio_saude_producao.principal import obter_relatorio_producao_por_profissionais_reduzidos -from impulsoetl.sisab.relatorio_saude_producao.principal_outros import obter_relatorio_producao_por_profissionais_outros agendamentos = tabelas["configuracoes.capturas_agendamentos"] capturas_historico = tabelas["configuracoes.capturas_historico"] @@ -60,7 +64,6 @@ def cadastros_municipios_equipe_validas( ) for agendamento in agendamentos_cadastros: - obter_cadastros_individuais( sessao=sessao, visao_equipe=visao_equipe, @@ -725,6 +728,8 @@ def validacao_producao( conector.execute(requisicao_inserir_historico) sessao.commit() logger.info("OK.") + + @flow( name=("Rodar Agendamentos de Relatórios de Financiamento"), description=( @@ -798,8 +803,11 @@ def egestor_financiamento( sessao.commit() logger.info("OK.") + @flow( - name=("Rodar Agendamentos do Relatório de Produção do SISAB - Profissionais Reduzidos"), + name=( + "Rodar Agendamentos do Relatório de Produção do SISAB - Profissionais Reduzidos" + ), description=( "Lê as capturas agendadas para obter o Relatório de Produção de Saúde do SISAB " + "para todos os municípios, filtrados por Tipo de Equipe, Categoria Profissional," @@ -811,7 +819,6 @@ def egestor_financiamento( version=__VERSION__, validate_parameters=False, ) - def relatorio_producao_saude_profissionais_reduzidos( teste: bool = True, ) -> None: @@ -834,10 +841,10 @@ def relatorio_producao_saude_profissionais_reduzidos( tabela_destino=agendamento.tabela_destino, periodo_competencia=agendamento.periodo_data_inicio, periodo_id=agendamento.periodo_id, - unidade_geografica_id=agendamento.unidade_geografica_id + unidade_geografica_id=agendamento.unidade_geografica_id, ) - if teste: + if teste: sessao.rollback() break @@ -852,13 +859,14 @@ def relatorio_producao_saude_profissionais_reduzidos( conector = sessao.connection() conector.execute(requisicao_inserir_historico) sessao.commit() - - logger.info("OK.") + logger.info("OK.") @flow( - name=("Rodar Agendamentos do Relatório de Produção do SISAB - Profissionais Outros"), + name=( + "Rodar Agendamentos do Relatório de Produção do SISAB - Profissionais Outros" + ), description=( "Lê as capturas agendadas para obter o Relatório de Produção de Saúde do SISAB " + "para todos os municípios, filtrados por Tipo de Equipe, Categoria Profissional," @@ -870,7 +878,6 @@ def relatorio_producao_saude_profissionais_reduzidos( version=__VERSION__, validate_parameters=False, ) - def relatorio_producao_saude_profissionais_outros( teste: bool = True, ) -> None: @@ -893,10 +900,10 @@ def relatorio_producao_saude_profissionais_outros( tabela_destino=agendamento.tabela_destino, periodo_competencia=agendamento.periodo_data_inicio, periodo_id=agendamento.periodo_id, - unidade_geografica_id=agendamento.unidade_geografica_id + unidade_geografica_id=agendamento.unidade_geografica_id, ) - if teste: + if teste: sessao.rollback() break @@ -911,6 +918,5 @@ def relatorio_producao_saude_profissionais_outros( conector = sessao.connection() conector.execute(requisicao_inserir_historico) sessao.commit() - - logger.info("OK.") + logger.info("OK.") diff --git a/src/impulsoetl/sisab/cadastros_individuais/extracao.py b/src/impulsoetl/sisab/cadastros_individuais/extracao.py index 4298d35..43548ff 100644 --- a/src/impulsoetl/sisab/cadastros_individuais/extracao.py +++ b/src/impulsoetl/sisab/cadastros_individuais/extracao.py @@ -12,7 +12,7 @@ import requests from prefect import task -from impulsoetl.loggers import logger,habilitar_suporte_loguru +from impulsoetl.loggers import habilitar_suporte_loguru, logger from impulsoetl.sisab.parametros_requisicao import head VISOES_EQUIPE_CODIGOS: Final[dict[str, str]] = { @@ -21,19 +21,19 @@ "equipes-validas": "|HM|NC|AQ|", } -def escapar_texto(visao_equipe:str): - return ( - urllib.parse.quote( + +def escapar_texto(visao_equipe: str): + return urllib.parse.quote( VISOES_EQUIPE_CODIGOS[visao_equipe], - ) ) -def adiciona_parametro_ponderacao(com_ponderacao:bool,payload): + +def adiciona_parametro_ponderacao(com_ponderacao: bool, payload): if com_ponderacao: payload = payload + ("&beneficiarios=on") else: payload - + return payload @@ -42,10 +42,7 @@ def extrair_requisicao( com_ponderacao: bool, competencia: date, ) -> str: - - url = ( - "https://sisab.saude.gov.br/paginas/acessoRestrito/relatorio/federal/indicadores/indicadorCadastro.xhtml" - ) + url = "https://sisab.saude.gov.br/paginas/acessoRestrito/relatorio/federal/indicadores/indicadorCadastro.xhtml" parametros_requisicaco = head(url) headers = parametros_requisicaco[0] @@ -63,9 +60,11 @@ def extrair_requisicao( + view_state + "&j_idt85=j_idt85" ) - - payload = adiciona_parametro_ponderacao(com_ponderacao=com_ponderacao,payload=payload) - + + payload = adiciona_parametro_ponderacao( + com_ponderacao=com_ponderacao, payload=payload + ) + response = requests.request( "POST", url, @@ -73,14 +72,14 @@ def extrair_requisicao( data=payload, timeout=120, ) - + return response.text -def definir_posicao_cabecalho( - visao_equipe:str, - com_ponderacao:bool, - ): +def definir_posicao_cabecalho( + visao_equipe: str, + com_ponderacao: bool, +): if not com_ponderacao: if visao_equipe == "todas-equipes": header = 6 @@ -94,6 +93,7 @@ def definir_posicao_cabecalho( return header + @task( name="Extrair Cadastros Individuais", description=( @@ -104,7 +104,6 @@ def definir_posicao_cabecalho( retries=2, retry_delay_seconds=120, ) - def extrair_cadastros_individuais( visao_equipe: str, com_ponderacao: bool, @@ -134,7 +133,9 @@ def extrair_cadastros_individuais( com_ponderacao=com_ponderacao, competencia=competencia, ) - header = definir_posicao_cabecalho(visao_equipe=visao_equipe,com_ponderacao=com_ponderacao) + header = definir_posicao_cabecalho( + visao_equipe=visao_equipe, com_ponderacao=com_ponderacao + ) try: df_extraido = pd.read_csv( StringIO(resposta), @@ -150,4 +151,3 @@ def extrair_cadastros_individuais( except pd.errors.ParserError: logger.error("Data da competência do relatório não está disponível") - diff --git a/src/impulsoetl/sisab/cadastros_individuais/principal.py b/src/impulsoetl/sisab/cadastros_individuais/principal.py index 96aebc5..b515c2d 100644 --- a/src/impulsoetl/sisab/cadastros_individuais/principal.py +++ b/src/impulsoetl/sisab/cadastros_individuais/principal.py @@ -10,7 +10,6 @@ from sqlalchemy.orm import Session from impulsoetl import __VERSION__ -from impulsoetl.bd import Sessao from impulsoetl.loggers import habilitar_suporte_loguru, logger from impulsoetl.sisab.cadastros_individuais.extracao import ( extrair_cadastros_individuais, @@ -18,6 +17,7 @@ from impulsoetl.sisab.cadastros_individuais.tratamento import tratar_dados from impulsoetl.utilitarios.bd import carregar_dataframe + @flow( name="Obter Cadastros Individuais", description=( @@ -37,7 +37,7 @@ def obter_cadastros_individuais( periodo_id: str, periodo_codigo: str, tabela_destino: str, - com_ponderacao: list[bool] = [False, True] + com_ponderacao: list[bool] = [False, True], ) -> None: """Extrai, transforma e carrega dados de cadastros de equipes pelo SISAB. @@ -63,7 +63,6 @@ def obter_cadastros_individuais( tempo_inicio_etl = time.time() for status_ponderacao in com_ponderacao: - logger.info("Iniciando extração dos dados...") df_extraido = extrair_cadastros_individuais( visao_equipe=visao_equipe, @@ -84,7 +83,7 @@ def obter_cadastros_individuais( logger.info("Iniciando carga dos dados no banco...") carregar_dataframe( - sessao=sessao, df=df_tratado, tabela_destino=tabela_destino + sessao=sessao, df=df_tratado, tabela_destino=tabela_destino ) logger.info("Carga dos dados no banco realizada...") @@ -95,5 +94,5 @@ def obter_cadastros_individuais( + "em {tempo_final_etl}.", tabela_nome=tabela_destino, periodo_codigo=periodo_codigo, - tempo_final_etl=tempo_final_etl + tempo_final_etl=tempo_final_etl, ) diff --git a/src/impulsoetl/sisab/cadastros_individuais/tratamento.py b/src/impulsoetl/sisab/cadastros_individuais/tratamento.py index 2bcc6ab..e9c3f77 100644 --- a/src/impulsoetl/sisab/cadastros_individuais/tratamento.py +++ b/src/impulsoetl/sisab/cadastros_individuais/tratamento.py @@ -4,13 +4,13 @@ from typing import Final -from frozendict import frozendict import pandas as pd +from frozendict import frozendict from prefect import task from sqlalchemy.orm import Session from impulsoetl.comum.geografias import id_sus_para_id_impulso -from impulsoetl.loggers import logger,habilitar_suporte_loguru +from impulsoetl.loggers import habilitar_suporte_loguru, logger CADASTROS_COLUNAS_TIPOS: Final[frozendict] = frozendict( { @@ -33,56 +33,61 @@ "INE": "equipe_id_ine", } + def renomear_colunas( - df_extraido:pd.DataFrame, - ): - return df_extraido.rename(columns=CADASTROS_COLUNAS).rename(columns={df_extraido.columns[7]: 'quantidade'}) + df_extraido: pd.DataFrame, +): + return df_extraido.rename(columns=CADASTROS_COLUNAS).rename( + columns={df_extraido.columns[7]: "quantidade"} + ) + def excluir_colunas( - df_tratado:pd.DataFrame, - ): - - return (df_tratado.drop( - ["Uf", "Municipio","Sigla da equipe", "Unnamed: 8"], - axis=1 - ).dropna()) + df_tratado: pd.DataFrame, +): + return df_tratado.drop( + ["Uf", "Municipio", "Sigla da equipe", "Unnamed: 8"], axis=1 + ).dropna() def garantir_tipos_colunas( - df_tratado:pd.DataFrame, - ): - + df_tratado: pd.DataFrame, +): return df_tratado.astype(CADASTROS_COLUNAS_TIPOS) + def definir_coluna_criterio_pontuacao( - df_tratado:pd.DataFrame, - com_ponderacao:bool, - ): + df_tratado: pd.DataFrame, + com_ponderacao: bool, +): + return df_tratado.insert( + 1, "criterio_pontuacao", com_ponderacao, allow_duplicates=True + ) - return df_tratado.insert(1, "criterio_pontuacao", com_ponderacao, allow_duplicates = True) def definir_coluna_periodo_codigo( - df_tratado:pd.DataFrame, - periodo_codigo:str, - ): + df_tratado: pd.DataFrame, + periodo_codigo: str, +): + return df_tratado.insert( + 1, "periodo_codigo", periodo_codigo, allow_duplicates=True + ) - return df_tratado.insert(1, "periodo_codigo", periodo_codigo, allow_duplicates = True) def definir_coluna_periodo_id( - df_tratado:pd.DataFrame, - periodo_id:str, - ): + df_tratado: pd.DataFrame, + periodo_id: str, +): + return df_tratado.insert( + 1, "periodo_id", periodo_id, allow_duplicates=True + ) - return df_tratado.insert(1, "periodo_id", periodo_id, allow_duplicates = True) def definir_coluna_unidade_geografica_id( - df_tratado:pd.DataFrame, - sessao:Session, - ): - - df_tratado["unidade_geografica_id"] = df_tratado[ - "municipio_id_sus" - ].apply( + df_tratado: pd.DataFrame, + sessao: Session, +): + df_tratado["unidade_geografica_id"] = df_tratado["municipio_id_sus"].apply( lambda municipio_id_sus: id_sus_para_id_impulso( sessao=sessao, id_sus=municipio_id_sus, @@ -90,6 +95,7 @@ def definir_coluna_unidade_geografica_id( ) return df_tratado + @task( name="Transformar Cadastros Individuais", description=( @@ -101,13 +107,12 @@ def definir_coluna_unidade_geografica_id( retries=0, retry_delay_seconds=None, ) - def tratar_dados( sessao: Session, df_extraido: pd.DataFrame, com_ponderacao: bool, - periodo_id:str, - periodo_codigo:str, + periodo_id: str, + periodo_codigo: str, ) -> pd.DataFrame: """Inclui todas etapas de transformação dos dados de cadastros de equipes pelo SISAB. @@ -128,7 +133,7 @@ def tratar_dados( """ habilitar_suporte_loguru() - + logger.info("Renomeando colunas da tabela...") print(df_extraido) df_tratado = renomear_colunas(df_extraido=df_extraido) @@ -137,10 +142,14 @@ def tratar_dados( df_tratado = excluir_colunas(df_tratado=df_tratado) logger.info("Ennquicimento de tabela com novas colunas...") - definir_coluna_criterio_pontuacao(df_tratado=df_tratado,com_ponderacao=com_ponderacao) - definir_coluna_periodo_codigo(df_tratado=df_tratado,periodo_codigo=periodo_codigo) - definir_coluna_periodo_id(df_tratado=df_tratado,periodo_id=periodo_id) - definir_coluna_unidade_geografica_id(df_tratado=df_tratado,sessao=sessao) + definir_coluna_criterio_pontuacao( + df_tratado=df_tratado, com_ponderacao=com_ponderacao + ) + definir_coluna_periodo_codigo( + df_tratado=df_tratado, periodo_codigo=periodo_codigo + ) + definir_coluna_periodo_id(df_tratado=df_tratado, periodo_id=periodo_id) + definir_coluna_unidade_geografica_id(df_tratado=df_tratado, sessao=sessao) logger.info("Garantindo tipagem dos dados...") garantir_tipos_colunas(df_tratado=df_tratado)