## Multilayer Perceptron for Normalized GEMs

Author: Brad Selee

A PyTorch implementation of a feed-forward neural network to classify RNA expression matrices into cancerous or non-cancerous samples. This notebook reads in two files: the sample file which is the log transformed Gene Expression Matrix with quantile normalization.   

In [None]:
import matplotlib.pyplot as plt
import numpy as np 
import os
import pandas as pd
import seaborn as sns
from sklearn.metrics import confusion_matrix, f1_scoreimport sys
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils import data

### Define Classes

In [None]:
class Dataset(data.Dataset):
    'Characterizes a dataset for PyTorch'
    def __init__(self, data, input_num_classes, output_num_classes):
        """
        Initialization
        data - normalized sample data 2 indexes wide the first index is the sample data and
               the second index is the label (numpy array)
        input_num_classes - represents the normalization range to create one-hot labels. 
        output_num_classes - number of labels that are being classified 
        """
        self.input_num_classes  = input_num_classes
        self.output_num_classes = output_num_classes
        self.data_input = data[0]
        self.data_output = data[1]
        assert len(self.data_input) == len(self.data_output)
    
    def __len__(self):
        'Denotes the total number of samples'
        return len(self.data_input)
    
    def __getitem__(self, index):
        'Generates one sample of data'
        # Select sample
        '''
        return (torch.nn.functional.one_hot(torch.tensor(self.data_input[index]),
                                            num_classes=self.input_num_classes),
                torch.nn.functional.one_hot(torch.tensor(self.data_output[index]),
                                            num_classes=self.output_num_classes),)
        '''
        '''
        return (torch.nn.functional.one_hot(torch.tensor(self.data_input[index]),
                                            num_classes=self.input_num_classes),
                torch.tensor(self.data_output[index]))
        '''
        return (torch.tensor(self.data_input[index]), torch.tensor(self.data_output[index]))

In [None]:
class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

In [None]:
# TODO: gain a better understanding of dropout layers
class Net(nn.Module):
    def __init__(self,  input_seq_length, input_num_classes, output_num_classes):
        super(Net, self).__init__()
        self.input_seq_length = input_seq_length
        self.input_num_classes = input_num_classes
        self.output_num_classes = output_num_classes

        self.fc1 = nn.Linear(1*self.input_seq_length, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)        
        self.fc4 = nn.Linear(256, output_num_classes)
        self.dropout = nn.Dropout(p=0.5, inplace=False)

    def forward(self, x):
        x = x.view(x.shape[0], 1*self.input_seq_length* 1)
        x = F.relu(self.fc1(x))
        #x = self.dropout(x)
        x = F.relu(self.fc2(x))
        #x = self.dropout(x)
        x = F.relu(self.fc3(x))
        #x = self.dropout(x)
        x = self.fc4(x)
        # Note:
        #   Softmax activation for output layer is not used because the nn.CrossEntropyLoss
        #   automatically applies it, so we just send it the raw output. The most likely
        #   class will be the index with the highest value. If probability is needed, the
        #   softmax function can be called when calculating accuracy, this is shown in the
        #   multi_acc function. Ultimately, the softmax as thelast activation function won't 
        #   change the classification result.
        return x

### Define Functions

In [None]:
def labels_and_weights(label_file_df):
    """ 
     Get list of unique sample labels and weights of the samples using
     the inverse of the count. Weights is a tensor to be compatible with
     CrossEntropyLoss.
    """
        
    labels_all = label_file_df.iloc[:,-1].astype(str).values.tolist()
    labels_unique = set(labels_all)
    labels = sorted(labels_unique)
    
    labels_count = [labels_all.count(label) for label in labels]
    weights = 1. / torch.tensor(labels_count, dtype=torch.float) 
    
    return labels, weights


In [None]:
def split_data(matrix_transposed_df, label_file_df, num_classes=4):
    """ merge sample and label file """
    
    # Create dictionary of labels - key:labels, value:indices
    labels = label_file_df.iloc[:,-1].astype(str).values.tolist()
    labels_unique = set(labels)
    labels = sorted(labels_unique)
    labels_dict = {k:v for v, k in enumerate(labels)}
    
    merged_df = pd.merge(matrix_transposed_df, labels_df, left_index=True, right_on='sample')
    del merged_df['sample']
    merged_df['label'].replace(labels_dict, inplace=True)
    
    X = merged_df.iloc[:, 0:-1]
    y = merged_df.iloc[:, -1]
    
    # stratify=y weights the train and test labels properly
    X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.3)
    
    return (X_train, y_train), (X_test, y_test)
    

