# Auto reload modules

In [1]:
%load_ext autoreload
%autoreload 2

# Directories

In [2]:
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql.functions import col, when, count, sum, avg
from pyspark.sql import SparkSession, Window
from dataclasses import dataclass
import time
import fs
import os

In [3]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "../../spark-3.5.3-bin-hadoop3"

In [4]:
import findspark
findspark.init()

Preparación del ambiente de Spark

In [5]:
spark = (SparkSession.builder
  .master('local[*]')
  .appName('hello_world_spark')
  .getOrCreate())

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

24/12/24 21:16:31 WARN Utils: Your hostname, ThinkPad-X1-Nano resolves to a loopback address: 127.0.1.1; using 192.168.3.7 instead (on interface wlp0s20f3)
24/12/24 21:16:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/24 21:16:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


You are working with 1 core(s)


In [6]:
sc = spark.sparkContext

Mejorar la lectura de las tablas

In [7]:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# Funciones adicionales

In [8]:
def calculate_exec_time(funct):
    def wrapper(*args, **kwargs):
        start = time.time()
        result = funct(*args, **kwargs)
        end = time.time()
        print(f"Time of execution: {(end - start):.2} s of {funct.__name__}")
        return result
    return wrapper

Cargar archivos usando la librería fs para que sea agnóstico a cualquier ambiente distribuido o local.

In [9]:
RAW_DIR = fs.open_fs("../../data/raw")
CSV_DIR = RAW_DIR.getsyspath("dummy_dataset.csv")
PARQUET_DIR = RAW_DIR.getsyspath("dummy_dataset.parquet")

REPORTS_DIR = fs.open_fs("../../reports")

In [10]:
filePath = CSV_DIR

df = spark.read.csv(filePath,
                       header="true",
                       inferSchema="true",
                       multiLine="true",
                       escape='"')

df.show(3, truncate=False)

                                                                                

+-----------+-----------+--------+---------------+
|customer_id|cutoff_date|amount  |has_full_months|
+-----------+-----------+--------+---------------+
|1          |2021-01-01 |41217.08|true           |
|1          |2021-02-01 |48969.98|true           |
|1          |2021-03-01 |33564.35|true           |
+-----------+-----------+--------+---------------+
only showing top 3 rows



In [11]:
df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- cutoff_date: date (nullable = true)
 |-- amount: double (nullable = true)
 |-- has_full_months: boolean (nullable = true)



In [12]:
threshold = 5_000
df_clean = (
    df
    .select(
        "cutoff_date",
        "amount",
        when(
            (col("amount") > threshold), True
        ).otherwise(False).alias("Criteria"),

    )
    
)

In [13]:
df_clean.show()

+-----------+--------+--------+
|cutoff_date|  amount|Criteria|
+-----------+--------+--------+
| 2021-01-01|41217.08|    true|
| 2021-02-01|48969.98|    true|
| 2021-03-01|33564.35|    true|
| 2021-04-01| 6543.62|    true|
| 2021-05-01|30311.81|    true|
| 2021-06-01| 40808.1|    true|
| 2021-07-01|48102.71|    true|
| 2021-08-01| 15100.6|    true|
| 2021-09-01|44219.72|    true|
| 2021-10-01|45992.39|    true|
| 2021-11-01|  738.01|   false|
| 2021-12-01|46892.18|    true|
| 2022-01-01|30858.53|    true|
| 2022-02-01|27262.94|    true|
| 2022-03-01|16794.78|    true|
| 2022-04-01|11206.15|    true|
| 2022-05-01|48435.95|    true|
| 2022-06-01| 46674.3|    true|
| 2022-07-01|28267.97|    true|
| 2022-08-01|15815.85|    true|
+-----------+--------+--------+
only showing top 20 rows



In [41]:
from pyspark.sql import Window
from pyspark.sql.functions import col, when, count

def uun(
        partitionBy_col: str,
        orderBy_col: str,
        n_months: int,
        u_months: int,
        process_col: str,
        threshold: float,
        criteria: str,
        start_month: int = 0
):
    window = Window.partitionBy(partitionBy_col).orderBy(col(orderBy_col).desc()).rowsBetween(start_month, n_months)
    threshold_result = get_threshold_result(
        process_col=process_col,
        threshold=threshold,
        criteria=criteria
    )
    count_months = count(when(threshold_result , True).otherwise(None)).over(window)
    return when(count_months >= u_months , True).otherwise(None)


