In [1]:
import numpy as np
from scipy import stats
import unittest

## Probability functions and checks
This notebook is meant to be used as a submodule containing wrappers for all the basic probability functions used by the ordinal probit model for survey data and by the Metropolis-Hastings sampler. It also includes an optional testing suite.

In [2]:
def initialize_suite(TestCase):
    """Collect every test method of a TestCase class into a suite.

    Inputs
    -------------------
    TestCase: unittest.TestCase subclass whose tests should be loaded.

    Outputs
    -------------------
    suite: unittest.TestSuite containing the loaded tests."""
    return unittest.TestLoader().loadTestsFromTestCase(TestCase)

In [2]:
def mu_prior(mu, mu_0, sigma_0, printing=False, debug=False):
    """Wrapper for prior on mu. Distribution and hyperparameters taken from original paper.

    The prior is Normal(mu_0, sigma_0) and the returned value is its
    *density* at mu, not a probability: it can exceed 1 when sigma_0 is
    small and can underflow to 0 far from mu_0.

    Inputs
    -------------------
    mu: latent mean; scalar value in Reals;
    mu_0: prior mean; scalar value in Reals;
    sigma_0: prior standard devation; scalar value in (0, inf);
    printing: Bool; whether to print messages (currently unused here);
    debug: Bool; whether to run internal error checks.

    Outputs
    -------------------
    p: prior density of the input value of mu."""

    p = stats.norm.pdf(mu, mu_0, sigma_0)

    if debug:
        # A density is only bounded below, so do not require p < 1.
        assert p >= 0, "p not in right range: {}".format(p)
        assert isinstance(p, float), "type of p is not scalar: {}".format(type(p))
    return p

def mus_log_prior(mus, mu_0, sigma_0, printing=False, debug=False):
    """Returns log prior over all mus.
    Procedure, distributions and hyperparameters taken from original paper.

    Every entry of mus gets an independent Normal(mu_0, sigma_0) prior and the
    log densities are summed. The log density is evaluated directly (rather
    than as log(pdf)) so that values far from mu_0 give a large negative
    number instead of underflowing to -inf.

    Inputs
    -------------------
    mus: vector of scalar value in Reals corresponding to mus;
    mu_0: prior mean; scalar value in Reals;
    sigma_0: prior standard devation for all mus; scalar value in (0, inf);
    printing: Bool; whether to print messages (currently unused here);
    debug: Bool; whether to run internal error checks.

    Outputs
    -------------------
    LP: log prior density of the input value of mus (0.0 for empty input)."""

    log_densities = stats.norm.logpdf(np.asarray(mus, dtype=float), mu_0, sigma_0)
    LP = float(np.sum(log_densities))

    if debug:
        assert isinstance(LP, float), "type of LP is not scalar: {}".format(type(LP))
        assert not np.isnan(LP), "LP is nan for mus: {}".format(mus)

    return LP

def mu_proposal(mu, sigma_prop, printing=False, debug=False):
    """Function for Metropolis sampler to generate proposal for individual mu.
    Simple random walk: a single draw from Normal(mu, sigma_prop) using
    numpy's global random state.

    Inputs
    -------------------
    mu: latent mean; scalar value in Reals;
    sigma_prop: standard deviation of jumps;
    printing: Bool; whether to print messages (currently unused here);
    debug: Bool; whether to run internal error checks.

    Outputs
    -------------------
    mu_star: new proposed value for mu."""

    mu_star = np.random.normal(mu, sigma_prop)

    if debug:
        # The message names the variable actually being checked.
        assert isinstance(mu_star, float), "type of mu_star is not scalar: {}".format(type(mu_star))

    return mu_star

def mu_log_jump_probs(mu, mu_star, sigma_prop, printing=False, debug=False):
    """Log ratio of proposal (jump) densities for mu.

    The value returned is always 0, regardless of the arguments; the original
    note says this applies while a symmetric (i.e., non-truncated) proposal
    is used.

    Outputs
    -------------------
    0: constant log jump-probability ratio."""
    log_ratio = 0
    return log_ratio

