Dataset adaptation

Use this cell only if you need to connect to Google Drive; otherwise you can skip it.

In [None]:
# connect to drive
import json
from google.colab import drive
import os

# Mount Google Drive into the Colab filesystem under /content/gdrive.
drive.mount('/content/gdrive')
# Path template for files inside the PRJ project folder; later cells fill the
# placeholder with a file name via files_dir.format(...).
files_dir = "/content/gdrive/My Drive/PRJ/{}"

# Formatting with an empty name yields the PRJ folder itself.
base_file_dir = files_dir.format("")

# go to PRJ folder (IPython magic), so relative paths in later cells resolve from there
%cd "{base_file_dir}"

Mounted at /content/gdrive
/content/gdrive/.shortcut-targets-by-id/1qedjgXhTvPN8l-U_zLXcGXl9O7xge2q7/PRJ


In [None]:
# convert original files (json) to category files 

from torch.utils.data import Dataset
import math

# --- Conversion settings ---------------------------------------------------
# The train split is active; swap in the commented "val" alternatives to
# convert the validation split instead.
out_dir = "converted"
# out_dir = "converted-val"
categories_file = "image_info_test-dev2017.json"
instances_file = "instances_train2017.json"
# instances_file = "instances_val2017.json"
captions_file = "captions_train2017.json"
# captions_file = "captions_val2017.json"

# Extension given to every generated per-category file.
file_ext = ".txt"

print("Converting COCO data")

# --- Accumulators filled by the loading steps that follow ------------------
labels = dict()              # supercategory -> set of captions
categories_by_id = dict()    # category id -> supercategory name
images_categorized = dict()  # image id -> set of supercategory names
coco_categories = set()      # lower-cased supercategory names

# Read the category table and index every category id by its supercategory
# name; also collect the distinct lower-cased supercategory names.
with open(files_dir.format(categories_file), "r") as categories_fp:
  category_table = json.load(categories_fp)["categories"]
  for entry in category_table:
    supercategory = entry["supercategory"]
    categories_by_id[entry["id"]] = supercategory
    coco_categories.add(supercategory.lower())
  print("COCO Categories loaded")
  print("COCO Categories are {}".format(coco_categories))

    
# Group the instance annotations by image: each image id maps to the set of
# supercategories of the objects annotated in it.
with open(files_dir.format(instances_file), "r") as instances_fp:
  instances_json = json.load(instances_fp)
  for annotation in instances_json["annotations"]:
    annotated_image = annotation["image_id"]
    supercategory = categories_by_id[annotation["category_id"]]
    images_categorized.setdefault(annotated_image, set()).add(supercategory)
  print("Instances loaded")

# Build {category: set of captions}: each caption is filed under every
# supercategory recorded for its image; captions whose image has no entry in
# images_categorized are skipped.
with open(files_dir.format(captions_file), "r") as captions_fp:
  captions_json = json.load(captions_fp)
  for entry in captions_json["annotations"]:
    text = entry["caption"]
    image_categories = images_categorized.get(entry["image_id"], ())
    for category in image_categories:
      labels.setdefault(category, set()).add(text)
  print("Captions loaded")

# save all data in new files
# makedirs(exist_ok=True) also creates missing parent folders and does not
# fail when the directory is already there.
os.makedirs(out_dir, exist_ok=True)

# Maps each generated file path to the category it holds.
converted_files = {}
for cat in labels:
  out_file = out_dir+"/"+cat+file_ext
  with open(out_file,"w") as f:
    # labels[cat] is a set of strings, whose iteration order can change from
    # one run to the next; sorting keeps the generated files reproducible.
    for data in sorted(labels[cat]):
      # one caption per line: drop any newline inside the caption itself
      f.write(data.replace("\n","")+"\n")
  converted_files[out_file] = cat
print("Dataset generated.")


In [None]:
# get the whole dataset (as a list) from the already converted files (one file per category)

from torch.utils.data import Dataset
import math
import os

