In [None]:
import torch
import pyro

In [157]:
import pyro.distributions as dist
import pyro.distributions.constraints as constraints

def DCModel(Y, D):
    N, J, L = Y.shape[0], Y.shape[1], D[1].shape[0]
    beta_dims = list(map(lambda x: x.shape[1], D))

    a0 = torch.tensor(0.1)
    b0 = torch.tensor(0.1)
    d0 = torch.ones(L) * 0.1
    sigma2 = pyro.sample("item_response_variance", dist.InverseGamma(a0, b0))
    pi = pyro.sample("profile_probabilities", dist.Dirichlet(d0))
    betas = [torch.zeros(D[j].shape[1]) for j in range(J)]
    profiles = torch.Tensor(N, L)

    question_indices = pyro.plate("questions", size = J)
    individual_indices = pyro.plate("individuals", size = N)
    
    for j in question_indices:
        dim = beta_dims[j]
        betas[j] = pyro.sample("item_response_coefficients_{}".format(j), 
                               dist.MultivariateNormal(torch.zeros(D[j].shape[1]), sigma2 * torch.eye(D[j].shape[1])))

    for i in individual_indices:
        profiles[i] = pyro.sample("profile_{}".format(i), dist.Multinomial(1, pi))
        for j in question_indices:
            p = torch.sigmoid(torch.matmul(torch.matmul(profiles[i], D[j].float()), betas[j]))
            pyro.sample("obs_{}_{}".format(i, j), dist.Bernoulli(p), obs = Y[i, j])

def DCMguide(Y, D):
    N, J, L = Y.shape[0], Y.shape[1], D[1].shape[0]
    beta_dims = list(map(lambda x: x.shape[1], D))

    a_star = pyro.param("a_star", torch.tensor(0.1),
                        constraint = constraints.positive)
    b_star = pyro.param("b_star", torch.tensor(0.1),
                        constraint = constraints.positive)
    d_star = pyro.param("d_star", torch.tensor(torch.ones(L) * 0.1),
                        constraint = constraints.positive)
    beta_means = [torch.zeros(D[j].shape[1]) for j in range(J)]
    beta_vars = [torch.eye(D[j].shape[1]) for j in range(J)]
    profiles = torch.Tensor(N, L)

    question_indices = pyro.plate("questions", size = J)
    individual_indices = pyro.plate("individuals", size = N)

    for j in question_indices:
        beta_means[j] = pyro.param("beta_mean_{}".format(j), torch.zeros(D[j].shape[1]),
                                   constraint = constraints.real)
        beta_vars[j] = pyro.param("beta_var_{}".format(j), torch.eye(D[j].shape[1]),
                                   constraint = constraints.positive_definite)
    
    for i in individual_indices:
        profiles[i] = pyro.param("pi_probs_{}".format(i), torch.ones(L) * 0.25,
                                   constraint = constraints.simplex)
        
    pyro.sample("item_response_variance", dist.InverseGamma(a_star, b_star))
    pyro.sample("profile_probabilities", dist.Dirichlet(d_star))
    for j in question_indices:
        pyro.sample("item_response_coefficients_{}".format(j), 
                    dist.MultivariateNormal(beta_means[j], beta_vars[j]))
    for i in individual_indices:
        pyro.sample("profile_{}".format(i), dist.Multinomial(1, profiles[i]))



In [158]:
def dim_beta(d):
    # return 2**int(d).bit_count()
    return 2 ** bin(d).count("1") 
def generate_beta(d):
    if (d == 1):
        return torch.distributions.uniform.Uniform(-1.15,-1.05).sample([1])
    elif (d == 2):
        return torch.cat(( torch.distributions.uniform.Uniform(-1.15,-1.05).sample([1]), torch.distributions.uniform.Uniform(2.9,3.1).sample([1])))
    elif (d == 4):
        return torch.cat(( torch.distributions.uniform.Uniform(-1.15,-1.05).sample([1]), torch.distributions.uniform.Uniform(1.45,1.55).sample([2]), 
                             torch.distributions.uniform.Uniform(0.45,0.55).sample([1])))
    elif (d == 8):
        return torch.cat(( torch.distributions.uniform.Uniform(-1.15,-1.05).sample([1]), torch.distributions.uniform.Uniform(0.65,0.75).sample([2]), 
                             torch.distributions.uniform.Uniform(0.55,0.65).sample([1]),torch.distributions.uniform.Uniform(0.65,0.75).sample([1]),
                           torch.distributions.uniform.Uniform(0.55,0.65).sample([2]), torch.distributions.uniform.Uniform(0.35,0.45).sample([1]) ))
    elif (d == 16):
        return torch.cat(( torch.distributions.uniform.Uniform(-2.02,-1.99).sample([1]), torch.distributions.uniform.Uniform(0.68,0.72).sample([2]), 
                             torch.distributions.uniform.Uniform(0.23,0.27).sample([1]),torch.distributions.uniform.Uniform(0.68,0.72).sample([1]),
                           torch.distributions.uniform.Uniform(0.23,0.27).sample([2]), torch.distributions.uniform.Uniform(0.18,0.22).sample([1]), 
                         torch.distributions.uniform.Uniform(0.68,0.72).sample([1]), torch.distributions.uniform.Uniform(0.23,0.27).sample([2]), 
                             torch.distributions.uniform.Uniform(0.23,0.27).sample([1]),torch.distributions.uniform.Uniform(0.68,0.72).sample([1]),
                           torch.distributions.uniform.Uniform(0.23,0.27).sample([2]), torch.distributions.uniform.Uniform(0.13,0.17).sample([1]), 
                         ))
    else:
        return 0
    
