# Controllable generation via RL to let Elon Musk speak ill of DOGE
> How to control generation through RL with a learned reward model (here the NISQA speech-quality predictor, used in place of a sentiment classifier).



In [None]:
import torch
from datasets import load_from_disk
from vc.encodec_model.nar_bart_model import NARBartForConditionalGeneration
from transformers import (AutoTokenizer, BartForConditionalGeneration)
import logging
import sys
import pfrl
# Send INFO-and-above log records to stdout so they show up in the notebook output.
logging.basicConfig(level=logging.INFO, stream=sys.stdout, format='')

# Filesystem layout. The agent reads its dataset from `data-encodec` and writes
# to `output`; the environment directories are the same two directories with the
# roles swapped (env input = agent output, env output = agent input).
base_path = '/work/b0990106x/TextRL'
agent_input_dir = f'{base_path}/data-encodec'
agent_output_dir = f'{base_path}/output'
env_input_dir = agent_output_dir
env_output_dir = agent_input_dir

# Hugging Face checkpoint names for the AR (BartForConditionalGeneration) and
# NAR (NARBartForConditionalGeneration) models loaded below.
ar_checkpoint = "lca0503/speech-chatgpt-base-ar-v2-epoch10-wotrans"
nar_checkpoint = "lca0503/speech-chatgpt-base-nar-v2-epoch4-wotrans"

# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
ar_tokenizer = AutoTokenizer.from_pretrained(ar_checkpoint)
ar_model = BartForConditionalGeneration.from_pretrained(ar_checkpoint)
nar_tokenizer = AutoTokenizer.from_pretrained(nar_checkpoint)
nar_model = NARBartForConditionalGeneration.from_pretrained(nar_checkpoint)
# NOTE(review): only ar_model is moved to `device` here; nar_model stays where
# from_pretrained put it. Presumably the environment moves it later -- confirm.
ar_model.to(device)

# Dataset previously saved with `save_to_disk` under the agent input directory.
dataset = load_from_disk(agent_input_dir)

In [None]:
# Build per-example inputs from the dataset:
#   all_src_encodec_layers[j]  -> column `src_encodec_{j}` (one entry per example)
#   all_src_encodec[i][j]      -> layer j of example i
#   all_instruction[i]         -> instruction text of example i
#   all_instruction_ids[i]     -> tokenized instruction of example i
data_len = len(dataset)
print(data_len)

# For testing, only the first 22 examples are used. Clamp to the dataset size so
# a dataset with fewer rows does not fail with an IndexError below.
data_len = min(22, data_len)
layer_len = 8

# Read each column exactly once. Indexing the dataset by column name inside the
# per-example loop would fetch the whole column again on every iteration.
all_src_encodec_layers = [
    dataset[f"src_encodec_{layer_idx}"] for layer_idx in range(layer_len)
]
instruction_column = dataset["instruction"]

# Transpose from layer-major columns to one list of `layer_len` layers per example.
all_src_encodec = [
    [layer_column[example_idx] for layer_column in all_src_encodec_layers]
    for example_idx in range(data_len)
]

all_instruction = [
    instruction_column[example_idx] for example_idx in range(data_len)
]

# [1:-1] drops the first and last token id of each encoding
# (presumably the BOS/EOS markers added by the tokenizer -- confirm).
all_instruction_ids = [
    ar_tokenizer(instruction)["input_ids"][1:-1]
    for instruction in all_instruction
]

In [None]:
# import sys
# sys.path.append('/work/b0990106x/TextRL/vc')

# Re-execute the textrl module before importing from it, so that edits made to
# the package on disk are picked up without restarting the notebook kernel.
# The `from textrl import ...` must come after the reload to bind the fresh classes.
from importlib import reload
import textrl
reload(textrl)

from textrl import TextRLEnv,TextRLActor
# reload(sys.modules['vc.trainer_encodec_vc_inference'])

In [None]:
from NISQA.nisqa.NISQA_model import nisqaModel

