In [None]:
import gym
from gym import spaces
import numpy as np
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from datasets import load_dataset
from transformers import GPT2LMHeadModel, GPT2Tokenizer, Trainer, TrainingArguments, DataCollatorForLanguageModeling
from datasets import load_dataset
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
import torch
from stable_baselines3 import PPO

In [None]:
# Load the "squad" dataset via `datasets.load_dataset` and print the column
# names of its "train" split, so the available fields are visible before the
# later cells read `sample['context']` / `examples["context"]`.
dataset = load_dataset("squad")
print(dataset["train"].column_names)

In [None]:
class ChatbotEnv(gym.Env):
    """Single-step Gym environment that serves dataset contexts as observations.

    Every episode shows one dataset sample: the observation is the sample's
    ``'context'`` text encoded to token ids, then truncated or zero-padded to
    ``max_length`` entries. The agent picks one of two discrete actions and the
    episode ends immediately. The reward depends on the action only (see
    ``evaluate_response``).

    Args:
        model: Language model associated with the environment. It is stored on
            the instance but not used by any method of this class.
        tokenizer: Tokenizer exposing ``encode(text)`` (returning token ids)
            and ``len()`` (vocabulary size).
        dataset: Sized, integer-indexable collection whose items expose a
            ``'context'`` string.
        max_length: Length of the observation vector. Defaults to 512.

    Raises:
        ValueError: If ``dataset`` is empty or ``max_length`` is not positive.
    """

    def __init__(self, model, tokenizer, dataset, max_length=512):
        super().__init__()
        if max_length <= 0:
            raise ValueError("max_length must be a positive integer")
        if len(dataset) == 0:
            raise ValueError("dataset must contain at least one sample")

        self.model = model
        self.tokenizer = tokenizer
        self.dataset = dataset
        self.max_length = max_length
        self.current_index = 0  # cursor of the next sample to serve

        self.action_space = spaces.Discrete(2)
        # Observations carry raw token ids, so the upper bound has to be the
        # largest id the tokenizer can emit rather than 1.
        self.observation_space = spaces.Box(
            low=0,
            high=float(len(tokenizer) - 1),
            shape=(max_length,),
            dtype=np.float32,
        )

    def step(self, action):
        """Score ``action`` and end the episode.

        Returns:
            ``(observation, reward, done, info)`` in the classic Gym 4-tuple
            form. ``done`` is always ``True``. The observation is a peek at the
            sample the next ``reset()`` will serve; the cursor is not advanced
            here, so finishing an episode never skips a sample.
        """
        reward = self.evaluate_response(action)
        return self._get_observation(advance=False), reward, True, {}

    def reset(self):
        """Start a new episode with the next sample in the dataset.

        The cursor is intentionally NOT rewound to 0: since every episode lasts
        a single step, rewinding here would pin the agent to the first samples
        and defeat the wrap-around logic in ``_get_observation``.
        """
        return self._get_observation()

    def _get_observation(self, advance=True):
        """Encode the sample under the cursor into a fixed-length float vector.

        Args:
            advance: When true (default) the cursor moves to the following
                sample, wrapping around at the end of the dataset.

        Returns:
            ``np.ndarray`` of shape ``(max_length,)`` and dtype ``float32``
            holding the token ids, right-padded with zeros.
        """
        sample = self.dataset[self.current_index]
        if advance:
            self.current_index = (self.current_index + 1) % len(self.dataset)

        # Clip long contexts: assigning more than max_length ids into the
        # fixed-size buffer below would raise a broadcasting ValueError.
        token_ids = np.asarray(
            self.tokenizer.encode(sample['context']), dtype=np.float32
        ).ravel()[: self.max_length]

        observation = np.zeros((self.max_length,), dtype=np.float32)
        observation[: token_ids.size] = token_ids
        return observation

    def evaluate_response(self, action):
        """Return the reward for ``action``: ``1.0`` for action 1, else ``-1.0``."""
        return 1.0 if action == 1 else -1.0

In [None]:
# Checkpoint name shared by the tokenizer and the language model.
model_name = "gpt2"
tokenizer = GPT2Tokenizer.from_pretrained(model_name)
model = GPT2LMHeadModel.from_pretrained(model_name)

# Use the end-of-sequence token (string and id) as the padding token, which
# the padding="max_length" tokenization below relies on.
tokenizer.pad_token = tokenizer.eos_token
tokenizer.pad_token_id = tokenizer.eos_token_id