def sigma_prior(sigma, lower_0, upper_0, printing=False, debug=False):
    """Wrapper for prior on sigma. Distribution and hyperparameters taken from original paper.

    The prior is Uniform(lower_0, upper_0). The returned value is a *density*:
    1/(upper_0 - lower_0) inside the interval (which exceeds 1 when the
    interval is narrower than 1) and 0.0 outside it.

    Inputs
    -------------------
    sigma: scalar value in (0, inf);
    lower_0: lower end of uniform;
    upper_0: upper end of uniform;
    printing: Bool; whether to print messages (currently unused here);
    debug: Bool; whether to run internal error checks.

    Outputs
    -------------------
    p: prior density of the input value of sigma."""
    inside_support = lower_0 <= sigma <= upper_0
    p = 1 / (upper_0 - lower_0) if inside_support else 0.0

    if debug:
        # A density is only bounded below, so do not require p <= 1.
        assert p >= 0, "p not in right range: {}".format(p)
        assert isinstance(p, float), "type of p is not scalar: {}".format(type(p))
    return p

def sigmas_log_prior(sigmas, lower_0, upper_0, printing=False, debug=False):
    """Returns log prior over all sigmas.
    Procedure, distributions and hyperparameters taken from original paper.

    Every entry of sigmas gets an independent Uniform(lower_0, upper_0) prior.
    If any entry lies outside [lower_0, upper_0] the result is -inf; this is
    returned directly instead of taking log(0), which would emit a numpy
    divide-by-zero warning.

    Inputs
    -------------------
    sigmas: vector of scalar values in (0, inf);
    lower_0: lower end of uniform;
    upper_0: upper end of uniform;
    printing: Bool; whether to print messages (currently unused here);
    debug: Bool; whether to run internal error checks.

    Outputs
    -------------------
    LP: log prior density of the input value of sigmas (0.0 for empty input)."""

    values = np.asarray(sigmas, dtype=float)
    in_support = (values >= lower_0) & (values <= upper_0)

    if values.size == 0:
        LP = 0.0
    elif in_support.all():
        # Same per-element density as the uniform prior, summed in log space.
        LP = float(values.size * np.log(1 / (upper_0 - lower_0)))
    else:
        LP = float("-inf")

    if debug:
        assert isinstance(LP, float), "type of LP is not scalar: {}".format(type(LP))

    return LP

def sigma_proposal(sigma, sigma_prop, lower_0, upper_0, printing=False, debug=False):
    """Function for Metropolis sampler to generate proposal for individual sigma.
    Random walk drawn from a normal centred on sigma with scale sigma_prop,
    truncated to [lower_0, upper_0].

    scipy's truncnorm expects its clip points in standard-normal units
    relative to loc and scale, so the bounds are converted before sampling.

    Inputs
    -------------------
    sigma: latent standard deviation; scalar value in (0, inf);
    sigma_prop: scale of jumps;
    lower_0: lower end of uniform;
    upper_0: upper end of uniform;
    printing: Bool; whether to print messages (currently unused here);
    debug: Bool; whether to run internal error checks.

    Outputs
    -------------------
    sigma_star: new proposed value for sigma, inside [lower_0, upper_0]."""

    a = (lower_0 - sigma) / sigma_prop
    b = (upper_0 - sigma) / sigma_prop
    sigma_star = stats.truncnorm.rvs(a, b, loc=sigma, scale=sigma_prop)
    if debug:
        assert lower_0 <= sigma_star <= upper_0, \
            "sigma_star outside [{}, {}]: {}".format(lower_0, upper_0, sigma_star)
    return sigma_star