def delta(skill, q):
    if q == 0:
        return 1
    return((skill & q) == q)
# function to determine all relevant subsets of skills for a given question
def subset(q):
    list = [q]
    i = q
    while i > 0:
        i = (i-1) & q
        list.append(i)
    list.sort()
    return list    
# generate delta matrix for given question profile
def generate_delta(L, q):
    sbset_q = subset(q)
    return torch.tensor([delta(x,y) for x in range(L) for y in sbset_q]).reshape(L, len(sbset_q))

In [159]:
import random
import numpy as np

random.seed(123)
# number of students
N = 1000
# number of skills
k = 2
# number of questions
J = 25
# number of possible skill profiles
L = 2**k

# generate data
# skill profiles
skill = torch.multinomial(torch.ones(L), N, replacement = True)
# Q matrix -> delta 
Q = torch.multinomial(torch.ones(L), J, replacement = True)

beta_dim = list(map(dim_beta, Q))
true_beta = torch.nested.nested_tensor(list(map(generate_beta, beta_dim)))
delta_list = torch.nested.nested_tensor(list(map(generate_delta, [L]*len(Q), Q)))
beta_mat = torch.nested.to_padded_tensor(true_beta, padding = 0.0, output_size = (J, L))
delta_mat = [0]*L
for i in range(L):
    delta_mat[i] = []
    for j in range(J):
        if delta_list[j].shape[1] < delta_list[j].shape[0]:
            delta_mat[i].append(torch.tensor(np.pad(delta_list[j][i], (0, L - delta_list[j].shape[1] )) ) )
        else:
            delta_mat[i].append(torch.tensor(delta_list[j][i]))
    delta_mat[i] = torch.stack(delta_mat[i])
delta_mat = torch.cat(delta_mat)
delta_mat = delta_mat.reshape([L,J,L])

  delta_mat[i].append(torch.tensor(delta_list[j][i]))


In [160]:
Y = torch.bernoulli(torch.sigmoid(torch.sum((delta_mat[skill] * beta_mat), axis = 2)))

In [None]:
from pyro.optim import Adam
from pyro.infer import SVI, Trace_ELBO

pyro.clear_param_store()

# setup the optimizer
adam_params = {"lr": 0.005, "betas": (0.90, 0.999)}
optimizer = Adam(adam_params)

# setup the inference algorithm
svi = SVI(DCModel, DCMguide, optimizer, loss=Trace_ELBO())


  d_star = pyro.param("d_star", torch.tensor(torch.ones(L) * 0.1),


.....

In [192]:
# do gradient steps
for step in range(5):
    svi.step(Y, delta_list)
    if step % 1 == 0:
        print('.', end='')

  d_star = pyro.param("d_star", torch.tensor(torch.ones(L) * 0.1),


.....

In [190]:
pyro.param("beta_mean_3")

tensor([ 0.0095, -0.0144], requires_grad=True)

In [181]:
pyro.param("beta_var_0")

tensor([[ 0.9900, -0.0050, -0.0050, -0.0050],
        [-0.0050,  0.9901, -0.0050, -0.0050],
        [-0.0050, -0.0050,  0.9901, -0.0049],
        [-0.0050, -0.0050, -0.0049,  0.9901]], grad_fn=<MmBackward0>)

In [195]:
pyro.param("pi_probs_0")

tensor([0.2527, 0.2485, 0.2574, 0.2415], grad_fn=<DivBackward0>)

In [193]:
pyro.param("a_star")

tensor(0.1065, grad_fn=<AddBackward0>)

In [194]:
pyro.param("b_star")

tensor(0.0986, grad_fn=<AddBackward0>)

In [196]:
skill[0]

tensor(2)