# In [None]:


import requests
import os
from collections import Counter
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, FloatType


# Obtain the Spark session for this ETL job (reusing one if it already exists)
# and keep a handle on its underlying SparkContext.
sesion_spark = SparkSession.builder.appName("Windmar_Home_ETL").getOrCreate()
context_spark = sesion_spark.sparkContext

# --- API access --------------------------------------------------------------
# The API key can be supplied via the WINDMAR_API_KEY environment variable; the
# literal below is only a fallback so existing runs keep working.
# NOTE(review): a secret committed to source should be rotated and removed.
url_base = "https://mnpwhdbcsk.us-east-2.awsapprunner.com/api"
headers = {"x-api-key": os.environ.get("WINDMAR_API_KEY", "8yBO1wKiiIbcBT0")}
# requests has no default timeout; without one a stalled endpoint hangs forever.
TIMEOUT_SEGUNDOS = 30

# Fetch products and purchases. raise_for_status() makes HTTP errors (401, 5xx,
# ...) fail loudly instead of being parsed below as an empty "data" payload.
respuesta_productos = requests.get(
    f"{url_base}/products", headers=headers, timeout=TIMEOUT_SEGUNDOS
)
respuesta_productos.raise_for_status()
respuesta_purchases = requests.get(
    f"{url_base}/purchases", headers=headers, timeout=TIMEOUT_SEGUNDOS
)
respuesta_purchases.raise_for_status()

# Decode the JSON bodies and take their "data" lists; a missing or null "data"
# becomes an empty list so the loops below do not crash on None.
data_productos = respuesta_productos.json().get("data") or []
data_purchases = respuesta_purchases.json().get("data") or []

# --- Transform ---------------------------------------------------------------

# Lookup table: product id -> unit price.
precios = {p["id"]: p["price"] for p in data_productos}

# Compute each purchase's total: the sum of its listed products' prices, each
# with its percentage discount applied, rounded to 2 decimals. A product id that
# appears several times is charged once per occurrence. Unknown product ids
# contribute 0 (the price lookup defaults to 0 -- presumably deliberate).
for compra in data_purchases:
    # Start from a float: a purchase with no products must still produce a
    # float total, because the purchases schema declares "total" as FloatType
    # and Spark's default schema verification rejects a Python int there.
    total = 0.0
    for p in compra["products"]:
        prod_id = p["id"]
        descuento = p.get("discount") or 0  # a null discount counts as no discount
        precio = precios.get(prod_id, 0)
        total += precio * (1 - descuento / 100)
    compra["total"] = round(total, 2)

# Junction table purchase_products: one row per distinct (purchase, product)
# pair, where "quantity" is how many times that product id is listed in the
# purchase.
purchase_products_data = []
for compra in data_purchases:
    id_compra = compra["id"]
    purchase_products_data += [
        {"purchase_id": id_compra, "product_id": id_producto, "quantity": cantidad}
        for id_producto, cantidad in Counter(
            linea["id"] for linea in compra["products"]
        ).items()
    ]

# Cast integer prices to float so they fit the DoubleType "price" column of the
# products schema (Spark's schema verification does not accept Python ints for
# DoubleType). Products without a "price" key are left untouched.
for producto in data_productos:
    if isinstance(producto.get("price"), int):
        producto["price"] = float(producto["price"])


# --- Required schemas ----------------------------------------------------------
# Every field in every schema is nullable.

esquema_productos = StructType([
    StructField(nombre, tipo, True)
    for nombre, tipo in [
        ("id", IntegerType()),
        ("name", StringType()),
        ("description", StringType()),
        ("price", DoubleType()),
        ("category", StringType()),
        ("createdAt", StringType()),
    ]
])

schema_purchase_products = StructType([
    StructField(nombre, tipo, True)
    for nombre, tipo in [
        ("purchase_id", StringType()),
        ("product_id", IntegerType()),
        ("quantity", IntegerType()),
    ]
])

# NOTE(review): "total" is FloatType (32-bit) while product "price" is
# DoubleType; kept as-is because these schemas are labelled as required --
# confirm whether single precision is really intended for money totals.
esquema_purchases = StructType([
    StructField(nombre, tipo, True)
    for nombre, tipo in [
        ("id", StringType()),
        ("status", StringType()),
        ("creditCardType", StringType()),
        ("purchaseDate", StringType()),
        ("total", FloatType()),
    ]
])

# --- DataFrames ----------------------------------------------------------------
# One Spark DataFrame per table, built from the in-memory rows with an explicit
# schema.

df_products = sesion_spark.createDataFrame(data_productos, schema=esquema_productos)
df_purchases = sesion_spark.createDataFrame(data_purchases, schema=esquema_purchases)
df_purchase_products = sesion_spark.createDataFrame(
    purchase_products_data, schema=schema_purchase_products
)