In [2]:
import pyodbc
from sqlalchemy import create_engine
from dotenv import load_dotenv
import os
import pandas as pd
import warnings
import re

In [3]:
def get_dataconn():
    """
    Descripcion:
    Funcion que crea una conexion a la base de datos de SQL Server
    Retorna:
    connection: Conexion a la base de datos de SQL Server
    """
    load_dotenv()
    server = os.getenv('DB_SERVER')
    database = os.getenv('DB_DATABASE')
    username = os.getenv('DB_USER')
    password = os.getenv('DB_PASSWORD')
    connection_string = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}'
    return connection_string

In [4]:
def read_consult():
    """
    Descripcion:
    Funcion que lee el archivo consult.sql y retorna el contenido del archivo
    Retorna:
    consult: Contenido del archivo consult.sql
    """
    with open ('consults/consultH.sql', 'r') as file:
        consult = file.read()
    return consult

In [5]:
def get_dataset(connection_string):
    """
    Descripcion:
    Funcion que realiza una consulta a la base de datos de SQL Server y retorna un DataFrame con los resultados
    Retorna:
    df: DataFrame con los resultados de la consulta
    """
    try:
        conn = pyodbc.connect(connection_string)
        cursor = conn.cursor()
        query = read_consult()
        df = pd.read_sql_query(query, conn)
        return df
    except Exception as e:
        print(f'Error: {e}')
        exit()

In [None]:
def Grupo_Folio_TCEJEI(df):
    """
    Descripcion:
    Funcion que agrupa los registros por FolioId y TCEJEI_FormatoTrastornoClinicoEjeIId
    Parametros:
    df: DataFrame con los registros a agrupar
    Retorna:
    grupo: DataFrame agrupado por FolioId y TCEJEI_FormatoTrastornoClinicoEjeIId
    """

    grupo = df.groupby(["FolioId", "TCEJEI_FormatoTrastornoClinicoEjeIId"]).agg(list).reset_index()
    return grupo

In [None]:
def Grupo_Folio_TCEJEII(df):
    """
    Descripcion:
    Funcion que agrupa los registros por FolioId y TCEJEII_FormatoTrastornoClinicoEjeIIId
    Parametros:
    df: DataFrame con los registros a agrupar
    Retorna:
    grupo: DataFrame agrupado por FolioId y TCEJEII_FormatoTrastornoClinicoEjeIIId
    """
    
    grupo = df.groupby(["FolioId", "TCEJEII_FormatoTrastornoClinicoEjeIIId"]).agg(list).reset_index()
    return grupo

In [None]:
def Grupo_Folio_TCEJEIII(df):
    """
    Descripcion:
    Funcion que agrupa los registros por FolioId y TCEJEIII_FormatoTrastornoClinicoEjeIIIId
    Parametros:
    df: DataFrame con los registros a agrupar
    Retorna:
    grupo: DataFrame agrupado por FolioId y TCEJEIII_FormatoTrastornoClinicoEjeIIIId
    """
    grupo = df.groupby(["FolioId", "TCEJEIII_FormatoTrastornoClinicoEjeIIIId"]).agg(list).reset_index()
    return grupo

In [None]:
def Grupo_Folio_TCEJEIV(df):
    """
    Descripcion:
    Funcion que agrupa los registros por FolioId y TCEJEIV_FormatoTrastornoClinicoEjeIVId
    Parametros:
    df: DataFrame con los registros a agrupar
    Retorna:
    grupo: DataFrame agrupado por FolioId y TCEJEIV_FormatoTrastornoClinicoEjeIV
    """
    grupo = df.groupby(["FolioId", "TCEJEIV_FormatoTrastornoClinicoEjeIVId"]).agg(list).reset_index()
    return grupo

In [None]:
def Grupo_Folio_HCEF(df):
    """
    Descripcion:
    Funcion que agrupa los registros por FolioId y HCEF_FormatoExploracionFisicaId
    Parametros:
    df: DataFrame con los registros a agrupar
    Retorna:
    grupo: DataFrame agrupado por FolioId y HCEF_FormatoExploracionFisicaId
    """
    
    grupo = df.groupby(["FolioId", "HCEF_FormatoExploracionFisicaId"]).agg(list).reset_index()
    return grupo

In [None]:
def Grupo_Folio_HCMA(df):
    """
    Descripcion:
    Funcion que agrupa los registros por FolioId y HCMA_FormatoMetodosAnticonceptivosId
    Parametros:
    df: DataFrame con los registros a agrupar
    Retorna:
    grupo: DataFrame agrupado por FolioId y HCMA_FormatoMetodosAnticonceptivosId
    """
    grupo = df.groupby(["FolioId", "HCMA_FormatoMetodosAnticonceptivosId"]).agg(list).reset_index()
    return grupo

In [None]:
def Grupo_Folio_HCPRO(df):
    """
    Descripcion:
    Funcion que agrupa los registros por FolioId y HCPRO_FormatoPronosticoId
    Parametros:
    df: DataFrame con los registros a agrupar
    Retorna:
    grupo: DataFrame agrupado por FolioId y HCPRO_FormatoPronosticoId
    """
    grupo = df.groupby(["FolioId", "HCPRO_FormatoPronosticoId"]).agg(list).reset_index()
    return grupo

