## Pre processing data

In [None]:
# Install prerequisites (for Colab use)

!pip install datasets
!pip install tokenizers
!pip install transformers

In [None]:
# Imports

import os
import torch
import torch.nn as nn
from torch.nn.functional import log_softmax, pad
import math
import copy
import time
from torch.optim.lr_scheduler import LambdaLR
import torchtext.datasets as datasets
from copy import deepcopy
from datasets import Dataset


In [None]:
# Read dataset that is stored in the 'dataset' folder
full_paths = []
all_lines_list = []
for dirpath, dirnames, filenames in os.walk('dataset'):
    for f in filenames:
        full_path = os.path.join(dirpath, f)
        for line in open(full_path):
            all_lines_list.append(line)

# Trim dataset
all_lines_unique_list = []
all_lines_set = set(all_lines_list)
for lines in all_lines_set:
    if all_lines_list.count(lines) > 0: # How many times should the line be in the dataset to make it into vocabulary
        all_lines_unique_list.append(lines)

print('There are {} lines in our vocabulary'.format(len(all_lines_unique_list)))

In [None]:
# Define some helper functions


def tokenizer_boran(vocab, qasm_file): # Simple tokenizer used in pre-processing the data
  vector = []
  for line in qasm_file:
    try:
      index = vocab.index(line)
      vector.append(index)
    except:
      vector.append(-1)
  return vector

def detokenizer_boran(vocab, vector):
  qasm_file = []
  for element in vector:
    qasm_file.append(vocab[element])

  return qasm_file

In [None]:
# Only count files that have no unknown lines

all_lines_unique_list_unknown = deepcopy(all_lines_unique_list)
all_lines_unique_list_unknown.append('unk')

no_unk_list = [] # List of all lines without duplicates or unknowns
counter = 0
for dirpath, dirnames, filenames in os.walk('dataset'):
  
  for f in filenames:
      full_path = os.path.join(dirpath, f)
      qasm_file = []
      for line in open(full_path):
        qasm_file.append(line)
      vector = tokenizer_boran(all_lines_unique_list_unknown,qasm_file)
      detokenized_qasm_file = detokenizer_boran(all_lines_unique_list_unknown, vector)
      if 'unk' not in detokenized_qasm_file:
        no_unk_list.append(detokenized_qasm_file) #Currently not working
      counter += 1
      if not counter%100:
       print('We are now at item {} of {}'.format(counter, len(filenames)))

In [None]:
# Final definition of the dataset

data_dictionary = {"a" : no_unk_list}

dataset = Dataset.from_dict(data_dictionary)

with open('data.txt', 'w') as f:
  for line in all_lines_unique_list:
    f.write("%s" % line)

dataset = load_dataset("text", data_files = ['data.txt']) # Dataset can be loaded like this whenever needed

## Defining the Tokenizer

In [None]:
# Relevant imports

from tokenizers import ByteLevelBPETokenizer
from tokenizers.pre_tokenizers import CharDelimiterSplit
import json
import os
from functools import lru_cache
from typing import TYPE_CHECKING, List, Optional, Tuple
import regex as re
from transformers.tokenization_utils import AddedToken, PreTrainedTokenizer
from transformers.utils import logging
if TYPE_CHECKING:
    from transformers.pipelines.conversational import Conversation

In [None]:
# Defining my own GPT2Tokenizer class to circumvent BPE tokenisation
# Most code is from the Huggingface implementation

logger = logging.get_logger(__name__)

VOCAB_FILES_NAMES = {
    "vocab_file": "vocab.json",
    "merges_file": "merges.txt",
}

PRETRAINED_VOCAB_FILES_MAP = {
    "vocab_file": {
        "gpt2": "https://huggingface.co/gpt2/resolve/main/vocab.json",
        "gpt2-medium": "https://huggingface.co/gpt2-medium/resolve/main/vocab.json",
        "gpt2-large": "https://huggingface.co/gpt2-large/resolve/main/vocab.json",
        "gpt2-xl": "https://huggingface.co/gpt2-xl/resolve/main/vocab.json",
        "distilgpt2": "https://huggingface.co/distilgpt2/resolve/main/vocab.json",
    },
    "merges_file": {
        "gpt2": "https://huggingface.co/gpt2/resolve/main/merges.txt",
        "gpt2-medium": "https://huggingface.co/gpt2-medium/resolve/main/merges.txt",
        "gpt2-large": "https://huggingface.co/gpt2-large/resolve/main/merges.txt",
        "gpt2-xl": "https://huggingface.co/gpt2-xl/resolve/main/merges.txt",
        "distilgpt2": "https://huggingface.co/distilgpt2/resolve/main/merges.txt",
    },
}

