In [7]:
# 1️⃣ Leer CSV (ya lo hiciste, pero para que quede completo)
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("Files/FactTransaction.csv")  # la ruta que te dio Fabric

# 2️⃣ Crear la vista temporal para poder usar SQL
df.createOrReplaceTempView("FactTransaction_temp")

# 3️⃣ Ahora sí podemos usar SQL
spark.sql("SELECT COUNT(*) AS total_records FROM FactTransaction_temp").show()


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 9, Finished, Available, Finished)

+-------------+
|total_records|
+-------------+
|        10000|
+-------------+



In [4]:
df = spark.read.format("csv").option("header","true").load("Files/FactTransaction.csv")
# df now is a Spark DataFrame containing CSV data from "Files/FactTransaction.csv".
display(df)

StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 425e307a-5089-46c1-a75d-cf9a8e6ae727)

In [8]:
spark.sql("""
SELECT 
    SUM(CASE WHEN TransactionID IS NULL THEN 1 ELSE 0 END) AS TransactionID_nulls,
    SUM(CASE WHEN AccountID IS NULL THEN 1 ELSE 0 END) AS AccountID_nulls,
    SUM(CASE WHEN TransactionDate IS NULL THEN 1 ELSE 0 END) AS TransactionDate_nulls,
    SUM(CASE WHEN TransactionAmount IS NULL THEN 1 ELSE 0 END) AS TransactionAmount_nulls,
    SUM(CASE WHEN TransactionType IS NULL THEN 1 ELSE 0 END) AS TransactionType_nulls,
    SUM(CASE WHEN TransactionChannel IS NULL THEN 1 ELSE 0 END) AS TransactionChannel_nulls,
    SUM(CASE WHEN ProductID IS NULL THEN 1 ELSE 0 END) AS ProductID_nulls,
    SUM(CASE WHEN Status IS NULL THEN 1 ELSE 0 END) AS Status_nulls
FROM FactTransaction_temp
""").show()


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 10, Finished, Available, Finished)

+-------------------+---------------+---------------------+-----------------------+---------------------+------------------------+---------------+------------+
|TransactionID_nulls|AccountID_nulls|TransactionDate_nulls|TransactionAmount_nulls|TransactionType_nulls|TransactionChannel_nulls|ProductID_nulls|Status_nulls|
+-------------------+---------------+---------------------+-----------------------+---------------------+------------------------+---------------+------------+
|                  0|              0|                    0|                      0|                    0|                       0|              0|           0|
+-------------------+---------------+---------------------+-----------------------+---------------------+------------------------+---------------+------------+



In [9]:
spark.sql("""
SELECT TransactionID, COUNT(*) AS count_occurrences
FROM FactTransaction_temp
GROUP BY TransactionID
HAVING COUNT(*) > 1
""").show()


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 11, Finished, Available, Finished)

+-------------+-----------------+
|TransactionID|count_occurrences|
+-------------+-----------------+
+-------------+-----------------+



In [10]:
spark.sql("""
SELECT *
FROM FactTransaction_temp
WHERE TransactionAmount < 0
""").show()


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 12, Finished, Available, Finished)

+-------------+---------+---------------+-----------------+---------------+------------------+---------+-------+
|TransactionID|AccountID|TransactionDate|TransactionAmount|TransactionType|TransactionChannel|ProductID| Status|
+-------------+---------+---------------+-----------------+---------------+------------------+---------+-------+
|            4|      138|     11/29/2023|         -3911.93|         Credit|               ATM|     1021|Success|
|            5|      159|      2/18/2024|         -2211.72|         Credit|            Mobile|     1001|Success|
|           13|      164|     11/13/2021|         -1008.37|         Credit|            Mobile|     1019|Success|
|           15|      102|      6/13/2024|         -1025.75|         Credit|            Mobile|     1015|Success|
|           17|      139|      11/8/2024|         -4624.55|         Credit|            Mobile|     1012|Success|
|           31|       43|     11/26/2024|         -3764.04|         Credit|            Mobile|  

