# Configuração do ambiente e carregamento do catálogo

**Lembre-se de baixar o arquivo json que garante acesso ao google storage**
 - monitor-rosa-escrita.json: acesso para criação de tabelas;
 - monitor-rosa-leitura.json: acesso para analises e consultas.

**Se o seu usuário possui acesso ao drive compartilhado, remova os comentários a seguir**

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Escolha do ambiente
São disponibilizadas duas opções de execução (dependendo do arquivo de credencial utilizado)
 - gcp-leitura.json: disponível para analises e usuários que não possuem permissão de escrita;
 - gcp-escrita.json: disponível para atualização de bases (testes de desenvolvimento ou ambiente de produção).

In [2]:
import os
if os.path.isfile('/content/monitor-rosa-leitura.json'):
    datalake_mode = 'leitura'
    %env SERVICE_ACCOUNT_USER=acesso-leitura@monitor-rosa.iam.gserviceaccount.com
    %env SERVICE_ACCOUNT_JSON=/content/monitor-rosa-leitura.json
elif os.path.isfile('/content/monitor-rosa-escrita.json'):
    datalake_mode = 'escrita'
    %env SERVICE_ACCOUNT_USER=acesso-escrita@monitor-rosa.iam.gserviceaccount.com
    %env SERVICE_ACCOUNT_JSON=/content/monitor-rosa-escrita.json
else:
    assert(os.path.isdir('/content/drive/Shareddrives/monitor-rosa-gold') == True)
    datalake_mode = 'shared_drive'
    %env SERVICE_ACCOUNT_USER=''
    %env SERVICE_ACCOUNT_JSON=''
datalake_mode

env: SERVICE_ACCOUNT_USER=''
env: SERVICE_ACCOUNT_JSON=''


'shared_drive'

In [3]:
!rm -r sus-kpis-analysis
!git clone https://github.com/heber-augusto/sus-kpis-analysis.git

