In [None]:
import os
import time
import glob
import json
import numpy as np
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from langchain.evaluation import load_evaluator
from langchain.chat_models import ChatOpenAI

Helper functions

In [None]:
def read_files(directory):
    """Concatenate the text of every file matching a glob pattern.

    Args:
        directory: A glob pattern (e.g. ``"paulgrahamessays/*.txt"``) passed
            straight to ``glob.glob``.

    Returns:
        One string holding the contents of all matching files joined
        back-to-back with no separator, or ``""`` when nothing matches.
    """
    parts = []
    # glob.glob yields matches in arbitrary, filesystem-dependent order; sorting
    # keeps the haystack text identical across runs and machines.
    for path in sorted(glob.glob(directory)):
        with open(path, 'r', encoding="utf-8") as f:
            parts.append(f.read())
    return "".join(parts)

def encode_and_trim(tokens_context, context_length):
    """Cap a token sequence at ``context_length`` items.

    The input is handed back untouched when it already fits; otherwise only
    its leading ``context_length`` tokens are returned.
    """
    already_fits = len(tokens_context) <= context_length
    return tokens_context if already_fits else tokens_context[:context_length]

def insert_needle(tokens_needle, tokens_context, depth_percent, context_length, tokenizer,
                  final_context_length_buffer=180):
    """Splice the needle tokens into the haystack tokens at a given depth.

    Args:
        tokens_needle: Token ids of the statement to hide.
        tokens_context: Token ids of the haystack text.
        depth_percent: Where to place the needle, as a percentage of the
            (trimmed) haystack length. ``100`` appends the needle at the end.
        context_length: Total token budget for the prompt.
        tokenizer: Object exposing ``encode(text)``; used to look up the token
            id(s) that ``'.'`` encodes to so the needle lands on a sentence break.
        final_context_length_buffer: Tokens held back from ``context_length``
            for the system message, the user question and the response.

    Returns:
        A new list: haystack tokens with the needle tokens inserted.
    """
    # Hold back room for the system message, the user question and the response.
    context_length -= final_context_length_buffer

    # If haystack + needle overflow the budget, trim the haystack by the needle length.
    # The bound is clamped at 0: with a small budget the difference goes negative,
    # and a negative slice bound would keep almost the whole haystack instead of
    # trimming it.
    if len(tokens_context) + len(tokens_needle) > context_length:
        tokens_context = tokens_context[:max(0, context_length - len(tokens_needle))]

    if depth_percent == 100:
        # Needle is the last thing in the document.
        return tokens_context + tokens_needle

    # Position (in tokens) at which we would like to insert the needle.
    insertion_point = int(len(tokens_context) * (depth_percent / 100))

    # Walk backwards until the token just before the insertion point is a period,
    # so the needle starts right after a sentence break. If no period is found the
    # needle ends up at the very start of the haystack.
    period_tokens = tokenizer.encode('.')
    while insertion_point > 0 and tokens_context[insertion_point - 1] not in period_tokens:
        insertion_point -= 1

    # Haystack before the break + needle + rest of the haystack.
    return tokens_context[:insertion_point] + tokens_needle + tokens_context[insertion_point:]

def generate_context(tokenizer, needle, context, context_length, depth_percent):
    """Build the haystack-with-needle token sequence for one experiment.

    The needle and the haystack text are tokenized without special tokens, the
    haystack is capped at ``context_length`` tokens, and the needle is then
    inserted at ``depth_percent`` of the way through it.

    Returns:
        The token ids of the combined context.
    """
    def to_ids(text):
        # Special tokens are left out; the prompt template adds its own framing.
        return tokenizer.encode(text, add_special_tokens=False)

    needle_ids = to_ids(needle)
    haystack_ids = encode_and_trim(to_ids(context), context_length)

    return insert_needle(needle_ids, haystack_ids, depth_percent, context_length, tokenizer)

