In [1]:
import random
import pandas as pd
import matplotlib.pyplot as plt

from tqdm import tqdm
from Bio import SeqIO

from transformers import PreTrainedTokenizerFast

from tokenizers import Tokenizer, normalizers, pre_tokenizers
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.processors import TemplateProcessing
from tokenizers.normalizers import Normalizer, Replace, Lowercase

import sentencepiece as spm


LETTER_TO_BASES = {
    "A": "A",
    "B": "CGT",
    "C": "C",
    "D": "AGT",
    "G": "G",
    "H": "ACT",
    "K": "GT",
    "M": "AC",
    "N": "ACGT",
    "R": "AG",
    "S": "CG",
    "T": "T",
    "V": "ACG",
    "W": "AT",
    "Y": "CT",
}

DATASET = '240212_plasmid_seq_54646.fasta'
DATASET_TXT = '240212_plasmid_seq_54646.txt'
DATASET_DUMMY = '240212_plasmid_seq_54646_dummy.txt'
TOKENIZER = 'dna_bpe_tokenizer'

SEED = 42
VOCAB_SIZE = 4096
NUM_SEQUENCES = 10     #54646
MAX_TOKEN_LENGTH = 32
SPECIAL_TOKENS = ['[UNK]', '[SEP]', '[PAD]', '[CLS]', '[MASK]']

random.seed(SEED)

In [2]:
def read_fasta_to_dataframe(file_path):
    sequences = []
    for record in SeqIO.parse(file_path, "fasta"):
        sequences.append([record.id, str(record.seq), record.description])
    
    dataset = pd.DataFrame(sequences, columns=['ID', 'Sequence', 'Description'])
    return dataset


def preprocess_dna_seqeunce(seq):
    # Clean all whitespaces
    cleaned_seq = seq.replace(' ', '').replace('\n', '').replace('\r', '').replace('\t', '')
    
    # Replace each letter with a random base from LETTER_TO_BASES using random.randint
    replaced_seq = ''
    for letter in cleaned_seq:
        bases = LETTER_TO_BASES[letter]
        random_base = bases[random.randint(0, len(bases) - 1)]
        replaced_seq += random_base
    
    return replaced_seq


# Read plasmid dataset
dataset = read_fasta_to_dataframe(DATASET)

# Remove new line and space characters from DNA sequences
dataset['Sequence'] = dataset['Sequence'].apply(preprocess_dna_seqeunce)

# Save dataset to txt for tokenizer
dataset['Sequence'].to_csv(DATASET_TXT, index=False, header=False)
dataset['Sequence'].iloc[:NUM_SEQUENCES].to_csv(DATASET_DUMMY, index=False, header=False)

In [None]:
dataset

In [2]:
# Train the SentencePiece model with HuggingFace

tokenizer = Tokenizer(
    BPE(unk_token="[UNK]")
)

# Define normalizer
tokenizer.normalizer = normalizers.Sequence([
    Replace(' ', ''),
    Replace('\n', ''),
    Replace('\r', ''),
    Replace('\t', ''),
])

# Define the pre-tokenizer
# pre_tokenizer = pre_tokenizers.Sequence([
#     Whitespace()
# ])

# Train tokenizer
trainer = BpeTrainer(
    vocab_size=VOCAB_SIZE,
    min_frequency=2,
    show_progress=True,
    special_tokens=SPECIAL_TOKENS,
    initial_alphabet=["A", "T", "C", "G"],
    max_token_length=MAX_TOKEN_LENGTH
)

# Train tokenizer
tokenizer.train([DATASET_DUMMY], trainer)

# Set post-processor with correct special token references
tokenizer.post_processor = TemplateProcessing(
    single="[CLS] $A [SEP]",
    pair="[CLS] $A [SEP] $B:1 [SEP]:1",
    special_tokens=[
        ("[CLS]", tokenizer.token_to_id("[CLS]")),
        ("[SEP]", tokenizer.token_to_id("[SEP]")),
        ("[PAD]", tokenizer.token_to_id("[PAD]")),
        ("[UNK]", tokenizer.token_to_id("[UNK]")),
        ("[MASK]", tokenizer.token_to_id("[MASK]")),
    ]
)

tokenizer.save(f"{TOKENIZER}.json")

In [3]:
# Load the SentencePiece tokenizer
tokenizer = PreTrainedTokenizerFast(
    tokenizer_file=f"{TOKENIZER}.json"
)

In [4]:
# Test the tokenizer on a sample DNA sequence
sequence = "ATTCTGCGGTTCCCCCTGGAAGACCTACGCAAGTTGGGCCAGCTCAGAGGTGGAATCAACGAAGGCGAGC"
encoded = tokenizer(sequence)
print("Encoded sequence:", encoded)

