## Supporting classes

In [1]:
import logging
import pandas as pd


class Transformer:
    """Base class for DataFrame transformers.

    Subclasses override :meth:`transform` and may set ``required_args`` to the
    names of the keyword arguments that must be supplied. Subclasses can be
    looked up by class name through :meth:`get` and :meth:`available`.
    """

    # Names that must be present in ``args``; a falsy value disables the check.
    required_args = None

    def __init__(self):
        self._log = logging.getLogger(self.__class__.__name__)

    def apply_transform(self, df, args):
        """Validate the inputs, then call :meth:`transform` with ``args``.

        Args:
            df: The pandas DataFrame to transform.
            args: Mapping of keyword arguments forwarded to ``transform``.
                ``None`` is treated as an empty mapping.

        Returns:
            Whatever the subclass's ``transform`` returns.

        Raises:
            TypeError: If ``df`` is not a pandas DataFrame.
            ValueError: If a name listed in ``required_args`` is not in ``args``.
        """
        if not isinstance(df, pd.DataFrame):
            msg = "Must supply a pandas DataFrame to transform."
            self._log.error(msg)
            raise TypeError(msg)

        if args is None:
            args = {}

        if self.required_args:
            missing_args = set(self.required_args) - set(args)
            if missing_args:
                # Sorted so the message does not depend on set iteration order.
                msg = "Missing required arguments: " + ", ".join(
                    sorted(map(str, missing_args))
                )
                self._log.error(msg)
                raise ValueError(msg)

        self._log.debug("Got DataFrame with %d rows and %d columns", *df.shape)
        # Column labels are not guaranteed to be strings (e.g. integer labels),
        # so convert them before joining.
        self._log.debug("Column names: %s", ", ".join(map(str, df.columns)))
        return self.transform(df, **args)

    def transform(self, df, **kwargs):
        """Transform method to be overridden by child classes."""
        raise NotImplementedError

    @classmethod
    def get(cls, name):
        """Return the Transformer class called ``name``, or ``None`` if unknown."""
        return cls.available().get(name)

    @classmethod
    def available(cls):
        """Return a ``{class name: class}`` dict of all known subclasses.

        Direct subclasses are listed first; when several share a name the most
        recently defined one is kept (relevant when a notebook cell is re-run).
        Indirect subclasses are added for any name not already taken.
        """
        direct = cls.__subclasses__()
        registry = {tfm.__name__: tfm for tfm in direct}
        for tfm in direct:
            for nested_name, nested in tfm.available().items():
                registry.setdefault(nested_name, nested)
        return registry
        
        
class Pipeline:
    """Applies an ordered list of transformation steps to a DataFrame.

    Each step is a dict with these keys:

    * ``"transformer"`` (required): class name of a ``Transformer`` subclass.
    * ``"name"`` (optional): label used in log messages; defaults to
      ``step_<position>`` with positions starting at 1.
    * ``"args"`` (optional): dict of arguments handed to the transformer.
    """

    def __init__(self, steps: list):
        if not isinstance(steps, list):
            raise TypeError("Steps must be provided as a list.")
        self.steps = steps
        self._log = logging.getLogger(self.__class__.__name__)

    def transform(self, df):
        """Run every step in order and return the resulting DataFrame.

        Args:
            df: The DataFrame passed to the first step.

        Returns:
            The value returned by the last step, or ``df`` itself when the
            pipeline has no steps.

        Raises:
            ValueError: If a step does not name a transformer, or names one
                that ``Transformer.get`` does not know.
        """
        self._log.debug("Starting pipeline with %d steps.", len(self.steps))

        for idx, step in enumerate(self.steps, start=1):
            name = step.get("name", f"step_{idx}")
            self._log.debug("Performing step: %r.", name)

            transformer_name = step.get("transformer")
            if not transformer_name:
                raise ValueError(f"No transformer specified for step {step!r}.")
            self._log.debug("Using Transformer: %r.", transformer_name)

            transformer_cls = Transformer.get(transformer_name)
            if transformer_cls is None:
                # Transformer.get returns None for unknown names; fail with a
                # clear message instead of "'NoneType' object is not callable".
                known = ", ".join(sorted(Transformer.available())) or "none"
                msg = (
                    f"Unknown transformer {transformer_name!r} for step {name!r}. "
                    f"Available transformers: {known}."
                )
                self._log.error(msg)
                raise ValueError(msg)

            # An explicit ``"args": None`` is treated like a missing key.
            args = step.get("args") or {}
            self._log.debug("Using arguments: %r.", args)

            df = transformer_cls().apply_transform(df, args)

        self._log.debug("Finished processing pipeline.")
        self._log.debug("Final DataFrame has %d rows and %d columns", *df.shape)
        # Column labels may be non-strings, so convert before joining.
        self._log.debug("Column names: %s", ", ".join(map(str, df.columns)))

        return df


