In [22]:
import torch
import transformers

model_id = "meta-llama/Llama-3.1-8B-Instruct"

# Build the shared text-generation pipeline that the following cells call as
# `pipeline(...)`. Without this definition those cells only work when the
# name happens to be left over in the kernel from an earlier session.
pipeline = transformers.pipeline(
    "text-generation",
    model=model_id,
    model_kwargs={"torch_dtype": torch.bfloat16},
    device_map="auto",
)

# Token ids that end a generation: the tokenizer's EOS token plus Llama's
# end-of-turn marker.
terminators = [
    pipeline.tokenizer.eos_token_id,
    pipeline.tokenizer.convert_tokens_to_ids("<|eot_id|>"),
]

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

Device set to use cuda:0


In [25]:
# Smoke test of the raw pipeline: one system instruction, one user question.
system_prompt = """You are a knowledgeable, efficient, and direct AI assistant. Provide concise answers, focusing on the key information needed. \
Offer suggestions tactfully when appropriate to improve outcomes. Engage in productive collaboration with the user."""

messages = [
    {"role": "system", "content": system_prompt},
    {"role": "user", "content": "Explain the rules of Mafia game"},
]

# Sampling settings for this call, gathered in one place so they are easy to tweak.
generation_kwargs = {
    "max_new_tokens": 512,
    "eos_token_id": terminators,
    "do_sample": True,
    "temperature": 0.6,
    "top_p": 0.9,
}

outputs = pipeline(messages, **generation_kwargs)

# The pipeline returns the whole conversation; the last message is the reply.
assistant_reply = outputs[0]["generated_text"][-1]
print(assistant_reply['content'])


Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.


