In [None]:
import numpy as np
import random

n = 20

# Programs are labelled with consecutive capital letters starting at "A".
pg = [chr(ord("A") + idx) for idx in range(n)]

# Synthetic program attributes. The draw order below determines the RNG stream,
# so the four tables are generated in this exact sequence.
p_score = {name: np.random.randint(40, 100, size=6) for name in pg}
p_salary = {name: np.random.randint(5, 10) for name in pg}
p_vers = {name: np.random.randint(5, 10) for name in pg}
p_elig = {name: np.random.randint(50, 90) for name in pg}

# Synthetic user profile: six scores plus one eligibility value.
u_score = np.random.randint(50, 100, size=6)
u_elig = np.random.randint(50, 90)

# User preferences (weights)
w_salary, w_vers, w_elig = 0.6, 0.3, 0.1

# One Q-value per program, all starting at zero.
q_dict = dict.fromkeys(pg, 0)

# Learning rate, discount factor and the feedback weight used by a later cell.
lr, gamma = 0.1, 0.9
w_feed = 0.5

episodes = 5000

for episode in range(episodes):
    # Each episode evaluates one uniformly sampled program.
    p = random.choice(pg)

    # Cosine similarity between the user's scores and the program's scores.
    similarity = np.dot(u_score, p_score[p]) / (
        np.linalg.norm(u_score) * np.linalg.norm(p_score[p])
    )

    # Ratio of the user's eligibility value to the program's.
    elig_score = u_elig / p_elig[p]

    r_salary = p_salary[p] * similarity
    r_vers = p_vers[p] * similarity
    r_elig = elig_score * 10

    # Preference-weighted combination of the three reward components.
    reward = w_salary * r_salary + w_vers * r_vers + w_elig * r_elig

    # Blend the old estimate with the reward plus the discounted best Q-value.
    max_q = max(q_dict.values())
    curr_q = q_dict[p]
    new_q = (1 - lr) * curr_q + lr * (reward + gamma * max_q)
    q_dict[p] = new_q

top_n = 5
ranked_programs = sorted(q_dict.items(), key=lambda item: item[1], reverse=True)
rec_p = [name for name, _ in ranked_programs[:top_n]]

print("Top recommended programs")
for i, p in enumerate(rec_p, start=1):
    print(f"{i}. Program {p}")




In [None]:
# Collect one integer rating in [0, 5] for every recommended program,
# re-prompting until the input is valid.
u_feed = {}
for p in rec_p:
    accepted = False
    while not accepted:
        raw_answer = input(f"Rate Program {p} (0-5): ")
        try:
            score = int(raw_answer)
        except ValueError:
            print("Please enter a valid integer.")
            continue
        if score < 0 or score > 5:
            print("Please enter a valid score between 0 and 5.")
            continue
        u_feed[p] = score
        accepted = True


In [None]:
# Adjust Q-values based on user feedback
for p, score in u_feed.items():
    # A rating of 0 is treated as strong disagreement and costs a fixed 10 units;
    # any other rating raises the Q-value in proportion to the score.
    shift = -10 if score == 0 else score
    q_dict[p] += w_feed * shift

# Recommend the top N programs again after incorporating user feedback
ranked_after_feedback = sorted(q_dict.items(), key=lambda item: item[1], reverse=True)
revised = [name for name, _ in ranked_after_feedback[:top_n]]

print("\nTop recommended programs after incorporating user feedback:")
for i, p in enumerate(revised, start=1):
    print(f"{i}. Program {p}")

In [None]:
import numpy as np
import random

# Initialize program details
n_p = 20
progs = [chr(ord("A") + idx) for idx in range(n_p)]

# Synthetic per-program attributes (generated in this order to keep the RNG stream fixed).
p_s = {name: np.random.randint(40, 100, size=6) for name in progs}
p_sal = {name: np.random.randint(5, 10) for name in progs}
p_vers = {name: np.random.randint(5, 10) for name in progs}
p_elg = {name: np.random.randint(50, 90) for name in progs}

# Synthetic user profile, stored as floats.
u_s = np.random.randint(50, 100, size=6).astype(float)
u_elg = float(np.random.randint(50, 90))

# Reward weights for the salary, versatility and eligibility components.
w_sal, w_vers, w_elg = 0.6, 0.3, 0.1

