<a href="https://colab.research.google.com/github/anihab/dnaTokenization/blob/main/tokenize.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Libraries
import argparse
import os

import pandas as pd
import numpy as np

from tokenizers import Tokenizer, models, trainers, normalizers
from transformers import PreTrainedTokenizerFast

In [None]:
# Globals
# NOTE(review): MAX_TOKENS is not referenced by any function visible in this
# notebook; presumably it is the intended per-sequence token limit -- confirm
# whether truncation was meant to be applied during tokenization.
MAX_TOKENS = 510

In [None]:
# Define inputs
# Each of the two input paths is handed to process_files(), which accepts
# either a '.txt' file listing one file path per line or a directory of files.
BACTERIA_PATH = ""
PHAGE_PATH = ""
# Directory that receives the '<name>_full.csv' and '<name>_tokenized.csv' outputs.
OUTPUT_PATH = ""
# Passed to PreTrainedTokenizerFast in seq2bpe() to load the tokenizer.
TOKENIZER_PATH = ""

Given:
- a directory, or a `.txt` list of file paths, of selected bacteria sequences stored as CSV files
- a directory, or a `.txt` list of file paths, of phage sequences stored as CSV files
- a tokenizer (vocabulary JSON file)

tokenize every file.

### Step 1
Process the input files, skipping any that already have tokenized output.

In [None]:
def process_files(input_path, output_path):
  '''
  Tokenize every not-yet-processed file referenced by input_path.

  input_path is interpreted in one of two ways:
    * a path ending in '.txt' is read as a list with one file path per line
      (surrounding whitespace is stripped from each line);
    * anything else is treated as a directory, and every entry in it is
      considered.

  A candidate is passed to tokenize() only if it is an existing regular file
  and is_processed() reports no non-empty tokenized output for it in
  output_path.
  '''
  if input_path.endswith('.txt'):
    # The handle is deliberately not named `list`, which would shadow the builtin.
    with open(input_path, 'r') as path_list:
      for line in path_list:
        # Blank lines strip to '', which is not a file and is therefore skipped.
        _tokenize_if_unprocessed(line.strip(), output_path)
  else:
    for entry in os.listdir(input_path):
      _tokenize_if_unprocessed(os.path.join(input_path, entry), output_path)


def _tokenize_if_unprocessed(filepath, output_path):
  '''Tokenize filepath unless it is not a regular file or was already processed.'''
  if os.path.isfile(filepath) and not is_processed(output_path, os.path.basename(filepath)):
    tokenize(filepath, output_path)

def is_processed(output_path, filename):
  '''
  Report whether tokenized output already exists for the given input file.

  The expected output name is the part of `filename` before its first '.',
  followed by '_tokenized.csv', located inside `output_path`. Returns True
  only when that file exists and is non-empty; otherwise returns False.
  '''
  stem = filename.split('.')[0]
  tokenized_file = os.path.join(output_path, stem + '_tokenized.csv')
  return os.path.isfile(tokenized_file) and os.path.getsize(tokenized_file) > 0

### Step 2

Tokenize the sequences with the configured tokenizer

In [None]:
def tokenize(filepath, output_path):
  '''
  Tokenize every sequence in one CSV file and save the result.

  Reads `filepath` with pandas, converts each value of its 'sequence' column
  with seq2bpe(), and stores the results in a new 'tokenized' column. The rows
  are then shuffled (unseeded, so the order differs between runs) and handed
  to write_csv(), which also selects a 'label' column from the frame.

  Output files are named after the part of the input file name before its
  first '.'.

  Returns the shuffled dataframe, including the 'tokenized' column.
  '''
  filename = os.path.basename(filepath).split('.')[0]

  df = pd.read_csv(filepath)

  # tokenize
  df['tokenized'] = [seq2bpe(seq) for seq in df['sequence'].values.tolist()]

  # shuffle and save to csv
  df = df.sample(frac=1).reset_index(drop=True)
  write_csv(filename, df, output_path)
  return df

# Loaded tokenizers keyed by vocabulary path, so the tokenizer file is read
# once per path instead of once per sequence.
_TOKENIZER_CACHE = {}


def seq2bpe(sequence):
  '''
  Convert a sequence to its byte pair encoding tokens.

  Loads the tokenizer from TOKENIZER_PATH (cached after the first call for a
  given path), encodes `sequence` with return_tensors="pt" (which requires
  PyTorch), decodes the resulting input_ids with batch_decode, and returns
  the decoded strings joined by single spaces.
  '''
  tokenizer = _TOKENIZER_CACHE.get(TOKENIZER_PATH)
  if tokenizer is None:
    # The vocabulary file must be given via the `tokenizer_file` keyword; the
    # base PreTrainedTokenizerFast cannot build its backend from a positional
    # path argument.
    tokenizer = PreTrainedTokenizerFast(tokenizer_file=TOKENIZER_PATH)
    _TOKENIZER_CACHE[TOKENIZER_PATH] = tokenizer
  encoded_input = tokenizer(sequence, return_tensors="pt")
  token_ids = encoded_input.input_ids
  return " ".join(tokenizer.batch_decode(token_ids))

### Step 3

Save the results to CSV

In [None]:
def write_csv(filename, df, output_path):
  """
  Save the given dataframe to two separate files inside output_path:

  1. <filename>_full.csv holds every column of df, comma-separated, with a
     header row.
  2. <filename>_tokenized.csv holds only the 'tokenized' and 'label' columns,
     tab-separated, without a header row.

  Neither file includes the dataframe index. Paths are built with
  os.path.join, matching is_processed(), so an empty output_path refers to
  the current working directory rather than the filesystem root.
  """
  full_path = os.path.join(output_path, filename + '_full.csv')
  tokenized_path = os.path.join(output_path, filename + '_tokenized.csv')

  df.to_csv(full_path, encoding='utf-8', index=False)
  tokenized = df[['tokenized', 'label']]
  tokenized.to_csv(tokenized_path, encoding='utf-8', index=False, header=False, sep='\t')

### Main

In [None]:
def main():
  '''Process and tokenize the bacteria files, then the phage files, into OUTPUT_PATH.'''
  for source_path in (BACTERIA_PATH, PHAGE_PATH):
    process_files(source_path, OUTPUT_PATH)

# Run the full pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()