# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using it, you need to start an AWS Glue interactive session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [18]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Get the SparkContext for this interactive session (created on first use).
sc = SparkContext.getOrCreate()
# Glue wrapper around the SparkContext; provides the DynamicFrame readers/writers.
glueContext = GlueContext(sc)
# SparkSession exposed by the Glue context (used for spark.conf below).
spark = glueContext.spark_session
# Glue job handle bound to this context.
# NOTE(review): job.init()/job.commit() are never called in the visible cells,
# so this handle appears unused here.
job = Job(glueContext)

You are already connected to a glueetl session f169373d-4773-49d1-a7f7-cd2823608052.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session f169373d-4773-49d1-a7f7-cd2823608052.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 5.0


You are already connected to a glueetl session f169373d-4773-49d1-a7f7-cd2823608052.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session f169373d-4773-49d1-a7f7-cd2823608052.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



In [None]:
from pyspark.sql.functions import col, explode, to_date, sum, when, lit, count, first, regexp_replace
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext

# Initial configuration.
# Switch Spark's datetime parser to the LEGACY policy; this governs the
# to_date(..., "MM/dd/yyyy") calls used for created_at / purchase_date below.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
# NOTE(review): this rebinds `glueContext`, which the setup cell already built
# from (presumably) the same SparkContext and bound `job` to; likely redundant.
glueContext = GlueContext(spark.sparkContext)

# S3 layout: every input and output of this job lives under one bucket.
BUCKET_NAME = "s3://ecommerce-data-raw-dataengineer"

# Raw JSON inputs.
INPUT_PRODUCTS = BUCKET_NAME + "/products/products.json"
INPUT_PURCHASES = BUCKET_NAME + "/purchases/purchases.json"

# Output prefixes for the parquet datasets written at the end of the ETL cell.
OUTPUT_PRODUCTS = BUCKET_NAME + "/processed-products/"
OUTPUT_PURCHASES = BUCKET_NAME + "/processed-purchases/"
OUTPUT_RELATIONS = BUCKET_NAME + "/purchase-relations/"
OUTPUT_TOTALS = BUCKET_NAME + "/purchase-totals/"

# 1-2. Load the raw products and purchases documents and flatten them.
# Each JSON file holds a top-level `data` array; every element becomes one row.
# Local imports: names not provided by this cell's shared import line.
from pyspark.sql.functions import coalesce
from pyspark.sql.types import StructType


def _read_json_dynamic_frame(path):
    """Read a multiline JSON document from S3 into a Glue DynamicFrame.

    Both raw inputs use identical reader options, so they share this helper.
    Uses the notebook-level ``glueContext``.
    """
    return glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": [path]},
        format="json",
        format_options={"multiline": True},
    )


def _parse_us_date(date_col):
    """Parse a ``month/day/year`` string column into a date column.

    Month and day are zero-padded independently, then parsed with
    ``MM/dd/yyyy``. The previous single substitution only padded when BOTH
    month and day had one digit, so values like ``1/15/2023`` or
    ``12/5/2023`` kept a one-digit field. That was presumably masked by the
    LEGACY parser policy set above; a non-legacy policy would likely reject
    them against ``MM``/``dd`` -- confirm before relying on it.

    Args:
        date_col: string ``Column``, e.g. ``col("product.createdAt")``.

    Returns:
        A ``Column`` produced by ``to_date``.
    """
    month_padded = regexp_replace(date_col, r"^(\d)/", "0$1/")
    day_padded = regexp_replace(month_padded, r"^(\d{2})/(\d)/", "$1/0$2/")
    return to_date(day_padded, "MM/dd/yyyy")


def _price_column(price_type):
    """Build a non-null double expression for ``product.price``.

    Args:
        price_type: the Spark data type of ``product.price``.

    When ``price_type`` is a struct (presumably how ``toDF()`` renders an
    unresolved Glue choice type: one field per observed JSON type, such as
    ``double`` / ``int``), all branches are cast to double and coalesced,
    ``double`` first. The old code read only ``price.double``, so a price
    stored under any other branch became 0.0. Its ``cast("float")`` also
    rounded every price to 32-bit precision.

    Null prices still default to 0.0, as before.
    """
    if isinstance(price_type, StructType) and price_type.fieldNames():
        # Stable sort: `double` first, remaining branches in schema order.
        branches = sorted(price_type.fieldNames(), key=lambda name: name != "double")
        price = coalesce(
            *[col("product.price").getField(name).cast("double") for name in branches]
        )
    else:
        price = col("product.price").cast("double")
    return coalesce(price, lit(0.0))


# 1. Products.
products = _read_json_dynamic_frame(INPUT_PRODUCTS)
df_products = products.toDF()
product_rows = df_products.select(explode("data").alias("product"))
product_price_type = product_rows.schema["product"].dataType["price"].dataType

