**Trabajo Final 2024-02-Grupo4-Bigdata**



Importacion de Librerías

In [1]:
%pip install mysql-connector-python


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


Conexión con Fuente de Datos MySQL donde se encuentran las tablas de empleados y de ventas

In [2]:
import mysql.connector
import pandas as pd

# Configurar la conexión con la base de datos MySQL
connection_mysql = mysql.connector.connect(
   host='localhost',  
    user='root',     
    password='Nombre de usuario',  
    database='Nombre de la base de datos'  
)

# Función para extraer datos de MySQL
def extract_data_from_mysql(query):
    return pd.read_sql(query, connection_mysql)

# Extracción de datos de las tablas
empleados = extract_data_from_mysql("SELECT * FROM empleados")
ventas = extract_data_from_mysql("SELECT * FROM ventas")

# Vista previa de los datos extraídos
print(empleados.head())
print(ventas.head())

   empleado_id  nombre   apellido      puesto fecha_contratacion  salario
0            1  Carlos   Martínez    Vendedor         2023-02-15   3200.0
1            2   Laura  Rodríguez  Supervisor         2022-08-10   4000.0
2            3   Pedro      López     Gerente         2021-11-05   5000.0
   venta_id  empleado_id   producto  cantidad  precio_unitario fecha_venta
0         1            1     Tablet         5            150.0  2023-09-01
1         2            2    Monitor         3            200.0  2023-09-10
2         3            3    Teclado        10             30.0  2023-09-15
3         4            1  Impresora         2            120.0  2023-09-20


  return pd.read_sql(query, connection_mysql)


Transformacion de Datos (Eliminación de espacios, Convertir a mayusculas, eliminacion de caracteres especiales, eliminación de duplicados)

In [3]:
import re

# Función para limpiar cadenas de texto
def clean_text(text):
    # Eliminar espacios al inicio y al final
    text = text.strip()
    # Convertir todo a mayúsculas
    text = text.upper()
    # Eliminar caracteres especiales, permitiendo solo letras, números y espacios
    text = re.sub(r'[^A-Za-z0-9\s]+', '', text)
    return text

# Aplicación de las transformaciones a la tabla 'empleados'
def transform_empleados(empleados):
    # Limpiar espacios, convertir a mayúsculas y eliminar caracteres especiales en columnas de texto
    empleados['nombre'] = empleados['nombre'].apply(clean_text)
    empleados['apellido'] = empleados['apellido'].apply(clean_text)
    empleados['puesto'] = empleados['puesto'].apply(clean_text)
    
    # Eliminar registros duplicados basados en 'empleado_id'
    empleados = empleados.drop_duplicates(subset=['empleado_id'])
    
    return empleados

# Aplicación de las transformaciones a la tabla 'ventas'
def transform_ventas(ventas):
    # Limpiar espacios, convertir a mayúsculas y eliminar caracteres especiales en columnas de texto
    ventas['producto'] = ventas['producto'].apply(clean_text)
    
    # Eliminar registros duplicados basados en 'venta_id'
    ventas = ventas.drop_duplicates(subset=['venta_id'])
    
    return ventas

# Ejecutacion de las transformaciones
empleados_limpios = transform_empleados(empleados)
ventas_limpias = transform_ventas(ventas)

# Unión de las tablas transformadas (empleados y ventas)
empleados_ventas_limpios = pd.merge(ventas_limpias, empleados_limpios, on='empleado_id', how='left')

# vista previa de los datos transformados
print(empleados_ventas_limpios.head())


   venta_id  empleado_id   producto  cantidad  precio_unitario fecha_venta  \
0         1            1     TABLET         5            150.0  2023-09-01   
1         2            2    MONITOR         3            200.0  2023-09-10   
2         3            3    TECLADO        10             30.0  2023-09-15   
3         4            1  IMPRESORA         2            120.0  2023-09-20   

   nombre  apellido      puesto fecha_contratacion  salario  
