In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
#from tqdm.notebook import tqdm
#import seaborn as sns
import pandas as pd
from numpy import exp, sqrt, pi

# Recovering Prior Knowledge with Maximum Likelihood Estimation

Here is a demo code on how to recover prior knowledge with MLE. First, we need some "ground truth" data; in this case, we will use Ally's data, which is saved in a CSV file called `cleandata_MLE`.

In [None]:
data = pd.read_csv("cleandata_MLE.csv")
data

We are going to focus on one particular participant, 83670, and one less only, Asian Flags. So we are going to isolate this set of facts into a subset of data called `sdata`.

In [None]:
sdata = data
sdata

Let's see how many facts we have -- it should be about 15

In [None]:
len(set(sdata.factId))

# Model

To do MLE, we need a model that predicts the probability of observing a specific response and response times for every fact that is presented during a session. We will also need to include some constraints on the parameters of the mode.

First, we need to estimate the time that has passed from the first time they have seen the flag (in Qualtrics) and the first time they answer. We need to because QUaltrics is the equivalent of a "study" trial.

Qualtrics presentations lasted 5 mins 20 seconds. Thus, the average time that has passed since the presentation of an item is 1/2 pf that

In [None]:
offset = (5*60 + 20 )/2

Some parameters are kept constant at the group level

In [None]:
RT = -0.8   # Retrieval threshold
TER = 0.3   # time for encoding and responding
F = 1.0     # Latency factor
C = 0.25    # Spacing weight
S = 0.25    # Noise

Now, let's define our model. Our model is given by two equations that determine the activation of a memory given a value of $\alpha$ and teh times at which the traces $t_1, t_2, \dots, t_N$ have been created:

$A(m,t) = \sum_i (t- t_i)^{-d(i)}$

$d(i) = c \times e^{-A(m,t)} + \alpha$

In [None]:
def calculate_activation(time, traces, alpha, c=C):
    traces = [x for x in traces if x < time]
    d = alpha
    memory_odds = 0
    for trace in traces:
        trace_odds = (time - trace) ** -d
        memory_odds += trace_odds
        d = c * np.exp(-np.log(memory_odds)) + alpha
    return np.log(memory_odds)

Now, we can define our likelihood functions. The two likelihood functions will give us the two probability density functions for responses and response times, given some individual parameters (activation, threshold, noise, TER, and F).

In [None]:
def resp_prob(observed, activation, threshold, noise):
    prob = 1 / (1 + np.exp(-(activation - threshold)/noise))
    if observed == 0:
        return 1 - prob
    else:
        return prob


def rt_prob(observed, activation, threshold, noise, f, ter):    
    alpha = exp(-activation) * f
    beta = sqrt(3)/(pi*noise)
    t = observed - ter
    p =  ((beta/alpha) * (t/alpha)**(beta -1)) / (1 + (t/alpha)**beta)**2
    if p is np.nan or p is np.inf:
        return 0
    else:
        return p
    

And now, let's look at the examples of two PDFs

In [None]:
accuracies = np.array((0, 1))
p = [resp_prob(x, 0, -0.8, 0.3) for x in accuracies]
print(p)
plt.bar(["Incorrect", "Correct"], p)
plt.title("Probability Density for Response Accuracies (RT = %.1f; $s$ = %.2f)" % (-0.8, 0.3))
plt.xlabel("Response")
plt.ylim((0,1))
plt.ylabel("Probability")
plt.show()


t = np.linspace(0.1, 10, 1000)
prt = rt_prob(t, activation=0, threshold=-0.8, noise=0.3, f=1, ter=0.3)
plt.plot(t, prt)
plt.title("Probability Density for Response Times (RT = %.1f; $s$ = %.2f, $F$ = %.1f, $T_{ER}$ = %dms)" % (-0.8, 0.3, 1.0, 300))
plt.xlabel("Time (s)")
plt.ylabel("Probability")
plt.xlim((0, 10))
plt.show()

# Applying MLE 

Here is a walk-thorugh of how MLE works. We start with the Asian Flags data for our test subject, saved in `sdata`. Of this data, we are going to focus only on the presentations pertaining a specific fact, and save this data in `sfdata`. 


In [None]:
sfdata = sdata[sdata.factId == 337301]
sfdata

## Log-likelihood function

Now, we create a _log-likelihood_ function. The function loops for every presentation in `sfdata` (every presentation of a specific fact) and  calculates the probabilities of observing each of the corresponding responses and response times, given a specific set of parameter values. The probabilities are then log-transformed and summed.

In [None]:

def loglikelihood(data, sof, rt, ter, f, noise, blc):
    """Log likelihood"""
    traces = [0.0001]
    start = list(set(data.start))[0]
    LL = 0.0
    for i in range(data.shape[0]):
        event = data.iloc[i, :]
        time = (event["presentationStartTime"] - start)/1000 + offset
        accuracy = event["correct"] 
        resp_time = event["reactionTime"] 
        # Expected activation
        activation = blc + calculate_activation(time, traces, sof)
        
        # Log-likelihoods
        LL += np.log(resp_prob(accuracy, activation, rt, noise))
        LL += np.log(rt_prob(resp_time, activation, rt, noise, f, ter))
                        
        # Add new encoding to traces
        traces.append(time)
    
    return LL
        

For example, here is the log-likelihood of the observed results in `sfdata`:

In [None]:
DATA = data

loglikelihood(DATA, sof=0.3, rt=0.8, ter=0.3, f=1.0, noise=0.2, blc=0.0)