class MyRLEnv(TextRLEnv):
    """TextRL environment whose reward is a NISQA prediction for a wav file.

    The scored file is always ``{base_path}/output/example.wav``; this class
    does not write it (presumably the parent environment produces it before
    ``get_reward`` is called -- confirm).
    """

    def _nisqa_args(self):
        """Return the argument dict passed to ``nisqaModel`` for a single-file prediction."""
        batch_size = 1
        worker_count = 0
        return {
            'mode': 'predict_file',
            'pretrained_model': f'{base_path}/NISQA/weights/nisqa.tar',
            'deg': f'{base_path}/output/example.wav',
            'data_dir': None,
            'output_dir': f'{base_path}/NISQA/result',
            'csv_file': None,
            'csv_deg': None,
            'num_workers': worker_count,
            'bs': batch_size,
            'ms_channel': None,
            # The tr_* entries mirror 'bs' and 'num_workers'.
            'tr_bs_val': batch_size,
            'tr_num_workers': worker_count,
        }

    def get_reward(self, input_item, predicted_list, finish):
        """Score the current episode.

        Args:
            input_item: observation dict; only ``input_item['input']`` is read (for logging).
            predicted_list: list of tokens predicted so far.
            finish: truthy once generation has finished.

        Returns:
            0 while the episode is still running; otherwise the ``mos_pred``
            value of the first row of the NISQA prediction, as a float.
        """
        episode_over = finish or len(predicted_list) >= self.env_max_length
        if not episode_over:
            return 0

        # A new NISQA model is built for every scored episode.
        mos_table = nisqaModel(self._nisqa_args()).predict()
        score = float(mos_table['mos_pred'].iloc[0])

        print("input_item : ", input_item['input'])
        print("predicted_list: ", predicted_list)
        print("reward: ", score)
        return score

**fit one example**

In [None]:
# One observation dict per example: an empty 'input' string plus that example's
# source EnCodec layers and instruction text.
observation_list = [
    {'input': "", 'src_encodec': all_src_encodec[idx], 'instruction': all_instruction[idx]}
    for idx in range(data_len)
]

In [None]:
# Print the instruction text of each observation, labelled with its index.
for idx in range(data_len):
    instruction_text = observation_list[idx]['instruction']
    print(f"Instruction {idx}: ", instruction_text)

In [None]:
# Wire up the RL components: the environment gets both models/tokenizers and
# the observations; the actor wraps the AR model only; the PPO agent is then
# created from the actor.
# NOTE(review): the exact meaning of compare_sample, update_interval,
# minibatch_size and epochs is defined by textrl, not visible here -- confirm
# against the textrl source before tuning.
env = MyRLEnv(ar_model, ar_tokenizer, nar_model, nar_tokenizer, observation_input=observation_list, compare_sample=1)
actor = TextRLActor(env, ar_model, ar_tokenizer)
agent = actor.agent_ppo(update_interval=3, minibatch_size=3, epochs=10)

In [None]:
# Run the actor on the first observation. This cell precedes the training cell,
# so its result serves as the pre-training baseline.
actor.predict(observation_list[0])

In [None]:
import sys

# Train the PPO agent with periodic evaluation.
#
# stdout is NOT redirected to a file in this cell, so nothing is written to
# `output_file_path`; the name is kept only so existing references to it keep
# working. Everything the training run saves goes under `pfrl_outdir`.
output_file_path = 'log.txt'

# Single source of truth for the step budget; the output directory name is
# derived from it ('train_steps_900').
train_steps = 900
pfrl_outdir = f'train_steps_{train_steps}'

pfrl.experiments.train_agent_with_evaluation(
    agent,
    env,
    steps=train_steps,  # train the agent for n steps
    eval_n_steps=None,  # evaluate by episode count rather than by step count
    eval_n_episodes=3,  # evaluate n episodes per evaluation
    train_max_episode_len=1000,
    # NOTE(review): pfrl appears to measure eval_interval in training steps,
    # not episodes -- confirm against the pfrl documentation.
    eval_interval=5,
    outdir=pfrl_outdir,
)

# Report the location that was actually written to.
print('Training output has been written to', pfrl_outdir)


Load the best checkpoint saved during training and run prediction again.

In [None]:
# Restore the agent from the 'best' subdirectory of the training output directory.
agent.load(pfrl_outdir + '/best')

In [None]:
# Run the actor on the first observation again, now with the loaded agent,
# for comparison with the earlier pre-training prediction.
actor.predict(observation_list[0])