0  CARLOS   MARTNEZ    VENDEDOR         2023-02-15   3200.0  
1   LAURA  RODRGUEZ  SUPERVISOR         2022-08-10   4000.0  
2   PEDRO      LPEZ     GERENTE         2021-11-05   5000.0  
3  CARLOS   MARTNEZ    VENDEDOR         2023-02-15   3200.0  


# Carga de tablas a S3

Se realiza conversión de las tablas de MySql a archivos Csv para su posterior carga en un Bucket de S3

Fuente de datos 1 Mysql

In [4]:
%pip install boto3

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [5]:
import csv
import boto3

# Paso 1: Guardar los DataFrames en archivos CSV locales

# Guardar la tabla 'empleados_limpios' en un archivo CSV
empleados_limpios.to_csv('empleados_limpios.csv', index=False, sep=';', quotechar='"', quoting=csv.QUOTE_MINIMAL, encoding='utf-8')

# Guardar la tabla 'ventas_limpias' en un archivo CSV
ventas_limpias.to_csv('ventas_limpias.csv', index=False, sep=';', quotechar='"', quoting=csv.QUOTE_MINIMAL, encoding='utf-8')

print("Archivos CSV creados exitosamente.")

# Paso 2: Subir los archivos CSV a AWS S3
s3 = boto3.client('s3')

bucket_name = 'Insertar el nombre del bucket'  # Nombre del bucket de S3
ruta_s3_empleados = 'tablas/empleados_limpios.csv'
ruta_s3_ventas = 'tablas/ventas_limpias.csv'

try:
    # Subir el archivo CSV 'empleados_limpios.csv' al bucket especificado en la ruta adecuada
    s3.upload_file('empleados_limpios.csv', bucket_name, ruta_s3_empleados)
    print("Archivo 'empleados_limpios.csv' subido exitosamente a S3.")

    # Subir el archivo CSV 'ventas_limpias.csv' al bucket especificado en la ruta adecuada
    s3.upload_file('ventas_limpias.csv', bucket_name, ruta_s3_ventas)
    print("Archivo 'ventas_limpias.csv' subido exitosamente a S3.")

except Exception as e:
    print("Error al subir los archivos a S3:", e)


Archivos CSV creados exitosamente.
Archivo 'empleados_limpios.csv' subido exitosamente a S3.
Archivo 'ventas_limpias.csv' subido exitosamente a S3.


# Fuente de datos 2 csv local

Ahora se sube la segunda fuente de datos para el datalake la cual corresponde a un csv almacenado en local

Transformaciones

In [6]:
import pandas as pd

# Función para transformar el DataFrame
def transformar_csv(filepath):
    # Leer el CSV desde la ruta local proporcionada
    try:
        # Cargar el archivo CSV en un DataFrame de pandas
        df = pd.read_csv(filepath)
        print(f"Archivo '{filepath}' leído correctamente. Previsualización de los datos:")
        print(df.head())

        # Aplicar transformaciones: limpieza de texto
        if 'nombre' in df.columns:
            df['nombre'] = df['nombre'].apply(clean_text)
        if 'apellido' in df.columns:
            df['apellido'] = df['apellido'].apply(clean_text)
        if 'puesto' in df.columns:
            df['puesto'] = df['puesto'].apply(clean_text)
        if 'producto' in df.columns:
            df['producto'] = df['producto'].apply(clean_text)
            
        # Renombrar todas las columnas a mayúsculas
        df.columns = [col.upper() for col in df.columns]

        # Eliminar duplicados basados en las claves necesarias
        df = df.drop_duplicates()
        print("Transformaciones aplicadas correctamente. Previsualización de datos transformados:")
        print(df.head())

        return df

    except Exception as e:
        print(f"Error al leer el archivo CSV: {e}")
        return None

In [7]:
# ruta local del archivo CSV
ruta_archivo = "C:\\Users\\Windows\\Downloads\\pedidos.csv"

# función de transformación para leer y limpiar el archivo
df_transformado = transformar_csv(ruta_archivo)

# ruta donde se desea guardar el archivo CSV transformado
ruta_guardado = "C:\\Users\\Windows\\OneDrive - Universidad EIA\\Especialización IA\\Big data e integración de datos masivos\\Trabajo final datalake\\finalv2\\pedidos_transformados.csv"

# transformación se realizó correctamente
if df_transformado is not None:
    # Guardar el DataFrame transformado como CSV en la ruta especificada
    df_transformado.to_csv(ruta_guardado, index=False)
    print("Datos transformados listos para usar y guardados correctamente.")
else:
    print("Hubo un problema al transformar los datos.")


Archivo 'C:\Users\Windows\Downloads\pedidos.csv' leído correctamente. Previsualización de los datos:
   pedido_id  venta_id  empleado_id fecha_pedido  cantidad_productos
0          1       101            1   2024-10-20                   5
1          2       102            2   2024-10-21                   3
2          3       103            1   2024-10-22                   8
3          4       104            3   2024-10-23                   6
4          5       105            2   2024-10-24                   4
Transformaciones aplicadas correctamente. Previsualización de datos transformados:
   PEDIDO_ID  VENTA_ID  EMPLEADO_ID FECHA_PEDIDO  CANTIDAD_PRODUCTOS
0          1       101            1   2024-10-20                   5
1          2       102            2   2024-10-21                   3
2          3       103            1   2024-10-22                   8
3          4       104            3   2024-10-23                   6
4          5       105            2   2024-10-24         

Carga a S3

In [8]:
import boto3

s3 = boto3.client('s3')

bucket_name = 'Nombre del bucket'
archivo_origen = 'pedidos_transformados.csv'
archivo_destino = 'tablas/pedidos_transformados.csv'

try:
    # Subir el archivo CSV 'pedidos_transformados.csv' al bucket especificado en la ruta adecuada
    s3.upload_file(archivo_origen, bucket_name, archivo_destino)
    print("Archivo 'pedidos_transformados.csv' subido exitosamente a S3.")

except Exception as e:
    print("Error al subir el archivo a S3:", e)


Archivo 'pedidos_transformados.csv' subido exitosamente a S3.


# Conexión con snowflake

Conexión con Snowflake (Query Engine)

In [9]:
%pip install snowflake-connector-python

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [10]:
import snowflake.connector
print("Conector de Snowflake importado exitosamente")

Conector de Snowflake importado exitosamente


In [11]:

try:
    # Conectar a Snowflake
    connection_snowflake = snowflake.connector.connect(
        user='Usuario Snowflake',
        password='Contraseña Snowflake',
        account='id de cuenta',
        warehouse='Nombre del Warehouse',
        database='Nombre de la base de datos',
        schema='PUBLIC'
    )
    
    # Creación de un cursor para ejecutar consultas
    cursor = connection_snowflake.cursor()
    
    # Crear un stage que apunte al bucket de S3
    # Necesitarás las credenciales de AWS
    aws_access_key_id = "Aws access key"
    aws_secret_access_key = "Aws Secret access key"
    
    # Consulta para crear un stage externo
    create_stage_query = f"""
    CREATE OR REPLACE STAGE my_s3_stage
    URL = 'Url del bucket en s3'
    CREDENTIALS = (AWS_KEY_ID='{aws_access_key_id}' AWS_SECRET_KEY='{aws_secret_access_key}')
    FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '"' SKIP_HEADER = 1)
    """
    
    # Ejecutar la consulta para crear el stage
    cursor.execute(create_stage_query)
    print("Stage creado exitosamente en Snowflake para el bucket S3.")
    
    # Ejecutar una consulta de prueba para verificar la conexión
    cursor.execute("SELECT CURRENT_VERSION();")
    
    # Resultado
    result = cursor.fetchone()
    print(f"Conexión exitosa. Versión de Snowflake: {result[0]}")
    
