In [1]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
import scanpy as sc
import time

In [None]:
# Load the training and evaluation AnnData objects from disk.
# NOTE(review): both paths below are the same .h5ad file, so the "test" split
# is the training data itself — confirm that this is intended.
train_h5ad_path = '/home/amomtaz/Beng199_Projects/scGPT/MacaqueStudyData/clus_mac_adata.h5ad'
test_h5ad_path = '/home/amomtaz/Beng199_Projects/scGPT/MacaqueStudyData/clus_mac_adata.h5ad'
adata_train, adata_test = sc.read(train_h5ad_path), sc.read(test_h5ad_path)

# Drop explicitly stored zero entries from each matrix, in place.
# Assumes .X is a SciPy sparse matrix (eliminate_zeros is a sparse-matrix method) — TODO confirm.
for _adata in (adata_train, adata_test):
    _adata.X.eliminate_zeros()

# Name of the .obs column that holds the cell-type label for each split.
train_celltype_column = test_celltype_column = "PredCellType"
print('data imported')

KeyboardInterrupt: 

In [None]:
# Device configs: run on CUDA when it is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Hyperparameters
num_epochs = 10
learning_rate = 0.01
# One input unit per variable (column) of the training AnnData.
input_size = adata_train.n_vars
hidden_size = 200
# One output unit per distinct value in the training label column.
output_size = len(adata_train.obs[train_celltype_column].unique())
batch_size = 200

NameError: name 'adata_train' is not defined

In [None]:
# Transform
class adataDataset(Dataset):
    """Map-style dataset pairing each row of ``data.X`` with an integer class label.

    The whole matrix is converted to a dense float32 tensor up front, so memory
    use grows with ``rows * columns``.

    Args:
        data: Object exposing the feature matrix as ``data.X`` (an AnnData in
            this notebook). ``X`` may be anything with a ``toarray()`` method
            (e.g. a SciPy sparse matrix) or an already-dense array-like.
        labels: One label per row of ``data.X``.
        encoder: Optional, already-fitted encoder exposing ``transform(labels)``.
            Pass the same encoder to several datasets so that they share one
            label -> integer mapping. When omitted, a new ``LabelEncoder`` is
            fitted on ``labels`` alone (the original behaviour).

    Attributes:
        data: float32 tensor holding the dense matrix.
        labels: int64 tensor of encoded labels.
        encoder: The encoder that produced ``labels``.

    Raises:
        ValueError: If the number of labels differs from the number of rows.
    """

    def __init__(self, data, labels, encoder=None):
        super().__init__()
        matrix = data.X
        # Only densify when needed: a dense array has no toarray() method.
        if hasattr(matrix, "toarray"):
            matrix = matrix.toarray()
        self.data = torch.tensor(matrix, dtype=torch.float32)

        if encoder is None:
            encoder = LabelEncoder().fit(labels)
        self.encoder = encoder
        self.labels = torch.tensor(encoder.transform(labels), dtype=torch.long)

        # Fail at construction time instead of mis-pairing rows and labels later.
        if self.data.shape[0] != self.labels.shape[0]:
            raise ValueError(
                f"got {self.labels.shape[0]} labels for {self.data.shape[0]} rows"
            )

    def __len__(self):
        """Return the number of rows (samples)."""
        return self.data.shape[0]

    def __getitem__(self, index):
        """Return the ``(features, label)`` pair stored at ``index``."""
        return self.data[index], self.labels[index]
    
# Build the train / test datasets.
train_labels = adata_train.obs[train_celltype_column].values
test_labels = adata_test.obs[test_celltype_column].values
train_data = adataDataset(adata_train, train_labels)
test_data = adataDataset(adata_test, test_labels)

# Called like this, adataDataset fits a fresh LabelEncoder on whatever labels it
# receives, so the two datasets agree on integer codes only when their label
# sets are identical. Re-encode the test labels with an encoder fitted on the
# training labels so that class k means the same cell type in both splits;
# transform() raises ValueError if the test split has a label unseen in training.
shared_label_encoder = LabelEncoder().fit(train_labels)
test_data.labels = torch.tensor(
    shared_label_encoder.transform(test_labels), dtype=torch.long
)

# Shuffle only the training batches; evaluation order does not matter.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size)

In [None]:
# Define model
class NeuralNet(nn.Module):
    """Feed-forward classifier with one hidden layer: Linear -> ReLU -> Linear.

    Args:
        input_size: Number of features per input row.
        hidden_size: Width of the hidden layer.
        output_size: Number of scores produced per row (raw values; no softmax
            is applied here).
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # Attribute names double as parameter-name prefixes, so they are kept as-is.
        self.linear1 = nn.Linear(in_features=input_size, out_features=hidden_size)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Return the output-layer scores for the batch ``x``."""
        hidden = self.relu(self.linear1(x))
        return self.linear2(hidden)
    
# Instantiate the classifier with the sizes chosen in the hyperparameter cell.
model = NeuralNet(input_size, hidden_size, output_size)

# Replicate the model across the visible GPUs when there is more than one.
# The device list is derived from the actual GPU count: a fixed [0, 1, 2]
# names a non-existent device on a host with exactly two GPUs.
gpu_count = torch.cuda.device_count()
if gpu_count > 1:
    device_indices = list(range(gpu_count))
    model = nn.DataParallel(model, device_ids=device_indices)
model.to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.0001)

In [None]:
# Training loop: num_epochs passes over train_loader, one optimizer step per batch.
steps_in_epoch = len(train_loader)
since = time.time()
for epoch in range(num_epochs):
    for i, (counts, labels) in enumerate(train_loader):
        counts, labels = counts.to(device), labels.to(device)

        # Forward pass and loss for this batch.
        outputs = model(counts)
        loss = criterion(outputs, labels)

        # Clear old gradients, backpropagate, then update the weights.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Log only every 20th step; elapsed time is measured from the start of training.
        if (i+1)%20 != 0:
            continue
        time_elapsed = time.time()-since
        print(f'Epoch: {epoch+1} Step: {i+1}/{steps_in_epoch} Loss: {loss.item():.3f} Time: {time_elapsed//60}m {time_elapsed%60:.1f}s')

In [None]:
# Test: overall accuracy of the model on test_loader.
num_correct = 0
num_samples = 0

# Evaluate in eval mode (matters for mode-dependent layers such as dropout or
# batch norm), then restore whatever mode the model was in beforehand.
was_training = model.training
model.eval()
try:
    with torch.no_grad():  # no gradients are needed for evaluation
        for counts, labels in test_loader:
            counts = counts.to(device)
            labels = labels.to(device)
            outputs = model(counts)

            # Predicted class = index of the largest score in each row.
            predictions = outputs.argmax(dim=1)
            num_correct += (predictions == labels).sum().item()
            num_samples += labels.shape[0]
finally:
    model.train(was_training)

# An empty loader would otherwise raise ZeroDivisionError; report nan instead.
accuracy = num_correct/num_samples if num_samples else float('nan')
print(f'Acc: {accuracy}')