In [None]:
def plot(train_stats, test_stats, y_target_list, y_pred_list, labels,
         graphs_title="Training vs Validation", cm_title="Confusion Matrix"):
    """Plot training/validation accuracy/loss and Confusion Matrix"""

    # Set up dimensions for plots
    dimensions = (7,12)
    fig, axis = plt.subplots(figsize=dimensions)

    # Plot CM
    confusion_matrix_df = pd.DataFrame(confusion_matrix(y_target_list, y_pred_list))
    sns_heatmap=sns.heatmap(confusion_matrix_df, ax=axis, annot=True, cbar=False,
                                square=True, xticklabels=labels, yticklabels=labels)
    axis.set_title(cm_title)
    axis.set_ylabel("Actual")
    axis.set_xlabel("Predicted")
    
    # Plot Accuracy
    figure = plt.figure()
    figure.set_figheight(12)
    figure.set_figwidth(7)
    plot1 = figure.add_subplot(211)
    plot1.plot(train_stats['accuracy'])
    plot1.plot(test_stats['accuracy'])
    plot1.set_title(graphs_title)
    plot1.set_ylabel("Accuracy")
    plot1.set_xlabel("Epoch")
    plt.legend(["Training", "Testing"], loc="upper left")

    # Plot Loss 
    plot2 = figure.add_subplot(212)
    plot2.plot(train_stats['loss'])
    plot2.plot(test_stats['loss'])
    plot2.set_title(graphs_title)
    plot2.set_ylabel("Loss")
    plot2.set_xlabel("Epoch")
    plot2.legend(["Training", "Testing"], loc="upper left")

    # Save plots into pdf
    #plt.savefig(os.path.join(OUTPUT_DIR, 'stats.pdf'))
    #sns_heatmap.figure.savefig(os.path.join(OUTPUT_DIR, "confusion_matrix.pdf"))

In [None]:
def multi_accuracy(actual_labels, predicted_labels):
    """Computes the accuracy for multiclass predictions"""
    pred_labels_softmax = torch.softmax(predicted_labels, dim=1)
    _, pred_labels_tags = torch.max(pred_labels_softmax, dim=1)

    correct = (pred_labels_tags == actual_labels).float()
    return correct.sum()

## Main Workflow

In [None]:
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print(use_cuda)

train_kwargs = {'batch_size': 16}
test_kwargs = {'batch_size': 16}
if use_cuda:
    cuda_kwargs = {'num_workers': 1,
                   'pin_memory': True,
                   'shuffle': True}
    train_kwargs.update(cuda_kwargs)
    test_kwargs.update(cuda_kwargs)

### Initialization

In [None]:
# Define file paths
SAMPLE_FILE = "lung.emx.txt"
LABEL_FILE = "sample_condition.txt"

In [None]:
## Load Data
matrix_df = pd.read_csv(SAMPLE_FILE, sep='\t')
matrix_df.head()

In [None]:
column_names = ("sample", "label")
labels_df = pd.read_csv(LABEL_FILE, names=column_names, delim_whitespace=True, header=None)
labels, class_weights = labels_and_weights(labels_df)
print(len(labels_df))
print(len(matrix_df.columns))
assert len(labels_df) == len(matrix_df.columns) 
print(len(labels_df))
print(len(matrix_df.columns))
print(labels)
print(class_weights)

In [None]:
## Define parameters
batch_size = 16
max_epoch = 50
learning_rate = 0.001 #5e-4
num_features = len(matrix_df.index)
num_classes = len(labels)

## Create model structure
model = Net(input_seq_length=num_features,
          input_num_classes=10,
          output_num_classes=num_classes).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, verbose=True, patience=50)
loss_fn = torch.nn.CrossEntropyLoss()#(weight=class_weights)

### Data Preprocessing and Visualizaiton

In [None]:
# Visualize missing data ~16k rows ~1400 columns
# yellow => missing data
plt.figure(figsize=(10, 7.5))
sns.heatmap(matrix_df.isnull(), xticklabels=False, yticklabels=False, cmap="viridis")

In [None]:
# Replace nan values with the global minimum of the data set, not independent of rows
# Then plot the density curve
val_min, val_max = np.nanmin(matrix_df), np.nanmax(matrix_df)
matrix_df.fillna(val_min, inplace=True)
i = 0
dimensions = (11.7, 8.27)
fig, ax = plt.subplots(figsize=dimensions)
for column in matrix_df:
    i = i+1
    sns.distplot(matrix_df[column], hist = False, ax=ax)
    
plt.title("Sample Distributions")
plt.xlabel("Expression Level")
plt.ylabel("Density")
plt.savefig("density.pdf")
print(i)
print(matrix_df.shape)

In [None]:
# Transposing matrix to align with label file
matrix_transposed_df = matrix_df.T
matrix_transposed_df.head()

