In [1]:
import numpy as np
import scipy as sp

from scipy import integrate
from scipy.optimize import basinhopping, fminbound, brentq

from bokeh.plotting import figure, show, output_notebook
output_notebook()

# Functions to Synthesize Data

In [3]:
# Functions to create synthetic data
def irt_evaluation(difficulty, discrimination, thetas):
    """
        Evaluates an IRT model and returns the exact values
        
        Args:
            difficulty: [array] of difficulty parameters
            discrimination:  [array | number] of discrimination parameters
            thetas: [array] of person abilities
            
        Returns:
            dichotomous matrix of [difficulty.size x thetas.size] representing
            synthetic data
    """
    # If discrimination is a scalar, make it an array
    if not np.ndim(discrimination):
        discrimination = np.ones_like(difficulty) * discrimination

    kernel = difficulty[:, None] - thetas
    kernel *= discrimination[:, None]
    return 1.0 / (1 + np.exp(kernel))



def create_synthetic_irt_dichotomous(difficulty, discrimination, thetas):
    """
        Creates synthetic IRT data to test parameters estimation
        functions.  Only for use with dichotomous outputs
        
        Args:
            difficulty: [array] of difficulty parameters
            discrimination:  [array | number] of discrimination parameters
            thetas: [array] of person abilities
            
        Returns:
            dichotomous matrix of [difficulty.size x thetas.size] representing
            synthetic data
    """
    continuous_output = irt_evaluation(difficulty, discrimination, thetas)

    # convert to binary based on probability
    random_compare = np.random.rand(*continuous_output.shape)
    
    return random_compare <= continuous_output


# Utility functions

In [53]:
from scipy.special import roots_legendre


def _get_quadrature_points(n, a, b):
    """
        Utility function to get the legendre points, 
        shifted from [-1, 1] to [a, b]
        
        Args:
            n: number of quadrature_points
            a: lower bound of integration
            b: upper bound of integration 
            
        A local function of the based fixed_quad found in scipy
    """
    x, w = roots_legendre(n)
    x = np.real(x)
    
    return (b - a) * (x + 1) * 0.5 + a
    

def _compute_partial_integral(theta, difficulty, discrimination, the_sign):
    """
        To be added
    """
    kernel = the_sign[:, :, None] * np.ones((1, 1, theta.size))
    kernel *= discrimination   
    kernel *= (theta[None, None, :] - difficulty[:, None, None])
    
    # Distribution
    gauss = 1.0 / np.sqrt(2 * np.pi) * np.exp(-np.square(theta) / 2)

    return  gauss[None, :] * (1.0 / (1.0 + np.exp(kernel))).prod(axis=0).squeeze()    

# Estimate functions based on approximation

In [41]:
def rauch_estimate(dataset, discrimination=1):
    """
        Estimates the difficulty parameters via the approximation
    
        Args:
            dataset: [items x participants] matrix of True/False Values
            discrimination: scalar of discrimination used in model (default to 1)
            
        Returns:
            array of discrimination estimates
    """
    n_no = np.count_nonzero(~dataset, axis=1)
    n_yes = np.count_nonzero(dataset, axis=1)
    return (np.sqrt(1 + discrimination**2 / 3) * 
            np.log(n_no / n_yes) / discrimination)


