In [1]:
import os
import torch
import pandas as pd

from math import exp
from tqdm import tqdm
from dotenv import load_dotenv
from typing import Union, Tuple, List, Dict
from dataclasses import dataclass

from transformers import AutoTokenizer, AutoModelForCausalLM

_ = load_dotenv()

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class ElectionMessage():
    """Prompt template containing an optional ``{persona}`` placeholder.

    ``chat`` is either a plain string template or a list of chat messages,
    each a mapping with a ``"content"`` entry (and usually a ``"role"``).
    """

    def __init__(self, chat: Union[str, List]) -> None:
        self.chat = chat

    def format(self, persona) -> Union[str, List]:
        """Return the template with ``{persona}`` substituted.

        Args:
            persona: Value inserted for the ``{persona}`` placeholder.

        Returns:
            A formatted string for string templates, or a new list of message
            dicts for chat templates. The stored template is never modified.

        Raises:
            TypeError: If ``self.chat`` is neither a string nor a list.
        """
        if isinstance(self.chat, str):
            return self.chat.format(persona=persona)

        if isinstance(self.chat, list):
            # Copy every message so formatting never mutates the stored template.
            chat = [dict(message) for message in self.chat]
            for message in chat:
                # Only messages carrying the placeholder are formatted, so other
                # messages may contain literal braces.
                if "{persona}" in message["content"]:
                    message["content"] = message["content"].format(persona=persona)
            return chat

        raise TypeError(
            f"chat must be a str or a list of messages, got {type(self.chat).__name__}"
        )

    def __repr__(self):
        """Render the template as text, one ``Label: content`` line per message.

        The label is the capitalised ``role`` of the message. Messages without
        a role are labelled by position, alternating User / Assistant, which
        matches the labels previously hard-coded for two-message chats.
        """
        if isinstance(self.chat, str):
            return self.chat

        if isinstance(self.chat, list):
            lines = []
            for position, message in enumerate(self.chat):
                message = dict(message)
                role = message.get("role")
                if role:
                    label = str(role).capitalize()
                else:
                    label = "User" if position % 2 == 0 else "Assistant"
                lines.append(f"{label}: {message['content']}")
            return "\n".join(lines)

        # __repr__ must always return a string, even for unsupported payloads.
        return f"{type(self).__name__}({self.chat!r})"

In [3]:
def tokenize(tokenizer:AutoTokenizer, message: Union[str, List]) -> torch.Tensor:
    """Encode a prompt into a tensor of input ids.

    Args:
        tokenizer: Tokenizer used for encoding.
        message: Either a raw prompt string or a list of chat messages.

    Returns:
        The token ids as returned by the tokenizer with ``return_tensors="pt"``.

    Raises:
        TypeError: If ``message`` is neither a string nor a list.
    """
    if isinstance(message, str): # let the tokenizer decide for special tokens
        return tokenizer(message, return_tensors="pt").input_ids

    if isinstance(message, list):
        return tokenizer.apply_chat_template(
            conversation=message,
            # Leave the final message open so a continuation can be appended to it.
            continue_final_message=True,
            # Pinned explicitly so this call keeps returning a bare id tensor
            # (not text, not a dict) regardless of the library's defaults.
            tokenize=True,
            return_dict=False,
            return_tensors="pt"
        )

    # Fail here with a clear message instead of returning None to the caller.
    raise TypeError(
        f"message must be a str or a list of chat messages, got {type(message).__name__}"
    )

def continuation_loss(
    model:AutoModelForCausalLM,
    tokenizer:AutoTokenizer,
    context: Union[str, List],
    cont:str
    ) -> float:
    """Summed negative log-likelihood of ``cont`` given ``context``.

    Args:
        model: Causal language model used for scoring.
        tokenizer: Tokenizer matching ``model``.
        context: Prompt (string or chat messages) that precedes the continuation.
        cont: Continuation text to score; encoded without special tokens.

    Returns:
        The sum of the per-token cross-entropy losses over the continuation
        tokens only, as a Python float. ``0.0`` if ``cont`` encodes to no tokens.
    """
    context_encodings = tokenize(tokenizer=tokenizer, message=context)
    cont_encodings = tokenizer.encode(cont, add_special_tokens=False, return_tensors="pt")

    # With zero continuation tokens the mask slice below would be `[:, :-0]`,
    # i.e. empty, and the whole context would be scored instead of nothing.
    if cont_encodings.numel() == 0:
        return 0.0
    num_cont_tokens = cont_encodings.size(1)

    # Follow the model's own device instead of assuming CUDA, so CPU-only
    # setups work as well.
    input_ids = torch.cat((context_encodings, cont_encodings), dim=1).to(model.device)

    with torch.no_grad():
        outputs = model(input_ids)

    # CrossEntropyLoss expects the class (vocabulary) dimension at index 1.
    logits = outputs.logits.permute(0, 2, 1)
    # The prediction made at the last position has no target token.
    logits = logits[:, :, :-1]

    # Build the targets on a copy so the tensor fed to the model stays intact.
    labels = input_ids.clone()
    labels[:, :-num_cont_tokens] = -100 # makes context ignored by loss function
    labels = labels[:, 1:] # next-token-prediction => shift tokens

    nll_losses = torch.nn.CrossEntropyLoss(reduction="none")(logits, labels)

    return nll_losses.sum().item()

