In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, array, concat_ws, abs

# Inicializar Spark
spark = SparkSession.builder.appName("ErroresLoans").getOrCreate()


In [10]:
# Cargar datos
df = spark.read.option("header", True).option("inferSchema", True).csv("../data/bronze/loans_101.csv")

In [11]:
# Definir columnas de error
df = df.withColumn(
    "error_num_payed_gt_num_fees",
    when(col("num_payed_fees") > col("num_fees"), "num_payed_fees > num_fees")
)

In [12]:
df = df.withColumn(
    "error_active_true_with_end_date",
    when((col("active") == True) & (col("end_date").isNotNull()), "active TRUE con end_date")
)

In [13]:

df = df.withColumn(
    "error_active_false_no_end_date",
    when((col("active") == False) & (col("end_date").isNull()), "active FALSE sin end_date")
)

In [15]:
df = df.withColumn(
    "expected_total_returned",
    (col("num_payed_fees") / col("num_fees")) * col("total_amount")
)

In [16]:
df = df.withColumn(
    "error_total_returned_incorrect",
    when(abs((col("total_returned") - col("expected_total_returned")).cast("double")) > 0.01, "total_returned incorrecto")
)

In [17]:
df = df.withColumn(
    "error_total_amount_incorrect",
    when(col("total_amount") != col("capital"), "total_amount incorrecto")
)


In [20]:
df = df.withColumn(
    "error_defaulter_paid",
    when((col("status") == "defaulter") & (col("num_payed_fees") == col("num_fees")), "defaulter con todo pagado")
)

In [21]:
error_cols = [
    "error_num_payed_gt_num_fees",
    "error_active_true_with_end_date",
    "error_active_false_no_end_date",
    "error_total_returned_incorrect",
    "error_total_amount_incorrect",
    "error_defaulter_paid"
]

In [22]:
df_errores = df.withColumn(
    "errores",
    concat_ws(", ", *[col(c) for c in error_cols])
).filter(col("errores") != "")

In [23]:
# Seleccionar columnas importantes + errores
df_errores = df_errores.select("customer_id", "num_fees", "num_payed_fees", "total_amount", "total_returned", "status", "errores")

In [27]:
df_errores.show(7)

+-----------+--------+--------------+------------+--------------+------+--------------------+
|customer_id|num_fees|num_payed_fees|total_amount|total_returned|status|             errores|
+-----------+--------+--------------+------------+--------------+------+--------------------+
|          1|      48|            42|    16944.81|      14826.84|    ok|total_returned in...|
|          2|      24|             3|     3287.77|        410.97|  late|total_amount inco...|
|          3|      18|             0|    10500.78|           0.0|    ok|total_amount inco...|
|          4|      10|             2|     3333.56|        666.72|    ok|total_amount inco...|
|          5|      48|            17|    16005.79|       5668.65|    ok|total_returned in...|
|          6|      18|            14|     8584.72|       6677.02|    ok|total_returned in...|
|          7|      18|             1|     5341.12|        296.73|    ok|total_amount inco...|
+-----------+--------+--------------+------------+----------

In [29]:
df_errores.select("errores").limit(7).collect()

[Row(errores='total_returned incorrecto, total_amount incorrecto'),
 Row(errores='total_amount incorrecto'),
 Row(errores='total_amount incorrecto'),
 Row(errores='total_amount incorrecto'),
 Row(errores='total_returned incorrecto, total_amount incorrecto'),
 Row(errores='total_returned incorrecto, total_amount incorrecto'),
 Row(errores='total_amount incorrecto')]

In [None]:
df_errores.write.mode("overwrite").option("header", True).csv("../data/bronze/loans_errores.csv")


In [None]:
spark.stop()