In [None]:
!pip install transformers
!pip install scikit-learn





In [None]:
import pandas as pd #Pandas and NumPy: Used for data manipulation and numerical operations.
import numpy as np
import torch ##PyTorch: Deep learning library for building and training neural networks.
from torch.utils.data import Dataset, DataLoader #torch.utils.data: PyTorch module for handling datasets and creating data loader
from torch.nn.utils.rnn import pad_sequence #torch.nn.utils.rnn: Utilities for working with recurrent neural networks (RNNs).
from transformers import BertTokenizer, BertForSequenceClassification, AdamW #transformers: Hugging Face library for working with transformer models like BERT.
from sklearn.model_selection import train_test_split #sklearn: Scikit-learn library for machine learning tools.
from sklearn.metrics import mean_squared_error #sklearn.metrics: Scikit-learn evaluation metrics (here, mean squared error).
import ast #ast: Abstract Syntax Trees for parsing and evaluating literal expressions in strings.
import matplotlib.pyplot as plt

# Load training data from a TSV (tab-separated values) file.
# Replace the path below with the location of your own train.tsv.
train_data = pd.read_csv('/content/drive/MyDrive/Florabert/train.tsv', sep='\t')

# Split the data into sequences (X) and expression levels (y)
sequences = train_data['sequence'].values
labels = train_data['labels'].values

# Print each header directly above the data it describes, and show only the first
# few rows: every sequence is long, so dumping the full arrays floods the output.
print("sequences")
print(sequences[:5])
print("Labels")
print(labels[:5])

# Hold out 10% of the rows for validation and train on the remaining 90%.
# A fixed random_state makes the shuffle, and therefore the split, reproducible.
train_sequences, val_sequences, train_labels, val_labels = train_test_split(
    sequences, labels, test_size=0.1, random_state=42
)

# Define a custom dataset for gene expression data
class GeneExpressionDataset(Dataset):
    """Dataset pairing sequences with multi-value expression labels.

    Args:
        sequences: Indexable collection of sequences; each item is converted with ``str``.
        labels: Indexable collection of per-sequence labels. Each entry is either a
            string such as ``"[0.1, 0.2]"`` or a numeric sequence (list, tuple,
            NumPy array, ...).
        tokenizer: Callable invoked as
            ``tokenizer(text, truncation=True, padding=True, max_length=..., return_tensors='pt')``
            that returns a mapping with ``'input_ids'`` and ``'attention_mask'`` tensors.
        max_length: Forwarded to the tokenizer as ``max_length``.
        num_labels: Stored on the instance; not otherwise used by this class.
    """

    def __init__(self, sequences, labels, tokenizer, max_length=None, num_labels=8):
        self.sequences = sequences
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.num_labels = num_labels

    def __len__(self):
        """Return the number of sequences in the dataset."""
        return len(self.sequences)

    @staticmethod
    def _parse_labels(labels):
        """Convert one label entry into a 1-D float32 tensor."""
        if isinstance(labels, str):
            # Drop surrounding whitespace and brackets before splitting so that
            # "[1, 2]", " [1, 2] " and "1, 2" all parse to the same values.
            stripped = labels.strip().strip('[]()')
            values = [float(piece) for piece in stripped.split(',') if piece.strip()]
            return torch.tensor(values, dtype=torch.float32)
        # Anything that is not a string (list, tuple, ndarray, ...) is treated as
        # already-numeric data and converted directly.
        return torch.tensor(labels, dtype=torch.float32)

    def __getitem__(self, idx):
        """Return the tokenized sequence and label tensor for item ``idx``.

        Returns:
            dict with flattened ``'input_ids'`` and ``'attention_mask'`` tensors and a
            float32 ``'labels'`` tensor.
        """
        sequence = str(self.sequences[idx])
        label_tensor = self._parse_labels(self.labels[idx])

        # Each item is tokenized on its own; padding to a common length is left to
        # whatever collates the items into a batch.
        encoding = self.tokenizer(sequence, truncation=True, padding=True, max_length=self.max_length, return_tensors='pt')

        # Flatten the tokenizer output to 1-D tensors for this single item.
        input_ids = encoding['input_ids'].flatten()
        attention_mask = encoding['attention_mask'].flatten()

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': label_tensor
        }

