In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [0]:
# Clear every widget currently defined in this notebook so the next cell
# recreates them from a clean slate with the defaults listed there.
# Widget creation lives in a separate cell on purpose: Databricks does not
# allow re-creating widgets in the same cell as removeAll().
dbutils.widgets.removeAll()

In [0]:
# Text widgets (name -> default value) parameterizing the run: the catalog,
# the schema the source tables are read from, and the schema the result is
# written to.
for widget_name, default_value in (
    ("catalogo", "c_a_tunes_dev"),
    ("esquema_source", "bronze"),
    ("esquema_sink", "silver"),
):
    dbutils.widgets.text(widget_name, default_value)

In [0]:
# Pull the current widget values into plain strings; they are interpolated
# into the three-part table names used for reading and writing.
catalogo, esquema_source, esquema_sink = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("catalogo", "esquema_source", "esquema_sink")
)

In [0]:
def Amount_type(Total):
    """Bucket an invoice total into a Spanish-language amount category.

    Wrapped as a Spark UDF (``monto_udf``) and applied to the ``Total``
    column, so it receives one row value at a time.

    Args:
        Total: the invoice total -- any value comparable with ints
            (int, float, Decimal) -- or ``None`` when the column is NULL.

    Returns:
        "Monto Bajo" when Total < 3, "Monto Medio" when 3 <= Total < 13,
        "Monto Alto" when Total >= 13, and ``None`` (SQL NULL) when
        Total is ``None``.
    """
    # Spark hands SQL NULL to Python UDFs as None; `None < 3` raises
    # TypeError and would abort the whole job, so propagate NULL instead.
    if Total is None:
        return None
    if Total < 3:
        return "Monto Bajo"
    # Total >= 3 is already guaranteed here; only the upper bound matters.
    if Total < 13:
        return "Monto Medio"
    return "Monto Alto"

In [0]:
# Wrap Amount_type as a Spark Python UDF whose result column is a string.
monto_udf = F.udf(Amount_type, returnType=StringType())

In [0]:
# Load the three source tables by their catalog.schema.table names.
df_Invoice = spark.read.table(f"{catalogo}.{esquema_source}.Invoice")
df_InvoiceLine = spark.read.table(f"{catalogo}.{esquema_source}.InvoiceLine")
df_Track = spark.read.table(f"{catalogo}.{esquema_source}.Track")


In [0]:
# Quick look at a single Invoice row (runs a small Spark job).
df_Invoice.limit(1).collect()

In [0]:
# Discard rows where every column is NULL, then rows whose id column is NULL.
# NOTE(review): ids are not de-duplicated here; if the source tables can hold
# repeated loads of the same record, the joins below will multiply rows -- confirm.
df_Invoice = df_Invoice.na.drop(how="all").where(F.col("Invoice_Id").isNotNull())
df_InvoiceLine = df_InvoiceLine.na.drop(how="all").where(F.col("Invoice_Line_Id").isNotNull())
df_Track = df_Track.na.drop(how="all").where(F.col("Track_Id").isNotNull())


In [0]:
# Tag each invoice with its amount bucket computed from Total.
df_Invoice = df_Invoice.withColumn(
    "Amount_type",
    monto_udf(F.col("Total")),
)

In [0]:
# Quick look at a single Track row (runs a small Spark job).
df_Track.limit(1).collect()

In [0]:
# Inner-join invoices ("h") with their invoice lines ("l") on Invoice_Id;
# the aliases let the next cell drop the columns both sides carry.
df_joined = df_Invoice.alias("h").join(
    df_InvoiceLine.alias("l"),
    on=F.col("h.Invoice_Id") == F.col("l.Invoice_Id"),
    how="inner",
)

In [0]:
# Drop the line-side copy of the join key and both Ingestion_Date columns,
# in that order.
for qualified_name in ("l.Invoice_Id", "h.Ingestion_Date", "l.Ingestion_Date"):
    df_joined = df_joined.drop(F.col(qualified_name))

In [0]:
# Inner-join each invoice line ("i") with its track ("t") on Track_Id.
df_joined = df_joined.alias("i").join(
    df_Track.alias("t"),
    on=F.col("i.Track_Id") == F.col("t.Track_Id"),
    how="inner",
)

In [0]:

# Project the final column set: invoice/line columns from alias "i", track
# columns from alias "t", and the track's Unit_Price renamed so it does not
# collide with the invoice line's Unit_Price.
df_joined = df_joined.select(
    *[
        f"i.{column_name}"
        for column_name in (
            "Invoice_Id",
            "Customer_Id",
            "Invoice_Date",
            "Track_Id",
            "Billing_Address",
            "Billing_City",
            "Billing_State",
            "Billing_Country",
            "Billing_PostalCode",
            "Total",
            "Amount_type",
            "Invoice_Line_Id",
            "Unit_Price",
            "Quantity",
        )
    ],
    *[
        f"t.{column_name}"
        for column_name in (
            "Track_Name",
            "Album_Id",
            "Media_Type_Id",
            "Genre_Id",
            "Composer",
            "Milliseconds",
            "Bytes",
        )
    ],
    F.col("t.Unit_Price").alias("Unit_Price_Track"),
)



In [0]:
# Quick look at a single joined row (runs a small Spark job).
df_joined.limit(1).collect()

In [0]:
# Keep rows with Invoice_Date strictly greater than "2008-01-01", ordered by
# Invoice_Id.
# NOTE(review): the comparison is exclusive, so a value exactly equal to the
# boundary (e.g. 2008-01-01 00:00) is dropped -- confirm that is intended.
df_filtered_sorted = (
    df_joined
    .where(F.col("Invoice_Date") > "2008-01-01")
    .sort(F.col("Invoice_Id"))
)


In [0]:
# Number of days from Invoice_Date to the current date at run time, so the
# value differs between runs on different days.
df_filtered_sorted = df_filtered_sorted.withColumn(
    "Age_in_Days",
    F.datediff(end=F.current_date(), start=F.col("Invoice_Date")),
)

In [0]:
# Quick look at a single filtered row (runs a small Spark job).
df_filtered_sorted.limit(1).collect()

In [0]:
# Name for the frame that gets persisted; this is a plain alias of
# df_filtered_sorted -- no further transformation is applied.
df_updated = df_filtered_sorted

In [0]:
%sql
-- Commented-out clean-up statement; this cell currently does nothing.
-- NOTE(review): it targets silver.`invoices transformed` (a name with a space
-- and a hard-coded schema), which is not the table written by the final cell
-- ({catalogo}.{esquema_sink}.Invoices_transformed). Fix the name before re-enabling.
--DROP TABLE IF EXISTS silver.`invoices transformed`;

In [0]:
# Persist the result as a table in the sink schema, replacing previous
# contents on every run. overwriteSchema lets the table follow changes to the
# selected columns instead of failing with a schema-mismatch error on
# overwrite (Delta Lake option; assumes the default Delta table format).
(
    df_updated.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{catalogo}.{esquema_sink}.Invoices_transformed")
)