# Geração de Databases com SQLalchemy e PosgreSQL

O presente notebook demonstra a construção de bancos de dados (BD) SQL locais a partir do uso das ferramentas SQLalchemy e PostgreSQL, para fins de realização de uma análise exploratória de dados descritos em arquivos CSV. Para tal, fizemos uso do dataset [Covid-19 Data Sharing](https://agencia.fapesp.br/covid-19-data-sharingbr-makes-more-datasets-available/35348) disponibilizado pela Agência FAPESP.

## Autores

| Nome | nUSP |
| :--- | :--- |
| Guilherme de Abreu Barreto | 12543033 |
| Lucas Eduardo Gulka Pulcinelli | 12547336 |
| Vinicio Yusuke Hayashibara | 13642797 |

## Configuração

É necessário o ao correto funcionamento deste projeto possuir uma instalação local de PostgreSQL e atribuir os valores correspondentes para acesso a este nas seguintes constantes:

- `DATABASE`: O nome do database onde serão carregadas as informações. Atente-se se este não corresponde ao nome de um database preexistente **ou que esteja sendo acessado**, pois este será então sobrescrito.

- `USER` e `PASSWORD`: Informações de autententicação válidas e com privilégios para a criação de bancos de dados no servidor.

- `HOST` e `PORT`: A URL e porta para realização do acesso ao servidor.

- `BATCH_SIZE`: O número máximo de operações sobre o BD a serem realizadas conjuntamente. Recomenda-se ser em um número o qual caiba na memória RAM que você dispõe. O número abaixo foi capaz de caber confortávelmente em 10 GiB de RAM **na minha máquina**. 

In [1]:
DATABASE = "fapcov2103"
USER = "postgres"
PASSWORD = "password"
HOST = "localhost"
PORT = 5432
BATCH_SIZE = 2 * 10**7

DATABASE_URI = f"postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}/{DATABASE}"

## Carregamento das dependências deste projeto

In [31]:
import enum
import pandas as pd
import re
from datetime import datetime, date
from sqlalchemy import (
    CheckConstraint as constraint,
    Enum,
    Date,
    ForeignKey as fk,
    String,
    MetaData,
    Table,
    TypeDecorator,
    create_engine,
    column as sql_column,
    insert,
    text
)
from sqlalchemy.orm import (
    Mapped,
    Session,
    declarative_base,
    relationship,
    sessionmaker,
    mapped_column as column,
    validates,
)
from pathlib import Path
from tqdm import tqdm
from typing import Any, Annotated, final

## Funções e tipos de dados auxiliares

Algumas funções e tipos de dados os quais utilizamos em nossa implementação, mas cuja funcionalidade provavelmente não será crucial ao caso geral.

In [3]:
def backref(back_populates: str) -> Mapped[Any]:
    return relationship(back_populates=back_populates)


def childOf(back_populates: str) -> Mapped[Any]:
    return relationship(
        back_populates=back_populates,
        cascade="all, delete-orphan",
    )


def normalize_column_name(column_name: str) -> str:
    """
    Convert column name from UU_UUU_UUU format to UU_Uuu_Uuu format
    Example: "ID_PACIENTE" -> "ID_Paciente", "IC_SEXO" -> "IC_Sexo"
    """
    match column_name:
        case "CD_UF":
            return column_name
        case "CD_CEPREDUZIDO":
            return "CD_CEPReduzido"
        case _:
            parts = column_name.split("_")
            return "_".join(
                [parts[0].upper()] + [part.capitalize() for part in parts[1:]]
            )

def parse_date(date_column: pd.Series) -> pd.Series:
    return  pd.to_datetime(
        date_column, format='%d/%m/%Y', errors='coerce'
    )


estados = [
    "AC",  # Acre
    "AL",  # Alagoas
    "AP",  # Amapá
    "AM",  # Amazonas
    "BA",  # Bahia
    "CE",  # Ceará
    "DF",  # Distrito Federal
    "ES",  # Espírito Santo
    "GO",  # Goiás
    "MA",  # Maranhão
    "MT",  # Mato Grosso
    "MS",  # Mato Grosso do Sul
    "MG",  # Minas Gerais
    "PA",  # Pará
    "PB",  # Paraíba
    "PR",  # Paraná
    "PE",  # Pernambuco
    "PI",  # Piauí
    "RJ",  # Rio de Janeiro
    "RN",  # Rio Grande do Norte
    "RS",  # Rio Grande do Sul
    "RO",  # Rondônia
    "RR",  # Roraima
    "SC",  # Santa Catarina
    "SP",  # São Paulo
    "SE",  # Sergipe
    "TO",  # Tocantins
]

## Definição das tabelas comuns

Abaixo descrevemos a estrutura pretendida às tabelas Pacientes, ExamLabs e Despachos, comuns a todos o BDs.

In [4]:
Base = declarative_base()

class PacienteBase(Base):
    __abstract__: bool = True

    IC_Sexo: Mapped[str] = column(
        Enum('M', 'F', name='sexo_enum'),
        comment="Sexo do Paciente. F - Feminino; M - Masculino"
    )
    AA_Nascimento: Mapped[str | None] = column(
        String(4),
        comment="Ano de nascimento do Paciente. 4 caracteres alfanuméricos. Os 4 dígitos do ano do nascimento; ou AAAA - para ano de nascimento igual ou anterior a 1930 (visando anonimização); YYYY - quaisquer outros anos, em caso de anonimização do ano"
    )
    CD_Pais: Mapped[str | None] = column(
        String(2),
        comment="Pais de residencia do Paciente. 2 caracteres alfanuméricos. BR ou XX (país estrangeiro)"
    )
    CD_UF: Mapped[str | None] = column(
        Enum(*estados, name='estado_enum'),
        comment="Unidade da Federacao de residencia do Paciente. 2 caracteres alfanuméricos"
    )
    CD_Municipio: Mapped[str | None] = column(
        comment="Municipio de residencia do Paciente. Alfanumérico."
    )
    CD_CEPReduzido: Mapped[str | None] = column(comment="[Descrição não encontrada nos comentários]")

    @validates("AA_Nascimento")
    def validates_nascimento(self, _key: str, value: str) -> str:
        match value:
            case "AAAA" | "YYYY":
                return value
            case year if len(year) == 4 and year.isdigit():
                return year
            case _:
                raise ValueError(
                    f"Invalid AA_Nascimento value: {value}. Must be 'AAAA', 'YYYY', or a 4-digit number"
                )


class Paciente(PacienteBase):
    """
    Tabela de pacientes Covid-19 FAPESP
    """

    __tablename__: str = "Pacientes"

    ID_Paciente: Mapped[str] = column(
        primary_key=True,
        comment="Identificação única do paciente (correlaciona com o ID_PACIENTE de todos os arquivos onde aparece). 32 caracteres alfanuméricos"
    )

    # Relações
    exames: Mapped[list["ExamLab"]] = childOf("paciente")
    desfechos: Mapped[list["Desfecho"]] = childOf("paciente")



class ExamLab(Base):
    """
    Tabela de exames Covid-19 FAPESP
    """

    __tablename__: str = "ExamLabs"

    id: Mapped[int] = column(autoincrement=True, primary_key=True)
    ID_Paciente: Mapped[str] = column(
        fk("Pacientes.ID_Paciente"),
        comment="Identificação única do paciente (correlaciona com o ID_PACIENTE de todos os arquivos onde aparece). 32 caracteres alfanuméricos"
    )
    ID_Atendimento: Mapped[str | None] = column(
        comment="Identificação única do atendimento. Correlaciona com o ID_ATENDIMENTO de todas as tabelas onde aparece. 32 caracteres alfanuméricos"
    )
    DE_Exame: Mapped[str | None] = column(
        comment="Descrição do exame realizado. Alfanumérico. Exemplo: HEMOGRAMA, sangue total / GLICOSE, plasma / SODIO, soro / POTASSIO, soro. Um exame é composto por 1 ou mais analitos."
    )
    DE_Resultado: Mapped[str | None] = column(
        comment="Resultado do exame, associado ao DE_ANALITO. Alfanumérico. Se DE_ANALITO exige valor numérico, NNNN se inteiro ou NNNN,NNN se casas decimais; Se DE_ANALITO exige qualitativo, String com domínio restrito; Se DE_ANALITO por observação microscópica, String conteúdo livre. Exemplo de dominio restrito - Positivo, Detectado, Reagente, nâo reagente, etc. Exemplo de conteúdo livre - 'não foram observados caracteres tóxico-degenerativos nos neutrófilos, não foram observadas atipias linfocitárias'"
    )
    DT_Coleta: Mapped[date | None] = column(
        comment="Data em que o material foi coletado do paciente"
    )
    DE_Origem: Mapped[str | None] = column(
        comment="Local de Coleta do exame. 4 caracteres alfabéticos: LAB – Exame realizado por paciente em uma unidade de atendimento laboratorial; HOSP – Exame realizado por paciente dentro de uma Unidade Hospitalar; UTI - exame realizado na UTI"
    )
    DE_Analito: Mapped[str | None] = column(
        comment="Descrição do analito. Alfanumérico. Exemplo: Eritrócitos / Leucócitos / Glicose / Ureia / Creatinina. Para o exame Hemograma, tem o resultado de vários analitos: Eritrócitos, Hemoglobina, Leucócitos, Linfócitos, etc. A maioria dos exames tem somente 1 analito, por exemplo Glicose, Colesterol Total, Uréia e Creatinina."
    )
    CD_Unidade: Mapped[str | None] = column(
        comment="Unidade de Medida utilizada na Metodologia do laboratório específico para analisar o exame. Alfanumérico. Exemplo: g/dL (gramas por decilitro)"
    )
    DE_Valor_Referencia: Mapped[str | None] = column(
        comment="Faixa de valores de referência. Alfanumérico. Resultado ou faixa de resultados considerado normal para este analito. Exemplo para Glicose: 75 a 99"
    )

    @property
    def DE_resultNum(self) -> float | None:
        """
        Extrai valor numérico do resultado ou atribui códigos especiais para resultados textuais.
        Baseado na lógica do script COVID19_Corrige_21_02.sql
        """
        if not self.DE_Resultado:
            return None

        # Extrai valor numérico do resultado
        numeric_match = re.search(r"-?\d+[,.]?\d*", self.DE_Resultado)
        if numeric_match:
            numeric_str = numeric_match.group().replace(",", ".")
            try:
                return float(numeric_str)
            except ValueError:
                pass

        # Aplica códigos especiais para exames COVID
        if self.DE_Exame and re.search(
            r"(covid)|(sars.cov.2)|(corona)", self.DE_Exame, re.IGNORECASE
        ):
            resultado_lower = self.DE_Resultado.lower()

            if re.search(r"detectados anticorpos", resultado_lower):
                return -1000.0
            elif re.search(
                r"(n.o detectado)|(n.o reagente)|(negativo)|(aus.ncia de anticorpos)",
                resultado_lower,
            ):
                return -1111.0
            elif re.search(r"(detectado)|(reagente)|(positivo)", resultado_lower):
                return -1000.0
            elif re.search(r"(indetect.avel)|(inconclusivo)", resultado_lower):
                return -1234.0
            else:
                return -2222.0

        return None

    # Relações
    paciente: Mapped["Paciente"] = backref("exames")


class Desfecho(Base):
    """
    Tabela de desfechos Covid-19 FAPESP
    """

    __tablename__: str = "Desfechos"

    ID_Paciente: Mapped[str] = column(
        fk("Pacientes.ID_Paciente"),
        comment="Identificação única do paciente (correlaciona com o ID_PACIENTE de todos os arquivos onde aparece. 32 caracteres alfanuméricos)"
    )
    ID_Atendimento: Mapped[str] = column(
        comment="Identificação única do atendimento. Cada atendimento tem um desfecho. Correlaciona com ID_ATENDIMENTO de todas as tabelas onde aparece"
    )
    DT_Atendimento: Mapped[date | None] = column(
        comment="Data de realização do atendimento"
    )
    DE_Tipo_Atendimento: Mapped[str] = column(
        comment="Descrição do tipo de atendimento realizado. Texto livre. Exemplo: Pronto atendimento."
    )
    ID_Clinica: Mapped[int] = column(
        comment="Identificação da clínica onde o evento aconteceu. Numérico. Exemplo: 1013"
    )
    DE_Clinica: Mapped[str] = column(
        comment="Descrição da clínica onde o evento aconteceu. Texto livre. Exemplo: Cardiologia"
    )
    DT_Desfecho: Mapped[date | None] = column(
        comment="Data do desfecho - Nulo se DE_DESFECHO for óbito"
    )
    DE_Desfecho: Mapped[str] = column(
        comment="Descriçao do desfecho. Texto livre. Exemplo: Alta médica melhorado"
    )

    # Relações
    paciente: Mapped["Paciente"] = backref("desfechos")

    __table_args__: tuple[pkc,] = (pkc("ID_Paciente", "ID_Atendimento"),)

## População dos bancos de dados

Abaixo descrevemos a lógica para população dos bancos de dados. Os datasets que descrevem a cada tabela de cada banco de dados estão localizados em uma pasta `dataset` colocada no mesmo diretório que este notebook.

```bash
~/Public/USP/Ciência da Computação/Semestre 6/Mineração de dados/01-Introdução-Preparação de dados main*
❄️impure ❯ exa --tree
.
├── datasets
│   ├── BPSP
│   │   ├── BPSP_Desfechos.csv
│   │   ├── BPSP_ExamLabs.csv
│   │   └── BPSP_Pacientes.csv
│   ├── Einstein
│   │   ├── Einstein_ExamLabs.csv
│   │   └── Einstein_Pacientes.csv
│   ├── GrupoFleury
│   │   ├── GrupoFleury_ExamLabs.csv
│   │   └── GrupoFleury_Pacientes.csv
│   ├── HC
│   │   ├── HC_ExamLabs.csv
│   │   └── HC_Pacientes.csv
│   └── HSL
│       ├── HSL_Desfechos.csv
│       ├── HSL_ExamLabs.csv
│       └── HSL_Pacientes.csv
└── 'Geração de Databases com SQLalchemy e PostgreSQL.ipynb'
```
Como se vê, os datasets encontram-se nomeados de maneira padronizada, e os nomes das colunas de suas tabelas correspondem aos nomes dados aos atributos das classes que aqui descrevem as tabelas contidas nos BDs.

In [7]:
datasets_folder = str(Path.cwd()) + "/datasets"
hospitals = ["BPSP", "Einstein", "GrupoFleury", "HC", "HSL"]
tables_dict = {"Pacientes": Paciente, "ExamLabs": ExamLab, "Desfechos": Desfecho}

### Criação do Banco de Dados

As duas células seguintes executam a criação do banco de dados e dos _schemas_ para cada hospital, respectivamente.

In [6]:
engine = create_engine(f"postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}/postgres", echo=True)

with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
    conn.execute(text(f"CREATE DATABASE IF NOT EXISTS{DATABASE}"))

2025-08-24 20:27:29,217 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-08-24 20:27:29,218 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-08-24 20:27:29,220 INFO sqlalchemy.engine.Engine select current_schema()
2025-08-24 20:27:29,221 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-08-24 20:27:29,222 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-08-24 20:27:29,223 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-08-24 20:27:29,226 INFO sqlalchemy.engine.Engine BEGIN (implicit; DBAPI should not BEGIN due to autocommit mode)
2025-08-24 20:27:29,226 INFO sqlalchemy.engine.Engine DROP DATABASE IF EXISTS fapcov2103
2025-08-24 20:27:29,227 INFO sqlalchemy.engine.Engine [generated in 0.00186s] {}
2025-08-24 20:27:29,261 INFO sqlalchemy.engine.Engine CREATE DATABASE fapcov2103
2025-08-24 20:27:29,262 INFO sqlalchemy.engine.Engine [generated in 0.00080s] {}
2025-08-24 20:27:29,328 INFO sqlalchemy.engine.Engine ROLLBACK using DBAPI connection.rollback(), DBAP

In [11]:
engine = create_engine(DATABASE_URI, echo=True)

with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
    for hospital in hospitals + ["D2"]:
        conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {hospital}"))

2025-08-25 06:42:52,153 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-08-25 06:42:52,155 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-08-25 06:42:52,157 INFO sqlalchemy.engine.Engine select current_schema()
2025-08-25 06:42:52,158 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-08-25 06:42:52,160 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-08-25 06:42:52,161 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-08-25 06:42:52,163 INFO sqlalchemy.engine.Engine BEGIN (implicit; DBAPI should not BEGIN due to autocommit mode)
2025-08-25 06:42:52,165 INFO sqlalchemy.engine.Engine CREATE SCHEMA IF NOT EXISTS HSL
2025-08-25 06:42:52,165 INFO sqlalchemy.engine.Engine [generated in 0.00284s] {}
2025-08-25 06:42:52,167 INFO sqlalchemy.engine.Engine CREATE SCHEMA IF NOT EXISTS D2
2025-08-25 06:42:52,169 INFO sqlalchemy.engine.Engine [generated in 0.00160s] {}
2025-08-25 06:42:52,170 INFO sqlalchemy.engine.Engine ROLLBACK using DBAPI connection.rollback(), DBA

## População dos schemas dos hospitais

Abaixo descrevemos a lógica para população dos bancos de dados. Os datasets que descrevem a cada tabela de cada banco de dados estão localizados em uma pasta dataset colocada no mesmo diretório que este notebook.
```
❄️impure ❯ exa --tree
.
├── datasets
│   ├── BPSP
│   │   ├── BPSP_Desfechos.csv
│   │   ├── BPSP_ExamLabs.csv
│   │   └── BPSP_Pacientes.csv
│   ├── Einstein
│   │   ├── Einstein_ExamLabs.csv
│   │   └── Einstein_Pacientes.csv
│   ├── GrupoFleury
│   │   ├── GrupoFleury_ExamLabs.csv
│   │   └── GrupoFleury_Pacientes.csv
│   ├── HC
│   │   ├── HC_ExamLabs.csv
│   │   └── HC_Pacientes.csv
│   └── HSL
│       ├── HSL_Desfechos.csv
│       ├── HSL_ExamLabs.csv
│       └── HSL_Pacientes.csv
└── 'Geração de Databases com SQLalchemy e PostgreSQL.ipynb'
```

Como se vê, os datasets encontram-se nomeados de maneira padronizada, e os nomes das colunas de suas tabelas correspondem aos nomes dados aos atributos das classes que aqui descrevem as tabelas contidas nos BDs.

In [21]:
def batch_insert(session: Session, hospital: str, table_name: str, table_class) -> None:
    chunks = pd.read_csv(
        f"{datasets_folder}/{hospital}/{hospital}_{table_name}.csv",
        delimiter="|",
        encoding="utf-8",
        low_memory=False,
        chunksize=BATCH_SIZE,
        dtype = {'CD_Unidade': str}
    )
    pbar = tqdm(
        chunks, desc=f"Populating table {table_name} from {hospital} schema"
    )
    
    if table_name == "Pacientes":
        for df in pbar:
            df.rename(columns=normalize_column_name, inplace=True)
            # AA_Nascimento verification. Condition 1: Value is 'AAAA' or 'YYYY'
            is_placeholder = df['AA_Nascimento'].isin(['AAAA', 'YYYY'])
    
            # Condition 2: Value is a 4-digit string
            # Ensure it's a string before using .str accessor
            is_4_digit_year = (df['AA_Nascimento'].astype(str).str.isdigit()) & \
                              (df['AA_Nascimento'].astype(str).str.len() == 4)
    
            # Combine conditions: A row is valid if it meets Condition 1 OR Condition 2
            valid_mask = is_placeholder | is_4_digit_year
    
            df.loc[~valid_mask, 'AA_Nascimento'] = None
            df['CD_Pais'] = df['CD_Pais'].replace('XX', None)
            df['CD_UF'] = df['CD_UF'].replace('UU', None)
            df['CD_Municipio'] = df['CD_Municipio'].replace('MMMM', None)
            df['CD_CEPReduzido'] = df['CD_CEPReduzido'].replace('CCCC', None)
            session.execute(insert(table_class), df.to_dict('records'))
    else:
        result = session.execute(text('SELECT "ID_Paciente" FROM "Pacientes"'))
        valid_patient_ids = {row[0] for row in result}
        if table_name == "ExamLabs":
            for df in pbar:
                df.rename(columns=normalize_column_name, inplace=True)
                df = df[df['ID_Paciente'].isin(valid_patient_ids)].copy()
                df['DT_Coleta'] = parse_date(df['DT_Coleta'])
                df = df.astype(object).where(pd.notna(df), None)
                session.execute(insert(table_class), df.to_dict('records'))
        else:
            for df in pbar:
                df.rename(columns=normalize_column_name, inplace=True)
                df = df[df['ID_Paciente'].isin(valid_patient_ids)].copy()
                df['DT_Atendimento'] = parse_date(df['DT_Atendimento'])
                df['DT_Desfecho'] = parse_date(df['DT_Desfecho'])
                df = df.astype(object).where(pd.notna(df), None)
                session.execute(insert(table_class), df.to_dict('records'))


for hospital in hospitals:
    engine = create_engine(
        DATABASE_URI,
        connect_args={'options': f'-c search_path={hospital}'},
    )
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        Base.metadata.create_all(engine)
        session.commit()

        for table_name, table_class in tables_dict.items():
            try:
                batch_insert(session, hospital, table_name, table_class)
                session.commit()
            except FileNotFoundError:
                pass


Populating table Pacientes from HSL schema: 1it [00:02,  2.60s/it]
Populating table ExamLabs from HSL schema: 1it [02:48, 168.95s/it]
Populating table Desfechos from HSL schema: 1it [00:05,  5.09s/it]


O resultado esperado desta execução é a criação dos seguintes BDs estruturados tal qual exibe os seguinte diagrama gerado usando a ferramenta DBeaver:

![Estrutura do BD, onde Pacientes figura como uma tabela associada a ExamLabs e Despachos](imgs/db_structure.png)

# Criação de novo banco de dados para a análise de dados

Em seguida, criamos um novo BD para conter dados agregados a todos os demais BDs. Isto, conforme os critérios de seleção vistos na tabela AnalisesCovid, descrita a seguir.

In [43]:
class AnaliseCovid(PacienteBase):
    """
    Tabela de análises Covid-19 FAPESP

    Complementa a tabela Paciente com dados relevantes sobre atendimentos,
    exames e desfechos extraídos das tabelas ExamLabs e Desfechos para análise
    epidemiológica de casos associados ao COVID-19
    """

    __tablename__: str = "AnalisesCovid"
    id: Mapped[int] = column(autoincrement=True, primary_key=True)

    # Aggregated data
    ID_Paciente: Mapped[str] = column(
        comment="Identificação única do paciente (32 caracteres alfanuméricos)"
    )
    ID_Atendimento: Mapped[str | None] = column(
        comment="Identificação única do atendimento (32 caracteres alfanuméricos)"
    )
    DT_Coleta: Mapped[date | None] = column(
        comment="Data em que o material foi coletado para exame"
    )
    DT_Atendimento: Mapped[date | None] = column(
        comment="Data de realização do atendimento"
    )
    DT_Desfecho: Mapped[date | None] = column(
        comment="Data do desfecho do paciente (alta, óbito, etc.)"
    )
    DE_Desfecho: Mapped[str | None] = column(
        comment="Descrição do desfecho do paciente (ex: 'Alta médica melhorado', 'Óbito')"
    )
    DE_Exame: Mapped[str | None] = column(
        comment="Descrição do exame realizado (ex: 'Teste COVID-19', 'Hemograma')"
    )
    DE_Resultado: Mapped[str | None]

    # Added metadata
    DE_Classe: Mapped[str | None] = column(
        Enum('P', 'N', name='classe_enum'),
        comment="Resultado do exame COVID-19 simplificado (P - Positivo, N - Negativo, None - Outro/Indeterminado)"
    )
    DE_Hospital: Mapped[str] = column(
        comment="Identificação do hospital de origem dos dados (BPSP, Einstein, GrupoFleury, HC, HSL)"
    )

class PacienteD2(Paciente):
    DE_Hospital: Mapped [str]

class ExamLabD2(ExamLab):
    DE_Hospital: Mapped [str]

class DesfechoD2(Desfecho):
    DE_Hospital: Mapped [str]

## População do BD D2 com dados dos demais BDs

Em seguida acessamos aos demais BDs um a um e criamos registros correspondentes no DB D2:

In [11]:
# Populate a new database D2 with its tables
engine = create_engine(
    DATABASE_URI,
    connect_args={'options': f'-c search_path=D2'},
    echo = True
)
Session = sessionmaker(bind=engine)

with Session() as session:
    tables = [
        AnaliseCovid.__table__,
        PacienteD2.__table__,
        ExamLabD2.__table__,
        DesfechoD2.__table__,
    ]
    Base.metadata.create_all(engine, tables=tables)
    session.commit()

2025-08-25 10:34:00,120 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-08-25 10:34:00,121 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-08-25 10:34:00,124 INFO sqlalchemy.engine.Engine select current_schema()
2025-08-25 10:34:00,125 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-08-25 10:34:00,126 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-08-25 10:34:00,127 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-08-25 10:34:00,130 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-08-25 10:34:00,135 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname

In [13]:
engine = create_engine(DATABASE_URI)

with engine.connect() as conn:
    conn.execution_options(isolation_level="AUTOCOMMIT")

    for table_name, table_class in tables_dict.items():
        for hospital in hospitals:
            print(f"--- Copying {hospital}.{table_name} data ---")    
            columns = [
                col for col in table_class.__table__.columns if col.name != "id" 
            ]
            insert_columns = ", ".join([f'"{col.name}"' for col in columns])

            # Apply the explicit DOUBLE CAST for Enum types
            select_columns = ", ".join([
                f'"{col.name}"::text::d2.{col.type.name}'
                if isinstance(col.type, Enum)
                else f'"{col.name}"'
                for col in columns
            ])
            
            copy_sql = f"""
                INSERT INTO D2."{table_name}" ({insert_columns}, "DE_Hospital")
                SELECT {select_columns}, '{hospital}' as DE_Hospital 
                FROM {hospital}."{table_name}"
            """
            try:
                conn.execute(text(copy_sql))
                print(f"Copied {hospital}.{table_name} to D2.{table_name}")
            except Exception as e:
                print(f"Error copying {hospital}.{table_name}: {e}")


Successfully prepared table D2.Pacientes
--- Copying BPSP.Pacientes data ---
Copied BPSP.Pacientes to D2.Pacientes
--- Copying Einstein.Pacientes data ---
Copied Einstein.Pacientes to D2.Pacientes
--- Copying GrupoFleury.Pacientes data ---
Copied GrupoFleury.Pacientes to D2.Pacientes
--- Copying HC.Pacientes data ---
Copied HC.Pacientes to D2.Pacientes
--- Copying HSL.Pacientes data ---
Copied HSL.Pacientes to D2.Pacientes
Successfully prepared table D2.ExamLabs
--- Copying BPSP.ExamLabs data ---
Copied BPSP.ExamLabs to D2.ExamLabs
--- Copying Einstein.ExamLabs data ---
Copied Einstein.ExamLabs to D2.ExamLabs
--- Copying GrupoFleury.ExamLabs data ---
Copied GrupoFleury.ExamLabs to D2.ExamLabs
--- Copying HC.ExamLabs data ---
Copied HC.ExamLabs to D2.ExamLabs
--- Copying HSL.ExamLabs data ---
Copied HSL.ExamLabs to D2.ExamLabs
Successfully prepared table D2.Desfechos
--- Copying BPSP.Desfechos data ---
Copied BPSP.Desfechos to D2.Desfechos
--- Copying Einstein.Desfechos data ---
Copied 

In [None]:
engine = create_engine(
    DATABASE_URI,
    connect_args={'options': f'-c search_path=D2'},
)
Session = sessionmaker(bind=engine)

with Session() as session:
    query = (
        session.query(
            PacienteD2,          # Select the entire PacienteD2 object
            ExamLabD2.DE_Exame,
            ExamLabD2.DE_Resultado,
            ExamLabD2.DT_Coleta,
            ExamLabD2.ID_Atendimento,
            DesfechoD2.DE_Desfecho,
            DesfechoD2.DT_Desfecho,
            DesfechoD2.DT_Atendimento
        )
        .select_from(PacienteD2)
        .join(ExamLabD2, PacienteD2.ID_Paciente == ExamLabD2.ID_Paciente)
        .join(
            DesfechoD2,
            (PacienteD2.ID_Paciente == DesfechoD2.ID_Paciente)
            & (ExamLabD2.ID_Atendimento == DesfechoD2.ID_Atendimento),
        )
        .filter(
            ExamLabD2.DE_Exame.ilike("%covid%")
            | ExamLabD2.DE_Exame.ilike("%corona%")
            | ExamLabD2.DE_Exame.ilike("%sars%")
        )
    )

    records = []
    for chunk in tqdm(query.yield_per(BATCH_SIZE), desc="Processando registros"):
        temp_exam = ExamLab(
            DE_Exame=chunk.DE_Exame, DE_Resultado=chunk.DE_Resultado
        )

        match temp_exam.DE_resultNum:
            case -1000:
                classe = "P"
            case -1111:
                classe = "N"
            case _:
                classe = None

        records.append (
            {
                "ID_Paciente": chunk.PacienteD2.ID_Paciente,
                "ID_Atendimento": chunk.ID_Atendimento,
                "IC_Sexo": chunk.PacienteD2.IC_Sexo,
                "AA_Nascimento": chunk.PacienteD2.AA_Nascimento,
                "CD_UF": chunk.PacienteD2.CD_UF,
                "CD_Pais": chunk.PacienteD2.CD_Pais,
                "CD_Municipio": chunk.PacienteD2.CD_Municipio,
                "CD_CEPReduzido": chunk.PacienteD2.CD_CEPReduzido,
                "DT_Atendimento": chunk.DT_Atendimento,
                "DT_Coleta": chunk.DT_Coleta,
                "DE_Exame": chunk.DE_Exame,
                "DT_Desfecho": chunk.DT_Desfecho,
                "DE_Desfecho": chunk.DE_Desfecho,
                "DE_Resultado": chunk.DE_Resultado,
                "DE_Classe": classe,
                "DE_Hospital": chunk.PacienteD2.DE_Hospital,
            }
        )
    if records:
        session.execute(insert(AnaliseCovid), records)
        session.commit()

df = pd.read_sql_table("AnalisesCovid", engine)
print("\n--- Verificando tabela resultante ---")
print(df.head())
print(f"\nTotal de entradas em AnalisesCovid: {len(df)}")

Processando registros: 0it [00:00, ?it/s]