In [None]:
# Reload edited (project) modules automatically before executing each cell,
# so changes to the OpenSTEF branch are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

This notebook assumes jupytext is installed, so that it can also be run as a script (convenient for using debugging tools).

It also assumes the modifications from the branch `demo-custom-model-and-features`.

Imports are deliberately placed in the cell that uses them, which explains their apparent mis-ordering.

## Needed feature

Possibility to specify a feature set that is different from the one computed by OpenSTEF.

## Involved modifications

* Added a `default_modelspecs` field to the **prediction job data class**
* Extraction of the default prediction job in the **train task**
* Added a `default_modelspecs` argument to the input function of the **train pipeline** and its usage.

Alternatively, configuration could be handled by:

* Adding defaults in the config manager
* Adding an optional method to the database connection: `db.get_default_modelspecs(pj_id)`
* Adding some or all of the new fields to the model specifications.


Note that this could also address some parts of the **proloaf-specific code**, for instance:

* [Specific conditions in the feature application](https://github.com/OpenSTEF/openstef/blob/f204eccb8dea85b7236e5ebbdb23ffd6ceefac03/openstef/feature_engineering/feature_applicator.py#L122)

In [None]:
import pandas as pd
import json
import numpy as np

## Simple custom model

In [None]:
from openstef.model.regressors.custom_regressor import CustomOpenstfRegressor
from openstef.model.regressors.regressor import OpenstfRegressor
from sklearn.linear_model import LinearRegression
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline

class CustomLinear(CustomOpenstfRegressor):
    """Minimal custom OpenSTEF regressor: missing-value imputation followed by a linear regression.

    Args:
        strategy: imputation strategy forwarded to ``sklearn.impute.SimpleImputer``
            (e.g. ``"mean"``).
    """

    def __init__(self, strategy="mean"):
        self.strategy = strategy
        self._feature_names = None

    @property
    def feature_names(self):
        """Names of the columns seen during ``fit`` (``None`` before fitting)."""
        return self._feature_names

    @classmethod
    def valid_kwargs(cls):
        # Hyper-parameters accepted by this model.
        return ["strategy"]

    def fit(self, X, y, **fit_options):
        """Fit the imputer + linear regression pipeline on ``X`` / ``y``.

        Extra ``fit_options`` are accepted for interface compatibility and ignored.

        Returns:
            self
        """
        self._feature_names = list(X.columns)
        self.pipeline_ = Pipeline([
            ("imputer", SimpleImputer(strategy=self.strategy)),
            ("linear", LinearRegression())
        ]).fit(X, y)
        # Linear coefficients are exposed as feature importances.
        # NOTE(review): SimpleImputer drops columns that are entirely missing by
        # default, in which case coef_ is shorter than feature_names — confirm
        # the training data never contains all-NaN columns.
        self.feature_importances_ = list(self.pipeline_["linear"].coef_)
        return self

    def predict(self, X):
        """Predict with the fitted pipeline (imputation is applied to ``X`` too)."""
        return self.pipeline_.predict(X)

    def objective(self):
        # No hyper-parameter optimisation objective is provided for this model.
        return None

    @staticmethod
    def _get_importance_names():
        return {
            "gain_importance_name": "total_gain",
            "weight_importance_name": "weight",
        }

## Dummy extra feature

In [None]:
from openstef.feature_engineering.feature_adder import FeatureAdder

class RandomFeature(FeatureAdder):
    """Dummy feature adder producing a column of uniform random noise.

    It matches exactly the feature name ``random`` and needs no input features.
    """

    @property
    def name(self):
        return "random"

    @property
    def _regex(self):
        # Anchored on both sides: only the exact name "random" matches.
        return "^random$"

    def required_features(self, feature_names):
        # The noise column does not depend on any other feature.
        return []

    def apply_features(self, df, parsed_feature_names):
        noise = np.random.rand(len(df))
        return df.assign(random=noise)


class FeatureCopy(FeatureAdder):
    """Feature adder that duplicates existing columns.

    A requested feature ``<name>_copy`` receives the values of the existing
    column ``<name>`` of the input dataframe.
    """

    @property
    def name(self):
        return "feature_copy"

    @property
    def _regex(self):
        # The named group captures the column to be copied.
        return r"^(?P<feature_name>.*)_copy$"

    def required_features(self, feature_names):
        # NOTE(review): the source column is not declared as required here;
        # confirm it is always present in the input data.
        return []

    def apply_features(self, df, parsed_feature_names):
        copied_columns = {}
        for parsed in parsed_feature_names:
            copied_columns[parsed.name] = df[parsed.params["feature_name"]]
        return df.assign(**copied_columns)




## Mocking functionalities

### Deactivate Teams connection
First, we disable sending messages via Teams.

In [None]:
from openstef.tasks.utils.taskcontext import TaskContext

# Dirty patch of TaskContext to skip the Teams functionality: the replacement
# accepts any arguments (including the TaskContext instance) and does nothing.
def dummy_func(*args, **kwargs):
    """No-op stand-in for ``TaskContext._send_teams_message``."""
    return None


TaskContext._send_teams_message = dummy_func

### Fake database connection

A function to convert JSON prediction jobs into PJ objects:

In [None]:
from openstef.data_classes.prediction_job import PredictionJobDataClass
from openstef.data_classes.model_specifications import ModelSpecificationDataClass

def parse_pj_json(pj_json):
    """Convert a JSON prediction job (as stored in the test data) into a ``PredictionJobDataClass``.

    Args:
        pj_json: dict decoded from ``prediction_jobs.json``. All keys read below
            are required, except ``name`` (falls back to ``typ``) and
            ``feature_modules`` (defaults to an empty list).

    Returns:
        PredictionJobDataClass whose ``default_modelspecs`` are built from the
        job's ``id``, ``hyper_params``, ``feature_names`` and ``feature_modules``.

    Raises:
        KeyError: if a required key is missing.
    """
    default_modelspecs = ModelSpecificationDataClass(
        id=pj_json["id"],
        hyper_params=pj_json["hyper_params"],
        feature_names=pj_json["feature_names"],
        feature_modules=pj_json.get("feature_modules", []),
    )
    return PredictionJobDataClass(
        id=pj_json["id"],
        model=pj_json["model"],
        forecast_type=pj_json["typ"],
        # Use the job's own name; fall back to the forecast type for jobs without one.
        name=pj_json.get("name", pj_json["typ"]),
        horizon_minutes=pj_json["horizon_minutes"],
        resolution_minutes=pj_json["resolution_minutes"],
        lat=pj_json["lat"],
        lon=pj_json["lon"],
        train_components=pj_json["train_components"],
        description=pj_json["description"],
        quantiles=pj_json["quantiles"],
        default_modelspecs=default_modelspecs,
    )

We create a test prediction job (the one for which reference data exists):

In [None]:
# Load test prediction jobs (JSON is UTF-8; don't rely on the platform default encoding)
with open("../test/unit/data/prediction_jobs.json", encoding="utf-8") as infile:
    pjs = json.load(infile)

pj_307 = parse_pj_json(pjs["307"])

# Use a custom model, referenced by its import path (defined in this notebook/script)
pj_307.model = "__main__.CustomLinear"

# Use custom features: the first N_BASE_FEATURES reference features plus the two
# dummy features provided by RandomFeature and FeatureCopy
N_BASE_FEATURES = 20
pj_307.default_modelspecs.feature_names = (
    pj_307.default_modelspecs.feature_names[:N_BASE_FEATURES]
    + ["random", "horizon_copy"]
)
# Module in which the custom feature adders above are defined
pj_307.default_modelspecs.feature_modules = ["__main__"]

We load the data returned by the fake database

In [None]:
# Load test data: reference training set of prediction job 307, indexed by its first (date) column
train_data_csv = "../test/unit/data/reference_sets/307-train-data.csv"
df = pd.read_csv(train_data_csv, index_col=0, parse_dates=[0])

The fake database connection class:

In [None]:
import copy

class DataBaseConnection:
    """In-memory stand-in for the OpenSTEF database connection.

    It serves a single prediction job and a fixed model input, ignoring all
    query arguments.
    """

    def __init__(self, pj, data):
        self.pj, self.data = pj, data

    def get_prediction_jobs(self, **kwargs):
        # A deep copy lets callers mutate the returned job without touching
        # the stored one.
        job_copy = copy.deepcopy(self.pj)
        return [job_copy]

    def get_model_input(self, *args, **kwargs):
        return self.data

We instantiate the fake database:

In [None]:
# Fake database serving prediction job 307 and its reference training data
db = DataBaseConnection(pj=pj_307, data=df)

### Fake configuration manager

We create a fake configuration manager holding the Teams, proxy and trained-models-path settings passed to the train task:

In [None]:
from sklearn.utils import Bunch

# Minimal configuration: empty Teams monitoring URL, no proxies, and the
# folder where trained models are stored.
config = Bunch(
    teams=Bunch(monitoring_url=""),
    proxies=[],
    paths=Bunch(trained_models_folder="trained/"),
)

## Running the train model task

In [None]:
import glob
import os
import shutil

from openstef.model.serializer import MLflowSerializer
from openstef.tasks.train_model import main as train_main


if __name__ == "__main__":
    # Models folder taken from the configuration given to the train task instead
    # of being hard-coded (NOTE(review): assumes the task stores models there).
    trained_models_folder = config.paths.trained_models_folder

    # Remove previously trained models (rmtree only accepts directories)
    for run_dir in glob.glob(os.path.join(trained_models_folder, "mlruns", "*")):
        if os.path.isdir(run_dir):
            shutil.rmtree(run_dir)

    # Retrieving requested features (copied before training)
    requested_features = pj_307.default_modelspecs.feature_names[:]

    train_main("test_pby", config=config, database=db)

    # Checking initial features: the columns returned by the fake database
    init_features = list(db.get_model_input(pj_307["id"]))

    # Checking final features: those stored with the trained model
    serializer = MLflowSerializer(trained_models_folder)
    model, specs = serializer.load_model(pj_307["id"])
    final_features = specs.feature_names

    print(">>>>>> Initial features: ", init_features)
    print(">>>>>> Requested features: ", requested_features)
    print(">>>>>> Final features:", final_features)
    print(">>>>>> Features difference:", set(requested_features) ^ set(final_features))


## Additional notes

* The horizon information in the PJ is not used at train time; default horizons are used instead. This should be configurable.
* We should also be able to use no horizons at all when they already come from the input data (for instance because we need to combine with weather forecasts).
* At forecast time, we cannot specify a set of horizons that does not start from now.
* The start time is computed when each PJ is processed. This means that PJs processed by the same task can have different training data sets.
* The database interface should be declared in OpenSTEF, since it is used in tasks.
* Is there a config manager interface?

## Configuration needs

* `train_horizons = None`: with `[]` we can skip the horizon duplication mechanism when the database returns forecasts as features (we do not need it for RTE).
* `train_split_func: str = None`: would also need a start time passed by the task (we probably need it).
* `default_modelspecs: ModelSpecificationDataClass = None`
* `depends_on = []`: the list of prediction jobs the job depends on (we need it)
* `save_train_forecasts`: to enable the possibility to save intermediate results across prediction jobs (we need it)

Can be implemented as:

* Additional fields into the PJ data class
* Optional DB query
* Optional entries in the config manager

## Tasks prediction job loops and dependencies

We illustrate a possible algorithm to order tasks so that they respect some dependency constraints.

* Group prediction jobs taking into account dependencies
* Iterate on groups

Example of algorithm to compute groups:

In [None]:
from random import shuffle
import networkx as nx

def build_graph(pjs):
    """Build the dependency graph of a collection of prediction jobs.

    Args:
        pjs: iterable of dicts with an ``"id"`` key and an optional
            ``"depends_on"`` list of job ids.

    Returns:
        ``(nodes, edges)``: ``nodes`` is the set of all job ids, including ids
        only mentioned in a ``depends_on`` list; ``edges`` is the set of
        ``(job_id, dependency_id)`` pairs.
    """
    node_ids, dependency_edges = set(), set()

    for job in pjs:
        job_id = job["id"]
        dependencies = list(job.get("depends_on", []))
        node_ids.add(job_id)
        node_ids.update(dependencies)
        dependency_edges.update((job_id, dependency) for dependency in dependencies)

    return node_ids, dependency_edges

def _update_parent_level(parents, levels, child, parent):
    """Recursively updates parent levels

    Makes sure ``levels[parent] > levels[child]``. When ``parent`` has to be
    raised, it is set to ``levels[child] + 1`` and the change is propagated to
    the parents of ``parent``.

    Args:
        parents: mapping node -> set of its parents (in ``group_nodes`` these
            are the nodes that depend on it).
        levels: mapping node -> current level; mutated in place.
        child: node whose level is the reference.
        parent: node that must end up at a strictly higher level than ``child``.
    """
    child_level = levels[child]
    parent_level = levels[parent]
    
    # Only act when the ordering constraint is violated.
    if parent_level <= child_level:
        levels[parent] = child_level + 1
        grand_parents = parents[parent]
        for gp in grand_parents:
            # NOTE(review): recursion depth grows with the length of the
            # dependency chain and can exceed Python's recursion limit on
            # very deep graphs.
            _update_parent_level(parents, levels, parent, gp)


def group_nodes(nodes, edges):
    """Group dependency-graph nodes into levels that can be processed in order.

    A node's level is the length of the longest chain of dependencies below it:
    nodes without dependencies get level 0, every other node gets
    ``1 + max(level of its dependencies)``. Processing the groups by increasing
    level therefore handles every dependency before the nodes that need it, and
    the nodes inside one group do not depend on each other.

    Runs in O(nodes + edges) using Kahn's topological ordering, without recursion.

    Args:
        nodes: iterable of hashable node ids.
        edges: iterable of ``(parent, child)`` pairs meaning ``parent`` depends
            on ``child`` (as produced by ``build_graph``). Endpoints missing
            from ``nodes`` are added to the graph.

    Returns:
        dict mapping level -> list of nodes at that level. Levels appear in the
        order they are first met while iterating ``nodes``, and nodes keep that
        iteration order within a level.

    Raises:
        ValueError: if the graph contains a cycle (self-loops included).
    """
    # Levels keyed in the iteration order of ``nodes``, which fixes the order
    # of the returned groups.
    levels = {node: 0 for node in nodes}
    parents_of = {node: set() for node in levels}
    children_of = {node: set() for node in levels}

    for parent, child in edges:
        for endpoint in (parent, child):
            if endpoint not in levels:
                levels[endpoint] = 0
                parents_of[endpoint] = set()
                children_of[endpoint] = set()
        parents_of[child].add(parent)
        children_of[parent].add(child)

    # Kahn's algorithm: a node becomes ready once all of its dependencies
    # (children) have received their final level.
    n_pending = {node: len(children) for node, children in children_of.items()}
    ready = [node for node, count in n_pending.items() if count == 0]
    n_done = 0
    while ready:
        child = ready.pop()
        n_done += 1
        for parent in parents_of[child]:
            levels[parent] = max(levels[parent], levels[child] + 1)
            n_pending[parent] -= 1
            if n_pending[parent] == 0:
                ready.append(parent)

    # Nodes on (or behind) a cycle never become ready.
    if n_done < len(levels):
        raise ValueError("Invalid graph structure. The graph has cycles or back edges")

    groups = {}
    for node, level in levels.items():
        groups.setdefault(level, []).append(node)

    return groups
            
def has_cycles(edges):
    """Tell whether the directed graph given by ``edges`` contains a cycle.

    Args:
        edges: iterable of ``(source, target)`` pairs.

    Returns:
        True if networkx finds at least one directed cycle, False otherwise.
    """
    digraph = nx.DiGraph()
    digraph.add_edges_from(edges)
    try:
        nx.find_cycle(digraph)
    except nx.NetworkXNoCycle:
        return False
    else:
        return True

def draw_graph(nodes, edges):
    """Plot the directed dependency graph with node labels and arrows."""
    dependency_graph = nx.DiGraph()
    # Edges first, then nodes, so that isolated nodes are drawn as well.
    dependency_graph.add_edges_from(edges)
    dependency_graph.add_nodes_from(nodes)
    nx.draw(dependency_graph, arrows=True, with_labels=True)

In [None]:
# Toy prediction jobs: 4 and 5 share dependency 1, and 6 depends on 4 (two levels deep)
pjs = [
    {"id": 1},
    {"id": 2},
    {"id": 3},
    {"id": 4, "depends_on": [1, 2]},
    {"id": 5, "depends_on": [1, 3]},
    {"id": 6, "depends_on": [4]},
    {"id": 7},
]

# Shuffle to show that the grouping does not depend on the input order
shuffle(pjs)

nodes, edges = build_graph(pjs)
print("Has cycles?", has_cycles(edges))
print(group_nodes(nodes, edges))

draw_graph(nodes, edges)

The idea would then be to loop over the groups by increasing level: the jobs within a group do not depend on each other, so the usual prediction job loop could run them in parallel. If parallel execution is not possible, we simply sort the prediction jobs by level instead.

Should we create a specific task?

We must raise an error if cycles are present:

In [None]:
# Cyclic dependencies (1 -> 6 -> 4 -> 1) must be rejected by group_nodes
pjs = [
    {"id": 1, "depends_on": [6]},
    {"id": 2},
    {"id": 3},
    {"id": 4, "depends_on": [1, 2]},
    {"id": 5, "depends_on": [1, 3]},
    {"id": 6, "depends_on": [4]}
]

nodes, edges = build_graph(pjs)
draw_graph(nodes, edges)
print("Has cycles?", has_cycles(edges))
try:
    print(group_nodes(nodes, edges))
except ValueError as err:
    # The error is the expected outcome: show it without aborting "Run All".
    print("ValueError:", err)

In [None]:
import random
# Random DAG of prediction jobs for profiling: job i may depend on any earlier
# job j < i with probability p_link, so the graph cannot contain cycles.
n_pjs = 2000
p_link = 0.01
RANDOM_SEED = 42
rng = random.Random(RANDOM_SEED)  # local, seeded generator: reproducible profiling input
pjs = []
for i in range(n_pjs):
    pj = {"id": i}
    for j in range(i):
        if rng.random() < p_link:
            pj.setdefault("depends_on", []).append(j)
    pjs.append(pj)

# Show a small sample only; displaying all jobs floods the notebook output
pjs[:5]

In [None]:
%%prun
# Profile graph construction and grouping on the large random set of jobs.
nodes, edges = build_graph(pjs)
group_nodes(nodes, edges)