In [None]:
import sys
# Colab-only setup: the whole block is skipped when the notebook is not running in Colab
if 'google.colab' in sys.modules:  # True when the google.colab module has been loaded
    # Install the listed packages (-U upgrades any that are already installed)
    !pip install datasets transformers evaluate accelerate -U

    # Mount Google Drive to enable access to the data files stored there
    from google.colab import drive
    drive.mount('/content/drive')

    # Change the working directory to the `choice` folder on the mounted Drive,
    # so that relative paths used afterwards resolve against that folder
    %cd /content/drive/MyDrive/LLM4BeSci/choice

# Processing data

In [None]:
import pandas as pd
from datasets import Dataset
from transformers import AutoTokenizer

In [None]:
# Reading in the .csv data (path is relative to the current working directory);
# index_col=0 uses the first CSV column as the row index
df = pd.read_csv('choice.csv', index_col=0)
df  # last expression of the cell: displays the frame

In [None]:
# Add column with prompts
#
# For every (participant, task) game the trials are replayed in order. Each
# free-choice trial gets a prompt consisting of the instructions, the history
# of machine/reward observations made so far in that game, the number of
# choices left, and the question. Forced-choice trials get an empty string.
num_participants = df.participant.max() + 1  # kept for later cells that may read it
num_tasks = df.task.max() + 1  # kept for later cells that may read it
instructions = "You made the following observations in the past:\n"
question = "Q: Which machine do you choose?\nA: Machine"

# Work on a positionally indexed copy so each prompt is written back to the
# exact row it was built from, independent of how the rows of `df` are
# ordered or labelled (appending in loop order would silently misalign the
# prompts whenever the file is not sorted by participant, task, trial).
rows = df.reset_index(drop=True)
text = [""] * len(rows)  # default: empty prompt (used for forced-choice trials)

# A single groupby pass replaces re-filtering the full frame for every trial
for (participant, task), df_task in rows.groupby(['participant', 'task'], sort=False):
    df_task = df_task.sort_values('trial')
    trials = df_task['trial'].tolist()

    # Every game must contain trials 0..n-1 exactly once; fail loudly otherwise
    if trials != list(range(len(trials))):
        raise ValueError(
            f"participant {participant}, task {task}: expected trials "
            f"0..{len(trials) - 1} exactly once each, got {trials}"
        )
    num_trials = len(trials)

    # new prompt for each task
    history = ""
    for position, trial, forced_choice, c, r in zip(
        df_task.index,  # row positions in `df`, thanks to reset_index above
        trials,
        df_task['forced_choice'].tolist(),
        df_task['choice'].tolist(),
        df_task['reward'].tolist(),
    ):
        # add text for free choice trials
        if not forced_choice:
            trials_left = num_trials - trial  # counts the current trial as well
            if trials_left > 1:
                trials_left_text = str(trials_left) + " additional choices"
            else:
                trials_left_text = str(trials_left) + " additional choice"
            trials_left_string = "Your goal is to maximize the sum of received dollars within " + trials_left_text + ".\n\n"

            text[position] = instructions + history + "\n" + trials_left_string + question

        # add data to history (machines are shown 1-based in the prompt)
        history += "- Machine " + str(c+1) + " delivered " + str(r) + " dollars.\n"

df['text'] = text

In [None]:
# Keep only the free-choice trials (rows whose forced_choice flag is False),
# then wrap the remaining rows in a HuggingFace dataset
df = df.loc[~df['forced_choice']]
dat = Dataset.from_pandas(df)
dat

In [None]:
# Inspect the first example of the dataset
dat[0]

In [None]:
# Defining model checkpoint (the name passed to `from_pretrained` for both tokenizer and model)
model_ckpt = 'distilbert-base-uncased'

# Loading the tokenizer that belongs to the checkpoint
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

# Report the tokenizer's vocabulary size and its maximum input length
print(f'Vocabulary size: {tokenizer.vocab_size}, max context length: {tokenizer.model_max_length}')

In [None]:
def batch_tokenizer(batch):
    """Tokenize the `text` entries of a batch with padding and truncation enabled."""
    return tokenizer(batch['text'], padding=True, truncation=True)


# Tokenizing the dataset: batched=True with batch_size=None hands the dataset
# to batch_tokenizer as one single batch
dat = dat.map(batch_tokenizer, batched=True, batch_size=None)
dat[0]

In [None]:
# Setting the format of the dataset to torch tensors for passing to the model;
# only input_ids and attention_mask are listed as formatted columns
dat.set_format('torch', columns=['input_ids', 'attention_mask'])
dat

# Loading the model for feature extraction

In [None]:
import torch
torch.manual_seed(42)  # Seed torch's random number generator for reproducibility
from transformers import AutoModel

In [None]:
# Select the device the model will run on, preferring a GPU backend when one is
# available: CUDA (NVIDIA GPUs) first, then MPS (Apple Metal Performance
# Shaders), and the CPU as the fallback
device = torch.device(
    'cuda' if torch.cuda.is_available()
    else 'mps' if torch.backends.mps.is_available()
    else 'cpu'
)

device

In [None]:
# Loading the pretrained model named by model_ckpt (distilbert-base-uncased)
# and moving it to the selected device
model = AutoModel.from_pretrained(model_ckpt).to(device)
# Show the input names the tokenizer declares for the model
f'Model inputs: {tokenizer.model_input_names}'

In [None]:
def extract_features(batch):
    """Compute one feature vector per item of a batch.

    Only the batch entries whose key is listed in
    ``tokenizer.model_input_names`` are forwarded to the model; each of them
    is first moved to ``device``. The model runs with gradient tracking
    disabled.

    Returns a dict with the single key ``"hidden_state"`` holding, as a NumPy
    array on the CPU, the slice at index 0 along the second axis of the
    model's ``last_hidden_state`` (presumably the first token position of
    each sequence — confirm against the model's output layout).
    """
    model_inputs = {}
    for name, tensor in batch.items():
        if name in tokenizer.model_input_names:
            model_inputs[name] = tensor.to(device)

    with torch.no_grad():
        outputs = model(**model_inputs)
        first_position = outputs.last_hidden_state[:, 0]
        return {"hidden_state": first_position.cpu().numpy()}


# Run extract_features over the dataset in batches of 8 examples; the returned
# "hidden_state" values are stored as a new column of the dataset
dat = dat.map(extract_features, batched=True, batch_size=8)
# Shape of the extracted feature column
dat['hidden_state'].shape

# Predicting choice behavior with extracted features

In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegressionCV
from sklearn.model_selection import train_test_split

In [None]:
# Put the extracted hidden states into a DataFrame to use as classifier inputs
features = pd.DataFrame(dat['hidden_state'])
features

In [None]:
# Initializing logistic regression with built-in cross-validation (cv=10 folds)
clf = LogisticRegressionCV(cv=10)

# Splitting the data into train and test sets: 10% of the rows are held out
# for testing, and random_state=42 makes the split repeatable
X_train, X_test, y_train, y_test = train_test_split(features, dat['choice'], test_size=.1, random_state=42)
f'Train size: {len(X_train)}, test size: {len(X_test)}'

In [None]:
# Scaling the data: the scaler is fitted on the training set only, and that
# same fitted transformation is then applied to the test set
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Fitting the model and evaluating performance
clf.fit(X_train, y_train)
# Round with the built-in round() instead of the .round() method: depending on
# the scikit-learn version, score() returns either a NumPy scalar or a plain
# Python float, and a plain float has no .round() method
f'Accuracy = {round(clf.score(X_test, y_test), 4)}'