In [1]:
import pandas as pd
import numpy as np

from sklearn.pipeline import Pipeline, FunctionTransformer, FeatureUnion
from sklearn.compose import ColumnTransformer
from sklearn.base import BaseEstimator, TransformerMixin

In [2]:
# Load the three raw tables. `header=0` consumes each file's own header row,
# and the column labels are then taken from `names` instead.
claims = pd.read_csv(
    './claim.csv',
    header=0,
    names=['debt_id', 'collected_amount', 'date_of_trans'],
)
actions = pd.read_csv(
    './action.csv',
    header=0,
    names=['debt_id', 'contact_date'],
)
collections = pd.read_csv(
    'collection.csv',
    header=0,
    names=[
        'debt_id',
        'collector_id',
        'enter_date',
        'amount_outstanding',
        'amount',
        'service_end_date',
        'agent_id',
    ],
)

In [38]:
def nan_droper(X):
    """
    Return the rows of ``X`` that contain no missing values.

    ``DataFrame.dropna`` already builds a new object, so the input is left
    untouched without taking an extra defensive copy of the whole frame first.

    Parameters
    ----------
    X : DataFrame
        Input data that may contain missing entries (NaN / None / NaT).

    Returns
    -------
    X_transformed : DataFrame
        A new DataFrame holding only the rows of ``X`` in which every column
        is populated. Row labels of the surviving rows are preserved and the
        original DataFrame remains unchanged.

    Examples
    --------
    >>> import pandas as pd
    >>> data = pd.DataFrame({
    ...     'A': [1, 2, None, 4],
    ...     'B': [None, 2, 3, 4],
    ...     'C': [1, None, 3, 4]
    ... })
    >>> nan_droper(data)
         A    B    C
    3  4.0  4.0  4.0
    """
    return X.dropna()


class DropBy(BaseEstimator, TransformerMixin):
    """
    Stateless transformer that removes the rows flagged by a user-supplied rule.

    Parameters
    ----------
    algorithm : callable
        Called as ``algorithm(X)``. It must return one boolean per row of
        ``X`` (typically a boolean Series sharing ``X``'s index). Rows flagged
        ``True`` are removed by :meth:`transform`.

    Attributes
    ----------
    algorithm_ : callable
        Read-only alias of ``algorithm``, kept for code written against the
        earlier attribute name.

    Notes
    -----
    The constructor argument is stored under its own name (``algorithm``)
    because scikit-learn's ``get_params`` / ``clone`` / ``repr`` look every
    ``__init__`` parameter up with ``getattr(self, <parameter name>)``.

    Rows are filtered with a boolean mask instead of ``DataFrame.drop`` on the
    flagged labels. With a non-unique index, dropping by label would also
    remove unflagged rows that happen to share a label with a flagged one.

    Examples
    --------
    >>> import pandas as pd
    >>>
    >>> # Drop rows where the value in column 'A' is greater than 2
    >>> def drop_algorithm(X):
    ...     return X['A'] > 2
    ...
    >>> data = pd.DataFrame({
    ...     'A': [1, 2, 3, 4],
    ...     'B': [5, 6, 7, 8]
    ... })
    >>> drop_by_transformer = DropBy(algorithm=drop_algorithm)
    >>> transformed_data = drop_by_transformer.fit_transform(data)
    >>> print(transformed_data)
       A  B
    0  1  5
    1  2  6
    """

    def __init__(self, algorithm):
        """
        Store the row-flagging rule.

        Parameters
        ----------
        algorithm : callable
            Function mapping a DataFrame to one boolean per row; ``True``
            marks a row for removal.
        """
        self.algorithm = algorithm

    # Backward-compatible, read-only view of the parameter under its old name.
    algorithm_ = property(lambda self: self.algorithm)

    def __sklearn_is_fitted__(self):
        """Report the transformer as fitted: it has no learned state."""
        return True

    def fit(self, X, y=None):
        """
        No-op fit; present only to satisfy the scikit-learn transformer API.

        Parameters
        ----------
        X : DataFrame
            Ignored.

        y : None, optional
            Ignored.

        Returns
        -------
        self : DropBy
            The transformer itself.
        """
        return self

    def transform(self, X):
        """
        Return a copy of ``X`` without the rows flagged by ``algorithm``.

        Parameters
        ----------
        X : DataFrame
            The input data. It is not modified.

        Returns
        -------
        X_transformed : DataFrame
            A new DataFrame containing only the rows for which ``algorithm``
            returned ``False``. Surviving rows keep their original labels.
        """
        flagged = self.algorithm(X)
        if isinstance(flagged, pd.Series) and flagged.dtype == bool:
            # A boolean Series stays label-aligned with X when used in .loc.
            keep = ~flagged
        else:
            # Lists / arrays / non-bool Series are interpreted positionally.
            keep = ~np.asarray(flagged, dtype=bool)
        return X.loc[keep].copy()