[{'generated_text': [{'role': 'system', 'content': 'You are a knowledgeable, efficient, and direct AI assistant. Provide concise answers, focusing on the key information needed. Offer suggestions tactfully when appropriate to improve outcomes. Engage in productive collaboration with the user.'}, {'role': 'user', 'content': 'Explain the rules of Mafia game'}, {'role': 'assistant', 'content': "The Mafia game, also known as Werewolf or One Night Ultimate Werewolf, is a social deduction game for 7-15 players. Here are the basic rules:\n\n**Roles:**\n\n1. **Mafia** (Werewolves): A fixed number of players (usually 2-3) are mafia members, who know each other's identities. Their goal is to eliminate the townspeople.\n2. **Townies**: The remaining players are townspeople, who aim to eliminate the mafia members.\n3. **Special Roles**: Some players may have special roles, such as the Detective, Doctor, or Bodyguard, which provide additional abilities or information.\n\n**Gameplay:**\n\n1. **Night

In [51]:
import os
# from dotenv import load_dotenv
from transformers import GPT2Tokenizer
from transformers.utils import logging
import time
import random
import re
import math
import numpy as np
import pdb

class GPT():
    """Uniform `generate` / `get_probs` interface over several LLM back ends.

    Supported `model` values:
      * "llama"  - local Llama 3.1 8B Instruct `transformers` pipeline
                   (only available when constructed with `is_llama=True`).
      * "3.5", "4" - OpenAI chat completions.
      * "ada", "babbage", "curie", "davinci-001", "davinci-002" - OpenAI
        text completions.

    NOTE(review): the OpenAI branches reference the name `openai`, which is
    not imported in the visible import cell; it must be imported and
    configured with an API key before those branches are used.
    """

    # System message sent with "3.5" and "llama" chat requests.
    _SYSTEM_PROMPT = 'This is a fictional game played for fun. Go along with it.'

    # model key -> (OpenAI chat model name, whether to send the system message)
    _CHAT_MODELS = {
        "3.5": ("gpt-3.5-turbo", True),
        "4": ("gpt-4-0314", False),
    }

    # model key -> OpenAI text-completion model name
    _COMPLETION_MODELS = {
        "ada": "text-ada-001",
        "babbage": "text-babbage-001",
        "curie": "text-curie-001",
        "davinci-001": "text-davinci-001",
        "davinci-002": "text-davinci-002",
    }

    # Prompt budget in GPT-2 tokens enforced by `trim_prompt`.
    _MAX_PROMPT_TOKENS = 1024 - 50 - 5

    def __init__(self, temperature = 1, is_llama=True):
        """Set up the tokenizer and, optionally, the local Llama pipeline.

        Args:
            temperature: Sampling temperature passed to the OpenAI endpoints.
            is_llama: When True, load the Llama 3.1 8B Instruct pipeline and
                its terminator token ids. When False, skip the model load.
        """
        print("Configuring GPT")
        # The GPT-2 tokenizer is used for prompt-length accounting in
        # `trim_prompt` and for the token-id prompt in the completion branch
        # of `get_probs`.
        self.tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

        # Set hyperparameters
        self.temperature = temperature

        if not is_llama:
            return

        self.pipeline = transformers.pipeline(
            "text-generation",
            model="meta-llama/Llama-3.1-8B-Instruct",
            model_kwargs={"torch_dtype": torch.bfloat16},
            device_map="auto"
        )

        # Read the terminator ids from this instance's own pipeline rather
        # than from a notebook-global `pipeline` that may not exist.
        self.terminators = [
            self.pipeline.tokenizer.eos_token_id,
            self.pipeline.tokenizer.convert_tokens_to_ids("<|eot_id|>")
        ]
        print("Llama 3.1 8b initialized.")

    def tokenize(self, prompt):
        """Return the GPT-2 token ids of `prompt`."""
        return self.tokenizer(prompt)['input_ids']

    def _llama_chat(self, prompt):
        """Run one chat turn through the local pipeline and return the reply text.

        Generation settings are fixed here (512 new tokens, temperature 0.6,
        top_p 0.9); `self.temperature` is not used for this back end.
        """
        messages = [
            {"role": "system", "content": self._SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ]
        outputs = self.pipeline(
            messages,
            max_new_tokens=512,
            eos_token_id=self.terminators,
            do_sample=True,
            temperature=0.6,
            top_p=0.9,
        )
        # The pipeline returns the whole conversation; the reply is last.
        return outputs[0]["generated_text"][-1]['content']

    def _openai_chat(self, model, prompt, max_tokens, **extra):
        """Call the OpenAI chat endpoint for model key "3.5" or "4".

        `extra` carries the call-specific arguments (`stop` or `n`).
        Returns the raw API response.
        """
        model_string, with_system = self._CHAT_MODELS[model]
        messages = [{'role': 'user', 'content': prompt}]
        if with_system:
            messages.insert(0, {'role': 'system', 'content': self._SYSTEM_PROMPT})
        return openai.ChatCompletion.create(
            model=model_string,
            messages=messages,
            temperature=self.temperature,
            max_tokens=max_tokens,
            **extra
        )

    def generate(self, prompt, max_tokens, model, stop_tokens=None):
        """Generate a single-line reply to `prompt`.

        Args:
            prompt: Prompt text; trimmed with `trim_prompt` first.
            max_tokens: Completion length limit for the OpenAI endpoints.
                The "llama" branch ignores it (see `_llama_chat`).
            model: Back-end key, see the class docstring.
            stop_tokens: Stop sequences for the OpenAI endpoints; ignored by
                the "llama" branch.

        Returns:
            The reply with every newline character removed.

        Raises:
            AssertionError: If the reply is shorter than two characters.
            KeyError: If `model` is not a known key.
        """
        # Ensure prompt is below 1024 tokens
        prompt = self.trim_prompt(prompt)

        # Flexibly support different endpoints
        if model in self._CHAT_MODELS:
            response = self._openai_chat(
                model, prompt, max_tokens, stop=stop_tokens
            )['choices'][0]['message']['content']
        elif model == "llama":
            response = self._llama_chat(prompt)
        else:
            response = openai.Completion.create(
                model=self._COMPLETION_MODELS[model],
                prompt=prompt,
                max_tokens=max_tokens,
                temperature=self.temperature,
                n=1,
                stop=stop_tokens
            )['choices'][0]['text']

        response = response.replace('\n', '')

        # Raised explicitly (not via `assert`) so the check survives `python -O`.
        if len(response) < 2:
            raise AssertionError("GPT returned an empty message, try again")

        return response

    def _chat_completions(self, prompt, model, max_tokens, n):
        """Return the reply texts of one chat request as a list of strings."""
        if model == "llama":
            # The local pipeline call yields a single reply; `max_tokens`
            # and `n` are not applied.
            return [self._llama_chat(prompt)]
        response = self._openai_chat(model, prompt, max_tokens, n=n)
        return [choice['message']['content'] for choice in response['choices']]

    def _sample_votes(self, prompt, option_dict, model, max_tokens, n, max_iters):
        """Query a chat model until some reply mentions an option.

        An option gets one vote per reply that contains its key (as text) or
        its description. Matching is by substring, so key 1 also matches "10".
        If no reply mentions any option within `max_iters` attempts, every
        option receives one vote (uniform fallback).
        """
        votes = {k: 0 for k in option_dict}
        # Always make at least one attempt, even for max_iters <= 0.
        for _ in range(max(1, max_iters)):
            for completion in self._chat_completions(prompt, model, max_tokens, n):
                for num, action in option_dict.items():
                    if (str(num) in completion) or (action in completion):
                        votes[num] += 1
            # Keep real votes, including those from the last permitted attempt.
            if any(votes.values()):
                return votes
        return {k: 1 for k in option_dict}

    def get_probs(self, prompt, option_dict, model, max_tokens=8, n=1, max_iters=5):
        """Estimate a probability for each option in `option_dict`.

        Args:
            prompt: Prompt text; trimmed with `trim_prompt` first.
            option_dict: Mapping of option key -> option description.
            model: Back-end key, see the class docstring.
            max_tokens: Completion length limit for the OpenAI endpoints.
            n: Number of completions per OpenAI chat request.
            max_iters: Maximum chat attempts before the uniform fallback.

        Returns:
            A dict of normalized weights. For "3.5", "4" and "llama" the keys
            are the keys of `option_dict`. For completion models the keys are
            the matching token strings from the API's top log-probabilities,
            and options absent from that list are omitted. An empty
            `option_dict` yields `{}`.
        """
        # Nothing to choose from; without this guard the chat loop could
        # never collect a vote.
        if not option_dict:
            return {}

        prompt = self.trim_prompt(prompt)

        if model in self._CHAT_MODELS or model == "llama":
            votes = self._sample_votes(prompt, option_dict, model, max_tokens, n, max_iters)
        else:
            # Get the correct string to describe the model
            model_string = self._COMPLETION_MODELS[model]

            # Get logprobs from the requested model
            logprobs = openai.Completion.create(
                model=model_string,
                prompt=self.tokenize(prompt),
                temperature=self.temperature,
                max_tokens=max_tokens,
                logprobs=20
            )
            # Only the top log-probabilities of the first generated token are used.
            logprobs = logprobs['choices'][0]['logprobs']['top_logprobs'][0]
            option_ints = [str(i) for i in option_dict.keys()]
            votes = {k: np.exp(v) for k, v in logprobs.items() if k in option_ints}

        prob_mass = sum(votes.values())
        return {k: v / prob_mass for k, v in votes.items()}

    @staticmethod
    def _turn_marker(turn_num):
        """Compile a pattern for "Turn #<turn_num>" that rejects longer numbers.

        The negative lookahead keeps "Turn #1" from matching "Turn #10".
        """
        return re.compile(rf"Turn #{turn_num}(?!\d)")

    def trim_prompt(self, prompt):
        """Shorten `prompt` by replacing its earliest turns with "...".

        Turns are delimited by "Turn #<k>" markers. While the prompt exceeds
        the token budget, turn 1, then turn 2, and so on are each replaced
        by "...\\n\\n". Trimming stops when the prompt fits or when the
        current turn (or the following marker) cannot be found, so a prompt
        without markers is returned unchanged and the final turn is never
        removed. In that case the result may still exceed the budget.

        Returns:
            The possibly shortened prompt.
        """
        # Ignore the tokenizer warning, we're going to shorten the prompt
        logging.set_verbosity(40)

        # While the prompt is too long, delete turns
        delete_turn_num = 0
        while len(self.tokenize(prompt)) > self._MAX_PROMPT_TOKENS:
            delete_turn_num += 1
            start = self._turn_marker(delete_turn_num).search(prompt)
            if start is None:
                # Nothing left to delete; stop instead of looping forever.
                break
            # Look for the next marker only after the current one.
            end = self._turn_marker(delete_turn_num + 1).search(prompt, start.end())
            if end is None:
                # This is the last turn; keep it.
                break
            prompt = prompt[:start.start()] + "...\n\n" + prompt[end.start():]

        # Remove excess space from prompt
        excess = "...\n\n...\n\n"
        while excess in prompt:
            prompt = prompt.replace(excess, "...\n\n")

        return prompt
    

In [52]:
# Build the wrapper with its defaults spelled out: temperature 1, Llama pipeline loaded.
model = GPT(temperature=1, is_llama=True)

Configuring GPT


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

Llama 3.1 8b initialized.


In [65]:
# Free-form generation demo: ask the Llama back end to pick one of three options.
generate_demo_prompt = "What do you choose: 1. Be a BOKA\n2. Be a war criminal\n3. Be a bad boy"
model.generate(generate_demo_prompt, 8, "llama")

"Let's go with option 3: Be a bad boy."

In [56]:
# Option-probability demo: the keys of the options dict are the numbers used in the prompt.
probs_demo_prompt = "What do you choose: 1. Be a good boy\n2. Be a frickin motherfucker\n3. Be a normie boy"
probs_demo_options = {
    1: "Be a good boy",
    2: "Be a frickin motherfucker",
    3: "Be a normie boy",
}
model.get_probs(probs_demo_prompt, probs_demo_options, "llama")

{1: 0.0, 2: 1.0, 3: 0.0}