rm: cannot remove 'sus-kpis-analysis': No such file or directory
Cloning into 'sus-kpis-analysis'...
remote: Enumerating objects: 2099, done.[K
remote: Counting objects: 100% (127/127), done.[K
remote: Compressing objects: 100% (112/112), done.[K
remote: Total 2099 (delta 77), reused 15 (delta 15), pack-reused 1972 (from 2)[K
Receiving objects: 100% (2099/2099), 4.92 MiB | 23.65 MiB/s, done.
Resolving deltas: 100% (1036/1036), done.


## Instalação de libs Python, inicialização de variáveis de ambiente e configuração/instalação do Spark

In [4]:
!pip install -r /content/sus-kpis-analysis/sia/etls/requirements.txt

%env PYTHONHASHSEED=1234
%env JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
%env SPARK_HOME=/content/spark-3.5.5-bin-hadoop3
%env SPARK_VERSION=3.5.5

!source /content/sus-kpis-analysis/sia/etls/bin/setup_spark_env.sh '/content/'

Collecting findspark (from -r /content/sus-kpis-analysis/sia/etls/requirements.txt (line 1))
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Collecting pyspark==3.5.5 (from -r /content/sus-kpis-analysis/sia/etls/requirements.txt (line 2))
  Downloading pyspark-3.5.5.tar.gz (317.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.2/317.2 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting imagehash (from -r /content/sus-kpis-analysis/sia/etls/requirements.txt (line 3))
  Downloading ImageHash-4.3.2-py2.py3-none-any.whl.metadata (8.4 kB)
Collecting delta-spark==3.3.1 (from -r /content/sus-kpis-analysis/sia/etls/requirements.txt (line 4))
  Downloading delta_spark-3.3.1-py3-none-any.whl.metadata (1.9 kB)
Downloading delta_spark-3.3.1-py3-none-any.whl (21 kB)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Downloading ImageHash-4.3.2-py2.py3-none-any.whl (296 kB)
[2K   [

In [5]:
if datalake_mode != 'shared_drive':
    %env XDG_CONFIG_HOME=/content/datalake
    !source /content/sus-kpis-analysis/sia/etls/bin/install-google-drive-ocamlfuse.sh
    !source /content/sus-kpis-analysis/sia/etls/bin/mount_google_drive_v2.sh '/content/datalake' $SERVICE_ACCOUNT_USER '0ABIY-a4qrdY9Uk9PVA' 'monitor-rosa-bronze' $SERVICE_ACCOUNT_JSON '/content'
    !source /content/sus-kpis-analysis/sia/etls/bin/mount_google_drive_v2.sh '/content/datalake' $SERVICE_ACCOUNT_USER '0ALl0owLNr53oUk9PVA' 'monitor-rosa-silver' $SERVICE_ACCOUNT_JSON '/content'
    !source /content/sus-kpis-analysis/sia/etls/bin/mount_google_drive_v2.sh '/content/datalake' $SERVICE_ACCOUNT_USER '0AMHp9pBeLvZiUk9PVA' 'monitor-rosa-gold' $SERVICE_ACCOUNT_JSON '/content'


## Inicializa variáveis de acesso ao delta lake criado no google storage

- Local do arquivo de credencial do storage, diretorio do warehouse e path do spark:

> O arquivo json_file_name deve ser enviado para o ambiente e deve ser utilizado um com as devidas permissões (em caso de escrita)

> O caminho do warehouse pode ser alterado em caso de testes de escritas locais.

> O caminho do spark é setado pelo script de configuração

In [6]:
import os

lake_prefix = "temp-output"

if datalake_mode in ('leitura','shared_drive',):
    warehouse_dir = f"/content/datalake/{lake_prefix}/"

if datalake_mode == 'escrita':
    warehouse_dir = f"/content/datalake/"

spark_path = os.getenv('SPARK_HOME')
spark_path

'/content/spark-3.5.5-bin-hadoop3'

## Inclusão da pasta do repositório no python path

Procedimento permite que funções e classes presentes no repositório sejam utilizadas

In [7]:
import sys
sys.path.append('/content/sus-kpis-analysis')
sys.path

['/content',
 '/env/python',
 '/usr/lib/python311.zip',
 '/usr/lib/python3.11',
 '/usr/lib/python3.11/lib-dynload',
 '',
 '/usr/local/lib/python3.11/dist-packages',
 '/usr/lib/python3/dist-packages',
 '/usr/local/lib/python3.11/dist-packages/IPython/extensions',
 '/root/.ipython',
 '/content/sus-kpis-analysis']

## Importação de funções utilizadas pelo código

In [8]:
from sia.etls.lib.catalog_loader import DeltaLakeDatabaseFsCreator, load_entire_catalog_fs, load_entire_catalog_fs_v2
from sia.etls.lib.table_utilities import vacuum_tables_from_database, table_exists
from sia.etls.lib.fs_spark_session import create_fs_spark_session
from sia.etls.lib.bronze_files_utilities import get_pending_files_from_bronze
from sia.etls.lib.delta_table_creators import ParquetToDelta

## Cria Sessão Spark conectada ao Delta Lake presente no Google Storage

In [9]:
spark = create_fs_spark_session(
    warehouse_dir=warehouse_dir,
    spark_path=spark_path
)


## Refresh do catálogo para utilizar consultas

In [10]:
zone_names = ['monitor-rosa-bronze', 'monitor-rosa-silver', 'monitor-rosa-gold']
zone_names = ['monitor-rosa-silver', 'monitor-rosa-gold']
if datalake_mode in ('leitura', 'escrita'):
    zone_paths = [f'/content/datalake/{zone_name}/databases' for zone_name in zone_names]
else:
    zone_paths = [f'/content/drive/Shareddrives/{zone_name}/databases' for zone_name in zone_names]


# carrega catalogo de banco de dados, na zona bronze
database_filter = None #['cnes_bronze.db',]
table_filter = None #['cnes_bronze.sr',]

for databases_path in zone_paths:
    load_entire_catalog_fs_v2(
        spark_session = spark,
        databases_path = databases_path,
        use_db_folder_path=(datalake_mode == 'escrita'),
        database_filter=database_filter,
        table_filter=table_filter
    )

['cancer_data.db', 'ibge_silver.db', 'cancer_mama_silver.db']
Banco de dados cancer_data criado.
listando conteúdos do caminho /content/drive/Shareddrives/monitor-rosa-silver/databases e database cancer_data
prefix: /content/drive/Shareddrives/monitor-rosa-silver/databases/cancer_data.db/
table_list: ['aq_filtered', 'ar_filtered', 'dados_estados_mensal', 'dados_municipios_mensal', 'pacientes', 'procedimentos', 'procedimentos_e_pacientes', 'demografia_municipios', 'cadastro_municipios']
Tabela aq_filtered criada
Tabela aq_filtered criada com comando CREATE TABLE IF NOT EXISTS cancer_data.aq_filtered USING delta LOCATION '/content/drive/Shareddrives/monitor-rosa-silver/databases/cancer_data.db/aq_filtered'
Tabela ar_filtered criada
Tabela ar_filtered criada com comando CREATE TABLE IF NOT EXISTS cancer_data.ar_filtered USING delta LOCATION '/content/drive/Shareddrives/monitor-rosa-silver/databases/cancer_data.db/ar_filtered'
Tabela dados_estados_mensal criada
Tabela dados_estados_mensal 

# Exemplos de consultas

## Listagem de bancos e tabelas

In [11]:
databases = spark.sql(f"SHOW DATABASES;")
databases.show()

+------------------+
|         namespace|
+------------------+
|       cancer_data|
|       cancer_mama|
|cancer_mama_silver|
|           default|
|       ibge_silver|
+------------------+



In [None]:
for row in databases.collect():
    spark.sql(f"SHOW TABLES FROM {row['namespace']};").show(truncate=False)

+-----------+-------------------------+-----------+
|namespace  |tableName                |isTemporary|
+-----------+-------------------------+-----------+
|cancer_data|aq_filtered              |false      |
|cancer_data|ar_filtered              |false      |
|cancer_data|cadastro_municipios      |false      |
|cancer_data|dados_estados_mensal     |false      |
|cancer_data|dados_municipios_mensal  |false      |
|cancer_data|demografia_municipios    |false      |
|cancer_data|pacientes                |false      |
|cancer_data|procedimentos            |false      |
|cancer_data|procedimentos_e_pacientes|false      |
+-----------+-------------------------+-----------+

+-----------+-------------------------+-----------+
|namespace  |tableName                |isTemporary|
+-----------+-------------------------+-----------+
|cancer_mama|dados_estados_mensal     |false      |
|cancer_mama|dados_municipios_mensal  |false      |
|cancer_mama|pacientes                |false      |
|cancer_mam

In [12]:
dados_estad_mensal_df = spark.sql(f"""
    SELECT
        year(to_date(data, 'yyyyMM')) AS ano
        ,primeiro_estadiamento
        --SUM(custo) AS custo_estadiamento
        ,COUNT(DISTINCT(paciente)) AS numero_pacientes
        --,SUM(DISTINCT(obito)) AS obitos
        --,SUM(DISTINCT(indicacao_obito)) AS obito_futuro
        --,COUNT(1) AS numero_procedimentos
    FROM
        (SELECT * FROM cancer_mama.procedimentos_e_pacientes ORDER BY data)
    GROUP BY ano, primeiro_estadiamento
""")


In [13]:
df = dados_estad_mensal_df.toPandas()

In [14]:
# prompt: Usando o DataFrame df: stacked percentage of numero_pacientes by ano and estadiamento, using 0 in the bottom, and 1, 2, 3, 4 ordered to the top

import pandas as pd
import altair as alt

# Create a DataFrame (replace with your actual data)
# Assuming your DataFrame is named 'df'

# Convert 'primeiro_estadiamento' to numeric, handling errors
df['primeiro_estadiamento'] = pd.to_numeric(df['primeiro_estadiamento'], errors='coerce')

# Group by 'ano' and 'primeiro_estadiamento'
grouped_data = df.groupby(['ano', 'primeiro_estadiamento'])['numero_pacientes'].sum().reset_index()

# Calculate the total number of patients for each year
total_patients = grouped_data.groupby('ano')['numero_pacientes'].sum()

# Calculate the percentage for each group within each year
grouped_data['percentage'] = grouped_data.apply(lambda x: (x['numero_pacientes'] / total_patients[x['ano']]) * 100, axis=1)

# Create the stacked percentage chart using Altair
chart = alt.Chart(grouped_data).mark_bar().encode(
    x='ano:N',  # Categorical x-axis for years
    y=alt.Y('percentage:Q', stack='normalize', axis=alt.Axis(format='%')), # Stacked percentage with percentage formatting
    color=alt.Color('primeiro_estadiamento:N', sort='ascending'),  # Color by stage
    order=alt.Order('primeiro_estadiamento:N', sort='ascending')  # Order bars by stage
).properties(
    title='Stacked Percentage of Patients by Year and Stage'
)
chart


In [15]:
# prompt: # prompt: Usando o DataFrame df crie uma tabela com o percentual de numero_pacientes por ano e estadiamento, coloque os valores percentuais em colunas, seguindo a sequencia 0, 1, 2 , 3 e 4. Cada ano deve ser uma linha.

import pandas as pd
# Pivot the table to have stages as columns and years as rows
pivot_df = pd.pivot_table(grouped_data, values='percentage', index='ano', columns='primeiro_estadiamento', aggfunc='sum', fill_value=0)

# Rename columns for clarity (optional)
pivot_df = pivot_df.rename(columns={0: '0%', 1: '1%', 2: '2%', 3: '3%', 4: '4%'})

# Display the resulting table
pivot_df


primeiro_estadiamento,0%,1%,2%,3%,4%
ano,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2008,3.823096,19.42193,38.62114,27.392156,10.741677
2009,3.790915,19.789579,37.900404,28.97064,9.548462
2010,3.807674,20.175102,37.045402,30.15726,8.814562
2011,3.738138,20.754905,36.430967,30.775598,8.300392
2012,3.958062,21.260391,36.005684,30.698388,8.077475
2013,4.015751,21.582711,35.425351,31.030803,7.945383
2014,3.607326,21.697289,35.28674,31.280879,8.127766
2015,3.446682,21.372666,34.677079,32.090479,8.413094
2016,3.512967,21.223986,34.229895,32.365757,8.667396
2017,3.584308,21.06633,34.07159,32.71493,8.562842


In [None]:
# prompt: # prompt: Usando o DataFrame df crie uma tabela com o percentual de numero_pacientes por ano e estadiamento, coloque os valores percentuais em colunas, seguindo a sequencia 0, 1, 2 , 3 e 4. Cada ano deve ser uma linha.

import pandas as pd
# Pivot the table to have stages as columns and years as rows
pivot_df = pd.pivot_table(grouped_data, values='percentage', index='ano', columns='primeiro_estadiamento', aggfunc='sum', fill_value=0)

# Rename columns for clarity (optional)
pivot_df = pivot_df.rename(columns={0: '0%', 1: '1%', 2: '2%', 3: '3%', 4: '4%'})

# Display the resulting table
pivot_df


primeiro_estadiamento,0%,1%,2%,3%,4%
ano,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2008,3.823096,19.42193,38.62114,27.392156,10.741677
2009,3.790915,19.789579,37.900404,28.97064,9.548462
2010,3.807674,20.175102,37.045402,30.15726,8.814562
2011,3.738138,20.754905,36.430967,30.775598,8.300392
2012,3.958062,21.260391,36.005684,30.698388,8.077475
2013,4.015751,21.582711,35.425351,31.030803,7.945383
2014,3.607326,21.697289,35.28674,31.280879,8.127766
2015,3.446682,21.372666,34.677079,32.090479,8.413094
2016,3.512967,21.223986,34.229895,32.365757,8.667396
2017,3.584308,21.06633,34.07159,32.71493,8.562842


In [None]:
spark.table(f"""cancer_data.aq_filtered""")

DataFrame[AP_MVM: string, AP_CONDIC: string, AP_GESTAO: string, AP_CODUNI: string, AP_AUTORIZ: string, AP_CMP: string, AP_PRIPAL: string, AP_VL_AP: string, AP_UFMUN: string, AP_TPUPS: string, AP_TIPPRE: string, AP_MN_IND: string, AP_CNPJCPF: string, AP_CNPJMNT: string, AP_CNSPCN: string, AP_COIDADE: string, AP_NUIDADE: string, AP_SEXO: string, AP_RACACOR: string, AP_MUNPCN: string, AP_UFNACIO: string, AP_CEPPCN: string, AP_UFDIF: string, AP_MNDIF: string, AP_DTINIC: string, AP_DTFIM: string, AP_TPATEN: string, AP_TPAPAC: string, AP_MOTSAI: string, AP_OBITO: string, AP_ENCERR: string, AP_PERMAN: string, AP_ALTA: string, AP_TRANSF: string, AP_DTOCOR: string, AP_CODEMI: string, AP_CATEND: string, AP_APACANT: string, AP_UNISOL: string, AP_DTSOLIC: string, AP_DTAUT: string, AP_CIDCAS: string, AP_CIDPRI: string, AP_CIDSEC: string, AP_ETNIA: string, AQ_CID10: string, AQ_LINFIN: string, AQ_ESTADI: string, AQ_GRAHIS: string, AQ_DTIDEN: string, AQ_TRANTE: string, AQ_CIDINI1: string, AQ_DTINI1: s

In [16]:
spark.sql(f"""
  SELECT
      COUNT(DISTINCT(paciente)) AS numero_pacientes, count(1)
  FROM cancer_mama.pacientes AS c
  where data_primeiro_estadiamento = '202501'
""").show()

+----------------+--------+
|numero_pacientes|count(1)|
+----------------+--------+
|            4298|    4298|
+----------------+--------+



In [17]:
spark.sql(f"""
  SELECT
      COUNT(DISTINCT(paciente)) AS numero_pacientes, count(1)
  FROM cancer_mama.procedimentos AS c
  where data = '202501'
""").show()

+----------------+--------+
|numero_pacientes|count(1)|
+----------------+--------+
|          159433|  172224|
+----------------+--------+



In [None]:
spark.sql(f"""
    SELECT
        COUNT(DISTINCT(paciente)) AS numero_pacientes
    FROM
        cancer_mama.procedimentos_e_pacientes
    where data = '202501'

""").show()

+----------------+
|numero_pacientes|
+----------------+
|          159433|
+----------------+



In [None]:

spark.sql(f"""
  SELECT
      COUNT(DISTINCT(c.paciente)) AS numero_pacientes, count(1)
  FROM cancer_mama.procedimentos AS c
  LEFT JOIN cancer_mama.pacientes AS p
  ON c.paciente = p.paciente
  where data = '202501'
""").show()

+----------------+--------+
|numero_pacientes|count(1)|
+----------------+--------+
|          159433|  172224|
+----------------+--------+



In [None]:
spark.sql(f"""
  SELECT
      COUNT(DISTINCT(c.paciente)) AS numero_pacientes, count(1)
  FROM cancer_mama.procedimentos AS c
  full outer JOIN cancer_mama.pacientes AS p
  ON c.paciente = p.paciente
  where data = '202501'
""").show()

+----------------+--------+
|numero_pacientes|count(1)|
+----------------+--------+
|          159433|  172224|
+----------------+--------+



In [None]:
spark.sql(f"""
    SELECT
        SUM(numero_pacientes) AS numero_pacientes
    FROM
        cancer_mama.dados_estados_mensal
    where data = '202501'

""").show()

+----------------+
|numero_pacientes|
+----------------+
|          159433|
+----------------+



In [18]:
spark.sql(f"""
    SELECT
        count(distinct(pp.paciente)) AS numero_pacientes, cadastro_cidades.nome_uf
    FROM
        cancer_mama.procedimentos_e_pacientes pp
    LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
    ON int(pp.municipio) = int(cadastro_cidades.id / 10)
    where data = '202501'
    group by nome_uf
    order by 1

""").show()

+----------------+-------------------+
|numero_pacientes|            nome_uf|
+----------------+-------------------+
|             232|              Amapá|
|             297|            Roraima|
|             427|               Acre|
|            1105|          Tocantins|
|            1403|           Rondônia|
|            1615|            Sergipe|
|            1755|              Goiás|
|            1844|            Paraíba|
|            1883|           Amazonas|
|            2134|   Distrito Federal|
|            2320|        Mato Grosso|
|            2380|              Piauí|
|            2381|               Pará|
|            2505| Mato Grosso do Sul|
|            2675|            Alagoas|
|            3346|           Maranhão|
|            3450|Rio Grande do Norte|
|            4338|     Espírito Santo|
|            4969|         Pernambuco|
|            6314|              Ceará|
+----------------+-------------------+
only showing top 20 rows



In [19]:
spark.sql(f"""
    SELECT
        SUM(numero_pacientes) AS numero_pacientes, estado
    FROM
        cancer_mama.dados_estados_mensal
    where data = '202501'
    group by estado
    order by 1

""").show()

+----------------+-------------------+
|numero_pacientes|             estado|
+----------------+-------------------+
|             285|            Roraima|
|             333|              Amapá|
|             431|               Acre|
|            1086|          Tocantins|
|            1390|           Rondônia|
|            1585|            Sergipe|
|            1795|              Goiás|
|            1849|            Paraíba|
|            1916|           Amazonas|
|            2129|   Distrito Federal|
|            2275|               Pará|
|            2319|        Mato Grosso|
|            2370|              Piauí|
|            2480| Mato Grosso do Sul|
|            2655|            Alagoas|
|            3337|           Maranhão|
|            3453|Rio Grande do Norte|
|            4342|     Espírito Santo|
|            4976|         Pernambuco|
|            6319|              Ceará|
+----------------+-------------------+
only showing top 20 rows



In [20]:
spark.sql(f"""
    SELECT
        SUM(numero_pacientes) AS numero_pacientes, cadastro_cidades.nome_uf as estado
    FROM
        cancer_mama.dados_municipios_mensal mm
        LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
        ON int(mm.municipio) = int(cadastro_cidades.id / 10)
    where data = '202501'
    group by estado
    order by 1

""").show()

+----------------+-------------------+
|numero_pacientes|             estado|
+----------------+-------------------+
|             285|            Roraima|
|             333|              Amapá|
|             431|               Acre|
|            1086|          Tocantins|
|            1390|           Rondônia|
|            1585|            Sergipe|
|            1795|              Goiás|
|            1849|            Paraíba|
|            1916|           Amazonas|
|            2129|   Distrito Federal|
|            2275|               Pará|
|            2319|        Mato Grosso|
|            2370|              Piauí|
|            2480| Mato Grosso do Sul|
|            2655|            Alagoas|
|            3337|           Maranhão|
|            3453|Rio Grande do Norte|
|            4342|     Espírito Santo|
|            4976|         Pernambuco|
|            6319|              Ceará|
+----------------+-------------------+
only showing top 20 rows



In [21]:
spark.sql(f"""
    SELECT a.estado, b.nome_uf, a.numero_pacientes, b.numero_pacientes_b, a.numero_pacientes - b.numero_pacientes_b as diferenca
    FROM

    (SELECT
        SUM(numero_pacientes) AS numero_pacientes, cadastro_cidades.nome_uf as estado
    FROM
        cancer_mama.dados_municipios_mensal mm
        LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
        ON int(mm.municipio) = int(cadastro_cidades.id / 10)
    where data = '202501'
    group by estado
    order by 1) A

    LEFT JOIN

    (    SELECT
        count(distinct(pp.paciente)) AS numero_pacientes_b, cadastro_cidades.nome_uf
    FROM
        cancer_mama.procedimentos_e_pacientes pp
    LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
    ON int(pp.primeiro_municipio) = int(cadastro_cidades.id / 10)
    where data = '202501'
    group by nome_uf
    order by 1) B

    ON A.estado = B.nome_uf
    --where a.numero_pacientes != b.numero_pacientes_b
    order by diferenca

""").show(30)

+-------------------+-------------------+----------------+------------------+---------+
|             estado|            nome_uf|numero_pacientes|numero_pacientes_b|diferenca|
+-------------------+-------------------+----------------+------------------+---------+
|           Rondônia|           Rondônia|            1390|              1390|        0|
|               Acre|               Acre|             431|               431|        0|
|           Amazonas|           Amazonas|            1916|              1916|        0|
|            Roraima|            Roraima|             285|               285|        0|
|               Pará|               Pará|            2275|              2275|        0|
|              Amapá|              Amapá|             333|               333|        0|
|          Tocantins|          Tocantins|            1086|              1086|        0|
|           Maranhão|           Maranhão|            3337|              3337|        0|
|              Piauí|           

In [None]:
spark.sql(f"""SELECT
        mm.municipio, count(*)
    FROM
        (select
           case when int(municipio/10000) = 53 then 530010 else municipio end as municipio, data
        from cancer_mama.dados_municipios_mensal
        where data = '202501'

        ) mm
        LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
        ON int(mm.municipio) = int(cadastro_cidades.id / 10)
    where data = '202501' and cadastro_cidades.nome_uf is null
    group by municipio
    order by 2""").show()

+---------+--------+
|municipio|count(1)|
+---------+--------+
+---------+--------+



In [None]:
spark.sql(f"""
SELECT
        *
    FROM
        ibge_silver.cadastro_municipios
    where id > 5300000
    """).show()

+-------+--------+-----+----------------+
|     id|    nome|id_uf|         nome_uf|
+-------+--------+-----+----------------+
|5300108|Brasília|   53|Distrito Federal|
+-------+--------+-----+----------------+



In [None]:
spark.sql(f"""
    SELECT
        count(distinct(pp.paciente)) AS numero_pacientes, cadastro_cidades.nome_uf
    FROM
        cancer_mama.procedimentos_e_pacientes pp
    LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
    ON int(pp.primeiro_municipio) = int(cadastro_cidades.id / 10)
    where data = '202501'  and cadastro_cidades.nome_uf = 'Minas Gerais'
    group by nome_uf
    order by 1

""").show()

+----------------+------------+
|numero_pacientes|     nome_uf|
+----------------+------------+
|           16491|Minas Gerais|
+----------------+------------+



In [None]:
spark.sql(f"""
    SELECT
        count(distinct(pp.paciente)) AS numero_pacientes, cadastro_cidades.nome_uf, cadastro_cidades.id as cidade
    FROM
        cancer_mama.procedimentos_e_pacientes pp
    LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
    ON int(pp.municipio) = int(cadastro_cidades.id / 10)
    where data = '202501' and cadastro_cidades.nome_uf = 'Minas Gerais'
    group by nome_uf, cidade
    order by 3

""").show()

+----------------+------------+-------+
|numero_pacientes|     nome_uf| cidade|
+----------------+------------+-------+
|               7|Minas Gerais|3100104|
|              23|Minas Gerais|3100203|
|               2|Minas Gerais|3100302|
|               6|Minas Gerais|3100401|
|               6|Minas Gerais|3100500|
|               3|Minas Gerais|3100609|
|               1|Minas Gerais|3100708|
|               4|Minas Gerais|3100807|
|               9|Minas Gerais|3100906|
|               6|Minas Gerais|3101003|
|              13|Minas Gerais|3101102|
|               5|Minas Gerais|3101201|
|               6|Minas Gerais|3101300|
|               4|Minas Gerais|3101409|
|               3|Minas Gerais|3101508|
|               4|Minas Gerais|3101607|
|               2|Minas Gerais|3101631|
|              19|Minas Gerais|3101706|
|               1|Minas Gerais|3101805|
|              25|Minas Gerais|3101904|
+----------------+------------+-------+
only showing top 20 rows



In [None]:
spark.sql(f"""
    SELECT
        SUM(numero_pacientes) AS numero_pacientes, mm.municipio as cidade
    FROM
        cancer_mama.dados_municipios_mensal mm

    LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
    ON int(mm.municipio) = int(cadastro_cidades.id / 10)
    where data = '202501' and cadastro_cidades.nome_uf = 'Minas Gerais'

    group by cidade
    order by 2

""").show()

+----------------+------+
|numero_pacientes|cidade|
+----------------+------+
|               7|310010|
|              22|310020|
|               2|310030|
|               6|310040|
|               5|310050|
|               3|310060|
|               1|310070|
|               4|310080|
|               9|310090|
|               6|310100|
|              13|310110|
|               5|310120|
|               6|310130|
|               4|310140|
|               4|310150|
|               5|310160|
|               1|310163|
|              18|310170|
|               1|310180|
|              24|310190|
+----------------+------+
only showing top 20 rows



In [None]:
spark.sql(f"""

    SELECT sum(A.numero_pacientes), sum(B.numero_pacientes_b)
      ,sum(A.numero_pacientes) - sum(B.numero_pacientes_b) as diff
      ,b.cidade_b
    FROM


    (SELECT
        SUM(numero_pacientes) AS numero_pacientes_b, mm.municipio as cidade_b
    FROM
        cancer_mama.dados_municipios_mensal mm

    LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
    ON int(mm.municipio) = int(cadastro_cidades.id / 10)
    where data = '202501' and cadastro_cidades.nome_uf = 'Minas Gerais'

    group by cidade_b
    order by 2) B

    LEFT JOIN

    (SELECT
        count(distinct(pp.paciente)) AS numero_pacientes, cadastro_cidades.nome_uf, int(cadastro_cidades.id / 10) as cidade
    FROM
        cancer_mama.procedimentos_e_pacientes pp
    LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
    ON int(pp.primeiro_municipio) = int(cadastro_cidades.id / 10)
    where data = '202501' and cadastro_cidades.nome_uf = 'Minas Gerais'
    group by nome_uf, cidade
    order by 3) A


    ON A.cidade = B.cidade_b

    group by b.cidade_b
    order by diff desc


""").show(1000)

+---------------------+-----------------------+----+--------+
|sum(numero_pacientes)|sum(numero_pacientes_b)|diff|cidade_b|
+---------------------+-----------------------+----+--------+
|                    7|                      7|   0|  310010|
|                   22|                     22|   0|  310020|
|                    2|                      2|   0|  310030|
|                    6|                      6|   0|  310040|
|                    5|                      5|   0|  310050|
|                    3|                      3|   0|  310060|
|                    1|                      1|   0|  310070|
|                    4|                      4|   0|  310080|
|                    9|                      9|   0|  310090|
|                    6|                      6|   0|  310100|
|                   13|                     13|   0|  310110|
|                    5|                      5|   0|  310120|
|                    6|                      6|   0|  310130|
|       

In [None]:
spark.sql(f"""

select
   *
FROM
        cancer_mama.procedimentos_e_pacientes
where  paciente in
    (select paciente FROM
      (SELECT
            pp.paciente,
            count(distinct(cadastro_cidades.id)) as num_cidade_1,
            count(distinct( int(cadastro_cidades.id / 10) )) as num_cidade_2
        FROM
            cancer_mama.procedimentos_e_pacientes pp
        LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
        ON int(pp.municipio) = int(cadastro_cidades.id / 10)
        where data = '202501' and cadastro_cidades.nome_uf = 'Minas Gerais'
        group by pp.paciente
        order by 2 desc, 3 desc)
    where num_cidade_1 > 1) and data = '202501' -- and cadastro_cidades.nome_uf = 'Minas Gerais'
""").show()

+------+---------------+------------+------+-----+---------+--------------------------+------------------------+---------------------+------------------+-------------------+--------------------+------------------+----------------+---------------+
|  data|       paciente|estadiamento| custo|obito|municipio|data_primeiro_estadiamento|data_ultimo_estadiamento|primeiro_estadiamento|maior_estadiamento|ultimo_estadiamento|         custo_total|primeiro_municipio|ultimo_municipio|indicacao_obito|
+------+---------------+------------+------+-----+---------+--------------------------+------------------------+---------------------+------------------+-------------------+--------------------+------------------+----------------+---------------+
|202501|{{{{{{{{{{{{{{{|           3|   0.0|    0|   260600|                    202404|                  202504|                    3|                 4|                  4|2.9090112449999608E7|            230523|          355030|              1|
|202501|{{{{

In [None]:
spark.sql(f"""
    SELECT
        data,
        primeiro_municipio as municipio,
        primeiro_estadiamento,
        SUM(custo) AS custo_estadiamento,
        COUNT(DISTINCT(paciente)) AS numero_pacientes,
        SUM(DISTINCT(obito)) AS obitos,
        SUM(DISTINCT(indicacao_obito)) AS obito_futuro,
        COUNT(1) AS numero_procedimentos
    FROM
        (SELECT * FROM cancer_mama.procedimentos_e_pacientes ORDER BY data)
    GROUP BY data, primeiro_municipio, primeiro_estadiamento
""").show()

In [None]:
spark.sql(f"""

  SELECT
        *
    FROM

  (SELECT
      c.*,
      p.data_primeiro_estadiamento,
      p.data_ultimo_estadiamento,
      p.primeiro_estadiamento,
      p.maior_estadiamento,
      p.ultimo_estadiamento,
      p.custo_total,
      p.primeiro_municipio,
      p.ultimo_municipio,
      p.indicacao_obito
  FROM (select pp.* from cancer_mama.procedimentos pp
        LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
        ON int(pp.municipio) = int(cadastro_cidades.id / 10)
       where data = '202501' and cadastro_cidades.nome_uf = 'Minas Gerais') AS c
  RIGHT JOIN
       (select pp.* from cancer_mama.pacientes pp
          LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
        ON int(pp.primeiro_municipio) = int(cadastro_cidades.id / 10)
       where cadastro_cidades.nome_uf = 'Minas Gerais' )AS p
  ON c.paciente = p.paciente
  where data is not null)

""").createOrReplaceTempView("procedimentos_corrigido")

In [None]:
spark.sql(f"""
    SELECT
        *
    FROM
        cancer_mama.procedimentos_e_pacientes pp
    LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
    ON int(pp.municipio) = int(cadastro_cidades.id / 10)
    where data = '202501' and cadastro_cidades.nome_uf = 'Minas Gerais' and paciente in


    (SELECT paciente
    from
    (SELECT
        count(distinct(id)) cidades, paciente
    FROM
        procedimentos_corrigido pp
    LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
    ON int(pp.primeiro_municipio) = int(cadastro_cidades.id / 10)
    where data = '202501' and cadastro_cidades.nome_uf = 'Minas Gerais'
    group by paciente
    order by 1 desc)
    --where cidades >1
    -- limit 1
    )

""").show()

+------+---------------+------------+------+-----+---------+--------------------------+------------------------+---------------------+------------------+-------------------+------------------+------------------+----------------+---------------+-------+----------------+-----+------------+
|  data|       paciente|estadiamento| custo|obito|municipio|data_primeiro_estadiamento|data_ultimo_estadiamento|primeiro_estadiamento|maior_estadiamento|ultimo_estadiamento|       custo_total|primeiro_municipio|ultimo_municipio|indicacao_obito|     id|            nome|id_uf|     nome_uf|
+------+---------------+------------+------+-----+---------+--------------------------+------------------------+---------------------+------------------+-------------------+------------------+------------------+----------------+---------------+-------+----------------+-----+------------+
|202501||{|{~|~{{{||           3|2378.9|    0|   313380|                    201902|                  202502|                    3

In [None]:
spark.sql(f"""

select *
from cancer_mama.pacientes
where paciente in
(
SELECT paciente
    from
    (SELECT
        count(distinct(id)) cidades, paciente
    FROM
        procedimentos_corrigido pp
    LEFT JOIN ibge_silver.cadastro_municipios AS cadastro_cidades
    ON int(pp.municipio) = int(cadastro_cidades.id / 10)
    where data = '202501' and cadastro_cidades.nome_uf = 'Minas Gerais'
    group by paciente
    order by 1 desc)
    where cidades >1
    -- limit 1
    )
""").show()

+---------------+--------------------------+------------------------+--------------------+---------------------+-------------------+------------------+------------------+-----------+---------------+------------------+----------------+
|       paciente|data_primeiro_estadiamento|data_ultimo_estadiamento|numero_procedimentos|primeiro_estadiamento|ultimo_estadiamento|maior_estadiamento|menor_estadiamento|custo_total|indicacao_obito|primeiro_municipio|ultimo_municipio|
+---------------+--------------------------+------------------------+--------------------+---------------------+-------------------+------------------+------------------+-----------+---------------+------------------+----------------+
|{{{}{}{|                    202411|                  202501|                   4|                    2|                  2|                 2|                 2|    6143.25|              0|            312800|          312800|
|{{{|{|~}|                    202410|                

In [None]:
spark.sql(f"""
    SELECT
        SUM(numero_pacientes) AS numero_pacientes
    FROM
        cancer_mama.dados_municipios_mensal
    where data = '202501'

""").show()

+----------------+
|numero_pacientes|
+----------------+
|          145963|
+----------------+



In [None]:
spark.sql(f"""
    SELECT
        SUM(custo_estadiamento) AS custo_pacientes
    FROM
        cancer_mama.dados_estados_mensal
    where data = '202501'

""").show()

+-------------------+
|    custo_pacientes|
+-------------------+
|7.746116284999998E7|
+-------------------+



In [22]:
# prompt: faça uma consulta na tabela    cancer_mama.dados_municipios_mensal e crie um único arquivo csv com o conteúdo dela

# Realiza a consulta na tabela e salva em um DataFrame do Spark
df_cancer_municipios = spark.sql("SELECT * FROM cancer_mama.dados_municipios_mensal")

# Converte o DataFrame do Spark para um DataFrame do Pandas
df_cancer_municipios_pandas = df_cancer_municipios.toPandas()

# Salva o DataFrame do Pandas em um arquivo CSV
output_csv_path = "dados_municipios_mensal.csv"
df_cancer_municipios_pandas.to_csv(output_csv_path, index=False)

print(f"Dados exportados para {output_csv_path}")

Dados exportados para dados_municipios_mensal.csv


In [23]:
# Realiza a consulta na tabela e salva em um DataFrame do Spark
df_cancer_estado = spark.sql("SELECT * FROM cancer_mama.dados_estados_mensal")

# Converte o DataFrame do Spark para um DataFrame do Pandas
df_cancer_estado_pandas = df_cancer_estado.toPandas()

# Salva o DataFrame do Pandas em um arquivo CSV
output_csv_path = "dados_estados_mensal.csv"
df_cancer_estado_pandas.to_csv(output_csv_path, index=False)

print(f"Dados exportados para {output_csv_path}")

Dados exportados para dados_estados_mensal.csv


In [None]:
spark.sql(f"""
    select sum(num_mun), count(1) from

    (SELECT
        count(distinct municipio) num_mun, paciente


    FROM
        cancer_mama.procedimentos_e_pacientes
     where data = '202501'
     group by paciente)

     where num_mun > 1

""").show()

+------------+--------+
|sum(num_mun)|count(1)|
+------------+--------+
|        2409|     117|
+------------+--------+



In [27]:
from sia.etls.lib.bronze_files_utilities import autenticar_servico
auth_json_path='/content/monitor-rosa-leitura.json'
escopos = ['https://www.googleapis.com/auth/drive.readonly']
service = autenticar_servico(auth_json_path, escopos)


def read_file_from_drive(service, file_id, output_file_name):
    """Reads the content of a file from Google Drive by its ID and saves it to a local file."""
    try:
        # Call the Drive v3 API
        # You can specify the mimeType if you know it, or handle different types
        # This example focuses on text/plain or similar readable files
        request = service.files().get_media(fileId=file_id)

        # Download the file content
        # For larger files, you might need to use MediaIoBaseDownload
        content = request.execute()

        # Save the content to the specified output file
        with open(output_file_name, 'wb') as f:
            f.write(content)
        print(f"File '{output_file_name}' downloaded successfully.")

    except HttpError as error:
        print(f'An API error occurred: {error}')

In [28]:
read_file_from_drive(
    service,
    '140mI8ZZGBDqPUtJXkoSFFGWzCcRPduJv',
    output_file_name='dados_output.csv')

File 'dados_output.csv' downloaded successfully.


In [None]:
# prompt: create python code to read files from google drive folder (consider i'm outside google colab)

# Install the Google Client Library
!pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib

import os
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/drive.readonly'] # Or other scopes as needed

def authenticate_google_drive():
    """Authenticates with Google Drive API."""
    creds = None
    # The file token.json stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the first
    # time.
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            # Replace 'credentials.json' with the path to your downloaded credentials file
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # Save the credentials for the next run
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    return creds

def read_file_from_drive(file_id, creds):
    """Reads the content of a file from Google Drive by its ID."""
    try:
        service = build('drive', 'v3', credentials=creds)

        # Call the Drive v3 API
        # You can specify the mimeType if you know it, or handle different types
        # This example focuses on text/plain or similar readable files
        request = service.files().get_media(fileId=file_id)

        # Download the file content
        # For larger files, you might need to use MediaIoBaseDownload
        content = request.execute()

        # If the file is a text file, you can decode and print its content
        # You might need to adjust decoding based on the file type
        try:
            print(content.decode('utf-8'))
        except UnicodeDecodeError:
            print("Could not decode file content as UTF-8. Content might be binary.")
            # Handle binary content differently, e.g., save to a file

    except HttpError as error:
        print(f'An API error occurred: {error}')

# --- How to use ---
# 1. Follow the Google Drive API setup guide to get a 'credentials.json' file.
#    - Go to Google Cloud Console.
#    - Create a new project or select an existing one.
#    - Enable the Google Drive API.
#    - Create credentials (OAuth client ID for desktop app).
#    - Download the credentials JSON file and save it in the same directory as your script, named 'credentials.json'.
# 2. Run this script. The first time, it will open a browser window for authentication.
# 3. Replace 'YOUR_FILE_ID_HERE' with the actual Google Drive file ID you want to read.
#    You can find the file ID in the file's URL in Google Drive.

# Authenticate and get credentials
drive_creds = authenticate_google_drive()

# Replace with the ID of the file you want to read
file_to_read_id = 'YOUR_FILE_ID_HERE'

# Read the file
if drive_creds:
    read_file_from_drive(file_to_read_id, drive_creds)



In [None]:
143671 - 145963

-2292

In [None]:
2409 - 117

2292

In [None]:
spark.sql(f"""
    select * from

    (SELECT
        count(distinct municipio) num_mun, paciente


    FROM
        cancer_mama.procedimentos_e_pacientes
     where data = '202501'
     group by paciente)

     where num_mun > 1

""").show()

+-------+---------------+
|num_mun|       paciente|
+-------+---------------+
|   1792|               |
|    387|{{{{{{{{{{{{{{{|
|      2||{~{{{{|
|      2|}{||{{{{|
|      2|}{}{{{{|
|      2|}{|~~{{{|
|      2|{{{{|{|
|      2|{{{{}}}~{|
|      2|{{{{~~{~{|
|      2|{{{{~}{{~|
|      2|{{{{~{{|
|      2|{{{{~}||{}|
|      2|{{{{}|{{|
|      2|{{{{{|{|
|      2|{{{{|~{|
|      2|{{{{{}{~|
|      2|{{{{{}~|{|
|      2|{{{{}~}{|
|      2|{{}{{}~|
|      2|{{}{{}|
+-------+---------------+
only showing top 20 rows



In [None]:
spark.sql(f"""
    select *, length(paciente) from

    (SELECT
        count(distinct municipio) num_mun, paciente


    FROM

  (SELECT
      c.*,
      p.data_primeiro_estadiamento,
      p.data_ultimo_estadiamento,
      p.primeiro_estadiamento,
      p.maior_estadiamento,
      p.ultimo_estadiamento,
      p.custo_total,
      p.primeiro_municipio,
      p.ultimo_municipio,
      p.indicacao_obito
  FROM cancer_mama.pacientes AS p
  LEFT JOIN cancer_mama.procedimentos AS c
  ON c.paciente = p.paciente)

     where data = '202501'
     group by paciente)

     where num_mun > 1
     order by num_mun desc
     limit 2

""").show()

+-------+--------+----------------+
|num_mun|paciente|length(paciente)|
+-------+--------+----------------+
|   1792|        |               0|
+-------+--------+----------------+



In [None]:
spark.sql(f"""
select * from

(    select *, length(paciente) from

    (SELECT
        count(distinct municipio) num_mun, paciente


    FROM

  (SELECT
      c.*,
      p.data_primeiro_estadiamento,
      p.data_ultimo_estadiamento,
      p.primeiro_estadiamento,
      p.maior_estadiamento,
      p.ultimo_estadiamento,
      p.custo_total,
      p.primeiro_municipio,
      p.ultimo_municipio,
      p.indicacao_obito
  FROM cancer_mama.pacientes AS p
  LEFT JOIN cancer_mama.procedimentos AS c
  ON c.paciente = p.paciente)

     where data = '202501'
     group by paciente)

     where num_mun > 1
     order by num_mun desc
) a
left join cancer_mama.pacientes AS p
on a.paciente = p.paciente

""").show()

+-------+---------------+----------------+---------------+--------------------------+------------------------+--------------------+---------------------+-------------------+------------------+------------------+--------------------+---------------+------------------+----------------+
|num_mun|       paciente|length(paciente)|       paciente|data_primeiro_estadiamento|data_ultimo_estadiamento|numero_procedimentos|primeiro_estadiamento|ultimo_estadiamento|maior_estadiamento|menor_estadiamento|         custo_total|indicacao_obito|primeiro_municipio|ultimo_municipio|
+-------+---------------+----------------+---------------+--------------------------+------------------------+--------------------+---------------------+-------------------+------------------+------------------+--------------------+---------------+------------------+----------------+
|   1792|               |               0|               |                    202404|                  202502|               88593|              