# Extração de dado - BI_RADS

## Objetivo
- Extrair BI-RADIS 0,1,2,3,4,5,6 e sem birads.
- Salvar **lista de ativação**: considera apenas BI_RADS 1, 2 e 3 e tem com objetivo identificar pacientes com exames em atraso para envio de notificação.

## Resumo do Código SQL/PySpark

Este código SQL, executado via PySpark (`spark.sql`), tem como objetivo principal processar dados de laudos médicos de mamografia para extrair, padronizar e enriquecer informações relacionadas à classificação BIRADS, combinando-as com outros dados do paciente e informações de retorno elegível.

### Tópicos Principais:

#### Tabela e filtros:
*   A consulta começa selecionando dados de uma tabela de laudos médicos (`refined.saude_preventiva.fleury_laudos`), que contém informações detalhadas sobre exames e enriququece com dados de retorno da tabela `refined.saude_preventiva.fleury_retorno_elegivel_ficha`.
*   Coluna com informações de BIRADS: `laudo_tratado` (texto do laudo médico).
*   Filtros:
    *   linha_cuidado = 'mama'
    *   sigla_exame IN ('MAMOG', 'MAMOGDIG', 'MAMOPROT', 'MAMOG3D')
    *   **where_clause:** _datestamp (`refined.saude_preventiva.fleury_laudos`) >= _datestamp (`refined.saude_preventiva.fleury_laudos_mama_birads`)
    *   **filtro_ativacao:**
        *    eleg.ficha IS NULL
        *    brd.BIRADS IN (1, 2, 3)
        *    flr.sigla_exame IN ('MAMOG', 'MAMOGDIG', 'MAMOPROT', 'MAMOG3D')
        *    UPPER(flr.sexo_cliente) = 'F'
        *    idade_cliente >= 40 AND idade_cliente < 76

#### Extração e Padronização BIRADS (`base` CTE):
*   **Limpeza de Texto:** Remove caracteres indesejados (`-`, `:`, `®`, `\xa0`) e converte o texto do laudo para maiúsculas.
*   **Extração de Conteúdo:** Utiliza `REGEXP_EXTRACT` para isolar seções relevantes do laudo (Avaliação, Conclusão, Impressão, Opinião).
*   **Extração de BIRADS Bruto:** Aplica `REGEXP_EXTRACT_ALL` para encontrar todos os valores BIRADS (numéricos ou romanos como I-VI) dentro do texto extraído.
*   **Categorização BIRADS (`CAT_BIRADS`):** Transforma os valores BIRADS brutos em inteiros padronizados (1 a 6), tratando algarismos romanos e filtrando valores inválidos.

#### Análise de BIRADS Extraído (`dados_birads` CTE):
*   Calcula o valor **mínimo (`MIN_BIRADS`)** e **máximo (`MAX_BIRADS`)** das categorias BIRADS encontradas em um laudo.
*   Identifica o **último valor BIRADS (`BIRADS`)** presente no array de categorias.

*   **Seleção de Dados de Laudos (`dados_laudos` CTE):**
*   Seleciona diversas colunas de identificação e características dos laudos (`linha_cuidado`, `id_unidade`, `ficha`, `sigla_exame`, etc.).
*   Calcula a **idade do cliente (`idade_cliente`)** com base na data de nascimento e na data atual.
*   Inclui um placeholder `{where_clause}` para filtros dinâmicos na seleção dos laudos.

#### Consulta Principal (`SELECT` final):
*   **Junção de Dados:** Realiza um `INNER JOIN` entre os dados dos laudos (`dados_laudos`) e as informações BIRADS processadas (`dados_birads`) usando `ficha`, `id_item` e `id_subitem` como chaves.
*   **Enriquecimento com Retorno Elegível:** Executa um `LEFT JOIN` com a tabela `refined.saude_preventiva.fleury_retorno_elegivel_ficha` para adicionar detalhes sobre possíveis retornos ou acompanhamentos elegíveis para o paciente.
*   **Seleção de Colunas:** Seleciona todas as colunas dos laudos (exceto `idade_cliente`), as colunas BIRADS (`MIN_BIRADS`, `MAX_BIRADS`, `BIRADS`), e as colunas de retorno elegível.
*   Inclui um placeholder `{filtro_ativacao}` para filtros dinâmicos adicionais.

#### Fluxo de Execução do Notebook

O notebook segue o seguinte fluxo de processamento:

1.  **Definição de Variáveis e Filtros:** `table_birads`, `table_birads_ativacao`, `where_clause` (com lógica incremental) e `filtro_ativacao` são definidos.
2.  **Execução da Query SQL Principal:**
    *   A `query` SQL é executada duas vezes usando `spark.sql`:
        *   `df_spk`: Gerado com a `where_clause` incremental e `filtro_ativacao` vazio. Contém todos os laudos de mamografia com BIRADS extraídos e enriquecidos.
        *   `df_spk_ativacao`: Gerado com `where_clause` vazio e o `filtro_ativacao` específico para a lista de ativação.
3.  **Transformações nos DataFrames:**
    *   A função `transform_fields` é aplicada a `df_spk` e `df_spk_ativacao`. Esta função adiciona colunas como `retorno_cliente`, `dth_previsao_retorno` e `dias_ate_retorno`.
4.  **Desduplicação da Lista de Ativação:**
    *   `df_spk_ativacao` é desduplicado pela coluna `ficha` (`dropDuplicates(['ficha'])`) para garantir uma única entrada por paciente para fins de notificação.
5.  **Persistência dos Dados:**
    *   `save_data(df_spk, table_birads)`: Salva o DataFrame completo de BIRADS na tabela `table_birads`, realizando um `MERGE` se a tabela já existir (para atualizações incrementais) ou um `INSERT` se for a primeira vez.
    *   `insert_data(df_spk_ativacao, table_birads_ativacao)`: Salva a lista de ativação na tabela `table_birads_ativacao`, sempre sobrescrevendo o conteúdo existente para refletir a lista mais recente de pacientes elegíveis para notificação.


In [None]:
!pip install --extra-index-url https://almir_martins:glpat-s01_R5xw79N_6syRa5Tz8G86MQp1OmhweWc0Cw.01.12091y3ha@gitlab.com/api/v4/projects/65902035/packages/pypi/simple octoops

Looking in indexes: https://pypi.org/simple, https://almir_martins:****@gitlab.com/api/v4/projects/65902035/packages/pypi/simple


ERROR: Could not find a version that satisfies the requirement octoops (from versions: none)
ERROR: No matching distribution found for octoops


In [1]:
pip show octoops

Note: you may need to restart the kernel to use updated packages.




In [2]:
%pip install octoops

Note: you may need to restart the kernel to use updated packages.


ERROR: Could not find a version that satisfies the requirement octoops (from versions: none)
ERROR: No matching distribution found for octoops


In [2]:
%pip install octoops==0.21.0

ERROR: Could not find a version that satisfies the requirement octoops==0.21.0 (from versions: none)
ERROR: No matching distribution found for octoops==0.21.0


In [0]:
dbutils.library.restartPython()

In [None]:
from pyspark.sql.functions import col, year, month, dayofmonth, when, lit, expr, to_timestamp
from pyspark.sql.types import DateType
from pyspark.sql import DataFrame
from pyspark.sql.functions import datediff, to_date
from datetime import datetime
import logging
import sys
import traceback
from octoops import OctoOps
from octoops import Sentinel

In [0]:
logger = logging.getLogger(__name__)

In [0]:
# Tabela com todos os BIRADS extraídos dos laudos de mamografia
table_birads = "refined.saude_preventiva.fleury_laudos_mama_birads"

# Somente os laudos que estão dentro das regras de negócio para ativação (BIRADS 1, 2 e 3)
table_birads_ativacao = "refined.saude_preventiva.fleury_laudos_mama_birads_ativacao"
where_clause = ""
 
# datestamp => data em que recebemos os dados na plataforma
# Pegar da tabela de laudos
if spark.catalog.tableExists(table_birads):
    where_clause = f"""
    WHERE
        flr._datestamp >= (
            SELECT MAX(brd._datestamp)
            FROM {table_birads} brd
        )
    """
# Regra de negócio para ativação: BIRADS 1, 2 e 3, mulheres entre 40 e 75 anos, sem ficha ativa 
filtro_ativacao = """
    WHERE
        eleg.ficha IS NULL
        AND brd.BIRADS IN (1, 2, 3)
        AND flr.sigla_exame IN ('MAMOG', 'MAMOGDIG', 'MAMOPROT', 'MAMOG3D')
        AND UPPER(flr.sexo_cliente) = 'F'
        AND (
            idade_cliente >= 40 AND idade_cliente < 76
        )
"""

## Filtro de ativação - Estevão

Inserir as alterações abaixo na variável `filtro_ativacao` do código SQL principal.