# Q-value table and update hyper-parameters.
q_vals = dict.fromkeys(progs, 0.0)
lr, gamma = 0.1, 0.9
f_adj = 0.5
epsilon_threshold = 0.01
top_n = 5

# Pool of random question weights and a log of the weights already used.
questions_pool = np.random.rand(1000)
asked_questions = list()

def ask_qs():
    """Ask six randomly weighted questions and fold the answers into ``u_s``.

    Six weights are sampled without replacement from ``questions_pool``. For
    each one the user is prompted for an integer answer; a non-integer entry
    is rejected and the prompt is repeated instead of aborting the session
    with ``ValueError``. Answers outside 0-5 are clamped into that range.
    Every element of the global ``u_s`` array is then increased in place by
    ``answer * weight``.

    Returns:
        The updated global ``u_s`` array (the same object that was mutated).
    """
    global asked_questions, u_s
    selected_questions = random.sample(list(questions_pool), 6)
    for weight in selected_questions:
        while True:
            try:
                answer = int(input("Answer question (0-5): "))
                break
            except ValueError:
                print("Please enter a valid integer.")
        # Out-of-range integers are clamped rather than rejected.
        answer = max(0, min(answer, 5))
        u_s += answer * weight
    # Keep a record of the weights that were used in this round.
    asked_questions.extend(selected_questions)
    return u_s

def update_q_vals():
    """Apply one Q-value update to every program still in ``progs``.

    For each program the reward is the preference-weighted sum of a salary
    term and a versatility term (both scaled by the cosine similarity between
    ``u_s`` and the program's scores) and an eligibility term
    (``u_elg / p_elg[p] * 10``). The stored Q-value is blended with
    ``reward + gamma * max(q_vals.values())`` using learning rate ``lr``.
    Because ``q_vals`` is updated in place, the maximum seen by later
    programs already includes earlier updates from the same call.

    Returns:
        tuple: ``(q_vals, old_q)`` where ``q_vals`` is the global table and
        ``old_q`` is the pre-update Q-value of the *last* program processed,
        or ``None`` when ``progs`` is empty.
    """
    # Defined up front so an empty ``progs`` no longer raises UnboundLocalError.
    old_q = None
    for p in progs:
        sim = np.dot(u_s, p_s[p]) / (np.linalg.norm(u_s) * np.linalg.norm(p_s[p]))
        elg_impact = u_elg / p_elg[p]
        r_sal = p_sal[p] * sim
        r_vers = p_vers[p] * sim
        r_elg = elg_impact * 10
        reward = w_sal * r_sal + w_vers * r_vers + w_elg * r_elg
        max_q = max(q_vals.values())
        old_q = q_vals[p]
        q_vals[p] = (1 - lr) * old_q + lr * (reward + gamma * max_q)

    return q_vals, old_q

def get_feedback(rec):
    """Collect a 0-5 rating for each recommended program and apply it.

    Ratings are validated the same way as in the earlier feedback cell:
    non-integer or out-of-range entries are rejected and the prompt repeats.
    All ratings are collected first and applied afterwards.

    Args:
        rec: iterable of program names to rate; each must be a key of
            ``q_vals`` and an element of ``progs``.

    Side effects:
        A rating of 0 removes the program from ``progs`` and ``q_vals``.
        Any other rating shifts its Q-value by ``f_adj * (score - 2) * 5``,
        so ratings of 1 lower it, 2 leaves it unchanged and 3-5 raise it.
    """
    global progs
    fb = {}
    for p in rec:
        while True:
            try:
                score = int(input(f"Rate {p} (0-5): "))
            except ValueError:
                print("Please enter a valid integer.")
                continue
            if 0 <= score <= 5:
                fb[p] = score
                break
            print("Please enter a valid score between 0 and 5.")

    for p, score in fb.items():
        if score == 0:
            progs.remove(p)
            del q_vals[p]
        else:
            q_vals[p] += f_adj * (score - 2) * 5