def get_threshold_result(
        process_col : str,
        threshold : float,
        criteria : str = ">"
    ):
    
    operators = {
        ">": lambda : col(process_col) > threshold,
        "<": lambda : col(process_col) < threshold,
        "==": lambda : col(process_col) == threshold,
        "!=": lambda : col(process_col) != threshold,
        ">=": lambda : col(process_col) >= threshold,
        "<=": lambda : col(process_col) <= threshold,
    }
    
    # Verificar si el operador es válido
    if criteria in operators:
        condition = operators[criteria]()
        return when(condition, True).otherwise(None)
    else:
        raise ValueError(f"Unsupported criteria: {criteria}")


In [42]:
df_clean = (
    df
    .select(
        "cutoff_date",
        "customer_id",
        "amount",
        uun (
            partitionBy_col = "customer_id",
            orderBy_col = "cutoff_date",
            n_months = 11,
            u_months = 2, 
            process_col = "amount",
            threshold = 47_000,
            criteria = ">=",
            start_month = 0
        ).alias("unn_amount")
    )
)

In [43]:
df_clean.show(12*2)

+-----------+-----------+--------+----------+
|cutoff_date|customer_id|  amount|unn_amount|
+-----------+-----------+--------+----------+
| 2024-12-01|          1|29240.81|      NULL|
| 2024-11-01|          1|15843.72|      NULL|
| 2024-10-01|          1|17079.07|      NULL|
| 2024-09-01|          1| 1414.37|      NULL|
| 2024-08-01|          1|42982.31|      NULL|
| 2024-07-01|          1|19211.52|      NULL|
| 2024-06-01|          1| 11995.0|      NULL|
| 2024-05-01|          1| 5626.83|      NULL|
| 2024-04-01|          1|31138.45|      NULL|
| 2024-03-01|          1|  5235.0|      NULL|
| 2024-02-01|          1|19863.17|      NULL|
| 2024-01-01|          1| 3500.15|      NULL|
| 2023-12-01|          1|18206.38|      NULL|
| 2023-11-01|          1|21242.76|      NULL|
| 2023-10-01|          1|33622.52|      NULL|
| 2023-09-01|          1| 8341.58|      NULL|
| 2023-08-01|          1|38901.08|      NULL|
| 2023-07-01|          1|29370.23|      NULL|
| 2023-06-01|          1|35007.94|

In [24]:
@dataclass
class ColumnConfig:
    """
    Configuration for processing a specific column with threshold criteria.

    Attributes:
        column_name (str): Name of the column to process.
        threshold (float): Threshold value for the column.
        criteria (str): Comparison operator to apply (e.g., '>', '<=', '==').
        u_months (int): Minimum number of months the threshold condition must be met.
    """
    column_name: str
    threshold: float
    criteria: str
    u_months: int


@dataclass
class ThresholdProcessor:
    """
    Processor for applying threshold-based logic on columns with window functions.

    Attributes:
        partition_by (str): Column to partition the data by (e.g., customer ID).
        order_by (str): Column to order the data by (e.g., cutoff date).
        n_months (int): Number of months to include in the window.
        start_month (int): Starting month offset for the window. Defaults to 0.
    """

    partition_by: str
    order_by: str
    n_months: int
    start_month: int = 0

    def get_threshold_result(self, process_col: str, threshold: float, criteria: str = ">") -> pyspark.sql.Column:
        """
        Generate a threshold condition for a given column.

        Args:
            process_col (str): Name of the column to evaluate.
            threshold (float): Threshold value to compare against.
            criteria (str): Comparison operator (e.g., '>', '<=', '==').

        Returns:
            pyspark.sql.Column: A conditional column with the threshold logic applied.

        Raises:
            ValueError: If the provided criteria is not supported.
        """
        operators = {
            ">": lambda: col(process_col) > threshold,
            "<": lambda: col(process_col) < threshold,
            "==": lambda: col(process_col) == threshold,
            "!=": lambda: col(process_col) != threshold,
            ">=": lambda: col(process_col) >= threshold,
            "<=": lambda: col(process_col) <= threshold,
        }

        if criteria in operators:
            condition = operators[criteria]()
            return when(condition, True).otherwise(None)
        else:
            raise ValueError(f"Unsupported criteria: {criteria}")

    def process_column(self, config: ColumnConfig) -> pyspark.sql.Column:
        """
        Apply a threshold-based logic to a column using a window function.

        Args:
            config (ColumnConfig): Configuration for the column to process.

        Returns:
            pyspark.sql.Column: A column with the threshold logic applied and aggregated.
        """
        window = Window.partitionBy(self.partition_by).orderBy(col(self.order_by).desc()).rowsBetween(self.start_month, self.n_months)
        threshold_result = self.get_threshold_result(config.column_name, config.threshold, config.criteria)
        count_months = count(when(threshold_result, True).otherwise(None)).over(window)
        return when(count_months >= config.u_months, True).otherwise(None)