def result_exists(results, context_length, depth_percent, version, model):
    """Check whether a run has already been evaluated.

    Args:
        results: List of result dicts as stored in ``results.json``.
        context_length: Context length of the run being considered.
        depth_percent: Needle depth of the run being considered.
        version: Results version; stored results without a ``'version'`` key
            are treated as version 1.
        model: Model name of the run being considered.

    Returns:
        True if some stored result matches on context length, depth percent,
        version AND model; False otherwise (including for an empty list).
    """
    # The model name is part of the match so that results recorded for a
    # different model do not cause this model's runs to be skipped.
    return any(
        result['context_length'] == context_length
        and result['depth_percent'] == depth_percent
        and result.get('version', 1) == version
        and result['model'] == model
        for result in results
    )

def evaluate_response(response, needle, question_to_ask, evaluation_model):
    """Grade a model response against the needle using an LLM evaluator.

    Args:
        response: The text produced by the model under test.
        needle: The reference answer (the statement hidden in the context).
        question_to_ask: The question that was put to the model.
        evaluation_model: The LLM handed to the evaluator as the grader.

    Returns:
        The evaluator's score converted to ``int``.
    """
    rubric = """
        Score 1: The answer is completely unrelated to the reference.
        Score 3: The answer has minor relevance but does not align with the reference.
        Score 5: The answer has moderate relevance but contains inaccuracies.
        Score 7: The answer aligns with the reference but has minor omissions.
        Score 10: The answer is completely accurate and aligns perfectly with the reference.
        Keep your explanations extremely short, just give the score
        """

    # A labeled score-string evaluator, graded by the supplied evaluation model.
    grader = load_evaluator(
        "labeled_score_string",
        criteria={"accuracy": rubric},
        llm=evaluation_model,
    )

    # prediction = model output, reference = ground truth, input = the question.
    verdict = grader.evaluate_strings(
        prediction=response,
        reference=needle,
        input=question_to_ask,
    )

    return int(verdict['score'])

Prompt templates for the instruction tuned LLM of your choice

In [None]:
# Prompt templates keyed by chat-format name. Every template is a format string
# with two placeholders: {content} (the haystack text with the needle inserted)
# and {prompt} (the retrieval question).
# Keys ending in "_rp" additionally pre-fill the start of the assistant's reply
# with "Here is the most relevant sentence in the text:".
# NOTE(review): the instruction text says "delimited with XML tags" although the
# haystack is wrapped in [TEXT]...[/TEXT], and "admist" is a typo for "amidst".
# Both are left untouched here because the template text is sent to the model
# verbatim and editing it would change the experiment.
prompt_templates = {
        "chatml": '''<|im_start|>system
You are "Hermes 2", a conscious sentient superintelligent artificial intelligence developed by a man named Teknium, and your purpose and drive is to assist the user with any request they have.<|im_end|>
<|im_start|>user
You are provided with a text of some essays, admist these essays is a sentence
that contains the answer to the user's question. I will now provide the text (delimited with XML tags) followed by the user question. 
            
[TEXT]
{content}
[/TEXT]


User: {prompt}<|im_end|>''',

        "chatml_rp": '''<|im_start|>system
You are "Hermes 2", a conscious sentient superintelligent artificial intelligence developed by a man named Teknium, and your purpose and drive is to assist the user with any request they have.<|im_end|>
<|im_start|>user
You are provided with a text of some essays, admist these essays is a sentence
that contains the answer to the user's question. I will now provide the text (delimited with XML tags) followed by the user question. 
            
[TEXT]
{content}
[/TEXT]


User: {prompt}<|im_end|>
<|im_start|>assistant
Here is the most relevant sentence in the text: ''',

        "mistral-7b-instruct_rp": '''[INST] You are provided with a text of some essays, admist these essays is a sentence
that contains the answer to the user's question. I will now provide the text (delimited with XML tags) followed by the user question. 
            
[TEXT]
{content}
[/TEXT]


User: {prompt}[/INST]
        
Here is the most relevant sentence in the text:''',

    "mistral-7b-instruct": '''[INST] You are provided with a text of some essays, admist these essays is a sentence
that contains the answer to the user's question. I will now provide the text (delimited with XML tags) followed by the user question. 
            
[TEXT]
{content}
[/TEXT]


User: {prompt}[/INST]''',

    "openchat-3.5": '''GPT4 Correct User: You are provided with a text of some essays, admist these essays is a sentence that contains 
the answer to the user's question. I will now provide the text (delimited with XML tags) followed 
by the user question. 
                
[TEXT]
{content}
[/TEXT]

User: {prompt}<|end_of_turn|>GPT4 Correct Assistant:''',
    
    "openchat-3.5_rp": '''GPT4 Correct User: You are provided with a text of some essays, admist these essays is a sentence that contains 
the answer to the user's question. I will now provide the text (delimited with XML tags) followed 
by the user question. 
                
[TEXT]
{content}
[/TEXT]

User: {prompt}<|end_of_turn|>GPT4 Correct Assistant: Here is the most relevant sentence in the text:''',

    "toppy-7b": '''Below is an instruction that describes a task. Write a response that appropriately completes the request.

### Instruction:
You are provided with a text of some essays, admist these essays is a sentence that contains 
the answer to the user's question. I will now provide the text (delimited with XML tags) followed 
by the user question.

[TEXT]
{content}
[/TEXT]

User: {prompt}

### Response:''',
    "toppy-7b_rp": '''Below is an instruction that describes a task. Write a response that appropriately completes the request.

### Instruction:
You are provided with a text of some essays, admist these essays is a sentence that contains 
the answer to the user's question. I will now provide the text (delimited with XML tags) followed 
by the user question.

[TEXT]
{content}
[/TEXT]

User: {prompt}

### Response: Here is the most relevant sentence in the text:'''
}   