class ToDateTimeTransformer(BaseEstimator, TransformerMixin):
    """
    Stateless transformer that converts selected columns with ``pd.to_datetime``.

    Parameters
    ----------
    cols : str or list of str
        Name of the column, or list of column names, to convert.

    **kwargs
        Extra keyword arguments forwarded unchanged to ``pd.to_datetime`` for
        every converted column (for example ``format`` or ``errors``).

    Attributes
    ----------
    cols_, kwargs_
        Read-only aliases of ``cols`` and ``kwargs``, kept for code written
        against the earlier attribute names.

    Notes
    -----
    Constructor arguments are stored under their own names because
    scikit-learn's ``get_params`` / ``clone`` / ``repr`` look every
    ``__init__`` parameter up with ``getattr(self, <parameter name>)``.
    The ``**kwargs`` are not part of the introspected signature, so
    ``get_params`` / ``set_params`` are overridden to expose them as well;
    otherwise a clone would silently lose them.

    Examples
    --------
    >>> import pandas as pd
    >>> data = pd.DataFrame({
    ...     'Date': ['2024-01-01', '2024-02-01', '2024-03-01'],
    ...     'Value': [10, 20, 30]
    ... })
    >>> transformer = ToDateTimeTransformer(cols=['Date'])
    >>> transformer.fit_transform(data)
            Date  Value
    0 2024-01-01     10
    1 2024-02-01     20
    2 2024-03-01     30
    """

    def __init__(self, cols, **kwargs):
        """
        Store the target columns and the ``pd.to_datetime`` options.

        Parameters
        ----------
        cols : str or list of str
            Column name(s) to convert.

        **kwargs
            Keyword arguments forwarded to ``pd.to_datetime``.
        """
        self.cols = cols
        self.kwargs = kwargs

    # Backward-compatible, read-only views of the parameters under their old names.
    cols_ = property(lambda self: self.cols)
    kwargs_ = property(lambda self: self.kwargs)

    def get_params(self, deep=True):
        """Return ``cols`` plus every forwarded ``pd.to_datetime`` keyword."""
        params = super().get_params(deep=deep)
        params.update(self.kwargs)
        return params

    def set_params(self, **params):
        """Set ``cols`` and/or any forwarded ``pd.to_datetime`` keyword."""
        if 'cols' in params:
            self.cols = params.pop('cols')
        # Build a fresh dict so a kwargs mapping shared with a clone is not mutated.
        self.kwargs = {**self.kwargs, **params}
        return self

    def __sklearn_is_fitted__(self):
        """Report the transformer as fitted: it has no learned state."""
        return True

    def fit(self, X, y=None):
        """
        No-op fit; present only to satisfy the scikit-learn transformer API.

        Parameters
        ----------
        X : DataFrame
            Ignored.

        y : None
            Ignored.

        Returns
        -------
        self : ToDateTimeTransformer
            The transformer itself.
        """
        return self

    def transform(self, X):
        """
        Return a copy of ``X`` with the configured columns parsed as datetimes.

        Parameters
        ----------
        X : DataFrame
            The input data. It is not modified.

        Returns
        -------
        X_transformed : DataFrame
            Copy of ``X`` in which each configured column has been replaced by
            ``pd.to_datetime(column, **kwargs)``.

        Raises
        ------
        KeyError
            If any configured column is absent from ``X``.

        Examples
        --------
        >>> import pandas as pd
        >>> data = pd.DataFrame({
        ...     'Date': ['2024-01-01', '2024-02-01', '2024-03-01'],
        ...     'Value': [10, 20, 30]
        ... })
        >>> transformer = ToDateTimeTransformer(cols='Date')
        >>> transformer.transform(data)
                Date  Value
        0 2024-01-01     10
        1 2024-02-01     20
        2 2024-03-01     30
        """
        # A bare column name must be wrapped; iterating it would otherwise
        # check each character as if it were a column.
        if pd.api.types.is_list_like(self.cols):
            target_cols = list(self.cols)
        else:
            target_cols = [self.cols]

        missing = [col for col in target_cols if col not in X.columns]
        if missing:
            raise KeyError(
                f"Some columns specified in 'cols' are not in the DataFrame: {missing}"
            )

        X_transformed = X.copy()
        # Convert column by column so every keyword reaches pd.to_datetime;
        # DataFrame.apply would claim names such as `axis` or `args` for itself.
        for col in target_cols:
            X_transformed[col] = pd.to_datetime(X_transformed[col], **self.kwargs)
        return X_transformed


