In [1]:
import torch

from model import *
from data import *
from helpers import *
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yaml
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.loggers import CSVLogger
from torch.utils.data import DataLoader, Dataset
import math
import sys
import time
import os
import torch


##############
# Data generation
###############
N = 1000  # number of subjects
nit = 60  # number of items
ndim = 3  # number of dimensions
nit_dim = int(nit / ndim)  # number of items per dimension
max_nepoch = 50000  # max number of iterations

X = np.full((N, nit), 999)  # empty matrix for item scores
prob = np.full((N, nit), 0.99)  # empty matrix for probability correct
a = np.full((nit, ndim), 0.0)  # empty matrix for discrimination parameters

covMat = np.full((ndim, ndim), .3)  # covariance matrix of dimensions
np.fill_diagonal(covMat, 1)
theta = np.random.multivariate_normal([0] * ndim, covMat, N)  # draw values for the dimensions

structure_matrix = np.full((nit, ndim), 0.0)

for i in range(0, ndim):
    ax = np.random.uniform(.5, 1.5, nit_dim)  # draw discrimination parameters from uniform distribution
    a[range(nit_dim * i, nit_dim * (i + 1)), i] = ax
Q = np.zeros((60,3))  # simple structure configuration
Q[0:20, 0] = 1
Q[20:40, 1] = 1
Q[40:60,2] = 1

b = np.tile(np.linspace(-3, 3, nit_dim, endpoint=True), ndim)  # decoder intercepts

for i in range(0, nit):
    for p in range(0, N):
        prob[p, i] = 1 / (1 + math.exp(-(sum(a[i, :] * theta[p, :]) + b[i])))  # probability correct
        X[p, i] = np.random.binomial(1, prob[p, i])  # draw item scores on basis of prob correct


trainer = Trainer(max_epochs=50000,
                  min_epochs=100,
                  enable_checkpointing=False,
                  logger=None,
                  accelerator='cpu',
                  callbacks=[EarlyStopping(monitor='train_loss', min_delta=1e-7, patience=50, mode='min')])

dataset = SimDataset(torch.Tensor(X))
train_loader = DataLoader(dataset, batch_size=1000, shuffle=True)


vae = VAE(nitems=X.shape[1],
           dataloader=train_loader,
           latent_dims=3,
           hidden_layer_size=(nit+ndim)//2,
           qm=Q,
           learning_rate=0.001,
           batch_size=N,
           n_samples=1,
           cov_theta=True)

trainer.fit(vae)

a_est = vae.decoder.weights.t().detach().numpy()
d_est = vae.decoder.bias.t().detach().numpy()

print(((a_est-a)**2).mean())

dataset = SimDataset(torch.Tensor(X))
train_loader = DataLoader(dataset, batch_size=1000, shuffle=False)
data, mask = next(iter(train_loader))
theta_est, log_sigma_est = vae.encoder(data)



print('transforming theta')
L = torch.tril(vae.transform.weight, -1) + torch.eye(vae.transform.weight.shape[0])
theta_est = torch.matmul(theta_est, L)

theta_est = theta_est.detach().numpy()

a_est, theta_est = inv_factors(a_est=a_est, theta_est=theta_est, a_true=a)


cor = np.corrcoef(theta_est.T)
print(cor)


est = np.array([cor[1,0], cor[2,0], cor[1,2]])
true = np.array([covMat[1,0], covMat[2,0], covMat[1,2]])
rmse_cor = np.sqrt(np.mean((est-true)**2))
rmse_a = np.sqrt(np.mean((a_est-a)**2))
print(rmse_cor)


# for dim in range(3):
#     plt.figure()
#
#     ai_est = a_est[:, dim]
#     ai_true = a[:, dim]
#
#     mse = MSE(ai_est, ai_true)
#     plt.scatter(y=ai_est, x=ai_true)
#     plt.plot(ai_true, ai_true)
#     # for i, x in enumerate(ai_true):
#     #    plt.text(ai_true[i], ai_est[i], i)
#     plt.title(f'Parameter estimation plot: a{dim + 1}, MSE={round(mse, 4)}')
#     plt.xlabel('True values')
#     plt.ylabel('Estimates')
#     plt.savefig(f'./figures/param_est_plot_a{dim + 1}.png')
#
#     # parameter estimation plot for theta
#     plt.figure()
#     thetai_est = theta_est[:, dim].squeeze()
#     thetai_true = theta[:, dim].squeeze()
#     mse = MSE(thetai_est, thetai_true)
#     plt.scatter(y=thetai_est, x=thetai_true)
#     plt.plot(thetai_true, thetai_true)
#     plt.title(f'Parameter estimation plot: theta{dim + 1}, MSE={round(mse, 4)}')
#     plt.xlabel('True values')
#     plt.ylabel('Estimates')
#     plt.savefig(f'./figures/param_est_plot_theta{dim + 1}.png')
#
#


GPU available: True (mps), used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/pytorch_lightning/trainer/setup.py:187: GPU available but not used. You can set it by doing `Trainer(accelerator='gpu')`.
  self.x_train = torch.tensor(X, dtype=torch.float32)

  | Name      | Type          | Params
--------------------------------------------
0 | encoder   | Encoder       | 2.1 K 
1 | sampler   | SamplingLayer | 0     
2 | transform | CholeskyLayer | 9     
3 | decoder   | Decoder       | 240   
--------------------------------------------
2.3 K     Trainable params
0         Non-trainable params
2.3 K     Total params
0.009     Total estimated model params size (MB)
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:441: The 'train_dataloader' does not have

Epoch 1549: 100%|████████████████████████████████| 1/1 [00:00<00:00, 151.27it/s, v_num=11]
0.5482209215483743
transforming theta
[[ 1.         -0.74818376 -0.50329812]
 [-0.74818376  1.          0.63687801]
 [-0.50329812  0.63687801  1.        ]]
0.7868637838948477


  self.x_train = torch.tensor(X, dtype=torch.float32)


In [9]:
A = np.zeros((3,3))
A[np.triu_indices(A.shape[0], k=1)]

array([0., 0., 0.])