In [None]:
def Grupo_Folio_HCID(df):
    """
    Descripcion:
    Funcion que agrupa los registros por FolioId y HCID_FormatoImpresionDiagnosticaId
    Parametros:
    df: DataFrame con los registros a agrupar
    Retorna:
    grupo: DataFrame agrupado por FolioId y HCID_FormatoImpresionDiagnosticaId
    """
    grupo = df.groupby(["FolioId", "HCID_FormatoImpresionDiagnosticaId"]).agg(list).reset_index()
    return grupo

In [None]:
def drop_rows (group_sep):
    """
    Descripción:
    Esta función elimina las filas duplicadas de un DataFrame de pandas agrupado por FolioId y SustanciaId.
    Parametros:
    group_sep: DataFrame de pandas
    Return:
    group_sep: DataFrame de pandas sin filas duplicadas
    """
    group_sep.drop_duplicates( subset = "FolioId" , keep = "first" , inplace = True)
    return group_sep

In [None]:
def piv (grupo_mod , list_columns):
    """
    Descripción:
    Esta función realiza una tabla pivote de un DataFrame de pandas agrupado por FolioId y SustanciaId.
    Parametros:
    grupo_mod: DataFrame de pandas
    list_columns: lista de columnas a procesar
    Return:
    df_complete: DataFrame de pandas con tabla pivote
    """
    
    df_complete = pd.DataFrame()
    for valor , group_sep in grupo_mod.groupby("FolioId"):
        for i in range(len(group_sep.index)):
            for col in list_columns:
                if i != 0 :
                    aux_name_col = col + str(i)
                    if aux_name_col not in group_sep.columns:
                        group_sep[aux_name_col] = np.nan
                    group_sep[aux_name_col] = group_sep[col].iloc[i]
        df_complete = pd.concat([df_complete, group_sep])
    return df_complete

In [None]:
def extract (x):
    """
    Descripción:
    Esta función extrae el primer elemento de una lista.
    Parametros:
    x: lista
    Return:
    x[0]: primer elemento de la lista
    """

    if isinstance(x , list):
        return x[0] if len(x) > 0 else None
    return x

In [None]:
def trat (df_completemerge):
    """
    Descripción:
    Esta función aplica la función extract a todas las columnas de un DataFrame de pandas.
    Parametros:
    df_completemerge: DataFrame de pandas
    Return:
    df_completemerge: DataFrame de pandas con la función extract aplicada a todas las columnas
    """
    
    for col in df_completemerge.columns:
        df_completemerge[col] = df_completemerge[col].apply(extract)

In [None]:
def busqueda_Folio( df_completemerge, val_folio , ind_folio):
    """
    Descripción:
    Esta función busca el índice de un valor en una columna de un DataFrame de pandas.
    Parametros:
    df_completemerge: DataFrame de pandas
    val_folio: valor a buscar
    ind_folio: índice a buscar
    Return:
    ind: índice del valor en la columna
    """
    
    for ind , val in df_completemerge["FolioId"].items():
        if val == val_folio and ind != ind_folio:
            return ind

In [None]:
def AcomDa (df_completemerge , ind , ind_igual):
    """
    Descripción:
    Esta función acomoda los valores de un DataFrame de pandas.
    Parametros:
    df_completemerge: DataFrame de pandas
    ind: índice a acomodar
    ind_igual: índice a acomodar
    Return:
    df_completemerge: DataFrame de pandas acomodado
    """
    
    if ind_igual == None:
        return df_completemerge
    else:
        for col in df_completemerge.columns:
            valor_1 = df_completemerge[col].iloc[ind]
            valor_2 = df_completemerge[col].iloc[ind_igual]
            if pd.isna(valor_1) and pd.notna(valor_2):
                df_completemerge[col].iloc[ind] = valor_2
            elif pd.notna(valor_1) and pd.isna(valor_2):
                df_completemerge[col].iloc[ind_igual] = valor_1
        return df_completemerge

In [None]:
def limp (df_completemerge):
    """
    Descripción:
    Esta función elimina los valores duplicados de un DataFrame de pandas.
    Parametros:
    df_completemerge: DataFrame de pandas
    Return:
    df_new: DataFrame de pandas sin valores duplicados
    """
    
    for ind , val in df_completemerge["FolioId"].items():
        ind_igual = busqueda_Folio(df_completemerge , val , ind)
        df_new = AcomDa(df_completemerge , ind , ind_igual)
    df_new.drop_duplicates(inplace = True)
    return df_new

