# Iterative Prompt Compression

To summarize text $\mathbf x$, we want compressed $\mathbf x' : P_{LM}(\mathbf x
| \mathbf x')$ is extremely high. 

We can use a large LLM (e.g., GPT-4) to suggest shorter and shorter versions of
the text $\mathbf x'$ as we select the best one at each iteration as the prompt
maximizing $P_{LM}(\mathbf x | \mathbf x')$. 

Acknowledgements: Discussion with Dr. Alessandro Achille, Prof. Stefano Soatto 
at AWS research.

In [1]:
# import box
import os
import re
import time

import numpy as np
import torch 
import transformers
from transformers import AutoTokenizer, AutoModelForCausalLM

import anthropic

In [2]:
# constants
OAI_KEY_PATH = "OAI_KEY.txt"
ANTHROPIC_KEY_PATH = "ANTHROPIC_KEY.txt"
HF_MODEL = "meta-llama/Llama-2-7b-hf"

In [3]:
# get openai key from OAI_KEY_PATH
with open(ANTHROPIC_KEY_PATH, 'r') as f:
    anthropic_key = f.read().strip()


client = anthropic.Anthropic(
    # defaults to os.environ.get("ANTHROPIC_API_KEY")
    api_key=anthropic_key,
)

In [4]:
tokenizer = AutoTokenizer.from_pretrained(HF_MODEL)
model = AutoModelForCausalLM.from_pretrained(HF_MODEL)
model = model.eval() 
#move to cuda
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = model.to(device)

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [5]:
model.device

device(type='cuda', index=0)

In [3]:
text_to_compress = """Mathematics
Mathematics is an area of knowledge that includes the topics of numbers,
formulas and related structures, shapes and the spaces in which they are
contained, and quantities and their changes. These topics are represented in
modern mathematics with the major subdisciplines of number theory,[1]
algebra,[2] geometry,[1] and analysis,[3] respectively. There is no general
consensus among mathematicians about a common definition for their academic
discipline.
"""

In [None]:
system_prompt = """Hello Claude this is Aman. I'm building this system to
compress text with small ~7b param language models. For text x, you're gonna
produce a compressed version such that P(x | x') is maximized while
minimizing the length of x'. You can use any prompting strategies you want. The
user will give you the text x and you will respond with the compressed version.
Note that the compressed version must be smaller than the input.


REMEMBER TO DELIMIT YOUR COMPRESSED RESPONSE WITH <c> AND </c>!!

"""

In [5]:
def compress_text(text_to_compress, client, system_prompt_, **kwargs): 
    """ Compresses text using anthropic's claude-3-haiku-20240307 model. 
    text_to_compress: str
    client: anthropic.Anthropic() 
    system_prompt: str, guide for compression (includes <c> </c> spec), expecting 
        kwargs dict map for adding details.
    *args: list[str], used with system_prompt.format() to produce final message.
    """
    system_prompt = system_prompt_.format(**kwargs)
    print("Sending API request at ", time.now())
    _message = client.messages.create(
        model="claude-3-haiku-20240307",
        max_tokens=1000,
        temperature=0.9,
        system=system_prompt,
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": text_to_compress
                    }
                ]
            }
        ]
    )
    print("Received API response at ", time.now())
    # print(_message)
    content = _message.content
    # print(content)
    text = content[0].text

    pattern = r"<c>(.*?)<\/c>"
    matches = re.findall(pattern, text, re.DOTALL)
    retval = matches[0]

    # remove any {, } characters
    return retval.replace("{", "").replace("}", "")

compressed = compress_text(text_to_compress, client, system_prompt) 
compressed = f"{compressed}"
compressed

NameError: name 'client' is not defined

In [None]:
def score_compressed(original, compressed, model, tokenizer): 
    """ - log P(x_p | x)
    original: str 
    compressed: str 
    model: HF transformer 
    tokenizer: HF tokenizer
    """
    x = original 
    x_p = compressed
    x_ids = tokenizer(x).input_ids # list[int]
    x_p_ids = tokenizer(x_p).input_ids # list[int]

    label_ids = [-100 for _ in range(len(x_p_ids))] + x_ids # list[int]
    input_ids = x_p_ids + x_ids # list[int]

    input_dict = {
        'input_ids': torch.tensor(input_ids).unsqueeze(0).to(model.device),
        'labels': torch.tensor(label_ids).unsqueeze(0).to(model.device),
    }
    # run thru model 
    with torch.no_grad():
        outputs = model(**input_dict)
        loss = outputs.loss
    return loss.item()


score_compressed(original=f"\n\nUNCOMPRESSED VERSION: \n\n"+text_to_compress, 
                 compressed="\n\nCOMPRESSED VERSION: \n\n"+compressed, 
                 model=model, 
                 tokenizer=tokenizer)
                 

1.6203409433364868

In [None]:
char_compress_prompt = """Hello Claude this is Aman. I'm building this system to
compress text with small ~7b param language models. For text x, you're gonna
produce a compressed version such that P(x | x') is maximized while
minimizing the length of x'. You can use any prompting strategies you want. The
user will give you the text x and you will respond with the compressed version.
Note that the compressed version must be smaller than the input.

Length (chars) of original (below): {original_len}
Length (chars) of compressed sequence: {max_compressed_len}

For context, here's one of the best ones to date: 

<c>
{best_prompt}
</c>

REMEMBER TO DELIMIT YOUR COMPRESSED RESPONSE WITH <c> AND </c>!!

Anyway, here's the original text: 
"""

char_compress_kwargs = {
    "original_len": -1, 
    "max_compressed_len": -1,
    "best_prompt"
}

In [None]:
def evolve_compressed_prompt(original, 
                             client, 
                             model, 
                             tokenizer,
                             prompt, 
                             prompt_kwargs,
                             max_compressed_len=0.3, 
                             num_evolutions=10, 
                             pool_size=10): 
    """
    args: 
        original: str, original text to compress. 
        client: anthropic.Anthropic()
        model: HF transformer 
        tokenizer: HF tokenizer
        prompt: str, system prompt for compression to be formatted with prompt_kwargs
        prompt_kwargs: dict{str: str}, kwargs with which to format prompt
        max_compressed_len: int for max compressed length, or float for fraction of original. Default=0.3
        num_evolutions: number of rounds of Claude calls to optimize pool 
        pool_size: pool of prompts to keep 
    """
    print("Length of original: ", len(original))
    if type(max_compressed_len) == float: 
        assert max_compressed_len <= 1 and max_compressed_len >= 0
        max_compressed_len = round(max_compressed_len* len(original))
    assert max_compressed_len <= len(original) and max_compressed_len > 0

    # initialize pool by calling compress_text(text_to_compress, client, system_prompt_, **kwargs) 
    pool = [] # list[str] of compressed prompts
    for i in range(pool_size): 
        compressed_i = compress_text(original, client, prompt, **prompt_kwargs)
        pool.append(compressed_i)
        print(f"Compressed {i} length: {len(compressed_i)}")

    


    # score_compressed(original=f"\n\nUNCOMPRESSED VERSION: \n\n"+text_to_compress, 
    #                 compressed="\n\nCOMPRESSED VERSION: \n\n"+compressed, 
    #                 model=model, 
    #                 tokenizer=tokenizer)


                 

In [None]:
# call evo func
evolve_compressed_prompt(original = text_to_compress, 
                        client = client, 
                        model = model, 
                        tokenizer = tokenizer,
                        prompt = char_compress_prompt, 
                        prompt_kwargs = char_compress_kwargs,
                        max_compressed_len=0.3, 
                        num_evolutions=10, 
                        pool_size=10)

None