class GroupbyTransformer(BaseEstimator, TransformerMixin):
    """
    Stateless transformer that groups a DataFrame and aggregates each group.

    The result is ``X.groupby(by).agg(agg).reset_index()``, optionally with
    its column labels replaced by ``names``.

    Parameters
    ----------
    by : str or list
        Column name(s) to group by; forwarded to ``DataFrame.groupby``.

    agg : str, callable, list or dict, optional (default=None)
        Aggregation specification forwarded to the grouped object's ``agg``
        method (for example ``'size'`` or ``{'Value': 'sum'}``). Although the
        default is ``None``, a specification is required: :meth:`transform`
        raises ``TypeError`` when it is ``None``.

    names : list or None, optional (default=None)
        Replacement column labels for the result. When given, its length must
        equal the number of columns of the aggregated frame (group keys
        included), otherwise pandas raises ``ValueError``.

    Attributes
    ----------
    by_, agg_, names_
        Read-only aliases of ``by``, ``agg`` and ``names``, kept for code
        written against the earlier attribute names.

    Notes
    -----
    Constructor arguments are stored under their own names because
    scikit-learn's ``get_params`` / ``clone`` / ``repr`` look every
    ``__init__`` parameter up with ``getattr(self, <parameter name>)``.

    Examples
    --------
    >>> import pandas as pd
    >>> data = pd.DataFrame({
    ...     'Category': ['A', 'B', 'A', 'B', 'A', 'B'],
    ...     'Value': [10, 20, 30, 40, 50, 60]
    ... })
    >>> transformer = GroupbyTransformer(by='Category', agg={'Value': 'sum'}, names=['Category', 'TotalValue'])
    >>> transformer.fit_transform(data)
      Category  TotalValue
    0        A          90
    1        B         120
    """

    def __init__(self, by, agg=None, names=None):
        """
        Store the grouping keys, the aggregation and the optional new labels.

        Parameters
        ----------
        by : str or list
            Column name(s) to group by.

        agg : str, callable, list or dict, optional (default=None)
            Aggregation specification; must be set before transforming.

        names : list or None, optional (default=None)
            Replacement column labels for the aggregated frame.
        """
        self.by = by
        self.agg = agg
        self.names = names

    # Backward-compatible, read-only views of the parameters under their old names.
    by_ = property(lambda self: self.by)
    agg_ = property(lambda self: self.agg)
    names_ = property(lambda self: self.names)

    def __sklearn_is_fitted__(self):
        """Report the transformer as fitted: it has no learned state."""
        return True

    def fit(self, X, y=None):
        """
        No-op fit; present only to satisfy the scikit-learn transformer API.

        Parameters
        ----------
        X : DataFrame
            Ignored.

        y : None
            Ignored.

        Returns
        -------
        self : GroupbyTransformer
            The transformer itself.
        """
        return self

    def transform(self, X):
        """
        Group ``X`` by ``by``, aggregate with ``agg`` and flatten the index.

        Parameters
        ----------
        X : DataFrame
            The input data. It is not modified.

        Returns
        -------
        X_transformed : DataFrame
            The aggregated frame with the group keys restored as ordinary
            columns. Column labels are replaced by ``names`` when provided.

        Raises
        ------
        TypeError
            If no aggregation was configured (``agg`` is ``None``).
        KeyError
            If a column named in ``by`` or ``agg`` is absent from ``X``.

        Examples
        --------
        >>> import pandas as pd
        >>> data = pd.DataFrame({
        ...     'Category': ['A', 'B', 'A', 'B', 'A', 'B'],
        ...     'Value': [10, 20, 30, 40, 50, 60]
        ... })
        >>> transformer = GroupbyTransformer(by='Category', agg={'Value': 'sum'}, names=['Category', 'TotalValue'])
        >>> transformer.transform(data)
          Category  TotalValue
        0        A          90
        1        B         120
        """
        if self.agg is None:
            # pandas rejects agg(None) as well; fail here with a message that
            # points at the transformer's own parameter.
            raise TypeError(
                "GroupbyTransformer requires an aggregation: pass `agg` "
                "(e.g. 'size' or {'column': 'sum'})."
            )
        aggregated = X.groupby(by=self.by).agg(self.agg).reset_index()
        if self.names is not None:
            aggregated.columns = self.names
        return aggregated
    
    