def load_coco_data(in_dir):
    """Load the converted per-category caption files into one flat list.

    Every ``<category>.txt`` file located directly inside ``in_dir`` is read
    line by line, and each line is prefixed with the capitalized category
    name (the control code) followed by a single space.

    Args:
        in_dir: directory holding the converted ``<category>.txt`` files.

    Returns:
        A list of strings of the form ``"<Category> <line>"``. Lines keep the
        trailing newline they have in the file. Files are visited in sorted
        name order. The list is empty when ``in_dir`` is not an existing
        directory.
    """
    file_ext = ".txt"
    labels = []

    if not os.path.isdir(in_dir):
        # Tell the user why the dataset is empty instead of failing silently.
        print("Data directory '{}' not found, no data loaded".format(in_dir))
        return labels

    print("Adapated data directory found, will assume data has been already converted")

    # Sorted so the result does not depend on the order os.listdir returns.
    for file_name in sorted(os.listdir(in_dir)):
        file_path = os.path.join(in_dir, file_name)
        # Only regular files with the expected extension are category files;
        # anything else would end up in the data as a bogus control code.
        if not (file_name.endswith(file_ext) and os.path.isfile(file_path)):
            continue

        # The category is the file name without its extension.
        category = file_name[:-len(file_ext)]
        prefix = category.capitalize() + ' '

        # add every line of the file, prefixed with its control code
        with open(file_path, "r") as f1:
            for data in f1:
                labels.append(prefix + data)

    return labels


class CocoDataset(Dataset):
  """Torch ``Dataset`` over the control-code-prefixed captions of a converted directory."""

  def __init__(self, in_dir):
    """Load every caption found under ``in_dir`` and record how many there are."""
    captions = load_coco_data(in_dir)
    self.data = captions
    self.len = len(captions)

  def __len__(self):
    """Return the number of captions counted at construction time."""
    return self.len

  def __getitem__(self, index):
    """Return the entry stored at ``index``, converted to ``str``."""
    return str(self.data[index])

# load dataset class
# NOTE(review): this reads "converted-val", while the conversion cell currently
# writes to out_dir="converted"; load_coco_data returns an empty list when the
# directory does not exist, so make sure "converted-val" has been generated.
dataset = CocoDataset("converted-val")

In [None]:
# partitioning dataset

import random 

def partition(list_in, n):
    """Split ``list_in`` into ``n`` interleaved slices.

    Slice ``k`` holds the elements at positions ``k, k + n, k + 2n, ...``.
    Returns a list with ``n`` slices (empty when ``n`` is 0).
    """
    chunks = []
    for offset in range(n):
        chunks.append(list_in[offset::n])
    return chunks

# Mix the captions so that sentences of one category are not clustered
# together; the list is shuffled in place, five passes in a row.
shuffle_passes = 5
for shuffle_pass in range(shuffle_passes):
  random.shuffle(dataset.data)

# Deal the shuffled captions into 4 interleaved parts.
n_parts = 4
list_out = partition(dataset.data, n_parts)

In [None]:
# create one file per partition
# Iterate over the partitions themselves rather than range(n): `n` is not
# defined before this cell in a top-to-bottom run, and it does not have to
# match the number of partitions held in list_out.
for part_number, part in enumerate(list_out, start=1):
  f_name = "file"+str(part_number)+".txt"
  print(f_name)
  with open(f_name,"w") as f:
    for sentence in part:
      # one sentence per line: drop any newline the sentence still carries
      f.write(sentence.replace("\n","")+"\n")

Generating the files for the metrics

In [None]:
!pip install transformers
import os
import math
import gc
import json
from google.colab import drive
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import Dataset, DataLoader
from transformers import CTRLPreTrainedModel, CTRLConfig, CTRLTokenizer
from transformers.modeling_outputs import BaseModelOutputWithPast
from transformers.modeling_outputs import CausalLMOutputWithPast

In [None]:
# setup CTRLEvolved model
from new_classes import CTRLLMHeadEvolvedModel
# Use the GPU when there is one and fall back to the CPU otherwise, instead of
# failing on model.to("cuda") in a CPU-only runtime.
device = "cuda" if torch.cuda.is_available() else "cpu"
layers = 10

print("Creating model...")
model = CTRLLMHeadEvolvedModel(CTRLConfig(n_layer=layers, n_head=16))
model.to(device)
print("Model created.")

print("Loading model checkpoint...")
# map_location remaps the stored tensors onto `device`, so a checkpoint saved
# from a GPU run can also be loaded when only the CPU is available.
model.load_state_dict(torch.load('./newModel/new_model.bin', map_location=device))
print("Model checkpoint loaded")

tokenizer = CTRLTokenizer.from_pretrained('ctrl')
# register '~' as the padding token
tokenizer.add_special_tokens({'pad_token': '~'})
optimizer =torch.optim.Adagrad(model.parameters(), lr=0.1)