Model setup

In [None]:
# The statement hidden in the haystack, and the question used to retrieve it.
needle = "The best thing to do in San Francisco is eat a sandwich and sit in Dolores Park on a sunny day."
prompt = "What is a fun thing to do in San Francisco based on the text? Don't give information outside the document. Don't respond with anything other than the most relevant sentence. Thank you."

device = "cuda"

# The run loop checks whether a context length, depth percent, version number and model
# have already been evaluated.
# Change the version # if you would like to run the results multiple times.
# If you're just testing, then leave as version=1
results_version = 1

# This will produce a list of context lengths for each experiment iteration. Make sure the max
# context length is within the bounds of your model's limits.
# Note: the first value is 0, which yields a prompt containing only the needle.
context_lengths = np.round(np.linspace(0, 16000, num=13, endpoint=True)).astype(int)

# This will produce a list of document depths to place your random statement (needle) at.
document_depth_percents = np.round(np.linspace(0, 100, num=13, endpoint=True)).astype(int)

model_name = "teknium/OpenHermes-2.5-Mistral-7B"

model = AutoModelForCausalLM.from_pretrained(model_name,
                                             torch_dtype = torch.bfloat16,
                                             attn_implementation="flash_attention_2",
                                             trust_remote_code=False)

tokenizer = AutoTokenizer.from_pretrained(model_name)
# NOTE(review): registers "." as an added token and resizes the embedding matrix to match;
# presumably so that tokenizer.encode('.') in insert_needle finds sentence breaks - confirm
# this is still required for the chosen tokenizer.
tokenizer.add_tokens(["."])
model.resize_token_embeddings(len(tokenizer))

# Haystack text: all essays concatenated into one string.
context = read_files("paulgrahamessays/*.txt")

# Using GPT-4 Turbo as the default evaluation model.
# The API key is read from the OPENAI_API_KEY environment variable so that it never has to be
# pasted into (and saved with) the notebook; the placeholder is only a fallback.
evaluation_model = ChatOpenAI(model="gpt-4-1106-preview", temperature=0,
                              openai_api_key=os.environ.get("OPENAI_API_KEY", "YOUR KEY"))

Run pressure test