**Filtro Estevão na íntegra:**
```python
AND flr.ficha_retorno_elegivel IS NULL
AND atv.BIRADS IN (1, 2, 3)
AND UPPER(flr.marca) ILIKE ANY(
    'AMAIS - BA',
    'AMAIS - PE',
    'AMAIS - SP',
    'Felippe Mattoso',
    'FLEURY',
    'IR',
    'LABS AMAIS'
)
AND (
    TIMESTAMPDIFF(DAY, flr.dth_nascimento_cliente, CURDATE()) / 365.25 >= 40
    AND TIMESTAMPDIFF(DAY, flr.dth_nascimento_cliente, CURDATE()) / 365.25 < 76
)
AND UPPER(flr.sexo_cliente) = 'F
```

**Filtro Estevão editado:**
```python
AND UPPER(flr.marca) ILIKE ANY(
    'AMAIS - BA',
    'AMAIS - PE',
    'AMAIS - SP',
    'Felippe Mattoso',
    'FLEURY',
    'IR',
    'LABS AMAIS'
)
```

In [0]:
query = """
WITH base AS (
    SELECT
        flr.ficha,
        flr.id_item,
        flr.id_subitem,
        REGEXP_EXTRACT_ALL(
            REGEXP_EXTRACT(
                REGEXP_REPLACE(UPPER(flr.laudo_tratado), r'[-:®]|\xa0', ''),
                r'(?mi)(AVALIA[CÇ][AÃ]O|CONCLUS[AÃ]O|IMPRESS[AÃ]O|OPINI[AÃ]O)?(.*)', 2
            ),
            r"(?mi)(BIRADS|CATEGORIA|CAT\W)\s*(\d+\w*|VI|V|IV|III|II|I)\W*\w?(BIRADS|CATEGORIA)?(\W|$)", 2
        ) AS RAW_BIRADS,
        FILTER(
            TRANSFORM(RAW_BIRADS, x ->
                CASE
                    WHEN x = "I" THEN 1
                    WHEN x = "II" THEN 2
                    WHEN x = "III" THEN 3
                    WHEN x = "IV" THEN 4
                    WHEN x = "V" THEN 5
                    WHEN x = "VI" THEN 6
                    WHEN TRY_CAST(x AS INT) > 6 THEN NULL
                    ELSE REGEXP_REPLACE(x, r'[^0-9]', '')
                END
            ), x -> x IS NOT NULL
        ) AS CAT_BIRADS
    FROM refined.saude_preventiva.fleury_laudos flr
    WHERE
        flr.linha_cuidado = 'mama'
        AND flr.sigla_exame IN ('MAMOG', 'MAMOGDIG', 'MAMOPROT', 'MAMOG3D')
),
 
dados_birads AS (
    SELECT
        *,
        ARRAY_MIN(CAT_BIRADS) AS MIN_BIRADS,
        ARRAY_MAX(CAT_BIRADS) AS MAX_BIRADS,
        TRY_ELEMENT_AT(CAST(CAT_BIRADS AS ARRAY<INT>), -1) AS BIRADS
    FROM base
),
 
dados_laudos AS (
    SELECT
        flr.linha_cuidado,
        flr.id_unidade,
        flr.id_ficha,
        flr.id_item,
        flr.id_subitem,
        flr.ficha,
        flr.id_exame,
        flr.id_cliente,
        flr.pefi_cliente,
        flr.sigla_exame,
        flr.id_marca,
        flr.marca,
        (
          TIMESTAMPDIFF(DAY, flr.dth_nascimento_cliente, CURDATE()) / 365.25
        ) AS idade_cliente,
        flr.sexo_cliente,
        flr.dth_pedido,
        flr._datestamp
    FROM refined.saude_preventiva.fleury_laudos flr
    {where_clause}
)
 
SELECT
    flr.* except(idade_cliente),
    brd.MIN_BIRADS,
    brd.MAX_BIRADS,
    brd.BIRADS,
 
    eleg.dth_pedido        AS dth_pedido_retorno_elegivel,
    eleg.ficha             AS ficha_retorno_elegivel,
    eleg.siglas_ficha      AS siglas_ficha_retorno_elegivel,
    eleg.marca             AS marca_retorno_elegivel,
    eleg.unidade           AS unidade_retorno_elegivel,
    eleg.convenio          AS convenio_retorno_elegivel,
    eleg.valores_exame     AS valores_exame_retorno_elegivel,
    eleg.valores_ficha     AS valores_ficha_retorno_elegivel,
    eleg.qtd_exame         AS qtd_exame_retorno_elegivel,
    eleg.secao             AS secao_retorno_elegivel,
    eleg.dias_entre_ficha  AS dias_entre_ficha_elegivel
FROM dados_laudos flr
INNER JOIN dados_birads brd
    ON flr.ficha = brd.ficha
    AND flr.id_item = brd.id_item
    AND flr.id_subitem = brd.id_subitem
LEFT JOIN refined.saude_preventiva.fleury_retorno_elegivel_ficha eleg
    ON eleg.ficha_origem = flr.ficha
    AND eleg.id_cliente = flr.id_cliente
    AND eleg.linha_cuidado = flr.linha_cuidado
{filtro_ativacao}
"""