def onepl_estimate(dataset):
    """
        Estimates the difficulty parameters via the approximation
    
        Args:
            dataset: [items x participants] matrix of True/False Values
            
        Returns:
            array of discrimination, difficulty estimates
    """
    n_no = np.count_nonzero(~dataset, axis=1)
    n_yes = np.count_nonzero(dataset, axis=1)
    scalar = np.log(n_no / n_yes)

    unique_sets, counts = np.unique(dataset, axis=1, return_counts=True)
    the_sign = (-1)**unique_sets

    # Inline definition of quadrature function
    def quadrature_function(theta, difficulty, discrimination, response):
        gauss = 1.0 / np.sqrt(2 * np.pi) * np.exp(-np.square(theta) / 2)
        kernel = the_sign[:, :, None] * np.ones((1, 1, theta.size))
        kernel *= discrimination   
        kernel *= (theta[None, None, :] - difficulty[:, None, None])
        
        return  gauss[None, :] * (1.0 / (1.0 + np.exp(kernel))).prod(axis=0).squeeze()

    # Inline definition of cost function to minimize
    def min_func(estimate):
        difficulty = np.sqrt(1 + estimate**2 / 3) * scalar / estimate
        otpt = integrate.fixed_quad(quadrature_function, -5, 5, 
                                    (difficulty, estimate, unique_sets), n=61)[0]
        return -np.log(otpt).dot(counts)
       
    # Perform the minimization
    discrimination = fminbound(min_func, 0.25, 10)
    
    return discrimination, np.sqrt(1 + discrimination**2 / 3) * scalar / discrimination


def twopl_estimate(dataset, max_iter=25):
    """
        Estimates the difficulty parameters via the approximation
    
        Args:
            dataset: [items x participants] matrix of True/False Values
            max_iter:  maximum number of iterations to run
            
        Returns:
            array of discrimination, difficulty estimates
    """
    n_no = np.count_nonzero(~dataset, axis=1)
    n_yes = np.count_nonzero(dataset, axis=1)
    scalar = np.log(n_no / n_yes)

    n_items = dataset.shape[0]
    unique_sets, counts = np.unique(dataset, axis=1, return_counts=True)
    the_sign = (-1)**unique_sets

    # Inline definition of quadrature function
    def quadrature_function(theta, difficulty, discrimination, response):
        gauss = 1.0 / np.sqrt(2 * np.pi) * np.exp(-np.square(theta) / 2)
        kernel = the_sign[:, :, None] * np.ones((1, 1, theta.size))
        kernel *= discrimination[:, None, None]       
        kernel *= (theta[None, None, :] - difficulty[:, None, None])
        
        return  gauss[None, :] * (1.0 / (1.0 + np.exp(kernel))).prod(axis=0).squeeze()

    # Inline definition of cost function to minimize
    def min_func(estimate):
        difficulty = np.sqrt(1 + estimate**2 / 3) * scalar / estimate
        otpt = integrate.fixed_quad(quadrature_function, -5, 5, 
                                    (difficulty, estimate, unique_sets), n=61)[0]
        return -np.log(otpt).dot(counts)
       
    # Perform the minimization
    initial_guess = np.ones((dataset.shape[0],))
    previous_guess = initial_guess.copy()
    
    for iteration in range(max_iter):
        for ndx in range(n_items):
            def min_func_local(estimate):
                initial_guess[ndx] = estimate
                return min_func(initial_guess)

            initial_guess[ndx] = fminbound(min_func_local, 0.25, 6, xtol=1e-3)
        
        previous_guess = initial_guess.copy()
        if np.abs(initial_guess - previous_guess).max() < 1e-3:
            break
        
        
    
    return initial_guess, np.sqrt(1 + initial_guess**2 / 3) * scalar / initial_guess


# Functions based on full integral

