# **Data Models**

In [1]:
import os
import sys
import numpy as np
import pandas as pd
from tqdm import tqdm
import datetime as dt
from simpledbf import Dbf5
from pydantic import BaseModel

PyTables is not installed. No support for HDF output.


In [2]:
from sqlalchemy import create_engine
from sqlalchemy import Table, MetaData
from sqlalchemy import select, insert, update, text, inspect
from sqlalchemy import Column, DateTime, Integer, Numeric, String, Sequence, ForeignKey, CheckConstraint

In [3]:
from sqlalchemy.exc import IntegrityError

## Data sample

 - SIVEP-Gripe (500 records).

In [4]:
basepath = os.path.join(os.environ["HOMEPATH"], "Documents", "data")
siveppath = os.path.join(basepath, "SIVEP-GRIPE", "MILLENA_14JUN2023")
fname = "SRAGHOSPITALIZADO1930520_00.dbf"

sivep_df = Dbf5(os.path.join(siveppath, fname), codec="latin").to_dataframe()
sample_df = sivep_df.sample(n=500, random_state=1)

## **Define Warehouse**

**Data Models**

In [362]:
class smart_dict(dict):
    def __missing__(self, x):
        if pd.notna(x):
            return x
        return None

class SivepGripe:
    def __init__(self, metadata):
        self.metadata = metadata
    
    def define(self):
        # --> Data model
        tb_name = 'sivep_gripe'
        self.sivep_warehouse = Table(
            tb_name, self.metadata,
            Column("ID_SIVEP", String, primary_key=True),
            Column("DATA_NOTIFICACAO", DateTime, nullable=False),
            Column("NOME_PACIENTE", String, nullable=True),
            Column("DATA_NASCIMENTO", DateTime, nullable=True),
            Column("NOME_MAE", String, nullable=True),
            Column("MUNICIPIO_RESIDENCIA", String, nullable=True),
            Column("BAIRRO_RESIDENCIA", String, nullable=True),
            Column("LOGRADOURO", String, nullable=True),
            Column("LOGRADOURO_NUMERO", String, nullable=True),
            Column("CEP", String, nullable=True),
            Column("CNS", String, nullable=True),
            Column("CPF", String, nullable=True),
            Column("CRIADO_EM", DateTime, default=dt.datetime.now),
            Column("ATUALIZADO_EM", DateTime, default=dt.datetime.now, onupdate=dt.datetime.now),
        )
        # --> Data mapping (could be imported if too big)
        mapping = {
            "NU_NOTIFIC" : "ID_SIVEP",  "DT_NOTIFIC": "DATA_NOTIFICACAO",
            "NM_PACIENT": "NOME_PACIENTE", "DT_NASC": "DATA_NASCIMENTO",
            "NM_MAE_PAC": "NOME_MAE", "CO_MUN_RES": "MUNICIPIO_RESIDENCIA",
            "NM_BAIRRO": "BAIRRO_RESIDENCIA", "NM_LOGRADO": "LOGRADOURO",
            "NU_NUMERO": "LOGRADOURO_NUMERO", "NU_CEP": "CEP", 
            "NU_CNS": "CNS", "NU_CPF": "CPF"
        }
        return { tb_name : self.sivep_warehouse }, { tb_name : mapping }
    