PRETRAINED_POSITIONAL_EMBEDDINGS_SIZES = {
    "gpt2": 1024,
    "gpt2-medium": 1024,
    "gpt2-large": 1024,
    "gpt2-xl": 1024,
    "distilgpt2": 1024,
}

class GPT2Tokenizer(PreTrainedTokenizer):
    """
    Construct a GPT-2 tokenizer. Based on byte-level Byte-Pair-Encoding.
    This tokenizer has been trained to treat spaces like parts of the tokens (a bit like sentencepiece) so a word will
    be encoded differently whether it is at the beginning of the sentence (without space) or not:
    ```
    >>> from transformers import GPT2Tokenizer
    >>> tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
    >>> tokenizer("Hello world")['input_ids']
    [15496, 995]
    >>> tokenizer(" Hello world")['input_ids']
    [18435, 995]
    ```
    You can get around that behavior by passing `add_prefix_space=True` when instantiating this tokenizer or when you
    call it on some text, but since the model was not pretrained this way, it might yield a decrease in performance.
    <Tip>
    When used with `is_split_into_words=True`, this tokenizer will add a space before each word (even the first one).
    </Tip>
    This tokenizer inherits from [`PreTrainedTokenizer`] which contains most of the main methods. Users should refer to
    this superclass for more information regarding those methods.
    Args:
        vocab_file (`str`):
            Path to the vocabulary file.
        merges_file (`str`):
            Path to the merges file.
        errors (`str`, *optional*, defaults to `"replace"`):
            Paradigm to follow when decoding bytes to UTF-8. See
            [bytes.decode](https://docs.python.org/3/library/stdtypes.html#bytes.decode) for more information.
        unk_token (`str`, *optional*, defaults to `<|endoftext|>`):
            The unknown token. A token that is not in the vocabulary cannot be converted to an ID and is set to be this
            token instead.
        bos_token (`str`, *optional*, defaults to `<|endoftext|>`):
            The beginning of sequence token.
        eos_token (`str`, *optional*, defaults to `<|endoftext|>`):
            The end of sequence token.
        add_prefix_space (`bool`, *optional*, defaults to `False`):
            Whether or not to add an initial space to the input. This allows to treat the leading word just as any
            other word. (GPT2 tokenizer detect beginning of words by the preceding space).
    """

    vocab_files_names = VOCAB_FILES_NAMES
    pretrained_vocab_files_map = PRETRAINED_VOCAB_FILES_MAP
    max_model_input_sizes = PRETRAINED_POSITIONAL_EMBEDDINGS_SIZES
    model_input_names = ["input_ids", "attention_mask"]

    def __init__(
        self,
        vocab_file,
        merges_file,
        errors="replace",
        unk_token="<|endoftext|>",
        bos_token="<|endoftext|>",
        eos_token="<|endoftext|>",
        pad_token=None,
        add_prefix_space=False,
        add_bos_token=False,
        **kwargs,
    ):
        bos_token = AddedToken(bos_token, lstrip=False, rstrip=False) if isinstance(bos_token, str) else bos_token
        eos_token = AddedToken(eos_token, lstrip=False, rstrip=False) if isinstance(eos_token, str) else eos_token
        unk_token = AddedToken(unk_token, lstrip=False, rstrip=False) if isinstance(unk_token, str) else unk_token
        pad_token = AddedToken(pad_token, lstrip=False, rstrip=False) if isinstance(pad_token, str) else pad_token
        super().__init__(
            errors=errors,
            unk_token=unk_token,
            bos_token=bos_token,
            eos_token=eos_token,
            pad_token=pad_token,
            add_prefix_space=add_prefix_space,
            add_bos_token=add_bos_token,
            **kwargs,
        )
        self.add_bos_token = add_bos_token

        with open(vocab_file, encoding="utf-8") as vocab_handle:
            self.encoder = json.load(vocab_handle)
        self.decoder = {v: k for k, v in self.encoder.items()}
        self.errors = errors  # how to handle errors in decoding
        self.byte_encoder = self.encoder
        self.byte_decoder = self.decoder
        with open(merges_file, encoding="utf-8") as merges_handle:
            bpe_merges = merges_handle.read().split("\n")[1:-1]
        bpe_merges = [tuple(merge.split()) for merge in bpe_merges]
        self.bpe_ranks = dict(zip(bpe_merges, range(len(bpe_merges))))
        self.cache = {}
        self.add_prefix_space = add_prefix_space

        # Should have added re.IGNORECASE so BPE merges can happen for capitalized versions of contractions
        self.pat = re.compile(r"""'s|'t|'re|'ve|'m|'ll|'d| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+""")

    @property
    def vocab_size(self):
        return len(self.encoder)

    def get_vocab(self):
        return dict(self.encoder, **self.added_tokens_encoder)

    def build_inputs_with_special_tokens(self, token_ids_0, token_ids_1=None):
        if self.add_bos_token:
            bos_token_ids = [self.bos_token_id]
        else:
            bos_token_ids = []

        output = bos_token_ids + token_ids_0

        if token_ids_1 is None:
            return output

        return output + bos_token_ids + token_ids_1

    def get_special_tokens_mask(
        self, token_ids_0: List[int], token_ids_1: Optional[List[int]] = None, already_has_special_tokens: bool = False
    ) -> List[int]:
        """
        Retrieves sequence ids from a token list that has no special tokens added. This method is called when adding
        special tokens using the tokenizer `prepare_for_model` or `encode_plus` methods.
        Args:
            token_ids_0 (`List[int]`):
                List of IDs.
            token_ids_1 (`List[int]`, *optional*):
                Optional second list of IDs for sequence pairs.
            already_has_special_tokens (`bool`, *optional*, defaults to `False`):
                Whether or not the token list is already formatted with special tokens for the model.
        Returns:
            `List[int]`: A list of integers in the range [0, 1]: 1 for a special token, 0 for a sequence token.
        """
        if already_has_special_tokens:
            return super().get_special_tokens_mask(
                token_ids_0=token_ids_0, token_ids_1=token_ids_1, already_has_special_tokens=True
            )

        if not self.add_bos_token:
            return super().get_special_tokens_mask(
                token_ids_0=token_ids_0, token_ids_1=token_ids_1, already_has_special_tokens=False
            )

        if token_ids_1 is None:
            return [1] + ([0] * len(token_ids_0))
        return [1] + ([0] * len(token_ids_0)) + [1] + ([0] * len(token_ids_1))

    def tokenize(self, text):
        """Tokenize a string."""
        bpe_tokens = []
        #for token in re.findall(self.pat, text): #This was the huggingface implementation
        #    token = "".join(
        #        self.byte_encoder[b] for b in token.encode("utf-8")
        #    )  # Maps all our bytes to unicode strings, avoiding control tokens of the BPE (spaces in our case)
        #    bpe_tokens.extend(bpe_token for bpe_token in self.bpe(token).split(" "))
        bpe_token = self.encoder[text]
        bpe_tokens.extend([bpe_token])
        return bpe_tokens

    def _convert_token_to_id(self, token):
        """Converts a token (str) in an id using the vocab."""
        return self.encoder.get(token, self.encoder.get(self.unk_token))

    def _convert_id_to_token(self, index):
        """Converts an index (integer) in a token (str) using the vocab."""
        return self.decoder.get(index)

    def convert_tokens_to_string(self, tokens):
        """Converts a sequence of tokens (string) in a single string."""
        text = "".join(tokens)
        text = self.byte_decoder[text]
        return text

    def convert_tokens_to_string(self, tokens: List[str]) -> str:
        return " ".join(tokens)

    def convert_tokens_to_ids(self, tokens):
      return [self.encoder.get(tokens[0], self.encoder.get(self.unk_token))]


    def save_vocabulary(self, save_directory: str, filename_prefix: Optional[str] = None) -> Tuple[str]:
        if not os.path.isdir(save_directory):
            logger.error(f"Vocabulary path ({save_directory}) should be a directory")
            return
        vocab_file = os.path.join(
            save_directory, (filename_prefix + "-" if filename_prefix else "") + VOCAB_FILES_NAMES["vocab_file"]
        )
        merge_file = os.path.join(
            save_directory, (filename_prefix + "-" if filename_prefix else "") + VOCAB_FILES_NAMES["merges_file"]
        )

        with open(vocab_file, "w", encoding="utf-8") as f:
            f.write(json.dumps(self.encoder, indent=2, sort_keys=True, ensure_ascii=False) + "\n")

        index = 0
        with open(merge_file, "w", encoding="utf-8") as writer:
            writer.write("#version: 0.2\n")
            for bpe_tokens, token_index in sorted(self.bpe_ranks.items(), key=lambda kv: kv[1]):
                if index != token_index:
                    logger.warning(
                        f"Saving vocabulary to {merge_file}: BPE merge indices are not consecutive."
                        " Please check that the tokenizer is not corrupted!"
                    )
                    index = token_index
                writer.write(" ".join(bpe_tokens) + "\n")
                index += 1

        return vocab_file, merge_file

    def prepare_for_tokenization(self, text, is_split_into_words=False, **kwargs):
        add_prefix_space = kwargs.pop("add_prefix_space", self.add_prefix_space)
        if is_split_into_words or add_prefix_space:
            text = " " + text
        return (text, kwargs)

    def _build_conversation_input_ids(self, conversation: "Conversation") -> List[int]:
        input_ids = []
        for is_user, text in conversation.iter_texts():
            input_ids.extend(self.encode(text, add_special_tokens=False) + [self.eos_token_id])
        if len(input_ids) > self.model_max_length:
            input_ids = input_ids[-self.model_max_length :]
        return input_ids