In [118]:
def rauch_estimate_int(dataset, discrimination=1, max_iter=25):
    """
        Estimates parameters in an IRT model with full        
        gaussian quadrature
        
        Args:
            dataset: [items x participants] matrix of True/False Values
            discrimination: scalar of discrimination used in model (default to 1)
            max_iter: maximum number of iterations to run
            
        Returns:
            array of discrimination estimates
    """
    n_items = dataset.shape[0]
    n_no = np.count_nonzero(~dataset, axis=1)
    n_yes = np.count_nonzero(dataset, axis=1)
    scalar = n_yes / (n_yes + n_no)
    
    if np.ndim(discrimination) < 1:
        discrimination = np.full(n_items, discrimination)
   
    # Inline definition of quadrature function
    def quadrature_function(theta, difficulty, discrimination):
        gauss = 1.0 / np.sqrt(2 * np.pi) * np.exp(-np.square(theta) / 2)
        return irt_evaluation(np.array([difficulty]), np.array([discrimination]), theta) * gauss

    the_parameters = np.zeros((n_items,))

    # Perform the minimization
    for ndx in range(n_items):
        
        # Minimize each item separately
        def min_zero_local(estimate):
            return (scalar[ndx] - 
                    integrate.fixed_quad(quadrature_function, -10, 10, 
                    (estimate, discrimination[ndx]), n=101)[0])
        
        the_parameters[ndx] = brentq(min_zero_local, -6, 6)
            
    return the_parameters


def onepl_estimate_int(dataset):
    """
        Estimates the difficulty parameters via the approximation
    
        Args:
            dataset: [items x participants] matrix of True/False Values
            
        Returns:
            array of discrimination, difficulty estimates
    """
    n_no = np.count_nonzero(~dataset, axis=1)
    n_yes = np.count_nonzero(dataset, axis=1)
    scalar = np.log(n_no / n_yes)

    unique_sets, counts = np.unique(dataset, axis=1, return_counts=True)
    the_sign = (-1)**unique_sets

    # Inline definition of quadrature function
    def quadrature_function(theta, difficulty, discrimination, response):
        gauss = 1.0 / np.sqrt(2 * np.pi) * np.exp(-np.square(theta) / 2)
        kernel = the_sign[:, :, None] * np.ones((1, 1, theta.size))
        kernel *= discrimination   
        kernel *= (theta[None, None, :] - difficulty[:, None, None])
        
        return  gauss[None, :] * (1.0 / (1.0 + np.exp(kernel))).prod(axis=0).squeeze()

    # Inline definition of cost function to minimize
    def min_func(estimate):
        difficulty = rauch_estimate_int(dataset, estimate)
        otpt = integrate.fixed_quad(quadrature_function, -5, 5, 
                                    (difficulty, estimate, unique_sets), n=61)[0]
        return -np.log(otpt).dot(counts)
       
    # Perform the minimization
    discrimination = fminbound(min_func, 0.25, 10)
    
    return discrimination, rauch_estimate_int(dataset, discrimination)


def twopl_estimate_int(dataset, max_iter=25):
    """
        Estimates the difficulty parameters via the approximation
    
        Args:
            dataset: [items x participants] matrix of True/False Values
            max_iter:  maximum number of iterations to run
            
        Returns:
            array of discrimination, difficulty estimates
    """
    n_no = np.count_nonzero(~dataset, axis=1)
    n_yes = np.count_nonzero(dataset, axis=1)
    scalar = np.log(n_no / n_yes)

    n_items = dataset.shape[0]
    unique_sets, counts = np.unique(dataset, axis=1, return_counts=True)
    the_sign = (-1)**unique_sets

    # Inline definition of quadrature function
    def quadrature_function(theta, difficulty, discrimination, response):
        gauss = 1.0 / np.sqrt(2 * np.pi) * np.exp(-np.square(theta) / 2)
        kernel = the_sign[:, :, None] * np.ones((1, 1, theta.size))
        kernel *= discrimination[:, None, None]       
        kernel *= (theta[None, None, :] - difficulty[:, None, None])
        
        return  gauss[None, :] * (1.0 / (1.0 + np.exp(kernel))).prod(axis=0).squeeze()

    # Inline definition of cost function to minimize
    def min_func(estimate):
        difficulty = rauch_estimate_int(dataset, estimate)
        otpt = integrate.fixed_quad(quadrature_function, -5, 5, 
                                    (difficulty, estimate, unique_sets), n=61)[0]
        return -np.log(otpt).dot(counts)
       
    # Perform the minimization
    initial_guess = np.ones((dataset.shape[0],))
    previous_guess = initial_guess.copy()
    
    for iteration in range(max_iter):
        for ndx in range(n_items):
            def min_func_local(estimate):
                initial_guess[ndx] = estimate
                return min_func(initial_guess)

            initial_guess[ndx] = fminbound(min_func_local, 0.25, 6, xtol=1e-3)
        
        previous_guess = initial_guess.copy()
        if np.abs(initial_guess - previous_guess).max() < 1e-3:
            break
        
        
    
    return initial_guess, rauch_estimate_int(dataset, initial_guess)