products_clean = product_rows.select(
    col("product.id").cast("int").alias("product_id"),
    col("product.name").alias("product_name"),
    col("product.description").alias("description"),
    _price_column(product_price_type).alias("price"),
    col("product.category").alias("category"),
    _parse_us_date(col("product.createdAt")).alias("created_at"),
)

# 2. Purchases.
purchases = _read_json_dynamic_frame(INPUT_PURCHASES)
df_purchases = purchases.toDF()
purchases_clean = df_purchases.select(
    explode("data").alias("purchase")
).select(
    col("purchase.id").alias("purchase_id"),
    col("purchase.status").alias("status"),
    col("purchase.creditCardType").alias("credit_card_type"),
    _parse_us_date(col("purchase.purchaseDate")).alias("purchase_date"),
)

# 3. purchase_products: one row per (purchase, product id) pair.
# `quantity` counts how often the product id occurs in the purchase's
# `products` array; `discount` comes from one of those occurrences
# (`first` with no explicit ordering).
purchase_products = (
    df_purchases
    .select(explode("data").alias("purchase"))
    .select(
        col("purchase.id").alias("purchase_id"),
        explode(col("purchase.products")).alias("product"),
    )
    .groupBy(col("purchase_id"), col("product.id").alias("product_id"))
    .agg(
        count("*").alias("quantity"),
        first("product.discount").alias("discount"),
    )
)

# 4. Per-purchase totals: sum of price * quantity * (1 - discount / 100).
# `discount` is treated as a percentage (hence the / 100). A null discount now
# means "no discount". Previously a null discount made the line's subtotal
# null, and sum() ignores nulls, so undiscounted lines silently dropped out of
# `total_amount` (a purchase with no discounted lines got a null total).
# NOTE(review): both joins are inner joins, so lines whose product_id is not in
# products_clean, and purchases without product lines, are absent here.
purchase_totals = (
    purchase_products
    .join(products_clean, purchase_products.product_id == products_clean.product_id)
    .join(purchases_clean, purchase_products.purchase_id == purchases_clean.purchase_id)
    .withColumn(
        "subtotal",
        col("price")
        * col("quantity")
        * (1 - when(col("discount").isNull(), 0).otherwise(col("discount")) / 100),
    )
    .groupBy(purchase_products["purchase_id"], purchases_clean["purchase_date"])
    .agg(sum("subtotal").alias("total_amount"))
)

# 5. Write every result dataset to S3 as snappy-compressed parquet.
# NOTE(review): the Glue S3 sink presumably appends rather than overwrites, so
# re-running this cell would duplicate output files -- confirm; purging the
# output prefixes before writing is one option.
print("💾 Guardando resultados...")
write_config = {
    "format": "parquet",
    "format_options": {"compression": "snappy"}
}

# (DataFrame, DynamicFrame name, S3 output prefix, partition columns).
# An empty partition list writes the dataset unpartitioned.
outputs = [
    (products_clean, "products", OUTPUT_PRODUCTS, ["created_at"]),
    (purchases_clean, "purchases", OUTPUT_PURCHASES, ["purchase_date"]),
    (purchase_products, "purchase_products", OUTPUT_RELATIONS, []),
    (purchase_totals, "purchase_totals", OUTPUT_TOTALS, ["purchase_date"]),
]

try:
    for frame_df, frame_name, output_path, partition_keys in outputs:
        connection_options = {"path": output_path}
        if partition_keys:
            connection_options["partitionKeys"] = partition_keys
        glueContext.write_dynamic_frame.from_options(
            frame=DynamicFrame.fromDF(frame_df, glueContext, frame_name),
            connection_type="s3",
            connection_options=connection_options,
            **write_config
        )

    print(" Proceso completado exitosamente!")

except Exception as e:
    print(f" Error al guardar resultados: {str(e)}")
    # Bare `raise` re-raises the active exception with its original traceback.
    raise

📂 Cargando y transformando productos...
📂 Cargando y transformando compras...
🧮 Calculando relaciones productos-compras...
🧮 Calculando totales por compra...
💾 Guardando resultados...
<awsglue.dynamicframe.DynamicFrame object at 0x7f0dd6d9add0>
<awsglue.dynamicframe.DynamicFrame object at 0x7f0dd6d99150>
<awsglue.dynamicframe.DynamicFrame object at 0x7f0dd6d9b210>
<awsglue.dynamicframe.DynamicFrame object at 0x7f0dd6d9bbd0>
✅ Proceso completado exitosamente!


In [29]:
# Scratch smoke-test cell: prints a fixed greeting; can be removed from the final notebook.
print("Hola mundo", end="\n")

Hola mundo