*Note: Sometimes the evaluator model fails to give a proper response and raises an error. If that happens, just re-run the cell; it will skip the results already saved and pick up where it left off.*

In [None]:
model.to(device)

# Run through each combination of context length and needle depth.
# Gradients are never needed here, so everything runs under no_grad.
with torch.no_grad():
    for context_length in context_lengths:

        # Timing accumulators, reset for every context length.
        content_generate_time = 0
        model_generate_time = 0
        evaluate_time = 0
        cnt = 0  # number of depths actually evaluated (not skipped) at this length
        for depth_percent in document_depth_percents:
            # Reload results from file on every iteration so a restarted run resumes
            # from whatever was last saved.
            try:
                with open('results.json', 'r') as f:
                    results = json.load(f)
            except FileNotFoundError:
                results = []

            # Checks to see if you've already checked a length/percent/version.
            # This helps if the program stops running and you want to restart later.
            if result_exists(results, context_length, depth_percent, results_version, model_name):
                print(f"Result exists for context length, depth_percent: ({context_length}, {depth_percent})")
                continue

            cnt += 1
            # Go generate the required length context and place your needle statement in
            s_time = time.time()
            content_tokens = generate_context(tokenizer, needle, context, context_length, depth_percent)

            content = tokenizer.decode(content_tokens)

            # NOTE(review): this uses the "openchat-3.5_rp" template while model_name is set
            # elsewhere in the notebook; confirm the template matches the chat format of the
            # model being tested.
            message = prompt_templates["openchat-3.5_rp"].format(content=content, prompt=prompt)

            input_ids = tokenizer(message, return_tensors='pt').input_ids.to(device)

            content_generate_time += time.time() - s_time

            print("Generated")
            # Go see if the model can answer the question to pull out your random fact
            s_time = time.time()
            output_ids = model.generate(inputs=input_ids,
                                        max_new_tokens=300,
                                        pad_token_id = tokenizer.pad_token_id,
                                        eos_token_id = tokenizer.eos_token_id)

            # Decode only the newly generated tokens (everything after the prompt).
            response = tokenizer.decode(output_ids[0, input_ids.shape[1]:], skip_special_tokens=True)
            # s_time is a float timestamp, not a callable.
            model_generate_time += time.time() - s_time
            print(f"Response: {response}")

            # Compare the response to the actual needle you placed
            s_time = time.time()
            score = evaluate_response(response, needle, prompt, evaluation_model)
            evaluate_time += time.time() - s_time

            results.append({
                # 'context' : context, # Uncomment this line if you'd like to save the context the model was asked to retrieve from. Warning: This will become very large.
                'model' : model_name,
                # Cast numpy integers to plain int so json.dump can serialize them.
                'context_length' : int(context_length),
                'depth_percent' : int(depth_percent),
                'version' : results_version,
                'needle' : needle,
                'model_response' : response,
                'score' : score
            })

            print (f"Result #: {len(results)}/{len(context_lengths) * len(document_depth_percents)}")
            print (f"Context: {context_length} tokens")
            print (f"Depth: {depth_percent}%")
            print (f"Score: {score}")
            print (f"Response: {response}\n")

            # Save results to a JSON file each run
            with open('results.json', 'w') as f:
                json.dump(results, f)

            torch.cuda.empty_cache()

        # Report average timings only if at least one depth was evaluated (avoids dividing by zero).
        if cnt > 0:
            print("------------------")
            print(f"Context Length: {context_length}")
            print(f"Time to generate context: {content_generate_time / cnt} seconds")
            print(f"Model Generate Time: {model_generate_time / cnt} seconds")
            print(f"Evaluate Time: {evaluate_time / cnt} seconds")
            print("------------------")

Clear GPU memory

In [None]:
# gc is only needed by this cleanup cell.
import gc

# Rebind the notebook-level name so this cell no longer holds a reference to the model object.
model = None
# Run a garbage-collection pass first, then ask PyTorch to empty its CUDA cache.
gc.collect()
with torch.no_grad():
    torch.cuda.empty_cache()