# Functions based on complete joint probability

In [None]:
def onepl_estimate_full_new(dataset):
    """
        Estimates parameters in an IRT model with full        
        gaussian quadrature
        
        Args:
            dataset: [items x participants] matrix of True/False Values
            
        Returns:
            array of discrimination, difficulty estimates
    """
    n_items = dataset.shape[0]
    unique_sets, counts = np.unique(dataset, axis=1, return_counts=True)
    the_sign = (-1)**unique_sets

    theta = _get_quadrature_points(61, -5, 5)
    
    # Inline definition of quadrature function
    def quadrature_function(theta, difficulty, discrimination, 
                            old_difficulty, old_discrimination, 
                            partial_int, the_sign):
        kernel1 = the_sign[:, None] * (theta[None, :] - difficulty)
        kernel1 *= discrimination

        kernel2 = the_sign[:, None] * (theta[None, :] - old_difficulty)
        kernel2 *= discrimination

        return partial_int * (1 + np.exp(kernel2)) / (1 + np.exp(kernel1))
    
    # Inline definition of cost function to minimize
    def min_func(difficulty, old_difficulty, partial_int, the_sign):
        otpt = integrate.fixed_quad(quadrature_function, -5, 5, 
                (difficulty, old_difficulty, partial_int, the_sign), n=61)[0] + 1e-23
        return -np.log(otpt).dot(counts)

    # Get approximate guess to begin with
    initial_guess = rauch_estimate(dataset, discrimination=discrimination)

    for iteration in range(max_iter):
        previous_guess = initial_guess.copy()

        #Quadrature evaluation for values that do not change
        partial_int = _compute_partial_integral(theta, initial_guess,
                          discrimination, the_sign)
                
        for ndx in range(n_items):
            # Minimize each one separately
            value = initial_guess[ndx] * 1.0
            
            def min_func_local(estimate):
                return min_func(estimate, previous_guess[ndx], 
                                partial_int, the_sign[ndx])
            
            initial_guess[ndx] = fminbound(min_func_local, 
                                           value-0.75,
                                           value+0.75)
            
            partial_int = quadrature_function(theta, initial_guess[ndx], 
                                              previous_guess[ndx], partial_int, the_sign[ndx])

        if(np.abs(initial_guess - previous_guess).max() < 0.001):
            break
            
    # Perform the minimization
    return initial_guess