In [30]:
# Configurations for each column
columns_to_process = [
    ColumnConfig(column_name="amount", threshold=37_000, criteria=">=", u_months=3),
    # ColumnConfig(column_name="payroll", threshold=35_000, criteria=">", u_months=3),
    # ColumnConfig(column_name="credit", threshold=20_000, criteria="<", u_months=5),
]

# General processor
processor = ThresholdProcessor(
    partition_by="customer_id",
    order_by="cutoff_date",
    n_months=11
)

# Apply to the DataFrame
df_clean = df.select(
    "cutoff_date",
    "customer_id",
    "amount",
    # "payroll",
    # "credit",
    *[
        processor.process_column(config).alias(f"uun_{config.column_name}")
        for config in columns_to_process
    ]
)

In [31]:
df_clean.show(12*3)

+-----------+-----------+--------+----------+
|cutoff_date|customer_id|  amount|uun_amount|
+-----------+-----------+--------+----------+
| 2024-12-01|          1|29240.81|      NULL|
| 2024-11-01|          1|15843.72|      NULL|
| 2024-10-01|          1|17079.07|      NULL|
| 2024-09-01|          1| 1414.37|      NULL|
| 2024-08-01|          1|42982.31|      NULL|
| 2024-07-01|          1|19211.52|      NULL|
| 2024-06-01|          1| 11995.0|      NULL|
| 2024-05-01|          1| 5626.83|      NULL|
| 2024-04-01|          1|31138.45|      NULL|
| 2024-03-01|          1|  5235.0|      NULL|
| 2024-02-01|          1|19863.17|      NULL|
| 2024-01-01|          1| 3500.15|      NULL|
| 2023-12-01|          1|18206.38|      NULL|
| 2023-11-01|          1|21242.76|      NULL|
| 2023-10-01|          1|33622.52|      NULL|
| 2023-09-01|          1| 8341.58|      NULL|
| 2023-08-01|          1|38901.08|      NULL|
| 2023-07-01|          1|29370.23|      NULL|
| 2023-06-01|          1|35007.94|

In [34]:
window = Window.partitionBy("customer_id").orderBy(col("cutoff_date").desc()).rowsBetween(0, 11)

In [35]:
df_clean = df.select(
    "cutoff_date",
    "customer_id",
    "amount",
    (
        avg(col("amount")).over(window)
    ).alias("avg_threshold")
)

In [36]:
df_clean.show(12*1)

+-----------+-----------+--------+------------------+
|cutoff_date|customer_id|  amount|     avg_threshold|
+-----------+-----------+--------+------------------+
| 2024-12-01|          1|29240.81|16927.533333333333|
| 2024-11-01|          1|15843.72|        16007.9975|
| 2024-10-01|          1|17079.07|        16457.9175|
| 2024-09-01|          1| 1414.37|17836.538333333334|
| 2024-08-01|          1|42982.31|18413.805833333332|
| 2024-07-01|          1|19211.52|18073.703333333335|
| 2024-06-01|          1| 11995.0|        18920.2625|
| 2024-05-01|          1| 5626.83|        20838.0075|
| 2024-04-01|          1|31138.45|20978.256666666664|
| 2024-03-01|          1|  5235.0|20901.516666666666|
| 2024-02-01|          1|19863.17|          23221.25|
| 2024-01-01|          1| 3500.15|22449.489166666666|
+-----------+-----------+--------+------------------+
only showing top 12 rows



In [14]:
from dataclasses import dataclass
from pyspark.sql import DataFrame, Window
from pyspark.sql.functions import col, when, count, avg, sum, expr


@dataclass
class WindowConfig:
    """
    Configuration for the window function in the column processing.

    Attributes:
        partition_by (str): The column to partition by.
        order_by (str): The column to order by.
        n_months (int): The number of months in the window.
        start_month (int): The starting month for the window.
    """
    partition_by: str
    order_by: str
    n_months: int
    start_month: int = 0