# Decode the tokens back to the original sequence
decoded_sequence = tokenizer.decode(encoded['input_ids'])
print("Decoded sequence:", decoded_sequence.upper())

# Anlyze the vocabulary
print(f'Alphabete: {set(''.join(list(sorted(tokenizer.vocab.keys())[::-1])[5:]))}')

Encoded sequence: {'input_ids': [3, 5, 532, 711, 40, 133, 86, 2202, 530, 41, 229, 132, 133, 269, 27, 1094, 1], 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}
Decoded sequence: [CLS] A TTCTGC GGTTCC CCC TGGAA GACC TACGCAA GTTGG GCCA GCTCA GAGG TGGAA TCAAC GAA GGCGAGC [SEP]
Alphabete: {'T', 'G', 'A', 'C'}


In [5]:
sorted(tokenizer.vocab.keys())

['A',
 'AA',
 'AAA',
 'AAAA',
 'AAAAAA',
 'AAAAAAA',
 'AAAAAAC',
 'AAAAAATAAATGGCC',
 'AAAAAATAAATGGCCGCATGA',
 'AAAACA',
 'AAAACAGCCAAAA',
 'AAAACC',
 'AAAAGTA',
 'AAAATA',
 'AAAC',
 'AAACA',
 'AAACAA',
 'AAACAC',
 'AAACACA',
 'AAACC',
 'AAACCA',
 'AAACCAC',
 'AAACCC',
 'AAACCCA',
 'AAACTGC',
 'AAACTGG',
 'AAAGA',
 'AAAGAA',
 'AAAGAAA',
 'AAAGACA',
 'AAAGAGG',
 'AAAGCATT',
 'AAAGCC',
 'AAAGCCAA',
 'AAAGCCC',
 'AAAGCGA',
 'AAAGGA',
 'AAAGGC',
 'AAAGGCC',
 'AAAGGCGG',
 'AAAGTA',
 'AAAGTG',
 'AAAGTGC',
 'AAAGTT',
 'AAATA',
 'AAATAA',
 'AAATCA',
 'AAATCC',
 'AAATGAA',
 'AAATGC',
 'AAATGCA',
 'AAATGG',
 'AAATGGAC',
 'AAATT',
 'AAATTC',
 'AAATTCA',
 'AAATTTC',
 'AAC',
 'AACA',
 'AACAA',
 'AACAAA',
 'AACAAAA',
 'AACAAC',
 'AACAAGAA',
 'AACACA',
 'AACATCA',
 'AACC',
 'AACCAA',
 'AACCCA',
 'AACCCC',
 'AACGA',
 'AACGAA',
 'AACGAC',
 'AACGCCA',
 'AACGCCC',
 'AACGCTT',
 'AACGGA',
 'AACGGC',
 'AACGGCA',
 'AACGGGA',
 'AACGTA',
 'AACGTC',
 'AACGTCA',
 'AACGTGA',
 'AACGTGG',
 'AACGTT',
 'AACTAC',
 'A

In [None]:
#################### DEAD ZONE - DO NOT ENTER ####################
# dataset['Sequence'].transform(len).plot(kind='hist', bins=int(1e3))
# plt.xlim(0, 1e6)
# plt.show()

# # Train the SentencePiece model with spm

# with tqdm(total=NUM_SEQUENCES, desc="Training Tokenizer...", unit="sequences") as pbar:

#     spm.SentencePieceTrainer.train(
#         input=DATASET_TXT,                      # Input file containing DNA sequences
#         model_prefix=TOKENIZER,                 # Prefix for the output model files
#         vocab_size=VOCAB_SIZE,                  # Vocabulary size
#         model_type='bpe',                       # Model type (BPE)
#         pad_id=0,                               # ID for padding token
#         unk_id=1,                               # ID for unknown token
#         bos_id=2,                               # ID for beginning-of-sequence token
#         eos_id=3,                               # ID for end-of-sequence token
#         user_defined_symbols=SPECIAL_TOKENS,    # Special tokens
#         character_coverage=1.0,                 # Ensure full coverage of the input characters
#         input_sentence_size=NUM_SEQUENCES,      # Limit the number of sentences for training for efficiency
#         shuffle_input_sentence=True             # Shuffle the input sentences to improve training
#     )
#     pbar.update(NUM_SEQUENCES)

# # Load the SentencePiece model
# sp = spm.SentencePieceProcessor()
# sp.load(f'{TOKENIZER}.model')

# # Tokenize a sentence
# sequence = "ATTCTGCGGTTCCCCCTGGAAGACCTACGCAAGTTGGGCCAGCTCAGAGGTGGAATCAACGAAGGCGAGC"
# tokens = sp.encode_as_pieces(sequence)
# print("Tokens:", tokens)

# # Convert tokens back to text
# decoded_text = sp.decode_pieces(tokens)
# print("Decoded text:", decoded_text)