In [36]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

In [37]:
train_raw = pd.read_csv('../data/train_MPRA.txt', delimiter='\t', header=None)
test_raw = pd.read_csv('../data/test_MPRA.txt', delimiter='\t', header=None)
train_sol = pd.read_csv('../data/trainsolutions.txt', delimiter='\t', header=None)
train_raw.head()
strand_length = 295


In [38]:
# Get our x and y data
train_scores = np.array(train_raw.iloc[:, 2:297]) #Dimensions are 8000 (samples) by 295 (SHARPR scores per nucleotide)
raw_dna_strands_train = [list(train_raw[1][i]) for i in range(len(train_raw))] #List of lists holding DNA strands separated by character. Size 8000 lists each of length 290
embedded_dna_strands_train = [np.column_stack((np.array(pd.get_dummies(pd.concat([pd.Series(raw_dna_strands_train[i]), pd.Series(["A", "C", "T", "G"])]), dtype='int'))[:-4], np.arange(295))) for i in range(len(train_raw))] #One hot encoded dna strands, list of 8000 matrices, each (295,5)
embedded_dna_strands_train = [embedded_dna_strands_train[i] for i in range(len(embedded_dna_strands_train)) if not ("N" in raw_dna_strands_train[i])]
train_scores  = [train_scores[i] for i in range(len(raw_dna_strands_train)) if not ("N" in raw_dna_strands_train[i])]
#Repeat for test data
raw_dna_strands_test = [list(test_raw[1][i]) for i in range(len(test_raw))] #List of lists holding DNA strands separated by character. Size 8000 lists each of length 290
embedded_dna_strands_test = [np.column_stack((np.array(pd.get_dummies(pd.concat([pd.Series(raw_dna_strands_test[i]), pd.Series(["A", "C", "T", "G"])]), dtype='int'))[:-4], np.arange(295))) for i in range(len(test_raw))]
embedded_dna_strands_test = [embedded_dna_strands_test[i] for i in range(len(embedded_dna_strands_test)) if not ("N" in raw_dna_strands_test[i])]

In [39]:
#Add column with unique identifier for each nucleotide (sequence + location)
train_sol[3] = [str(train_sol.iloc[i, 1][5:]).zfill(4) + str(train_sol.iloc[i,2]).zfill(3) for i in range(len(train_sol))]

#Split by activators and repressors
train_sol_act = train_sol[train_sol[0] == 'A'][3]
train_sol_rep = train_sol[train_sol[0] == 'R'][3]

### ML Model

In [40]:
class DNADataset(Dataset):
    def __init__(self, embedded_dna_strands, train_scores):
        self.x = torch.tensor(embedded_dna_strands, dtype=torch.float32) # Convert x and y to tensors
        self.y = torch.tensor(train_scores, dtype=torch.float32)

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

In [None]:
class SelfAttentionFeedForward(nn.Module):
    #Initialize hyperparameters and NN matrices
    def __init__(self, attention_size, seq_len, embed_size, hidden_size, hidden_layers, lr, train_len, num_heads):
        super().__init__()
        self.attention_size = attention_size
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.hidden_layers = hidden_layers
        self.lr = lr
        self.train_len = train_len
        self.seq_len = seq_len
        self.num_heads = num_heads
        #self.dropout_rate = dropout_rate 

        self.initAttention()
        self.initFFN()

        self.optimizer = torch.optim.Adam(
            self.parameters(),
            lr=self.lr,
            amsgrad=True,
        )

    #Initialize our weight matrices as torch objects, allows them to be automatically optimized
    def initAttention(self):
        head_size = self.attention_size // self.num_heads
        self.W_Q = nn.ModuleList([nn.Linear(self.embed_size, head_size, bias=True) for _ in range(self.num_heads)])
        self.W_K = nn.ModuleList([nn.Linear(self.embed_size, head_size, bias=True) for _ in range(self.num_heads)])
        self.W_V = nn.ModuleList([nn.Linear(self.embed_size, head_size, bias=True) for _ in range(self.num_heads)])
        self.W_O = nn.Linear(self.attention_size, self.attention_size)

        
    
    #Initialize Feed Forward layers, based on however many hidden layers we want
    def initFFN(self):


        self.layer_norm1 = nn.LayerNorm(self.attention_size)
        self.layer_norm2 = nn.LayerNorm(self.attention_size)
        
        
        self.ffn = nn.Sequential(
            nn.Linear(self.attention_size, self.hidden_size),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.attention_size)
        )

        self.output_proj = nn.Linear(self.attention_size, 1)
        self.criterion = nn.MSELoss() # Switch to mean squared error instead of simple norm (this is better apparently?)

    def forward(self, x):
        # x of size                                               (batch_size, sequence_length, embedding_size)
        if x.shape[-1] != self.embed_size:
            raise ValueError
        

        head_outputs = []
        for head in range(self.num_heads):
            queries = self.W_Q(x) #                                   (batch_size, sequence_length, attention_size)
            keys = self.W_K(x) #                                      (batch_size, sequence_length, attention_size)
            values = self.W_V(x) #                                    (batch_size, sequence_length, attention_size)

            # Scale to prevent overflow errors, divide by square root of attention dimension
            scale = torch.sqrt(torch.Tensor(queries.size(-1)))

            #Compute attention and then normalize
            attention = torch.bmm(queries, keys.transpose(1,2)) / scale #                  (batch_size, seq_len, seq_len)
            weights = torch.nn.functional.softmax(attention, dim=2) # Apply this per sample

            # Use as weights for values
            context = torch.bmm(weights, values) #    (batch_size, attention_size, sequence_length)
            head_outputs.append(context)

        # Combine heads
        multi_head = torch.cat(head_outputs, dim=-1)
        attention_output = self.W_O(multi_head)

        # Add first layernorm + residual (add initial info)
        x = self.layer_norm1(x + attention_output)

        # Run through all FFN layers
        ffn_output = self.ffn(x)

        # Add second layernorm + residual
        x = self.layer_norm2(x + ffn_output)

        #Output projection
        x = self.output_proj(x)

        # Return prediction with added b term
        return x.squeeze(-1)

    def train_step(self, x, y):
        self.optimizer.zero_grad()
        pred = self(x)
        loss = self.criterion(pred, y)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.parameters(), max_norm=1.0) # This is for stability
        self.optimizer.step()
        return loss.item() # Diagnostic info

    def train(self, dataloader):
        losses = []
        for epoch in range(self.train_len):
            epoch_loss = 0
            for x_batch, y_batch in dataloader:
                loss = self.train_step(x_batch, y_batch)
                epoch_loss += loss
            avg_loss = epoch_loss / len(dataloader)
            losses.append(avg_loss)
            if (epoch + 1) % 5 == 0:
                print(f"Epoch {epoch+1}/{self.train_len}, Loss: {avg_loss:.4f}")
        return losses
            