# Pretrained BERT tokenizer; echo it so the loaded vocabulary settings are visible.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
print("tokenizer", tokenizer, sep="\n")

# Pretrained BERT encoder with a sequence-classification head of 8 outputs.
model = BertForSequenceClassification.from_pretrained(
    'bert-base-uncased',
    num_labels=8,
)
print("model", model, sep="\n")

# AdamW comes from torch.optim (not transformers.AdamW); lr is the knob to tune.
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
print("Optimizer", optimizer, sep="\n")

# Mean squared error between the model's logits and the target values.
loss_fn = torch.nn.MSELoss()
print(loss_fn)

# Define a collate function to handle dynamic padding
def collate_fn(batch, tokenizer):
    """Combine dataset items into one batch, padding to the longest item.

    Args:
        batch: List of dicts, each with 1-D ``'input_ids'`` and ``'attention_mask'``
            tensors and a ``'labels'`` tensor (all labels must share one shape).
        tokenizer: Object exposing ``pad_token_id``, the id used to pad ``input_ids``.

    Returns:
        dict with ``'input_ids'`` and ``'attention_mask'`` of shape
        ``(batch, longest_length)`` and ``'labels'`` stacked along a new first axis.
    """
    input_ids = [item['input_ids'] for item in batch]
    attention_masks = [item['attention_mask'] for item in batch]
    labels = [item['labels'] for item in batch]

    # A tokenizer without a pad token reports None; fall back to 0 there. The padded
    # positions get attention_mask == 0 below, so the id placed in them is masked out.
    pad_id = tokenizer.pad_token_id
    if pad_id is None:
        pad_id = 0

    # pad_sequence already pads every tensor to the longest one in the list.
    input_ids = pad_sequence(input_ids, batch_first=True, padding_value=pad_id)
    attention_masks = pad_sequence(attention_masks, batch_first=True, padding_value=0)

    return {
        'input_ids': input_ids,
        'attention_mask': attention_masks,
        'labels': torch.stack(labels)
    }

def _collate_batch(batch):
    """Bind the module-level tokenizer so DataLoader can call collate_fn with one argument."""
    return collate_fn(batch, tokenizer)


# Training split: wrap in a dataset, then serve shuffled batches of 64.
train_dataset = GeneExpressionDataset(train_sequences, train_labels, tokenizer)
print("Train Dataset", train_dataset, sep="\n")
train_loader = DataLoader(
    train_dataset,
    batch_size=64,
    shuffle=True,
    collate_fn=_collate_batch,
)
print("Loader", train_loader, sep="\n")

# Validation split: same batching, original order (no shuffling).
val_dataset = GeneExpressionDataset(val_sequences, val_labels, tokenizer)
val_loader = DataLoader(
    val_dataset,
    batch_size=64,
    collate_fn=_collate_batch,
)

# Training loop with checkpoints and early stopping
num_epochs = 10
checkpoint_path = 'fine_tuned_gene_expression_model_checkpoint.pth'
best_val_loss = float('inf')
best_epoch = 0
patience = 5  # Number of epochs with no improvement after which training will be stopped

