# Migracion de Datos de OLTP a OLAP

In [1]:
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
import logging 
import json

Lectura del Archivo  `config.json` el cual tiene las credenciales de ingreso a la Base de Datos

In [2]:
with open ('Credenciales/config.json', 'r') as file:
    config = json.load(file)

Crea la configuracion del Archivo .log en el cual se registraran los puntos clave de nuestro ETL

In [3]:
logging.basicConfig(
    level=logging.INFO,   # Establecemos el nivel mínimo para que se capture INFO y superior
    format='%(asctime)s - %(levelname)s - %(message)s',  # Formato de los logs
    handlers=[
        logging.StreamHandler(),  # Imprime en la consola
        logging.FileHandler("Mig_Data_ETL.log")  # Guarda los logs en un archivo 'app.log'
    ]
)

La Funcion <span style="color:#FFEB99">connect_db</span> Permite la ejecucion de consulas <span style="color:#98FF98">SQL</span> para crear DataFrames

In [4]:
engine = create_engine(f'mysql+pymysql://{config['Connection']['user']}:{config['Connection']['password']}@{config['Connection']['host']}:{config['Connection']['port']}/{config['Connection']['database']}')

def connect_db(query: str) -> pd.DataFrame:
    """
    Conecta a la base de datos, ejecuta una consulta y devuelve los resultados en un DataFrame de pandas.
    
    Parameters:
    query (str): La consulta SQL que se ejecutará en la base de datos.
    
    Returns:
    pd.DataFrame: El resultado de la consulta en formato de DataFrame.
    """
    try:
        # Establece la conexión con el motor de base de datos
        with engine.connect() as conn:
            # Ejecuta la consulta SQL y guarda el resultado en un DataFrame
            df = pd.read_sql_query(query, conn)
            logging.info('Datos Cargados')
        return df

    except SQLAlchemyError as e:
        # Captura excepciones específicas de SQLAlchemy
        logging.error(f"Error en la consulta: {e}")
        return pd.DataFrame()  # Devuelve un DataFrame vacío en caso de error

La Funcion <span style="color:#FFEB99">insert_data_to_sql</span> Permite la insercion de datos a <span style="color:#98FF98">SQL server</span>

In [5]:
engine_SQL_Server = create_engine(f"mssql+pyodbc://{config['Connection_SQL_Server']['username']}:{config['Connection_SQL_Server']['password']}@{config['Connection_SQL_Server']['server']}/{config['Connection_SQL_Server']['database']}?driver=ODBC Driver 17 for SQL Server")

# Función para insertar datos a una tabla usando Pandas to_sql
def insert_data_to_sql(df: pd.DataFrame, table_name: str, if_exists: str = 'append'):
    """
    Inserta los datos de un DataFrame en la tabla especificada de SQL Server.
    
    Args:
        df (pd.DataFrame): El DataFrame con los datos a insertar.
        table_name (str): El nombre de la tabla en SQL Server.
        if_exists (str): Especifica qué hacer si la tabla ya existe. Puede ser 'replace', 'append' o 'fail'.
    
    Returns:
        None
    """
    try:
        # Insertar los datos en la tabla especificada
        df.to_sql(table_name, con=engine_SQL_Server, index=False, if_exists=if_exists)
        logging.info(f"Datos insertados correctamente en la tabla '{table_name}'.")
    
    except Exception as e:
        logging.error(f"Error al insertar los datos en la tabla '{table_name}': {e}")

La Funcion <span style="color:#FFEB99">duplicates_Pk</span> verifica que no existan duplicados en una llave primaria 