In [None]:
train_data, test_data = split_data(matrix_transposed_df, labels_df, num_classes)

In [None]:
train_data[0].head()

In [None]:
train_data[1].head()

In [None]:
# Convert tuple of df's to tuple of np's
# Allows the dataset class to access w/ data[][] instead of data[].iloc[]
train_data_np = (train_data[0].values, train_data[1].values)
test_data_np = (test_data[0].values, test_data[1].values)

train_dataset = Dataset(train_data_np,input_num_classes=10, output_num_classes=num_classes)
test_dataset = Dataset(test_data_np, input_num_classes=10,output_num_classes=num_classes)
train_generator = data.DataLoader(train_dataset, **train_kwargs, drop_last=False)
test_generator = data.DataLoader(test_dataset, **test_kwargs, drop_last=False)

### Train and test the network

In [None]:
# Meter object to keep track of loss
loss_avgmeter = AverageMeter()

# Dataframes to keep track of statistics
summary_file = pd.DataFrame([], columns=['Epoch', 'Training Loss', 'Accuracy', 'Accurate Count', 'Total Items'])
train_stats = pd.DataFrame([], columns=['accuracy', 'loss'])
test_stats = pd.DataFrame([], columns=['accuracy', 'loss'])

for epoch in range(max_epoch):
    model.train()
    total_items = 0
    acc = 0.0

    # Training 
    for data, target in train_generator:
        data = data.unsqueeze(1).float()
        data, target = data.to(device), target.to(device)
        total_items += target.shape[0] 
        # Zero out the gradients
        optimizer.zero_grad()
        # Predict in-sample labels and get training accuracy
        prediction = model(data)
        acc += multi_accuracy(target, prediction)
        loss = loss_fn(prediction, target.long())
        # Compute gradients and update weights
        loss.backward()
        optimizer.step()

    # Calculate loss per epoch
    loss_avgmeter.update(loss.item(), batch_size)
    acc_avg = acc/total_items
    temp_stats = pd.DataFrame([[acc_avg, loss_avgmeter.avg]], columns=['accuracy', 'loss'])
    train_stats = train_stats.append(temp_stats, ignore_index=True)

    model.eval()
    total_items = 0
    correct = 0
    acc = 0.0
    acc_avg = 0.0
    loss_avgmeter.reset()
    
    # Skip gradient calculation to improve speed
    
    with torch.no_grad():
        # Testing
        for data, target in test_generator:
            data = data.unsqueeze(1).float()
            data, target = data.to(device), target.to(device)
            total_items += target.shape[0]
            # Predict out-sample labels (samples network hasn't seen) and get validation accuracy
            prediction = model(data)
            acc += multi_accuracy(target, prediction)
            loss = loss_fn(prediction, target.long())
            loss_avgmeter.update(loss.item(), batch_size)
    
    acc_avg = acc/total_items
    temp_stats = pd.DataFrame([[acc_avg, loss_avgmeter.avg]], columns=['accuracy', 'loss'])
    test_stats = test_stats.append(temp_stats, ignore_index=True)

    run_file = pd.DataFrame([['%d' %epoch, '%2.5f' %train_stats.iloc[epoch]['loss'], '%2.3f' %acc_avg, '%d' % acc, '%d' % total_items]], columns=['Epoch', 'Training Loss', 'Accuracy', 'Accurate Count', 'Total Items'])
    print('Epoch: %d Training Loss: %2.5f Test Accuracy : %2.3f Accurate Count: %d Total Items :%d '% (epoch, train_stats.iloc[epoch]['loss'], acc_avg, acc, total_items))
    scheduler.step(acc)
    loss_avgmeter.reset()
    

In [None]:
# List to store predictions and actual labels for confusion matrix
y_pred_list = []
y_target_list = []

# Run test set to get confusion matrix values
with torch.no_grad():
    for data, target in test_generator:
        data = data.unsqueeze(1).float()
        data, target = data.to(device), target.to(device)
        total_items += target.shape[0]
        prediction = model(data)
        prediction_softmax = torch.softmax(prediction, dim=1)
        _, prediction_tags = torch.max(prediction_softmax, dim=1)
        y_pred_list.append(prediction_tags.to('cpu'))
        y_target_list.append(target.to('cpu'))

y_pred_list = [j for val in y_pred_list for j in val]
y_target_list = [j for val in y_target_list for j in val]

In [None]:
plot(train_stats, test_stats, y_target_list, y_pred_list, labels,
            graphs_title="lung.emx Plots", cm_title="lung.emx Confusion")

In [None]:
print("f1 score: %0.2f" % (f1_score(y_target_list, y_pred_list, average="weighted")))