for epoch in range(num_epochs):
    # ---- Training pass: one optimizer step per batch ----
    model.train()
    train_loss = 0.0
    for batch in train_loader:
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        labels = batch['labels']

        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask=attention_mask)
        # The model is called without labels, so the loss is computed here from the logits.
        loss = loss_fn(outputs.logits, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    # ---- Validation pass: no gradient tracking, no parameter updates ----
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch['input_ids']
            attention_mask = batch['attention_mask']
            labels = batch['labels']

            outputs = model(input_ids, attention_mask=attention_mask)
            loss = loss_fn(outputs.logits, labels)

            val_loss += loss.item()

    # Average the per-batch losses over the number of batches in each loader
    avg_train_loss = train_loss / len(train_loader)
    avg_val_loss = val_loss / len(val_loader)

    print(f'Epoch {epoch + 1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}')

    # Save the model checkpoint if validation loss improves
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        best_epoch = epoch
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'train_loss': avg_train_loss,
            'val_loss': avg_val_loss
        }, checkpoint_path)
    else:
        # epoch - best_epoch counts the epochs since the last improvement. Stop as soon
        # as that count reaches `patience` (using `>` here would wait one epoch longer
        # than `patience` is documented to allow).
        if epoch - best_epoch >= patience:
            print(f'Early stopping at epoch {epoch + 1}')
            break

# Load the best model checkpoint (written during training whenever validation loss improved)
checkpoint = torch.load(checkpoint_path)
model.load_state_dict(checkpoint['model_state_dict'])

# Save the best model checkpoint to Google Drive.
# The loaded checkpoint dict is written out as-is so that every field describes the
# same (best) epoch: its weights, its optimizer state, and its own train/val losses.
# It carries the same keys as before: 'epoch', 'model_state_dict',
# 'optimizer_state_dict', 'train_loss', 'val_loss'.
google_drive_checkpoint_path = '/content/drive/MyDrive/Colab Notebooks/fine_tuned_gene_expression_model4'
torch.save(checkpoint, google_drive_checkpoint_path)

print(f'Model saved to {google_drive_checkpoint_path}')



