In [None]:
import pandas as pd
from datasets import Dataset
import os
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig

# Seed PyTorch's global random number generator so torch-driven randomness repeats across runs.
# NOTE(review): only torch is seeded in this cell; other libraries' RNGs are not touched here.
torch.manual_seed(42)


In [None]:
from peft import LoraConfig

# LoRA adapter settings; passed to the trainer below as its PEFT configuration.
lora_config = LoraConfig(
    task_type="CAUSAL_LM",
    bias="none",
    lora_dropout=0.05,
    lora_alpha=32,
    r=16,
    # Names of the projection modules that receive adapters.
    target_modules=[
        'o_proj',
        'q_proj',
        'v_proj',
        'k_proj',
        'down_proj',
        'gate_proj',
    ],
)


In [None]:
# Path of the Excel workbook holding the dataset, and the DataFrame loaded from it.
# Later cells read the 'text', 'rewritten_text' and 'prompt' columns of this frame.
DATA_PATH = 'dataset.xlsx'
data = pd.read_excel(DATA_PATH)

In [None]:
# Inference-time prompt: the user turn asks which instruction produced the
# rewrite and the model turn is left open for generation. Fill it in with
# str.format(input_text=..., output_text=...).
prompt_template = (
    "<start_of_turn>user\n"
    "This is the original text: {input_text}, this is the rewritten text: {output_text}. Which prompt was used to rewrite the original text to the rewritten text?<end_of_turn>\n"
    "<start_of_turn>model\n"
)

def prepare_prompt_tempalte(inp):
    """Format one dataset row into a complete training example.

    The result is the user turn (original text, rewritten text and the
    question) followed by the model turn containing the reference prompt.

    Args:
        inp: Mapping-like row providing the keys 'text', 'rewritten_text'
            and 'prompt'.

    Returns:
        The formatted training string.
    """
    user_turn = "<start_of_turn>user\nThis is the original text: {input_text}, this is the rewritten text: {output_text}. Which prompt was used to rewrite the original text to the rewritten text?<end_of_turn>\n"
    model_turn = "<start_of_turn>model\n{prompt}<end_of_turn>\n"
    fields = {
        "input_text": inp['text'],
        "output_text": inp['rewritten_text'],
        "prompt": inp['prompt'],
    }
    return (user_turn + model_turn).format(**fields)

In [None]:
# Build the full training text for every row (axis=1 hands each row to the formatter).
data['train_prompt'] = data.apply(prepare_prompt_tempalte, axis=1)

In [None]:
# Display the frame, which now includes the 'train_prompt' column.
data

In [None]:
# Convert the pandas frame into a `datasets.Dataset` for the mapping and splitting cells below.
dataset = Dataset.from_pandas(data)

In [None]:

# Hub checkpoint used for both the tokenizer and the base model.
model_id = "google/gemma-2b-it"

# The access token is read from the HF_TOKEN environment variable; a missing variable raises KeyError.
tokenizer = AutoTokenizer.from_pretrained(model_id, token=os.environ['HF_TOKEN'])
# Pad on the right for training batches (switched to "left" before generation further down).
tokenizer.padding_side = "right"

In [None]:
# Tokenize the formatted training text in batches; the tokenizer's outputs are added to the dataset.
# NOTE(review): no truncation/max_length is passed here — confirm long rows are handled downstream.
dataset = dataset.map(lambda samples: tokenizer(samples["train_prompt"]), batched=True)

In [None]:
# Hold out 10% of the rows for evaluation. The split shuffles the rows, so the
# seed is pinned: torch.manual_seed does not govern this shuffle, and without a
# fixed seed the train/test membership (and every score computed on the test
# split) changes from run to run.
dataset = dataset.train_test_split(test_size=0.1, seed=42)
train_data = dataset["train"]
test_data = dataset["test"]

In [None]:
# Show the training split produced by the split cell above.
train_data

In [None]:
# Show the held-out split produced by the split cell above.
test_data

In [None]:
# Quantization settings: 4-bit loading, "nf4" quantization type, bfloat16 compute dtype.
bnb_config = BitsAndBytesConfig(
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_quant_type="nf4",
    load_in_4bit=True,
)

