In [28]:
import pandas as pd 
import torch 
import torch.nn as nn 
import numpy as np 
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split


from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import pandas as pd

In [5]:
path_to_data = "/Users/abdoulabdillahi/Desktop/Thesis/Bio_project/200_samples_with_encoded.csv"
NUCLEOTIDE_TO_INT = {'A':0, 'C':1, 'G':2, 'T':3, '-':4, 'N':5}

In [26]:
## configuration to detect cuda or cpu
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print (device)

cpu


# Dataset Class

In [None]:
class GeneDataset(Dataset):
    def __init__(self, csv_file, label_col='Ciprofloxacin_NS', gene_prefix='gene_', quantile=80,  transform=None):
        """
        Args:
            csv_file (str): Path to the CSV.
            label_col (str): Column with binary labels.
            gene_prefix (str): Prefix of gene columns (default: 'gene_').
            quantile (int): Quantile (%) to determine seq_len (e.g., 95 means use 95th percentile of lengths).
        """
        self.df = pd.read_csv(csv_file)
        self.label_col = label_col
        self.gene_cols = [c for c in self.df.columns if c.startswith(gene_prefix)]
        
        # Compute seq_len using quantile of all sequence lengths
        all_lengths = []
        for row in self.df[self.gene_cols].itertuples(index=False):
            for cell in row:
                if pd.isna(cell): continue
                if isinstance(cell, str) and cell.startswith('['):
                    tokens = cell.strip("[]").replace("'", "").split()
                    tokens = [t for t in tokens if t.lower() != 'nan']
                else:
                    tokens = list(str(cell))
                all_lengths.append(len(tokens))

        self.seq_len = int(np.percentile(all_lengths, quantile))
        print(f"[INFO] Using seq_len={self.seq_len} based on {quantile}th percentile of sequence lengths.")

    def __len__(self):
        return len(self.df)

    def encode_sequence(self, tokens):
        ids = [NUCLEOTIDE_TO_INT.get(t.upper(), NUCLEOTIDE_TO_INT['N']) for t in tokens]
        # Pad or truncate
        if len(ids) < self.seq_len:
            ids += [NUCLEOTIDE_TO_INT['N']] * (self.seq_len - len(ids))
        else:
            ids = ids[:self.seq_len]
        return ids

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        label = int(row[self.label_col]) # extract label col
        features = []

        for col in self.gene_cols:
            cell = row[col]
            if pd.isna(cell):
                features.extend([NUCLEOTIDE_TO_INT['N']] * self.seq_len)
                continue

            if isinstance(cell, str) and cell.startswith('['):
                tokens = cell.strip("[]").replace("'", "").split()
                tokens = [t for t in tokens if t.lower() != 'nan']
            else:
                tokens = list(str(cell))

            encoded = self.encode_sequence(tokens)
            features.extend(encoded)

        return torch.tensor(features, dtype=torch.float32), torch.tensor(label, dtype=torch.long)

In [22]:
dataset = GeneDataset(path_to_data)  # Automatically computes best seq_len
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

for x, y in dataloader:
    print(x.shape)  # (batch_size, num_genes × seq_len)
    print(y.shape)  # (batch_size,)
    break

[INFO] Using seq_len=174 based on 80th percentile of sequence lengths.
torch.Size([32, 2719446])
torch.Size([32])


In [20]:
dataset.encode_sequence

<bound method GeneDataset.encode_sequence of <__main__.GeneDataset object at 0x31aaf63c0>>

In [23]:
x[:3]

tensor([[0., 3., 2.,  ..., 0., 3., 0.],
        [0., 3., 2.,  ..., 5., 5., 5.],
        [0., 3., 2.,  ..., 5., 5., 5.]])

In [13]:
y

tensor([0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0, 0,
        0, 0, 1, 0, 1, 1, 0, 1])

# Logistic Regression in Pytorch 


In [25]:
class LogisticRegressionModel(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.Linear = nn.Linear(input_dim, 1)
    
    def forward(self, x):
        return torch.sigmoid(self.Linear(x).squeeze(1))
    

In [27]:
model = LogisticRegressionModel(input_dim=len(dataset[0][0]))


In [32]:
model.parameters

<bound method Module.parameters of LogisticRegressionModel(
  (Linear): Linear(in_features=2719446, out_features=1, bias=True)
)>

In [None]:
# Binary Cross Entropy Loss
criterion = nn.BCELoss()

# Optimizer (e.g., Adam or SGD)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Training Loop 

In [34]:
# Train for a few epochs
epochs = 10

for epoch in range(epochs):
    model.train()
    running_loss = 0.0

    for X_batch, y_batch in dataloader:
        # Ensure target shape matches predictions
        y_batch = y_batch.float().view(-1)  # [B, 1]

        # Forward pass
        outputs = model(X_batch)

        # Compute loss
        loss = criterion(outputs, y_batch)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss:.4f}")

Epoch 1/10, Loss: 184.4475
Epoch 2/10, Loss: 253.8462
Epoch 3/10, Loss: 256.0096
Epoch 4/10, Loss: 252.4038
Epoch 5/10, Loss: 255.2885


KeyboardInterrupt: 