In [6]:
def duplicates_Pk(df: pd.DataFrame, campo: str) -> pd.DataFrame:
    """
    Verifica si existen duplicados en la columna especificada de un DataFrame.
    
    Args:
        df (pd.DataFrame): El DataFrame en el que se verificarán los duplicados.
        campo (str): El nombre de la columna en la que se verificarán los duplicados.
    
    Raises:
        ValueError: Si se encuentran duplicados en la columna especificada, se lanza una excepción.
    
    Returns:
        pd.DataFrame: El DataFrame original si no se encuentran duplicados en la columna especificada.
    """
    # Verificar si existen duplicados en la columna especificada
    if df[campo].duplicated().any():
        # Mostrar los duplicados encontrados
        duplicates = df[df[campo].duplicated()]
        logging.error(f'Duplicados encontrados en la columna "{campo}":\n{duplicates}')
        
        # Lanzar excepción en lugar de finalizar el script
        raise ValueError(f'Duplicados en la columna "{campo}" del DataFrame.')
    
    # Caso exitoso si no se encuentran duplicados
    logging.info(f'No se encontraron duplicados en la columna "{campo}".')
    
    return df

La Funcion <span style="color:#FFEB99">Faltantes</span> verifica que no existan Faltantes en las tablas de dimensioens 

In [7]:

def Faltantes(df: pd.DataFrame, campos: list) -> pd.DataFrame:
    """
    Verifica si existen valores faltantes (NaN) en las columnas especificadas de un DataFrame.
    
    Args:
        df (pd.DataFrame): El DataFrame en el que se verificarán los valores faltantes.
        campos (list): Lista de nombres de las columnas a verificar.
    
    Raises:
        ValueError: Si se encuentran valores faltantes en alguna de las columnas especificadas.
    
    Returns:
        pd.DataFrame: El DataFrame original si no se encuentran valores faltantes en las columnas especificadas.
    """
    # Verificar si hay valores faltantes en las columnas especificadas
    faltantes = df[campos].isnull().sum()
    
    # Filtrar las columnas con valores faltantes
    columnas_faltantes = faltantes[faltantes > 0].index.tolist()
    
    if columnas_faltantes:
        logging.error(f'El DataFrame presenta valores faltantes en las siguientes columnas: {", ".join(columnas_faltantes)}')
        
        # Lanzar excepción con información adicional sobre las columnas faltantes
        raise ValueError(f"Valores faltantes en las columnas: {', '.join(columnas_faltantes)}")
    logging.info(f'No se encontraron valores faltantes en las columnas especificadas: {", ".join(campos)}')
    return df


La Funcion <span style="color:#FFEB99">relaciones</span> valida que las llaves foraneas existan en las llaves primaras de dimensiones 

In [8]:
def relaciones(df1: pd.DataFrame, key: str, df2: pd.DataFrame, foreing_key: str) -> pd.DataFrame:
    """
    Verifica que los valores en la columna `foreing_key` del DataFrame `df2` existan en la columna `key` del DataFrame `df1`.
    
    Args:
        df1 (pd.DataFrame): Primer DataFrame que contiene la clave primaria.
        key (str): Nombre de la columna en `df1` que contiene las claves primarias.
        df2 (pd.DataFrame): Segundo DataFrame que contiene la clave foránea.
        foreing_key (str): Nombre de la columna en `df2` que contiene las claves foráneas.
    
    Raises:
        ValueError: Si hay valores en `foreing_key` que no existen en `key` de `df1`.
    
    Returns:
        pd.DataFrame: El DataFrame `df2` si todas las claves foráneas son válidas.
    """
    # Obtener las claves primarias únicas del df1
    claves_primarias = df1[key].unique()

    # Verificar si todas las claves foráneas en df2 están en las claves primarias de df1
    if not df2[foreing_key].isin(claves_primarias).all():
        # Encontrar las claves foráneas que no existen en df1
        claves_inexistentes = df2[~df2[foreing_key].isin(claves_primarias)][foreing_key].unique()
        logging.error(f'Existen claves foráneas en {foreing_key} que no están presentes en la columna {key} de {df1}.')
        logging.error(f'Las claves foráneas inexistentes son: {", ".join(map(str, claves_inexistentes))}')
        
        # Lanzar una excepción con más información
        raise ValueError(f"Claves foráneas no válidas: {', '.join(map(str, claves_inexistentes))}")
    
    # Si todas las claves foráneas son válidas, registrar el caso exitoso
    logging.info(f'Todas las claves foráneas en {foreing_key} están presentes en {key} de {df1}. Verificación exitosa.')

    return df2