def sigma_proposal_prob(sigma_new, sigma_old, sigma_prop, lower_0, upper_0, printing=False, debug=False):
    """Function for Metropolis-Hastings sampler to evaluate proposal for individual sigma.
    Density of proposing sigma_new from a normal centred on sigma_old with
    scale sigma_prop, truncated to [lower_0, upper_0]. Note, have to convert
    lower and upper to standard normal form; scipy then reconverts based on
    scale and location parameters.

    The returned value is a density, so it can exceed 1 when sigma_prop is small.

    Inputs
    -------------------
    sigma_new: latent standard deviation; scalar value in (lower_0, upper_0);
    sigma_old: latent standard deviation; scalar value in (lower_0, upper_0);
    sigma_prop: scale of jumps;
    lower_0: lower end of range;
    upper_0: upper end of range;
    printing: Bool; whether to print messages (currently unused here);
    debug: Bool; whether to run internal error checks.

    Outputs
    -------------------
    p: proposal density for value of sigma_new."""
    a = (lower_0 - sigma_old) / sigma_prop
    b = (upper_0 - sigma_old) / sigma_prop
    p = stats.truncnorm.pdf(sigma_new, a, b, loc=sigma_old, scale=sigma_prop)
    if debug:
        # A density is only bounded below, so do not require p <= 1.
        assert p >= 0, "p not in right range: {}".format(p)
        assert isinstance(p, float), "type of p is not scalar: {}".format(type(p))
    return p


def sigma_log_jump_probs(sigma, sigma_star, sigma_prop, lower_0, upper_0, printing=False, debug=False):
    """If using truncated normal, no longer symmetric. This is now Metropolis-Hastings algorithm,
    and we have to add log of ratio of proposal probabilities to log of acceptance probabilities.

    Computes log q(sigma | sigma_star) - log q(sigma_star | sigma), where
    q(x | c) is the density of a normal centred on c with scale sigma_prop,
    truncated to [lower_0, upper_0]. Log densities are evaluated directly
    (instead of log(pdf)) so that a small sigma_prop does not underflow both
    terms to -inf and yield nan.

    Inputs
    -------------------
    sigma: latent standard deviation; scalar value in (lower, upper);
    sigma_star: proposed value for latent standard deviation; scalar value in (lower, upper);
    sigma_prop: scale of jump;
    lower_0: lower end of uniform;
    upper_0: upper end of uniform;
    printing: Bool; whether to print messages (currently unused here);
    debug: Bool; whether to run internal error checks.

    Outputs
    -------------------
    LP: log probability of ratio of jump probabilities."""

    def log_q(target, centre):
        # truncnorm takes its clip points in standard-normal units.
        a = (lower_0 - centre) / sigma_prop
        b = (upper_0 - centre) / sigma_prop
        return stats.truncnorm.logpdf(target, a, b, loc=centre, scale=sigma_prop)

    reverse_move = log_q(sigma, sigma_star)  # jump back from the proposal
    forward_move = log_q(sigma_star, sigma)  # jump that was actually proposed
    LP = reverse_move - forward_move

    if debug:
        assert isinstance(LP, float), "type of LP is not scalar: {}".format(type(LP))

    return LP

def theta_prior(theta, mu_0, sigma_0, printing=False, debug=False):
    """Wrapper for prior on theta. Distribution and hyperparameters taken from original paper.

    The prior is Normal(mu_0, sigma_0) and the returned value is its
    *density* at theta, not a probability: it can exceed 1 when sigma_0 is
    small and can underflow to 0 far from mu_0.

    Inputs
    -------------------
    theta: scalar value in Reals;
    mu_0: prior mean; scalar value in Reals;
    sigma_0: prior standard devation; scalar value in (0, inf);
    printing: Bool; whether to print messages;
    debug: Bool; whether to run internal error checks.

    Outputs
    -------------------
    p: prior density of the input value of theta."""

    p = stats.norm.pdf(theta, mu_0, sigma_0)

    if debug:
        # A density is only bounded below, so do not require p < 1.
        assert p >= 0, "p not in right range: {}".format(p)
        assert isinstance(p, float), "type of p is not scalar: {}".format(type(p))
    if printing:
        print("in theta prior")
        # NOTE(review): print_fn is not defined in this file; presumably it is
        # supplied by the notebook that imports this module -- confirm.
        # Entries are passed as label/value pairs.
        print_fn(["theta", theta, "mu_0", mu_0, "sigma_0", sigma_0])
    return p