In [11]:
spark.sql("""
SELECT DISTINCT TransactionType, TransactionChannel
FROM FactTransaction_temp
""").show()


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 13, Finished, Available, Finished)

+---------------+------------------+
|TransactionType|TransactionChannel|
+---------------+------------------+
|         Credit|            Mobile|
|         Credit|               Web|
|          Debit|            Mobile|
|          Debit|               Web|
|          Debit|               ATM|
|         Credit|               ATM|
+---------------+------------------+



In [12]:
# 1️⃣ Corregir tipos de columnas
from pyspark.sql.functions import col, to_date

df_clean = df.withColumn("TransactionDate", to_date(col("TransactionDate"), "M/d/yyyy")) \
             .withColumn("TransactionAmount", col("TransactionAmount").cast("double"))

# 2️⃣ Crear columnas derivadas (opcional pero útil para portafolio)
# Por ejemplo, un indicador de crédito o débito y monto neto positivo
from pyspark.sql.functions import when

df_clean = df_clean.withColumn("IsCredit", when(col("TransactionType") == "Credit", 1).otherwise(0)) \
                   .withColumn("IsDebit", when(col("TransactionType") == "Debit", 1).otherwise(0)) \
                   .withColumn("TransactionAmount_Positive", when(col("TransactionAmount") < 0, 0).otherwise(col("TransactionAmount")))

# 3️⃣ Manejar nulos (rellenar o eliminar si es necesario)
df_clean = df_clean.fillna({
    "TransactionChannel": "Unknown",
    "TransactionType": "Unknown",
    "ProductID": -1,
    "Status": "Unknown"
})

# 4️⃣ Registrar como nueva vista temporal para usar en el modelado dimensional
df_clean.createOrReplaceTempView("FactTransaction_cleaned")

# 5️⃣ Verificar resultado
df_clean.show(5)
df_clean.describe().show()


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 14, Finished, Available, Finished)

+-------------+---------+---------------+-----------------+---------------+------------------+---------+-------+--------+-------+--------------------------+
|TransactionID|AccountID|TransactionDate|TransactionAmount|TransactionType|TransactionChannel|ProductID| Status|IsCredit|IsDebit|TransactionAmount_Positive|
+-------------+---------+---------------+-----------------+---------------+------------------+---------+-------+--------+-------+--------------------------+
|            1|      162|     2023-02-24|          10000.0|         Credit|               Web|     1019|Success|       1|      0|                   10000.0|
|            2|      154|     2022-08-31|          10000.0|         Credit|               Web|     1001|Success|       1|      0|                   10000.0|
|            3|      179|     2020-01-03|           4883.8|          Debit|               Web|     1024|Success|       0|      1|                    4883.8|
|            4|      138|     2023-11-29|         -3911.93

In [13]:
from pyspark.sql.functions import year, month, dayofmonth, weekofyear, dayofweek, col

dim_date = df_clean.select(
    col("TransactionDate").alias("full_date")
).distinct().withColumn("date_key", col("full_date").cast("string").substr(1,10)) \
 .withColumn("year", year(col("full_date"))) \
 .withColumn("month", month(col("full_date"))) \
 .withColumn("day", dayofmonth(col("full_date"))) \
 .withColumn("week", weekofyear(col("full_date"))) \
 .withColumn("day_of_week", dayofweek(col("full_date")))

dim_date.createOrReplaceTempView("DimDate")
dim_date.show(5)


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 15, Finished, Available, Finished)

+----------+----------+----+-----+---+----+-----------+
| full_date|  date_key|year|month|day|week|day_of_week|
+----------+----------+----+-----+---+----+-----------+
|2021-11-13|2021-11-13|2021|   11| 13|  45|          7|
|2022-07-31|2022-07-31|2022|    7| 31|  30|          1|
|2020-08-24|2020-08-24|2020|    8| 24|  35|          2|
|2021-10-11|2021-10-11|2021|   10| 11|  41|          2|
|2021-12-18|2021-12-18|2021|   12| 18|  50|          7|
+----------+----------+----+-----+---+----+-----------+
only showing top 5 rows



