In [1]:
import bambi as bmb
import numpy as np
import pandas as pd
import pymc as pm

In [2]:
# Simulate a noisy linear relationship, y = 5 + 8 * x + noise, from a seeded
# generator so the same data set is produced on every run.
n_obs = 100
rng = np.random.default_rng(seed=1234)
x = rng.normal(size=n_obs)
# The noise is drawn after `x` from the same generator; the order of the two
# draws determines the simulated values.
noise = rng.normal(scale=0.1, size=n_obs)
y = 5 + 8 * x + noise

data = pd.DataFrame({"x": x, "y": y})

In [3]:
# Regress y on x. No family or priors are passed here; the summary displayed
# below reports a gaussian family with an identity link and the priors in use.
model = bmb.Model("y ~ x", data)
# Bare expression: displays the model summary as the cell output.
model

       Formula: y ~ x
        Family: gaussian
          Link: mu = identity
  Observations: 100
        Priors: 
    target = mu
        Common-level effects
            Intercept ~ Normal(mu: 6.0256, sigma: 22.5695)
            x ~ Normal(mu: 0.0, sigma: 20.0145)
        
        Auxiliary parameters
            y_sigma ~ HalfStudentT(nu: 4.0, sigma: 8.9701)

In [4]:
# Sample the posterior (the log below shows NUTS being used). The seed is fixed
# so that the draws can be reproduced.
idata = model.fit(random_seed=1234)

Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (2 chains in 2 jobs)
NUTS: [y_sigma, Intercept, x]


Sampling 2 chains for 1_000 tune and 1_000 draw iterations (2_000 + 2_000 draws total) took 6 seconds.


In [5]:
# NOTE(review): presumably this adds the posterior of the response mean to
# `idata` in place -- the return value is not captured, and `pps` below reads
# "<response>_mean" from `idata.posterior`. Confirm against the installed
# Bambi version.
model.predict(idata)

In [6]:
from bambi.utils import get_aliased_name


def get_response_dist(family):
    """Return the distribution object used for the response of ``family``.

    Parameters
    ----------
    family
        Object exposing a ``likelihood`` attribute which in turn has ``dist``
        and ``name`` attributes.

    Returns
    -------
    ``family.likelihood.dist`` when it is truthy; otherwise the attribute of
    ``pm`` whose name is ``family.likelihood.name``.

    Raises
    ------
    AttributeError
        If ``family.likelihood.dist`` is falsy and ``pm`` has no attribute
        named ``family.likelihood.name``.
    """
    likelihood = family.likelihood
    # A distribution attached to the likelihood takes precedence over the
    # lookup by name in the ``pm`` namespace.
    if likelihood.dist:
        return likelihood.dist
    return getattr(pm, likelihood.name)


def expand_array(x, ndim):
    """Pad ``x`` with trailing length-one axes until it has ``ndim`` dimensions.

    Parameters
    ----------
    x
        Array exposing ``ndim`` (e.g. a ``np.ndarray``).
    ndim : int
        Target number of dimensions.

    Returns
    -------
    ``x`` itself when it already has ``ndim`` dimensions; otherwise the result
    of ``np.expand_dims`` with one new axis for every missing trailing
    dimension.
    """
    current_ndim = x.ndim
    if current_ndim != ndim:
        # The new axes occupy positions current_ndim, ..., ndim - 1 of the
        # result, i.e. they are appended after the existing ones.
        trailing_axes = tuple(range(current_ndim, ndim))
        x = np.expand_dims(x, trailing_axes)
    return x