sequences
Labels
['GTCCCGTGCCTATAAATAGGTGAACAGAACCCCCGTACTGTTCACGCTGACTTGGCATTCGCTTTTTGCGTCACGCTCGTACTGTCATCTCCTTCCAATTGAAGGTACACTTGTAATTCGATGATATTTCTGCTTATGCCTGATAATAATATATAATTGTTCATGTTGTCTGTTATATCCTTTATGTTTCATTCTTCGTCATTGTTTAATGAATTTACGAAGGTACGTCCTTCATAACCTTCGTCCGTAAACTATTATATCCTAAGGGAAATAATGCTTCGGAGGACGAAGGACTTTAACGATTAACATTTTCTATGTTGCCTTGTTCTTAACTCATAGCACTTGAGAACAAGTCCCCAACAAGGACGTTATTGGAGATGAAATAGATATAGGAGATGAAATCTTTTAGAAGAGACTATAAAAAATAGATATACAAGATAAATATAGAGAACATTGTTGGAGACCGTCTTAGTGCTCATACATAGCATGCTATATAGCTACTTGTCAGTTCCCACGGGTCGTGTTGAGTGTTTATTTAGCTAGCCGTACTGGAATACACTACATTAGATTAAAATAAAGAAGAAAAAAGTGGTAAGATAATTTCTATTTTGTTTATGAAATCGCACTCGAAATCGAATCACACAGCTAGGGCATACGTGTACATACGAAACAAGCTAACAAAAGCTAGCGGGCGGCCACCGCCGGGCCTTTTACACGTACGCGCCTCGCTCGCTGCAAACGCGCGCCACGGGGCCTCACATGGACTGCGCTCCGGCTGCTGCTTCGCTTTTTCTTCTGGCTGTCCGTTCACTCGCGTCACGCCATCACGTCTCGTATCGCATCTTGCGCACTGGCCGGCGCGCGGGCGGCGCCGCTCAGCTCTTTCCTATAAATAGGGCACAGGCCACAGGGAAGAAGGGCACCAGCCGGTGTTGGTTTCAGCTTGTATTCCCACTCTGTCGCACAGCGTCGCCCGTCGCC

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


model
BertForSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=

In [None]:
import ast
import matplotlib.pyplot as plt
from sklearn.metrics import r2_score


# Load test data from a TSV file (replace the path with your file location).
# This cell relies on names created by the training cell: pd, np, torch, DataLoader,
# GeneExpressionDataset, collate_fn, tokenizer, model and loss_fn.
test_data = pd.read_csv('/content/drive/MyDrive/Florabert/test.tsv', sep='\t')

print(test_data.columns)
test_sequences = test_data['sequence'].values
test_labels_str = test_data['labels'].values

# Convert string representations to lists
test_labels = [ast.literal_eval(label) for label in test_labels_str]

# Create test dataset and data loader
test_dataset = GeneExpressionDataset(test_sequences, test_labels, tokenizer)
test_loader = DataLoader(test_dataset, batch_size=64, collate_fn=lambda batch: collate_fn(batch, tokenizer))
print(test_loader)

# Evaluate the model on the test set
predictions = []
model.eval()
test_loss = 0.0
total_samples = 0

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        labels = batch['labels']

        outputs = model(input_ids, attention_mask=attention_mask)
        loss = loss_fn(outputs.logits, labels)

        test_loss += loss.item()
        # Keep each batch's logits as a 2-D (batch, num_labels) array.
        predictions.append(outputs.logits.cpu().numpy())
        total_samples += input_ids.size(0)

# Calculate average test loss (mean of the per-batch losses)
avg_test_loss = test_loss / len(test_loader)
print(f'Average Test Loss: {avg_test_loss:.4f}')

# Stack the per-batch arrays along the sample axis. The arrays are deliberately not
# squeezed: squeezing would turn a final batch holding a single sample from
# (1, num_labels) into (num_labels,), which cannot be concatenated with the others.
predictions = np.concatenate(predictions, axis=0)

# Every test sample must have exactly one row of predictions.
assert predictions.shape[0] == total_samples, "prediction rows do not match number of test samples"

# Calculate R-squared value
# r2 = r2_score(test_labels, predictions)
# print(f'R-squared: {r2:.4f}')

# # Print a statement resembling accuracy (you can customize the format)
# accuracy_statement = f'Accuracy of the model: {r2 * 100:.1f}%'
# print(accuracy_statement)

# plt.scatter(test_labels, predictions)
# plt.xlabel('Actual Values')
# plt.ylabel('Predicted Values')
# plt.title('Actual vs. Predicted Values')
# plt.show()


NameError: name 'pd' is not defined

In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error
import ast
import matplotlib.pyplot as plt



# Calculate Mean Absolute Error
mae = mean_absolute_error(test_labels, predictions)
print(f'Mean Absolute Error: {mae:.4f}')

# Calculate Mean Squared Error
mse = mean_squared_error(test_labels, predictions)
print(f'Mean Squared Error: {mse:.4f}')

# Root Mean Squared Error: the MSE brought back to the units of the labels.
# MAE and MSE are error magnitudes in label units, not percentages, so expressions
# like "100 - mae" are not an accuracy; report the error metrics themselves instead.
rmse = mse ** 0.5
print(f'Root Mean Squared Error: {rmse:.4f}')


# plt.scatter(test_labels, predictions)
# plt.xlabel('Actual Values')
# plt.ylabel('Predicted Values')
# plt.title('Actual vs. Predicted Values')
# plt.show()

Mean Absolute Error: 4.1073
Mean Squared Error: 20.7510
Accuracy (MAE): 95.89%
Accuracy (MSE): 79.25%


In [None]:
import torch
from transformers import BertTokenizer, BertForSequenceClassification

# Load BERT model and tokenizer (same architecture and head size used for training)
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=8)


# Load the saved model checkpoint and overwrite the freshly initialised weights with it.
# map_location='cpu' lets a checkpoint that was saved from a GPU runtime be opened on
# a CPU-only runtime; load_state_dict then copies the values into the model's tensors.
checkpoint = torch.load('/content/drive/MyDrive/Colab Notebooks/fine_tuned_gene_expression_model4', map_location='cpu')
model.load_state_dict(checkpoint['model_state_dict'])