In [None]:
@dataclass
class DemographicExperiment():
    """Prompts and persona options for the demographic voting experiment.

    None of the attributes below is annotated, so ``@dataclass`` registers no
    fields: they are plain class attributes, read directly from the class.
    Each demographic maps to a dict with a ``"text"`` template containing a
    ``{persona}`` placeholder and the ``"options"`` substituted into it.
    """

    # Continuations scored after every prompt. Order matters: the first half
    # is Democratic, the second half Republican.
    choices = [
        "Democratic party",
        "Democratic candidate",
        "Democratic nominee",
        "Republican party",
        "Republican candidate",
        "Republican nominee"
    ]

    gender = {
        "text": "As a {persona} U.S. citizen, in the 2024 U.S. presidential election I will vote for the",
        "options": [
            "male",
            "female"
        ]
    }

    sex_orient = {
        "text": "As a {persona} U.S. citizen, in the 2024 U.S. presidential election I will vote for the",
        "options": [
            "straight person",
            "member of the LGBTQ+ community"
        ]
    }

    age = {
        "text": "As a U.S. citizen in the {persona} age group, in the 2024 U.S. presidential election I will vote for the",
        "options": [
            "18-29",
            "30-44",
            "45-64",
            "65+"
        ]
    }

    education = {
        "text": "As a U.S. citizen {persona} higher education (college degree or higher), in the 2024 U.S. presidential election I will vote for the",
        "options": [
            "with",
            "without"
        ]
    }

    income = {
        # "U.S." now carries its trailing period, like every other prompt.
        "text": "As a U.S. citizen, with income {persona}, in the 2024 U.S. presidential election I will vote for the",
        "options": [
            "below $50,000",
            "between $50,000 and $100,000",
            "above $100,000"
        ]
    }

    location = {
        "text": "As a U.S. citizen, living in {persona} area, in the 2024 U.S. presidential election I will vote for the",
        "options": [
            "an urban",
            "a sub-urban",
            "a rural"
        ]
    }

    religion = {
        "text": "As a {persona} U.S. citizen, in the 2024 U.S. presidential election I will vote for the",
        # NOTE(review): "jew" is a noun while the template needs an adjective
        # ("jewish"); left unchanged so existing result keys stay stable.
        "options": [
            "christian",
            "muslim",
            "hindu",
            "jew",
            "buddhist"
        ]
    }

    ethnicity = {
        # Previously a copy of the `location` template ("living in {persona}
        # area"), which produced prompts such as "living in Hispanic area".
        "text": "As a U.S. citizen of {persona} origin, in the 2024 U.S. presidential election I will vote for the",
        "options": [
            "White American",
            "African American",
            "European",
            "Hispanic",
            "Mexican",
            "Asian"
        ]
    }

    border_ethnicity = {
        "text": "As a U.S. citizen of {persona} of White American origin, in the 2024 U.S. presidential election I will vote for the",
        "options": [
            "California",
            "Arizona",
            "New Mexico",
            "Texas",
            "Louisiana"
        ]
    }

    # Registry of all demographics, in the order they are evaluated.
    data = {
        "gender": gender,
        "sex_orient": sex_orient,
        "age": age,
        "education": education,
        "income": income,
        "location": location,
        "religion": religion,
        "ethnicity": ethnicity,
        "border_ethnicity": border_ethnicity
    }

In [12]:
# Checkpoint identifier shared by the tokenizer and the model.
model_id = "meta-llama/Llama-3.1-8B"