class SIM:
    def __init__(self, metadata):
        self.metadata = metadata
    
    def define(self):
        # --> Data model
        tb_name = 'sim'
        self.sim_warehouse = Table(
            tb_name, self.metadata,
            Column("ID_SIM", String, primary_key=True),
            Column("DATA_OBITO", DateTime, nullable=False),
            Column("NOME_PACIENTE", String, nullable=True),
            Column("DATA_NASCIMENTO", DateTime, nullable=True),
            Column("NOME_MAE", String, nullable=True),
            Column("NOME_PAI", String, nullable=True),
            Column("MUNICIPIO_RESIDENCIA", String, nullable=True),
            Column("BAIRRO_RESIDENCIA", String, nullable=True),
            Column("LOGRADOURO", String, nullable=True),
            Column("LOGRADOURO_NUMERO", String, nullable=True),
            Column("CEP", String, nullable=True),
            Column("CNS", String, nullable=True),
            Column("CPF", String, nullable=True),
            Column("CRIADO_EM", DateTime, default=dt.datetime.now),
            Column("ATUALIZADO_EM", DateTime, default=dt.datetime.now, onupdate=dt.datetime.now),
        )
        # --> Data mapping (could be imported if too big)
        mapping = {
            "NUMERODO" : "ID_SIM",  "DTOBITO": "DATA_OBITO",
            "NOME": "NOME_PACIENTE", "DTNASC": "DATA_NASCIMENTO",
            "NOME_MAE": "NOME_MAE", "CO_MUN_RES": "MUNICIPIO_RESIDENCIA",
            "NM_BAIRRO": "BAIRRO_RESIDENCIA", "NM_LOGRADO": "LOGRADOURO",
            "NU_NUMERO": "LOGRADOURO_NUMERO", "NU_CEP": "CEP", 
            "NU_CNS": "CNS", "NU_CPF": "CPF"
        }
        return { tb_name : self.sivep_warehouse }, { tb_name : mapping }
    

**Datasus warehouse**

