# Create a Pipeline
We provide three ways of creating a pipeline.
* Functional API
* Imperative API
* Constructor API

In the following, we briefly describe all three APIs, before explaining the control flow under the hood.

In [22]:
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_regression

# Import the callback and the pipeline classes from pyWATTS
from pywatts.callbacks import LinePlotCallback

from pywatts_pipeline.core.steps.step import Step
from pywatts_pipeline.core.util.computation_mode import ComputationMode
from pywatts_pipeline.core.pipeline import Pipeline
# Import all modules required for the pipeline
from pywatts.modules import CalendarExtraction, CalendarFeature, ClockShift, LinearInterpolater, SKLearnWrapper, Sampler
from pywatts.modules.preprocessing.select import Select
from pywatts.summaries import RMSE
from load_data import load_elec_data

## Functional API

The functional API provides an easy way to create pipelines. However, it requires that the transformers/modules in use implement the `__call__` dunder method, which is the case for pyWATTS transformers.
The API is inspired by the functional API of Keras. In general, the notation is as follows:

```Transformer()(x=predecessor, y=predecessor, ...)```

In the following, we show how a simple pipeline can be created with the functional API.

In [23]:
functional_api_pipeline = Pipeline(path="../results")

# Extract dummy calendar features, using holidays from Germany
# NOTE: CalendarExtraction can't return multiple features.

functional_preprocessing_pipeline = Pipeline()
calendar = CalendarExtraction(continent="Europe",
                              country="Germany",name="calendar",
                              features=[CalendarFeature.month, CalendarFeature.weekday,
                                        CalendarFeature.weekend]
                              )(x=functional_preprocessing_pipeline["load_power_statistics"])
imputer_power_statistics = LinearInterpolater(method="nearest", dim="time",
                                              name="imputer_power"
                                              )(x=functional_preprocessing_pipeline["load_power_statistics"])


added_prepro_pipe = functional_preprocessing_pipeline(load_power_statistics=functional_api_pipeline["load_power_statistics"])
# Scale the data using a standard SKLearn scaler
power_scaler = SKLearnWrapper(module=StandardScaler(), name="scaled_input")
scale_power_statistics = power_scaler(x=added_prepro_pipe["imputer_power"])
scaler_target = SKLearnWrapper(module=StandardScaler(), name="scaled_target")
scaled_target = scaler_target(x=added_prepro_pipe["imputer_power"])

# Create lagged time series to later be used as regressors
lag_features = Select(start=-2, stop=0, step=1)(x=scale_power_statistics)
target_multiple_output = Select(start=0, stop=24, step=1, name="sampled_data")(x=scaled_target)

# Select features based on F-statistic
selected_features = SKLearnWrapper(
    module=SelectKBest(score_func=f_regression, k=2), name="kbest"
)(
    lag_features=lag_features,
    calendar=added_prepro_pipe["calendar"],
    target=scale_power_statistics,
)

# Create a linear regression that uses the lagged values to predict the current value
# NOTE: SKLearnWrapper has to collect all **kwargs itself and fit it against target.
#       It is also possible to implement a join/collect class
regressor_power_statistics = SKLearnWrapper(
    module=LinearRegression(fit_intercept=True)
)(
    features=selected_features,
    target=target_multiple_output,
    callbacks=[LinePlotCallback("linear_regression")],
)

# Rescale the predictions back to the original scale
inverse_power_scale = scaler_target(
    x=regressor_power_statistics, computation_mode=ComputationMode.Transform,
    method="inverse_transform", callbacks=[LinePlotCallback("rescale")]
)

# Calculate the root mean squared error (RMSE) between the predictions of the
# linear regression and the true values, and save it as a CSV file
rmse = RMSE()(y_hat=inverse_power_scale, y=target_multiple_output)




## Imperative API

The imperative API is an alternative API for pyWATTS pipelines. It can be used if the transformers do not implement the `__call__` dunder method.
The general notation is as follows:

```pipeline.add(Transformer(), "step_name", {"x": "predecessor_name", ...})```

In the following, we implement the same pipeline as above with the imperative API.

In [24]:
imperative_preprocessing_pipeline = Pipeline()


imperative_preprocessing_pipeline.add(
    CalendarExtraction(continent="Europe", country="Germany",
                       features=[CalendarFeature.month, CalendarFeature.weekday,
                                 CalendarFeature.weekend]),
    "calendar",
    {"x": "load_power_statistics"}
)

imperative_preprocessing_pipeline.add(
    LinearInterpolater(method="nearest", dim="time", name="imputer_power"),
    "imputer",
    {"x": "load_power_statistics"}
)

imperative_api_pipeline = Pipeline()

power_scaler = SKLearnWrapper(module=StandardScaler(), name="scaler_power")

imperative_api_pipeline.add(imperative_preprocessing_pipeline,
                            "preprocessing",
                            {"load_power_statistics": "load_power_statistics"})

imperative_api_pipeline.add(
    power_scaler,
    "scaler",
    {"x": "preprocessing__imputer"}
)

imperative_api_pipeline.add(
    Select(start=-2, stop=0, step=1),
    "lag_features",
    {"x": "scaler"}
)

imperative_api_pipeline.add(
    Select(start=0, stop=24, step=1),
    "target",
    {"x": "scaler"}
)

imperative_api_pipeline.add(
    SKLearnWrapper(module=SelectKBest(score_func=f_regression, k=2), name="kbest"),
    "selected_features",
    {"lag_features": "lag_features",
     "calendar": "preprocessing__calendar",
     "target": "scaler"}
)

imperative_api_pipeline.add(
    SKLearnWrapper(module=LinearRegression(fit_intercept=True)),
    "regression",
    {"selected_features": "selected_features",
     "target": "target"}
)

imperative_api_pipeline.add(
    power_scaler,
    "inverse_scaler",
    {"x": "regression"},
    method="inverse_transform",
    callbacks=[LinePlotCallback("rescale")],
    computation_mode=ComputationMode.Transform
)

imperative_api_pipeline.add(
    RMSE(),
    "rmse",
    {"y_hat": "inverse_scaler",
     "y": "target"},
)


<pywatts_pipeline.core.steps.step_information.StepInformation at 0x1fad8a55d90>

## Constructor API

The constructor API exists for scikit-learn compatibility. In scikit-learn, every parameter returned by the `get_params` method must also be accepted as a constructor argument. Since each pipeline step is such a parameter, the constructor has to accept the steps as arguments as well.
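
To make the scikit-learn convention concrete, the following sketch uses a plain scikit-learn `Pipeline` (imported as `SKPipeline`, not the pyWATTS pipeline) and inspects what `get_params` returns. It only illustrates the convention that the constructor API follows; the parameter names of a pyWATTS pipeline may differ and are not shown here.

```python
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline as SKPipeline
from sklearn.preprocessing import StandardScaler

# A plain scikit-learn pipeline, used only to illustrate the convention.
sk_pipeline = SKPipeline(steps=[
    ("scaler", StandardScaler()),
    ("regression", LinearRegression(fit_intercept=True)),
])

params = sk_pipeline.get_params()  # deep=True by default

# The list of steps is itself a constructor parameter ...
print("steps" in params)
# ... each step is exposed under its name ...
print(type(params["scaler"]).__name__)
# ... and the parameters of each step are exposed as "<step>__<param>".
print(params["regression__fit_intercept"])
```

Because the steps are returned by `get_params`, utilities such as `sklearn.base.clone` can rebuild the pipeline by passing these parameters back to the constructor.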

In [25]:
constructor_api_preprocessing_pipeline = Pipeline(
    steps=[
        (1,
         "calendar",
         {"x": "load_power_statistics"}, {}),
        (2,
         "imputer",
         {"x": "load_power_statistics"}, {}),
    ],
    model_dict={
        1:CalendarExtraction(continent="Europe", country="Germany",
                            features=[CalendarFeature.month, CalendarFeature.weekday,
                                      CalendarFeature.weekend]),
        2:LinearInterpolater(method="nearest", dim="time", name="imputer_power"),
    }
)


pipeline = Pipeline(
    steps=[
        (1,
         "preprocessing",
         {"load_power_statistics": "load_power_statistics"}, {}),
        (2,
         "scaler",
         {"x": "preprocessing__imputer"}, {}),
        (3,
         "lag_features",
         {"x": "scaler"}, {}),
        (4,
         "target",
         {"x": "scaler"}, {}),
        (5,
         "selected_features",
         {"lag_features": "lag_features",
          "calendar": "preprocessing__calendar",
          "target": "scaler"}, {}),
        (6,
         "regression",
         {"selected_features": "selected_features",
          "target": "target"}, {}),
        (2,
         "inverse_scaler",
         {"x": "regression"},
         {"method": "inverse_transform",
          "callbacks": [LinePlotCallback("rescale")],
          "computation_mode": ComputationMode.Transform}),
        (7,
         "rmse",
         {"y_hat": "inverse_scaler",
          "y": "target"}, {})
    ], model_dict={
        1: constructor_api_preprocessing_pipeline,
        2: power_scaler,
        3: Select(start=-2, stop=0, step=1),
        4: Select(start=0, stop=24, step=1),
        5: SKLearnWrapper(module=SelectKBest(score_func=f_regression, k=2), name="kbest"),
        6: SKLearnWrapper(module=LinearRegression(fit_intercept=True)),
        7: RMSE()
    }
)


In [26]:
data = load_elec_data()
train = data.iloc[:6000, :]
test = data.iloc[6000:, :]

In [27]:
pipeline.train(data=train)
pipeline.test(data=test)[1]

  y = column_or_1d(y, warn=True)


'# Summary: \n## Summary\n### RMSE\n\n* y_hat : 59471.781035211294\n## FitTime\n### Pipeline Training Time\n\n*  : 0.0\n### CalendarExtraction Training Time\n\n*  : 0.0\n### imputer_power Training Time\n\n*  : 0.0\n### scaler_power Training Time\n\n*  : 0.0\n### SampleModule Training Time\n\n*  : 0.0\n### SampleModule Training Time\n\n*  : 0.0\n### kbest Training Time\n\n*  : 0.0019969940185546875\n### LinearRegression Training Time\n\n*  : 0.005998849868774414\n## TransformTime\n### Pipeline Transform Time\n\n*  : 0.01599907875061035\n### CalendarExtraction Transform Time\n\n*  : 0.012034416198730469\n### imputer_power Transform Time\n\n*  : 0.002968311309814453\n### scaler_power Transform Time\n\n*  : 0.00099945068359375\n### SampleModule Transform Time\n\n*  : 0.00700068473815918\n### SampleModule Transform Time\n\n*  : 0.06501364707946777\n### kbest Transform Time\n\n*  : 0.0009684562683105469\n### LinearRegression Transform Time\n\n*  : 0.001007080078125\n### scaler_power Transfor

In [28]:
functional_api_pipeline.train(data=train)
functional_api_pipeline.test(data=test)[1]

  y = column_or_1d(y, warn=True)


'# Summary: \n## Summary\n### RMSE\n\n* y_hat : 59471.781035211294\n## FitTime\n### Pipeline Training Time\n\n*  : 0.0\n### calendar Training Time\n\n*  : 0.0\n### imputer_power Training Time\n\n*  : 0.0\n### scaled_input Training Time\n\n*  : 0.0\n### scaled_target Training Time\n\n*  : 0.0\n### SampleModule Training Time\n\n*  : 0.0\n### sampled_data Training Time\n\n*  : 0.0\n### kbest Training Time\n\n*  : 0.0009641647338867188\n### LinearRegression Training Time\n\n*  : 0.00896453857421875\n## TransformTime\n### Pipeline Transform Time\n\n*  : 0.016999244689941406\n### calendar Transform Time\n\n*  : 0.011997699737548828\n### imputer_power Transform Time\n\n*  : 0.004001617431640625\n### scaled_input Transform Time\n\n*  : 0.00099945068359375\n### scaled_target Transform Time\n\n*  : 0.001001596450805664\n### SampleModule Transform Time\n\n*  : 0.007001399993896484\n### sampled_data Transform Time\n\n*  : 0.06400322914123535\n### kbest Transform Time\n\n*  : 0.0010008811950683594\

  y = column_or_1d(y, warn=True)


'# Summary: \n## Summary\n### RMSE\n\n* y_hat : 59471.781035211294\n## FitTime\n### Pipeline Training Time\n\n*  : 0.0\n### calendar Training Time\n\n*  : 0.0\n### imputer_power Training Time\n\n*  : 0.0\n### scaled_input Training Time\n\n*  : 0.0\n### scaled_target Training Time\n\n*  : 0.0\n### SampleModule Training Time\n\n*  : 0.0\n### sampled_data Training Time\n\n*  : 0.0\n### kbest Training Time\n\n*  : 0.0019989013671875\n### LinearRegression Training Time\n\n*  : 0.006997585296630859\n## TransformTime\n### Pipeline Transform Time\n\n*  : 0.013998270034790039\n### calendar Transform Time\n\n*  : 0.00899815559387207\n### imputer_power Transform Time\n\n*  : 0.003000020980834961\n### scaled_input Transform Time\n\n*  : 0.0\n### scaled_target Transform Time\n\n*  : 0.0009992122650146484\n### SampleModule Transform Time\n\n*  : 0.006999969482421875\n### sampled_data Transform Time\n\n*  : 0.06100130081176758\n### kbest Transform Time\n\n*  : 0.0010004043579101562\n### LinearRegress

In [29]:
imperative_api_pipeline.train(data=train)
imperative_api_pipeline.test(data=test)[1]

  y = column_or_1d(y, warn=True)


'# Summary: \n## Summary\n### RMSE\n\n* y_hat : 59471.781035211294\n## FitTime\n### Pipeline Training Time\n\n*  : 0.0\n### CalendarExtraction Training Time\n\n*  : 0.0\n### imputer_power Training Time\n\n*  : 0.0\n### scaler_power Training Time\n\n*  : 0.0010042190551757812\n### SampleModule Training Time\n\n*  : 0.0\n### SampleModule Training Time\n\n*  : 0.0\n### kbest Training Time\n\n*  : 0.0010004043579101562\n### LinearRegression Training Time\n\n*  : 0.0069997310638427734\n## TransformTime\n### Pipeline Transform Time\n\n*  : 0.01799917221069336\n### CalendarExtraction Transform Time\n\n*  : 0.013001203536987305\n### imputer_power Transform Time\n\n*  : 0.003995180130004883\n### scaler_power Transform Time\n\n*  : 0.0009996891021728516\n### SampleModule Transform Time\n\n*  : 0.009002208709716797\n### SampleModule Transform Time\n\n*  : 0.0709996223449707\n### kbest Transform Time\n\n*  : 0.00099945068359375\n### LinearRegression Transform Time\n\n*  : 0.001001119613647461\n###

  y = column_or_1d(y, warn=True)


'# Summary: \n## Summary\n### RMSE\n\n* y_hat : 59471.781035211294\n## FitTime\n### Pipeline Training Time\n\n*  : 0.0\n### CalendarExtraction Training Time\n\n*  : 0.0\n### imputer_power Training Time\n\n*  : 0.0\n### scaler_power Training Time\n\n*  : 0.0009992122650146484\n### SampleModule Training Time\n\n*  : 0.0\n### SampleModule Training Time\n\n*  : 0.0\n### kbest Training Time\n\n*  : 0.002000093460083008\n### LinearRegression Training Time\n\n*  : 0.006000041961669922\n## TransformTime\n### Pipeline Transform Time\n\n*  : 0.013965129852294922\n### CalendarExtraction Transform Time\n\n*  : 0.009961605072021484\n### imputer_power Transform Time\n\n*  : 0.0029993057250976562\n### scaler_power Transform Time\n\n*  : 0.0010001659393310547\n### SampleModule Transform Time\n\n*  : 0.006002664566040039\n### SampleModule Transform Time\n\n*  : 0.059999704360961914\n### kbest Transform Time\n\n*  : 0.0009925365447998047\n### LinearRegression Transform Time\n\n*  : 0.0009882450103759766

## What happens under the hood
The main method for adding a module to the pipeline via the imperative or the functional API is the `add` method of the pipeline. While the imperative API of pyWATTS uses this method directly, the `__call__` dunder of the functional API calls it with the arguments extracted from the provided step information.
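
The relation between the two APIs can be sketched with a few toy classes. `ToyPipeline`, `ToyModule`, and `ToyStepInformation` are hypothetical names invented for this illustration; they are not the pyWATTS classes, and the real implementation handles far more cases. The sketch only shows how a `__call__` method can translate the functional notation into an `add` call.

```python
class ToyStepInformation:
    """Hypothetical handle that remembers a step name and its pipeline."""

    def __init__(self, name, pipeline):
        self.name = name
        self.pipeline = pipeline


class ToyPipeline:
    def __init__(self):
        self.add_calls = []  # every add call, in order

    def __getitem__(self, column):
        # pipeline["column"] yields a handle for an input column
        return ToyStepInformation(column, self)

    def add(self, module, name, inputs, **kwargs):
        self.add_calls.append((module, name, inputs, kwargs))
        return ToyStepInformation(name, self)


class ToyModule:
    def __init__(self, name):
        self.name = name

    def __call__(self, **kwargs):
        # Split the keyword arguments into predecessors and other options.
        preds = {k: v for k, v in kwargs.items()
                 if isinstance(v, ToyStepInformation)}
        options = {k: v for k, v in kwargs.items() if k not in preds}
        # The pipeline is taken from the step information of a predecessor.
        pipeline = next(iter(preds.values())).pipeline
        inputs = {k: v.name for k, v in preds.items()}
        return pipeline.add(self, self.name, inputs, **options)


def recorded(pipeline):
    """Return the recorded add calls without the module objects."""
    return [(name, inputs, kwargs)
            for _, name, inputs, kwargs in pipeline.add_calls]


# Functional notation
functional = ToyPipeline()
scaled = ToyModule("scaler")(x=functional["load_power_statistics"])
ToyModule("regression")(features=scaled, callbacks=[])

# Imperative notation
imperative = ToyPipeline()
imperative.add(ToyModule("scaler"), "scaler", {"x": "load_power_statistics"})
imperative.add(ToyModule("regression"), "regression",
               {"features": "scaler"}, callbacks=[])

print(recorded(functional) == recorded(imperative))  # True
```

In this toy version, both notations end up issuing the same sequence of `add` calls, which is the property the paragraph above describes.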

Internally, the `add` method performs three steps:
1. It resets the pipeline to empty the buffers of all pipeline steps in case the pipeline has been executed before. This minimizes undesired side effects.
2. It appends the current parameters to a list that contains all add statements. This makes it easy to rebuild the pipeline whenever a new step is added.
3. It constructs the pipeline by calling the `_add` method with the list of add statements. This method iterates through the list and mainly performs the following steps:
      1. It looks for a clone of the module/transformer. If a clone already exists, it is reused; otherwise, a new one is created. This ensures that there are no side effects if a module is changed outside of the pipeline.
      2. It searches for the steps of the predecessors.
      3. It creates the step for the current module or summary. Note that one module or summary can lead to the creation of multiple steps, e.g., if EitherOrSteps are used or the predecessors provide multiple outputs.
      4. All newly created steps are added to the steps dictionary.

  Note that the constructor API uses the `_add` method directly.
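
The control flow described above can be sketched in plain Python. The classes `ToyPipeline` and `ToyScaler` and all attribute names are assumptions made for this illustration, not the pyWATTS implementation. The sketch deliberately simplifies: each add statement creates exactly one step, and clones are looked up by the identity of the original module.

```python
import copy


class ToyPipeline:
    def __init__(self):
        self.add_statements = []  # all add calls, kept to rebuild the pipeline
        self.steps = {}           # step name -> (cloned module, predecessors)
        self.buffer = {}          # results cached by a previous run
        self._clones = {}         # id(original module) -> its clone

    def reset(self):
        self.buffer.clear()

    def add(self, module, name, inputs):
        self.reset()                                        # step 1
        self.add_statements.append((module, name, inputs))  # step 2
        self._add(self.add_statements)                      # step 3

    def _add(self, statements):
        self.steps = {}
        for module, name, inputs in statements:
            # 3.1: reuse an existing clone or create a new one
            if id(module) not in self._clones:
                self._clones[id(module)] = copy.deepcopy(module)
            clone = self._clones[id(module)]
            # 3.2: find predecessors that are steps of this pipeline;
            #      other names are treated as input columns
            predecessors = [src for src in inputs.values() if src in self.steps]
            # 3.3 and 3.4: create the step and store it in the steps dictionary
            self.steps[name] = (clone, predecessors)


class ToyScaler:
    def __init__(self, factor):
        self.factor = factor


scaler = ToyScaler(factor=2)
pipeline = ToyPipeline()
pipeline.add(scaler, "scaler", {"x": "load_power_statistics"})
pipeline.add(scaler, "inverse_scaler", {"x": "scaler"})

# Changing the original module afterwards does not affect the pipeline.
scaler.factor = 100

clone_a = pipeline.steps["scaler"][0]
clone_b = pipeline.steps["inverse_scaler"][0]
print(clone_a is clone_b)                   # True: the clone is reused
print(clone_a.factor)                       # 2: unaffected by the later change
print(pipeline.steps["inverse_scaler"][1])  # ['scaler']
```

Reusing one clone per module is what allows the same scaler to appear twice in the examples above, once for scaling and once for the inverse transformation.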

