In [1]:
import numpy as np
from gradvi.tests import toy_priors, toy_data
#from gradvi.tests.derivative import numerical_derivative
from gradvi.normal_means import NormalMeansFromPosterior as NMFromPost
from gradvi.normal_means import NormalMeans as NormalMeans

In [2]:
import numbers
from gradvi.priors import Prior

def numerical_derivative(cls, func, x, cpre_args=(), cpost_args=(),
                         fpre_args = (), fpost_args = (), 
                         is_class_input = False, is_func_property = False,
                         eps = 1e-8, ckwargs = {}, fkwargs = {}, 
                         is_scale = False, is_sum = False):
    callargs = { 
        'cpre': cpre_args,
        'cpost': cpost_args,
        'fpre': fpre_args,
        'fpost': fpost_args,
        'is_class_input': is_class_input,
        'is_func_property': is_func_property,
        'ckwargs': ckwargs,
        'fkwargs': fkwargs,
        'return_sum': is_sum,
        }
    if is_scale:
        dfdx = symmetric_diff(cls, func, x + eps, x - eps,
                    call_func_nmscale, eps = 1e-8, callargs = callargs)
    elif isinstance(x, numbers.Number):
        dfdx = symmetric_diff(cls, func, x + eps, x - eps,
                    call_func, eps = eps, callargs = callargs)
    elif isinstance(x, np.ndarray) or isinstance(x, list):
        dfdx = np.zeros(x.shape[0])
        for i in range(x.shape[0]):
            x1      = x.copy()
            x2      = x.copy()
            x1[i]  += eps 
            x2[i]  -= eps 
            dfdx[i] = symmetric_diff(cls, func, x1, x2, 
                    call_func, eps = eps, callargs = callargs)
    elif isinstance(x, Prior):
        dfdx = np.zeros(x.k)
        for i in range(x.k):
            pr1 = x.copy()
            pr2 = x.copy()
            w1  = pr1.w.copy()
            w2  = pr2.w.copy()
            w1[i] += eps 
            w2[i] -= eps 
            pr1.update_w(w1)
            pr2.update_w(w2)
            diff = symmetric_diff(cls, func, pr1, pr2, call_func, eps = eps, callargs = callargs)
            dfdx[i] = np.sum(diff)
    return dfdx

def symmetric_diff(cls, func, x1, x2, func_wrapper, eps = 1e-8, callargs = {}):
    f1 = func_wrapper(cls, func, x1, **callargs)
    f2 = func_wrapper(cls, func, x2, **callargs)
    dfdx   = (f1 - f2) / (2. * eps)
    return dfdx


def call_func(cls, func, x, cpre = (), cpost = (), fpre = (), fpost = (), 
              is_class_input = False, is_func_property = False,
              ckwargs = {}, fkwargs = {}, return_sum = False):
    # Call a function
    if cls is None:
        f = func(*fpre, x, *fpost, **fkwargs)

    # Call a class method
    else:
        # create new class
        if is_class_input:
            cl = cls(*cpre, x, *cpost, **ckwargs)
        else:
            cl = cls(*cpre, **ckwargs)

        # call the method
        if is_func_property:
            f = getattr(cl, func)
        elif is_class_input:
            f  = getattr(cl, func)(*fpre, **fkwargs)
        else:
            f  = getattr(cl, func)(*fpre, x, *fpost, **fkwargs)

    # Do we have to sum the output?
    if return_sum:
        return np.sum(f)
    return f

def call_func_nmscale(cls, func, scale, **callargs):
    args = callargs.copy()
    args['ckwargs']['scale'] = scale
    sj2 = scale / callargs['ckwargs']['d']
    f   = call_func(cls, func, sj2, **args)
    return f

In [3]:
priors = toy_priors.get_all(k = 10, skbase = 10., sparsity = 0.3)
for prior in priors:
    z, sj2, s2, dj = toy_data.get_normal_means(prior)

    ckwargs = {'scale': s2, 'd': dj, 'method': 'fssi-cubic', 'ngrid': 500}
    fkwargs = {'jac': False}

    nm = NMFromPost(z, prior, sj2, **ckwargs)
    x, x_bd, x_wd, x_s2d = nm.penalty_operator(jac = True)

    # Check derivatives
    print (f"{prior.prior_type} prior, derivative wrt b")
    dfdb_numeric = numerical_derivative(NMFromPost, 'penalty_operator', z, cpost_args = (prior, sj2),
                                        ckwargs = ckwargs, is_class_input = True, fkwargs = fkwargs,
                                        is_sum = True)
    np.testing.assert_allclose(x_bd, dfdb_numeric, atol = 1e-4, rtol = 1e-8)
    
    print (f"{prior.prior_type} prior, derivative wrt sj2")
    dfds2_numeric = numerical_derivative(NMFromPost, 'penalty_operator', s2, cpre_args = (z, prior),
                                     ckwargs = ckwargs, is_class_input = True, fkwargs = fkwargs,
                                     is_scale = True)
    np.testing.assert_allclose(x_s2d / dj, dfds2_numeric, atol = 1e-4, rtol = 1e-8)
    
    print (f"{prior.prior_type} prior, derivative wrt w")
    dfdw_numeric = numerical_derivative(NMFromPost, 'penalty_operator', prior, cpre_args = (z,),
                                        cpost_args = (sj2,), ckwargs = ckwargs, 
                                        is_class_input = True, fkwargs = fkwargs)
    np.testing.assert_allclose(np.sum(x_wd, axis = 0), dfdw_numeric, atol = 1e-4, rtol = 1e-8)

