Solution: Pipeline Enhancements
===============================

In [1]:
%cd forml-solution-avazuctr

/opt/forml/workspace/3-solution/forml-solution-avazuctr


Model Ensembling
----------------

Instead of just the plain `LogisticRegression` used in our base model pipeline, we can combine multiple different classifiers using the stacked ensembling technique to further improve the performance. ForML already [comes with one possible operator](https://docs.forml.io/en/latest/_auto/forml.pipeline.ensemble.html) implementing this concept so let's try to use it.
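
The stacking idea described above can be sketched with plain scikit-learn. This is only an illustration of the general technique on synthetic data, under the assumption that out-of-fold predictions of the base models feed a final estimator; it does not reproduce the internals of the ForML operator, and the dataset and the `n_estimators=20` setting are made up to keep the example fast.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_val_predict

# Small synthetic binary classification problem (illustrative data only).
X, y = make_classification(n_samples=400, n_features=10, random_state=42)

base_models = [
    GradientBoostingClassifier(n_estimators=20, random_state=42),
    RandomForestClassifier(n_estimators=20, random_state=42),
]
folds = StratifiedKFold(n_splits=2, shuffle=True, random_state=42)

# Out-of-fold positive-class probabilities: every row is predicted by a model
# that did not see that row during fitting.
meta_features = np.column_stack(
    [
        cross_val_predict(model, X, y, cv=folds, method="predict_proba")[:, 1]
        for model in base_models
    ]
)

# The final estimator learns how to combine the base model predictions.
meta_model = LogisticRegression(random_state=42).fit(meta_features, y)
print(meta_features.shape)
```

Each base model contributes one column to `meta_features`, so the final `LogisticRegression` is trained on as many features as there are stacked classifiers.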

### Adding the Ensemble

Add a basic model ensemble of **two classifiers** with just **two-fold cross-validation** to the [avazuctr/pipeline.py](forml-solution-avazuctr/avazuctr/pipeline.py):

1. Open the [avazuctr/pipeline.py](forml-solution-avazuctr/avazuctr/pipeline.py) component.
2. Update it with the code below using the ensemble of `GradientBoostingClassifier` and `RandomForestClassifier`.
3. Save the file!

```python
from sklearn import model_selection

from forml import project
from forml.pipeline import ensemble, wrap

with wrap.importer():
    from category_encoders import TargetEncoder
    from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.preprocessing import StandardScaler


CATEGORICAL_COLUMNS = [
    "C1",
    "banner_pos",
    "site_id",
    "site_domain",
    "site_category",
    "app_id",
    "app_domain",
    "app_category",
    "device_id",
    "device_ip",
    "device_model",
    "device_type",
    "device_conn_type",
    "C14",
    "C15",
    "C16",
    "C17",
    "C18",
    "C19",
    "C20",
    "C21",
]

STACK = ensemble.FullStack(
    GradientBoostingClassifier(random_state=42),
    RandomForestClassifier(random_state=42),
    crossvalidator=model_selection.StratifiedKFold(n_splits=2, shuffle=True, random_state=42),
)

PIPELINE = (
    TargetEncoder(cols=CATEGORICAL_COLUMNS)
    >> StandardScaler()
    >> STACK
    >> LogisticRegression(random_state=42)
)

# Registering the pipeline
project.setup(PIPELINE)
```

**SAVE THE [avazuctr/pipeline.py](forml-solution-avazuctr/avazuctr/pipeline.py) FILE!**

In [2]:
! forml project eval

running eval
0.4059090359226547


In [3]:
! git add avazuctr/pipeline.py

Balancing the Target Classes 
----------------------------

As noticed during the [exploration](1-setup-and-exploration.ipynb), the target variable is highly imbalanced (417,963 in the negative class vs only 82,037 in the positive). This may bias the model towards the majority class. Let's try our [Balancer implemented previously](../2-tutorial/2-task-dependency-management.ipynb) to see whether it brings any improvement.
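
To put the imbalance into numbers, the short sketch below derives the positive share and the negative-to-positive ratio from the two class counts quoted above. The variable names are illustrative and nothing here touches the actual dataset.

```python
# Class counts as quoted from the exploration notebook.
negative, positive = 417_963, 82_037

total = negative + positive
positive_share = positive / total      # fraction of clicks in the data
imbalance_ratio = negative / positive  # negatives per single positive

# A trivial model that always predicts the negative class would already
# reach this accuracy, which is why imbalance can mask a biased model.
majority_baseline_accuracy = negative / total

print(f"positive share: {positive_share:.1%}")
print(f"negatives per positive: {imbalance_ratio:.2f}")
print(f"always-negative accuracy: {majority_baseline_accuracy:.1%}")
```

With roughly one positive for every five negatives, oversampling the minority class is one simple way to expose the model to both classes equally during training.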

### Adding the Balancer

Edit the [pyproject.toml](forml-solution-avazuctr/pyproject.toml) and add the new dependency of `imbalanced-learn==0.10.1`:

1. Open the [pyproject.toml](forml-solution-avazuctr/pyproject.toml).
2. Update it with the config below adding the new dependency of `imbalanced-learn==0.10.1`.
3. Save the file!

```toml
[project]
name = "forml-solution-avazuctr"
version = "0.1.dev1"
dependencies = [
    "category-encoders==2.6.0",
    "forml==0.93",
    "imbalanced-learn==0.10.1",
    "openschema==0.6.dev2",
    "scikit-learn==1.2.2"
]


[tool.forml]
package = "avazuctr"
```

**SAVE THE [pyproject.toml](forml-solution-avazuctr/pyproject.toml) FILE!**

In [4]:
! git add pyproject.toml
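
Before relying on the new dependency, it can be worth confirming that the environment actually provides the pinned version. The sketch below uses only the standard library; `installed_version` and the `PINNED` mapping are hypothetical helpers introduced for illustration and are not part of ForML.

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


# Pin copied by hand from the pyproject.toml dependencies (illustrative subset).
PINNED = {"imbalanced-learn": "0.10.1"}

for name, expected in PINNED.items():
    found = installed_version(name)
    status = "ok" if found == expected else "mismatch"
    print(f"{name}: expected {expected}, found {found} ({status})")
```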

Now, add the `Balancer` implementation to the [avazuctr/pipeline.py](forml-solution-avazuctr/avazuctr/pipeline.py):

1. Open the [avazuctr/pipeline.py](forml-solution-avazuctr/avazuctr/pipeline.py) component.
2. Update it with the code below providing and engaging the `Balancer` implementation.
3. Save the file!

```python
import typing

from imblearn import over_sampling
from sklearn import model_selection

from forml import flow, project
from forml.pipeline import ensemble, wrap

with wrap.importer():
    from category_encoders import TargetEncoder
    from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.preprocessing import StandardScaler


@wrap.Actor.apply
def OverSampler(features, labels, *, random_state: typing.Optional[int] = None):
    """Stateless actor with two input and two output ports that oversamples the minority class."""
    return over_sampling.RandomOverSampler(random_state=random_state).fit_resample(features, labels)


class Balancer(flow.Operator):
    """Balancer operator inserting the provided sampler into the ``train`` & ``label`` paths."""

    def __init__(self, sampler: flow.Builder = OverSampler.builder(random_state=42)):
        self._sampler = sampler

    def compose(self, scope: flow.Composable) -> flow.Trunk:
        left = scope.expand()
        sampler = flow.Worker(self._sampler, 2, 2)
        sampler[0].subscribe(left.train.publisher)
        new_features = flow.Future()
        new_features[0].subscribe(sampler[0])
        sampler[1].subscribe(left.label.publisher)
        new_labels = flow.Future()
        new_labels[0].subscribe(sampler[1])
        return left.use(
            train=left.train.extend(tail=new_features),
            label=left.label.extend(tail=new_labels),
        )


CATEGORICAL_COLUMNS = [
    "C1",
    "banner_pos",
    "site_id",
    "site_domain",
    "site_category",
    "app_id",
    "app_domain",
    "app_category",
    "device_id",
    "device_ip",
    "device_model",
    "device_type",
    "device_conn_type",
    "C14",
    "C15",
    "C16",
    "C17",
    "C18",
    "C19",
    "C20",
    "C21",
]

STACK = ensemble.FullStack(
    GradientBoostingClassifier(warm_start=True, random_state=42),
    RandomForestClassifier(warm_start=True, random_state=42),
    crossvalidator=model_selection.StratifiedKFold(n_splits=2, shuffle=True, random_state=42),
)

PIPELINE = (
    TargetEncoder(cols=CATEGORICAL_COLUMNS)
    >> Balancer()
    >> StandardScaler()
    >> STACK
    >> LogisticRegression(warm_start=True, random_state=42)
)

# Registering the pipeline
project.setup(PIPELINE)
```

**SAVE THE [avazuctr/pipeline.py](forml-solution-avazuctr/avazuctr/pipeline.py) FILE!**

In [5]:
from forml import project
from avazuctr import pipeline
PROJECT = project.open(path='.', package='avazuctr')
PROJECT.components.source.bind(pipeline.Balancer()).launcher.train().labels.value_counts()

click
0    417970
1    417970
Name: count, dtype: int64

In [6]:
! forml project eval

running eval
0.40215845294699354


In [7]:
! git add avazuctr/pipeline.py

In [8]:
! forml project train -R graphviz

running train


[![Train Flow](./forml-solution-avazuctr/forml.dot.svg)](./forml-solution-avazuctr/forml.dot.svg)

### Adding the Balancer Unit Test

Let's also add the [Balancer unit test implemented previously](../2-tutorial/2-task-dependency-management.ipynb) to the project tests:

In [9]:
! touch tests/test_pipeline.py

Edit the created [test_pipeline.py](forml-solution-avazuctr/tests/test_pipeline.py) and implement the unit test:

1. Open the [test_pipeline.py](forml-solution-avazuctr/tests/test_pipeline.py).
2. Update it with the code below providing the `TestBalancer` unit test implementation.
3. Save the file!

```python
from forml import testing

from avazuctr import pipeline


class TestBalancer(testing.operator(pipeline.Balancer)):
    """Balancer unit tests."""

    default_oversample = (
        testing.Case()
        .train([[1], [1], [0]], [1, 1, 0])
        .returns([[1], [1], [0], [0]], labels=[1, 1, 0, 0])
    )
```

**SAVE THE [test_pipeline.py](forml-solution-avazuctr/tests/test_pipeline.py) FILE!**
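
The expected output in the test case is deterministic even though the oversampling is random: the minority class holds a single row, so that row is the only one that can be duplicated. The NumPy sketch below illustrates this with a hypothetical `random_oversample` helper. It is a simplified stand-in rather than the `imblearn` implementation, and it assumes the resampled rows are appended after the original ones, which matches the test expectation.

```python
import numpy as np


def random_oversample(features, labels, random_state=None):
    """Duplicate randomly chosen minority rows until all classes are equally frequent."""
    features = np.asarray(features)
    labels = np.asarray(labels)
    rng = np.random.default_rng(random_state)

    classes, counts = np.unique(labels, return_counts=True)
    target = counts.max()

    # Start with all original rows, then append the resampled minority rows.
    indices = [np.arange(len(labels))]
    for cls, count in zip(classes, counts):
        if count < target:
            pool = np.flatnonzero(labels == cls)
            indices.append(rng.choice(pool, size=target - count, replace=True))

    selected = np.concatenate(indices)
    return features[selected], labels[selected]


# Same input as the TestBalancer case: two positives and one negative.
new_features, new_labels = random_oversample([[1], [1], [0]], [1, 1, 0], random_state=42)
print(new_features.tolist(), new_labels.tolist())
```

With larger minority classes the duplicated rows would depend on the random seed, so a unit test on such data would need to assert on class counts rather than on exact rows.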

In [10]:
! git add tests/test_pipeline.py

In [11]:
! forml project test

running test
running egg_info
writing forml_solution_avazuctr.egg-info/PKG-INFO
writing dependency_links to forml_solution_avazuctr.egg-info/dependency_links.txt
writing requirements to forml_solution_avazuctr.egg-info/requires.txt
writing top-level names to forml_solution_avazuctr.egg-info/top_level.txt
reading manifest file 'forml_solution_avazuctr.egg-info/SOURCES.txt'
writing manifest file 'forml_solution_avazuctr.egg-info/SOURCES.txt'
running build_ext
test_missing_column (tests.test_source.TestTimeExtractor)
Test of Missing Column ... ERROR: 2023-05-23 15:43:29,009: __init__: Instruction TimeExtractor.apply failed when processing arguments: Empty DataFrame
Columns: []
Index: []
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/forml/flow/_code/target/__init__.py", line 56, in __call__
    result = self.execute(*args)
  File "/usr/local/lib/python3.10/site-packages/forml/flow/_code/target/user.py", line 196, in execute
    return self.action(self.b