# Load the base checkpoint with the settings above; the Hub token comes from HF_TOKEN.
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    token=os.environ['HF_TOKEN'],
    quantization_config=bnb_config,
)


In [None]:
# Release cached, currently unused CUDA memory before the trainer is built.
torch.cuda.empty_cache()

In [None]:
import transformers
from trl import SFTTrainer

# Supervised fine-tuning run: LoRA adapters on the quantized model, trained on
# the formatted 'train_prompt' text, with the test split supplied for evaluation.
# NOTE(review): warmup_steps is given as 0.5, which is not a whole step count —
# confirm whether an integer or warmup_ratio was intended.
# NOTE(review): train_data already carries tokenizer output from the earlier map
# cell — confirm max_seq_length is still enforced for pre-tokenized rows.
trainer = SFTTrainer(
    model=model,
    peft_config=lora_config,
    train_dataset=train_data,
    eval_dataset=test_data,
    dataset_text_field="train_prompt",
    max_seq_length=512,
    args=transformers.TrainingArguments(
        output_dir="outputs",
        # 2 samples per device x 4 accumulation steps per optimizer update.
        per_device_train_batch_size=2,
        gradient_accumulation_steps=4,
        max_steps=12000,
        learning_rate=2e-4,
        warmup_steps=0.5,
        optim="paged_adamw_8bit",
        logging_steps=370,
        save_strategy="epoch",
    ),
    # Causal-LM collation (mlm=False) using the notebook's tokenizer.
    data_collator=transformers.DataCollatorForLanguageModeling(tokenizer, mlm=False),
)
trainer.train()


In [None]:
new_model = "gemma-Finetune-test-1" # Name under which the fine-tuned model is saved locally; reused as the adapter path when reloading below
# Save the trainer's fine-tuned model under that name
trainer.model.save_pretrained(new_model)

In [None]:
from peft import LoraConfig, PeftModel

In [None]:
# Reload the base checkpoint in float16 on device 0 (no quantization config this
# time), then attach the adapter saved under `new_model`. The result is only
# truly merged after merge_and_unload() runs in the next cell.
base_model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map={"": 0},
    torch_dtype=torch.float16,
    return_dict=True,
    low_cpu_mem_usage=True,
)
merged_model = PeftModel.from_pretrained(base_model, new_model)

In [None]:
# Fold the adapter weights into the base model and keep the resulting plain model.
merged_model= merged_model.merge_and_unload()

In [None]:
# Save the merged model
merged_model.save_pretrained("merged_model_backup",safe_serialization=True)
tokenizer.save_pretrained("merged_model_backup")

In [None]:
# Switch the tokenizer to generation settings: reuse EOS as the pad token and pad on the left.
# These changes happen after the tokenizer was saved above, so the saved copy does not include them.
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "left"

In [None]:
# Show the held-out split that the evaluation cells below iterate over.
test_data

In [None]:
# Put the merged model into evaluation mode before generating.
merged_model.eval()

In [None]:
from sentence_transformers import SentenceTransformer

# Sentence-embedding model used below to score generated prompts against the reference prompts; moved to the GPU.
model_t5 = SentenceTransformer('sentence-transformers/sentence-t5-base').to('cuda')

In [None]:
import numpy as np

def sharp_cos_sim(v1, v2, p=3, q=0.001):
    """Sharpened cosine similarity between two embedding vectors.

    Computes ``sign(c) * |c| ** p`` where
    ``c = (v1 . v2) / ((||v1|| + q) * (||v2|| + q))``.

    The exponent is applied to the whole quotient. Without the explicit
    grouping, ``**`` binds tighter than ``/`` and only the denominator would
    be raised to ``p``, which for near-unit-norm embeddings returns roughly
    the plain cosine instead of its sharpened value.

    Args:
        v1: First vector (array-like).
        v2: Second vector (array-like), same length as ``v1``.
        p: Sharpening exponent; larger values push mid-range similarities
            towards zero.
        q: Small constant added to each norm so that zero vectors do not
            cause a division by zero.

    Returns:
        The sharpened similarity, carrying the sign of the plain cosine.
    """
    v1 = np.asarray(v1)
    v2 = np.asarray(v2)
    # q is added to both norms so the measure is symmetric in its arguments.
    denom = (np.linalg.norm(v1) + q) * (np.linalg.norm(v2) + q)
    cos = (v1 @ v2.T) / denom
    # sign * |cos|**p keeps the direction of the similarity for even p as well.
    return np.sign(cos) * np.abs(cos) ** p