In [None]:
def main(df):
    """
    Descripción:
    Esta función es la principal y ejecuta las funciones anteriores.
    Parametros:
    df: DataFrame de pandas
    Return:
    df_new: DataFrame de pandas sin valores duplicados
    """
    warnings.filterwarnings("ignore")
    grupo = Grupo_Folio_TCEJEI(df)
    trat(grupo)
    list_columns = ["TCEJEI_FormatoTrastornoClinicoEjeIId", "TCEJEI_FormatoImpresionDiagnosticaId", "TCEJEI_CodigoCompuesto","TCEJEI_ComunNomenclaturaId", "TCEJEI_ComunConsumoId", "TCEJEI_Especifique"]
    df_complete = piv(grupo , list_columns)
    df_complete = limp(df_complete)

    grupo2 = Grupo_Folio_TCEJEII(df)
    trat(grupo2)
    list_columns = ["TCEJEII_FormatoTrastornoClinicoEjeIIId", "TCEJEII_FormatoImpresionDiagnosticaId", "TCEJEII_CodigoCompuesto", "TCEJEII_ComunNomenclaturaId", "TCEJEII_Especifique"]
    df_complete2 = piv(grupo2 , list_columns)
    df_complete2 = limp(df_complete2)

    grupo3 = Grupo_Folio_TCEJEIII(df)
    trat(grupo3)
    list_columns = ["TCEJEIII_FormatoTrastornoClinicoEjeIIIId", "TCEJEIII_FormatoImpresionDiagnosticaId", "TCEJEIII_CodigoCompuesto", "TCEJEIII_ComunNomenclaturaId","TCEJEIII_ComunConsumoId" , "TCEJEIII_Especifique"]
    df_complete3 = piv(grupo3 , list_columns)
    df_complete3 = limp(df_complete3)

    grupo4 = Grupo_Folio_TCEJEIV(df)
    trat(grupo4)
    list_columns = ["TCEJEIV_FormatoTrastornoClinicoEjeIVId", "TCEJEIV_FormatoImpresionDiagnosticaId", "TCEJEIV_Codigo" ,"TCEJEIV_ComunNomenclaturaId","TCEJEIV_ComunConsumoId", "TCEJEIV_Especifique"]
    df_complete4 = piv(grupo4 , list_columns)
    df_complete4 = limp(df_complete4)

    grupo5 = Grupo_Folio_HCEF(df)
    trat(grupo5)
    list_columns = ["HCEF_FormatoExploracionFisicaId", "HCEF_FormatoId", "HCEF_SignosVitalesPeso", "HCEF_SignosVitalesTalla", "HCEF_SignosVitalesIMCComunId", "HCEF_SignosVitalesFC", "HCEF_SignosVitalesFR", "HCEF_SignosVitalesPulso", "HCEF_SignosVitalesTA", "HCEF_SignosVitalesTAHg", "HCEF_SignosVitalesTemperatura", "HCEF_HabitusExterior", "HCEF_ExploracionEspecializadaComunId", "HCEF_ExploracionEspecializadaCuales" ]
    df_complete5 = piv(grupo5 , list_columns)
    df_complete5 = limp(df_complete5)
    
    grupo6 = Grupo_Folio_HCMA(df)
    trat(grupo6)
    list_columns = ["HCMA_FormatoMetodosAnticonceptivosId", "HCMA_FormatoId", "HCMA_ComunMetodoAnticonceptivoId"]
    df_complete6 = piv(grupo6 , list_columns)
    df_complete6 = limp(df_complete6)
    
    grupo7 = Grupo_Folio_HCPRO(df)
    trat(grupo7)
    list_columns = ["HCPRO_FormatoPronosticoId","HCPROFormatoId", "HCPRO_ParaVidaId", "HCPRO_ParaFuncionId", "HCPRO_ParaVidaFuncionId", "HCPRO_Observaciones"]
    df_complete7 = piv(grupo7 , list_columns)
    df_complete7 = limp(df_complete7)

    grupo8 = Grupo_Folio_HCID(df)
    trat(grupo8)
    list_columns = ["HCID_FormatoImpresionDiagnosticaId", "HCID_FormatoId", "HCID_DiagnosticoTrastornoClinicoEjeI", "HCID_DiagnosticoTrastornoClinicoEjeII", "HCID_DiagnosticoTrastornoClinicoEjeIII", "HCID_DiagnosticoTrastornoClinicoEjeIV" , "HCID_PeriodoReferidoComunId"]
    df_complete8 = piv(grupo8 , list_columns)
    df_complete8 = limp(df_complete)

    df_complete = pd.concat([df_complete, df_complete2, df_complete3, df_complete4, df_complete5, df_complete6, df_complete7, df_complete8], ignore_index=True) 
    trat(df_complete)
    df_complete = limp(df_complete)
    drop_rows(df_complete)
    return df_complete

In [None]:
connection_string = get_dataconn()
df = get_dataset(connection_string)
df.to_csv('dataset/SQLHistoriaClinica.csv', index = False)
warnings.filterwarnings("ignore")
df_final = pd.DataFrame()
chunksize = 100
000
for chunk in pd.read_csv("results/HistoriaClinica.csv", chunksize=chunksize):
    df = chunk 
    df = main(df)
    df_final = pd.concat([df_final, df], ignore_index=True)

In [None]:
df_final.to_csv("BSD_HistoriaClinica2021_2023M.csv", index=False)