In [1]:
!pip install scikit-learn
!pip install torch
!pip install torchvision



In [7]:
from sklearn.model_selection import train_test_split
import torch
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np
import os


In [4]:
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

In [8]:
# Define the directories containing the positive and negative files
negative_dir = '/users/hutruon/Assignment-1-CS490/Data/NegativeFile'
positive_dir = '/users/hutruon/Assignment-1-CS490/Data/PositiveFile'

# Define the output file path
output_file_path = '/users/hutruon/Assignment-1-CS490/Data/merged_data.txt'

# List of negative and positive files
negative_files = os.listdir(negative_dir)
positive_files = os.listdir(positive_dir)

def read_file(file_path):
    """Read the contents of a file and return them as a list."""
    with open(file_path, 'r') as file:
        return file.readlines()

def merge_files(pos_file_path, neg_file_path, output_file, label_pos='1', label_neg='0'):
    """Append lines from positive and negative files to the output file with labels."""
    pos_lines = read_file(pos_file_path)
    neg_lines = read_file(neg_file_path)
    
    with open(output_file, 'a') as file:
        for line in pos_lines:
            file.write(f"{label_pos} {line}")
        for line in neg_lines:
            file.write(f"{label_neg} {line}")

# Ensure the output file is empty or does not exist before starting
if os.path.exists(output_file_path):
    os.remove(output_file_path)

# Merge files with matching identifiers
for pos_file in positive_files:
    # Construct the corresponding negative file name
    neg_file = pos_file.replace("_100nt.txt", "_negative_100nt.txt")
    if neg_file in negative_files:
        merge_files(os.path.join(positive_dir, pos_file), os.path.join(negative_dir, neg_file), output_file_path)
    else:
        print(f"Matching negative file not found for {pos_file}")

print("Merging completed.")


Merging completed.


In [9]:
# Path to the merged data file
merged_file_path = '/users/hutruon/Assignment-1-CS490/Data/merged_data.txt'

# Paths for the output files
train_file_path = '/users/hutruon/Assignment-1-CS490/Data/train_data.txt'
test_file_path = '/users/hutruon/Assignment-1-CS490/Data/test_data.txt'
validation_file_path = '/users/hutruon/Assignment-1-CS490/Data/validation_data.txt'

def read_file(file_path):
    """Read the contents of a file and return them as a list."""
    with open(file_path, 'r') as file:
        lines = file.readlines()
    return lines

def write_file(file_path, lines):
    """Write the given lines to a file."""
    with open(file_path, 'w') as file:
        file.writelines(lines)

# Read the merged data
data_lines = read_file(merged_file_path)

# Shuffle the data to ensure randomness
# It's important for machine learning models to be trained on data that's in a random order.
import random
random.shuffle(data_lines)

# Split data into training (75%), and a temporary set (25%)
train_lines, temp_lines = train_test_split(data_lines, test_size=0.25, random_state=42)

# Split the temporary set into testing (20% of total) and validation (5% of total)
# Since the temporary set is 25% of the total, we'll allocate 80% of it to testing and 20% to validation
# which corresponds to 20% and 5% of the total data, respectively.
test_lines, validation_lines = train_test_split(temp_lines, test_size=0.2, random_state=42)  # 0.2 * 0.25 = 0.05

# Write the split data to their respective files
write_file(train_file_path, train_lines)
write_file(test_file_path, test_lines)
write_file(validation_file_path, validation_lines)

print(f"Data split into training ({len(train_lines)} lines), testing ({len(test_lines)} lines), and validation ({len(validation_lines)} lines).")


Data split into training (1300592 lines), testing (346824 lines), and validation (86707 lines).


In [10]:
def one_hot_encoder(sequence):
    """Converts a DNA sequence to a one-hot encoded numpy array."""
    mapping = {'A': [1, 0, 0, 0], 'C': [0, 1, 0, 0], 'G': [0, 0, 1, 0], 'T': [0, 0, 0, 1]}
    one_hot_encoded = [mapping.get(nucleotide, [0, 0, 0, 0]) for nucleotide in sequence]
    return np.array(one_hot_encoded)

def read_and_encode(file_path):
    labels = []
    encoded_sequences = []
    with open(file_path, 'r') as file:
        for line in file:
            label, sequence = line.strip().split(maxsplit=1)
            labels.append(int(label))
            encoded_sequences.append(one_hot_encoder(sequence))
    return np.array(labels), np.array(encoded_sequences)

# Set the path to your test_data.txt file
file_path = '/users/hutruon/Assignment-1-CS490/Data/test_data.txt'

# Read and encode the sequences
labels, encoded_sequences = read_and_encode(file_path)