In [50]:

dataset = DNADataset(embedded_dna_strands_train, train_scores)

# Create a DataLoader for batching, shuffling, and parallel data loading
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

model = SelfAttentionFeedForward(20, 295, 5, 10, 1, 1e-4, 100, 1) # (attention_size, seq_len, embed_size, hidden_size, hidden_layers, lr, train_len)
model.train(dataloader)


NotImplementedError: Module [ModuleList] is missing the required "forward" function

In [44]:
model.forward(torch.Tensor(embedded_dna_strands_test)).detach().numpy()

array([[-0.04466298,  0.04145521,  0.04282215, ...,  0.01897781,
         0.01897781,  0.01897781],
       [-0.0492821 ,  0.04089904,  0.04809882, ...,  0.01897781,
         0.01897781,  0.01897781],
       [ 0.01717338,  0.01775044,  0.01639129, ..., -0.05476579,
        -0.05476579, -0.05476579],
       ...,
       [ 0.00803119,  0.01332299,  0.02351362, ..., -0.05476579,
        -0.05476579, -0.05476579],
       [-0.00102001, -0.00231577,  0.01801525, ..., -0.05476579,
        -0.05476579, -0.05476579],
       [ 0.01110825,  0.01059504,  0.01076886, ..., -0.05932143,
        -0.05932143, -0.05932143]], shape=(7720, 295), dtype=float32)

In [45]:

#The output should now reflect predictions based on the training data

#Generate predictions
predictions = []
for i, strand in enumerate(embedded_dna_strands_train):
    logits = model.forward(torch.Tensor(strand).unsqueeze(0)).detach().numpy().flatten()
    predicted_scores = torch.sigmoid(torch.Tensor(logits)).numpy().flatten()  #Apply Sigmoid to logits
    predicted_labels = ["A" if score > 0.5 else "R" for score in predicted_scores]  #0.5 threshold
    sequence_id = f"train{str(i).zfill(4)}"
    nucleotides = raw_dna_strands_train[i]

    for pos, (nucleotide, label) in enumerate(zip(nucleotides, predicted_labels)):
        predictions.append([label, sequence_id, nucleotide])

#Save predictions
predictions_df = pd.DataFrame(predictions, columns=["Activation/Repression", "Sequence_ID", "Nucleotide"])
predictions_df.to_csv("predictions.txt", sep="\t", header=False, index=False)

#Load training solutions
training_solutions = pd.read_csv("../data/trainsolutions.txt", delimiter="\t", header=None, names=["Activation/Repression", "Sequence_ID", "Nucleotide"])

#Load predictions
predictions = pd.read_csv("predictions.txt", delimiter="\t", header=None, names=["Activation/Repression", "Sequence_ID", "Nucleotide"])

#Ensure "Nucleotide" columns are strings in both DataFrames
predictions["Nucleotide"] = predictions["Nucleotide"].astype(str)
training_solutions["Nucleotide"] = training_solutions["Nucleotide"].astype(str)

#Merge predictions and training solutions
comparison = pd.merge(predictions, training_solutions, on=["Sequence_ID", "Nucleotide"], how="inner", suffixes=('_pred', '_true'))

#Add a "Match" column to indicate where Prediction matches Truth
comparison["Match"] = comparison["Activation/Repression_pred"] == comparison["Activation/Repression_true"]

#Calculate accuracy
accuracy = comparison["Match"].mean()
print(f"Accuracy: {accuracy:.2%}")

#Save the comparison to a file for inspection
comparison.to_csv("comparison.txt", sep="\t", index=False)

Accuracy: nan%