def thetas_log_prior(thetas, shift, sigma_0, printing=False, debug=False):
    """Returns log prior over all thetas.
    Procedure, distributions and hyperparameters taken from original paper.

    The first and last entries of thetas are skipped (they are the -inf/inf
    endpoints). The j-th remaining threshold (j = 1, 2, ...) gets a
    Normal(j + shift, sigma_0) prior and the log densities are summed. The
    log density is evaluated directly (rather than as log(pdf)) so thresholds
    far from their prior mean do not underflow to -inf.

    Inputs
    -------------------
    thetas: vector of scalar value in Reals corresponding to thetas;
    shift: value to shift prior means by; scalar value in Reals;
    sigma_0: prior standard devation for all thetas; scalar value in (0, inf);
    printing: Bool; whether to print messages;
    debug: Bool; whether to run internal error checks.

    Outputs
    -------------------
    LP: log prior density of the input value of thetas."""

    theta_values = np.asarray(thetas, dtype=float)
    interior = theta_values[1:-1]  # drop the two endpoints
    prior_means = np.arange(1, interior.size + 1) + shift
    log_priors = stats.norm.logpdf(interior, prior_means, sigma_0)
    LP = float(np.sum(log_priors))

    if debug:
        assert isinstance(LP, float), "type of LP is not scalar: {}".format(type(LP))
        # Thresholds should be non-decreasing; report (but do not fail on) violations.
        test = np.diff(theta_values) < 0
        if test.any():
            print("some thetas out of order: {}; test: {}".format(thetas, test))
    if printing:
        # NOTE(review): print_fn is not defined in this file; presumably it is
        # supplied by the notebook that imports this module -- confirm.
        print_fn(["sigma_0", sigma_0])
        print_fn(["ks plus shift", list(prior_means)])
        print_fn(["thetas", thetas])
        print_fn(["thetaPriors", list(np.exp(log_priors))])
        print_fn(["theta log prior internal", LP])
    return LP

def theta_proposal(theta, sigma_prop, lower_0, upper_0, printing=False, debug=False):
    """Function for Metropolis sampler to generate proposal for individual theta.
    Random walk drawn from a normal centred on theta with scale sigma_prop,
    truncated to [lower_0, upper_0].

    scipy's truncnorm expects its clip points in standard-normal units
    relative to loc and scale, so the bounds are converted before sampling.

    Inputs
    -------------------
    theta: latent threshold; scalar value in (lower_0, upper_0); typically (0, k)
    sigma_prop: scale of jumps;
    lower_0: lower end of range;
    upper_0: upper end of range;
    printing: Bool; whether to print messages (currently unused here);
    debug: Bool; whether to run internal error checks.

    Outputs
    -------------------
    theta_star: new proposed value for theta, inside [lower_0, upper_0]."""

    a = (lower_0 - theta) / sigma_prop
    b = (upper_0 - theta) / sigma_prop
    theta_star = stats.truncnorm.rvs(a, b, loc=theta, scale=sigma_prop)
    if debug:
        assert lower_0 <= theta_star <= upper_0, \
            "theta_star outside [{}, {}]: {}".format(lower_0, upper_0, theta_star)
    return theta_star

def theta_proposal_prob(theta_new, theta_old, sigma_prop, lower_0, upper_0, printing=False, debug=False):
    """Function for Metropolis-Hastings sampler to evaluate proposal for individual theta.
    Density of proposing theta_new from a normal centred on theta_old with
    scale sigma_prop, truncated to [lower_0, upper_0]. Note, have to convert
    lower and upper to standard normal form; scipy then reconverts based on
    scale and location parameters.

    The returned value is a density, so it can exceed 1 when sigma_prop is small.

    Inputs
    -------------------
    theta_new: latent threshold; scalar value in (lower_0, upper_0); typically (0, k)
    theta_old: latent threshold; scalar value in (lower_0, upper_0); typically (0, k)
    sigma_prop: scale of jumps;
    lower_0: lower end of uniform;
    upper_0: upper end of uniform;
    printing: Bool; whether to print messages (currently unused here);
    debug: Bool; whether to run internal error checks.

    Outputs
    -------------------
    p: proposal density for value of theta_new."""
    a = (lower_0 - theta_old) / sigma_prop
    b = (upper_0 - theta_old) / sigma_prop
    p = stats.truncnorm.pdf(theta_new, a, b, loc=theta_old, scale=sigma_prop)
    if debug:
        # A density is only bounded below, so do not require p <= 1.
        assert p >= 0, "p not in right range: {}".format(p)
        assert isinstance(p, float), "type of p is not scalar: {}".format(type(p))
    return p
    

