In [0]:
# Remove every existing notebook widget so the widget definitions that follow
# are created from scratch rather than reusing stale ones.
dbutils.widgets.removeAll()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [0]:
# Notebook parameters, declared as text widgets: widget name -> default value.
_widget_defaults = {
    "catalogo": "catalog_project",
    "esquema_source": "bronze",
    "esquema_sink": "silver",
}
for _widget_name, _widget_default in _widget_defaults.items():
    dbutils.widgets.text(_widget_name, _widget_default)

In [0]:
# Read the current value of each parameter widget, in declaration order.
catalogo, esquema_source, esquema_sink = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("catalogo", "esquema_source", "esquema_sink")
)

In [0]:
# All three inputs live under the same <catalog>.<source schema> namespace.
_source_namespace = f"{catalogo}.{esquema_source}"

df_ecommerce_sales = spark.table(f"{_source_namespace}.ecommerce_sales")
df_customer_detail = spark.table(f"{_source_namespace}.customer_detail")
df_product_detail = spark.table(f"{_source_namespace}.product_detail")

In [0]:
# Basic cleansing of the three inputs: drop rows in which every column is
# null, then drop rows that lack the key(s) used to join them.

# A sales row needs BOTH keys: it is inner-joined to products on product_id
# and to customers on user_id, and a null key can never satisfy an equality
# join. The previous `|` kept rows with only one key present, which was
# inconsistent with the other two filters and only deferred the discard to
# the join.
df_ecommerce_sales = (
    df_ecommerce_sales
    .dropna(how="all")
    .filter(col("product_id").isNotNull() & col("user_id").isNotNull())
)

# Customers must carry their identifier.
df_customer_detail = (
    df_customer_detail
    .dropna(how="all")
    .filter(col("customer_id").isNotNull())
)

# Products must carry their identifier.
df_product_detail = (
    df_product_detail
    .dropna(how="all")
    .filter(col("product_id").isNotNull())
)


In [0]:
# Prefix selected columns with the name of the frame they come from
# (presumably to keep them unambiguous once the frames are joined).

# Sales columns: original name -> prefixed name, applied in this order.
_ecommerce_renames = {
    "product_id": "ecommerce_product_id",
    "user_id": "ecommerce_user_id",
    "interaction_type": "ecommerce_interaction_type",
    "interaction_date": "ecommerce_interaction_date",
    "ingestion_date": "ecommerce_ingestion_date",
}
for _old_name, _new_name in _ecommerce_renames.items():
    df_ecommerce_sales = df_ecommerce_sales.withColumnRenamed(_old_name, _new_name)

df_customer_detail = (
    df_customer_detail
    .withColumnRenamed("category", "customer_category")
    .withColumnRenamed("ingestion_date", "customer_ingestion_date")
)

df_product_detail = df_product_detail.withColumnRenamed(
    "ingestion_date", "product_ingestion_date"
)
                        


In [0]:
# Attach product and customer attributes to each sales row.
# Both joins are inner joins, so only sales rows that match a product AND a
# customer are kept.
df_joined = (
    df_ecommerce_sales.alias("e")
    .join(
        df_product_detail.alias("p"),
        on=col("e.ecommerce_product_id") == col("p.product_id"),
        how="inner",
    )
    .join(
        df_customer_detail.alias("c"),
        on=col("e.ecommerce_user_id") == col("c.customer_id"),
        how="inner",
    )
)

In [0]:
# Columns kept in the output table, in output order.
_matriz_columns = [
    "ecommerce_product_id",
    "ecommerce_user_id",
    "ecommerce_interaction_type",
    "ecommerce_interaction_date",
    "product_name",
    "selling_price",
    "is_amazon_seller",
    "quantity",
    "customer_id",
    "age",
    "gender",
    "item_Purchased",
    "customer_category",
    "purchase_amount_USD",
    "location",
    "season",
    "review_rating",
    "subscription_status",
    "shipping_type",
    "payment_method",
    # NOTE(review): spelling kept as-is; presumably it matches the upstream
    # column name — confirm before "correcting" it.
    "frecuency_of_purchase",
]

df_matriz = df_joined.select(*_matriz_columns)

In [0]:
# Persist the result as a table in the sink schema, replacing any existing
# contents of that table.
_sink_table = f"{catalogo}.{esquema_sink}.ecommerce_sales"

(
    df_matriz.write
    .mode("overwrite")
    .saveAsTable(_sink_table)
)