In [None]:
# "training" the tokenizer by creating a vocab.json file

!mkdir gpt_tokenizer # Ask Colab to create gpt_tokenizer folder

with open('data.txt') as f: # lines object is all lines of interest
    lines = f.read().splitlines()

with open('gpt_tokenizer/vocab.json', 'w') as f: # Create vocab.json
  f.write("{")
  f.write('"<s>":0,"<pad>":1,"</s>":2,"<unk>":3,"<mask>":4')
  for i, line in enumerate(lines):
    f.write(",\"{}\":{}".format(line,i+5)) # Contains a bug: include "qelib" not dealt with correctly due to "", fix later

  f.write("}")

open('gpt_tokenizer/merges.txt', 'w') # Create empty merges.txt

In [None]:
# Defining the tokenizer

tokenizer = GPT2Tokenizer.from_pretrained('gpt_tokenizer')
tokenizer.add_special_tokens({
    "eos_token": "</s>",
    "bos_token": "<s>",
    "unk_token": "<unk>",
    "pad_token": "<pad>",
    "mask_token": "<mask>"
})


## Defining and training the model

In [None]:
# relevant imports

from transformers import GPT2Config, GPT2LMHeadModel, DataCollatorForLanguageModeling, \
                          Trainer, TrainingArguments

In [None]:
# Defining the model

