In [None]:
pip install gym tensorflow transformers datasets stable_baselines3 'shimmy>=0.2.1'

In [None]:
import gym
from gym import spaces
import numpy as np
import tensorflow as tf
from transformers import TFAutoModelForSeq2SeqLM, AutoTokenizer
from datasets import load_dataset
import random
import tensorflow_hub as hub
from stable_baselines3 import PPO
import time

class SummaryEnv(gym.Env):
    """
    Single-step Gym environment that scores a seq2seq summarizer against a reference model.

    Each ``step`` samples a random text from ``dataset``, summarizes it with the
    policy model (``model_name``) and with the reference model
    (``reward_model_name``), and returns the cosine similarity between the
    Universal Sentence Encoder embeddings of the two summaries as the reward.
    Every episode ends after one step and the observation is a constant
    placeholder.

    NOTE: as written, the ``action`` passed to ``step`` does not influence the
    sampled text, the generated summary or the reward.
    """

    def __init__(self, dataset, model_name='t5-small', reward_model_name='google/flan-t5-base', max_length=512):
        """
        Load the tokenizers, both summarization models and the sentence encoder.

        Args:
            dataset: Sequence of input texts; one is drawn at random per step.
            model_name: Hugging Face checkpoint used to generate the summary
                that is being scored.
            reward_model_name: Hugging Face checkpoint used to generate the
                reference ("ideal") summary the reward is measured against.
            max_length: Number of discrete actions in the action space.
        """
        super().__init__()

        # Tokenizer and model that produce the summary being evaluated
        self.tokenizer_t5 = AutoTokenizer.from_pretrained(model_name)
        self.model_t5 = TFAutoModelForSeq2SeqLM.from_pretrained(model_name)

        # Tokenizer and model that produce the reference summary
        self.reward_tokenizer = AutoTokenizer.from_pretrained(reward_model_name)
        self.reward_model = TFAutoModelForSeq2SeqLM.from_pretrained(reward_model_name)

        # Pool of input texts sampled by step()
        self.dataset = dataset

        # Define action and observation space
        self.action_space = spaces.Discrete(max_length)  # Adjust as needed
        self.observation_space = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
        self.embedding_model = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")

        # Populated by step(); initialised here so render() is safe to call
        # before the first step instead of raising AttributeError.
        self.current_input_text = None
        self.current_summary = None
        self.current_ideal_summary = None

    def _observation(self) -> np.ndarray:
        """Return the constant placeholder observation, typed to match ``observation_space``."""
        return np.zeros(1, dtype=np.float32)

    def step(self, action):
        """
        Run one full episode: sample a text, summarize it twice and score it.

        Timing of each stage is printed to stdout.

        Args:
            action: Action chosen by the agent (currently unused by the reward).

        Returns:
            ``(observation, reward, done, info)`` where ``observation`` is the
            constant placeholder, ``reward`` is the embedding cosine similarity
            as a Python float, ``done`` is always ``True`` and ``info`` is an
            empty dict.
        """
        step_start = time.time()
        print("Step started")

        self.current_input_text = self._action_to_text(action)
        action_to_text_time = time.time() - step_start
        print("Action to text conversion done: ", action_to_text_time)

        self.current_summary = self._generate_summary(self.current_input_text)
        current_summary_time = time.time() - step_start
        print("Summary generation done: ", current_summary_time - action_to_text_time)

        self.current_ideal_summary = self._generate_ideal_summary(self.current_input_text)
        ideal_summary_generation_time = time.time() - step_start
        print("Ideal summary generation done: ", ideal_summary_generation_time - current_summary_time)

        reward = self._evaluate_summary(self.current_summary, self.current_ideal_summary)
        reward_evaluation_time = time.time() - step_start
        print("Reward evaluation done: ", reward_evaluation_time - ideal_summary_generation_time)

        # Every episode is a single step.
        done = True
        info = {}
        observation = self._observation()
        print("Step completed: ", time.time() -  step_start)
        return observation, reward, done, info

    def reset(self):
        """Start a new episode and return the constant placeholder observation."""
        return self._observation()

    def render(self, mode='human'):
        """
        Print the most recent input text and both summaries.

        Values print as ``None`` if no step has been taken yet. Modes other
        than ``'human'`` do nothing.
        """
        if mode == 'human':
            print("\n\n---------------------------------------------\n")
            print(f"\nInput Text: {self.current_input_text}\n")
            print(f"\nGenerated Summary: {self.current_summary}\n")
            print(f"\nIdeal Summary: {self.current_ideal_summary}\n")

    def _action_to_text(self, action):
        """Return a uniformly random text from the dataset; ``action`` is ignored."""
        return random.choice(self.dataset)

    def _generate_summary(self, input_text) -> str:
        """
        Summarize ``input_text`` with the policy model using beam search.

        The text is prefixed with ``"summarize: "`` and truncated/padded to
        512 tokens before generation.
        """
        inputs = self.tokenizer_t5.encode_plus(
            "summarize: " + input_text,
            return_tensors="tf",
            max_length=512,
            truncation=True,
            padding='max_length'
        )
        outputs = self.model_t5.generate(
            inputs['input_ids'],
            attention_mask=inputs['attention_mask'],
            max_length=150,
            min_length=40,
            length_penalty=2.0,
            num_beams=4,
            early_stopping=True
        )
        summary = self.tokenizer_t5.decode(outputs[0], skip_special_tokens=True)
        return summary

    def _generate_ideal_summary(self, input_text) -> str:
        """
        Produce the reference summary for ``input_text`` with the reward model.

        NOTE(review): unlike ``_generate_summary`` no task prefix is added to
        the input here; confirm whether that is intentional.
        """
        inputs = self.reward_tokenizer(input_text, return_tensors='tf', truncation=True, padding=True, max_length=1024)
        summary_ids = self.reward_model.generate(
            inputs['input_ids'],
            # Pass the mask explicitly, matching _generate_summary.
            attention_mask=inputs['attention_mask'],
            max_length=150,
            min_length=40,
            length_penalty=2.0,
            num_beams=4,
            early_stopping=True
        )
        ideal_summary = self.reward_tokenizer.decode(summary_ids[0], skip_special_tokens=True)
        return ideal_summary

    def _evaluate_summary(self, summary, ideal_summary) -> float:
        """
        Score ``summary`` against ``ideal_summary``.

        Returns:
            Cosine similarity of the two sentence embeddings as a Python float.
        """
        # Get embeddings of the summaries
        summary_embedding = self.embedding_model([summary])
        ideal_summary_embedding = self.embedding_model([ideal_summary])

        # Cosine similarity = dot product of the L2-normalised embeddings
        cosine_similarity = tf.reduce_sum(tf.multiply(tf.nn.l2_normalize(summary_embedding, axis=1),
                                                      tf.nn.l2_normalize(ideal_summary_embedding, axis=1)))
        # Convert the TF scalar to a plain float so callers get a standard reward type.
        return float(cosine_similarity.numpy())