@dataclass
class ColumnConfig:
    """
    Configuration for processing a specific column with threshold criteria and aggregation.

    Attributes:
        column_name (str): Name of the column to process.
        threshold (float): Threshold value for the column.
        criteria (str): Comparison operator to apply (e.g., '>', '<=', '==').
        u_months (int): Minimum number of months the threshold condition must be met (only used for 'threshold' operation).
        operation (str): Aggregation operation to apply ('sum', 'avg', 'percentile', or 'threshold').
    """
    column_name: str
    threshold: float
    criteria: str
    operation: str  # 'sum', 'avg', 'percentile', or 'threshold'
    u_months: int = None  # Only used for 'threshold' operation


class WindowProcessor:
    """
    Processes columns using Spark SQL Window functions with threshold-based filtering and aggregation.
    """
    def __init__(self, window_config: WindowConfig):
        self.window_config = window_config
        self.window = Window.partitionBy(self.window_config.partition_by) \
                            .orderBy(col(self.window_config.order_by).desc()) \
                            .rowsBetween(self.window_config.start_month, self.window_config.n_months)

    def process_column(self, config: ColumnConfig):
        """
        Processes a single column based on the configuration.

        Args:
            config (ColumnConfig): Configuration for the column processing.

        Returns:
            Column: A processed column with the specified aggregation or threshold condition.
        """
        if config.operation in ["sum", "avg"]:
            # Apply the window aggregation correctly
            operation_func = {"sum": sum, "avg": avg}[config.operation]
            aggregated_column = operation_func(col(config.column_name)).over(self.window)
            return self.apply_threshold(aggregated_column, config)
        elif config.operation == "percentile":
            # Use Spark SQL percentile_approx for percentile calculation
            aggregated_column = expr(f"percentile_approx({config.column_name}, 0.5)").over(self.window)
            return self.apply_threshold(aggregated_column, config)
        elif config.operation == "threshold":
            return self.process_threshold(config)
        else:
            raise ValueError(f"Unsupported operation: {config.operation}")

    def apply_threshold(self, aggregated_column, config: ColumnConfig):
        """
        Applies the threshold condition on the aggregated column.

        Args:
            aggregated_column (Column): The aggregated column.
            config (ColumnConfig): Configuration for the column processing.

        Returns:
            Column: A column indicating if the aggregated value meets the threshold condition.
        """
        condition = self.get_threshold_condition(config)
        return when(aggregated_column >= config.threshold, True).otherwise(None)

    def process_threshold(self, config: ColumnConfig):
        """
        Processes a column to evaluate if it meets the threshold condition over the specified number of months.

        Args:
            config (ColumnConfig): Configuration for the column processing.

        Returns:
            Column: A column indicating if the threshold condition is met for the required number of months.
        """
        condition = self.get_threshold_condition(config)
        count_months = count(when(condition, True).otherwise(None)).over(self.window)
        return when(count_months >= config.u_months, True).otherwise(None)

    def get_threshold_condition(self, config: ColumnConfig):
        """
        Generates the threshold condition based on the configuration.

        Args:
            config (ColumnConfig): Configuration for the column processing.

        Returns:
            Column: A column representing the threshold condition.
        """
        operators = {
            ">": lambda: col(config.column_name) > config.threshold,
            "<": lambda: col(config.column_name) < config.threshold,
            "==": lambda: col(config.column_name) == config.threshold,
            "!=": lambda: col(config.column_name) != config.threshold,
            ">=": lambda: col(config.column_name) >= config.threshold,
            "<=": lambda: col(config.column_name) <= config.threshold,
        }
        if config.criteria in operators:
            return operators[config.criteria]()
        else:
            raise ValueError(f"Unsupported criteria: {config.criteria}")

In [15]:
# Crear la configuración de la ventana
window_config = WindowConfig(partition_by="customer_id", order_by="cutoff_date", n_months=11)

# Crear las configuraciones de las columnas
columns_to_process = [
    # ColumnConfig(column_name="amount", threshold=20_500, criteria=">=", operation="avg"),
    # ColumnConfig(column_name="payroll", threshold=35000, criteria=">", u_months=3, operation="sum"),
    # ColumnConfig(column_name="amount", threshold=20_500, criteria="<", u_months=5, operation="percentile"),
    ColumnConfig(column_name="amount", threshold=10_000, criteria=">=", u_months=10, operation="threshold"),  # Ejemplo de umbral
]

# Crear el procesador de ventana
processor = WindowProcessor(window_config)