max_ep = 10
for ep in range(max_ep):
    u_s = ask_qs()

    # Snapshot the table before the update. The original code subtracted a
    # float from the q_vals dict, which raises TypeError.
    prev_q = dict(q_vals)
    q_vals, old_q = update_q_vals()

    # Convergence measure: the largest absolute per-program Q-value change.
    # NOTE(review): this interpretation of "epsilon" is an assumption — confirm.
    epsilon = max(
        (abs(q_vals[p] - prev_q.get(p, 0.0)) for p in q_vals),
        default=0.0,
    )
    print(q_vals, old_q)

    # Stop once feedback has removed too many programs to fill a top-N list.
    if len(progs) < top_n:
        break

    # Only ask for feedback once the Q-values have stopped moving.
    if epsilon < epsilon_threshold:
        rec = sorted(q_vals, key=q_vals.get, reverse=True)[:top_n]
        get_feedback(rec)
        satisfied = input("Satisfied? (yes/no): ").strip().lower()

        if satisfied == 'yes':
            break

if len(progs) >= top_n:
    print("\nFinal Recommendations:")
    rec = sorted(q_vals, key=q_vals.get, reverse=True)[:top_n]
    for i, p in enumerate(rec, 1):
        print(f"{i}. {p}")
else:
    print("\nNot enough programs left to recommend. Test ends.")


In [None]:
import numpy as np
from scipy.special import expit
from scipy.optimize import minimize
import matplotlib.pyplot as plt

# Fix the global NumPy RNG so every draw below is reproducible. The statements
# that follow consume the stream in order, so reordering them changes the data.
np.random.seed(42)

# 1. Define parameters
n_items = 1000  # Total number of items
n_traits = 6    # Number of latent traits
# NOTE(review): the adaptive_test call later in this cell passes
# n_adaptive_steps=5 explicitly, so this value is not used by that call.
n_adaptive = 30  # Number of adaptive steps

# Distribution of item types: 30% Likert, 20% Binary, 20% Value, 15% Single MC, 15% Multiple MC
n_likert = int(0.3 * n_items)
n_binary = int(0.2 * n_items)
n_value = int(0.2 * n_items)
n_mc_single = int(0.15 * n_items)
n_mc_multi = int(0.15 * n_items)

# Assign item types
# Build the list grouped by type, then shuffle it in place so that the item
# index carries no information about the item type.
item_types = (['likert'] * n_likert + ['binary'] * n_binary + ['value'] * n_value +
              ['mc_single'] * n_mc_single + ['mc_multi'] * n_mc_multi)
np.random.shuffle(item_types)

# 2. Simulate latent traits for a respondent, bounded in [-3, 3]
true_th = np.random.uniform(-3, 3, size=n_traits)

# 3. Randomly initialize item parameters
# Parameters are drawn for every item index regardless of its type; each model
# reads only the arrays it needs.
a_params = np.random.randn(n_items, n_traits)  # Discrimination parameters for all items
thresholds = [np.sort(np.random.uniform(-2, 2, size=4)) for _ in range(n_items)]  # Thresholds for ordinal items (4 sorted thresholds -> 5 categories)
binary_b = np.random.randn(n_items)  # Difficulty parameters for binary items
value_thresh = np.sort(np.random.uniform(-2, 2, size=5))  # Thresholds for value items (0-5); one set shared by all value items
mc_params = np.random.randn(n_items, n_traits, 4)  # Discrimination params for multiple-choice (4 options per item)

# 4. Define probability functions for GPCM

def gpcm_prob(a, th, thresholds):
    """Category probabilities under a generalized partial credit model.

    With ``diff = a . th``, category 0 has exponent 0 and category ``k >= 1``
    has exponent ``sum(diff - thresholds[:k])``. Probabilities are the softmax
    of those exponents.

    The exponents are shifted by their maximum before exponentiation. This
    leaves the result mathematically unchanged but keeps large ``diff`` values
    from overflowing to ``inf / inf = nan``. The partial sums are computed
    with a single cumulative sum instead of re-summing a slice per category.

    Args:
        a: 1-D discrimination vector.
        th: 1-D latent-trait vector of the same length as ``a``.
        thresholds: sequence of K step thresholds.

    Returns:
        ``numpy.ndarray`` of K + 1 probabilities that sum to 1.
    """
    diff = np.dot(a, th)
    steps = diff - np.asarray(thresholds, dtype=float)
    # Category 0 is the reference category with exponent 0.
    exponents = np.concatenate(([0.0], np.cumsum(steps)))
    weights = np.exp(exponents - np.max(exponents))
    return weights / np.sum(weights)