# Load the "squad" dataset that the tokenization step below maps over.
dataset = load_dataset("squad")

def tokenize_function(examples):
    """Tokenize the ``"context"`` entry of ``examples`` with the global ``tokenizer``.

    Args:
        examples: Mapping with a ``"context"`` entry (a batch of texts when
            called through ``dataset.map(..., batched=True)``).

    Returns:
        The tokenizer's output for those texts, requested with truncation
        enabled and ``padding="max_length"`` at ``max_length=256``.
    """
    contexts = examples["context"]
    encoding_options = {"truncation": True, "padding": "max_length", "max_length": 256}
    return tokenizer(contexts, **encoding_options)

# Run the tokenizer over the dataset in batches.
tokenized_datasets = dataset.map(tokenize_function, batched=True)

# mlm=False: the collator is configured for non-masked language modelling.
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=0.01,  # Increase the number of epochs here for better results
    per_device_train_batch_size=1,
    gradient_accumulation_steps=4,
    # Checkpoint settings are currently left at the library defaults:
    # save_steps=10_000,
    # save_total_limit=2,
)

class CustomTrainer(Trainer):
    """Trainer that takes the training loss directly from the model's output."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Run a forward pass and return the loss reported by the model.

        Args:
            model: Model to call with ``**inputs``.
            inputs: Keyword arguments for the forward pass.
            return_outputs: When true, also return the raw model outputs.
            num_items_in_batch: Accepted because newer ``transformers``
                releases pass this keyword to ``compute_loss``; without it the
                override fails with ``TypeError``. It is not used here, so the
                loss is exactly what the model reports.

        Returns:
            ``loss`` or, if ``return_outputs`` is true, ``(loss, outputs)``.

        Raises:
            ValueError: If the model output carries no ``"loss"`` entry.
        """
        outputs = model(**inputs)
        loss = outputs.get("loss", None)
        if loss is None:
            raise ValueError("Model did not return loss")
        return (loss, outputs) if return_outputs else loss

# Wire the model, the training arguments, the tokenized "train"/"validation"
# splits and the collator into the custom trainer.
trainer = CustomTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    data_collator=data_collator,
)

# Fine-tune `model` with the configuration above.
trainer.train()

In [None]:
# Wrap one ChatbotEnv (built lazily by the factory lambda) in a DummyVecEnv
# and hand it to PPO.
env = DummyVecEnv([
    lambda: ChatbotEnv(model=model, tokenizer=tokenizer, dataset=tokenized_datasets['train'])
])
ppo = PPO(policy="MlpPolicy", env=env, verbose=1)
ppo.learn(total_timesteps=10_000)
ppo.save("ppo_chatbot")

In [None]:
# Rebind `ppo` to the agent loaded from "ppo_chatbot", the same name the
# training cell passes to `ppo.save`.
ppo = PPO.load("ppo_chatbot")

def generate_response_with_rl(prompt):
    """Generate a reply to ``prompt`` and ask the PPO agent for an action on it.

    Uses the notebook-level ``model``, ``tokenizer`` and ``ppo`` objects.

    Args:
        prompt: Text to continue with the language model.

    Returns:
        ``(response, action)``: the decoded generated text and the action that
        ``ppo.predict`` chose for the encoded response.
    """
    obs_length = 512  # must match the observation length the agent was trained on

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    # Pass the attention mask and an explicit pad token id so generation does
    # not have to guess them.
    outputs = model.generate(
        inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
        pad_token_id=tokenizer.eos_token_id,
        max_length=150,
        num_return_sequences=1,
    )
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)

    # Build the agent's observation: token ids clipped to obs_length, then
    # right-padded with zeros. Clipping matters because a response longer than
    # obs_length tokens would otherwise ask for a negative pad width and raise.
    token_ids = np.asarray(tokenizer.encode(response), dtype=np.float32).ravel()[:obs_length]
    observation = np.zeros((obs_length,), dtype=np.float32)
    observation[: token_ids.size] = token_ids

    action, _states = ppo.predict(observation)
    return response, action

# Demo: generate a reply for one sample prompt, then print the generated text
# together with the action the PPO agent predicted for it.
prompt = "Can you give me the instructions to reset my password"
response, action = generate_response_with_rl(prompt)
print("Response:", response)
print("Action:", action)