Consulta de Tablas Dimensiones del Modelo OLTP

In [9]:
DimCiudad = connect_db("Select * From Cities")
DimCategoria= connect_db("Select * From Category")
DimDepartamento = connect_db("Select * From departments")
DimUsuario = connect_db("Select * From users")
DimEstado = connect_db('Select * From status')
DimProcess = connect_db('Select * From process')

2024-12-16 11:21:08,233 - INFO - Datos Cargados
2024-12-16 11:21:08,243 - INFO - Datos Cargados
2024-12-16 11:21:08,253 - INFO - Datos Cargados
2024-12-16 11:21:08,263 - INFO - Datos Cargados
2024-12-16 11:21:08,271 - INFO - Datos Cargados
2024-12-16 11:21:08,281 - INFO - Datos Cargados


![Diagrama de la base de datos OLTP](https://github.com/Jeperezp/SqlWorkout/blob/main/Base_de_Datos_OLTP/OLAP/Diagrama_OLAP.PNG)

In [10]:
DimCiudad = duplicates_Pk(DimCiudad,'id_city')
DimCategoria = duplicates_Pk(DimCategoria,'id_category')
DimCategoria = duplicates_Pk(DimCategoria,'category_name')
DimDepartamento = duplicates_Pk(DimDepartamento,'id_department')
DimUsuario = duplicates_Pk(DimUsuario,'Number_document')
DimUsuario = duplicates_Pk(DimUsuario,'id_user')
DimEstado = duplicates_Pk(DimEstado,'status_name')
DimEstado = duplicates_Pk(DimEstado,'id_status')
DimProcess = duplicates_Pk(DimProcess,'id_process')

2024-12-16 11:21:08,344 - INFO - No se encontraron duplicados en la columna "id_city".
2024-12-16 11:21:08,353 - INFO - No se encontraron duplicados en la columna "id_category".
2024-12-16 11:21:08,353 - INFO - No se encontraron duplicados en la columna "category_name".
2024-12-16 11:21:08,353 - INFO - No se encontraron duplicados en la columna "id_department".
2024-12-16 11:21:08,353 - INFO - No se encontraron duplicados en la columna "Number_document".
2024-12-16 11:21:08,353 - INFO - No se encontraron duplicados en la columna "id_user".
2024-12-16 11:21:08,363 - INFO - No se encontraron duplicados en la columna "status_name".
2024-12-16 11:21:08,363 - INFO - No se encontraron duplicados en la columna "id_status".
2024-12-16 11:21:08,369 - INFO - No se encontraron duplicados en la columna "id_process".


In [11]:
DimCiudad = Faltantes(DimCiudad, DimCiudad.columns)
DimCategoria= Faltantes(DimCategoria, DimCategoria.columns)
DimDepartamento = Faltantes(DimDepartamento, DimDepartamento.columns)
DimUsuario = Faltantes(DimUsuario, DimUsuario.columns)
DimEstado = Faltantes(DimEstado, DimEstado.columns)
DimProcess = Faltantes(DimProcess, DimProcess.columns)

2024-12-16 11:21:08,391 - INFO - No se encontraron valores faltantes en las columnas especificadas: id_city, city_name, id_department
2024-12-16 11:21:08,395 - INFO - No se encontraron valores faltantes en las columnas especificadas: id_category, category_name, category_description
2024-12-16 11:21:08,399 - INFO - No se encontraron valores faltantes en las columnas especificadas: id_department, department_name
2024-12-16 11:21:08,402 - INFO - No se encontraron valores faltantes en las columnas especificadas: id_user, Number_document, First_Name, last_Name, email, phone_number
2024-12-16 11:21:08,402 - INFO - No se encontraron valores faltantes en las columnas especificadas: id_status, status_name, status_description
2024-12-16 11:21:08,402 - INFO - No se encontraron valores faltantes en las columnas especificadas: id_process, date_entry, internal_id, id_user, id_category, id_status, id_user_rol, id_department, id_city


In [12]:
DimProcess = relaciones(DimEstado,'id_status',DimProcess,'id_status')
DimProcess = relaciones(DimCategoria,'id_category',DimProcess,'id_category')

2024-12-16 11:21:08,443 - INFO - Todas las claves foráneas en id_status están presentes en id_status de    id_status status_name  status_description
0        100         Vig           Vigente\r
1        200        Susp        Suspendido\r
2        300         Fin        Finalizado\r
3        400         Ven           Vencido\r
4        500     Fin_Ven  Finalizado Vencido. Verificación exitosa.
2024-12-16 11:21:08,443 - INFO - Todas las claves foráneas en id_category están presentes en id_category de    id_category category_name category_description
0            1   Hipotecario    Ley de vivienda\r
1            2       Consumo    Credito Consumo\r
2            3       Leasing    Ley de vivienda\r
3            4      Libranza     Credito Libranza. Verificación exitosa.


In [13]:
DimCiudad = Faltantes(DimCiudad, DimCiudad.columns)
DimCategoria= Faltantes(DimCategoria, DimCategoria.columns)
DimDepartamento = Faltantes(DimDepartamento, DimDepartamento.columns)
DimUsuario = Faltantes(DimUsuario, DimUsuario.columns)
DimEstado = Faltantes(DimEstado, DimEstado.columns)
DimProcess = Faltantes(DimProcess, DimProcess.columns)

2024-12-16 11:21:08,474 - INFO - No se encontraron valores faltantes en las columnas especificadas: id_city, city_name, id_department
2024-12-16 11:21:08,479 - INFO - No se encontraron valores faltantes en las columnas especificadas: id_category, category_name, category_description
2024-12-16 11:21:08,482 - INFO - No se encontraron valores faltantes en las columnas especificadas: id_department, department_name
2024-12-16 11:21:08,486 - INFO - No se encontraron valores faltantes en las columnas especificadas: id_user, Number_document, First_Name, last_Name, email, phone_number
2024-12-16 11:21:08,490 - INFO - No se encontraron valores faltantes en las columnas especificadas: id_status, status_name, status_description
2024-12-16 11:21:08,490 - INFO - No se encontraron valores faltantes en las columnas especificadas: id_process, date_entry, internal_id, id_user, id_category, id_status, id_user_rol, id_department, id_city


In [6]:
engine_SQL_Server = create_engine(f"mssql+pyodbc://sa:Australia2027@DESKTOP-M712PH1/Audiencias?driver=ODBC Driver 18 for SQL Server")

def ejecutar_consulta():
    try:
        # No necesitas volver a crear el engine, ya está definido como engine_SQL_Server
        with engine_SQL_Server.connect() as connection:
            result = connection.execute("SELECT * FROM sys.tables")
            # Imprime los resultados
            for row in result:
                print(row)
    except SQLAlchemyError as e:
        # Manejo de excepciones en caso de error
        print(f"Ocurrió un error al ejecutar la consulta: {e}")
        raise

In [7]:
ejecutar_consulta()

Ocurrió un error al ejecutar la consulta: (pyodbc.InterfaceError) ('IM002', '[IM002] [Microsoft][Administrador de controladores ODBC] No se encuentra el nombre del origen de datos y no se especificó ningún controlador predeterminado (0) (SQLDriverConnect)')
(Background on this error at: https://sqlalche.me/e/14/rvf5)


InterfaceError: (pyodbc.InterfaceError) ('IM002', '[IM002] [Microsoft][Administrador de controladores ODBC] No se encuentra el nombre del origen de datos y no se especificó ningún controlador predeterminado (0) (SQLDriverConnect)')
(Background on this error at: https://sqlalche.me/e/14/rvf5)

In [14]:
DimUsuario = DimUsuario[['id_user','First_Name','last_Name','email','phone_number']]
DimProcess = DimProcess[['id_process','internal_id','date_entry','id_category','id_status']]

In [15]:
insert_data_to_sql(DimUsuario,'DimUsuario')
insert_data_to_sql(DimDepartamento,'DimDepartamento')
insert_data_to_sql(DimCiudad,'DimCiudad')
insert_data_to_sql(DimCategoria,'DimCategoria')
insert_data_to_sql(DimEstado,'DimEstado')
insert_data_to_sql(DimProcess,'DimProcess')

2024-12-16 11:21:09,217 - INFO - Datos insertados correctamente en la tabla 'DimUsuario'.
2024-12-16 11:21:09,221 - INFO - Datos insertados correctamente en la tabla 'DimDepartamento'.
2024-12-16 11:21:09,630 - INFO - Datos insertados correctamente en la tabla 'DimCiudad'.
2024-12-16 11:21:09,645 - INFO - Datos insertados correctamente en la tabla 'DimCategoria'.
2024-12-16 11:21:09,664 - INFO - Datos insertados correctamente en la tabla 'DimEstado'.
2024-12-16 11:21:09,707 - INFO - Datos insertados correctamente en la tabla 'DimProcess'.


In [16]:
Query_FacAudiences = """
Select 	p.id_process,
        p.id_category,
        p.id_status,
        p.id_department,
        p.id_city,
        p.id_user,
		s.stage_name,
        s.description,
        ps.start_date,
        ps.end_date,
        TIMESTAMPDIFF(Day,ps.start_date,ps.end_date) as Duracion_Etapa,
        ss.status_stage_name,
        ss.status_stage_description,
        r.Name
from process_stage ps inner join stage s
on ps.id_stage = s.id_stage
inner join process p on ps.id_process = p.id_process
inner join status_stage ss on ss.id_status_stage = ps.id_status_stage
inner join rol r on r.id_rol = p.id_user_rol
"""

In [17]:
FactAudiences = connect_db(Query_FacAudiences)

2024-12-16 11:21:10,256 - INFO - Datos Cargados


In [18]:
FactAudiences = relaciones(DimDepartamento,'id_department',FactAudiences,'id_department')
FactAudiences = relaciones(DimCiudad,'id_city',FactAudiences,'id_city')
FactAudiences = relaciones(DimUsuario,'id_user',FactAudiences,'id_user')

2024-12-16 11:21:10,279 - INFO - Todas las claves foráneas en id_department están presentes en id_department de     id_department                                    department_name
0               5                                        Antioquia\r
1               8                                        Atlántico\r
2              11                                      Bogotá D.C.\r
3              13                                          Bolívar\r
4              15                                           Boyacá\r
5              17                                           Caldas\r
6              18                                          Caquetá\r
7              19                                            Cauca\r
8              20                                            Cesar\r
9              23                                          Córdoba\r
10             25                                     Cundinamarca\r
11             27                                           

In [21]:
FactAudiences.rename(columns={'Name':'rol_name'},inplace=True)

Unnamed: 0,id_process,id_category,id_status,id_department,id_city,id_user,stage_name,description,start_date,end_date,Duracion_Etapa,status_stage_name,status_stage_description,rol_name
0,2022118662,4,500,11,11001,1,Enviado,Enviado al Juzgado\r,2022-07-26,2022-07-27,1.0,Terminada,Actividad Finalizada,Supervisor
1,2022112391,2,500,11,11001,4,Enviado,Enviado al Juzgado\r,2022-07-18,2022-07-19,1.0,Terminada,Actividad Finalizada,Supervisor
2,2022106489,2,500,5,5088,1,Enviado,Enviado al Juzgado\r,2022-07-07,2022-07-08,1.0,Terminada,Actividad Finalizada,Supervisor
3,2022089733,3,500,5,5079,7,Enviado,Enviado al Juzgado\r,2022-06-15,2022-06-16,1.0,Terminada,Actividad Finalizada,Supervisor
4,2022063641,1,500,11,11001,8,Enviado,Enviado al Juzgado\r,2022-05-10,2022-05-11,1.0,Terminada,Actividad Finalizada,Supervisor


In [22]:
insert_data_to_sql(FactAudiences,'FactAudiences')

2024-12-16 11:26:03,881 - INFO - Datos insertados correctamente en la tabla 'FactAudiences'.