# Binary Logistic Model
def bin_prob(a, b, th):
    """Probability of a positive response: the logistic of ``a . th - b``."""
    logit = np.dot(a, th) - b
    return expit(logit)

# Nominal Response Model for single multiple-choice
def mc_single_prob(a, th):
    """Option probabilities for a single-answer multiple-choice item.

    Computes the softmax of ``a.T . th`` over the options. The logits are
    shifted by their maximum before exponentiation so that large values do not
    overflow to ``inf`` and produce ``nan`` probabilities; the shift cancels in
    the ratio and does not change the result.

    Args:
        a: array of shape (n_traits, n_options).
        th: 1-D latent-trait vector of length n_traits.

    Returns:
        ``numpy.ndarray`` of n_options probabilities that sum to 1.
    """
    exponent = np.dot(a.T, th)
    numerator = np.exp(exponent - np.max(exponent))
    return numerator / np.sum(numerator)

# Independent per-option logistic model for multi-select multiple-choice items
def mc_multi_prob(a, th):
    """Per-option selection probabilities: logistic of ``a.T . th``, clipped to [0, 1]."""
    option_logits = np.dot(a.T, th)
    option_probs = expit(option_logits)
    return np.clip(option_probs, 0, 1)

# 5. Simulate responses for different item types
def sim_response(item_type, q):
    """Draw a simulated response to item ``q`` from the true traits ``true_th``.

    Returns by item type:
        binary     -> 0 or 1
        likert     -> sampled category index plus one
        value      -> sampled category index
        mc_single  -> sampled option index
        mc_multi   -> array of independent 0/1 draws, one per option
    Any other ``item_type`` returns ``None``.
    """
    if item_type == "binary":
        return np.random.binomial(1, bin_prob(a_params[q], binary_b[q], true_th))

    if item_type == "likert":
        one_hot = np.random.multinomial(1, gpcm_prob(a_params[q], true_th, thresholds[q]))
        return np.argmax(one_hot) + 1

    if item_type == "value":
        one_hot = np.random.multinomial(1, gpcm_prob(a_params[q], true_th, value_thresh))
        return np.argmax(one_hot)

    if item_type == "mc_single":
        one_hot = np.random.multinomial(1, mc_single_prob(mc_params[q], true_th))
        return np.argmax(one_hot)

    if item_type == "mc_multi":
        return np.random.binomial(1, mc_multi_prob(mc_params[q], true_th))

    return None

# 6. Define log-likelihood function
def log_likelihood(params, responses, selected_items):
    """Negative log-likelihood of the observed responses given trait values.

    Originally only the binary branch clipped its probability before taking
    the log. Every branch now clips to ``[eps, 1 - eps]``, so a probability
    that saturates at 0 or 1 can no longer feed ``-inf`` or ``nan`` into the
    optimizer.

    Args:
        params: parameter vector; the first ``n_traits`` entries are the
            latent traits.
        responses: responses aligned with ``selected_items`` and encoded as
            ``sim_response`` produces them.
        selected_items: indices of the administered items.

    Returns:
        The negative log-likelihood, suitable for minimization.
    """
    eps = 1e-8
    th = params[:n_traits]
    ll = 0
    for i, q in enumerate(selected_items):
        item_type = item_types[q]
        if item_type == "binary":
            prob = np.clip(bin_prob(a_params[q], binary_b[q], th), eps, 1 - eps)
            ll += responses[i] * np.log(prob) + (1 - responses[i]) * np.log(1 - prob)
        elif item_type == "likert":
            probs = np.clip(gpcm_prob(a_params[q], th, thresholds[q]), eps, 1 - eps)
            # Likert responses are stored as 1-based categories.
            selected_category = responses[i] - 1
            ll += np.log(probs[selected_category])
        elif item_type == "value":
            probs = np.clip(gpcm_prob(a_params[q], th, value_thresh), eps, 1 - eps)
            ll += np.log(probs[responses[i]])
        elif item_type == "mc_single":
            probs = np.clip(mc_single_prob(mc_params[q], th), eps, 1 - eps)
            ll += np.log(probs[responses[i]])
        elif item_type == "mc_multi":
            probs = np.clip(mc_multi_prob(mc_params[q], th), eps, 1 - eps)
            picked = np.asarray(responses[i])
            # Sum of independent Bernoulli log-likelihoods, one per option.
            ll += np.sum(picked * np.log(probs) + (1 - picked) * np.log(1 - probs))
    return -ll