## Transformers

In [2]:
class ColumnRenamer(Transformer):
    """Renames columns of a pandas DataFrame."""

    required_args = ["mapping"]

    def transform(self, df, mapping):
        """Rename columns using the provided mapping.

        Args:
            df: The DataFrame whose columns are renamed.
            mapping: A dict of ``{old name: new name}`` or a callable applied
                to each column name; passed to ``DataFrame.rename(columns=...)``.

        Returns:
            The DataFrame returned by ``df.rename``.

        Raises:
            TypeError: If ``mapping`` is neither a dict nor callable.
        """
        # ``callable`` is a builtin function, not a type, so it cannot be used
        # as an isinstance() target; test callability separately.
        if not (isinstance(mapping, dict) or callable(mapping)):
            msg = f"ColumnRenamer: Mapping must be dict or callable, got {type(mapping)!r}."
            self._log.error(msg)
            raise TypeError(msg)
        return df.rename(columns=mapping)


class ColumnDropper(Transformer):
    """Drops columns from a pandas DataFrame."""

    required_args = ["columns"]

    def transform(self, df, columns):
        """Drop the named columns.

        Args:
            df: The DataFrame to drop columns from.
            columns: List of column names to remove.

        Returns:
            The DataFrame returned by ``df.drop(columns=columns)``.

        Raises:
            TypeError: If ``columns`` is not a list.
            ValueError: If any requested column is not present in ``df``.
        """
        if not isinstance(columns, list):
            # Report the type of the offending argument itself.
            msg = f"Columns must be provided as a list, got {type(columns)!r}."
            self._log.error(msg)
            raise TypeError(msg)

        invalid = set(columns) - set(df.columns)
        if invalid:
            # Labels may be non-strings; convert and sort for a stable message.
            listing = ", ".join(sorted(map(str, invalid)))
            msg = f"Could not find these columns in the data: {listing}."
            self._log.error(msg)
            raise ValueError(msg)
        return df.drop(columns=columns)

## Demo

In [3]:
# Configure root logging at DEBUG level so the debug messages from the
# pipeline and transformers appear in the cell output.
logging.basicConfig(level=logging.DEBUG)

In [4]:
# Step definitions for the demo pipeline: each entry names a Transformer
# subclass and the arguments it receives.
step_cfg = [
    dict(
        name="step_one",
        transformer="ColumnRenamer",
        args=dict(mapping=dict(column_a="test_columns")),
    ),
    dict(
        name="step_two",
        transformer="ColumnDropper",
        args=dict(columns=["column_b", "column_c"]),
    ),
    # Missing argument example: a step without "args" for a transformer
    # that requires them.
    # dict(
    #     name="step_two",
    #     transformer="ColumnDropper",
    # ),
]

In [5]:
# Build the pipeline from the step configuration; construction only stores
# the steps, nothing is executed until transform() is called.
pp = Pipeline(step_cfg)

In [6]:
# Sample input: two integer columns and one column of single-letter strings.
df = pd.DataFrame(
    data={
        "column_a": list(range(1, 5)),
        "column_b": list(range(5, 9)),
        "column_c": ["A", "B", "C", "D"],
    }
)

In [7]:
# Run the configured steps on the sample frame; the returned DataFrame is the
# cell's last expression, so it is displayed as the cell output.
pp.transform(df)

DEBUG:Pipeline:Starting pipeline with 2 steps.
DEBUG:Pipeline:Performing step: 'step_one'.
DEBUG:Pipeline:Using Transformer: 'ColumnRenamer'.
DEBUG:Pipeline:Using arguments: {'mapping': {'column_a': 'test_columns'}}.
DEBUG:ColumnRenamer:Got DataFrame with 4 rows and 3 columns
DEBUG:ColumnRenamer:Column names: column_a, column_b, column_c
DEBUG:Pipeline:Performing step: 'step_two'.
DEBUG:Pipeline:Using Transformer: 'ColumnDropper'.
DEBUG:Pipeline:Using arguments: {'columns': ['column_b', 'column_c']}.
DEBUG:ColumnDropper:Got DataFrame with 4 rows and 3 columns
DEBUG:ColumnDropper:Column names: test_columns, column_b, column_c
DEBUG:Pipeline:Finished processing pipeline.
DEBUG:Pipeline:Final DataFrame has 4 rows and 1 columns
DEBUG:Pipeline:Column names: test_columns


Unnamed: 0,test_columns
0,1
1,2
2,3
3,4