config = GPT2Config(
    vocab_size = tokenizer.vocab_size,
    bos_token = tokenizer.bos_token_id,
    eos_token = tokenizer.eos_token_id
)

model = GPT2LMHeadModel(config)

In [None]:
# Prepare dataset for training

def encode(lines):
  return tokenizer(lines['text'], add_special_tokens=True, max_length = 1024)


dataset.set_transform(encode)
dataset = dataset['train']

data_collator = DataCollatorForLanguageModeling(tokenizer = tokenizer, mlm=False)

In [None]:
# Define training arguments and train the model

training_args = TrainingArguments(
    output_dir = "model",
    overwrite_output_dir = True,
    num_train_epochs = 1000,
    per_device_train_batch_size = 4,
    save_steps = 1000,
    save_total_limit = 2,
    prediction_loss_only = True,
    remove_unused_columns = False

)

trainer = Trainer(
    model = model,
    args = training_args,
    data_collator = data_collator,
    train_dataset = dataset

)

trainer.train()

trainer.save_model("trained_model")

## Generate qasm files

In [None]:
# Generate new data with model

def detokenizer_boran_hack(tokens): # Hacky solution, use a nicer solution in the future
  decoded_output = []
  for token in tokens:
    if token > 5:
     decoded_output.append(lines[token - 5])
  return decoded_output

# Generate using beam search
beam_output = model.generate(input_ids = tokenizer.encode('OPENQASM 2.0 \n', return_tensors = "pt").to('cuda'),
                       max_length = 512,
                       num_beams = 10,
                       no_repeat_ngram_size = 5,
                       num_return_sequences = 1)

decoded_output = detokenizer_boran_hack(beam_output[0])
print(decoded_output)