In [349]:
class WarehouseSUS:
    '''
        Data warehouse to store personal identification from DATASUS-specific databases.
        
        To assist the procedures of data matching within and between specific databases originated
        from DATASUS information systems, this class manages the storage and also CRUD operations
        on individual records. Only information identifying individuals in the original data are stored.
        
        Args:
        -----
            engine_url:
                String. Absolute path to the warehouse database.
                
        Attributes:
        -----------
            tables:
                Dictionary. Following a key-value schema, it stores the SQLALCHEMY data models defined for 
                the database. Keys refer to the specific data models names. 
            mappings:
                Dictionary. Following a key-value schema, it stores the field relations between the original
                data sources and the schema used in the data models. 
    '''
    def __init__(self, engine_url):
        self._engine = create_engine(engine_url)
        self._metadata = MetaData()
        self.tables = {}
        self.mappings = {}
        
    @property
    def engine(self):
        return self._engine
    
    @engine.setter
    def engine(self, v):
        raise Exception()
        
    @property
    def metadata(self):
        return self._metadata
    
    @metadata.setter
    def metadata(self, v):
        raise Exception()
        
    # -------------------------------------------------
        
    def models(self):
        # --> Get the models
        # ----> (SIVEP-Gripe)
        sivep_updt, sivep_mapping = SivepGripe(self._metadata).define()
        self.tables.update(sivep_updt)
        self.mappings.update(sivep_mapping)
        
    def create_all(self):
        self._metadata.create_all(self._engine)
        return self._engine
    
    def db_init(self):
        self.models()
        engine = self.create_all()
        return engine
    
    def include(self, table_name, data_df, batchsize=50, verbose=True):
        '''
            Insert new records from a given dataframe.
            
            Args:
            -----
                table_name:
                    String. Table name inside the database. Possible to extract
                    from 'self.tables'.
                data_df:
                    pandas.DataFrame. Records to be inserted. Schema should match the 
                    official data sources. For instance, if the data source is SIVEP-Gripe,
                    then the columns must match the original ones. 
                chunksize:
                    Integer. Size of the batches of records to insert in the table.
        '''
        # - Load the data model and the schema mapping from 'tb_name' and rename the columns of 'data_df'
        table_model, table_mapping = self.tables[table_name], self.mappings[table_name]
        try:
            data_df = data_df.rename(table_mapping, axis=1, errors='raise')
        except:
            Exception('Data source schema could not be properly mapped.')
        
        # - Define 'smart_hash' to avoid 'NaN' values in the records during insert
        nonan_hash = smart_dict()
        # - Perform batch insertion of records into the table.
        data_df = data_df[ table_mapping.values() ]
        splitted_data = np.split(data_df, np.arange(batchsize, data_df.shape[0]+1, batchsize))
        for nindex, current_batch in enumerate(splitted_data):
            if verbose:
                print(f'Insertion of batch {nindex+1} of {len(splitted_data)} ... ', end='')
            
            # - Format records to be inserted
            records = [ { field : nonan_hash[val] for field, val in btc.items() } for btc in current_batch.to_dict(orient='records')]
            if len(records)==0: 
                print('no records ... done.')
                continue
        
            # --> insert batch
            try:
                ins = table_model.insert()
                with self._engine.connect() as conn:
                    rp = conn.execute(ins, records)
                    conn.commit()
            except IntegrityError as error:
                if verbose:
                    print(f'error: {error.args[0]} ... ', end='')
            
            if verbose:
                print('done.')
                
    def query(self, table_name, date_col=None, period=None):
        '''
            Select records from a specific table within the warehouse.
            
            Args:
            -----
                table_name:
                    String. Table name inside the database. Possible to extract
                    from 'self.tables'.
                date_col:
                    String. Column date of the table used for ordering and filtering
                    by period (if 'period' is provided).
                period:
                    2-tuple of datetime.datetime. Starting and ending dates of the period
                    selected for the query. This period is applied over the column name
                    parsed to 'date_col' variable. If the end date is not provided, then
                    datetime.datetime.today() is used.
                    
            Results:
                results:
                    List. List of sql table rows queried from the database. 
        '''
        # - Load  and select the data model
        table_model = self.tables[table_name]
        sel = select(table_model)
        
        # -- Build the query
        if date_col is not None:
            sel = sel.order_by(table_model.c[date_col])
            if period is not None:
                if period[1] is None:
                    period[1] = dt.datetime.today()
                sel = sel.where(table_model.c[date_col].between(period[0], period[1]))
                
        try:
            with engine.connect() as conn:
                rp = conn.execute(sel)
                results = [ record for record in rp ]
                return results
        except Exception as error:
            print(error.args[0])
            return []
        
    def update(self, table_name, primary_key_value, update_hash, verbose=True):
        '''
            Update a given record identified by its primary key value 'primary_key_value'.
            
            Args:
            -----
                table_name:
                    String.
                primary_key_value:
                    String.
                update_hash:
                    Dictionary.
        '''
        # - Load the data model and define update filtering
        table_model = self.tables[table_name]
        primary_key_name = [ p.name for p in inspect(table_model).primary_key ][0]
        updt = update(table_model).where(table_model.c[primary_key_name] == primary_key_value)
        updt = updt.values(update_hash)
        if verbose:
            print(f'Update query: {updt} ...', end='')
        
        try:
            with self._engine.connect() as conn:
                rp = conn.execute(updt)
                conn.commit()
        except IntegrityError as error:
            print(f'error: {error.args[0]}', end='')
        
        if verbose:
            print(' done.')
    
    def delete(self, list_of_records, verbose=True):
        '''
            Delete a list of records from the warehouse.
            
            Args:
            -----
                list_of_records:
                    List of unique IDs representing the primary key of the records 
                    to be deleted.
        '''
        # - Load the data model and extract the name of its primary key
        table_model = self.tables[table_name]
        primary_key_name = [ p.name for p in inspect(table_model).primary_key ][0]
        
        if list_of_records:
            for nindex, current_rec in enumerate(list_of_records):
                if verbose:
                    print(f'Deletion of record {current_rec} ({nindex+1}/{len(list_of_records)}) ... ', end='')
                
                try:
                    qdel = delete(table_model).where(table_model.c[primary_key_name]==current_rec)
                    with self._engine.connect() as conn:
                        rp = conn.execute(qdel)
                        conn.commit()
                except IntegrityError as error:
                    if verbose:
                        print(f'error: {error.args[0]}', end='')
                
                if verbose:
                    print('done.')
                    
    def delete_table(self, table_name, is_sure=False, authkey=""):
        '''
            Delete a given table from the database.
            
            Args:
                table_name. String. Table name inside the database. Possible to extract
                from 'self.tables'.
            is_sure:
                Bool. To delete table, it must be parsed as True.
            pkey:
                String. To delete table, it must be assigned to the correct string. For
                now, it avoids accidental deletions.
        '''
        sql_str = f"DROP TABLE IF EXISTS {table_name};"
        sql_query = text(sql_str)
        with self._engine.connect() as conn:
            if is_sure and authkey=="###!Y!.":
                rp = conn.execute(sql_query)
                conn.commit()
            else:
                raise Exception('delete table command called, but without assurance.')
                