def pps(model, idata, random_seed=None):
    """Draw values of the response from its distribution, using posterior values.

    Every parameter listed in ``model.family.likelihood.params`` is looked up:

    * the parent parameter is read from ``idata.posterior["<response>_mean"]``;
    * any other parameter is read from
      ``idata.posterior["<response>_<alias or name>"]`` when that variable
      exists, or taken from the component's ``prior`` when that prior is a
      plain ``int``/``float``. Parameters matching neither case are left out.

    The collected arrays are padded with trailing length-one axes up to the
    largest dimensionality among them, and passed to the response
    distribution's ``dist`` constructor.

    Parameters
    ----------
    model
        Fitted model exposing ``family``, ``components`` and
        ``response_component``.
    idata
        Object with a ``posterior`` group holding the variables above.
        NOTE(review): the ``"<response>_mean"`` variable presumably comes from
        a prior call to ``model.predict(idata)`` -- confirm.
    random_seed : optional
        Forwarded to ``pm.draw``. The default ``None`` keeps the original
        unseeded behaviour.

    Returns
    -------
    The result of a single ``pm.draw`` of the response distribution.

    Raises
    ------
    KeyError
        If the posterior lacks the ``"<response>_mean"`` variable.
    ValueError
        If no likelihood parameter could be resolved.
    """
    family = model.family
    likelihood = family.likelihood
    response_dist = get_response_dist(family)
    response_aliased_name = get_aliased_name(model.response_component.response_term)

    kwargs = {}
    for param in likelihood.params:
        if param == likelihood.parent:
            var_name = f"{response_aliased_name}_mean"
            if var_name not in idata.posterior:
                raise KeyError(
                    f"'{var_name}' not found in idata.posterior; it is expected to "
                    "have been added beforehand (e.g. by `model.predict(idata)`)."
                )
            kwargs[param] = idata.posterior[var_name].to_numpy()
            continue

        component = model.components[param]
        component_aliased_name = component.alias if component.alias else param
        var_name = f"{response_aliased_name}_{component_aliased_name}"
        if var_name in idata.posterior:
            kwargs[param] = idata.posterior[var_name].to_numpy()
        elif hasattr(component, "prior") and isinstance(component.prior, (int, float)):
            # Parameter fixed to a constant: use that value as a 0-d array.
            kwargs[param] = np.asarray(component.prior)

    if not kwargs:
        raise ValueError(
            "None of the likelihood parameters could be resolved from the posterior "
            "or from constant priors."
        )

    # Determine the largest number of dimensions among the parameter arrays
    ndim_max = max(values.ndim for values in kwargs.values())

    # Append trailing dimensions when needed so all parameters broadcast together
    kwargs = {key: expand_array(values, ndim_max) for key, values in kwargs.items()}

    return pm.draw(response_dist.dist(**kwargs), random_seed=random_seed)

In [7]:
# Draw from the response distribution using the posterior stored in `idata`.
# The bare expression displays the repr of the whole returned array.
pps(model, idata)

array([[[-7.92931485,  5.3599891 , 10.96102903, ..., 12.80414419,
         15.97604574,  1.24241458],
        [-7.83942675,  5.39092801, 11.04369738, ..., 12.78528137,
         15.9653248 ,  1.06164032],
        [-7.586334  ,  5.51735475, 11.01387176, ..., 12.53074504,
         15.91067418,  0.99589946],
        ...,
        [-7.97856717,  5.38943216, 11.21162206, ..., 12.95332533,
         16.07597332,  1.07408496],
        [-7.91150013,  5.51382436, 10.74113429, ..., 12.83737177,
         15.87279383,  1.16936144],
        [-7.73144688,  5.49430956, 10.93111785, ..., 12.91980595,
         16.10894713,  0.98222478]],

       [[-7.89964836,  5.44805941, 10.91187453, ..., 12.73782085,
         16.10408788,  1.36835786],
        [-7.8016104 ,  5.52677545, 10.89231546, ..., 12.80098095,
         16.11457394,  1.04708347],
        [-8.03396024,  5.45379518, 10.9936181 , ..., 12.60423728,
         15.99418771,  1.10616061],
        ...,
        [-7.75763361,  5.41162864, 10.8395884 , ..., 1