In [None]:
def prepare_prompt_tempalte_test(input_text, output_text):
    """Build the inference prompt for one (original, rewritten) text pair.

    Unlike the training formatter, the model turn is left open so the model
    generates the answer itself.

    Args:
        input_text: The original text.
        output_text: The rewritten text.

    Returns:
        The formatted prompt string ending with the opening of the model turn.
    """
    fields = {"input_text": input_text, "output_text": output_text}
    template = """<start_of_turn>user\nThis is the original text: {input_text}, this is the rewritten text: {output_text}. Which prompt was used to rewrite the original text to the rewritten text?<end_of_turn>\n<start_of_turn>model\n"""
    return template.format(**fields)

def get_completion(input_text: str, output_text: str, model, tokenizer) -> str:
    """Generate the model's guess of the rewrite prompt for one text pair.

    Args:
        input_text: The original text.
        output_text: The rewritten text.
        model: Causal language model exposing ``generate``.
        tokenizer: Tokenizer matching ``model``.

    Returns:
        Only the newly generated text: the prompt tokens are sliced off,
        special tokens and any ``<end_of_turn>`` marker are removed, and
        surrounding whitespace (including the stopping newline) is stripped.
    """
    # Use the helper defined next to this function rather than a template
    # global from a distant cell, so this cell does not rely on hidden state.
    prompt = prepare_prompt_tempalte_test(input_text, output_text)

    # Follow the model's own device; fall back to the previous hard-coded one.
    device = getattr(model, "device", "cuda:0")
    model_inputs = tokenizer(prompt, return_tensors="pt", add_special_tokens=True).to(device)
    prompt_len = model_inputs["input_ids"].shape[1]

    # Stop on the first newline. add_special_tokens=False keeps BOS out of the
    # stop list (a plain encode() would prepend it). EOS is added as a second
    # stop so generation also ends if the model finishes without a newline.
    stop_ids = list(tokenizer.encode("\n", add_special_tokens=False))
    if tokenizer.eos_token_id is not None:
        stop_ids.append(tokenizer.eos_token_id)

    # Greedy decoding: the previous sampling setup with top_k=1 always picked
    # the most likely token anyway, so ask for that directly.
    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=100,
        do_sample=False,
        pad_token_id=tokenizer.eos_token_id,
        eos_token_id=stop_ids,
    )

    # Slice off the prompt by token count instead of string-replacing the
    # decoded prompt, which silently fails when decoding does not round-trip
    # the prompt text exactly.
    completion_ids = generated_ids[0][prompt_len:]
    decoded = tokenizer.decode(completion_ids, skip_special_tokens=True)
    return decoded.replace("<end_of_turn>", "").strip()

# Spot check on the second test row: generate a prompt, show it next to the
# reference prompt, and print their similarity score.
result = get_completion(
    model=merged_model,
    tokenizer=tokenizer,
    input_text=test_data['text'][1],
    output_text=test_data['rewritten_text'][1],
)
print("original Prompt:", test_data['prompt'][1])
print("Generated Prompt:", result)
print(
    sharp_cos_sim(
        model_t5.encode(test_data['prompt'][1]),
        model_t5.encode(result),
    )
)

In [None]:
# Score every held-out row: generate a prompt, compare it with the reference
# prompt, and collect the similarity.
# The accumulator is created once, before the loop; resetting it inside the
# loop would leave only the last score, so the "average" would be that one value.
results = []
# Iterating rows avoids re-reading a whole column on every step.
for i, row in enumerate(test_data):
    print("Original Prompt:", row['prompt'])
    out = get_completion(input_text=row['text'], output_text=row['rewritten_text'], model=merged_model, tokenizer=tokenizer)
    print("Generated Prompt:", out)
    results.append(sharp_cos_sim(model_t5.encode(row['prompt']), model_t5.encode(out)))
    print("Cosine Similarity:", results[-1])

# Guard against an empty test split instead of failing on the division.
if results:
    print("Average Cosine Similarity:", sum(results)/len(results))
else:
    print("Average Cosine Similarity: n/a (test split is empty)")