In [350]:
# --> Paths
basepath = os.path.join(os.environ["HOMEPATH"], "Documents", "data")
suspath = os.path.join(basepath, "DATASUS_WAREHOUSE", "datasus_pessoas.db")
engine_url = f"sqlite:///{suspath}"

In [351]:
warehouse = WarehouseSUS(engine_url)
engine = warehouse.db_init()

In [352]:
print("Table names:")
list(warehouse.tables.keys())

Table names:


['sivep_gripe']

In [353]:
sample_df1 = sample_df.copy()
sample_df1["DT_NASC"] = pd.to_datetime(sample_df1["DT_NASC"], format="%d/%m/%Y", errors="coerce")
sample_df1["DT_NOTIFIC"] = pd.to_datetime(sample_df1["DT_NOTIFIC"], format="%d/%m/%Y", errors="coerce")
sample_df1.sample(n=4)

Unnamed: 0,DT_RES_AN,RES_AN,LAB_AN,CO_LAB_AN,POS_AN_FLU,TP_FLU_AN,POS_AN_OUT,AN_SARS2,AN_VSR,AN_PARA1,...,PAC_DSCBO,OUT_ANIM,DOR_ABD,FADIGA,PERD_OLFT,PERD_PALA,TOMO_RES,TOMO_OUT,DT_TOMO,TP_TES_AN
834,,,,,,,,,,,...,,,2.0,2.0,2.0,2.0,0.0,,,0.0
3242,,,,,,,,,,,...,,,,,,,0.0,,,0.0
1734,,5.0,,,,,,,,,...,,,2.0,2.0,2.0,2.0,6.0,,,0.0
2084,04/04/2023,,,,,,,,,,...,,,,,,,0.0,,,0.0


In [354]:
warehouse.include('sivep_gripe', sample_df1, batchsize=50, verbose=True)