ash prior, derivative wrt b
ash prior, derivative wrt sj2
ash prior, derivative wrt w
ash_scaled prior, derivative wrt b
ash_scaled prior, derivative wrt sj2
ash_scaled prior, derivative wrt w
point_normal prior, derivative wrt b
point_normal prior, derivative wrt sj2
point_normal prior, derivative wrt w


In [4]:
def get_nm_data():
    np.random.seed(100)
    n  = 100 
    y  = np.random.normal(0, 1, size = n)
    scale = 1.2**2
    #dj = np.square(np.random.normal(0, 1, size = n) * n)
    #dj = np.ones(n) * n
    dj = np.square(np.random.normal(1, 0.5, size = n)) * n 
    sj2 = scale / dj
    return y, sj2, scale, dj

z, sj2, s2, dj = get_nm_data()
ckwargs = {'scale': s2, 'd': dj}

for prior in priors:
    for func in ['shrinkage_operator', 'penalty_operator']:
        nm = NormalMeans(z, prior, sj2, **ckwargs)
        x, x_bd, x_wd, x_s2d = getattr(nm, func)(jac = True)
        # Check derivatives
        print (f"{prior.prior_type} prior, {func} derivative wrt b")
        dfdb_numeric = numerical_derivative(NormalMeans, func, z, cpost_args = (prior, sj2),
                                            ckwargs = ckwargs, is_class_input = True, fkwargs = fkwargs,
                                            is_sum = True)
        np.testing.assert_allclose(x_bd, dfdb_numeric, atol = 1e-4, rtol = 1e-8)

        print (f"{prior.prior_type} prior, {func} derivative wrt s2")
        dfds2_numeric = numerical_derivative(NormalMeans, func, s2, cpre_args = (z, prior),
                                         ckwargs = ckwargs, is_class_input = True, fkwargs = fkwargs,
                                         is_scale = True)
        np.testing.assert_allclose(x_s2d / dj, dfds2_numeric, atol = 1e-4, rtol = 1e-8)

        print (f"{prior.prior_type} prior, {func} derivative wrt w")
        dfdw_numeric = numerical_derivative(NormalMeans, func, prior, cpre_args = (z,),
                                            cpost_args = (sj2,), ckwargs = ckwargs, 
                                            is_class_input = True, fkwargs = fkwargs)
        np.testing.assert_allclose(np.sum(x_wd, axis = 0), dfdw_numeric, atol = 1e-4, rtol = 1e-8)

ash prior, shrinkage_operator derivative wrt b
ash prior, shrinkage_operator derivative wrt s2
ash prior, shrinkage_operator derivative wrt w
ash prior, penalty_operator derivative wrt b
ash prior, penalty_operator derivative wrt s2
ash prior, penalty_operator derivative wrt w
ash_scaled prior, shrinkage_operator derivative wrt b
ash_scaled prior, shrinkage_operator derivative wrt s2
ash_scaled prior, shrinkage_operator derivative wrt w
ash_scaled prior, penalty_operator derivative wrt b
ash_scaled prior, penalty_operator derivative wrt s2
ash_scaled prior, penalty_operator derivative wrt w
point_normal prior, shrinkage_operator derivative wrt b
point_normal prior, shrinkage_operator derivative wrt s2
point_normal prior, shrinkage_operator derivative wrt w
point_normal prior, penalty_operator derivative wrt b
point_normal prior, penalty_operator derivative wrt s2
point_normal prior, penalty_operator derivative wrt w


In [5]:
from gradvi.models import LinearModel


X, y, b, s2 = toy_data.get_linear_model(n = 20, p = 50)
for prior in priors:
    ckwargs = {'objtype': 'reparametrize'}
    lm = LinearModel(X, y, b, s2, prior, **ckwargs)
    
    print (f"{prior.prior_type} prior, derivative wrt b")
    dfdb = numerical_derivative(LinearModel, 'objective', b, cpre_args = (X, y), cpost_args = (s2, prior),
                ckwargs = ckwargs, is_class_input = True, is_func_property = True)
    np.testing.assert_allclose(lm.bgrad, dfdb, atol = 1e-4, rtol = 1e-8)
    
    print (f"{prior.prior_type} prior, derivative wrt s2")
    dfds2 = numerical_derivative(LinearModel, 'objective', s2, cpre_args = (X, y, b), cpost_args = (prior,),
                ckwargs = ckwargs, is_class_input = True, is_func_property = True)
    np.testing.assert_allclose(lm.s2grad, dfds2, atol = 1e-4, rtol = 1e-8)
    
    print (f"{prior.prior_type} prior, derivative wrt s2")
    dfdw = numerical_derivative(LinearModel, 'objective', prior, cpre_args = (X, y, b, s2),
                ckwargs = ckwargs, is_class_input = True, is_func_property = True)
    np.testing.assert_allclose(lm.wgrad, dfdw, atol = 1e-4, rtol = 1e-8)

ash prior, derivative wrt b
ash prior, derivative wrt s2
ash prior, derivative wrt s2
ash_scaled prior, derivative wrt b
ash_scaled prior, derivative wrt s2
ash_scaled prior, derivative wrt s2
point_normal prior, derivative wrt b
point_normal prior, derivative wrt s2
point_normal prior, derivative wrt s2