In [14]:
dim_product = df_clean.select(
    col("ProductID")
).distinct().withColumnRenamed("ProductID", "product_key")

dim_product.createOrReplaceTempView("DimProduct")
dim_product.show(5)


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 16, Finished, Available, Finished)

+-----------+
|product_key|
+-----------+
|       1025|
|       1005|
|       1016|
|       1019|
|       1008|
+-----------+
only showing top 5 rows



In [15]:
dim_account = df_clean.select(
    col("AccountID")
).distinct().withColumnRenamed("AccountID", "account_key")

dim_account.createOrReplaceTempView("DimAccount")
dim_account.show(5)


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 17, Finished, Available, Finished)

+-----------+
|account_key|
+-----------+
|        148|
|         31|
|         85|
|        137|
|         65|
+-----------+
only showing top 5 rows



In [16]:
fact_transactions = df_clean.join(dim_date, df_clean.TransactionDate == dim_date.full_date) \
                            .join(dim_product, df_clean.ProductID == dim_product.product_key) \
                            .join(dim_account, df_clean.AccountID == dim_account.account_key) \
                            .select(
                                col("TransactionID"),
                                col("account_key"),
                                col("product_key"),
                                col("date_key"),
                                col("TransactionAmount_Positive").alias("amount"),
                                col("IsCredit"),
                                col("IsDebit"),
                                col("TransactionChannel"),
                                col("Status")
                            )

fact_transactions.createOrReplaceTempView("FactTransactions")
fact_transactions.show(5)


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 18, Finished, Available, Finished)

+-------------+-----------+-----------+----------+-------+--------+-------+------------------+-------+
|TransactionID|account_key|product_key|  date_key| amount|IsCredit|IsDebit|TransactionChannel| Status|
+-------------+-----------+-----------+----------+-------+--------+-------+------------------+-------+
|            1|        162|       1019|2023-02-24|10000.0|       1|      0|               Web|Success|
|            2|        154|       1001|2022-08-31|10000.0|       1|      0|               Web|Success|
|            3|        179|       1024|2020-01-03| 4883.8|       0|      1|               Web|Success|
|            4|        138|       1021|2023-11-29|    0.0|       1|      0|               ATM|Success|
|            5|        159|       1001|2024-02-18|    0.0|       1|      0|            Mobile|Success|
+-------------+-----------+-----------+----------+-------+--------+-------+------------------+-------+
only showing top 5 rows



In [24]:
from pyspark.sql.functions import col, when, abs

# Crear columna TransactionAmount_Positive
df = df.withColumn(
    "TransactionAmount_Positive",
    abs(col("TransactionAmount"))  # convierte negativos a positivos
)

# Crear flags de crédito y débito
df = df.withColumn("IsCredit", when(col("TransactionAmount") > 0, 1).otherwise(0)) \
       .withColumn("IsDebit", when(col("TransactionAmount") < 0, 1).otherwise(0))


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 26, Finished, Available, Finished)

In [25]:
from pyspark.sql.functions import year, month, sum as spark_sum

df = df.withColumn("TransactionDate", col("TransactionDate").cast("date"))
df = df.withColumn("year", year(col("TransactionDate"))) \
       .withColumn("month", month(col("TransactionDate")))

monthly_summary = df.groupBy("year", "month").agg(
    spark_sum("TransactionAmount_Positive").alias("total_amount"),
    spark_sum("IsCredit").alias("total_credits"),
    spark_sum("IsDebit").alias("total_debits")
).orderBy("year", "month")

monthly_summary.show()


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 27, Finished, Available, Finished)