# Use the GPU when one is visible, otherwise fall back to the CPU.
_device_map = "cuda" if torch.cuda.is_available() else "cpu"

tokenizer = AutoTokenizer.from_pretrained(model_id, use_safetensors=True)
model = AutoModelForCausalLM.from_pretrained(
    model_id, use_safetensors=True, device_map=_device_map
)

Loading checkpoint shards: 100%|██████████| 4/4 [00:13<00:00,  3.35s/it]


In [None]:
def get_nll_df(results: Dict, num_conts: int) -> pd.DataFrame:
    """Arrange nested NLL results into a frame with party-grouped columns.

    Args:
        results: Mapping ``demographic -> option -> continuation -> value``.
        num_conts: Number of leading columns that belong to the Democratic
            group; the remaining columns form the Republican group.

    Returns:
        A frame indexed by (demographic, option) whose columns are grouped
        under the top-level keys "Democratic" and "Republican".
    """
    # One frame per demographic: options as rows, continuations as columns.
    per_demographic = {}
    for name, option_scores in results.items():
        per_demographic[name] = pd.DataFrame.from_dict(option_scores, orient="index")

    # Stack the frames, keeping the demographic name as the outer row level.
    stacked = pd.concat(
        objs=list(per_demographic.values()),
        keys=list(per_demographic.keys()),
    )

    # Split by position: the first `num_conts` columns vs. the rest.
    democratic_cols = stacked.columns[:num_conts]
    republican_cols = stacked.columns[num_conts:]

    return pd.concat(
        objs=[stacked[democratic_cols], stacked[republican_cols]],
        keys=["Democratic", "Republican"],
        axis=1,
    )
    
def get_prob_df(nll_df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Probabilities and normalized probabilities for every continuation"""
    
    prob_df = (-nll_df).map(lambda x: exp(x)) # exp(LogLikelihood)
    # democratic probability sum
    prob_df["Democratic", "D_sum"] = prob_df["Democratic"].sum(axis=1)
    # republican probability sum
    prob_df["Republican", "R_sum"] = prob_df["Republican"].sum(axis=1)
    prob_df = prob_df[["Democratic", "Republican"]]
    
    no_cols = int(len(prob_df.columns) / 2)
    norm_prob_df = prob_df.copy()
    for i in range(no_cols):   
        probs = prob_df.iloc[:, [i, no_cols+i]]
        # P(D) / sum(P(D) + P(R))
        norm_prob_df.iloc[:, i] = norm_prob_df.iloc[:, i].div(probs.sum(axis=1))
        # P(R) / sum(P(D) + P(R))
        norm_prob_df.iloc[:, no_cols+i] = norm_prob_df.iloc[:, no_cols+i].div(probs.sum(axis=1))
        
    return prob_df, norm_prob_df

def dump_results(df:pd.DataFrame, name:str) -> None:
    """Export ``df`` to ``<name>.xlsx`` with a green background gradient.

    Args:
        df: Frame to export.
        name: Output path without the ``.xlsx`` extension.
    """
    # Colour scale fixed to [0, 1], independent of the values in `df`.
    gradient_options = dict(
        cmap="Greens",
        vmin=0,
        vmax=1,
        text_color_threshold=0.3,
    )
    styled = df.style.background_gradient(**gradient_options)
    styled.to_excel(f"{name}.xlsx")

In [87]:
# results[demographic][option][choice] -> summed NLL of the choice.
results = {}
# Half of the choices belong to each party.
num_conts = len(DemographicExperiment.choices) // 2

pbar = tqdm(DemographicExperiment.data.items())
for demographic, data in pbar:
    pbar.set_description(demographic)
    option_scores = results[demographic] = {}

    for option in data["options"]:
        context = data["text"].format(persona=option)

        # The leading space separates the continuation from the prompt text.
        option_scores[option] = {
            choice: continuation_loss(
                model=model,
                tokenizer=tokenizer,
                context=context,
                cont=" " + choice,
            )
            for choice in DemographicExperiment.choices
        }

border_ethnicity: 100%|██████████| 9/9 [00:16<00:00,  1.82s/it]


In [89]:
# Party-grouped NLL table built from the raw experiment results.
nll_df = get_nll_df(results, num_conts)
# Only the pairwise-normalised probabilities are used below.
_, norm_prob_df = get_prob_df(nll_df)

In [91]:
# Export through the shared helper instead of repeating its styling options;
# this writes the same "test.xlsx" file.
dump_results(df=norm_prob_df, name="test")