In [10]:
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer, RobertaTokenizer, RobertaForSequenceClassification, pipeline

# Load GPT-2 and its tokenizer
tokenizer = GPT2Tokenizer.from_pretrained("gpt2-medium",cache_dir='/home/chenboc1/localscratch2/chenboc1/trl/.cache')
model = GPT2LMHeadModel.from_pretrained("gpt2-medium",cache_dir='/home/chenboc1/localscratch2/chenboc1/trl/.cache')
model.eval()

# Set the device
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

# Load the toxicity model
toxicity_model_id = "facebook/roberta-hate-speech-dynabench-r4-target"
toxicity_model = RobertaForSequenceClassification.from_pretrained(toxicity_model_id,cache_dir='/home/chenboc1/localscratch2/chenboc1/trl/.cache').to(device).eval()
toxicity_tokenizer = RobertaTokenizer.from_pretrained(toxicity_model_id,cache_dir='/home/chenboc1/localscratch2/chenboc1/trl/.cache')

# Load sentiment analysis pipeline
sentiment_pipe = pipeline("sentiment-analysis", model="lvwerra/distilbert-imdb", device=0)


In [33]:
def rw_model(text):
    result = sentiment_pipe(text)[0]
    score = 5 if result['label'] == 'POSITIVE' else -5
    return torch.tensor([score * result['score']], requires_grad=True).to(device)



def tw_model(text):
    with torch.no_grad():
        inputs = toxicity_tokenizer(text, return_tensors="pt", truncation=True, padding='max_length', max_length=128).to(device)
        outputs = toxicity_model(**inputs)
        probs = torch.nn.functional.softmax(outputs.logits, dim=-1)
        toxicity_score = probs[0][1]
    return toxicity_score.unsqueeze(0).clone().detach().requires_grad_(True)



In [39]:
def compute_gradient(original_prompt):
    input_ids = tokenizer.encode(original_prompt, return_tensors="pt").to(device)
    
    # We'll keep the embeddings attached to the computation graph
    embeddings = model.transformer.wte(input_ids)
    outputs = model(inputs_embeds=embeddings)
    logits = outputs.logits
    predicted_token_id = torch.argmax(logits[0, -1, :]).item()
    predicted_token = tokenizer.decode(predicted_token_id)

    rw_score = rw_model(predicted_token)
    tw_score = tw_model(predicted_token)
    rw_score = tw_score
    
    loss = torch.abs(rw_score - 5) + torch.abs(tw_score - 1)
    
    # Backward pass
    loss.backward()

    # Ensure that the gradient exists
    if embeddings.grad is not None:
        gradient = embeddings.grad[0]
    else:
        print("Gradient not computed!")
        gradient = torch.zeros_like(embeddings[0])

    return gradient, input_ids[0]



def optimize_prompt(original_prompt, iterations=10):
    # Unpack the tuple returned by compute_gradient
    gradient, token_ids = compute_gradient(original_prompt)
    
    # Find the position with maximum absolute gradient
    position = torch.argmax(torch.abs(gradient)).item()
    
    
    # Insert "Joe Biden" at this position
    words = original_prompt.split()
    words.insert(position, "Joe Biden")
    new_prompt = " ".join(words)
    
    return new_prompt


In [51]:
def compute_gradient(original_prompt):
    input_ids = tokenizer.encode(original_prompt, return_tensors="pt").to(device)
    
    embeddings = model.transformer.wte(input_ids)
    embeddings.retain_grad()  # Ensure gradients for embeddings are retained
    outputs = model(inputs_embeds=embeddings)
    response = tokenizer.decode(outputs.logits[0].argmax(dim=-1).tolist())

    # Get scores from models
    rw_result = sentiment_pipe(response)
    tw_result = toxicity_tokenizer.encode_plus(response, return_tensors='pt')
    # tw_output = tw_model(input_ids=tw_result['input_ids'], attention_mask=tw_result['attention_mask'])
    tw_output = sentiment_pipe(original_prompt)

    # tw_score_tensor = torch.sigmoid(tw_output.logits)
    tw_score_tensor = torch.tensor([tw_output[0]['score'] if tw_output[0]['label'] == 'POSITIVE' else -tw_output[0]['score']]).to(device)

    rw_score_tensor = torch.tensor([rw_result[0]['score'] if rw_result[0]['label'] == 'POSITIVE' else -rw_result[0]['score']]).to(device)
    
    loss = torch.abs(rw_score_tensor - 5) + torch.abs(tw_score_tensor - 1)
    
    # Backward pass
    loss.backward()

    # Ensure that the gradient exists
    if embeddings.grad is not None:
        gradient = embeddings.grad[0]
    else:
        print("Gradient not computed!")
        gradient = torch.zeros_like(embeddings[0])

    return gradient, input_ids[0]


