Before running the next block, set up a python3 virtual environemnt. 

First, make sure you're using the correct version of python. In the terminal, type:

>module load python3/3.10.5

Then, create your virtual environment:

>python -m venv .venv

then 

>source .venv/bin/activate

then install the requirements:

>pip install -r requirements.txt

You are now free to run the rest of the code:

In [58]:
#for tokenizer
from sklearn.metrics import mean_squared_error, mean_absolute_error , r2_score
import morfessor
import math
import tokenizers 
from tokenizers import Tokenizer, trainers, BertWordPieceTokenizer
from tokenizers.models import BPE
from tokenizers.normalizers import Lowercase
from tokenizers.pre_tokenizers import CharDelimiterSplit
import os

#for BERT
from transformers import AutoTokenizer,AutoModelForSequenceClassification,TrainingArguments, Trainer, BigBirdTokenizerFast, AutoConfig, BertConfig, BertForSequenceClassification
import torch
from datasets import load_dataset
import numpy


In [None]:
if torch.cuda.is_available():
    print('total gpu:',torch.cuda.get_device_properties(0).total_memory / (1024 ** 3), "GB")
    print('Allocated mem:', torch.cuda.memory_allocated(0) / (1024 ** 3), "GB")
    print('Cache:',  torch.cuda.memory_reserved(0) / (1024 ** 3), "GB")

# Repression Prediction via LLM
Below are the modules needed to train the LMM. 

In [86]:
def train_tokenizer():
    # We build our custom tokenizer:
    tokenizer = BertWordPieceTokenizer()
    tokenizer.pre_tokenizer = CharDelimiterSplit(' ')

    # We can train this tokenizer by giving it a list of path to text files:
    #trainer = trainers.BpeTrainer()
    tokenizer.train(files=['data/token_data_sliding_window.txt'], show_progress=True, initial_alphabet = ['A','G','C','T'])

    tokenizer.enable_truncation(max_length=512)

    tokenizer.save('./tok/tokenizer.json')

In [87]:
train_tokenizer()






In [92]:
#prepare the data
def get_tokenized_dataset(tokenizer):
    ds = load_dataset('csv', data_files='data/seq_er_sliding_window.csv')
    train_testvalid = ds['train'].train_test_split(test_size=0.2)
    # Split the 10% test + valid in half test, half valid
    test_valid = train_testvalid['test'].train_test_split(test_size=0.5)
    # gather to a single DatasetDict
    ds = {
        'train': train_testvalid['train'],
        'test': test_valid['test'],
        'valid': test_valid['train']}

    def preprocess_function(examples):
        label = examples["Expression Rate"] 
        examples = tokenizer(examples["Token"], truncation=True, padding="max_length", max_length=512)
        examples["label"] = float(label)
        return examples

    for split in ds:
        ds[split] = ds[split].map(preprocess_function, remove_columns=["Token", "Expression Rate"])
    return ds

In [95]:
#Choose your tokenizer!

#autotokenizer from DNABERT
tokenizer_dna = AutoTokenizer.from_pretrained("zhihan1996/DNABERT-2-117M", trust_remote_code=True)

#self-made morphessor tokenizer
tokenizer_morph = BigBirdTokenizerFast.from_pretrained("tok", max_len=512)

ds = get_tokenized_dataset(tokenizer_morph)
print(ds)

Map: 100%|██████████| 71997/71997 [00:20<00:00, 3524.98 examples/s]
Map: 100%|██████████| 9000/9000 [00:02<00:00, 3364.55 examples/s]
Map: 100%|██████████| 9000/9000 [00:02<00:00, 3445.16 examples/s]

{'train': Dataset({
    features: ['input_ids', 'attention_mask', 'label'],
    num_rows: 71997
}), 'test': Dataset({
    features: ['input_ids', 'attention_mask', 'label'],
    num_rows: 9000
}), 'valid': Dataset({
    features: ['input_ids', 'attention_mask', 'label'],
    num_rows: 9000
})}





In [None]:
#Choose your tokenizer!

#autotokenizer from DNABERT
tokenizer_dna = AutoTokenizer.from_pretrained("zhihan1996/DNABERT-2-117M", trust_remote_code=True)

#self-made morphessor tokenizer
tokenizer_morph = BigBirdTokenizerFast.from_pretrained("tok", max_len=512)

ds = get_tokenized_dataset(tokenizer_morph)
print(ds)

Map:  41%|████      | 29411/72000 [00:08<00:11, 3628.43 examples/s]

None





ValueError: You need to specify either `text` or `text_target`.

In [4]:
LEARNING_RATE = 1e-6
MAX_LENGTH = 256
BATCH_SIZE = 8
EPOCHS = 20

#metrics
def compute_metrics_for_regression(eval_pred):
    logits, labels = eval_pred

    #when training DNABERT you have to use this line
    #logits = logits[0]

    mse = mean_squared_error(labels, logits)
    mae = mean_absolute_error(labels, logits)
    r2 = r2_score(labels, logits)
    single_squared_errors = ((logits - labels).flatten()**2).tolist()
    
    return {"mse": mse, "mae": mae, "r2": r2}

training_args = TrainingArguments(
    output_dir="models/DNABERT_MORPH",
    learning_rate=LEARNING_RATE,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    num_train_epochs=EPOCHS,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=5,
    load_best_model_at_end=True,
    weight_decay=0.01,
    dataloader_num_workers = 5,
    metric_for_best_model = "mse",
    logging_steps = 10,
)

In [5]:
#Define the model
class RegressionTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs[0][:, 0]
        loss = torch.nn.functional.mse_loss(logits, labels)
        return (loss, outputs) if return_outputs else loss

In [None]:
# #Train the Model
os.environ["TOKENIZERS_PARALLELISM"] = "true"
# config = BertConfig.from_pretrained("zhihan1996/DNABERT-2-117M")
# model = AutoModelForSequenceClassification.from_pretrained("zhihan1996/DNABERT-2-117M",  config=config, trust_remote_code=True)

config = BertConfig(num_labels=1)
model = BertForSequenceClassification(config)
model.config.num_labels = 1
#load latest checkpoint
model.load_state_dict(torch.load("models/DNABERT_MORPH/checkpoint-22500/pytorch_model.bin"))


trainer = RegressionTrainer(
    model=model,
    args=training_args,
    train_dataset=ds["train"],
    eval_dataset=ds["valid"],
    compute_metrics=compute_metrics_for_regression
)
trainer.train()


In [None]:
#evaluate the data
trainer.eval_dataset=ds["test"]
trainer.evaluate()

In [None]:
trainer.save_model("final_model/")