+----+-----+--------------------+-------------+------------+
|year|month|        total_amount|total_credits|total_debits|
+----+-----+--------------------+-------------+------------+
|NULL| NULL|2.5077590559999976E7|         6040|        3960|
+----+-----+--------------------+-------------+------------+



In [26]:
from pyspark.sql.functions import to_date, col, year, month

# Convertir TransactionDate con formato seguro
df = df.withColumn("TransactionDate", to_date(col("TransactionDate"), "yyyy-MM-dd"))

# Extraer año y mes correctamente
df = df.withColumn("year", year(col("TransactionDate"))) \
       .withColumn("month", month(col("TransactionDate")))

# Revisamos los primeros registros
df.select("TransactionDate", "year", "month").show(5)


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 28, Finished, Available, Finished)

+---------------+----+-----+
|TransactionDate|year|month|
+---------------+----+-----+
|           NULL|NULL| NULL|
|           NULL|NULL| NULL|
|           NULL|NULL| NULL|
|           NULL|NULL| NULL|
|           NULL|NULL| NULL|
+---------------+----+-----+
only showing top 5 rows



In [27]:
monthly_summary = df.groupBy("year", "month").agg(
    spark_sum("TransactionAmount_Positive").alias("total_amount"),
    spark_sum("IsCredit").alias("total_credits"),
    spark_sum("IsDebit").alias("total_debits")
).orderBy("year", "month")

monthly_summary.show()


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 29, Finished, Available, Finished)

+----+-----+--------------------+-------------+------------+
|year|month|        total_amount|total_credits|total_debits|
+----+-----+--------------------+-------------+------------+
|NULL| NULL|2.5077590559999976E7|         6040|        3960|
+----+-----+--------------------+-------------+------------+



In [28]:
df.select("TransactionDate").show(10, truncate=False)


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 30, Finished, Available, Finished)

+---------------+
|TransactionDate|
+---------------+
|NULL           |
|NULL           |
|NULL           |
|NULL           |
|NULL           |
|NULL           |
|NULL           |
|NULL           |
|NULL           |
|NULL           |
+---------------+
only showing top 10 rows



In [35]:
df.columns


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 37, Finished, Available, Finished)

['TransactionID',
 'AccountID',
 'TransactionDate',
 'TransactionAmount',
 'TransactionType',
 'TransactionChannel',
 'ProductID',
 'Status',
 'year',
 'month',
 'TransactionAmount_Positive',
 'IsCredit',
 'IsDebit']

In [37]:
from pyspark.sql.functions import col, to_date, year, month, dayofweek

# 1. Cargamos el archivo asegurándonos de que lea la fecha como texto primero (String)
# para evitar que Spark la convierta en NULL automáticamente
df_raw = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("Files/FactTransaction.csv")

# 2. Corregimos la fecha. 
# NOTA: Si tu fecha en el CSV es '2023-01-01', usamos 'yyyy-MM-dd'
# Si es '01/01/2023', usamos 'dd/MM/yyyy'. Ajusta según veas tu CSV.
df_cleansed = df_raw.withColumn("TransactionDate_Fixed", to_date(col("TransactionDate"), "yyyy-MM-dd"))

# 3. Ahora que ya no es NULL, creamos las columnas para las métricas
df_final = df_cleansed.withColumn("Year", year(col("TransactionDate_Fixed"))) \
                      .withColumn("Month", month(col("TransactionDate_Fixed")))

# Verificamos si ya tenemos datos
df_final.select("TransactionDate", "TransactionDate_Fixed", "Year", "Month").show(5)


StatementMeta(, 6e8a18f6-4a4b-4009-bb4e-8e9457911afd, 39, Finished, Available, Finished)

+---------------+---------------------+----+-----+
|TransactionDate|TransactionDate_Fixed|Year|Month|
+---------------+---------------------+----+-----+
|      2/24/2023|                 NULL|NULL| NULL|
|      8/31/2022|                 NULL|NULL| NULL|
|       1/3/2020|                 NULL|NULL| NULL|
|     11/29/2023|                 NULL|NULL| NULL|
|      2/18/2024|                 NULL|NULL| NULL|
+---------------+---------------------+----+-----+
only showing top 5 rows