except snowflake.connector.errors.Error as e:
    print(f"Error al conectar a Snowflake: {e}")
    
finally:
    # Cierre del cursor y conexión
    try:
        if cursor:
            cursor.close()
        if connection_snowflake:
            connection_snowflake.close()
    except NameError:
        pass


Stage creado exitosamente en Snowflake para el bucket S3.
Conexión exitosa. Versión de Snowflake: 8.40.1


Carga de tablas de s3 a snowflake

In [12]:

try:
    # Conectar a Snowflake
    connection_snowflake = snowflake.connector.connect(
        user='Usuario Snowflake',
        password='Contraseña Snowflake',
        account='id de cuenta',
        warehouse='Nombre del Warehouse',
        database='Nombre de la base de datos',
        schema='PUBLIC'
    )
    
    # Creación de un cursor para ejecutar consultas
    cursor = connection_snowflake.cursor()
    
    # Crear la tabla empleados_limpios
    create_empleados_table_query = """
    CREATE OR REPLACE TABLE empleados_limpios (
        empleado_id INT,
        nombre VARCHAR,
        apellido VARCHAR,
        puesto VARCHAR,
        fecha_contratacion DATE,
        salario FLOAT
    );
    """
    cursor.execute(create_empleados_table_query)
    print("Tabla 'empleados_limpios' creada exitosamente.")
    
    # Crear la tabla ventas_limpias
    create_ventas_table_query = """
    CREATE OR REPLACE TABLE ventas_limpias (
        venta_id INT,
        empleado_id INT,
        producto VARCHAR,
        cantidad INT,
        precio_unitario FLOAT,
        fecha_venta DATE
    );
    """
    cursor.execute(create_ventas_table_query)
    print("Tabla 'ventas_limpias' creada exitosamente.")
    
    # Crear la tabla pedidos_transformados
    create_pedidos_table_query = """
    CREATE OR REPLACE TABLE pedidos_transformados (
        pedido_id INT,
        venta_id INT,
        empleado_id INT,
        fecha_pedido DATE,
        cantidad_productos INT
    );
    """
    cursor.execute(create_pedidos_table_query)
    print("Tabla 'pedidos_transformados' creada exitosamente.")
    
except snowflake.connector.errors.Error as e:
    print(f"Error al conectar a Snowflake o ejecutar consultas: {e}")
    
finally:
    # Cierre del cursor y conexión
    try:
        if cursor:
            cursor.close()
        if connection_snowflake:
            connection_snowflake.close()
    except NameError:
        pass


Tabla 'empleados_limpios' creada exitosamente.
Tabla 'ventas_limpias' creada exitosamente.
Tabla 'pedidos_transformados' creada exitosamente.


Finalmente en un workbench de Snowflake se crean los file format para el cargue de los csv desde S3 hasta Snowflake.

De esta manera se completa la arquitectura de un Datalake con dos fuentes de datos diferentes aplicando transformaciones con python.

CREATE OR REPLACE FILE FORMAT csv_4
TYPE = CSV
FIELD_DELIMITER = ';'
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
SKIP_HEADER = 1
NULL_IF = ('');

CREATE OR REPLACE FILE FORMAT csv_3
TYPE = CSV
FIELD_DELIMITER = ','
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
SKIP_HEADER = 1
NULL_IF = ('');

-- Cargar datos en la tabla 'empleados_limpios'
COPY INTO empleados_limpios
FROM @final_bigdata/empleados_limpios.csv
FILE_FORMAT = csv_4;

-- Cargar datos en la tabla 'ventas_limpias'
COPY INTO ventas_limpias
FROM @final_bigdata/ventas_limpias.csv
FILE_FORMAT = csv_4;

-- Cargar datos en la tabla 'pedidos_transformados'
COPY INTO pedidos_transformados
FROM @final_bigdata/pedidos_transformados.csv
FILE_FORMAT = csv_3;
