# Datatypes and the Experimentalist Pipeline

The experimentalist pipeline is a tool for running **sequences of functions** (callables) on some data.

If you have $n + 1$ functions $f_i$,

$$
f_0(x), f_1(x), \ldots, f_n(x)\, ,
$$

the pipeline of those functions, $P_{f_i}$, is defined as

$$
P_{f_i}(x) = f_n(f_{n-1}(\ldots(f_0(x))))\, .
$$
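
As a rough illustration of this definition, the composition can be sketched in plain Python with `functools.reduce`. The `compose` helper below is a hypothetical name introduced only for this sketch; it is not how `make_pipeline` is implemented, and it ignores parameters entirely.

```python
from functools import reduce


def compose(functions):
    """Return a callable applying `functions` in order: f_0 first, f_n last."""

    def pipeline(x):
        # reduce threads the running value through each function in turn
        return reduce(lambda value, f: f(value), functions, x)

    return pipeline


p = compose([lambda x: x + 1, lambda x: x * 3, lambda x: x / 2])
print(p(1))  # 1 -> 2 -> 6 -> 3.0
```

With an empty list of functions, this sketch simply returns its input unchanged.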

Key principles:
1. The *pipeline passes the data unchanged* between the $f_i$. It is agnostic to the data which the
functions require or return.
2. The $f_i$ can *mutate the data*.
3. The $f_i$ must be *compatible with one another*: each function must accept whatever the previous one returns. It is the user's responsibility to ensure this.
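
To make the compatibility requirement concrete, here is a small sketch under the assumption that a pipeline behaves like a plain loop over its functions. `apply_in_order` is a hypothetical stand-in, not part of the library.

```python
def apply_in_order(functions, x):
    """Hypothetical stand-in for a pipeline: call each function on the previous result."""
    for f in functions:
        x = f(x)
    return x


compatible = [str.strip, str.upper]    # str -> str -> str
incompatible = [str.split, str.upper]  # str -> list, but str.upper needs a str

print(apply_in_order(compatible, "  abc "))  # ABC

try:
    apply_in_order(incompatible, "a b c")
except TypeError as error:
    # str.upper rejects the list produced by str.split
    print(f"incompatible steps: {error}")
```

Nothing in the loop checks types ahead of time, so a mismatch only surfaces when the offending function is called.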

***TODO: what about the parameters? Can we fit those into the definition?***

Because it is agnostic about the data, the pipeline is simple and flexible. To support a new kind of data, you only
need to define the datatype (perhaps as a
[python `dataclass`](https://docs.python.org/3/library/dataclasses.html)) and then write $f_i$ which use that
dataclass as their input and output type.

Although it is possible to write $f_i$ which change the type of the data passing through and still work as a
pipeline, we recommend that each function returns the same type it receives, so $\mathrm{type}(x) = \mathrm{type}(f_i(x))$.
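
One way to enforce this recommendation while developing is to wrap each function in a check. The `preserves_type` wrapper below is a hypothetical helper written for this sketch, not something the library provides.

```python
def preserves_type(f):
    """Hypothetical wrapper: raise if `f` returns a different type than it received."""

    def checked(x):
        result = f(x)
        if type(result) is not type(x):
            raise TypeError(
                f"expected {type(x).__name__}, got {type(result).__name__}"
            )
        return result

    return checked


increment = preserves_type(lambda x: x + 1)
halve = preserves_type(lambda x: x / 2)

print(increment(1))  # 2
try:
    halve(2)  # int in, float out
except TypeError as error:
    print(error)  # expected int, got float
```

The check compares exact types, so it would also reject a function that returns a subclass of its input type; whether that is desirable depends on the data involved.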

In this tutorial, we'll demonstrate some pipelines which:
- use standard library functions to operate on python built-in types,
- use numpy functions to operate on numpy arrays,
- use pandas functions to operate on pandas dataframes,
- use custom functions to operate on custom dataclasses, and
- mutate the type of the data in more complicated ways.

In [None]:
!pip install autora

In [None]:
from autora.experimentalist.pipeline import make_pipeline

## Standard Library Functions and Built-In-Types

### Elementary types

Define a pipeline which operates on numeric types.

In [None]:
functions = [lambda x: x+1]
pipeline = make_pipeline(functions)
pipeline(1)

2

In [None]:
functions = [lambda x: x+1, lambda x: x*3, lambda x: x/2]
pipeline = make_pipeline(functions)
pipeline(1)  # 1 -> 2 -> 6 -> 3.0

3.0

In [None]:
pipeline(16)  # 16 -> 17 -> 51 -> 25.5

25.5

Define a pipeline which operates on strings.

In [None]:
functions = [
    str.title,
    lambda s: str.center(s, 22, "*"),
    str.upper,
    str.lower
]
pipeline = make_pipeline(functions)
pipeline("the input string")

'***the input string***'

### Collections
A pipeline which operates on sequences, like a list of characters:

In [None]:
from random import seed, choices
functions = [
    lambda l: choices(l, k=10),
    sorted
]
pipeline = make_pipeline(functions)
seed(1)
pipeline(list("abcdefghijklmnopqrstuvwxyz!@#$%^&*()"))

['#', '%', '@', 'b', 'd', 'e', 'j', 'q', 'r', 'x']

... or on a sequence of numbers:

In [None]:
functions = [
    lambda l: choices(l, k=10),
    lambda l: [x / 1_000_000 for x in l]
]
pipeline = make_pipeline(functions)
seed(1)
pipeline(range(1_000_000))

[0.134364,
 0.847433,
 0.763774,
 0.255069,
 0.495435,
 0.449491,
 0.651592,
 0.788723,
 0.093859,
 0.028347]

... or on an unbounded (potentially infinite) iterable of numbers:

In [None]:
import random
from itertools import count, takewhile
def sample_with_probability(x, p=(1/1000)):
    """Samples from a (potentially unbounded) iterable `x` with a fixed probability `p`."""
    for x_i in x:
        if random.random() < p:
            yield x_i


functions = [
    sample_with_probability,
    lambda l: takewhile(lambda x: x < 10_000, l)
]

pipeline = make_pipeline(functions)
seed(1)
result = pipeline(count(1))
list(result)

[504,
 1155,
 3873,
 3919,
 4386,
 5329,
 6194,
 6734,
 6742,
 8071,
 9108,
 9129,
 9654,
 9947]

... or on an unbounded iterable of tuples:

In [None]:
functions = [
    sample_with_probability,  # the same as before
    lambda l: takewhile(lambda t: t[0] + t[1] < 20_000, l)
]
pipeline = make_pipeline(functions)
seed(1)
result = pipeline(zip(count(1), count(500)))
list(result)

[(504, 1003),
 (1155, 1654),
 (3873, 4372),
 (3919, 4418),
 (4386, 4885),
 (5329, 5828),
 (6194, 6693),
 (6734, 7233),
 (6742, 7241),
 (8071, 8570),
 (9108, 9607),
 (9129, 9628),
 (9654, 10153)]
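
The two unbounded examples above terminate only because the generator steps are lazy and `takewhile` eventually stops consuming input. As a minimal sketch of the same idea in plain Python, `itertools.islice` can be used to draw a fixed number of results from a lazy step instead. `keep_multiples_of_seven` is a hypothetical function used only for illustration.

```python
from itertools import count, islice


def keep_multiples_of_seven(numbers):
    """Hypothetical lazy filter: yields matching items one at a time."""
    for n in numbers:
        if n % 7 == 0:
            yield n


# Calling the generator function returns immediately; nothing is consumed yet.
lazy_result = keep_multiples_of_seven(count(1))

# islice pulls only as many items as requested from the unbounded source.
first_five = list(islice(lazy_result, 5))
print(first_five)  # [7, 14, 21, 28, 35]
```

Calling `list(...)` directly on an unbounded lazy result would never return, so some bounding step is always needed before materializing it.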

A pipeline which acts on a dictionary:

In [None]:
functions = [
    lambda d: dict(d, a=d["a"]+1),
    lambda d: dict(d, b=d["b"]-1),
    lambda d: dict(d, c=3),
]
pipeline = make_pipeline(functions)
pipeline(dict(a=1, b=1))  # {a: 1, b: 1} -> {a: 2, b: 1} -> {a: 2, b: 0} -> {a: 2, b: 0, c: 3}

{'a': 2, 'b': 0, 'c': 3}

## Numpy

In [None]:
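import numpy as np

rng = np.random.default_rng()  # unseeded, so the sampled rows differ between runs
functions = [
    lambda a: rng.choice(a, axis=0, size=5)  # sample 5 rows (with replacement)
]
pipeline = make_pipeline(functions)
a_ = np.arange(1_000_000).reshape((-1, 4))
pipeline(a_)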


array([[185760, 185761, 185762, 185763],
       [516812, 516813, 516814, 516815],
       [994952, 994953, 994954, 994955],
       [952720, 952721, 952722, 952723],
       [457604, 457605, 457606, 457607]])

## Dataclasses

In [None]:
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class ExperimentalData:
    """Container for the observations and conditions of an experiment."""
    observations: list
    conditions: list


functions = [
    lambda ed: replace(ed, observations=choices(ed.observations, k=10))
]

pipeline = make_pipeline(functions)
pipeline(
    ExperimentalData(
        observations=list(range(1_000_000)),
        conditions=list(range(20)),
    )
)

ExperimentalData(observations=[488679, 731209, 353808, 16931, 952533, 31153, 764214, 586842, 205913, 199552], conditions=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19])
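
The example above relies on `dataclasses.replace` returning a new object rather than changing the original, which is what makes a frozen dataclass usable in a pipeline. The sketch below illustrates that behaviour with the standard library only; `Sample` is a hypothetical stand-in for `ExperimentalData`.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class Sample:
    """Hypothetical stand-in for the ExperimentalData class above."""

    observations: list
    conditions: list


original = Sample(observations=[1, 2, 3], conditions=[0, 1])
updated = replace(original, observations=[3])

print(original.observations)  # [1, 2, 3]
print(updated.observations)   # [3]
# replace() copies shallowly: untouched fields refer to the same objects.
print(updated.conditions is original.conditions)  # True

try:
    original.observations = []
except FrozenInstanceError:
    print("fields of a frozen dataclass cannot be reassigned")
```

Note that `frozen=True` only blocks reassignment of fields. The lists themselves remain mutable, so a function that calls `ed.observations.append(...)` would still change the data in place.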