def theta_log_jump_probs(theta, theta_star, sigma_prop, lower_0, upper_0, printing=False, debug=False):
    """If using truncated normal, no longer symmetric. This is now Metropolis-Hastings algorithm,
    and we have to add log of ratio of proposal probabilities to log of acceptance probabilities.

    Computes log q(theta | theta_star) - log q(theta_star | theta), where
    q(x | c) is the density of a normal centred on c with scale sigma_prop,
    truncated to [lower_0, upper_0]. Log densities are evaluated directly
    (instead of log(pdf)) so that a small sigma_prop does not underflow both
    terms to -inf and yield nan.

    Inputs
    -------------------
    theta: latent threshold; scalar value in (lower_0, upper_0); typically (0, k)
    theta_star: latent threshold proposal; scalar value in (lower_0, upper_0); typically (0, k)
    sigma_prop: scale of jumps;
    lower_0: lower end of uniform;
    upper_0: upper end of uniform;
    printing: Bool; whether to print messages (currently unused here);
    debug: Bool; whether to run internal error checks.

    Outputs
    -------------------
    LP: log probability of ratio of jump probabilities."""

    def log_q(target, centre):
        # truncnorm takes its clip points in standard-normal units.
        a = (lower_0 - centre) / sigma_prop
        b = (upper_0 - centre) / sigma_prop
        return stats.truncnorm.logpdf(target, a, b, loc=centre, scale=sigma_prop)

    first_term = log_q(theta, theta_star)   # jump back from the proposal
    second_term = log_q(theta_star, theta)  # jump that was actually proposed
    LP = first_term - second_term

    if debug:
        assert isinstance(LP, float), "type of LP is not scalar: {}".format(type(LP))

    return LP