Insertion of batch 1 of 11 ... error: (sqlite3.IntegrityError) UNIQUE constraint failed: sivep_gripe.ID_SIVEP ... done.
Insertion of batch 2 of 11 ... error: (sqlite3.IntegrityError) UNIQUE constraint failed: sivep_gripe.ID_SIVEP ... done.
Insertion of batch 3 of 11 ... error: (sqlite3.IntegrityError) UNIQUE constraint failed: sivep_gripe.ID_SIVEP ... done.
Insertion of batch 4 of 11 ... error: (sqlite3.IntegrityError) UNIQUE constraint failed: sivep_gripe.ID_SIVEP ... done.
Insertion of batch 5 of 11 ... error: (sqlite3.IntegrityError) UNIQUE constraint failed: sivep_gripe.ID_SIVEP ... done.
Insertion of batch 6 of 11 ... error: (sqlite3.IntegrityError) UNIQUE constraint failed: sivep_gripe.ID_SIVEP ... done.
Insertion of batch 7 of 11 ... error: (sqlite3.IntegrityError) UNIQUE constraint failed: sivep_gripe.ID_SIVEP ... done.
Insertion of batch 8 of 11 ... error: (sqlite3.IntegrityError) UNIQUE constraint failed: sivep_gripe.ID_SIVEP ... done.
Insertion of batch 9 of 11 ... error: (s

In [357]:
qres = warehouse.query('sivep_gripe', date_col='DATA_NASCIMENTO', period=(dt.datetime(1990,1,1), dt.datetime(1999, 12, 31)))
pd.DataFrame(qres)

Unnamed: 0,ID_SIVEP,DATA_NOTIFICACAO,NOME_PACIENTE,DATA_NASCIMENTO,NOME_MAE,MUNICIPIO_RESIDENCIA,BAIRRO_RESIDENCIA,LOGRADOURO,LOGRADOURO_NUMERO,CEP,CNS,CPF,CRIADO_EM,ATUALIZADO_EM
0,31684413073675,2023-05-18,FRANCISCO ITAMAR BARROS NETO,1990-07-19,AURICELIA MARIA ARAUJO BARROS,230440,CJ SIQUEIRA,RUA 1 1012,1012,,,4747024301,2023-07-24 15:34:42.818705,2023-07-24 15:34:42.818705
1,31683724910341,2023-05-10,ANTONIO JOSE SILVA SANTOS,1991-10-01,MARIA LUCILENE SILVA SANTOS,231040,BELA VISTA,RUA VILA NOVA,353,,705003045272458.0,5852972312,2023-07-24 15:34:42.807410,2023-07-24 15:34:42.807410
2,31681408558580,2023-04-13,ISRAEL RODRIGO MARTINS DOS SANTOS,1996-09-13,MARIA ALDIZIA ALVES MARTINS,230440,NOVO MONDUBIM,105 DO CONJUNTO NOVO MONDUBIM,20,60764280.0,707800665579718.0,3398363342,2023-07-24 15:34:42.832044,2023-07-24 15:34:42.832044


In [333]:
type(qres[0])

sqlalchemy.engine.row.Row

In [356]:
update_hash = { 'NOME_PACIENTE': 'ANTONIO JOSE SILVA SANTOX', "DATA_NASCIMENTO": dt.datetime(1991, 1, 1) }
warehouse.update('sivep_gripe', '316837249103X41', update_hash)

Update query: UPDATE sivep_gripe SET "NOME_PACIENTE"=:NOME_PACIENTE, "DATA_NASCIMENTO"=:DATA_NASCIMENTO, "ATUALIZADO_EM"=:ATUALIZADO_EM WHERE sivep_gripe."ID_SIVEP" = :ID_SIVEP_1 ... done.


In [336]:
warehouse.delete_table('sivep_gripe')

Exception: delete table command called, but without assurance.

In [337]:
warehouse.delete_table('sivep_gripe', is_sure=True, authkey="###!Y!.")

In [260]:
ex = warehouse.tables['sivep_gripe']

In [270]:
[ n.name for n in inspect(ex).primary_key ]

['ID_SIVEP']

In [149]:
# -- query
ex.c['ID_SIVEP']

Column('ID_SIVEP', String(), table=<sivep_gripe>, primary_key=True, nullable=False)

In [223]:
table_model = warehouse.tables['sivep_gripe']
sel = select(table_model).order_by(table_model.c["DATA_NOTIFICACAO"])
period = None
col_date_filter = None #"DATA_NOTIFICACAO"
        
# -- Build the query
if col_date_filter is not None:
    sel = sel.order_by(table_model.c[col_date_filter])
    if period is not None:
        sel = sel.where(table_model.c[col_date_filter].between(period[0], period[1]))

results = []        
try:
    with engine.connect() as conn:
        rp = conn.execute(sel)
        #results = rp.fetchall()
        results = [ record for record in rp ]
        #return results
except Exception as error:
    print(error.args[0])
    #return []

In [275]:
#pd.DataFrame(results)

In [281]:
# ************* DELETE **************
sqlt = text('DROP TABLE IF EXISTS sivep_gripe;')
with engine.connect() as conn:
    rp = conn.execute(sqlt)
    conn.commit()

In [79]:
import pydantic