# Procesar el DataFrame con las configuraciones de las columnas
df_clean = df.select(
    "cutoff_date",
    "customer_id",
    "amount",
    *[
        # processor.process_column(config).alias(f"{config.operation}_{config.column_name}")
        processor.process_column(config).alias(f"Criteria_{config.column_name}")

        for config in columns_to_process
    ]
)

In [None]:
# df_clean.orderBy("customer_id", col("cutoff_date").desc()).show(12*1)
df_clean.show(12*2)

+-----------+-----------+--------+---------------+
|cutoff_date|customer_id|  amount|Criteria_amount|
+-----------+-----------+--------+---------------+
| 2024-12-01|          1|29240.81|           NULL|
| 2024-11-01|          1|15843.72|           NULL|
| 2024-10-01|          1|17079.07|           NULL|
| 2024-09-01|          1| 1414.37|           NULL|
| 2024-08-01|          1|42982.31|           NULL|
| 2024-07-01|          1|19211.52|           NULL|
| 2024-06-01|          1| 11995.0|           NULL|
| 2024-05-01|          1| 5626.83|           NULL|
| 2024-04-01|          1|31138.45|           NULL|
| 2024-03-01|          1|  5235.0|           NULL|
| 2024-02-01|          1|19863.17|           NULL|
| 2024-01-01|          1| 3500.15|           NULL|
| 2023-12-01|          1|18206.38|           true|
| 2023-11-01|          1|21242.76|           true|
| 2023-10-01|          1|33622.52|           true|
| 2023-09-01|          1| 8341.58|           NULL|
| 2023-08-01|          1|38901.

24/12/24 21:13:55 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 258494 ms exceeds timeout 120000 ms
24/12/24 21:13:55 WARN SparkContext: Killing executors is not supported by current scheduler.
24/12/24 21:13:59 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [121]:
(
    df
    .orderBy("customer_id", col("cutoff_date").desc())
).show()

+-----------+-----------+--------+---------------+
|customer_id|cutoff_date|  amount|has_full_months|
+-----------+-----------+--------+---------------+
|          1| 2024-12-01|29240.81|           true|
|          1| 2024-11-01|15843.72|           true|
|          1| 2024-10-01|17079.07|           true|
|          1| 2024-09-01| 1414.37|           true|
|          1| 2024-08-01|42982.31|           true|
|          1| 2024-07-01|19211.52|           true|
|          1| 2024-06-01| 11995.0|           true|
|          1| 2024-05-01| 5626.83|           true|
|          1| 2024-04-01|31138.45|           true|
|          1| 2024-03-01|  5235.0|           true|
|          1| 2024-02-01|19863.17|           true|
|          1| 2024-01-01| 3500.15|           true|
|          1| 2023-12-01|18206.38|           true|
|          1| 2023-11-01|21242.76|           true|
|          1| 2023-10-01|33622.52|           true|
|          1| 2023-09-01| 8341.58|           true|
|          1| 2023-08-01|38901.

In [None]:
from dataclasses import dataclass, field
from pyspark.sql import Window
from pyspark.sql.functions import col, when, count, avg, sum, expr
from typing import Optional, List


@dataclass
class WindowConfig:
    """
    Configuration for Spark SQL window functions.
    """
    partition_by: str
    order_by: str
    n_months: int
    start_month: int = 0

    def __post_init__(self):
        valid_operations = {"sum", "avg", "percentile", "threshold"}
        if self.operation not in valid_operations:
            raise ValueError(f"Invalid operation: {self.operation}. Must be one of {valid_operations}.")
        if self.operation == "threshold" and (self.threshold is None or self.criteria is None or self.u_months is None):
            raise ValueError("Threshold operations requiere 'threshold', 'criteria', and 'u_months'.")

    @property
    def window(self):
        """
        Generates a Spark SQL Window specification based on the configuration.
        """
        return Window.partitionBy(self.partition_by) \
                     .orderBy(col(self.order_by).desc()) \
                     .rowsBetween(self.start_month, self.n_months)


@dataclass
class ColumnConfig:
    """
    Configuration for processing a specific column with aggregation or threshold operations.
    """
    column_name: str
    operation: str  # 'sum', 'avg', 'percentile', or 'threshold'
    threshold: Optional[float] = None
    criteria: Optional[str] = None  # '>', '<=', '==', etc., for threshold operations
    u_months: Optional[int] = None  # Minimum months for 'threshold' operation


