# `ccflow` Tutorial

## Introduction

The `ccflow` framework is a collection of tools and patterns for workflow configuration and orchestration.
Its intended uses include ETL, data validation, model training, live trading configuration, backtesting, hyperparameter search, and automated report generation.

The framework provides:
 - a way to manage hierarchical, strongly typed configurations and the relationships between them through composition
 - a way to associate user-defined functions with configurations to define and name configurable workflow graphs
 - flexibility in how to interact with configurations and workflows, including files/command line, native python/Jupyter notebook, Airflow/job scheduler, REST API, etc  (in progress)

In this tutorial, we walk through the background, motivation and simple examples.

## Configuration Design Goals

In both production applications and research workflows, the need arises to configure various components. As these applications and workflows get increasingly complex, so do the patterns and frameworks that people use for configuration. While some of this complexity is unavoidable, in an ideal world, there is a single well-designed (hopefully!) framework that can be used across all use cases, spanning data retrieval, validation, transformation, and loading (i.e. ETL workflows), model training and hyperparameter search, portfolio construction and optimization, backtesting, report generation, and live system and trading configuration.

In order to meet the demands of these varying applications, the ideal configuration framework must satisfy several needs.

### Interactivity

Since the aim is to use configuration for both production applications/workflows as well as for research, there needs to be both a relatively static, well-controlled way of defining the entirety of the configuration, as well as much more dynamic ways of interacting and iterating over the configuration options. Versioned file-based configurations are almost always used to accomplish the former, and flexible command line interfaces are often used to satisfy the latter, but in an ideal world, it should be possible to both modify and add completely new configurations directly from a python script or notebook for research, without having to resort to modifying files or leveraging command line overrides (though this is also useful functionality to have).

### Schemas 

As the framework scales, the chances increase of two common types of errors:
 
 1. Misnaming or mistyping of configuration options, which could cause the configuration to silently fail (if the parameter was optional), e.g. "threshold" vs "threshhold"
 2. Type errors and value constraints, e.g. the identifier string "1234" vs the integer 1234, or specifying that a parameter "sigma" should be non-negative.
 
Typically, we want to catch these errors as soon as possible: when the configurations are loaded rather than when they are used. This also allows for writing tests of configurations that are decoupled from tests of the logic that depends on the configurations, making it easier to spot issues quickly.

In order to solve these issues, configurations need strongly typed schemas, with the option to perform additional (and custom) validations. There may be a need to coerce values (i.e. if the string "1234" is passed to a parameter that expects an int, it may be desirable to coerce it to 1234), and additional validation may be needed on the entire structure (to test validity of combinations of parameters rather than just parameters themselves). The use of schemas also means there must be a way to evolve the schema over time (adding and removing attributes), and even to version it if necessary.  

In `ccflow`, we leverage the power of the very popular [pydantic](https://pydantic-docs.helpmanual.io/) library to tackle these issues, with some additional extensions. Note that while python's [dataclasses](https://docs.python.org/3/library/dataclasses.html) solve the misnaming/mistyping problem, they do not provide type checking or additional run-time validation. One can think of pydantic as a powerful extension of dataclasses which does.
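
To make the contrast with dataclasses concrete, the following sketch uses only the standard library. The class `ThresholdConfig` and its field are hypothetical names chosen for illustration. It shows that a dataclass rejects a misspelled field name, but accepts a value of the wrong type without complaint.

```python
from dataclasses import dataclass


@dataclass
class ThresholdConfig:
    """Hypothetical config used only for this illustration."""

    threshold: float = 0.5


# A misspelled field name is rejected at construction time...
try:
    ThresholdConfig(threshhold=0.1)
except TypeError as e:
    misnamed_error = str(e)
    print(misnamed_error)

# ...but the type annotation is not enforced at run time.
unchecked = ThresholdConfig(threshold="not a number")
print(type(unchecked.threshold))
```

This is the gap that a validating library such as pydantic is meant to close.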

### Hierarchy

Quantitative workflows are typically very hierarchical in nature. For example, portfolio construction depends on multiple signals, and each signal may depend on its own techniques and data sources, and each technique will have its own configuration parameters, and each data source will also have parameters that configure how it was cleaned/transformed and how to access it. Thus, the configuration framework must have a modular and hierarchical structure, which means that entire parts of the hierarchy must be easy to add and remove without affecting the rest of the configuration. In a file-based representation, this means that the configuration should be spreadable across multiple files spanning several sub-directories. The interactive representation of these configs must mirror the same kind of structure. 

Furthermore, the hierarchy of configurations can have complex dependencies, forming a graph structure, rather than a simple tree. For example, a data source may be configured to be transformed in a particular way, and then used in multiple signals, which are then all used as part of portfolio construction. If the configuration of the data source changes, it is important that all the signals pick up this change. The challenge lies in defining this graph structure both statically (i.e. for trading) as well as dynamically in the python code (for research).

Lastly, there should ideally be a way to automatically map a piece of configuration to the code that decides how to use it. Without this, the configuration can end up acting like a large catalog of global variables that proliferate throughout the codebase, with all the same drawbacks as global variables (including increased coupling between everything). So, each piece of configuration should get used by as few high-level pieces of code as possible, rather than by multiple low-level pieces of code. We tackle this problem by frequently binding together the configuration parameters and the code which uses the configuration in a single object.
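
As a rough illustration of binding parameters and the code that uses them into one object, consider the standard-library sketch below. `MovingAverage` and its `window` field are hypothetical and are not part of `ccflow`; the point is only that a single callable object owns both the parameter and the logic that reads it.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class MovingAverage:
    """Hypothetical example: parameters and the logic that uses them live together."""

    window: int = 3

    def __call__(self, values: List[float]) -> List[float]:
        # Only this method reads `window`; no other code needs to know about it.
        return [
            sum(values[i - self.window + 1 : i + 1]) / self.window
            for i in range(self.window - 1, len(values))
        ]


smoother = MovingAverage(window=2)
result = smoother([1.0, 2.0, 4.0])
print(result)
```

Because the parameter is read in exactly one place, changing or removing it affects one class rather than many call sites.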

In `ccflow`, we leverage the power of Meta's [hydra](https://hydra.cc/) library for file-based and command line configuration management, but add some of our own functionality to support the interactive configuration use case. 

## Workflow Design Goals

We define a "workflow" solution to mean a library to help define and run a collection of inter-dependent tasks (or steps). We can break this down further into separate components:
 1. Defining (via configuration) what tasks/steps make up the workflow
 2. Passing data between tasks, so that each task has the information that it needs from upstream tasks
 3. Determining the order in which to run the tasks (often referred to as scheduling, or more appropriately "task scheduling")
 4. Automating the launch of the workflow so that it runs regularly according to some rules (also referred to as scheduling, or more appropriately "workflow scheduling")
 5. Advanced features such as caching, distributed evaluation, monitoring, UI's, etc
 
There are numerous existing packages and products in the Python ecosystem which tackle the problem of workflow management, each written with different use cases in mind and supporting different sets of requirements and features. As a result:
 * It is best for us to have a layer in between the business logic and any specific framework
 * It is important to define our high level requirements
 
Even within Cubist, there is no definitive answer to the first question, so we attempt to lay out the goals from a CCRT perspective, with the understanding that we may not meet everyone's use cases.

One might argue that most of the existing open source solutions to this problem tend to focus on (or are marketed on) the later elements in the list above rather than the earlier ones. Our approach is to focus on the components roughly in the order they are listed.

### Ease of Use

As much as possible, it should be easy and intuitive to define workflows in the framework. Simple things should be easy, but arbitrarily complex things should remain possible. 

Furthermore, we should not impose too many constraints on how users write their code - they should be able to bring their existing analytics, no matter what underlying python packages or tools they use, and hook it into the framework. 

We aim to leverage standard and familiar programming paradigms as much as possible (writing objects and functions), as they are time-tested and easy for users to understand. By using composition of classes and functions and their return values, the python language essentially handles items 2. and 3. above for us, without a need to do anything special or for users to learn anything new.
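
The sketch below illustrates that point with plain functions. The step names and the in-memory data are hypothetical. Nesting the calls is what passes data between the steps and what determines their order.

```python
from typing import Dict, List


def load_prices() -> Dict[str, List[float]]:
    """Hypothetical upstream step returning in-memory data."""
    return {"AAA": [10.0, 11.0, 12.1], "BBB": [20.0, 19.0, 19.0]}


def compute_returns(prices: Dict[str, List[float]]) -> Dict[str, List[float]]:
    """Depends on the output of `load_prices`."""
    return {
        symbol: [b / a - 1.0 for a, b in zip(series, series[1:])]
        for symbol, series in prices.items()
    }


def summarize(returns: Dict[str, List[float]]) -> Dict[str, float]:
    """Depends on the output of `compute_returns`."""
    return {symbol: sum(r) / len(r) for symbol, r in returns.items()}


# Nesting the calls both passes the data and fixes the execution order.
summary = summarize(compute_returns(load_prices()))
print(summary)
```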

We are not trying to write a new language that people have to learn in order to implement their analytics. However, the framework should support workflow steps that use any such "language" that already exists (i.e. tensorflow/pytorch/jax/csp/etc).

We do not wish to make assumptions about how data is represented within the framework, or even that all data should be tabular or array-like; we should be able to support documents, charts, event streams or any other kinds of objects as part of the workflows. At the same time, we should be able to offer common tools (within the framework) to facilitate common tabular data processing tasks (i.e. reading and writing).

Once defined/configured, we would like a workflow to have a very simple way to run it, whether interactively or from command line. Furthermore, it should be equally easy to run any intermediate step of a workflow (and its dependencies) to promote reusability and make debugging easy.

### Configuration

Given a configuration framework meeting the design goals laid out above, suitable for both production and research configuration, a key requirement is to be able to configure workflows using the same framework. 

This implies that workflows should be easy to define from versioned configs (i.e. using files) in production, or to change and re-run interactively from python (for research). Thus, the workflow, the steps in the workflow, and the objects used by those steps will all belong to the same configuration paradigm.

### Parameterization

While the configuration framework allows for arbitrary complexity in the configuration and definition of the workflow steps (and thus of the workflow), it is often natural to think of the workflows as being parameterized (or templatized) across certain dimensions, and to treat these "context" parameters separately from other configuration options to make it easier to run multiple templatized workflows without needing to re-configure anything.

For example, many data processing workflows are parameterized by date. However, this is not the only option; in many cases it is more efficient to process data across a date range. Taking the idea further, one may also want to specify a workflow that applies to a specific region and time range, or even down to a symbol and date range level. In the realm of data orchestration workflows, another way to think about the "context" is as the definition of the smallest "chunk" of data we are willing to operate on in a step.  

Thus, we want to be able to define a flexible, parametric "context" for each step, such that the step can be easily run across multiple contexts, and depend on other steps, each of which may use the same context or even a different context.

A technical reason for parameterizing the steps separately from the configuration is to prevent run-time mutation of the configuration, which is dangerous as configurations are shared across multiple components. 
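
A minimal sketch of this separation, using only the standard library, is shown here. `DateContext` and `RowCountStep` are hypothetical names and do not reflect the `ccflow` API. The step's configuration is frozen, and the context is passed in on each call, so running the step over many contexts never mutates shared configuration.

```python
from dataclasses import dataclass
from datetime import date, timedelta
from typing import List


@dataclass(frozen=True)
class DateContext:
    """Hypothetical context: the smallest chunk of work a step operates on."""

    day: date


@dataclass(frozen=True)
class RowCountStep:
    """Hypothetical step: its configuration is fixed, the context varies per call."""

    table: str

    def __call__(self, context: DateContext) -> str:
        # The step reads its configuration but never mutates it.
        return f"count rows of {self.table} for {context.day.isoformat()}"


step = RowCountStep(table="prices")
contexts = [DateContext(date(2022, 1, 1) + timedelta(days=i)) for i in range(3)]
descriptions: List[str] = [step(c) for c in contexts]
print(descriptions)
```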

# Configuration with `ccflow`

## Config Examples

In [1]:
from IPython.display import JSON
from ccflow import BaseModel, ModelRegistry
from datetime import date
from pathlib import Path
from pprint import pprint
from typing import List

import pandas as pd
import numpy as np

import warnings
warnings.filterwarnings('ignore')

### Basic Config with BaseModel

Let's get started with some very simple examples. Pydantic calls their classes "Models", and so we use the same terminology; think of a "Model" as a "Configurable" class.

The `BaseModel` is our base class for all configuration. We have subclassed Pydantic's `BaseModel` to change some of the default configuration options, and to make the objects play nicer with Hydra and the rest of our framework. However, as they are still Pydantic models, everything you can do with pydantic's [Models](https://pydantic-docs.helpmanual.io/usage/models/) can be done with these. 

We begin with a dummy example, but one which illustrates how new config schemas and values can be easily defined and manipulated on-the-fly.

In [2]:
class MyFileConfig(BaseModel):
    """This is an example of a config class."""

    file: Path
    description: str = "N/A"
    version: int = 0

This is not very exciting yet, as it is just a schema definition, but we can already show how pydantic will conform input data to the declared types (i.e. Path, str and int in this case).

In [3]:
c = MyFileConfig(file="./Tutorial.ipynb", version="1")
c

MyFileConfig(file=PosixPath('Tutorial.ipynb'), description='N/A', version=1)

Note that config objects are mutable by default (though they can be frozen too). This makes it easy to change configs, especially once they get nested:

In [4]:
c.description = "Flow example notebook"
c

MyFileConfig(file=PosixPath('Tutorial.ipynb'), description='Flow example notebook', version=1)
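
For comparison, a frozen model could look like the sketch below. It is written against plain pydantic 2 (`pydantic.BaseModel` with `ConfigDict(frozen=True)`) rather than `ccflow.BaseModel`, and `FrozenFileConfig` is a hypothetical name. How `ccflow` itself exposes freezing is not covered here.

```python
from pathlib import Path

from pydantic import BaseModel, ConfigDict


class FrozenFileConfig(BaseModel):
    """Hypothetical frozen variant, built on plain pydantic rather than ccflow."""

    model_config = ConfigDict(frozen=True)

    file: Path
    description: str = "N/A"


frozen = FrozenFileConfig(file="Tutorial.ipynb")
try:
    frozen.description = "changed"
except ValueError as e:  # pydantic's ValidationError subclasses ValueError
    frozen_error = str(e)
    print(frozen_error)
```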

Pydantic allows for objects to be created directly from dictionaries

In [5]:
config = {"file": "./Tutorial.ipynb", "version": "1"}
MyFileConfig.parse_obj(config)

MyFileConfig(file=PosixPath('Tutorial.ipynb'), description='N/A', version=1)

Pydantic provides a [JSON schema](https://json-schema.org/) in a standardized format that can help users understand the parameters on the config object, though this only works on models that only contain json-compatible types (even though pydantic supports arbitrary types as we will see later). For example:

In [6]:
JSON(MyFileConfig.schema())

<IPython.core.display.JSON object>

Pydantic's type validation will catch cases that are incompatible with our schema definition. In fact, pydantic can be used to place even greater constraints on the values themselves (i.e. version must be positive)

In [7]:
try:
    c.version = "foo"
except ValueError as e:
    print(e)

1 validation error for MyFileConfig
version
  Input should be a valid integer, unable to parse string as an integer [type=int_parsing, input_value='foo', input_type=str]
    For further information visit https://errors.pydantic.dev/2.7/v/int_parsing
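
Value constraints of the kind mentioned above can be sketched as follows. The example assumes pydantic 2 and uses plain `pydantic.BaseModel`; `BacktestConfig` and its fields are hypothetical. It combines a per-field constraint with a check on a combination of fields.

```python
from datetime import date

from pydantic import BaseModel, Field, model_validator


class BacktestConfig(BaseModel):
    """Hypothetical config built on plain pydantic to show value constraints."""

    sigma: float = Field(ge=0)  # must be non-negative
    start_date: date
    end_date: date

    @model_validator(mode="after")
    def _check_date_order(self):
        # Validate a combination of fields rather than a single field.
        if self.end_date < self.start_date:
            raise ValueError("end_date must not be before start_date")
        return self


errors = []
for kwargs in (
    dict(sigma=-1.0, start_date="2022-01-01", end_date="2022-12-31"),
    dict(sigma=1.0, start_date="2022-12-31", end_date="2022-01-01"),
):
    try:
        BacktestConfig(**kwargs)
    except ValueError as e:
        errors.append(str(e))

print(len(errors))
```

Both invalid inputs are rejected when the object is constructed, before any downstream code can use them.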


Furthermore, we have enabled the option by default to raise exceptions when field names are mis-specified (or extra fields are provided) to catch potential configuration mistakes.

In [8]:
try:
    MyFileConfig(file="./Tutorial.ipynb", Version=1)
except ValueError as e:
    print(e)

1 validation error for MyFileConfig
Version
  Extra inputs are not permitted [type=extra_forbidden, input_value=1, input_type=int]
    For further information visit https://errors.pydantic.dev/2.7/v/extra_forbidden


### Hierarchical Configs

Hierarchical configs are also easy to work with. Below, we create a new config which consists of two file configs, and we can easily modify nested attributes using standard python syntax.

In [9]:
class MyDataConfig(BaseModel):
    """This is the example of a nested config."""

    data_one: MyFileConfig = None
    data_two: MyFileConfig = None


JSON(MyDataConfig.schema())

<IPython.core.display.JSON object>

In [10]:
c2 = MyDataConfig(
    data_one=c,
    data_two=MyFileConfig(file="ccflow/__init__.py"),
)
c2.data_two.description = "Python init file"
print(f"{c2.data_two.description}: {c2.data_two.file}")

Python init file: ccflow/__init__.py


Pydantic also provides the ability to coerce dictionaries recursively into structured types, so long as the types have been declared on the schema. For example, it will automatically create the `MyFileConfig` instance if we just pass a dictionary to `MyDataConfig`:

In [11]:
MyDataConfig(
    data_one={
        "file": "./Tutorial.ipynb",
        "description": "Flow example notebook",
    }
)

MyDataConfig(data_one=MyFileConfig(file=PosixPath('Tutorial.ipynb'), description='Flow example notebook', version=0), data_two=None)

### Config Inheritance and Templatization

In addition to the composition of configs as illustrated above, there may be cases when inheritance and templatization is required. Pydantic supports both of these out of the box. 

Below we provide an example of multiple inheritance of model objects, which further illustrates the power of having schema classes for configuration over raw dictionaries.

In [12]:
class DateRangeMixin(BaseModel):
    start_date: date
    end_date: date


class UniverseMixin(BaseModel):
    universe: str


class MyConfig(DateRangeMixin, UniverseMixin, BaseModel):
    parameter: int


MyConfig(
    parameter=4, universe="US", start_date=date(2022, 1, 1), end_date=date(2023, 1, 1)
)

MyConfig(universe='US', start_date=datetime.date(2022, 1, 1), end_date=datetime.date(2023, 1, 1), parameter=4)

For examples of templatization, refer to the section of the pydantic documentation on [Generic Models](https://pydantic-docs.helpmanual.io/usage/models/#generic-models).
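
As a small taste of what that looks like, here is a sketch of a generic model. It assumes pydantic 2 and plain `pydantic.BaseModel`; `Batch` is a hypothetical class. Parameterizing the model with `int` makes pydantic validate (and coerce) the list items as integers.

```python
from typing import Generic, List, TypeVar

from pydantic import BaseModel

T = TypeVar("T")


class Batch(BaseModel, Generic[T]):
    """Hypothetical generic config: a named list of items of type T."""

    name: str
    items: List[T]


# The string "1" is coerced because the items are declared as List[int].
int_batch = Batch[int](name="ids", items=["1", 2])
print(int_batch)
```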

## Registering Configurations

### The Model Registry

`ccflow.flow` provides a `ModelRegistry` class which represents a collection of models (configs). Later we will see how config files can be mapped to a registry, but for now we illustrate how it can be used interactively.

In [13]:
r = ModelRegistry(name="My Raw Data")
r.add(
    "flow",
    MyFileConfig(
        file="Tutorial.ipynb", description="Flow example notebook"
    ),
)
r.add(
    "init",
    MyFileConfig(
        file="ccflow/__init__.py", description="Python init file"
    ),
)
print(r)
pprint(r.models)

ModelRegistry(name='My Raw Data')
mappingproxy({'flow': MyFileConfig(file=PosixPath('Tutorial.ipynb'), description='Flow example notebook', version=0),
              'init': MyFileConfig(file=PosixPath('ccflow/__init__.py'), description='Python init file', version=0)})


At this point, a `ModelRegistry` just looks and behaves like a dictionary. However, a bit of extra functionality has been built in, such as validation of items that go into the registry to make sure they are config classes, i.e.

In [14]:
try:
    r.add("bad_data", {"foo": 5, "bar": 6})
except TypeError as e:
    print(e)

model must be a child class of <class 'ccflow.base.BaseModel'>, not '<class 'dict'>'.


This may seem like an unnecessary restriction, but enforcing that all registry elements are using BaseModel means that we can deliver more powerful functionality over time by extending the BaseModel implementation.

As the amount of configuration grows, there is a desire to organize these objects in a hierarchy, and so, the registry class can contain other registries (since they are configuration objects themselves).

Furthermore, instead of passing various registries around in the code, it is sometimes helpful to have a single registry that is a singleton at the "root" of all these registries. `ccflow` provides this:

In [15]:
root = ModelRegistry.root()
print(root is ModelRegistry.root())  # It is a singleton.
root.add("raw data", r, overwrite=True)
print(root)
print(root.models)

True
RootModelRegistry()
{'raw data': ModelRegistry(name='My Raw Data')}


From the root registry, there are three different ways to get underlying configs: dictionary syntax, file path syntax, or getter syntax:

In [16]:
print(root["raw data"]["flow"])  # Dictionary syntax
print(root["raw data/flow"])  # File path syntax
print(root.get("raw data").get("flow")) # Getter syntax

MyFileConfig(file=PosixPath('Tutorial.ipynb'), description='Flow example notebook', version=0)
MyFileConfig(file=PosixPath('Tutorial.ipynb'), description='Flow example notebook', version=0)
MyFileConfig(file=PosixPath('Tutorial.ipynb'), description='Flow example notebook', version=0)
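
To build intuition for how nested registries and path-style lookups fit together, here is a deliberately simplified, standard-library sketch. `SketchRegistry` is hypothetical and is not how `ccflow` implements `ModelRegistry`. It only mimics the `add`, `overwrite` and path-lookup behavior seen above, and uses a plain dictionary as a stand-in for a config object.

```python
from types import MappingProxyType
from typing import Any, Dict


class SketchRegistry:
    """Hypothetical, simplified registry; not how ccflow implements ModelRegistry."""

    def __init__(self, name: str):
        self.name = name
        self._models: Dict[str, Any] = {}

    @property
    def models(self):
        # Read-only view, so entries can only be changed through `add`.
        return MappingProxyType(self._models)

    def add(self, name: str, model: Any, overwrite: bool = False) -> None:
        if name in self._models and not overwrite:
            raise ValueError(f"'{name}' is already registered")
        self._models[name] = model

    def __getitem__(self, path: str) -> Any:
        # "a/b" is resolved one level at a time through nested registries.
        head, _, rest = path.strip("/").partition("/")
        item = self._models[head]
        return item[rest] if rest else item


raw = SketchRegistry("My Raw Data")
raw.add("flow", {"file": "Tutorial.ipynb"})  # a dict stands in for a config
root_sketch = SketchRegistry("root")
root_sketch.add("raw data", raw)

same = root_sketch["raw data/flow"] is root_sketch["raw data"]["flow"]
print(same)
```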


Note that the same object can be registered under multiple different names. If one thinks of a registry as a "catalog" of data or configurations, it makes sense that the same item could be indexed in different ways. 

### Dependencies

As mentioned in the Introduction, we wish to allow configuration objects to depend on each other, in such a way that the linkage is dynamic. In the `ccflow` framework, this is done through object composition (and mutability of configs). However, to make things easier, we allow for configs to be referenced by their name in the **root** registry! For example, we can create a new config object like so:

In [17]:
root = ModelRegistry.root()
root.add("raw data", r, overwrite=True)

new_config = MyDataConfig(data_one="raw data/flow")
print(new_config)

MyDataConfig(data_one=MyFileConfig(file=PosixPath('Tutorial.ipynb'), description='Flow example notebook', version=0), data_two=None)


If we now change the values on this config model in the registry, they will change in this newly created object as well:

In [18]:
root["raw data"]["flow"].description = "New Flow notebook"
print(new_config.data_one.description)

New Flow notebook


Note, however, that if we replace the config object in the registry with an entirely new config object, the dependency will still reference the old object. This is why you need to pass `overwrite=True` when adding an object to the registry with a name that already exists.
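
The difference between mutating a registered object and replacing it is ordinary Python reference semantics. The sketch below uses a plain dictionary as a stand-in for the registry and a hypothetical `FileConfig` dataclass, so it illustrates the principle rather than `ccflow` itself.

```python
from dataclasses import dataclass


@dataclass
class FileConfig:
    """Hypothetical stand-in for a registered config object."""

    description: str


registry = {"flow": FileConfig("Flow example notebook")}
dependent = {"data_one": registry["flow"]}  # holds a reference, not a copy

# Mutating the registered object in place is visible to the dependent.
registry["flow"].description = "New Flow notebook"
after_mutation = dependent["data_one"].description

# Replacing the registry entry leaves the dependent pointing at the old object.
registry["flow"] = FileConfig("Replacement")
after_replacement = dependent["data_one"].description
print(after_mutation, "|", after_replacement)
```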

With these dependencies set up, we can now register this object as well

In [19]:
root.add("data config", new_config, overwrite=True)
pprint(root.models.copy())

{'data config': MyDataConfig(data_one=MyFileConfig(file=PosixPath('Tutorial.ipynb'), description='New Flow notebook', version=0), data_two=None),
 'raw data': ModelRegistry(name='My Raw Data')}


Even once registered, linkages between objects can be added through simple assignment

In [20]:
root["data config"].data_two = "/raw data/init"
root["data config"].data_two.description

'Python init file'

The config objects in `ccflow` can tell you where they are registered (which may be in more than one place), either as a tuple of (registry, name), or as a path by which the object could be accessed. For example, for the composite configuration `"data config"`, registered in the root registry:

In [21]:
print(new_config.get_registrations())
print(new_config.get_registered_names())

[(RootModelRegistry(), 'data config')]
['/data config']


Below is the same information for the `init` file config, as accessed from the data config:

In [22]:
print(root["data config"].data_two.get_registrations())
print(root["data config"].data_two.get_registered_names())

[(ModelRegistry(name='My Raw Data'), 'init')]
['/raw data/init']


The config objects can also tell you their dependencies on other registered config objects. The lookup goes recursively through the entire nested configuration structure to find other models that are in the registry (even if some intermediate levels are not registered):

In [23]:
new_config.get_registry_dependencies()

[['/raw data/flow'], ['/raw data/init']]

To clear the root of all its entries and start over, one can execute:

In [24]:
root.clear()
print(root.models)

{}


When you do this, the registrations and dependencies on previously registered objects are also reset:

In [25]:
print(new_config.get_registrations())
print(new_config.get_registry_dependencies())

[]
[]


### To/From Dictionaries

Oftentimes, configuration objects start out as dictionaries, and need to be mapped into BaseModels for entry into the registry. While pydantic can do conversion (and validation) of dictionaries to models when the type is known, often the type is part of the configuration itself (i.e. as a string that represents the path to the object). As we leverage Hydra for file-based configuration, we adopt its convention that a field named `_target_` on the config can be used to represent the class type.

We can then use hydra's utilities to help create the config objects:

In [26]:
from hydra.utils import instantiate

config = instantiate(
    {
        "_target_": "__main__.MyFileConfig",
        "file": "Tutorial.ipynb",
        "description": "Flow example notebook",
    }
)
print(config)

MyFileConfig(file=PosixPath('Tutorial.ipynb'), description='Flow example notebook', version=0)
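
The idea behind `_target_` can be sketched in a few lines of standard-library Python. `instantiate_sketch` is a hypothetical helper, not hydra's implementation, and it ignores hydra's many additional features (such as recursion and partial instantiation). It only shows how a dotted path can be resolved to a class and called with the remaining keys.

```python
import datetime
import importlib
from typing import Any, Dict


def instantiate_sketch(config: Dict[str, Any]) -> Any:
    """Simplified stand-in for the idea behind `_target_`; not hydra's code."""
    kwargs = dict(config)  # copy so the caller's dictionary is left untouched
    module_name, _, attribute = kwargs.pop("_target_").rpartition(".")
    target = getattr(importlib.import_module(module_name), attribute)
    return target(**kwargs)


obj = instantiate_sketch(
    {"_target_": "datetime.date", "year": 2022, "month": 1, "day": 1}
)
print(obj)
```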


Pydantic provides the ability to serialize models into dictionaries (or json), and the `ccflow.BaseModel` allows for serialization of the config attributes along with its type (as the `_target_`):

In [27]:
config.dict(by_alias=True)

{'file': PosixPath('Tutorial.ipynb'),
 'description': 'Flow example notebook',
 'version': 0,
 '_target_': '__main__.MyFileConfig'}

Configurations can also be serialized to json (so long as all elements are themselves json serializable, though pydantic allows for user-specified json serializers)

In [28]:
config.json(by_alias=True)

'{"file":"Tutorial.ipynb","description":"Flow example notebook","version":0,"_target_":"__main__.MyFileConfig"}'

We have a utility function to make it easier to view the object as JSON in a Jupyter notebook:

In [29]:
config.get_widget()

<IPython.core.display.JSON object>

Thus, by adding a `_target_` value whenever the type is not known (usually at the root of the structure), pydantic and hydra combined can easily de-serialize (and validate) nested configurations as well:

In [30]:
config_dict = {
    "_target_": "__main__.MyDataConfig",
    "data_one": {
        "file": "Tutorial.ipynb",
        "description": "Flow example notebook",
        "version": "1",
    },  # Note that "1" will be converted to 1
    "data_two": {
        "file": "ccflow/__init__.py",
        "description": "Python init file",
        "version": 0,
    },
}
config = instantiate(config_dict)
config.get_widget(widget_kwargs={"expanded":True})

<IPython.core.display.JSON object>

In fact, the `ModelRegistry` class has a convenience method that will take a dictionary of configs, and add them all to the registry. It is even clever enough to interpret nested dictionaries (with no `_target_`) as nested registries, and to resolve dependencies specified as strings (by looking up the string path in the **root** registry). Below is a complete example:

In [31]:
all_configs = {
    "raw data": {
        "flow": {
            "_target_": "__main__.MyFileConfig",
            "file": "Tutorial.ipynb",
            "description": "Flow example notebook",
            "version": "1",
        },
        "init": {
            "_target_": "__main__.MyFileConfig",
            "file": "ccflow/__init__.py",
            "description": "Python init file",
            "version": 0,
        },
    },
    "data config": {
        "_target_": "__main__.MyDataConfig",
        "data_one": "raw data/flow",
        "data_two": "raw data/init",
    },
}
root = ModelRegistry.root().clear()
root.load_config(all_configs)
root.get_widget(widget_kwargs={"expanded":True})

<IPython.core.display.JSON object>

### From Files

With the above pieces in place, especially the ability to load dictionaries of configs (specified as dictionaries) into the root `ModelRegistry`, the next step is to be able to load configuration files into this type of structure. Fortunately, this piece has already been solved by [Hydra](https://hydra.cc/docs/intro/) (which itself depends on [OmegaConf](https://omegaconf.readthedocs.io/) under the hood), so we depend on that, rather than re-inventing the wheel. In particular, Hydra allows for configurations to be split across multiple sub-directories and files, re-used in multiple places, and recombined as needed, in addition to some advanced command line tools. 

While hydra is primarily concerned with loading file-based configurations into command-line applications, their [Compose API](https://hydra.cc/docs/advanced/compose_api/) provides a way to load the configs interactively in a notebook, as a special dictionary type. However, to save people the work of first loading config files into config dictionaries, and then adding those into the registry, we've provided a function to do this directly, shown below. Note that these config files are loading models which are defined in `ccflow` and `ccflow.examples`.


In [32]:
import ccflow.examples
root = ModelRegistry.root().clear()
absolute_path = Path(ccflow.examples.__file__).parent / "config/conf.yaml"
root.load_config_from_path(path=absolute_path, config_key="registry")
root.get_widget(widget_kwargs={"expanded":False})

<IPython.core.display.JSON object>

The "config_key" argument in the function call above points to the subset of the hydra configs to load into the registry, as there may be parts of the config which you do not wish to load into the registry (such as configuration of hydra itself, or potentially other global configuration variables that are only meant to exist in the file layer).

It is out-of-scope for this tutorial to cover the various ways in which hydra can be used to generate configs, but please check out their [documentation](https://hydra.cc/docs/intro/) for more information.

## Advanced Config Examples

In this section we cover some more advanced config examples. It can be skipped on the first read if desired.

### Custom Validation and Coercion

Pydantic provides a lot of functionality for custom validation. We don't cover all of it in the tutorial, but encourage people to read the section of the pydantic docs on [validators](https://pydantic-docs.helpmanual.io/usage/validators/). 

However, one important piece of information is that when trying to conform data to a specific config type, pydantic will call the `validate` method on that type. 

This can be useful as a user, when trying to explicitly convert non-dictionary data types to a pydantic model. It is also useful as a programmer, as one can (carefully) override the `validate` method to provide custom coercions. We illustrate that below

In [33]:
class NameConfig(BaseModel):
    first_name: str
    last_name: str

    @classmethod
    def validate(cls, values):
        if isinstance(values, str):
            names = values.split(" ")
            if len(names) == 2:
                values = dict(first_name=names[0], last_name=names[1])
        return super().validate(values)


NameConfig.validate("John Doe")

NameConfig(first_name='John', last_name='Doe')

Now, when a string name is passed to a config where `NameConfig` is expected, the validate function will be called, and the data will be coerced to the correct type. We use this trick in several places in `ccflow` to improve usability, and mention it here in case others find it useful.

In [34]:
class MyConfig(BaseModel):
    name: NameConfig

In [35]:
from pydantic import ValidationError

try:
    MyConfig(name="John Doe")
except ValidationError as e:
    print(e)

1 validation error for MyConfig
name
  Value error,  [type=value_error, input_value='John Doe', input_type=str]
    For further information visit https://errors.pydantic.dev/2.7/v/value_error
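
The error above was produced with pydantic 2.7. In pydantic 2, one documented way to coerce raw input into a model is a `model_validator` in `mode="before"`. The sketch below uses plain `pydantic.BaseModel` and hypothetical class names (`PersonName`, `PersonConfig`). Whether the same pattern behaves identically on top of `ccflow.BaseModel` is an assumption that has not been checked here.

```python
from pydantic import BaseModel, model_validator


class PersonName(BaseModel):
    """Hypothetical model that accepts either a dict or a 'First Last' string."""

    first_name: str
    last_name: str

    @model_validator(mode="before")
    @classmethod
    def _split_full_name(cls, values):
        # Runs on the raw input, before field validation.
        if isinstance(values, str):
            names = values.split(" ")
            if len(names) == 2:
                return {"first_name": names[0], "last_name": names[1]}
        return values


class PersonConfig(BaseModel):
    name: PersonName


person = PersonConfig(name="John Doe")
print(person)
```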


### Jinja Templates and SQL Queries

Another aspect of configuration that we haven't touched on so far is the need to specify a template document, and then to fill in the data. This occurs commonly when building database queries from parameters or when plugging data into an email template or HTML report. One common solution is to leverage python's string formatting capabilities, but this provides a minimal amount of validation to guard against accidents in the template definition, or malicious users (i.e. SQL injection attacks). The standard solution to this problem is to leverage [Jinja templates](https://jinja.palletsprojects.com/), which are extremely powerful (as they enable some amount of scripting inside the template itself). 

`ccflow` has defined a pydantic extension type that corresponds to Jinja templates, so that they can be used in configuration objects. We illustrate this below (note that unused template arguments are ignored).

In [36]:
from ccflow import JinjaTemplate


class MyTemplateConfig(BaseModel):
    greeting: JinjaTemplate
    user: str
    place: str


config = MyTemplateConfig(
    greeting="Hello {{user|upper}}, welcome to {{place}}!",
    user="friend",
    place="the tutorial",
)
print(config.greeting)
print(config.greeting.template.render(config.dict()))
config.place = "line 2"
print(config.greeting.template.render(config.dict()))

Hello {{user|upper}}, welcome to {{place}}!
Hello FRIEND, welcome to the tutorial!
Hello FRIEND, welcome to line 2!


While the above example may be useful for a templatized email or report, we provide a more complex and realistic example that illustrates how to easily configure a SQL query:

In [37]:
from datetime import date
from pydantic import Field


class MyQueryTemplate(BaseModel):
    query: JinjaTemplate
    columns: List[str]
    where: str = "Test"
    query_date: date = Field(default_factory=date.today)
    filters: List[str] = Field(default_factory=list)

In [38]:
query = """select  {{columns|join(",\n\t")}}
from MyDatabase
where WhereCol = '{{where}}'
    and NextDate = '{{query_date}}'
    and Date >= dateadd(day,-14,'{{query_date}}')
    {% for filter in filters %}and {{filter}} {% endfor %}
"""

config = MyQueryTemplate(
    query=query,
    columns=["Col1", "Col2 as MyOthercol", "SomeID"],
)
print(config.query.template.render(config.dict()))

select  Col1,
	Col2 as MyOthercol,
	SomeID
from MyDatabase
where WhereCol = 'Test'
    and NextDate = '2024-06-27'
    and Date >= dateadd(day,-14,'2024-06-27')
    


Now it's easy to reconfigure the query, e.g. by changing the date and adding filters:

In [39]:
config.query_date = date(2022, 1, 1)
config.filters = ["SomeID IS NOT NULL", "Col1 in 'blerg'"]
print(config.query.template.render(config.dict()))

select  Col1,
	Col2 as MyOthercol,
	SomeID
from MyDatabase
where WhereCol = 'Test'
    and NextDate = '2022-01-01'
    and Date >= dateadd(day,-14,'2022-01-01')
    and SomeID IS NOT NULL and Col1 in 'blerg' 


### Numpy Arrays

Sometimes it is more convenient to work with numpy array objects instead of python lists. `ccflow` provides tools to do this easily, as shown in the following example (which conforms the input data to the declared types automatically).

In [40]:
from ccflow import NDArray


class MyNumpyConfig(BaseModel):
    my_array: NDArray[np.float64]
    my_list: List[float]


MyNumpyConfig(my_array=[1, 2, 3], my_list=[1, 2, 3])

MyNumpyConfig(my_array=array([1., 2., 3.]), my_list=[1.0, 2.0, 3.0])

### Custom Types

Often the need arises for configuration to create objects that are not built-in types (str, int, float, etc). Pydantic supports a number of additional types (see the [documentation](https://pydantic-docs.helpmanual.io/usage/types/) for a full list), but can also handle completely arbitrary types. We can use hydra to instantiate these arbitrary types from the configs, and pydantic will validate that the created object is an instance of the desired type. Furthermore, we specify some extension types in `ccflow` that have additional validation and functionality.

First, to illustrate how custom types work, we define our own custom object type, and then a configuration object (i.e. `BaseModel`) that contains that type. To prevent accidental inclusion of custom types, pydantic must be explicitly told to allow them by setting `arbitrary_types_allowed` in the model's `Config`.

In [41]:
from hydra.utils import instantiate


class MyCustomType:
    pass


class MyConfigWithCustomType(BaseModel):
    custom: MyCustomType

    class Config:
        arbitrary_types_allowed = True


config = {
    "_target_": "__main__.MyConfigWithCustomType",
    "custom": {"_target_": "__main__.MyCustomType"},
}
instantiate(config)

MyConfigWithCustomType(custom=<__main__.MyCustomType object at 0x11fafe210>)
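
The `_target_` convention used above can be illustrated with a deliberately simplified stand-in for `hydra.utils.instantiate`. The `build` helper below is hypothetical and supports far fewer features than hydra. It only shows the basic idea: import the target by its dotted path, then call it with the remaining keys, recursing into nested dictionaries. Standard-library targets are used so that the sketch runs on its own.

```python
import importlib
from typing import Any


def build(node: Any) -> Any:
    """Recursively build objects from dictionaries that carry a `_target_` key."""
    if isinstance(node, dict):
        # Build nested values first so that inner targets become objects.
        kwargs = {key: build(value) for key, value in node.items() if key != "_target_"}
        if "_target_" in node:
            module_name, _, attribute = node["_target_"].rpartition(".")
            target = getattr(importlib.import_module(module_name), attribute)
            return target(**kwargs)
        return kwargs
    if isinstance(node, list):
        return [build(item) for item in node]
    return node


delta = build({"_target_": "datetime.timedelta", "days": 1, "hours": 12})

settings = build(
    {
        "_target_": "types.SimpleNamespace",
        "name": "lookback",
        "window": {"_target_": "datetime.timedelta", "days": 14},
    }
)
print(delta, settings.window)
```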

### Loading Objects by Path

Sometimes one needs to refer to an object that is already defined in the codebase by its import name. For example, in some cases it can be easier to construct a config object in python, such as when lots of custom classes, enum types, or lambda functions are involved. For this case, `ccflow` provides a type that can refer to any python object by path, as illustrated below:

In [42]:
from ccflow import PyObjectPath


class MyConfigWithPaths(BaseModel):
    builtin_func: PyObjectPath = PyObjectPath("builtins.len")
    separator: PyObjectPath = PyObjectPath("ccflow.REGISTRY_SEPARATOR")


config = MyConfigWithPaths()
assert config.builtin_func.object([1, 2, 3]) == 3
print("Separator: ", config.separator.object)

Separator:  /


At the moment, no type checking is performed on the referenced object (we may add it in the future), but the path itself is validated at load time to catch config issues as early as possible:

In [43]:
try:
    config.separator = "foo.bar"
except ValueError as e:
    print(e)

1 validation error for MyConfigWithPaths
separator
  Invalid python path: No module named 'foo.bar' [type=import_error, input_value='foo.bar', input_type=str]
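
As a rough idea of what resolving an object by path involves, the following sketch uses only the standard library's `importlib`. The `resolve_path` function is hypothetical and is not how `PyObjectPath` is implemented. It assumes the last dotted component is an attribute of the module named by everything before it.

```python
import importlib
from typing import Any


def resolve_path(path: str) -> Any:
    """Resolve a dotted path such as "builtins.len" to the object it names."""
    module_name, _, attribute = path.rpartition(".")
    try:
        module = importlib.import_module(module_name)
        return getattr(module, attribute)
    except (ImportError, AttributeError, ValueError) as exc:
        # Fail early with a single error type, whatever went wrong underneath.
        raise ValueError(f"Invalid python path: {path}") from exc


length = resolve_path("builtins.len")
print(length([1, 2, 3]))

try:
    resolve_path("foo.bar")
except ValueError as exc:
    message = str(exc)
    print(message)
```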


## Using Configuration

With the ability to define arbitrarily complex configuration structures following the examples above, now arises the question of how best to use all that config information. 

One option is simply to access the relevant config information from the root registry whenever it is needed in the code. This method is **fragile** and **strongly discouraged**. It is equivalent to using global variables throughout the code, which has the following problems:
 - It causes very tight coupling between parts of the code, and adds dependencies everywhere on, e.g., the naming and structure of the configuration, which may need to change frequently to stay organized
 - Since every configuration option is available to every piece of code, it is difficult to reason about which code depends on which parts of the configuration. One of the original design goals was to make it very easy to remove (as well as add) unneeded configuration classes (e.g. for unused/unsuccessful models)
 - Because configurations are mutable (by design), using them everywhere makes it harder to reason about the state of the system (nothing is "pure" or "const correct" any more). It would be better to separate the parts of the code that depend on configuration (and are subject to change) from those that do not (e.g. analytics, pure/idempotent functions, etc)
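
The coupling problem can be sketched without any framework. In the toy example below, the `REGISTRY` dictionary, its keys, and the `ScaleConfig` class are all hypothetical. One function reaches into a global registry, while the other receives only the configuration it needs.

```python
from dataclasses import dataclass

# Hypothetical stand-in for a global configuration registry.
REGISTRY = {"models/gpr": {"alpha": 0.5}}


def scaled_from_registry(x: float) -> float:
    # Depends on the registry's naming and structure, not just on `alpha`.
    return REGISTRY["models/gpr"]["alpha"] * x


@dataclass
class ScaleConfig:
    alpha: float


def scaled(x: float, config: ScaleConfig) -> float:
    # Depends only on the parameters that are passed in.
    return config.alpha * x


# Reorganizing the registry breaks the first function but not the second.
REGISTRY["models/gp_regression"] = REGISTRY.pop("models/gpr")

try:
    scaled_from_registry(2.0)
    broke = False
except KeyError:
    broke = True

result = scaled(2.0, ScaleConfig(alpha=0.5))
print(broke, result)
```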
 

### Adding a `__call__` method

One solution to this problem is to write code that lives as "close" to the relevant configuration as possible, such that the scope is limited to those configuration parameters it needs and the dependencies are clearer. In this way, the registry is not used at all for run-time access, except perhaps as an initial entry-point into the logic. Furthermore, this code can serve as the bridge between the configuration graph (i.e. `ccflow`) and any other computational graphs which will be doing the heavy lifting. For example, the configurations can be used to define tensor graphs (tensorflow, pytorch), event processing graphs (csp, kafka streams), task graphs (ray, dask), etc, which are then executed separately.

The easiest way to bind together user-defined business logic with the configuration classes is simply to add a method on the class. Following the [Single Responsibility Principle](https://en.wikipedia.org/wiki/Single-responsibility_principle), each of these classes should ideally have only one purpose, and hence, following the python convention, we can name the primary method that accomplishes this `__call__`, so that the BaseModel becomes callable. The following examples illustrate this idea.

### Example: Reading a file

In [44]:
class MyFileConfig(BaseModel):
    """This is an example of a config class."""

    file: Path
    description: str = "N/A"
    version: int = 0

    def __call__(self):
        """Read the file as a pandas data frame"""
        return pd.read_parquet(self.file)

In [45]:

config = MyFileConfig(
    file="ccflow/examples/example.parquet", description="My Data"
)
df = config()
print(config)
df.head()

MyFileConfig(file=PosixPath('ccflow/examples/example.parquet'), description='My Data', version=0)


Unnamed: 0,Row ID,Order ID,Order Date,Ship Date,Ship Mode,Customer ID,Segment,Country,City,State,Postal Code,Region,Product ID,Category,Sub-Category,Sales,Quantity,Discount,Profit
0,0,30-1718825,2024-06-20,2024-06-22,Standard Class,GF3 E0D,C,US,Victoriabury,Iowa,50365,Region 4,LWSE88451214620527,Telecommunication Services,Diversified Telecommunication Services,9900,190,47.04,541.33
1,1,98-7267735,2024-05-12,2024-06-05,Standard Class,028-OOM,B,US,Melissamouth,Iowa,78255,Region 4,JTSB68523839821087,Materials,Chemicals,1900,510,96.64,266.85
2,2,67-9356762,2024-06-07,2024-06-25,First Class,SIR-971,C,US,Madisontown,Vermont,24144,Region 4,OKGX33209192480300,Energy,Energy Equipment & Services,1300,770,23.79,95.39
3,3,75-5326684,2024-05-25,2024-05-28,Second Class,939 BJK,B,US,Maxwellville,Wyoming,93403,Region 4,NYXC50908782002016,Materials,Metals & Mining,800,690,10.78,734.79
4,4,30-9242910,2024-01-06,2024-06-05,Standard Class,2IJ A87,A,US,Lake Angelashire,Oklahoma,29249,Region 4,SWVG27221019552286,Utilities,Water Utilities,5800,730,37.62,0.68


Thus, using all the machinery in the previous section, we can define configs that are either pure data containers, or that correspond to some piece of arbitrary user-defined functionality, i.e. a "step" in a workflow. While we do leverage pydantic for conforming data, there are **no restrictions** on the kind of code that could live inside the `__call__` method (or strictly speaking, how many methods there are or what those methods are called). Here is a slightly more complex example for illustration:

In [46]:
import pyarrow.parquet as pq


class MyFileConfig(BaseModel):
    """This is an example of a config class."""

    file: Path
    description: str = "N/A"
    version: int = 0

    def read_pandas(self):
        return pd.read_parquet(self.file)

    def read_arrow(self):
        return pq.read_table(self.file)

    def __call__(self):
        """Read the file as a pandas data frame or arrow table"""
        if self.version == 0:
            return self.read_pandas()
        else:
            return self.read_arrow()

In [47]:
config = MyFileConfig(file="ccflow/examples/example.parquet", version=0)
df = config()
print(type(df))

config = MyFileConfig(file="ccflow/examples/example.parquet", version=1)
df = config()
print(type(df))

<class 'pandas.core.frame.DataFrame'>
<class 'pyarrow.lib.Table'>


### Example: Custom publisher

A common use case in any research/production framework is to send data from the current process to some other location. In extract-transform-load (ETL) processes, the target location is usually files, an object store or a database. However, automation of research reports containing tables, charts and HTML is another common use case; those may be written to files, sent to an experiment tracking framework or simply emailed to a set of recipients.

In `ccflow.flow`, we used the principles above to define a very simple interface for "publishers" (`ccflow.flow.publishers.BasePublisher`), which are configurable components that know how to take data from one place and send it to another. The reason for having a `BasePublisher` class is so that publishers can easily be substituted for one another as part of the configuration of a larger workflow. If publishers were all implemented a little bit differently, it would be difficult to switch from, e.g., writing files to sending an email purely based on configuration.
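
The substitution idea can be sketched with the standard library alone. The classes below (`ToyPublisher`, `TextPublisher`, `HtmlPublisher`) are hypothetical and are not part of `ccflow`. Unlike `BasePublisher`, which holds `data` as an attribute, these toy classes take the data as a call argument to keep the sketch short.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List


class ToyPublisher(ABC):
    """Hypothetical common interface: every publisher is called the same way."""

    @abstractmethod
    def __call__(self, data: List[str]) -> None: ...


@dataclass
class TextPublisher(ToyPublisher):
    lines: List[str] = field(default_factory=list)

    def __call__(self, data: List[str]) -> None:
        # Collect the raw lines (a stand-in for writing to a file).
        self.lines.extend(data)


@dataclass
class HtmlPublisher(ToyPublisher):
    pages: List[str] = field(default_factory=list)

    def __call__(self, data: List[str]) -> None:
        # Collect one HTML paragraph per call (a stand-in for sending a report).
        self.pages.append("<p>" + "<br>".join(data) + "</p>")


def run_report(publisher: ToyPublisher) -> None:
    """Workflow code depends only on the shared interface."""
    publisher(["row 1", "row 2"])


text, html = TextPublisher(), HtmlPublisher()
for publisher in (text, html):
    run_report(publisher)

print(text.lines)
print(html.pages)
```

Because `run_report` never refers to a concrete class, choosing between the two publishers becomes purely a matter of configuration.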

While `ccflow.flow` provides several implementations out of the box for common use cases (files, email, [mlflow](https://www.mlflow.org/docs/latest/tracking.html)), custom implementations of the interface are also straightforward. Below, we create a custom publisher that uses IPython's `display` function to display a list of strings as HTML. We use Jinja templating, as described in a previous section, to define the HTML template.

In [48]:
from IPython.display import display, HTML
from ccflow import JinjaTemplate, BasePublisher
from typing import List


class MyPublisher(BasePublisher):
    data: List[str] = None
    html_template: JinjaTemplate

    def __call__(self):
        display(HTML(self.get_name()))
        display(HTML(self.html_template.template.render(data="<BR>".join(self.data))))


# Create the publisher (i.e. via static configuration)
p = MyPublisher(
    name="<b>My {{desc}} publisher:</b>",
    html_template="""<p style="color:blue;">{{data}}</p>""",
)

# Set the data that we want to publish (i.e. at runtime)
p.name_params = dict(desc="test")
p.data = ["Blue text.", "More blue text."]
p()

Even though "data" is a standard attribute on BasePublisher, implementations can override it to define more specific data types that the publisher allows, and pydantic provides the validation. For example, passing a dictionary to "data" in the example above results in an error (before the call to publish is even made):

In [49]:
p = MyPublisher(
    name="<b>My {{desc}} publisher:</b>",
    html_template="""<p style="color:blue;">{{data}}</p>""",
)
try:
    p.data = {}
except ValueError as v:
    print(v)

1 validation error for MyPublisher
data
  Input should be a valid list [type=list_type, input_value={}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.7/v/list_type


### Example: Gaussian process regression in sklearn

Gaussian processes (GP) are a generic supervised learning method designed to solve non-linear regression and probabilistic classification problems. The `GaussianProcessRegressor` in `sklearn` implements GPs for regression. For this, the prior of the GP needs to be specified. The prior mean is assumed to be constant and zero (for `normalize_y=False`) or the training data’s mean (for `normalize_y=True`). The prior’s covariance is specified by passing a kernel object.

In the example below, we show how `ccflow` can be used to configure a GP Regression (including the kernel object). Even though the kernel object is not a pydantic type, we can still configure it in this framework.

Note that `sklearn` must be installed to run this example. For the meaning of the parameters, refer to the [sklearn documentation](https://scikit-learn.org/stable/modules/generated/sklearn.gaussian_process.GaussianProcessRegressor.html#sklearn.gaussian_process.GaussianProcessRegressor); we follow their example for usage of the GP Regressor.


In [50]:
%%capture --no-stdout
try:
    from sklearn.datasets import make_friedman2
    from sklearn.gaussian_process import GaussianProcessRegressor
    from sklearn.gaussian_process.kernels import Kernel
    from typing import Callable, Union
    from hydra.utils import instantiate

    class GPRegressionModel(BaseModel):
        """Wrapping of sklearn's GaussianProcessRegressor for configuration"""

        kernel: Kernel
        alpha: float = 1e-10
        optimizer: Union[str, Callable] = "fmin_l_bfgs_b"
        n_restarts_optimizer: int = 0
        normalize_y: bool = False
        random_state: int = None

        class Config:
            # Here we tell pydantic to allow the "Kernel" type, which is not a standard pydantic type
            arbitrary_types_allowed = True

        def __call__(self):
            """Build the GP Regressor object"""
            # Rather than passing in each attribute individually, we can use the dict representation of the config class (leaving out the "type_" attribute),
            # as we named everything consistently with the sklearn parameter names
            return GaussianProcessRegressor(**self.dict(exclude={"type_"}))

    # Define the config as a dictionary (potentially to live in a config file)
    gpr_config = {
        "_target_": "__main__.GPRegressionModel",
        "kernel": {
            "_target_": "sklearn.gaussian_process.kernels.Sum",
            "k1": {
                "_target_": "sklearn.gaussian_process.kernels.DotProduct",
                "sigma_0": 1.0,
            },
            "k2": {
                "_target_": "sklearn.gaussian_process.kernels.WhiteKernel",
                "noise_level": 1.0,
            },
        },
        "optimizer": "fmin_l_bfgs_b",
        "random_state": 0,
    }

    # Load the config in the root registry
    root = ModelRegistry.root().clear()
    root.load_config({"gpr": gpr_config})

    # Use the config to train the configured model
    X, y = make_friedman2(n_samples=500, noise=0, random_state=0)
    gpr = root["gpr"]().fit(X, y)
    print(gpr.score(X, y))

    # We can now change the config interactively to experiment with different kernels,
    # without needing to go back to config dictionaries or files
    from sklearn.gaussian_process.kernels import RBF, WhiteKernel

    root["gpr"].kernel = RBF() + WhiteKernel()
    gpr = root["gpr"]().fit(X, y)
    print(gpr.score(X, y))

except ImportError:
    print("sklearn must be installed to run this example")

0.36802938610173386
-1.439871997881387
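
The keyword-unpacking trick used in `__call__` above does not depend on `sklearn` or pydantic. As a minimal sketch with standard-library dataclasses, where `ToyRegressor` and `ToyRegressorConfig` are hypothetical names, a config whose field names match the constructor's parameter names can be unpacked in one step:

```python
from dataclasses import asdict, dataclass


class ToyRegressor:
    """Hypothetical stand-in for a third-party estimator."""

    def __init__(self, alpha=1e-10, n_restarts=0, normalize_y=False):
        self.alpha = alpha
        self.n_restarts = n_restarts
        self.normalize_y = normalize_y


@dataclass
class ToyRegressorConfig:
    # Field names deliberately match the constructor's parameter names.
    alpha: float = 1e-10
    n_restarts: int = 0
    normalize_y: bool = False

    def __call__(self) -> ToyRegressor:
        # Unpack every field as a keyword argument in one step.
        return ToyRegressor(**asdict(self))


config = ToyRegressorConfig(alpha=0.1)
model = config()

# Changing the config and calling it again builds a fresh object.
config.normalize_y = True
rebuilt = config()
print(model.normalize_y, rebuilt.normalize_y)
```

The cost of this convenience is that the config and the wrapped constructor must stay in sync: renaming a field on either side breaks the call.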