In [None]:
!pip install transformers

from transformers import CTRLConfig, CTRLLMHeadModel, CTRLTokenizer

In [None]:
# setup standard model

# Use the GPU when there is one and fall back to the CPU otherwise, instead of
# failing on model.to("cuda") in a CPU-only runtime.
device = "cuda" if torch.cuda.is_available() else "cpu"
layers = 10

print("Creating model...")
model = CTRLLMHeadModel(CTRLConfig(n_layer=layers))
model = model.to(device)
print("Model created.")

print("Loading model checkpoint...")

# load partial trained model
# map_location remaps the stored tensors onto `device`, so a checkpoint saved
# from a GPU run can also be loaded when only the CPU is available.
model.load_state_dict(torch.load('./trained/model_from_scratch.bin', map_location=device))
print("Model checkpoint loaded")

tokenizer = CTRLTokenizer.from_pretrained('ctrl')
# register '~' as the padding token
tokenizer.add_special_tokens({'pad_token': '~'})
optimizer =torch.optim.Adagrad(model.parameters(), lr=0.01)

In [None]:
# generate input list
import random 

reference_file_path = "./references.txt"
n_sentence = 1000
min_words = 3
max_words = 5

# Read the reference sentences, dropping surrounding whitespace (including the
# trailing newline) and skipping blank lines so that no prompt ends up empty.
with open(reference_file_path,"r") as f:
  reference_sentences = [line.strip() for line in f if line.strip()]

# Pick up to n_sentence sentences at random; the file may hold fewer.
random.shuffle(reference_sentences)
list_out = reference_sentences[: n_sentence]

# Keep only the opening words of each sentence (between min_words and
# max_words, inclusive) as the generation prompt. split() without arguments
# ignores repeated spaces, so every counted word is a real word.
for i, sentence in enumerate(list_out):
  random_index = random.randint(min_words, max_words)
  list_out[i] = " ".join(sentence.split()[: random_index])


In [None]:
# sanity check: number of prompts prepared for generation
len(list_out)

1000

In [None]:
# generation of n_sentence

# setup
list_input = list_out
# Generate for the prompts that actually exist, even if fewer than n_sentence.
n = min(n_sentence, len(list_input))
candidate_list = []

seq_length = 30   # tokens allowed on top of the prompt (see max_length below)
temperature = 1.0 #default=1.0
nucleusprob = 0.9 #default=0.9
penalty = 1.2     #help="primarily useful for CTRL model; in that case, use 1.2"
topk = 0          #default=0

# A model built from a config and filled through load_state_dict is still in
# training mode; switch to eval mode so dropout is disabled while sampling.
model.eval()

# start cycle
for i in range(n):
  prompt = list_input[i]

  # tokenize the prompt and move the ids to the model's device
  encoded_CTRL = tokenizer.encode(prompt, add_special_tokens=False, return_tensors="pt")
  encoded_input = encoded_CTRL.to(device)

  len_prompt = len(encoded_input[0])

  # sample one continuation; max_length counts the prompt tokens as well
  output_sequence = model.generate(
    input_ids=encoded_input,
    max_length= seq_length + len_prompt,
    temperature=temperature,
    top_k=topk,
    top_p=nucleusprob,
    repetition_penalty=penalty,
    do_sample=True,
    num_return_sequences=1,
  )

  # decode the generated token ids back to text
  generated_sequence = output_sequence[0].tolist()
  text = tokenizer.decode(generated_sequence, clean_up_tokenization_spaces=True)

  # Cut at the first full stop (kept in the text) ...
  if "." in text:
    text = text[: text.index(".")+1]

  # ... then at the first pad token or newline (both dropped).
  for stop_token in ("~", "\n"):
    if stop_token in text:
      text = text[: text.index(stop_token)]

  print(i)

  # save text
  candidate_list.append(text)


In [None]:
# sanity check: number of generated candidates
print(len(candidate_list))

1000


In [None]:
# save candidate_list on file

import os

file_path = "candidates/candidates.txt"

# open(..., "w") does not create missing folders, so make sure the target
# directory exists first (nothing to create when the path has no folder part).
out_folder = os.path.dirname(file_path)
if out_folder:
  os.makedirs(out_folder, exist_ok=True)

# one candidate sentence per line
with open(file_path,"w") as f:
  for s in candidate_list:
    f.write(s + "\n")