# 7. Define adaptive testing function
def adaptive_test(n_adaptive_steps=5, noise_factor=0.1):
    """Run a simulated adaptive test.

    At each step every unused item is scored with ``p * (1 - p)`` (summed over
    categories or options for non-binary items) at the current trait estimate.
    Gaussian noise scaled by ``noise_factor`` is added, the top-scoring item is
    administered via ``sim_response``, and the traits are re-estimated by
    minimizing ``log_likelihood`` with L-BFGS-B inside [-3, 3] per trait.

    Args:
        n_adaptive_steps: number of items to administer.
        noise_factor: standard deviation of the noise added to the scores.

    Returns:
        tuple: ``(est_theta, information_gain, selected_items)`` where
        ``information_gain`` holds the noisy score of each chosen item.
    """
    est_theta = np.zeros(n_traits)
    selected_items, responses, information_gain = [], [], []

    # Box constraints for every latent trait.
    bounds = [(-3, 3)] * n_traits

    for step in range(n_adaptive_steps):
        already_used = set(selected_items)
        # Used items keep -inf so they can never win the argmax.
        infos = np.full(n_items, -np.inf)
        for i in range(n_items):
            if i in already_used:
                continue
            item_type = item_types[i]
            if item_type == "binary":
                prob = bin_prob(a_params[i], binary_b[i], est_theta)
                info = prob * (1 - prob)
            elif item_type == "likert":
                probs = gpcm_prob(a_params[i], est_theta, thresholds[i])
                info = np.sum(probs * (1 - probs))
            elif item_type == "value":
                probs = gpcm_prob(a_params[i], est_theta, value_thresh)
                info = np.sum(probs * (1 - probs))
            elif item_type == "mc_single":
                probs = mc_single_prob(mc_params[i], est_theta)
                info = np.sum(probs * (1 - probs))
            elif item_type == "mc_multi":
                probs = mc_multi_prob(mc_params[i], est_theta)
                info = np.sum(probs * (1 - probs))
            infos[i] = info

        # Perturb the scores so that selection is not fully greedy.
        infos = infos + np.random.randn(n_items) * noise_factor

        # Administer the best-scoring item and record its noisy score.
        next_item = np.argmax(infos)
        selected_items.append(next_item)

        resp = sim_response(item_types[next_item], next_item)
        responses.append(resp)

        information_gain.append(infos[next_item])

        # Re-estimate the traits from all responses so far, warm-started
        # from the previous estimate.
        res = minimize(
            log_likelihood,
            est_theta,
            args=(responses, selected_items),
            method='L-BFGS-B',
            bounds=bounds,
        )
        est_theta = res.x[:n_traits]

        print(f"Step {step+1}: Selected Item {next_item+1}, Response: {resp}, Estimated Theta: {est_theta}")

    return est_theta, information_gain, selected_items

# 8. Run the adaptive test
# Uses 5 adaptive steps and the default noise_factor. The results are kept at
# module level because the plotting cells below read them.
final_theta, information_gain, selected_items = adaptive_test(n_adaptive_steps=5)

# 9. Generate meaningful plots




In [None]:
# Plot 1: Information Gain During Adaptive Testing
# One point per adaptive step, showing the noisy score of the chosen item.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(information_gain, marker='o', linestyle='-', color='b')
ax.set_title("Information Gain During Adaptive Testing")
ax.set_xlabel("Test Step")
ax.set_ylabel("Information Gain")
ax.grid(True)
plt.show()



In [None]:
# Plot 2: Final Latent Trait Estimates vs True Traits
fig, ax = plt.subplots(figsize=(10, 6))
for trait_no, (actual, estimate) in enumerate(zip(true_th, final_theta), start=1):
    # Only the first pair carries legend labels so each series appears once.
    is_first = trait_no == 1
    ax.scatter([trait_no], [actual], color='green', label='True Trait' if is_first else "")
    ax.scatter([trait_no], [estimate], color='red', label='Estimated Trait' if is_first else "")