# Executa a consulta SQL com o filtro de _datestamp e sem o filtro de ativação
df_spk = spark.sql(query.format(
    where_clause = where_clause,
    filtro_ativacao = ""
    )
)
 
# Executa a consulta SQL sem o filtro de _datestamp e com o filtro de ativação 
df_spk_ativacao = spark.sql(query.format(
    where_clause = "",
    filtro_ativacao = filtro_ativacao
    )
)

In [0]:
def calcular_data_prevista(df_spk: DataFrame):
    """
    Adiciona uma coluna 'dth_previsao_retorno' ao DataFrame com base na coluna 'BIRADS'.

    - Para BIRADS 1 ou 2, adiciona 360 dias à data da coluna 'dth_pedido'.
    - Para BIRADS 3, adiciona 180 dias à data da coluna 'dth_pedido'.
    - Para outros valores de BIRADS, define 'dth_previsao_retorno' como None.

    Parâmetros:
    df_spk (DataFrame): O DataFrame Spark contendo os dados de entrada.

    Retorna:
    DataFrame: O DataFrame atualizado com a nova coluna 'dth_previsao_retorno'.
    """
    df_spk = df_spk.withColumn(
        'dth_previsao_retorno',
        when(
            col('BIRADS').isin([1, 2]),
            expr("date_add(dth_pedido, 360)")
        ).when(
            col('BIRADS') == 3,
            expr("date_add(dth_pedido, 180)")  
        ).otherwise(None)
    )
    return df_spk

In [0]:
def transform_fields(df_spk: DataFrame) -> DataFrame:
    """
    Transforma os campos do DataFrame fornecido.

    - Verifica se o DataFrame está vazio. Se estiver, registra um aviso e retorna o DataFrame sem alterações.
    - Adiciona uma coluna 'retorno_cliente' com valores baseados na coluna 'BIRADS':
      - 12 meses para BIRADS 1 ou 2
      - 6 meses para BIRADS 3
      - 0 para outros valores
    - Calcula a data prevista de retorno usando a função 'calcular_data_prevista'.
    - Converte a coluna 'dth_pedido_retorno_elegivel' para o tipo timestamp.
    - Converte a coluna 'dth_previsao_retorno' para o tipo timestamp.
    - Calcula a diferença em dias entre 'dth_previsao_retorno' e 'dth_pedido', armazenando o resultado na coluna 'dias_ate_retorno'.

    Parâmetros:
    df_spk (DataFrame): O DataFrame a ser transformado.

    Retorna:
    DataFrame: O DataFrame transformado com as novas colunas.
    """
    if df_spk.isEmpty():
        logger.warning("No Data Found!")
        return df_spk
    
    df_spk = df_spk.withColumn(
        'retorno_cliente',
        when(col('BIRADS').isin([1, 2]), 12).when(col('BIRADS') == 3, 6).otherwise(0)
    )

    df_spk = calcular_data_prevista(df_spk)
    df_spk = df_spk.withColumn('dth_pedido_retorno_elegivel', to_timestamp(col('dth_pedido_retorno_elegivel')))
    df_spk = df_spk.withColumn('dth_previsao_retorno', to_timestamp(col('dth_previsao_retorno')))
    df_spk = df_spk.withColumn('dias_ate_retorno', datediff(to_date(col('dth_previsao_retorno')), to_date(col('dth_pedido'))))
    return df_spk

In [0]:
WEBHOOK_DS_AI_BUSINESS_STG = 'prd'

def error_message(e):
    """
    Envia alerta para o Sentinel e exibe o traceback em caso de erro ao salvar dados.

    Parâmetros:
        e (Exception): Exceção capturada.

    Comportamento:
        - Formata o traceback do erro.
        - Envia alerta para o Sentinel com detalhes do erro.
        - Exibe o traceback no console.
        - Relança a exceção.
    """
    error_message = traceback.format_exc()
    summary_message = f"""Erro ao salvar dados.\n{error_message}"""
    sentinela_ds_ai_business = Sentinel(
        project_name='Monitor_Linhas_Cuidado_Mama',
        env_type=WEBHOOK_DS_AI_BUSINESS_STG,
        task_title='Fleury Mamografia'
    )
    sentinela_ds_ai_business.alerta_sentinela(
        categoria='Alerta', 
        mensagem=summary_message,
        job_id_descritivo='1_fleury_mama_birads'
    )
    traceback.print_exc()
    raise e