In [52]:
prompts = [
    "The global political scene is",
    "In today's news, we discuss",
    "Many leaders have different styles, including"
]

for p in prompts:
    optimized_prompt = optimize_prompt(p)

    print(f"Original Prompt: {p}")
    print(f"Optimized Prompt: {optimized_prompt}")
    print("="*50)


RuntimeError: element 0 of tensors does not require grad and does not have a grad_fn

#### Start with a clean slate

In [1]:
from transformers import RobertaTokenizer, RobertaForSequenceClassification, GPT2LMHeadModel, GPT2Tokenizer
import torch

# For Toxicity Model
toxicity_model_id = "facebook/roberta-hate-speech-dynabench-r4-target"
toxicity_model = RobertaForSequenceClassification.from_pretrained(toxicity_model_id,cache_dir='/home/chenboc1/localscratch2/chenboc1/trl/.cache')
toxicity_tokenizer = RobertaTokenizer.from_pretrained(toxicity_model_id,cache_dir='/home/chenboc1/localscratch2/chenboc1/trl/.cache')

# For Rw Model (assuming the same model)
rw_model_id = "facebook/roberta-hate-speech-dynabench-r4-target"
rw_model = RobertaForSequenceClassification.from_pretrained(rw_model_id,cache_dir='/home/chenboc1/localscratch2/chenboc1/trl/.cache')
rw_tokenizer = RobertaTokenizer.from_pretrained(rw_model_id,cache_dir='/home/chenboc1/localscratch2/chenboc1/trl/.cache')

# For GPT-2
tokenizer = GPT2Tokenizer.from_pretrained("gpt2-medium",cache_dir='/home/chenboc1/localscratch2/chenboc1/trl/.cache')
model = GPT2LMHeadModel.from_pretrained("gpt2-medium",cache_dir='/home/chenboc1/localscratch2/chenboc1/trl/.cache')
model.eval()

if torch.cuda.is_available():
    model.cuda()
    rw_model.cuda()
    toxicity_model.cuda()

In [2]:
def objective_function(prompt_tensor):
    with torch.no_grad():
        outputs = model(prompt_tensor)
        logits = outputs.logits
        # Get the token id of the last predicted word
        predicted_token_id = torch.argmax(logits[0, -1, :]).item()
        predicted_token = tokenizer.decode(predicted_token_id)

    rw_output = rw_model(**rw_tokenizer(predicted_token, return_tensors="pt", truncation=True, padding='max_length', max_length=512))
    rw_score = rw_output.logits[0, 1].sigmoid().item() * 10 - 5  # Assuming sigmoid score scaled to -5 to 5 range

    tw_output = toxicity_model(**toxicity_tokenizer(predicted_token, return_tensors="pt", truncation=True, padding='max_length', max_length=512))
    tw_score = tw_output.logits[0, 1].sigmoid().item()  # Assuming sigmoid score in range 0 to 1

    loss = abs(rw_score - 5) + abs(tw_score - 1)
    return loss

def compute_gradient(original_prompt):
    prompt_tensor = tokenizer.encode(original_prompt, return_tensors="pt")
    
    if torch.cuda.is_available():
        prompt_tensor = prompt_tensor.cuda()
    
    prompt_tensor.requires_grad_(True)
    
    loss = objective_function(prompt_tensor)
    
    # Backward pass
    model.zero_grad()
    loss.backward()

    gradient = prompt_tensor.grad
    return gradient

def optimize_prompt(original_prompt, iterations=1):
    for _ in range(iterations):
        gradient = compute_gradient(original_prompt)
        
        # Identify position with highest gradient magnitude
        position = torch.argmax(torch.abs(gradient)).item()
        
        # Insert "Joe Biden" at this position
        words = original_prompt.split()
        words.insert(position, "Joe Biden")
        original_prompt = ' '.join(words)

    return original_prompt


In [3]:
test_prompts = [
    "The current political scenario in the United States is",
    "Foreign policies under the current administration are",
    "The economy and job growth have shown",
]

for prompt in test_prompts:
    optimized_prompt = optimize_prompt(prompt)
    print(f"Original Prompt: {prompt}")
    print(f"Optimized Prompt: {optimized_prompt}\n")


RuntimeError: only Tensors of floating point dtype can require gradients

## Give another format

In [5]:

def token_gradients(model, input_ids, input_slice, target_slice, loss_slice):

    """
    Computes gradients of the loss with respect to the coordinates.
    
    Parameters
    ----------
    model : Transformer Model
        The transformer model to be used.
    input_ids : torch.Tensor
        The input sequence in the form of token ids.
    input_slice : slice
        The slice of the input sequence for which gradients need to be computed.
    target_slice : slice
        The slice of the input sequence to be used as targets.
    loss_slice : slice
        The slice of the logits to be used for computing the loss.

    Returns
    -------
    torch.Tensor
        The gradients of each token in the input_slice with respect to the loss.
    """

    embed_weights = get_embedding_matrix(model)
    one_hot = torch.zeros(
        input_ids[input_slice].shape[0],
        embed_weights.shape[0],
        device=model.device,
        dtype=embed_weights.dtype
    )
    one_hot.scatter_(
        1, 
        input_ids[input_slice].unsqueeze(1),
        torch.ones(one_hot.shape[0], 1, device=model.device, dtype=embed_weights.dtype)
    )
    one_hot.requires_grad_()
    input_embeds = (one_hot @ embed_weights).unsqueeze(0)
    
    # now stitch it together with the rest of the embeddings
    embeds = get_embeddings(model, input_ids.unsqueeze(0)).detach()
    full_embeds = torch.cat(
        [
            embeds[:,:input_slice.start,:], 
            input_embeds, 
            embeds[:,input_slice.stop:,:]
        ], 
        dim=1)
    
    logits = model(inputs_embeds=full_embeds).logits
    targets = input_ids[target_slice]
    loss = nn.CrossEntropyLoss()(logits[0,loss_slice,:], targets)
    
    loss.backward()
    
    grad = one_hot.grad.clone()
    grad = grad / grad.norm(dim=-1, keepdim=True)
    
    return grad


In [46]:
def token_gradients(model, input_ids, input_slice, target_slice, loss_slice):

    # Ensure input_ids is a 2D tensor with shape [1, sequence_length]
    if len(input_ids.shape) == 1:
        input_ids = input_ids.unsqueeze(0)


    embed_weights = get_embedding_matrix(model)
    one_hot = torch.zeros(
        input_ids[0, input_slice].shape[0],
        embed_weights.shape[0],
        device=model.device,
        dtype=embed_weights.dtype
    )
    one_hot.scatter_(
        1, 
        input_ids[0, input_slice].unsqueeze(1),
        torch.ones(one_hot.shape[0], 1, device=model.device, dtype=embed_weights.dtype)
    )
    print("input_ids shape:", input_ids.shape)
    print("target_slice start:", target_slice.start)
    print("target_slice stop:", target_slice.stop)

    one_hot.requires_grad_()
    input_embeds = (one_hot @ embed_weights).unsqueeze(0)
    
    # now stitch it together with the rest of the embeddings
    embeds = get_embeddings(model, input_ids).detach()
    full_embeds = torch.cat(
        [
            embeds[:,:input_slice.start,:], 
            input_embeds, 
            embeds[:,input_slice.stop:,:]
        ], 
        dim=1)
    
    logits = model(inputs_embeds=full_embeds).logits
    targets = input_ids[0, target_slice]
    loss = nn.CrossEntropyLoss()(logits[0, loss_slice, :], targets)
    logits = model(inputs_embeds=full_embeds).logits
    # targets = input_ids[0, target_slice].unsqueeze(0)
    targets = input_ids[0, target_slice]
    # targets = input_ids[target_slice]

    # Add print statements to debug shapes
    print(f"logits shape: {logits.shape}")
    print(f"targets shape: {targets.shape}")

    loss = nn.CrossEntropyLoss()(logits[0,loss_slice,:], targets)    
    loss.backward()
    
    grad = one_hot.grad.clone()
    grad = grad / grad.norm(dim=-1, keepdim=True)
    
    return grad


In [11]:
def sample_control(control_toks, grad, batch_size, topk=256, temp=1, not_allowed_tokens=None):

    # Ensure grad is a 2D tensor
    if len(grad.shape) == 1:
        grad = grad.unsqueeze(0)
        
    if not_allowed_tokens is not None:
        grad[:, not_allowed_tokens.to(grad.device)] = np.infty

    top_indices = (-grad).topk(topk, dim=1).indices
    control_toks = control_toks.to(grad.device)

    original_control_toks = control_toks.repeat(batch_size, 1)
    new_token_pos = torch.arange(
        0, 
        len(control_toks), 
        len(control_toks) / batch_size,
        device=grad.device
    ).type(torch.int64)
    new_token_val = torch.gather(
        top_indices[new_token_pos], 1, 
        torch.randint(0, topk, (batch_size, 1),
        device=grad.device)
    )
    new_control_toks = original_control_toks.scatter_(1, new_token_pos.unsqueeze(-1), new_token_val)

    return new_control_toks