ax.set_title("Final Latent Trait Estimates vs True Traits")
ax.set_xlabel("Trait Number")
ax.set_ylabel("Trait Value")
ax.set_xticks(range(1, n_traits + 1))
ax.legend()
ax.grid(True)
plt.show()



In [None]:
# test0 = AdaptiveMIRT(select_noise=0.0)
# test1 = AdaptiveMIRT(select_noise=0.05)
# test2 = AdaptiveMIRT(select_noise=0.1)
# test3 = AdaptiveMIRT(select_noise=0.15)
# test4 = AdaptiveMIRT(select_noise=0.2)

In [None]:
from IRT import AdaptiveMIRT

# One AdaptiveMIRT instance per selection-noise level: 0.0, 0.05, 0.1, 0.15, 0.2.
tests = [AdaptiveMIRT(select_noise=round(0.05 * level, 2)) for level in range(5)]

# Select items and simulate responses
for x in range(100):
    # Three select-and-respond rounds, interleaved across all tests...
    for _ in range(3):
        for test in tests:
            test.next_item()
            test.sim_resp()

    # ...followed by one theta update per test.
    for test in tests:
        test.update_theta()

# Plot the results
for test in tests:
    test.plot_results(no_show=True)

In [None]:
# Re-draw the result plots for every test. This repeats the final loop of the
# simulation cell above without re-running the simulation.
for test in tests:
    test.plot_results(no_show=True)

In [None]:
# Plot only the third test (index 2) without saving the figure.
# NOTE(review): assumes save_fig controls writing the figure to disk — confirm against IRT.AdaptiveMIRT.
tests[2].plot_results(save_fig=False)

In [None]:
# Sanity check: print the select_noise attribute of each test instance.
for test in tests:
    print(test.select_noise)

In [None]:
# Single-instance version of the simulation loop, using select_noise=0.05.
mirt_test2 = AdaptiveMIRT(select_noise=0.05)

# Select items and simulate responses
# 100 outer iterations, each with three select-and-respond rounds followed by
# one theta update (300 item selections in total).
for x in range(100):
    for _ in range(3):
        mirt_test2.next_item()
        mirt_test2.sim_resp()

    # Update the estimated theta values after responses
    mirt_test2.update_theta()

# Plot the results
mirt_test2.plot_results()

In [1]:
from env import VocRecEnv
from agent import PPOAgent
from utils import ReplayBuffer

# Build the environment, the agent and the transition buffer.
# NOTE(review): VocRecEnv, PPOAgent and ReplayBuffer are project-local classes;
# the comments below describe only how this cell calls them.
env = VocRecEnv()
agent = PPOAgent(env)
buffer = ReplayBuffer()

num_episodes = 100
batch_size = 32

for episode in range(num_episodes):
    state = env.reset()
    episode_reward = 0

    for step in range(100):  # Assuming a maximum of 100 steps per episode
        # `logits` is returned by the agent but not used in this loop.
        action, logits = agent.select_action(state)
        next_state, reward, done, _ = env.step(action)

        buffer.add(state, action, reward, next_state, done)
        episode_reward += reward

        state = next_state

        # Update as soon as batch_size transitions are stored, then empty the
        # buffer. Transitions can span episode boundaries because the buffer is
        # not cleared on reset.
        # NOTE(review): presumably cleared so each update sees only fresh data — confirm.
        if len(buffer.buffer) >= batch_size:
            agent.update(buffer, batch_size)
            buffer.clear()

        if done:
            break

    # Per-episode logging is currently disabled.
    # print(f"Episode {episode + 1}, Total Reward: {episode_reward}")

  states = torch.FloatTensor([entry[0] for entry in batch])


Episode 1, Total Reward: -2103.069879005208
Episode 2, Total Reward: -2015.9834542557235
Episode 3, Total Reward: -2225.2478931633696
Episode 4, Total Reward: -2200.6451834694653
Episode 5, Total Reward: -1891.7920339954262
Episode 6, Total Reward: -1681.6728538330512
Episode 7, Total Reward: -2227.062027948529
Episode 8, Total Reward: -2224.1519514610272
Episode 9, Total Reward: -2003.2800888574027
Episode 10, Total Reward: -2012.8739614544356


KeyboardInterrupt: 