class DataFrameMerger(BaseEstimator, TransformerMixin):
    """
    Stateless transformer that merges a fixed DataFrame onto the input.

    :meth:`transform` returns ``X.merge(right, how=how, on=on)``.

    Parameters
    ----------
    right : DataFrame
        The right-hand DataFrame merged onto every input.

    how : str, optional (default='inner')
        Type of join, forwarded to ``DataFrame.merge``
        (e.g. 'left', 'right', 'outer', 'inner').

    on : str, list or None, optional (default=None)
        Column or index level name(s) to join on, forwarded to
        ``DataFrame.merge``. With ``None`` pandas chooses the join keys
        itself.

    Attributes
    ----------
    right_, how_, on_
        Read-only aliases of ``right``, ``how`` and ``on``, kept for code
        written against the earlier attribute names.

    Notes
    -----
    Constructor arguments are stored under their own names because
    scikit-learn's ``get_params`` / ``clone`` / ``repr`` look every
    ``__init__`` parameter up with ``getattr(self, <parameter name>)``.

    Examples
    --------
    >>> import pandas as pd
    >>>
    >>> left_df = pd.DataFrame({
    ...     'id': [1, 2, 3],
    ...     'value_left': ['A', 'B', 'C']
    ... })
    >>> right_df = pd.DataFrame({
    ...     'id': [1, 2, 4],
    ...     'value_right': ['D', 'E', 'F']
    ... })
    >>> merger = DataFrameMerger(right=right_df, how='left', on='id')
    >>> transformed_data = merger.fit_transform(left_df)
    >>> print(transformed_data)
       id value_left value_right
    0   1          A           D
    1   2          B           E
    2   3          C         NaN
    """

    def __init__(self, right, how='inner', on=None):
        """
        Store the right-hand table and the merge options.

        Parameters
        ----------
        right : DataFrame
            The right-hand DataFrame.

        how : str, optional (default='inner')
            Type of join.

        on : str, list or None, optional (default=None)
            Join key(s).
        """
        self.right = right
        self.how = how
        self.on = on

    # Backward-compatible, read-only views of the parameters under their old names.
    right_ = property(lambda self: self.right)
    how_ = property(lambda self: self.how)
    on_ = property(lambda self: self.on)

    def __sklearn_is_fitted__(self):
        """Report the transformer as fitted: it has no learned state."""
        return True

    def fit(self, X, y=None):
        """
        No-op fit; present only to satisfy the scikit-learn transformer API.

        Parameters
        ----------
        X : DataFrame
            Ignored.

        y : None, optional
            Ignored.

        Returns
        -------
        self : DataFrameMerger
            The transformer itself.
        """
        return self

    def transform(self, X):
        """
        Merge the stored right-hand DataFrame onto ``X``.

        Parameters
        ----------
        X : DataFrame
            The left-hand DataFrame of the merge. It is not modified.

        Returns
        -------
        X_transformed : DataFrame
            The DataFrame resulting from the merge.
        """
        return X.merge(self.right, how=self.how, on=self.on)
    

class ColumnAdder(BaseEstimator, TransformerMixin):
    """
    Stateless transformer that writes one computed column into a copy of the input.

    Parameters
    ----------
    column_names : list of str
        Column names handed to ``adder`` as its second argument. The list may
        be empty when ``adder`` does not need any particular column.

    adder : callable
        Called as ``adder(X, column_names)``; must return a Series or
        array-like with one value per row of ``X``.

    new_col : str
        Label of the column that receives the values returned by ``adder``.

    Attributes
    ----------
    cols_, adder_, new_col_
        Read-only aliases of ``column_names``, ``adder`` and ``new_col``,
        kept for code written against the earlier attribute names.

    Notes
    -----
    Constructor arguments are stored under their own names because
    scikit-learn's ``get_params`` / ``clone`` / ``repr`` look every
    ``__init__`` parameter up with ``getattr(self, <parameter name>)``.

    Examples
    --------
    >>> import pandas as pd
    >>>
    >>> df = pd.DataFrame({
    ...     'A': [1, 2, 3],
    ...     'B': [4, 5, 6]
    ... })
    >>> def sum_columns(df, cols):
    ...     return df[cols].sum(axis=1)
    ...
    >>> column_adder = ColumnAdder(column_names=['A', 'B'], adder=sum_columns, new_col='A+B')
    >>> transformed_df = column_adder.fit_transform(df)
    >>> print(transformed_df)
       A  B  A+B
    0  1  4    5
    1  2  5    7
    2  3  6    9
    """

    def __init__(self, column_names, adder, new_col):
        """
        Store the input column names, the computing function and the target label.

        Parameters
        ----------
        column_names : list of str
            Column names passed through to ``adder``.

        adder : callable
            Function computing the new column from ``(X, column_names)``.

        new_col : str
            Label of the column to write.
        """
        self.column_names = column_names
        self.adder = adder
        self.new_col = new_col

    # Backward-compatible, read-only views of the parameters under their old names.
    cols_ = property(lambda self: self.column_names)
    adder_ = property(lambda self: self.adder)
    new_col_ = property(lambda self: self.new_col)

    def __sklearn_is_fitted__(self):
        """Report the transformer as fitted: it has no learned state."""
        return True

    def fit(self, X, y=None):
        """
        No-op fit; present only to satisfy the scikit-learn transformer API.

        Parameters
        ----------
        X : DataFrame
            Ignored.

        y : None, optional
            Ignored.

        Returns
        -------
        self : ColumnAdder
            The transformer itself.
        """
        return self

    def transform(self, X):
        """
        Return a copy of ``X`` with ``new_col`` set to ``adder(X, column_names)``.

        Parameters
        ----------
        X : DataFrame
            The input data. The new column is written to a copy, not to ``X``.

        Returns
        -------
        X_transformed : DataFrame
            Copy of ``X`` carrying the computed column.
        """
        X_transformed = X.copy()
        # The adder receives the caller's frame (not the copy), as before.
        X_transformed.loc[:, self.new_col] = self.adder(X, self.column_names)
        return X_transformed

