In [1]:
# importamos librerias
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

StatementMeta(, de50318a-a197-426a-9803-500cc16b7058, 5, Finished, Available, Finished)

In [2]:
# Enrich the customer data with transaction information.

# Select the current and the previous week by date range.
# date_trunc("week", ...) returns the Monday that starts the week, so each week is
# the half-open interval [Monday, next Monday). Matching weekofyear() together with
# year() is wrong around 1 January: weekofyear() is the ISO-8601 week number, whose
# week-based year can differ from the calendar year (e.g. 2024-12-30 is in ISO
# week 1, so "week 1 of 2024" would also match 1-7 Jan 2024), and an ISO year can
# have 53 weeks, so the week before week 1 is not always week 52.
current_week_start = F.date_trunc("week", F.current_date()).cast("date")
next_week_start = F.date_add(current_week_start, 7)
last_week_start = F.date_sub(current_week_start, 7)

# Week/year numbers kept for other cells that may reference them; they are no
# longer used to filter the transactions below.
current_week = F.weekofyear(F.current_date())
current_year = F.year(F.current_date())
last_week = F.weekofyear(last_week_start)
last_year = F.year(last_week_start)

# Load the silver transactions once (Delta stores its own schema, so no
# inferSchema option is needed) and derive one lazy DataFrame per week.
transactions_silver = spark.read.format("delta").load("Files/silver/transactions")
# Cast to date, as weekofyear() did implicitly, so timestamp/string values compare as dates.
transaction_date = F.col("date").cast("date")

transactions_current_week = transactions_silver.where(
    (transaction_date >= current_week_start) & (transaction_date < next_week_start)
)

# Previous week: lazily defined, not used downstream yet.
transactions_last_week = transactions_silver.where(
    (transaction_date >= last_week_start) & (transaction_date < current_week_start)
)

def calculos_transacciones(df: DataFrame, week: str, transfer_types=None) -> DataFrame:
    """Aggregate a set of transactions into one row per customer.

    Output columns, in this order (``<week>`` is the ``week`` argument):
      * ``customer_id``
      * ``total_quantity_<week>``: sum of ``quantity``.
      * ``<transfer_type>_<week>``: number of transactions per ``transfer_type``
        value (spaces in the value replaced by ``_``).
      * ``origin_not_esp_<week>``: transactions whose ``country_of_origin`` is
        not "ESP" (null countries are not counted).
      * ``destiny_not_esp_<week>``: transactions whose ``country_of_destiny`` is
        not "ESP" (null countries are not counted).
    Null numeric values in the result are replaced by 0.

    Args:
        df: transactions with at least the columns ``customer_id``, ``quantity``,
            ``transfer_type``, ``country_of_origin`` and ``country_of_destiny``.
        week: "current" or "prev"; used as suffix of the output column names.
        transfer_types: optional list of ``transfer_type`` values to pivot on.
            When given, the output has exactly one column per value, which keeps
            the schema stable, and Spark skips the extra job it otherwise runs to
            find the distinct values. When None (default), the values are taken
            from ``df``.

    Returns:
        A DataFrame with one row per ``customer_id`` found in ``df``.

    Raises:
        ValueError: if ``week`` is not "current" or "prev".
    """
    if week not in ["current", "prev"]:
        raise ValueError(f"El paramentro 'week' debe tomar el valor 'current' o 'prev'. '{week}' no es valido")

    def _literal_col(name: str):
        # Backtick-quote so pivot-generated names (built from data values, which may
        # contain dots or other special characters) are resolved literally.
        return F.col("`" + name.replace("`", "``") + "`")

    quantity_col = "total_quantity_" + week
    origin_col = "origin_not_esp_" + week
    destiny_col = "destiny_not_esp_" + week

    # Total quantity and operations with a foreign origin/destination in a single
    # aggregation. count(when(cond, 1)) only counts rows where cond is true, so
    # null countries are excluded, as with filter(col != "ESP").
    df_totals = df.groupBy("customer_id").agg(
        F.sum(F.col("quantity")).alias(quantity_col),
        F.count(F.when(F.col("country_of_origin") != "ESP", 1)).alias(origin_col),
        F.count(F.when(F.col("country_of_destiny") != "ESP", 1)).alias(destiny_col),
    )

    # Number of transactions per transfer type: one column per transfer_type value.
    df_transaction_type_pivot = (
        df.groupBy("customer_id")
        .pivot("transfer_type", transfer_types)
        .agg(F.count(F.col("transfer_type")))
    )
    # The grouping column comes first in the pivot output; the rest are the types.
    type_cols_renamed = [
        col_name.replace(" ", "_") + "_" + week
        for col_name in df_transaction_type_pivot.columns[1:]
    ]
    # toDF renames positionally, so names are not parsed as expressions.
    df_transaction_type_pivot = df_transaction_type_pivot.toDF("customer_id", *type_cols_renamed)

    # Both sides are grouped by customer_id over the same df, so they hold the same customers.
    df_transactiones = (
        df_totals.join(df_transaction_type_pivot, on="customer_id", how="left")
        .select(
            "customer_id",
            quantity_col,
            *[_literal_col(c) for c in type_cols_renamed],
            origin_col,
            destiny_col,
        )
        # Transfer types a customer did not use come out of the pivot as null.
        .fillna(0)
    )

    return df_transactiones