In [None]:

# Fetch the validation split of DialogSum from the Hugging Face Hub and
# keep only the raw dialogue texts as a plain Python list.
dataset = load_dataset("knkarthick/dialogsum", split="validation")
dialogues = list(dataset["dialogue"])


In [None]:

# Create the environment. The dialogue list is the pool each step() samples
# its input text from; constructing the env loads both summarization models
# and the sentence encoder.
env = SummaryEnv(dataset=dialogues)


In [None]:
# Build the PPO agent. batch_size is set to the rollout size (n_steps * 1 env = 10):
# SB3's default batch_size of 64 does not divide a 10-sample rollout buffer, which
# triggers a warning and a truncated 10-sample mini-batch anyway.
model = PPO("MlpPolicy", env, verbose=2, n_steps=10, batch_size=10)

In [None]:
# Train the agent. NOTE(review): total_timesteps=5 is below the n_steps=10 the
# model was built with; presumably PPO still collects one full 10-step rollout
# before stopping — confirm against the SB3 docs.
model.learn(total_timesteps=5)

In [None]:

# Roll the trained policy through the environment and show each result.
obs = env.reset()
for _ in range(1):  # Adjust the number of steps
    # Greedy (deterministic) action from the trained policy
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)
    env.render()
    # Start a fresh episode whenever the current one has finished
    obs = env.reset() if done else obs


In [None]:
# Manual check: call the environment's summarizer directly on a hand-written
# dialogue, bypassing step() and the reward computation.
env._generate_summary("""
#Person1#: Hello, how are you?
#Person2#: I'm fine, thank you. How are you?
#Person1#: I'm good, thanks. How can I help you?
#Person2#: I'm looking for a restaurant in the area.
#Person1#: Sure, what kind of food are you looking for?
#Person2#: I'm looking for a restaurant that serves Italian food.
#Person1#: What kind of price range are you looking for?
#Person2#: Not too expensive.
#Person1#: What kind of atmosphere are you looking for?
#Person2#: I'm looking for a restaurant that has a romantic atmosphere.
#Person1#: I know just the place for you! It's right around the corner, it's called The Italian Bistro.
#Person2#: Thank you!
""")