In [107]:
def rauch_estimate_full(dataset, discrimination=1, max_iter=25):
    """
        Estimates parameters in an IRT model with full        
        gaussian quadrature
        
        Args:
            dataset: [items x participants] matrix of True/False Values
            discrimination: scalar of discrimination used in model (default to 1)
            max_iter: maximum number of iterations to run
            
        Returns:
            array of discrimination estimates
    """
    n_items = dataset.shape[0]
    unique_sets, counts = np.unique(dataset, axis=1, return_counts=True)
    the_sign = (-1)**unique_sets

    theta = _get_quadrature_points(61, -5, 5)
    
    # Inline definition of quadrature function
    def quadrature_function(theta, difficulty, old_difficulty, partial_int, the_sign):
        kernel1 = the_sign[:, None] * (theta[None, :] - difficulty)
        kernel1 *= discrimination

        kernel2 = the_sign[:, None] * (theta[None, :] - old_difficulty)
        kernel2 *= discrimination

        return partial_int * (1 + np.exp(kernel2)) / (1 + np.exp(kernel1))
    
    # Inline definition of cost function to minimize
    def min_func(difficulty, old_difficulty, partial_int, the_sign):
        otpt = integrate.fixed_quad(quadrature_function, -5, 5, 
                (difficulty, old_difficulty, partial_int, the_sign), n=61)[0] + 1e-23
        return -np.log(otpt).dot(counts)

    # Get approximate guess to begin with
    initial_guess = rauch_estimate(dataset, discrimination=discrimination)

    for iteration in range(max_iter):
        previous_guess = initial_guess.copy()

        #Quadrature evaluation for values that do not change
        partial_int = _compute_partial_integral(theta, initial_guess,
                          discrimination, the_sign)
                
        for ndx in range(n_items):
            # Minimize each one separately
            value = initial_guess[ndx] * 1.0
            
            def min_func_local(estimate):
                return min_func(estimate, previous_guess[ndx], 
                                partial_int, the_sign[ndx])
            
            initial_guess[ndx] = fminbound(min_func_local, 
                                           value-0.75,
                                           value+0.75)
            
            partial_int = quadrature_function(theta, initial_guess[ndx], 
                                              previous_guess[ndx], partial_int, the_sign[ndx])

        if(np.abs(initial_guess - previous_guess).max() < 0.001):
            break
            
    # Perform the minimization
    return initial_guess


def onepl_estimate_full(dataset):
    """
        Estimates parameters in an IRT model with full        
        gaussian quadrature
        
        Args:
            dataset: [items x participants] matrix of True/False Values
            
        Returns:
            array of discrimination, difficulty estimates
    """
    minimizer_kwargs = {'method': 'SLSQP'}
    unique_sets, counts = np.unique(dataset, axis=1, return_counts=True)

    # Inline definition of quadrature function
    def quadrature_function(theta, difficulty, discrimination, response):
        the_sign = (-1)**response
        gauss = 1 / np.sqrt(2 * np.pi) * np.exp(-np.square(theta) / 2)
        kernel = theta - difficulty[:, None]
        kernel *= discrimination * the_sign[:, None]
        return gauss * (1.0 / (1.0 + np.exp(kernel))).prod(axis=0)

    # Inline definition of cost function to minimize
    def min_func(estimate):
        otpt = list(map(lambda ndx: integrate.fixed_quad(quadrature_function, -5, 5, 
                    (estimate[1:], estimate[0], unique_sets[:, ndx]), n=61)[0] + 1e-23, 
                        range(unique_sets.shape[1])))
        return -np.log(otpt).dot(counts)
    
    # Perform the minimization
    initial_guess = np.zeros((dataset.shape[0] + 1,))
    da, db = onepl_estimate(dataset)
    initial_guess[0] = da
    initial_guess[1:] = db
    results =  basinhopping(min_func, initial_guess, niter_success=10,
                            minimizer_kwargs=minimizer_kwargs)['x']
    
    return results[0], results[1:]


def twopl_estimate_full(dataset):
    """
        Estimates parameters in an IRT model with full        
        gaussian quadrature
        
        Args:
            dataset: [items x participants] matrix of True/False Values
            
        Returns:
            array of discrimination, difficulty estimates
    """
    minimizer_kwargs = {'method': 'SLSQP'}
    unique_sets, counts = np.unique(dataset, axis=1, return_counts=True)
    n_parameters = dataset.shape[0]

    # Inline definition of quadrature function
    def quadrature_function(theta, difficulty, discrimination, response):
        the_sign = (-1)**response
        gauss = 1.0 / np.sqrt(2 * np.pi) * np.exp(-np.square(theta) / 2)
        kernel = theta - difficulty[:, None]
        kernel *= discrimination[:, None] * the_sign[:, None]
        return gauss * (1.0 / (1.0 + np.exp(kernel))).prod(axis=0)

    # Inline definition of cost function to minimize
    def min_func(estimate):
        otpt = list(map(lambda ndx: integrate.fixed_quad(quadrature_function, -5, 5, 
                    (estimate[n_parameters:], estimate[:n_parameters], 
                     unique_sets[:, ndx]), n=61)[0] + 1e-23, 
                        range(unique_sets.shape[1])))
        return -np.log(otpt).dot(counts)
    
    # Perform the minimization
    initial_guess = np.zeros((2 * dataset.shape[0],))
    initial_guess[:n_parameters] = 1.0
    results =  basinhopping(min_func, initial_guess, niter_success=10,
                            minimizer_kwargs=minimizer_kwargs)['x']
    
    return results[:n_parameters], results[n_parameters:]