# Compute the per-customer transaction aggregates for the current week.
transactions_info_current = calculos_transacciones(transactions_current_week, week="current")
# TODO: add the previous week when needed, and join it below like the current week:
# transactions_info_prev = calculos_transacciones(transactions_last_week, week="prev")

# Enrich the customer dataset with the transaction aggregates
# (Delta stores its own schema, so no inferSchema option is needed).
customers_df = spark.read.format("delta").load("Files/silver/customers")
customers_df = customers_df.join(transactions_info_current, on="customer_id", how="left")

# Customers without transactions this week get nulls from the left join; set only
# those aggregate columns to 0. A global fillna(0) would also overwrite the
# customer's own numeric columns, e.g. turning a missing risk_customer into 0 and
# therefore into the "low" risk bucket.
transaction_info_cols = [c for c in transactions_info_current.columns if c != "customer_id"]
customers_df = customers_df.fillna(0, subset=transaction_info_cols)

# Final customer risk: round to 2 decimals, then bucket with inclusive upper bounds.
# The bucket is computed on the rounded value (the second withColumn sees the first).
# A null risk gets a null bucket instead of falling through to "very high".
risk = F.col("risk_customer")
customer_gold = (
    customers_df
    .withColumn("risk_customer", F.round(risk, 2))
    .withColumn(
        "risk_customer_d",
        F.when(risk.isNull(), F.lit(None).cast("string"))
        .when(risk <= 0.3, "low")
        .when(risk <= 0.35, "mid")
        .when(risk <= 0.5, "high")
        .otherwise("very high"),
    )
)

# Persist to the lakehouse files and create the final table.
# NOTE(review): overwrite + mergeSchema presumably keeps columns from earlier runs
# (e.g. transfer types absent this week) in the schema as nulls — confirm if intended.
print("Guardando ficheros y creando tabla...")
customer_gold.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("Files/gold/customers_enhanced")
customer_gold.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("customers_gold")
print("Operacion termianda")


StatementMeta(, 2c2dc0d8-5108-4a57-ac75-66838739db73, 6, Finished, Available, Finished)

Guardando ficheros y creando tabla...
Operacion termianda


In [2]:
# Build the gold transactions data: silver transactions enriched with the
# customer risk stored in the customers_gold table.
gold_transactions_dir = "Files/gold/transactions_enhanced"
checkpoint_dir = "Files/gold/transactions_enhanced/checkpoint"
checkpoint_dir_table = "Files/gold/transactions_enhanced/checkpoint_table"

# Customer risk columns used to enrich each transaction.
customer_df = spark.sql("SELECT customer_id, risk_customer, risk_customer_d FROM customers_gold")

# Stream the silver transactions and attach the customer risk (left join on customer_id).
transaction_df = (
    spark.readStream.format("delta")
    .option("inferSchema", "true")
    .load("Files/silver/transactions")
    .join(customer_df, on="customer_id", how="left")
)
transaction_gold = transaction_df.withColumn(
    "risk_transactional", F.round(F.col("risk_transactional"), 2)
)


def _gold_stream_writer(stream_df, checkpoint_location):
    """Return an append-mode Delta stream writer for `stream_df` that processes
    everything currently available and then stops (availableNow trigger)."""
    return (
        stream_df.writeStream
        .outputMode("append")
        .format("delta")
        .option("checkpointLocation", checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
    )


# Two independent queries, each with its own checkpoint: one to the gold files
# path and one to the gold transactions table.
query_file = (
    _gold_stream_writer(transaction_gold, checkpoint_dir)
    .option("path", gold_transactions_dir)
    .start()
)
query_transaction = _gold_stream_writer(transaction_gold, checkpoint_dir_table).toTable("transactions_gold")

# Block until both queries have drained the available data.
query_file.awaitTermination()
query_transaction.awaitTermination()

StatementMeta(, de50318a-a197-426a-9803-500cc16b7058, 6, Finished, Available, Finished)

<pyspark.sql.streaming.query.StreamingQuery at 0x7553c4c3c580>