In [11]:
# Step 3: One-hot encoding function
def one_hot_encoder(sequence):
    mapping = {'A': [1, 0, 0, 0], 'C': [0, 1, 0, 0], 'G': [0, 0, 1, 0], 'T': [0, 0, 0, 1]}
    return np.array([mapping.get(nucleotide, [0, 0, 0, 0]) for nucleotide in sequence])

# Function to read and encode data from a file
def read_data_and_encode(file_path):
    labels = []
    encoded_data = []
    with open(file_path, 'r') as file:
        for line in file:
            label, sequence = line.strip().split(' ', 1)
            labels.append(int(label))
            encoded_data.append(one_hot_encoder(sequence))
    return np.array(labels), np.array(encoded_data)

# Custom Dataset class
class DNADataset(Dataset):
    def __init__(self, sequences, labels):
        self.labels = torch.tensor(labels, dtype=torch.long)
        self.data = torch.stack([torch.tensor(s, dtype=torch.float32) for s in sequences])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# Main program
if __name__ == "__main__":
    # Step 4: Read and encode data
    labels, encoded_data = read_data_and_encode('test_data.txt')
    
    # Step 8: Instantiate the dataset
    test_dataset = DNADataset(encoded_data, labels)
    
    # Step 9: Create a DataLoader
    test_dataloader = DataLoader(test_dataset, batch_size=512, shuffle=True)
    
    # Step 10: Iterate and print one batch
    for data, label in test_dataloader:
        print(data, label)
        break  # Only print the first batch