class WindowProcessor:
    """
    Processes columns using Spark SQL window functions.
    """

    def __init__(self, window_config: WindowConfig):
        self.window_config = window_config

    def process_column(self, config: ColumnConfig):
        """
        Processes a column based on the specified configuration.
        """
        if config.operation == "threshold":
            return self._process_threshold(config)
        aggregated_column = self._aggregate_column(config)
        return self._apply_threshold(aggregated_column, config) if config.threshold else aggregated_column

    def _aggregate_column(self, config: ColumnConfig):
        """
        Aggregates a column based on the specified operation.
        """
        operations = {
            "sum": lambda: sum(col(config.column_name)).over(self.window_config.window),
            "avg": lambda: avg(col(config.column_name)).over(self.window_config.window),
            "percentile": lambda: expr(f"percentile_approx({config.column_name}, 0.5)").over(self.window_config.window),
        }
        if config.operation not in operations:
            raise ValueError(f"Unsupported operation: {config.operation}. Supported operations {', '.join(operations.keys())}")
        return operations[config.operation]()

    def _apply_threshold(self, aggregated_column, config: ColumnConfig):
        """
        Applies a threshold condition to the aggregated column.
        """
        condition = self._get_threshold_condition(config)
        return when(condition, True).otherwise(None)

    def _process_threshold(self, config: ColumnConfig):
        """
        Evaluates if the column meets the threshold condition over the specified number of months.
        """
        condition = self._get_threshold_condition(config)
        count_months = count(when(condition, True)).over(self.window_config.window)
        return when(count_months >= config.u_months, True).otherwise(None)

    def _get_threshold_condition(self, config: ColumnConfig):
        """
        Generates a threshold condition based on the criteria.
        """
        operators = {
            ">": col(config.column_name) > config.threshold,
            "<": col(config.column_name) < config.threshold,
            "==": col(config.column_name) == config.threshold,
            "!=": col(config.column_name) != config.threshold,
            ">=": col(config.column_name) >= config.threshold,
            "<=": col(config.column_name) <= config.threshold,
        }
        if config.criteria not in operators:
            raise ValueError(f"Unsupported criteria: {config.criteria}")
        return operators[config.criteria]

In [12]:
# Crear la configuración de la ventana
window_config = WindowConfig(partition_by="customer_id", order_by="cutoff_date", n_months=11)

# Crear las configuraciones de las columnas
columns_to_process = [
    # ColumnConfig(column_name="amount", threshold=20_500, criteria=">=", operation="avg"),
    # ColumnConfig(column_name="payroll", threshold=35000, criteria=">", u_months=3, operation="sum"),
    # ColumnConfig(column_name="amount", threshold=20_500, criteria="<", u_months=5, operation="percentile"),
    ColumnConfig(column_name="amount", threshold=10_000, criteria=">=", u_months=10, operation="threshold"),  # Ejemplo de umbral
]

# Crear el procesador de ventana
processor = WindowProcessor(window_config)

# Procesar el DataFrame con las configuraciones de las columnas
df_clean = df.select(
    "cutoff_date",
    "customer_id",
    "amount",
    *[
        # processor.process_column(config).alias(f"{config.operation}_{config.column_name}")
        processor.process_column(config).alias(f"Criteria_{config.column_name}")

        for config in columns_to_process
    ]
)

In [13]:
df_clean.show(12*2)

+-----------+-----------+--------+---------------+
|cutoff_date|customer_id|  amount|Criteria_amount|
+-----------+-----------+--------+---------------+
| 2024-12-01|          1|29240.81|           NULL|
| 2024-11-01|          1|15843.72|           NULL|
| 2024-10-01|          1|17079.07|           NULL|
| 2024-09-01|          1| 1414.37|           NULL|
| 2024-08-01|          1|42982.31|           NULL|
| 2024-07-01|          1|19211.52|           NULL|
| 2024-06-01|          1| 11995.0|           NULL|
| 2024-05-01|          1| 5626.83|           NULL|
| 2024-04-01|          1|31138.45|           NULL|
| 2024-03-01|          1|  5235.0|           NULL|
| 2024-02-01|          1|19863.17|           NULL|
| 2024-01-01|          1| 3500.15|           NULL|
| 2023-12-01|          1|18206.38|           true|
| 2023-11-01|          1|21242.76|           true|
| 2023-10-01|          1|33622.52|           true|
| 2023-09-01|          1| 8341.58|           NULL|
| 2023-08-01|          1|38901.