In [5]:
from pyspark.sql import functions as F

# 1. CARGA DIRECTA (Asegúrate de que el nombre del archivo sea exacto)
# Si tu archivo en el Lakehouse se llama distinto, cámbialo aquí:
path = "Files/FactTransaction.csv" 

df_raw = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(path)

# 2. TRANSFORMACIÓN (Fechas y Montos)
# Aquí arreglamos el problema de las fechas y nos aseguramos de tener montos positivos
df_final = df_raw.withColumn("TransactionDate_Fixed", F.to_date(F.col("TransactionDate"), "M/d/yyyy")) \
                 .withColumn("Year", F.year(F.col("TransactionDate_Fixed"))) \
                 .withColumn("Month", F.month(F.col("TransactionDate_Fixed"))) \
                 .withColumn("TransactionAmount_Positive", F.abs(F.col("TransactionAmount"))) \
                 .withColumn("IsCredit", F.when(F.col("TransactionType") == "Credit", 1).otherwise(0)) \
                 .withColumn("IsDebit", F.when(F.col("TransactionType") == "Debit", 1).otherwise(0))

# 3. AGREGACIÓN MENSUAL (Paso 3 y 4)
summary_monthly = df_final.groupBy("Year", "Month").agg(
    F.count("TransactionID").alias("Total_Transacciones"),
    F.sum("TransactionAmount_Positive").alias("Monto_Total"),
    F.sum("IsCredit").alias("Total_Creditos"),
    F.sum("IsDebit").alias("Total_Debitos")
).orderBy("Year", "Month")

# 4. RESULTADOS
print("✅ ¡Proceso completado con éxito!")
print("--- Resumen Mensual ---")
summary_monthly.show()

print("--- Top 5 Productos ---")
df_final.groupBy("ProductID").agg(
    F.sum("TransactionAmount_Positive").alias("Revenue_Producto")
).orderBy(F.col("Revenue_Producto").desc()).limit(5).show()

StatementMeta(, 8df41384-b7c0-4f61-99ed-c88a70c79b6c, 7, Finished, Available, Finished)

✅ ¡Proceso completado con éxito!
--- Resumen Mensual ---
+----+-----+-------------------+------------------+--------------+-------------+
|Year|Month|Total_Transacciones|       Monto_Total|Total_Creditos|Total_Debitos|
+----+-----+-------------------+------------------+--------------+-------------+
|2020|    1|                127|323717.08000000013|            44|           83|
|2020|    2|                118|283682.50999999995|            52|           66|
|2020|    3|                157| 415269.5799999997|            60|           97|
|2020|    4|                129| 329925.9999999998|            61|           68|
|2020|    5|                124|          323018.2|            56|           68|
|2020|    6|                129| 311603.0899999999|            51|           78|
|2020|    7|                146|375264.63999999996|            62|           84|
|2020|    8|                157| 383821.4900000001|            60|           97|
|2020|    9|                143|376780.73999999993| 

In [6]:
# Guardamos el resumen mensual para usarlo en Power BI
summary_monthly.write.format("delta").mode("overwrite").saveAsTable("FactTransaction_Summary")

# Guardamos el top de productos también
top_productos = df_final.groupBy("ProductID").agg(F.sum("TransactionAmount_Positive").alias("Total_Revenue"))
top_productos.write.format("delta").mode("overwrite").saveAsTable("DimProduct_Performance")

print("✅ ¡Tablas guardadas en el Lakehouse! Ya puedes verlas en la sección 'Tables' a la izquierda.")

StatementMeta(, 8df41384-b7c0-4f61-99ed-c88a70c79b6c, 8, Finished, Available, Finished)

✅ ¡Tablas guardadas en el Lakehouse! Ya puedes verlas en la sección 'Tables' a la izquierda.