In [5]:
agent.ac

True

In [None]:
# `import torch.nn as nn` binds only `nn`, so `torch` must be imported too for
# torch.randn to resolve on a fresh kernel.
import torch
import torch.nn as nn

# Softmax over dim=1 of a random 2x3 tensor: each row of `output` sums to 1.
m = nn.Softmax(dim=1)
# Named `sample_logits` rather than `input` so the builtin input(), which
# earlier cells call, is not shadowed.
sample_logits = torch.randn(2, 3)
output = m(sample_logits)

In [None]:
# Draw 32 stored transitions for manual inspection.
# NOTE(review): the training loop clears the buffer after every update, so it may
# hold fewer than 32 entries here — confirm how ReplayBuffer.sample handles that.
batch = buffer.sample(32)

In [None]:
import numpy as np
import torch

# Split the sampled transitions into one tensor per field. Each field is
# stacked into a single ndarray first, because building a tensor straight from
# a Python list of ndarrays is the slow path that PyTorch warns about.
# NOTE(review): assumes every entry is (state, action, reward, next_state, done)
# with equally shaped states — confirm against ReplayBuffer.
states = torch.as_tensor(np.asarray([entry[0] for entry in batch]), dtype=torch.float32)  # States
actions = torch.as_tensor(np.asarray([entry[1] for entry in batch]), dtype=torch.long)  # Actions
rewards = torch.as_tensor(np.asarray([entry[2] for entry in batch]), dtype=torch.float32)  # Rewards
next_states = torch.as_tensor(np.asarray([entry[3] for entry in batch]), dtype=torch.float32)  # Next states
dones = torch.as_tensor(np.asarray([entry[4] for entry in batch]), dtype=torch.float32)  # Done flags

In [None]:
actions.view(-1, 1)

In [None]:
actions.view(-1, 1).shape

In [None]:
# Manually step through the actor: flatten everything after the first dimension,
# apply fc1 followed by ReLU, then fc2.
# NOTE(review): `obs` is not defined in any visible cell; this relies on leftover kernel state.
x0 = torch.flatten(obs, start_dim=1)
x1 = torch.nn.functional.relu(agent.actor.fc1(x0))
x2 = agent.actor.fc2(x1)

In [None]:
x0

In [None]:
# Full actor forward pass, then a softmax over the last dimension; squeeze()
# drops any size-1 dimensions.
# NOTE(review): `obs` is not defined in any visible cell — confirm where it comes from.
ls = agent.actor(obs)
ps = torch.softmax(ls, dim=-1).squeeze()

In [None]:
ps

In [None]:
ls

In [None]:
import numpy as np
# For each entry of `jr`, convert the mean absolute gap to env.ability into a
# feedback score appended to `fbs`.
# NOTE(review): `jr` and `fbs` are not defined in any visible cell; `fbs` must
# already exist as a list, and re-running this cell keeps appending to it.
for idx in range(len(jr)):
    dff = np.abs(env.ability-jr[idx]).mean()

    # A mean gap above 1 yields a fixed negative feedback of -1.
    if dff>1:
        fbs.append(-1)
    else:
        # Otherwise the score is 1 minus the gap normalized by the width of
        # the ability range, floored at 0.5.
        fbb = 1 - (dff/ (env.ability_range[1]-env.ability_range[0]))
        fbb = max(0.5, fbb)
        fbs.append(fbb)

In [None]:
dff

In [None]:
state

In [None]:
env.job_req

In [None]:
# Random (n_jobs, n_traits) matrix drawn uniformly between the two bounds of
# env.ability_range. This rebinds `x2`, which an earlier cell used for the
# actor's fc2 output.
x2 = np.random.uniform(*env.ability_range, size=(env.n_jobs, env.n_traits))

In [None]:
import numpy as np

In [None]:
x2

In [None]:
x2[action]

In [None]:
# Stack the ability vector on top of the job matrix and add a leading axis of
# size 1. np.concatenate returns an ndarray, which has no .unsqueeze method
# (that is a torch.Tensor method), so np.expand_dims is used instead.
# NOTE(review): if a torch tensor is wanted, wrap the result in torch.as_tensor(...).
x3 = np.expand_dims(np.concatenate(([env.ability], x2)), axis=0)

In [None]:
shape