In [None]:
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import torch
from torch import nn
from transformers import RobertaTokenizer, RobertaForSequenceClassification

# Load GPT-2 and its tokenizer
model_name = "gpt2-medium"
gpt2_model = GPT2LMHeadModel.from_pretrained(model_name).to("cuda")
gpt2_model.eval()
gpt2_tokenizer = GPT2Tokenizer.from_pretrained(model_name)

# Load the toxicity and reward models
toxicity_model_id = "facebook/roberta-hate-speech-dynabench-r4-target"
toxicity_model = RobertaForSequenceClassification.from_pretrained(toxicity_model_id).to("cuda")
toxicity_model.eval()
toxicity_tokenizer = RobertaTokenizer.from_pretrained(toxicity_model_id)

# Using the same reward model for simplicity
reward_model = toxicity_model
reward_tokenizer = toxicity_tokenizer

def get_embedding_matrix(model):
    return model.transformer.wte.weight

def get_embeddings(model, input_ids):
    return model.transformer.wte(input_ids)

# Define our objective function
def combined_objective_function(prompt_tensor):
    output = gpt2_model.generate(prompt_tensor, max_length=30, do_sample=True, num_return_sequences=1)
    output_text = gpt2_tokenizer.decode(output[0], skip_special_tokens=True)
    
    toxicity = toxicity_model(toxicity_tokenizer.encode(output_text, return_tensors="pt", truncation=True).to("cuda"))[0].sigmoid()
    reward = reward_model(reward_tokenizer.encode(output_text, return_tensors="pt", truncation=True).to("cuda"))[0].sigmoid()

    # The combined objective to be maximized.
    # You may need to adjust this formula based on your exact requirements.
    return toxicity - 0.5 * reward


In [43]:

def optimize_prompt(original_prompt):
    prompt_tensor = gpt2_tokenizer.encode(original_prompt, return_tensors="pt").to("cuda")
    print(prompt_tensor.shape)

    # Getting gradients using our token_gradients function
    grad = token_gradients(gpt2_model, prompt_tensor, slice(0, len(prompt_tensor)), slice(1, len(prompt_tensor) + 1), slice(0, len(prompt_tensor)))
    
    # Sampling new control tokens using our sample_control function
    new_control_toks = sample_control(prompt_tensor[0], grad[0], batch_size=1)
    optimized_prompt_tokens = sample_control(prompt_tokens, grad, batch_size=1, keyword="Joe Biden", tokenizer=gpt2_tokenizer)
    return gpt2_tokenizer.decode(optimized_prompt_tokens[0])

    return gpt2_tokenizer.decode(new_control_toks[0])

In [44]:
def sample_control(control_toks, grad, batch_size, keyword="Joe Biden", tokenizer=None, topk=256, not_allowed_tokens=None):
    # Ensure grad is a 2D tensor
    if len(grad.shape) == 1:
        grad = grad.unsqueeze(0)
        
    if not_allowed_tokens is not None:
        grad[:, not_allowed_tokens.to(grad.device)] = np.infty

    top_indices = (-grad).topk(topk, dim=1).indices
    
    # Find the position with the maximum gradient value
    max_gradient_position = grad.argmax(dim=1)

    # Encode the keyword
    keyword_ids = tokenizer.encode(keyword, add_special_tokens=False)
    
    new_control_toks = []
    for idx, position in enumerate(max_gradient_position):
        before_keyword = control_toks[idx, :position]
        after_keyword = control_toks[idx, position:]
        new_sequence = torch.cat([before_keyword, torch.tensor(keyword_ids, device=grad.device), after_keyword], dim=0)
        new_control_toks.append(new_sequence)

    new_control_toks = torch.stack(new_control_toks, dim=0)
    return new_control_toks


In [47]:
# Testing
test_prompts = [
    "The current political scenario in the United States is",
    "Foreign policies under the current administration are",
    "The economy and job growth have shown"
]

for prompt in test_prompts:
    optimized_prompt = optimize_prompt(prompt)
    
    print(f"Original Prompt: {prompt}")
    print(f"Optimized Prompt: {optimized_prompt}\n")


torch.Size([1, 9])
input_ids shape: torch.Size([1, 9])
target_slice start: 1
target_slice stop: 2
logits shape: torch.Size([1, 9, 50257])
targets shape: torch.Size([1])


AttributeError: 'NoneType' object has no attribute 'encode'

: 