class ProbabilityFunctionsTestSuite(unittest.TestCase):
    """Unit tests for the prior and proposal wrappers defined in this file.

    Expected values are computed from explicit formulas (gauss, gauss_trunc)
    rather than from scipy, so the wrappers are checked independently."""

    def gauss(self, val, mu=0, s=1):
        """Normal density with mean mu and standard deviation s, evaluated at val."""
        return np.exp(- (1/2)*
                      ((val-mu)/s)**2) / np.sqrt(2*np.pi*(s**2))

    def gauss_trunc(self, x, mu, sigma, lower, upper):
        """Density at x of Normal(mu, sigma) truncated to [lower, upper].

        https://en.wikipedia.org/wiki/Truncated_normal_distribution"""
        zeta = (x-mu)/sigma
        alpha = (lower-mu)/sigma
        beta = (upper-mu)/sigma
        Z = stats.norm.cdf(beta) - stats.norm.cdf(alpha)
        return (self.gauss(zeta)) / (sigma*Z)

    def test_mu_prior(self):
        """Test mu_prior"""
        testValues = [0, 0, 1]
        returned_p = mu_prior(*testValues, printing=True, debug=True)
        real_p = self.gauss(*testValues)
        self.assertAlmostEqual(returned_p, real_p)

    def test_mus_log_prior(self):
        """Test mus_log_prior"""
        testValues = [[0, 1], 0, 1]
        returned_LP = mus_log_prior(*testValues, printing=True, debug=True)
        probs = [self.gauss(x, testValues[1], testValues[2]) for x in testValues[0]]
        real_LP = np.sum(np.log(probs))
        self.assertAlmostEqual(returned_LP, real_LP)

    def test_sigma_prior(self):
        """Test sigma_prior inside and outside the uniform support"""
        testValues = [1, 0, 2]
        returned_p = sigma_prior(*testValues, printing=True, debug=True)
        real_p = 1/(testValues[-1] - testValues[-2])
        self.assertAlmostEqual(returned_p, real_p)

        testValues = [0, 1, 2]
        returned_p = sigma_prior(*testValues, printing=True, debug=True)
        real_p = 0.0
        self.assertEqual(returned_p, real_p)

    def test_sigmas_log_prior(self):
        """Test sigmas_log_prior"""
        testValues = [[1, 0], 0.5, 1.5]
        returned_LP = sigmas_log_prior(*testValues, printing=True, debug=True)
        probs = [1/(testValues[-1]-testValues[-2]) if testValues[1] <= x <= testValues[2] else 0.0 for x in testValues[0] ]
        real_LP = np.sum(np.log(probs))
        self.assertAlmostEqual(returned_LP, real_LP)

    def test_sigma_proposal_prob(self):
        """Test sigma_proposal_prob; https://en.wikipedia.org/wiki/Truncated_normal_distribution"""
        testValues = [1, 2, 1, 0, 3]
        returned_p = sigma_proposal_prob(*testValues, printing=True, debug=True)
        real_p = self.gauss_trunc(*testValues)
        self.assertAlmostEqual(returned_p, real_p)

    def test_sigma_log_jump_probs(self):
        """Test sigma_log_jump_probs.

        The current and proposed values are deliberately not mirror images
        within the interval: with mirrored values both proposal densities are
        equal, the log ratio is 0, and a sign or argument-order error would
        go unnoticed."""
        current, proposed, scale, lower, upper = 0.5, 2, 1, 0, 3
        returned_LP = sigma_log_jump_probs(current, proposed, scale, lower, upper,
                                           printing=False, debug=False)
        # log q(current | proposed) - log q(proposed | current)
        back = self.gauss_trunc(current, proposed, scale, lower, upper)
        forth = self.gauss_trunc(proposed, current, scale, lower, upper)
        real_LP = np.log(back) - np.log(forth)
        self.assertNotAlmostEqual(real_LP, 0.0)
        self.assertAlmostEqual(returned_LP, real_LP)

    def test_theta_prior(self):
        """Test theta_prior"""
        testValues = [0, 0, 1]
        # printing is left off: the printing branch of theta_prior calls
        # print_fn, which is not defined in this file.
        returned_p = theta_prior(*testValues, printing=False, debug=True)
        real_p = self.gauss(*testValues)
        self.assertAlmostEqual(returned_p, real_p)

    def test_thetas_log_prior(self):
        """Test thetas_log_prior"""
        testValues = [[-np.inf, 1.5, 2.5, 3.5, np.inf], 0.5, 1.0]
        returned_LP = thetas_log_prior(*testValues, printing=False, debug=False)
        probs = [self.gauss(x, i+1.5, 1) for (i, x) in enumerate([1.5, 2.5, 3.5])]
        real_LP = np.sum(np.log(probs))
        self.assertAlmostEqual(returned_LP, real_LP)

    def test_theta_proposal_prob(self):
        """Test theta_proposal_prob; https://en.wikipedia.org/wiki/Truncated_normal_distribution"""
        testValues = [1, 2, 1, 0, 3]
        returned_p = theta_proposal_prob(*testValues, printing=True, debug=True)
        real_p = self.gauss_trunc(*testValues)
        self.assertAlmostEqual(returned_p, real_p)

    def test_theta_log_jump_probs(self):
        """Test theta_log_jump_probs.

        Uses non-mirrored current/proposed values so that the expected log
        ratio is non-zero and the argument order is actually exercised."""
        current, proposed, scale, lower, upper = 0.5, 2, 1, 0, 3
        returned_LP = theta_log_jump_probs(current, proposed, scale, lower, upper,
                                           printing=False, debug=False)
        # log q(current | proposed) - log q(proposed | current)
        back = self.gauss_trunc(current, proposed, scale, lower, upper)
        forth = self.gauss_trunc(proposed, current, scale, lower, upper)
        real_LP = np.log(back) - np.log(forth)
        self.assertNotAlmostEqual(real_LP, 0.0)
        self.assertAlmostEqual(returned_LP, real_LP)

        
# Run the probability-function test suite when this cell/module is executed.
# failfast=True makes the runner stop at the first failing or erroring test.
runner = unittest.TextTestRunner(failfast=True)
runner.run(initialize_suite(ProbabilityFunctionsTestSuite))
### add test for above

NameError: name 'unittest' is not defined