We are interested in using log-likelihood to identify prior knowledge, which is the represented by the base-level constant `BLC` of each fact.  For a given set of presentations of that fact, we are going to find the value of BLC that _maximizes_ the log-likelihood (given the rest of the params).

To do so, we need to using SciPy _minimization_ techniques. Minimization techniques are machine learning tools that find the oarameters that minimize a given function. called the _Loss_ function. In this case, the loss function is a function of one parameter (the value of BLC) that computes the inverse of the log-likelihood (minimize loss = maximize likelihood).

Because the loss function needs to know a participant's SOF, we need to store that in a global variable (it can be changed for other participants)

In [None]:
SOF = 0.3
def loss(params):
    global DATA
    blc = params[0]
    data = DATA
    return -loglikelihood(data, sof=SOF, rt=RT, ter=TER, f=F, noise=S, blc=blc)

Here is an example: The value of BLC that minimizes the current data saved in `sfdata`.

In [None]:
DATA = sfdata
minimize(loss, x0 = [0],
         method="Powell", 
         tol=0.0000001, 
         bounds=[(-3, 3)])

We can repeat the procedure for every fact in `sfdata`, and computer their associated BLC:

In [None]:
estimates = []

for fact in set(sdata.answer):
    DATA = sdata[sdata.answer == fact]
    blc = minimize(loss, x0 = [0],
                   method="Powell", 
                   tol=0.0000001, 
                   bounds=[(-10, 10)])
    estimates.append((fact, float(blc.x)))
estimates

In [None]:
facts = [x[0] for x in estimates]
blcs = [x[1] for x in estimates]
plt.bar(facts, blcs)
plt.xticks(rotation=90)
plt.show()

# Functions for Ally

Here is a function that you can call anytime --- you just need to specify the subject number, the lesson ("Asian Flags" vs "Caribbean Flags") and your best estimate for that subject's SOF (ideally, your mean SOF for the "unknown" data. 

In [None]:
def decode_prior_knowledge(subject, lesson, sof):
    global DATA
    global SOF
    SOF = sof
    sdata = DATA[DATA.userId == subject]  # extract all data for that subject
    sldata = sdata[sdata.lessonTitle == lesson]  # extract all data for that lesson

    estimates = []
    print(set(sldata.answer))
    for fact in set(sldata.answer):
        print(fact)
        DATA = sldata[sldata.answer == fact]  # The DATA global var is accessed by the MLE loss function
        
#        # Get the corresponding prior_knowledge value from sldata
#        prior_knowledge = sldata[sldata.answer == fact]['prior_knowledge'].iloc[0]

        blc = minimize(loss, x0=[0],
                       method="Powell",
                       tol=0.0000001,
                       bounds=[(-10, 10)]).x
        
        estimates.append((fact, float(blc), subject, lesson))
    return pd.DataFrame(estimates, columns=("Fact", "BLC", "Subject", "Lesson"))

In [None]:
results = None

#for sub in set(data.userId):
#    mydata = data[data.userId == sub] # Get all the data for that subject
#    for lesson in set(data.lessonTitle):
        #mysmalldata= mydata[mydata.lessonTitle == lesson]
        #average_alpha = np.mean(mysmalldata.average_alpha)  
#        partial = decode_prior_knowledge(userId, lessonTitle, alpha)
#        if results is None:
#            results = partial
#        else:
#            results = pd.concat([results, partial], ignore_index=True, axis=0)
            
for user_id in set(data.userId):
    user_data = data[data['userId'] == user_id]  # Get all the data for that user
    
    for lesson_title in set(user_data['lessonTitle']):
        lesson_data = user_data[user_data['lessonTitle'] == lesson_title]
        partial = decode_prior_knowledge(user_id, lesson_title, 0.3)
        
        if results is None:
            results = partial
        else:
            results = pd.concat([results, partial], ignore_index=True, axis=0)
            


In [None]:
print(set(data['userId']))

for user_id in set(data['userId']):
    user_data = data[data['userId'] == user_id]  # Get all the data for that user
    
    # Print unique 'lessonTitle' values for the current user to verify
    print(set(user_data['lessonTitle']))
    
    for lesson_title in set(user_data['lessonTitle']):
        lesson_data = user_data[user_data['lessonTitle'] == lesson_title]
        partial = decode_prior_knowledge(user_id, lesson_title, 0.3)
        
        if results is None:
            results = partial
        else:
            results = pd.concat([results, partial], ignore_index=True, axis=0)

In [None]:

results = decode_prior_knowledge(user_id, lesson_title, 0.3)
results

In [None]:
results.to_csv("results_MLE.csv")

In [None]:

custom_order = ["Venza", "Veloster", "Altroz", "Chelsea", "XC40", "CT%20200h", "URX%20NEO", "Q8%20e-tron", "Cherokee", "Explorer"]

plt.bar(results['Fact'], results['BLC'], color='lightblue')  # Set the color to blue
plt.xticks(results['Fact'], custom_order, rotation='vertical')  # Rotates x-axis labels and uses custom order
plt.xlabel('Fact')
plt.ylabel('BLC')
plt.title('Bar Graph with Custom Ordered X-Axis Labels')
plt.tight_layout()
plt.show()

plt.bar(results['Fact'], results['BLC'], color='blue')  # Set the color to blue
plt.xticks(rotation='vertical')  # Rotates x-axis labels vertically
plt.xlabel('Fact')
plt.ylabel('BLC')
plt.title('Bar Graph with Vertical X-Axis Labels')

# Set y-axis limits from 0 to 1
plt.ylim(1, 2)

plt.tight_layout()
plt.show()