In [23]:
import apache_beam as beam
from beam_mysql.connector.io import ReadFromMySQL
from apache_beam import Pipeline
from datetime import datetime
import pytz
import os
from dotenv import load_dotenv

In [14]:
"""
Objetivo:

Construir un pipeline de procesamiento de datos que permita la ingestión de datos de una instancia SQL , su transformación conforme a los requisitos especificados y, finalmente, su carga en la misma instancia SQL.

Requisitos:

    Imputar valores faltantes de forma coherente con el contexto de los datos.

    Almacenar la fecha de la orden (fec_FechaOrden) en formato UTC-6.

    Agrupar los datos por sec_OrdenCommerce para reducir la dimensionalidad y generar nuevas columnas:

        Calcular el importe total de cada orden y almacenar el resultado en la columna imp_TotalOrden.

        Calcular el total de productos por orden a partir de la suma de num_Cantidad  y almacenar el resultado en la columna total_ProdsOrden.

        Agrupar las columnas des_Area, des_Categoria, des_Subcategoria  en un formato adecuado (arrays, JSON, JSON array, etc.) para facilitar su consulta y análisis.

Consideraciones Técnicas:

    Utilizar  Apache Beam SDK (Java o Python) para la construcción del pipeline.

    Cargar los datos del archivo CSV disponible aquí en tu instancia local SQL (SQL Server, PostgreSQL o MongoDB) para que la ingesta de tu pipeline suceda desde ese origen.

    El destino de los datos deberá ser un asset nuevo en la instancia SQL que hayas elegido. Se considerarán puntos extras si escribes la salida de datos a BigQuery.
."""

'\nObjetivo:\n\nConstruir un pipeline de procesamiento de datos que permita la ingestión de datos de una instancia SQL , su transformación conforme a los requisitos especificados y, finalmente, su carga en la misma instancia SQL.\n\nRequisitos:\n\n    Imputar valores faltantes de forma coherente con el contexto de los datos.\n\n    Almacenar la fecha de la orden (fec_FechaOrden) en formato UTC-6.\n\n    Agrupar los datos por sec_OrdenCommerce para reducir la dimensionalidad y generar nuevas columnas:\n\n        Calcular el importe total de cada orden y almacenar el resultado en la columna imp_TotalOrden.\n\n        Calcular el total de productos por orden a partir de la suma de num_Cantidad  y almacenar el resultado en la columna total_ProdsOrden.\n\n        Agrupar las columnas des_Area, des_Categoria, des_Subcategoria  en un formato adecuado (arrays, JSON, JSON array, etc.) para facilitar su consulta y análisis.\n\nConsideraciones Técnicas:\n\n    Utilizar  Apache Beam SDK (Java o Py

In [24]:
load_dotenv()

True

In [25]:
host = os.getenv('endpoint')
port = os.getenv('port')
db= os.getenv('db')
user = os.getenv('user_db')
password = os.getenv('pass_db')


In [33]:
errors=[]

In [32]:
def convert_to_utc_minus_6(row):
   
    try:
        local_datetime = row['fec_FechaOrden']
    
   
        local_datetime = local_datetime.replace(tzinfo=pytz.UTC)
        utc_minus_6_datetime = local_datetime.astimezone(pytz.timezone('America/Mexico_City'))
    
        row['fec_FechaOrden'] = utc_minus_6_datetime
        return row
    except Exception as e:
        errors.append(e)
    

In [30]:
def run():
    with Pipeline() as pipeline:
        rows = (
            pipeline
            | 'LeerDesdeMySQL' >> ReadFromMySQL(
                query="SELECT * FROM test;",
                host=host,
                database=db,
                user=user,
                password=password,
                port=port
            )
            | 'Convertir a UTC-6' >> beam.Map(convert_to_utc_minus_6)
            | 'MostrarDatos2' >> beam.Map(print) 
        )


In [34]:
run()

INFO:beam_mysql.connector.client:Successfully execute query: EXPLAIN SELECT * FROM (SELECT * FROM test) as subq
INFO:beam_mysql.connector.client:Successfully execute query: SELECT * FROM test


{'idu_ArticuloCodigo': 507202, 'flag_Estatus': 1, 'des_Area': 'Muebles', 'des_Categoria': 'Bebés', 'des_Subcategoria': 'Bebés', 'num_Cantidad': 1, 'sec_Orden': 1195043, 'sec_OrdenCommerce': 4484389, 'fec_FechaOrden': datetime.datetime(2018, 1, 1, 5, 33, 46, tzinfo=<DstTzInfo 'America/Mexico_City' CST-1 day, 18:00:00 STD>), 'idu_BodegaCodigo': 30011.0, 'flag_Promocion': 0, 'imp_PrecioContadoUnitario': 1809.48, 'imp_DescuentoPromocion': 0.0}
{'idu_ArticuloCodigo': 526223, 'flag_Estatus': 1, 'des_Area': 'Muebles', 'des_Categoria': 'Bebés', 'des_Subcategoria': 'Bebés', 'num_Cantidad': 1, 'sec_Orden': 1195097, 'sec_OrdenCommerce': 4484817, 'fec_FechaOrden': datetime.datetime(2018, 1, 1, 7, 53, 4, tzinfo=<DstTzInfo 'America/Mexico_City' CST-1 day, 18:00:00 STD>), 'idu_BodegaCodigo': 30014.0, 'flag_Promocion': 0, 'imp_PrecioContadoUnitario': 2262.93, 'imp_DescuentoPromocion': 753.45}
{'idu_ArticuloCodigo': 526541, 'flag_Estatus': 1, 'des_Area': 'Muebles', 'des_Categoria': 'Bebés', 'des_Subcat