In [0]:
def insert_data(df_spk: DataFrame, table_name: str, serializable: bool = False):
    """
    Insere os dados do DataFrame em uma tabela Delta no Databricks.
    
    Parâmetros:
        df_spk (DataFrame): DataFrame Spark a ser salvo.
        table_name (str): Nome da tabela destino.
        serializable (bool): Define o nível de isolamento da transação Delta. 
            Se True, usa 'Serializable', caso contrário 'WriteSerializable'.
    
    Comportamento:
        - Escreve o DataFrame na tabela Delta especificada, sobrescrevendo dados existentes.
        - Atualiza o schema da tabela conforme necessário.
        - Define o nível de isolamento da transação Delta.
    """
    try:
        logger.info(f"Inserting Data: {table_name}")
        isolation_level = 'Serializable' if serializable else 'WriteSerializable'
        (
            df_spk.write
                .format('delta')
                .option('overwriteSchema', 'true')
                .option('delta.isolationLevel', isolation_level)
                .mode('overwrite')
                .saveAsTable(table_name)
        )
    except Exception as e:
        error_message(e)
 
def merge_data(df_spk: DataFrame, table_name: str):
    """
    Realiza um merge (upsert) dos dados do DataFrame em uma tabela Delta existente.
    
    Parâmetros:
        df_spk (DataFrame): DataFrame Spark a ser mesclado.
        table_name (str): Nome da tabela destino.
    
    Comportamento:
        - Cria uma view temporária com os dados do DataFrame.
        - Executa um comando MERGE SQL para atualizar registros existentes (com base em ficha, id_item e id_subitem)
          ou inserir novos registros na tabela Delta.
    """
    try:
        logger.info(f"Merging Data: {table_name}")
        df_spk.createOrReplaceTempView(f"increment_birads")
        merge_query = f"""
            MERGE INTO {table_name} AS target
            USING increment_birads AS source
                ON target.ficha = source.ficha
                AND target.id_item = source.id_item
                AND target.id_subitem = source.id_subitem
            WHEN MATCHED THEN
                UPDATE SET *
            WHEN NOT MATCHED THEN
                INSERT *
        """
        spark.sql(merge_query)
    except Exception as e:
        error_message(e)
 
def save_data(df_spk: DataFrame, table_name: str, serializable: bool = False):
    """
    Salva os dados do DataFrame em uma tabela Delta, realizando merge se a tabela já existir.
    
    Parâmetros:
        df_spk (DataFrame): DataFrame Spark a ser salvo.
        table_name (str): Nome da tabela destino.
        serializable (bool): Define o nível de isolamento da transação Delta na inserção inicial.
    
    Comportamento:
        - Se o DataFrame estiver vazio, não faz nada.
        - Se a tabela já existe, realiza merge (upsert) dos dados.
        - Se a tabela não existe, insere os dados criando a tabela.
    """
    if df_spk.isEmpty():
        return None
 
    if spark.catalog.tableExists(table_name):
        merge_data(df_spk, table_name)
    else:
        insert_data(df_spk, table_name, serializable)

In [0]:
# Aplicar transformações nos dfs
df_spk = transform_fields(df_spk)
df_spk_ativacao = transform_fields(df_spk_ativacao)

In [0]:
# Excluir duplicados para considerar apenas a ficha na ativação e não os exames (itens). Assim vamos enviar apenas 
# 1 push por ficha
df_spk_ativacao = df_spk_ativacao.dropDuplicates(['ficha'])

In [0]:
print('quantidade de laudos salvos na tabela',df_spk.count())
print('quantidade de laudos salvos na tabela de ativação', df_spk_ativacao.count())

In [0]:
# - Se o DataFrame estiver vazio, não faz nada.
# - Se a tabela já existe, realiza merge (upsert) dos dados.
# - Se a tabela não existe, insere os dados criando a tabela.
save_data(df_spk, table_birads)

# - Escreve o DataFrame na tabela Delta especificada, sobrescrevendo dados existentes.
# - Atualiza o schema da tabela conforme necessário.
# - Define o nível de isolamento da transação Delta.
insert_data(df_spk_ativacao, table_birads_ativacao)