In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import re
from pyspark.sql.functions import sum as _sum, col, when

In [None]:
# Simula los argumentos como si los estuvieras pasando por CLI
sys.argv = [
    'glue_script.py',  # nombre ficticio del script, solo para cumplir con la forma
    '--JOB_NAME', 'job_local_test',
    '--PAIS', '1',
    '--ANO', '2025',
    '--MES', '1'
]

In [None]:
# Obtener argumentos del job
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'PAIS', 'ANO', 'MES'])

In [None]:
# Inicializar GlueContext y Job
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

logger = glueContext.get_logger()

In [None]:
def select_table_from_rd(table):
    node = glueContext.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options={
        "redshiftTmpDir": "s3://aws-glue-assets-575238426438-us-east-1/temporary/",
        "useConnectionProperties": "true",
        "dbtable": table,
        "connectionName": "conexion-redshift"
    },
    transformation_ctx="node"
    )
    return node

def select_query_from_rd(query):
    node = glueContext.create_dynamic_frame.from_options(
        connection_type="redshift",
        connection_options={
            "sampleQuery": query,
            "redshiftTmpDir": "s3://aws-glue-assets-575238426438-us-east-1/temporary/",
            "useConnectionProperties": "true",
            "connectionName": "conexion-redshift"
        },
        transformation_ctx="node"
    )
    return node


In [None]:
pais = args['PAIS']
ano = args['ANO']
mes = args['MES']

query_ventas = f"""
    select
    	to_char(last_day(ut.unidad_fecha), 'yyyymmdd') unidad2,
    	to_char(last_day(ut.unidad_fecha), 'yyyymmdd') periodo, 
    	case
    		when a.agencia_mismas_tiendas = 'INSUMOS' then 'INSUMOS'
    		else tc.tcre_tipo 
    	end tcre_tipo,
    	case 
    		when tc.tcre_tipo in('CREDITO', 'CONTADO') then 1
    		else 0 
    	end es_venta_real,
    	ff.fact_unidad,
    	ut.unidad_fecha,
    	ut.unidad_mes,
    	ut.unidad_ano,
    	a.agencia_codigo,
    	a.agencia_descripcion,
    	a.agencia_pais,
    	c.cliente_codigo,
    	c.hashed_id,
    	c.cliente_descripcion,
    	tc.tcre_codigo,
    	tc.tcre_descripcion,
    	tc.tcre_tipo tcre_tipo_og,
    	ff.fact_costo_merc_vendida,
    	ff.fact_fletes,
    	ff.fact_armado,
    	ff.fact_bodegaje,
    	ff.fact_garantia
    from
    	gold.gi_fact_facturas ff
    join gold.gi_unidad_tiempo ut
    	on (ff.fact_unidad = ut.unidad_id)
    join gold.gi_agencias a
    	on (ff.fact_agencia = a.agencia_id)
    join gold.gi_clientes c
    	on (ff.fact_cliente = c.CLIENTE_ID)
    join gold.gi_tipo_credito tc
    	on (ff.FACT_TIPO_VENTA = tc.TCRE_ID)
    where
    	a.agencia_pais = '{pais}'
    	and ut.unidad_ano = '{ano}'
    	and ut.unidad_mes = '{mes}'  -- Mes agregado para probar 
"""

In [None]:
ventas_node = select_query_from_rd(query_ventas) 
ventas_df = ventas_node.toDF()

In [None]:
ventas_df.show(5)

In [None]:
# Calcula el total del costo de mercancía vendida por mes (unidad2) y tipo de venta (es_venta_real)
costo_merc_vend_df = ventas_df.groupBy("unidad2", "es_venta_real") \
    .agg(_sum("fact_costo_merc_vendida").alias("tot_merc_vend"))

In [None]:
costo_merc_vend_df.show()

In [None]:
# Calcular proporción de costo mercancía vendida
ventas_df = ventas_df.join(costo_merc_vend_df, on=["unidad2", "es_venta_real"], how="left")

ventas_df = ventas_df.withColumn(
    "prop_cost_merc_vend",
    when(col("tot_merc_vend").isNotNull(),
         col("fact_costo_merc_vendida") / col("tot_merc_vend")).otherwise(0)
)

In [None]:
totales_costos_df = ventas_df.groupBy("unidad2").agg(
    _sum("fact_fletes").alias("tot_fletes"),
    _sum("fact_armado").alias("tot_armado"),
    _sum("fact_bodegaje").alias("tot_bodegaje"),
    _sum("fact_garantia").alias("tot_garantia")
)
totales_costos_df.show(5)

In [None]:
#Cálculo de proporciones de fletes, armado, bodegaje y garantía
ventas_df = ventas_df.join(totales_costos_df, on="unidad2", how="left")

ventas_df = ventas_df.withColumn(
    "prop_fletes",
    when((col("tot_fletes").isNotNull()) & (col("tot_fletes") != 0),  #se evita dividir entre 0 y NA
         col("fact_fletes") / col("tot_fletes")).otherwise(0)
).withColumn(
    "prop_armado",
    when((col("tot_armado").isNotNull()) & (col("tot_armado") != 0),
         col("fact_armado") / col("tot_armado")).otherwise(0)
).withColumn(
    "prop_bodegaje",
    when((col("tot_bodegaje").isNotNull()) & (col("tot_bodegaje") != 0),
         col("fact_bodegaje") / col("tot_bodegaje")).otherwise(0)
).withColumn(
    "prop_garantia",
    when((col("tot_garantia").isNotNull()) & (col("tot_garantia") != 0),
         col("fact_garantia") / col("tot_garantia")).otherwise(0)
).withColumn(
    "prop_empaque",
    col("prop_armado")
)


In [None]:
ventas_df.select("unidad2", "cliente_descripcion", "tcre_tipo", 
                 "prop_cost_merc_vend", "prop_fletes", "prop_armado",
                 "prop_bodegaje", "prop_garantia", "prop_empaque"
                ).show(10, truncate=False)