def predict_expression(model, tokenizer, input_sequence):
    """Predict the output vector for a single input sequence.

    Args:
        model: ``torch.nn.Module`` called as ``model(input_ids, attention_mask=...)``
            whose result exposes ``.logits``. It is switched to eval mode.
        tokenizer: Callable invoked as
            ``tokenizer(text, truncation=True, padding=True, return_tensors='pt')``
            returning ``'input_ids'`` and ``'attention_mask'`` tensors.
        input_sequence: The sequence to score (one string).

    Returns:
        NumPy array holding the first row of the model's logits, i.e. one value per
        model output (not a single scalar).
    """
    model.eval()
    # Inputs must live on the same device as the model's weights.
    first_param = next(model.parameters(), None)

    with torch.no_grad():
        encoding = tokenizer(input_sequence, truncation=True, padding=True, return_tensors='pt')

        # Shape both tensors as a batch containing exactly one sequence.
        input_ids = encoding['input_ids'].reshape(1, -1)
        attention_mask = encoding['attention_mask'].reshape(1, -1)

        if first_param is not None:
            input_ids = input_ids.to(first_param.device)
            attention_mask = attention_mask.to(first_param.device)

        outputs = model(input_ids, attention_mask=attention_mask)
        predictions = outputs.logits.cpu().numpy()

    # Drop the batch axis: return the logits row for the one input sequence.
    return predictions[0]

input_sequence = "CACTGACGCGCGATAAATGCTTACGGAGGGGAATGATGCATGACGAAACTCCGACTTGATATTAGTAAGGATTTAAATAGTACTAAGAATAAATTGAAACTATTTACGATATCTTTCAATATTGATTTCATACTATTAGTATACATGAATTTAAAATAAATTTAGTATTTTTCTAATTTAATTTGAACCTAACGTATCTGTTGTTGTCAATTATTTTAATAGTCTATTTTTTGGGAATATAATATTGTTTTAGTTCAATGGTAAATATTACACAAATAATAATTGATTATTTGGTATGTCTAAATATTAAATATTTATGAGTAACTAGCTTATTTTATTTAAGTTTATTCAGTTTATCTATTGTTTTATTAAATATCCGTATCTAATATAATTTAGTTTATTTCAATGTTAGAGACCATTATAAGGCTATTTTGATTTTTATTTCATGTTAATTATCGATGAACTTAGTCATGAAATTCTATTTATCTATGTTAAATTTAGCAATAATACACGCGCTGGTCTCTGAGTTAAATTAAGTGAACATTCGAATAGAAACCTGAATCCAGTATATTTATTTTGAATTCGTATTCAAAAGGATTTGTACTGAATCTAGATTAAAATATGATAAGAAGATGGTGTCCAGATCTGATTCCATACGCGTTTCGGTTTGTTTCGGTTTCGATTACTGCTCTCCAGACAGATACCGTGCCCGACATGCATGTTCTAATCACACGCCTCCCCGCCCACTGCATTTCGCATCAATCCAGAAGATTTCGCAGCCAAAGCAGTATCCAACGGATGAATGGTGGTCACCAGCCCAGCAGCCCTCGACTCGACGACGACTCTGTGAGCGCGACCACAGGTCACAGGTGCTTGCACTGCACTCATCCTGGTGGTGGAGTGATGGTTCAGTTCATCAGTTGTGGCTTGTGGCGCCGCGGCGAGTGGCTGCGCGCGTGACTGTTTGTTTGGTTCACTACCTCAGTTGCCACACTTTGCC"  # Your input sequence here
# Score the example sequence with the reloaded model and print the returned array of logits.
predicted_output = predict_expression(model, tokenizer, input_sequence)
print("Predicted Output:", predicted_output)


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Predicted Output: [-4.1323676 -5.247997  -4.3124156 -3.896009  -5.9913836 -4.673238
 -3.9568675 -6.624393 ]


In [None]:
# Mount Google Drive at /content/drive. Other cells in this notebook read and write
# files under /content/drive/MyDrive, so this cell has to be run before them.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive
