# Generators, Coroutines and all the other things

Adapted from [PyMC4 Design overview. Generators, Coroutines and all the things](https://github.com/pymc-devs/pymc4/blob/master/notebooks/pymc4_design_guide.ipynb)

**This guide is NOT meant to serve as an introduction to probabilistic programming. It is meant for users of PyMC3 switching to PyMC4, and explains the new model creation syntax and logic.**

With Theano reaching the [end of its life](https://groups.google.com/d/msg/theano-users/7Poq8BZutbY/rNCIfvAEAwAJ), the PyMC core developers decided to port the framework's backend to [TensorFlow Probability](https://www.tensorflow.org/probability) (TF Probability, or TFP).
However, the developers faced some fundamental design issues, mainly because TF's design focuses on functional programming and eager execution, whereas Theano represented models as symbolic computational graphs.
In this document, we go over those problems and their suggested solutions.

## The Problem With TF Probability

```python
import tensorflow as tf
from tensorflow_probability import distributions as tfd
```

### What do we want from a PPL?

We want to implement a probabilistic program (think of a Bayesian model that you would usually represent as a directed acyclic graph) that does two things:

1. Forward sampling to get prior (predictive) samples; 
2. Reverse evaluation on some inputs to compute the log-probability (if we condition on the observed data, we are evaluating the unnormalized posterior distribution of the unknown random variables).

Specifically, to compute the log_prob we need to keep track of the dependencies. For example, for something as simple as `x ~ Normal(0, 1), y ~ Normal(x, 1)`, in reverse mode (goal 2 above) we need to swap `x` with the input `x_input` (supplied either by the user or programmatically) to essentially compute `log_prob = Normal(0, 1).log_prob(x_input) + Normal(x_input, 1).log_prob(y_input)`.
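To make this concrete, the reverse evaluation of this two-variable model can be written out by hand. The sketch below is plain Python. `normal_logpdf` is a hypothetical helper that stands in for `tfd.Normal(...).log_prob`, so the example runs without TensorFlow.

```python
import math

def normal_logpdf(value, loc, scale):
    """Log-density of Normal(loc, scale) at `value` (stand-in for tfd.Normal.log_prob)."""
    z = (value - loc) / scale
    return -0.5 * z * z - math.log(scale) - 0.5 * math.log(2 * math.pi)

def joint_log_prob(x_input, y_input):
    # x ~ Normal(0, 1), evaluated at the supplied x
    lp_x = normal_logpdf(x_input, 0.0, 1.0)
    # y ~ Normal(x, 1): the *input* x is swapped in as y's location
    lp_y = normal_logpdf(y_input, x_input, 1.0)
    return lp_x + lp_y

print(joint_log_prob(0.0, 0.0))  # equals -log(2*pi), about -1.8379
```

The dependency is explicit here because `x_input` is passed into `y`'s distribution by hand. That bookkeeping is what a PPL has to automate.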

There are a few approaches to this problem. For example, PyMC3 uses a static graph backend (Theano), so we can write models declaratively and let the static computational graph keep track of the dependencies. This lets us write `log_prob = x.log_prob(x) + y.log_prob(y)`, because the value representations of the random variables `x` and `y` are swapped in with some input at runtime.

With a dynamic graph like TFP's, we run into problems because we write the model in forward mode, and so we essentially lose track of the dependencies in reverse evaluation. We need to either keep track of the graph representation ourselves, or write a function that can be re-evaluated to make sure we have the right dependencies.

Ideally, the model would look like this:

```python
def model(x):
    scale = tfd.HalfCauchy(loc=0, scale=1)
    coefs = tfd.Normal(loc=tf.zeros(x.shape[1]), scale=1)
    predictions = tfd.Normal(loc=tf.tensordot(x, coefs, axes=1), scale=scale)
    return predictions
```

```python
model(tf.random.normal((100, 10)))
```

```
ValueError: TypeError: object of type 'Normal' has no len()
```


But this function will **not** work, because there is no random variable concept in `tfp`, meaning that you cannot do `RV_a.log_prob(RV_a)`.

We can instead sample along the way, without deferring the computation, as in `model_w_sample`:

```python
def model_w_sample(x):
    scale = tfd.HalfCauchy(loc=0, scale=1).sample()
    coefs = tfd.Normal(loc=tf.zeros(x.shape[1]), scale=1).sample()
    predictions = tfd.Normal(loc=tf.tensordot(x, coefs, axes=1), scale=scale)
    return predictions
```

```python
model_w_sample(tf.random.normal((100, 10)))
```

```
<tfp.distributions.Normal 'Normal' batch_shape=[100] event_shape=[] dtype=float32>
```

Computing a log_prob from this model won't work either, because the computation differs from the model we wrote down above: `scale` and `coefs` are now fixed to concrete draws instead of being random variables we can evaluate.

What we want here is to track function evaluation at runtime, depending on the goal (1 or 2 as stated above).

The very first way to cope with this was to write a wrapper over a distribution object. This wrapper was intended to intercept calls to the distribution and use context to figure out what to do:

* For goal 1, we draw a sample from a random variable and plug the concrete value into the downstream dependencies.
* For goal 2, we get the concrete value and evaluate it with the respective random variable, and also plug the concrete value into the downstream dependencies.
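Here is a minimal plain-Python sketch of that wrapper idea. It rests on a few assumptions. The model receives a hypothetical `rv` callback instead of calling distributions directly. Two interchangeable callbacks implement goal 1 (sample and record) and goal 2 (look up the given value and accumulate its log-density). `Normal` is a tiny stand-in class, not the TFP distribution.

```python
import math
import random

class Normal:
    """Tiny stand-in for a distribution: name, sampling and log-density."""

    def __init__(self, loc, scale, name):
        self.loc, self.scale, self.name = loc, scale, name

    def sample(self, rng):
        return rng.gauss(self.loc, self.scale)

    def log_prob(self, value):
        z = (value - self.loc) / self.scale
        return -0.5 * z * z - math.log(self.scale) - 0.5 * math.log(2 * math.pi)

def model(rv):
    # The model never samples by itself; `rv` decides what a random variable means.
    x = rv(Normal(0.0, 1.0, name="x"))
    y = rv(Normal(x, 1.0, name="y"))
    return y

def forward(seed=0):
    """Goal 1: draw each variable and plug the draw into downstream code."""
    rng = random.Random(seed)
    trace = {}

    def rv(dist):
        trace[dist.name] = dist.sample(rng)
        return trace[dist.name]

    model(rv)
    return trace

def log_prob(values):
    """Goal 2: plug in the given values and sum their log-densities."""
    total = 0.0

    def rv(dist):
        nonlocal total
        total += dist.log_prob(values[dist.name])
        return values[dist.name]

    model(rv)
    return total

print(log_prob(forward(seed=1)))  # score a prior draw
```

Both callbacks re-run the same model function. This is the "function that can be re-evaluated" strategy mentioned above.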

```python
def model_w_yield(x):
    scale = yield tfd.HalfCauchy(loc=0, scale=1)
    coefs = yield tfd.Normal(loc=tf.zeros(x.shape[1]), scale=1)
    predictions = yield tfd.Normal(loc=tf.tensordot(x, coefs, axes=1), scale=scale)
    return predictions
```

```python
model_w_yield(tf.random.normal((100, 10)))
```

```
<generator object model_w_yield at 0x14170fb50>
```

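Calling the function no longer raises an error, but it does not run the model either. It returns a generator object: `yield` lets us defer program execution until something drives the generator.

Before evaluating this function, let's figure out what `yield` does.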
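The deferral is easy to check first. Calling a generator function only builds a generator object, and the body runs lazily. The hypothetical `traced_model` below records when its body actually executes.

```python
events = []

def traced_model():
    events.append("body started")        # runs only when iteration begins
    value = yield "first distribution"   # execution pauses here
    events.append(f"received {value}")

gen = traced_model()        # creates a generator object; the body has not run
print(events)               # []
first = next(gen)           # runs the body up to the first `yield`
print(first, events)        # first distribution ['body started']
```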

## Sidenote -- Primer on Generators

```python
def generator(message):
    print("I will yield:", message)
    while True:
        yield message
    return "Nothing to yield, Goodbye!"
```

```python
g = generator(message='generators are cool')
```

```python
yielded_value = next(g)
```

```
I will yield: generators are cool
```

```python
yielded_value
```

```
'generators are cool'
```

```python
yielded_value = next(g)
yielded_value
```

```
'generators are cool'
```

```python
yielded_value = next(g)
yielded_value
```

```
'generators are cool'
```

This is a simple infinite loop that keeps returning (or rather, yielding) the `message` argument passed to it.
What if we want it to stop at a specific point? That point can be defined by either:

1. The number of iterations
2. The occurrence of a specific (user) input

The first option can be achieved simply by replacing `while True:` with `for i in range(num_iterations):`.
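Here is a sketch of the first option. `bounded_generator` and `num_iterations` are illustrative names. Unlike in the infinite loop, the `return` statement is now reachable once the loop runs out.

```python
def bounded_generator(message, num_iterations):
    print("I will yield:", message)
    for _ in range(num_iterations):
        yield message
    # now reachable: this value ends up in the StopIteration that ends iteration
    return "Nothing to yield, Goodbye!"

g = bounded_generator("generators are cool", num_iterations=2)
print(list(g))   # ['generators are cool', 'generators are cool']
```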


The second option is somewhat trickier. How do we send an input to an already created generator?
Python makes it *easier done than said* by adding a `send` method to generators. These supercharged generators have a special name: **coroutines**.

```python
def coroutine(message):
    print("I will yield:", message)
    sent_value = yield message
    while sent_value != 'bye':
        print("I will yield:", message)
        print("And, I was sent:", sent_value)
        sent_value = yield message

    return "Nothing to yield, Goodbye!"
```

**Note**: There is no difference between the definition of a `generator` and a `coroutine`. The generator `g` that we defined has a `send` method attached to it too.
The only difference is that with coroutines, the programmer uses the `send` method to pass values into the generator function and drive its computation.
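Two details are worth checking on a plain generator, here a hypothetical `echo`. First, it already has `send`. Second, the very first `send` must be `None`, because execution has not yet reached a `yield` that could receive a value.

```python
def echo():
    received = yield "ready"                # nothing can be sent before this first pause
    while True:
        received = yield f"got {received}"

e = echo()
try:
    e.send("too early")                     # no paused `yield` to receive the value yet
except TypeError as err:
    print(err)   # can't send non-None value to a just-started generator

e = echo()
print(e.send(None))   # ready   (same as next(e))
print(e.send("hi"))   # got hi
```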

```python
c = coroutine(message='coroutines are cooler')
```

```python
# 1st call: runs the body up to the first `yield message` and pauses there
yielded_value = c.send(None)  # equivalent to next(c)
# The first value sent must be None: no `yield` expression is waiting yet,
# so there is nothing to assign it to (Python raises TypeError otherwise).
```

```
I will yield: coroutines are cooler
```

```python
yielded_value
```

```
'coroutines are cooler'
```

```python
# 2nd call: the sent value is assigned to `sent_value` on the 3rd line of `coroutine`.
# Note: that first `yield message` does not yield again (it already did so when
# `c.send(None)` ran); the returned value comes from the `yield` inside the loop.
yielded_value = c.send('keep going')
```

```
I will yield: coroutines are cooler
And, I was sent: keep going
```

```python
yielded_value
```

```
'coroutines are cooler'
```

```python
yielded_value = c.send('bye')
yielded_value
```

```
StopIteration: Nothing to yield, Goodbye!
```

### Wrapping Generators

Since our model will yield multiple objects, each of which has to be tracked, it is best to handle that iteration and tracking process with a wrapper.

```python
import functools

def decorator(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        g = f(*args, **kwargs)
        # `yield from` forwards next()/send()/throw() to `g` and handles g's
        # StopIteration itself, binding g's return value to `result`
        result = yield from g
        print('done')
        print(result)
        return 'Wrapper finished'
    return wrapper
```

```python
@decorator
def generator(message):
    print("I will yield:", message)
    result = yield message
    result = yield result
    return "Nothing to yield, Goodbye!"
```

```python
k = generator(message='generators are cool')
next(k)
```

```
I will yield: generators are cool
'generators are cool'
```

```python
k.send('Hi')
```

```
'Hi'
```

```python
k.send('Hi')
```

```
done
Nothing to yield, Goodbye!
StopIteration: Wrapper finished
```
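Roughly, `yield from` saves us from writing the forwarding loop by hand. The sketch below is a simplified, hypothetical `delegate` that mimics only two parts of the PEP 380 semantics: `next`/`send` forwarding and the captured return value. Real `yield from` also forwards `throw()` and `close()`. The two parts shown are all the wrapper above relies on.

```python
def delegate(inner):
    """Simplified stand-in for `return (yield from inner)` (no throw/close forwarding)."""
    try:
        value = next(inner)                 # prime the inner generator
    except StopIteration as stop:
        return stop.value
    while True:
        sent = yield value                  # hand the inner value to our caller
        try:
            value = inner.send(sent)        # pass the caller's value back in
        except StopIteration as stop:
            return stop.value               # inner finished: propagate its result

def inner(message):
    result = yield message
    result = yield result
    return "Nothing to yield, Goodbye!"

k = delegate(inner("generators are cool"))
print(next(k))        # generators are cool
print(k.send("Hi"))   # Hi
try:
    k.send("Hi")
except StopIteration as stop:
    print(stop.value) # Nothing to yield, Goodbye!
```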

## Solving The Problem using Coroutines

```python
state = dict(dists=dict(), samples=dict())
state
```

```
{'dists': {}, 'samples': {}}
```

```python
state["input"] = tf.random.normal((3, 10))
m = model_w_yield(state["input"])
```

```python
scale_dist = next(m)  # or m.send(None)
```

```python
print(scale_dist)
```

```
tfp.distributions.HalfCauchy("HalfCauchy", batch_shape=[], event_shape=[], dtype=float32)
```


This means execution is paused here:

```python
def model(x):
    scale = yield tfd.HalfCauchy(0, 1) # <---- yielding this
    coefs = yield tfd.Normal(tf.zeros(x.shape[1]), 1, )
    ...
```

What can we do with this distribution? We can choose forward sampling (in this case we sample from the stateless distribution `HalfCauchy(0, 1)`). But whatever value we send back must work seamlessly for the user later on, regardless of the context (goal 1 or 2 above). On the model side, we need to store the intermediate values and their associated distributions.

```python
assert scale_dist.name not in state["dists"]
state["samples"][scale_dist.name] = scale_dist.sample()
state["dists"][scale_dist.name] = scale_dist
```

```python
state
```

```
{'dists': {'HalfCauchy': <tfp.distributions.HalfCauchy 'HalfCauchy' batch_shape=[] event_shape=[] dtype=float32>},
 'samples': {'HalfCauchy': <tf.Tensor: shape=(), dtype=float32, numpy=9.384968>},
 'input': <tf.Tensor: shape=(3, 10), dtype=float32, numpy=
 array([[-1.3179185 ,  1.6086259 , -1.3616879 ,  0.02001234, -1.2103086 ,
         -1.6375118 ,  2.7937667 ,  0.13853055, -0.41751987,  0.09009951],
        [-0.87523323,  0.90529805, -0.09060009, -1.7013458 , -0.196005  ,
          1.080928  , -1.8215148 ,  1.2471192 , -0.8949209 , -0.4108426 ],
        [-0.04518619,  1.1348138 , -0.35127786, -0.3947919 , -0.5378178 ,
          0.5538728 ,  0.9303225 , -0.6615112 ,  0.36607537, -0.6107869 ]],
       dtype=float32)>}
```

```python
coefs_dist = m.send(state["samples"][scale_dist.name])
# the assignment `scale = ...` and `yield tfd.Normal(...)` both happen in this single call
```

```python
print(coefs_dist)
```

```
tfp.distributions.Normal("Normal", batch_shape=[10], event_shape=[], dtype=float32)
```


```python
def model(x):
    scale = yield tfd.HalfCauchy(0, 1) # <---- assigning sent value to `scale`
    coefs = yield tfd.Normal(tf.zeros(x.shape[1]), 1, ) # <---- yielding this
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale)
    return predictions
```

We do the same for `coefs`:

```python
assert coefs_dist.name not in state["dists"]
state["samples"][coefs_dist.name] = coefs_dist.sample()
state["dists"][coefs_dist.name] = coefs_dist
```

```python
preds_dist = m.send(state["samples"][coefs_dist.name])
# the assignment `coefs = ...` and `yield tfd.Normal(tf.linalg...)` both happen in this single call
```

```python
print(preds_dist)
```

```
tfp.distributions.Normal("Normal", batch_shape=[3], event_shape=[], dtype=float32)
```


```python
def model(x):
    scale = yield tfd.HalfCauchy(0, 1)
    coefs = yield tfd.Normal(tf.zeros(x.shape[1]), 1, ) # <---- assigning sent value to `coefs`
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale) # <---- yielding this
    return predictions
```

We now have the predictive distribution. Here we have several options:
* sample from it, which gives a prior predictive sample
* set custom values instead of sampling, essentially conditioning on data (we might want this to compute the unnormalized posterior)
* replace it with another distribution (arbitrary magic)
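Whoever drives the generator can choose among these options, one variable at a time. The minimal plain-Python sketch below rests on stated assumptions. `Normal` is a stand-in class rather than `tfd.Normal`. `run`, `observed` and `replace` are hypothetical names.

```python
import math
import random

class Normal:
    """Tiny stand-in for a TFP distribution: only name, sample and log_prob."""

    def __init__(self, loc, scale, name):
        self.loc, self.scale, self.name = loc, scale, name

    def sample(self, rng):
        return rng.gauss(self.loc, self.scale)

    def log_prob(self, value):
        z = (value - self.loc) / self.scale
        return -0.5 * z * z - math.log(self.scale) - 0.5 * math.log(2 * math.pi)

def model():
    mu = yield Normal(0.0, 1.0, name="mu")
    obs = yield Normal(mu, 1.0, name="obs")
    return obs

def run(gen, observed=None, replace=None, seed=0):
    """Drive `gen`; for each yielded distribution pick one of the three options."""
    observed = observed or {}
    replace = replace or {}
    rng = random.Random(seed)
    values, sent = {}, None
    try:
        while True:
            dist = gen.send(sent)
            if dist.name in replace:
                dist = replace[dist.name](dist)   # option 3: swap the distribution
            if dist.name in observed:
                sent = observed[dist.name]        # option 2: condition on a value
            else:
                sent = dist.sample(rng)           # option 1: sample (prior predictive)
            values[dist.name] = sent
    except StopIteration as stop:
        return stop.value, values

_, prior = run(model())
ret, conditioned = run(model(), observed={"obs": 2.5})
_, widened = run(model(), replace={"mu": lambda d: Normal(d.loc, 10.0, name=d.name)})
```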

```python
assert preds_dist.name not in state["dists"]
state["samples"][preds_dist.name] = tf.zeros(preds_dist.batch_shape)
state["dists"][preds_dist.name] = preds_dist
```

```
AssertionError:
```

Gotcha: we found duplicate names in our toy graphical model, since both `coefs` and `predictions` use the default name `Normal`. We can easily tell our user to rewrite the model to get rid of duplicate names:

```python
m.throw(RuntimeError(
    "We found duplicate names in your cool model: {}, "
    "so far we have other variables in the model, {}".format(
        preds_dist.name, set(state["dists"].keys()),
    )
))
```

```
RuntimeError: We found duplicate names in your cool model: Normal, so far we have other variables in the model, {'HalfCauchy', 'Normal'}
```

The good thing is that we *communicate* with the user and can give meaningful exceptions.
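`throw` raises the exception inside the generator, at the `yield` where it is paused. If the model does not handle it, as in the cell above, the exception propagates back out of `throw` to the driver, and the model's own line appears in the traceback. A model could also catch the exception and react. The small sketch below uses a hypothetical `model_with_fallback` that yields plain strings in place of distributions.

```python
def model_with_fallback():
    try:
        value = yield "Normal"              # the driver may throw() at this pause
    except RuntimeError as err:
        print("model caught:", err)
        value = yield "Normal_1"            # e.g. retry under a different name
    return value

m = model_with_fallback()
print(next(m))                                            # Normal
print(m.throw(RuntimeError("duplicate name 'Normal'")))   # prints the caught message, then Normal_1
```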

The correct model should look like this:

```python
def model(x):
    scale = yield tfd.HalfCauchy(0, 1)
    coefs = yield tfd.Normal(tf.zeros(x.shape[1]), 1, )
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale, name="Normal_1") # <--- HERE we asked our user to change the name
    return predictions
```


Let's set all the names according to the new model and interact with the user again.

Our generator has stopped, because the exception we threw propagated out of it, so we can't interact with it any more. Let's create a new one and re-evaluate it with the same sampled values (a hint at how to get the desired `logp` function).

```python
def new_model(x):
    scale = yield tfd.HalfCauchy(0, 1)
    coefs = yield tfd.Normal(tf.zeros(x.shape[1]), 1, )
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale, name="Normal_1") # <--- HERE we asked our user to change the name
    return predictions
```

```python
m = new_model(state["input"])
print(m.send(None))
print(m.send(state["samples"]["HalfCauchy"]))
print(m.send(state["samples"]["Normal"]))
try:
    m.send(tf.zeros(state["input"].shape[0]))
except StopIteration as e:
    stop_iteration = e
else:
    raise RuntimeError("No exception met")
```

```
tfp.distributions.HalfCauchy("HalfCauchy", batch_shape=[], event_shape=[], dtype=float32)
tfp.distributions.Normal("Normal", batch_shape=[10], event_shape=[], dtype=float32)
tfp.distributions.Normal("Normal_1", batch_shape=[3], event_shape=[], dtype=float32)
```

```python
print(stop_iteration)
```

```
tf.Tensor([0. 0. 0.], shape=(3,), dtype=float32)
```


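Instead of returning a value from the last `send`, the generator raises `StopIteration`, because it is exhausted: it reached the `return` statement without meeting another `yield`. As checked here, the return value travels inside that exception and is available as `stop_iteration.value` (and as `stop_iteration.args[0]`). [PEP 342](https://www.python.org/dev/peps/pep-0342/) introduced `send` and `throw` for generators; returning a value from a generator was added by PEP 380.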
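Here is the same mechanism in miniature. A hypothetical `tiny_model` returns a value, and that value ends up on the exception.

```python
def tiny_model():
    yield "HalfCauchy"
    return "predictions"        # stored on the StopIteration raised at the end

m = tiny_model()
next(m)                         # runs up to the only `yield`
try:
    next(m)                     # resumes, hits `return`, raises StopIteration
except StopIteration as stop:
    result = stop.value         # equivalently stop.args[0]
print(result)                   # predictions
```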

## Automate the Process

We are all lazy humans and can't stand doing repetitive things. In our model evaluation, we followed these simple rules:
* asserting name is not used
* checking if we should sample or place a specific value instead
* recording distributions and samples

Next step is to make a function that does all this instead of us. In this tutorial let's keep it simple:

```python
def automator(gen):

    def interact(x, state=None):
        if state is None:
            state = dict(dists=dict(), samples=dict())

        control_flow = gen(x)
        return_value = None  # value sent into the generator; None on the first call
        while True:
            try:
                dist = control_flow.send(return_value)
                # rule 1: names must be unique
                if dist.name in state["dists"]:
                    control_flow.throw(RuntimeError(
                        "We found duplicate names in your cool model: {}, "
                        "so far we have other variables in the model, {}".format(
                            dist.name, set(state["dists"].keys()),
                        )
                    ))
                # rule 2: reuse a provided value, otherwise sample
                if dist.name in state["samples"]:
                    return_value = state["samples"][dist.name]
                else:
                    return_value = dist.sample()
                    state["samples"][dist.name] = return_value
                # rule 3: record the distribution
                state["dists"][dist.name] = dist
            except StopIteration as e:
                # the model's `return` value is carried by StopIteration
                return_value = e.value
                break
        return return_value, state

    return interact
```

```python
@automator
def new_model(x, state=None):
    scale = yield tfd.HalfCauchy(0, 1)
    coefs = yield tfd.Normal(tf.zeros(x.shape[1]), 1, )
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale, name="Normal_1")
    return predictions
```

```python
x = tf.random.normal((3, 10))
preds, state = new_model(x)
```

```python
state
```

```
{'dists': {'HalfCauchy': <tfp.distributions.HalfCauchy 'HalfCauchy' batch_shape=[] event_shape=[] dtype=float32>,
  'Normal': <tfp.distributions.Normal 'Normal' batch_shape=[10] event_shape=[] dtype=float32>,
  'Normal_1': <tfp.distributions.Normal 'Normal_1' batch_shape=[3] event_shape=[] dtype=float32>},
 'samples': {'HalfCauchy': <tf.Tensor: shape=(), dtype=float32, numpy=7.377093>,
  'Normal': <tf.Tensor: shape=(10,), dtype=float32, numpy=
  array([ 0.58071095,  0.979962  , -0.27915847,  1.8314253 ,  2.2730825 ,
          0.63325155, -1.0530564 , -1.5042036 ,  0.05723219, -0.25051627],
        dtype=float32)>,
  'Normal_1': <tf.Tensor: shape=(3,), dtype=float32, numpy=array([ -1.4686446, -14.179521 ,   1.4781713], dtype=float32)>}}
```

```python
preds
```

```
<tf.Tensor: shape=(3,), dtype=float32, numpy=array([ -1.4686446, -14.179521 ,   1.4781713], dtype=float32)>
```

We get everything as expected. To calculate `logp`, you just iterate over the distributions and match them with the corresponding values. But let's dive deeper.
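Below is a plain-Python sketch of that `logp` step, under stated assumptions. `Normal` is a stand-in class rather than `tfd.Normal`, and the hypothetical `interact` and `logp` mirror the `automator` above without TensorFlow. Scoring the recorded `state["dists"]` directly only works for the values sampled in that same run. To score different values, `logp` re-runs the model with them swapped in, so each downstream distribution is rebuilt from the new upstream values before it is scored.

```python
import math
import random

class Normal:
    """Stand-in for tfd.Normal: only name, sample and log_prob are used."""

    def __init__(self, loc, scale, name):
        self.loc, self.scale, self.name = loc, scale, name

    def sample(self, rng):
        return rng.gauss(self.loc, self.scale)

    def log_prob(self, value):
        z = (value - self.loc) / self.scale
        return -0.5 * z * z - math.log(self.scale) - 0.5 * math.log(2 * math.pi)

def linreg(x):
    intercept = yield Normal(0.0, 1.0, name="intercept")
    slope = yield Normal(0.0, 1.0, name="slope")
    ys = []
    for i, xi in enumerate(x):
        # each observation gets a unique name, as the duplicate-name check requires
        ys.append((yield Normal(intercept + slope * xi, 1.0, name=f"y_{i}")))
    return ys

def interact(gen, samples=None, seed=0):
    """Drive `gen`, reusing values found in `samples` and sampling the rest."""
    rng = random.Random(seed)
    state = {"dists": {}, "samples": dict(samples or {})}
    sent = None
    try:
        while True:
            dist = gen.send(sent)
            if dist.name in state["dists"]:
                gen.throw(RuntimeError(f"duplicate name: {dist.name}"))
            if dist.name not in state["samples"]:
                state["samples"][dist.name] = dist.sample(rng)
            state["dists"][dist.name] = dist
            sent = state["samples"][dist.name]
    except StopIteration:
        return state

def logp(model, x, values):
    """Re-run `model` with `values` swapped in, then sum matching log-densities."""
    state = interact(model(x), samples=values)
    return sum(state["dists"][name].log_prob(state["samples"][name])
               for name in state["dists"])

x = [0.0, 1.0]
prior = interact(linreg(x))["samples"]   # goal 1: a forward (prior) draw
print(logp(linreg, x, prior))            # goal 2: score that draw
```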

## One level deeper

Recall the motivating example from [PR#125](https://github.com/pymc-devs/pymc4/pull/125)

```python
def Horseshoe(mu=0, tau=1., s=1., name=None):
    with tf.name_scope(name):
        scale = yield tfd.HalfCauchy(0, s, name="scale")
        noise = yield tfd.Normal(0, tau, name="noise")
        return scale * noise + mu

@automator
def linreg(x, state=None):
    scale = yield tfd.HalfCauchy(0, 1, name="scale")
    coefs = yield Horseshoe(tf.zeros(x.shape[1]), name="coefs")
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale, name="predictions")
    return predictions
```

```python
x = tf.random.normal((3, 10))
preds, state = linreg(x)
```

```
AttributeError: 'generator' object has no attribute 'name'
```

Oops, we get an `AttributeError`. What we want is a nested model, but a nested model is something different from a plain distribution. Since our model is itself a generator, the return value of `Horseshoe(tf.zeros(x.shape[1]), name="coefs")` is a generator object, which of course has no `name` attribute. We could ask the user to use the `yield from` construct to delegate to the nested generator:

```python
@automator
def linreg_ugly(x, state=None):
    scale = yield tfd.HalfCauchy(0, 1, name="scale")
    coefs = yield from Horseshoe(tf.zeros(x.shape[1]), name="coefs")
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale, name="predictions")
    return predictions
```

```python
x = tf.random.normal((3, 10))
preds, state = linreg_ugly(x)
```

Okay, this time the model runs:

```python
state["dists"]
```

```
{'scale': <tfp.distributions.HalfCauchy 'scale' batch_shape=[] event_shape=[] dtype=float32>,
 'coefs_scale': <tfp.distributions.HalfCauchy 'coefs_scale' batch_shape=[] event_shape=[] dtype=float32>,
 'coefs_noise': <tfp.distributions.Normal 'coefs_noise' batch_shape=[] event_shape=[] dtype=float32>,
 'predictions': <tfp.distributions.Normal 'predictions' batch_shape=[3] event_shape=[] dtype=float32>}
```

We got nested models working, but only with `yield from`. This is UGLY and potentially confusing for the user. Fortunately, we can rewrite our `interact` function to accept nested models in a few lines, and let Python do the work for us.

```python
from types import GeneratorType

def recursive_automator(gen):

    def interact(x, state=None):
        if state is None:
            state = dict(dists=dict(), samples=dict())

        # `gen` is either a model function or an already created (nested) generator
        if not isinstance(gen, GeneratorType):
            control_flow = gen(x)
        else:
            control_flow = gen

        return_value = None

        while True:
            try:
                dist = control_flow.send(return_value)
                if isinstance(dist, GeneratorType):
                    # nested model: drive it recursively, sharing the same state
                    return_value, state = recursive_automator(dist)(x=None, state=state)
                else:
                    if dist.name in state["dists"]:
                        control_flow.throw(RuntimeError(
                            "We found duplicate names in your cool model: {}, "
                            "so far we have other variables in the model, {}".format(
                                dist.name, set(state["dists"].keys()),
                            )
                        ))
                    if dist.name in state["samples"]:
                        return_value = state["samples"][dist.name]
                    else:
                        return_value = dist.sample()
                        state["samples"][dist.name] = return_value
                    state["dists"][dist.name] = dist
            except StopIteration as e:
                # the (nested) model's `return` value is carried by StopIteration
                return_value = e.value
                break
        return return_value, state

    return interact
```

```python
def Horseshoe(mu=0, tau=1., s=1., name=None):
    with tf.name_scope(name):
        scale = yield tfd.HalfCauchy(0, s, name="scale")
        noise = yield tfd.Normal(0, tau, name="noise")
        return scale * noise + mu


@recursive_automator
def linreg(x, state=None):
    scale = yield tfd.HalfCauchy(0, 1, name="scale")
    coefs = yield Horseshoe(tf.zeros(x.shape[1]), name="coefs")
    predictions = yield tfd.Normal(tf.linalg.matvec(x, coefs), scale, name="predictions")
    return predictions
```

```python
x = tf.random.normal((3, 10))
preds, state = linreg(x)

# state = dict(dists=dict(), samples=dict())
# preds, state = linreg(x, state)
```

```python
state["dists"]
```

```
{'scale': <tfp.distributions.HalfCauchy 'scale' batch_shape=[] event_shape=[] dtype=float32>,
 'coefs_scale': <tfp.distributions.HalfCauchy 'coefs_scale' batch_shape=[] event_shape=[] dtype=float32>,
 'coefs_noise': <tfp.distributions.Normal 'coefs_noise' batch_shape=[] event_shape=[] dtype=float32>,
 'predictions': <tfp.distributions.Normal 'predictions' batch_shape=[3] event_shape=[] dtype=float32>}
```

```python
state["samples"]
```

```
{'scale': <tf.Tensor: shape=(), dtype=float32, numpy=0.71308523>,
 'coefs_scale': <tf.Tensor: shape=(), dtype=float32, numpy=3.2109985>,
 'coefs_noise': <tf.Tensor: shape=(), dtype=float32, numpy=-1.0621829>,
 'predictions': <tf.Tensor: shape=(3,), dtype=float32, numpy=array([ 3.1937761, -9.121798 ,  5.871497 ], dtype=float32)>}
```

## Conclusion

We've discussed the central idea behind PyMC4's core engine using coroutines and decorators. 

Here are some references if you want to learn more about these syntactic elements:
* https://stackoverflow.com/a/19302700
* https://stackoverflow.com/a/26109157
* https://realpython.com/primer-on-python-decorators/