# Create a set of synthetic data

In [122]:
n_items, n_participants = 20, 1000
diffc = np.linspace(-2.5, 2.5, n_items)
discr = 1.7 #1.0 + np.random.rand(n_items,)
thetas = np.random.randn(n_participants)

syn_data = create_synthetic_irt_dichotomous(diffc, discr, thetas)

In [108]:
b_est = rauch_estimate(syn_data, discr)
b_int = rauch_estimate_int(syn_data, discr)
b_full = rauch_estimate_full(syn_data, discr)

In [126]:
a_est = onepl_estimate(syn_data)
a_int = onepl_estimate_int(syn_data)
a_full = onepl_estimate_full(syn_data)

In [124]:
a_est

(1.8209763717473142,
 array([-2.34615733, -2.16462544, -1.90373722, -1.71596899, -1.26344307,
        -1.02249428, -0.74860798, -0.55709201, -0.45499941, -0.08294301,
         0.19863105,  0.35977358,  0.5823782 ,  0.88818528,  1.09964194,
         1.38841355,  1.60278849,  1.81476816,  2.22115645,  2.49176567]))

In [125]:
a_int

(1.7848537085514562,
 array([-2.33282921, -2.17410727, -1.9396839 , -1.76606359, -1.32991606,
        -1.08749684, -0.8040389 , -0.60153833, -0.49241242, -0.09016641,
         0.21577001,  0.39002724,  0.62844961,  0.94947886,  1.16585955,
         1.45289178,  1.65934988,  1.85794392,  2.22390805,  2.45771169]))

In [112]:
list(zip(diffc.round(3), b_est.round(3), b_int.round(3), b_full.round(3)))

[(-2.5, -2.456, -2.47, -2.467),
 (-2.237, -2.211, -2.241, -2.238),
 (-1.974, -1.954, -1.997, -1.994),
 (-1.711, -1.568, -1.62, -1.618),
 (-1.447, -1.39, -1.442, -1.44),
 (-1.184, -1.146, -1.197, -1.195),
 (-0.921, -0.866, -0.909, -0.908),
 (-0.658, -0.599, -0.631, -0.631),
 (-0.395, -0.311, -0.328, -0.329),
 (-0.132, -0.154, -0.163, -0.164),
 (0.132, 0.099, 0.105, 0.103),
 (0.395, 0.417, 0.441, 0.438),
 (0.658, 0.67, 0.706, 0.702),
 (0.921, 0.952, 0.998, 0.994),
 (1.184, 1.183, 1.233, 1.228),
 (1.447, 1.346, 1.398, 1.393),
 (1.711, 1.633, 1.684, 1.678),
 (1.974, 1.905, 1.949, 1.943),
 (2.237, 2.156, 2.19, 2.184),
 (2.5, 2.525, 2.535, 2.529)]

In [113]:
np.sqrt(np.square(diffc - b_est).mean()), np.sqrt(np.square(diffc - b_int).mean()), np.sqrt(np.square(diffc - b_full_new).mean())

(0.06035459553021747, 0.04282600357067218, 0.043019340030662497)