tensor([[[0., 0., 0., 1.],
         [0., 0., 1., 0.],
         [0., 0., 1., 0.],
         ...,
         [0., 0., 0., 1.],
         [0., 0., 1., 0.],
         [0., 0., 0., 1.]],

        [[0., 1., 0., 0.],
         [0., 1., 0., 0.],
         [0., 0., 0., 1.],
         ...,
         [0., 0., 1., 0.],
         [0., 0., 1., 0.],
         [0., 1., 0., 0.]],

        [[0., 0., 1., 0.],
         [1., 0., 0., 0.],
         [0., 0., 0., 1.],
         ...,
         [0., 1., 0., 0.],
         [1., 0., 0., 0.],
         [1., 0., 0., 0.]],

        ...,

        [[0., 1., 0., 0.],
         [1., 0., 0., 0.],
         [0., 0., 1., 0.],
         ...,
         [0., 0., 1., 0.],
         [0., 1., 0., 0.],
         [0., 0., 1., 0.]],

        [[0., 0., 1., 0.],
         [0., 1., 0., 0.],
         [0., 0., 1., 0.],
         ...,
         [0., 0., 0., 1.],
         [0., 0., 1., 0.],
         [0., 1., 0., 0.]],

        [[0., 0., 0., 1.],
         [1., 0., 0., 0.],
         [0., 0., 0., 1.],
         ...,
 

In [12]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # Layer 1
        self.conv1 = nn.Conv1d(4, 96, 11, stride=4, padding=5)  # Add padding
        self.relu1 = nn.ReLU()

        # Layer 2
        self.conv2 = nn.Conv1d(96, 96, 1, padding=1)  # Add padding
        self.relu2 = nn.ReLU()

        # Layer 3
        self.conv3 = nn.Conv1d(96, 96, 1, padding=1)  # Add padding
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool1d(3, stride=2)
        self.drop3 = nn.Dropout(0.5)

        # Layer 4
        self.conv4 = nn.Conv1d(96, 192, 11, stride=4, padding=5)  # Add padding
        self.relu4 = nn.ReLU()

        # Layer 5
        self.conv5 = nn.Conv1d(192, 192, 1, padding=1)  # Add padding
        self.relu5 = nn.ReLU()

        # Layer 6
        self.conv6 = nn.Conv1d(192, 192, 1, padding=1)  # Add padding
        self.relu6 = nn.ReLU()
        self.pool6 = nn.MaxPool1d(3, stride=2)
        self.drop6 = nn.Dropout(0.5)

        # Layer 7
        self.conv7 = nn.Conv1d(192, 384, 3, stride=1, padding=1)  # Add padding
        self.relu7 = nn.ReLU()

        # Layer 8
        self.conv8 = nn.Conv1d(384, 384, 1, padding=1)  # Add padding
        self.relu8 = nn.ReLU()

        # Layer 9
        self.conv9 = nn.Conv1d(384, 384, 1, padding=1)  # Add padding
        self.relu9 = nn.ReLU()
        self.drop9 = nn.Dropout(0.5)

        # Layer 10
        self.conv10 = nn.Conv1d(384, 20, 3, stride=1, padding=1)  # Add padding
        self.relu10 = nn.ReLU()

        # Layer 11
        self.conv11 = nn.Conv1d(20, 20, 1, padding=1)  # Add padding
        self.relu11 = nn.ReLU()

        # Layer 12
        self.conv12 = nn.Conv1d(20, 20, 1, padding=1)  # Add padding
        self.relu12 = nn.ReLU()
        self.adapool = nn.AdaptiveAvgPool1d((1))

    def forward(self, x):
        x = self.relu1(self.conv1(x))
        x = self.relu2(self.conv2(x))
        x = self.drop3(self.pool3(self.relu3(self.conv3(x))))
        x = self.relu4(self.conv4(x))
        x = self.relu5(self.conv5(x))
        x = self.drop6(self.pool6(self.relu6(self.conv6(x))))
        x = self.relu7(self.conv7(x))
        x = self.relu8(self.conv8(x))
        x = self.drop9(self.relu9(self.conv9(x)))
        x = self.relu10(self.conv10(x))
        x = self.relu11(self.conv11(x))
        x = self.adapool(self.relu12(self.conv12(x)))
        x = torch.flatten(x, 1)  # Flatten for potential further layers or a classifier
        return x

In [13]:
# Step 3: One-hot encoding function
def one_hot_encoder(sequence):
    mapping = {'A': [1, 0, 0, 0], 'C': [0, 1, 0, 0], 'G': [0, 0, 1, 0], 'T': [0, 0, 0, 1]}
    return np.array([mapping.get(nucleotide, [0, 0, 0, 0]) for nucleotide in sequence])

# Function to read and encode data from a file
def read_data_and_encode(file_path):
    labels = []
    encoded_data = []
    with open(file_path, 'r') as file:
        for line in file:
            label, sequence = line.strip().split(' ', 1)
            labels.append(int(label))
            encoded_data.append(one_hot_encoder(sequence))
    return np.array(labels), np.array(encoded_data)

# Custom Dataset class
class DNADataset(Dataset):
    def __init__(self, sequences, labels):
        self.labels = torch.tensor(labels, dtype=torch.long)
        self.data = torch.stack([torch.tensor(s, dtype=torch.float32) for s in sequences])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        data = self.data[idx]
        # Reshape data to have 4 channels
        data = data.transpose(1, 0)
        return data, self.labels[idx]

# Main program
if __name__ == "__main__":
    # Step 4: Read and encode data
    labels, encoded_data = read_data_and_encode('test_data.txt')
    
    # Step 8: Instantiate the dataset
    test_dataset = DNADataset(encoded_data, labels)
    
    # Step 9: Create a DataLoader
    test_dataloader = DataLoader(test_dataset, batch_size=128, shuffle=True)

    # Read and encode training data
    train_labels, train_encoded_data = read_data_and_encode('train_data.txt')

    # Instantiate the training dataset
    train_dataset = DNADataset(train_encoded_data, train_labels)

    # Create a DataLoader for the training data
    train_dataloader = DataLoader(train_dataset, batch_size=128, shuffle=True)  
    

# Instantiating the model and assigning an optimizer to the model and creating a loss function

model = Net().to(device)
optimizer=optim.Adam(params=model.parameters(),lr=0.001)
loss_fn = nn.CrossEntropyLoss()

In [14]:
def train(model,device,train_dataloader,optimizer,epochs):
    print("inside train")
    model.train()
    for batch_ids, (img, classes) in enumerate(train_dataloader):
        classes=classes.type(torch.LongTensor)
        img,classes=img.to(device),classes.to(device)
        torch.autograd.set_detect_anomaly(True)     
        optimizer.zero_grad()
        output=model(img)
        loss = loss_fn(output,classes)                
        
        loss.backward()
        optimizer.step()
    if(batch_ids +1) % 2 == 0:
        print("Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
            epochs, batch_ids* len(img), len(train_dataloader.dataset),
            100.*batch_ids / len(train_dataloader),loss.item()))

In [15]:
def test(model, device, test_dataloader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for img, classes in test_dataloader:
            img, classes = img.to(device), classes.to(device)
            output = model(img)
            test_loss += F.cross_entropy(output, classes, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True) 
            correct += pred.eq(classes.view_as(pred)).sum().item()

    test_loss /= len(test_dataloader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_dataloader.dataset),
        100. * correct / len(test_dataloader.dataset)))
    print('=' * 30)

In [None]:
if __name__=='__main__':
    seed=42
    EPOCHS=3
    
    for epoch in range(1,EPOCHS+1):
        train(model,device,train_dataloader,optimizer,epoch)
        test(model,device,test_dataloader)


inside train

Test set: Average loss: 0.5946, Accuracy: 230981/346824 (67%)

inside train

Test set: Average loss: 0.5795, Accuracy: 235608/346824 (68%)

inside train