In [65]:
# Collections table: parse both date columns (errors='coerce' is forwarded to
# pd.to_datetime) and then discard the rows whose enter_date is missing.
collection_pipeline = Pipeline([
    (
        'enter_and_service_end_date_python_date_trans',
        ToDateTimeTransformer(cols=['enter_date', 'service_end_date'], errors='coerce'),
    ),
    (
        'nan_enter_date_remover',
        DropBy(algorithm=lambda frame: frame['enter_date'].isna()),
    ),
])

# Claims table: only the transaction date needs parsing.
claims_pipeline = Pipeline([
    (
        'date_of_trans_python_date_trans',
        ToDateTimeTransformer(cols=['date_of_trans'], errors='coerce'),
    ),
])

# Main pipeline, built around the columns of the `actions` table.
# Note that the collections pipeline is executed right here, while this
# pipeline object is being constructed, and its result is stored in the merger.
main_pipeline = Pipeline([
    (
        'contact_date_python_date_trans',
        ToDateTimeTransformer(cols=['contact_date'], errors='coerce'),
    ),
    (
        # One row per (contact_date, debt_id) with the number of rows in the group.
        'grouper',
        GroupbyTransformer(
            by=['contact_date', 'debt_id'],
            agg='size',
            names=[*actions.columns[::-1], 'number_of_contact'],
        ),
    ),
    (
        'enter_date_merger',
        DataFrameMerger(
            right=collection_pipeline.fit_transform(
                collections[['debt_id', 'collector_id', 'enter_date', 'service_end_date', 'amount', 'agent_id']]
            ),
            how='left',
            on='debt_id',
        ),
    ),
    (
        # debt_age = contact_date - enter_date
        'debt_age_adder',
        ColumnAdder(
            column_names=['contact_date', 'enter_date'],
            adder=lambda frame, names: frame[names[0]] - frame[names[1]],
            new_col='debt_age',
        ),
    ),
    (
        # transferred_amount starts out as zero on every row
        'transferred_amount_initializer',
        ColumnAdder(
            column_names=[],
            adder=lambda frame, names: np.zeros(len(frame)),
            new_col='transferred_amount',
        ),
    ),
    (
        # date_of_trans starts out as a copy of contact_date
        'date_of_trans_initializer',
        ColumnAdder(
            column_names=['contact_date'],
            adder=lambda frame, names: frame[names[0]],
            new_col='date_of_trans',
        ),
    ),
])


In [63]:
# Run the claims pipeline on the raw claims table; as the cell's last
# expression, the resulting DataFrame is what the cell displays.
claims_pipeline.fit_transform(claims)

Unnamed: 0,debt_id,collected_amount,date_of_trans
0,6409,75.0000,2021-02-14
1,6396,586.7741,2021-02-14
2,6539,58.0000,2021-02-14
3,6130,58.0000,2021-02-14
4,6647,58.0000,2021-02-14
...,...,...,...
33365,200221,20.0000,2024-07-30
33366,22104,80.0000,2024-07-31
33367,288254,220.0000,2024-07-31
33368,43449,311.0000,2024-07-31
