<a href="https://colab.research.google.com/github/anihab/tokenization/blob/main/vocabulary.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install Bio
!pip install tokenizers

In [None]:
# Libraries
import argparse
import gzip
import os
import csv

import pandas as pd
import numpy as np

from Bio import SeqIO
from tokenizers import Tokenizer, models, trainers, normalizers

Train a BPE tokenizer on the full bacterial training directory to create the vocabulary file.

### Step 0

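Define the vocabulary size and the input data path.
- input should be a path to a directory of bacterial sequence files (originally fasta files; the current `sequence_iterator` reads each file as csv and takes the sequence from the fifth column)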

In [None]:
# Globals
# vocabulary size handed to the BPE trainer (vocab_size=VOCAB_SIZE)
VOCAB_SIZE=4096
# directory whose files are scanned for training sequences
INPUT_PATH='/ocean/projects/bio230026p/lindseyl/DATA/SEGMENTS/bacteria'

### Step 1

parse through all fasta files in the full bacterial training directory
- to make input similar to input for pre-training, use sequences of at least 1500 nt (i.e., min sequence length = 1500)
- do not limit max sequence length

### Step 2

build vocabulary json - limit vocab size to 4096

In [None]:
def sequence_iterator(input_path=None, min_length=1500, seq_column=4):
  '''\
  yields every sufficiently long sequence found in a directory of csv files

  every regular file in the directory is read with csv.reader; the value in
  column `seq_column` of each row is yielded when it is at least `min_length`
  characters long. rows with too few columns are skipped, and nothing is
  yielded when the path is not a directory.

  input_path: directory to scan (defaults to the global INPUT_PATH)
  min_length: minimum sequence length to keep (default 1500)
  seq_column: zero-based index of the sequence column (default 4)
  '''
  import sys  # local import: only needed to lift the csv field size limit

  directory = INPUT_PATH if input_path is None else input_path
  if not os.path.isdir(directory):
    return

  # max sequence length is not limited, but csv rejects fields longer than
  # its field size limit; lift the limit while iterating and restore it after
  previous_limit = csv.field_size_limit()
  limit = sys.maxsize
  while True:
    try:
      csv.field_size_limit(limit)
      break
    except OverflowError:
      # sys.maxsize does not fit a C long on some platforms; back off
      limit //= 2

  try:
    # sorted so that sequences are produced in a reproducible order
    for filename in sorted(os.listdir(directory)):
      path = os.path.join(directory, filename)
      # skip sub-directories (e.g. .ipynb_checkpoints); open() fails on them
      if not os.path.isfile(path):
        continue
      # newline='' lets the csv module do its own line-ending handling
      with open(path, 'r', newline='') as handle:
        for row in csv.reader(handle):
          # ensure the sequence column exists
          if len(row) > seq_column:
            seq = row[seq_column]
            if len(seq) >= min_length:
              yield seq
  finally:
    csv.field_size_limit(previous_limit)

def build_vocab():
  '''\
  trains a BPE tokenizer on the sequences from sequence_iterator() and
  saves it as "my_vocabulary.json" in the current working directory
  '''
  # BPE model whose input is NFKC-normalized before training
  bpe_tokenizer = Tokenizer(models.BPE())
  bpe_tokenizer.normalizer = normalizers.Sequence([normalizers.NFKC()])
  # stream the sequences straight into training; vocab size comes from VOCAB_SIZE
  bpe_tokenizer.train_from_iterator(
    sequence_iterator(),
    trainer=trainers.BpeTrainer(vocab_size=VOCAB_SIZE),
  )
  bpe_tokenizer.save("my_vocabulary.json")

In [None]:
# run the pipeline: stream sequences from INPUT_PATH, train, and save my_vocabulary.json
build_vocab()

### OLD

In [None]:
def parse_sequences(input_path, min_length=1500):
  '''
  input: a directory of fasta files (plain text or gzip-compressed '.gz');
         min_length is the minimum sequence length to keep (default 1500)
  output: a list of upper-cased sequences of at least min_length characters;
          the list is empty when input_path is not a directory

  note: only directories are handled here; a .txt file listing fasta files
        is not read by this function
  '''
  sequences = [] # the only information we need is the sequences, we do not need to limit length
  if not os.path.isdir(input_path):
    return sequences
  for filename in os.listdir(input_path):
    f = os.path.join(input_path, filename)
    if not os.path.isfile(f):
      continue
    # gzip files are opened here as text and must be closed by us;
    # plain files are passed by path so SeqIO opens and closes them itself
    handle = gzip.open(f, 'rt', encoding='utf-8') if f.endswith('.gz') else None
    try:
      source = f if handle is None else handle
      for record in SeqIO.parse(source, 'fasta'):
        seq = str(record.seq).upper()
        if len(seq) >= min_length:
          sequences.append(seq)
    finally:
      # close the gzip handle even if parsing fails
      if handle is not None:
        handle.close()
  return sequences

def build_vocab(sequences):
  '''
  input: an iterable of sequences (e.g. the list from parse_sequences)
  output: none returned; the trained tokenizer is written to
          "my_vocabulary.json" in the current working directory
  '''
  # trainer first: vocabulary size is taken from the VOCAB_SIZE global
  bpe_trainer = trainers.BpeTrainer(vocab_size=VOCAB_SIZE)
  # BPE tokenizer with NFKC normalization applied to the DNA sequences
  dna_tokenizer = Tokenizer(models.BPE())
  dna_tokenizer.normalizer = normalizers.Sequence([normalizers.NFKC()])
  # fit on the sequences, then persist the result
  dna_tokenizer.train_from_iterator(sequences, trainer=bpe_trainer)
  dna_tokenizer.save("my_vocabulary.json")

# build vocabulary: load every qualifying sequence from INPUT_PATH into a list,
# then train the tokenizer on that list and save my_vocabulary.json
sequences = parse_sequences(INPUT_PATH)
build_vocab(sequences)