# ResNet-like 1D Neural Network

This notebook tests a ResNet-inspired 1D neural network on each of the three ontologies (Biological Processes, Cellular Component, and Molecular Function).

In [1]:
import tensorflow as tf
import pandas as pd
import numpy as np
from sklearn.model_selection import KFold
import yaml

2024-02-23 01:11:56.967040: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-02-23 01:12:07.755743: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /gpfs/space/software/cluster_software/spack/linux-centos7-x86_64/gcc-9.2.0/blast-plus-2.12.0-zt4jna4xynkry22vk62pu2bzcspytxm7/lib:/gpfs/software/soft/spack/linux-centos7-broadwell/gcc-9.2.0/cuda-10.1.243-fgipoyn2aa7f5eqpjut35wchklelutf6/lib64:/gpfs/software/soft/spack/linux-centos7-broadwell/gcc-9.2.0/cudnn-7.6.3.30-10.1-linux-x64-2in3eqmrnltkm2jvegqk3momjizihy5t/lib64:/gpfs/space/software/cluster_

In [2]:
# Record the library versions this run used, for reproducibility.
print(f"TensorFlow v{tf.__version__}")
print(f"Numpy v{np.__version__}")

TensorFlow v2.11.0
Numpy v1.21.5


In [3]:
# Load the notebook's settings (directories, fold count, batch size, ...).
config_path = './config.yaml'
# Decode explicitly as UTF-8 so parsing does not depend on the machine's
# locale default encoding.
with open(config_path, 'r', encoding='utf-8') as file:
    config = yaml.safe_load(file)

In [7]:
# When enabled in the config, load the files carrying the '75percent_' prefix.
USE_75_PERCENT_DATA = config['use_75_percent_datasets']
partial_dataset_prefix = '75percent_' if USE_75_PERCENT_DATA else ''


def _read_preprocessed_frame(kind, ontology):
    """Load one preprocessed training pickle.

    Args:
        kind: 'embeddings' or 'labels' (the middle part of the file name).
        ontology: ontology suffix of the file name, e.g. 'BiologicalProcesses'.

    Returns:
        Whatever object `pd.read_pickle` restores from
        `<preprocessed_data>/<prefix>train_<kind>_<ontology>.pkl`.

    NOTE(review): unpickling can execute arbitrary code; the configured
    `preprocessed_data` directory must only contain files this project
    produced itself.
    """
    data_dir = config['directories']['preprocessed_data']
    return pd.read_pickle(f"{data_dir}/{partial_dataset_prefix}train_{kind}_{ontology}.pkl")


BP_train_df = _read_preprocessed_frame('embeddings', 'BiologicalProcesses')
CC_train_df = _read_preprocessed_frame('embeddings', 'CellularComponent')
MF_train_df = _read_preprocessed_frame('embeddings', 'MolecularFunction')
BP_label_df = _read_preprocessed_frame('labels', 'BiologicalProcesses')
CC_label_df = _read_preprocessed_frame('labels', 'CellularComponent')
MF_label_df = _read_preprocessed_frame('labels', 'MolecularFunction')

In [6]:
# Map each ontology's display name to its [embeddings, labels] pair.
train_data_dict = {
    ontology_name: [embeddings_df, labels_df]
    for ontology_name, embeddings_df, labels_df in (
        ('Biological Processes', BP_train_df, BP_label_df),
        ('Cellular Component', CC_train_df, CC_label_df),
        ('Molecular Function', MF_train_df, MF_label_df),
    )
}

In [7]:
# Number of cross-validation folds, taken from config.yaml.
num_folds = config['num_folds']
# Equals the default `num_classes` of ResNet1D; presumably the number of
# label columns per ontology -- verify against the label frames.
num_labels = 1500

In [8]:
# Mini-batch size handed to `train_model` (and on to `model.fit`), read from config.yaml.
BATCH_SIZE = config['batch_size']

In [9]:
from tensorflow.keras import layers, models

class ResNet1DBlock(models.Model):
    """Residual block made of two kernel-3 Conv1D + BatchNorm layers and a skip connection.

    Args:
        in_channels: accepted for symmetry with `out_channels`; the visible
            code does not read it.
        out_channels: number of filters of both convolutions.
        stride: stride of the first convolution (the second always uses 1).
        downsample: optional callable applied to the block input to produce
            the skip branch; when None the input itself is used.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        # Settings shared by both convolutions of the main branch.
        conv_kwargs = dict(kernel_size=3, padding='same', use_bias=False)
        self.conv1 = layers.Conv1D(out_channels, strides=stride, **conv_kwargs)
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.ReLU()
        self.conv2 = layers.Conv1D(out_channels, **conv_kwargs)
        self.bn2 = layers.BatchNormalization()
        self.downsample = downsample

    def call(self, inputs, training=False):
        """Return relu(main_branch(inputs) + skip_branch(inputs))."""
        # Main branch: conv -> bn -> relu -> conv -> bn.
        x = self.relu(self.bn1(self.conv1(inputs), training=training))
        x = self.bn2(self.conv2(x), training=training)

        # Skip branch: the raw input unless a downsample callable was supplied.
        shortcut = inputs if self.downsample is None else self.downsample(inputs)

        return self.relu(x + shortcut)

class ResNet1D(models.Model):
    """1D residual network: conv stem, four stages of two residual blocks, linear head.

    The stages use 64, 128, 256 and 512 filters. The final Dense layer has no
    activation, so the model returns raw scores (logits), one per class.

    Args:
        input_channels: accepted but not read by the visible code; `call`
            always adds a single trailing channel axis.
        num_classes: width of the output layer.
    """

    def __init__(self, input_channels=1, num_classes=1500):
        super().__init__()

        # Stem: strided kernel-7 convolution followed by max pooling.
        self.initial_conv = layers.Conv1D(64, kernel_size=7, strides=2, padding='same', use_bias=False)
        self.initial_bn = layers.BatchNormalization()
        self.relu = layers.ReLU()
        self.maxpool = layers.MaxPool1D(pool_size=3, strides=2, padding='same')

        # Residual stages; every stage after the first uses stride 2.
        self.layer1 = self._make_layer(64, 64, blocks=2)
        self.layer2 = self._make_layer(64, 128, blocks=2, stride=2)
        self.layer3 = self._make_layer(128, 256, blocks=2, stride=2)
        self.layer4 = self._make_layer(256, 512, blocks=2, stride=2)

        # Head: average over the sequence axis, then project to class scores.
        self.avgpool = layers.GlobalAveragePooling1D()
        self.fc = layers.Dense(num_classes)

    def _make_layer(self, in_channels, out_channels, blocks, stride=1):
        """Build one stage of `blocks` residual blocks as a Sequential model.

        Only the first block receives `stride` and, when the stride or the
        channel count changes, a 1x1 Conv1D + BatchNorm projection for its
        skip branch.
        """
        needs_projection = stride != 1 or in_channels != out_channels
        projection = None
        if needs_projection:
            projection = models.Sequential([
                layers.Conv1D(out_channels, kernel_size=1, strides=stride, use_bias=False),
                layers.BatchNormalization(),
            ])

        stage_blocks = [ResNet1DBlock(in_channels, out_channels, stride, projection)]
        stage_blocks.extend(
            ResNet1DBlock(out_channels, out_channels) for _ in range(1, blocks)
        )
        return models.Sequential(stage_blocks)

    def call(self, inputs, training=False):
        """Run the forward pass and return the Dense layer's raw outputs."""
        # Conv1D needs a channel axis, so append one of size 1.
        x = tf.expand_dims(inputs, axis=-1)

        x = self.relu(self.initial_bn(self.initial_conv(x), training=training))
        x = self.maxpool(x)

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x, training=training)

        return self.fc(self.avgpool(x))


In [10]:
def _f1_from_precision_recall(precision, recall):
    """Return the harmonic mean of precision and recall, or 0.0 when both are zero."""
    total = precision + recall
    if total <= 0:
        return 0.0
    return 2 * precision * recall / total


def _make_weighted_loss(class_weights):
    """Build the per-label weighted sigmoid cross-entropy loss for one fold.

    `class_weights` is a (1, num_labels) tensor that is broadcast over the batch.
    """

    # The loss is a *sigmoid* cross-entropy despite its name; the name is kept
    # unchanged because it is recorded in the saved models' compile
    # configuration and may be used as a `custom_objects` key when loading.
    def weighted_softmax_cross_entropy_with_logits(labels, logits):
        raw_loss = tf.nn.sigmoid_cross_entropy_with_logits(
            labels=tf.cast(labels, logits.dtype), logits=logits
        )
        # Cast so the float64 weights coming from pandas cannot clash with a
        # float32 loss tensor.
        return tf.reduce_mean(raw_loss * tf.cast(class_weights, raw_loss.dtype))

    return weighted_softmax_cross_entropy_with_logits


def train_model(get_model, dataset_name, model_name, data, num_folds, BATCH_SIZE,
                epochs=5, random_state=None):
    """Cross-validate a model on one dataset and save the best fold's model.

    For every fold a fresh model is built, compiled with an inverse-frequency
    weighted sigmoid cross-entropy, trained, and evaluated on the held-out
    split. Whenever a fold's F1 score beats the best so far, the model is
    saved under `<models dir>/<model_name>/best_<BP|MF|CC>_model`.

    Args:
        get_model: zero-argument callable returning a new, uncompiled Keras
            model whose outputs are logits (the loss and metrics assume this).
        dataset_name: display name of the dataset. 'Biological Processes' and
            'Molecular Function' select the BP / MF save slots; any other
            value uses the CC slot.
        model_name: subdirectory of the configured models directory.
        data: pair `(train, label)`; both must support `.iloc` row indexing.
        num_folds: number of KFold splits.
        BATCH_SIZE: batch size for `model.fit`.
        epochs: training epochs per fold (default 5, the previous fixed value).
        random_state: optional seed for the shuffled KFold split; None keeps
            the previous unseeded behaviour.

    Returns:
        None.
    """
    train, label = data

    best_f1 = 0
    print('=======================================================================')
    print(f'Training for {dataset_name}')

    model_root_path = f'{config["directories"]["models"]}/{model_name}'

    # Save slot and message label; unknown names fall back to Cellular
    # Component, matching the previous if/elif/else chain.
    short_name, display_name = {
        'Biological Processes': ('BP', 'Biological Processes'),
        'Molecular Function': ('MF', 'Molecular Function'),
    }.get(dataset_name, ('CC', 'Cellular Component'))

    kfold = KFold(n_splits=num_folds, shuffle=True, random_state=random_state)

    for fold_no, (train_fold, test_fold) in enumerate(kfold.split(train, label), start=1):
        print(f'Training for fold {fold_no} ...')

        # Inverse label frequency on the training split, normalised to sum to 1.
        label_frequencies = label.iloc[train_fold].mean(axis=0)
        weights = 1 / (label_frequencies + 1e-8)
        weights = weights / weights.sum()
        class_weights = tf.constant(
            np.asarray(weights, dtype=np.float32).reshape(1, -1)
        )

        model = get_model()
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
            loss=_make_weighted_loss(class_weights),
            # The model outputs logits, so threshold at 0 (probability 0.5)
            # and let AUC apply the sigmoid itself. Order matters: the score
            # indices used below rely on it.
            metrics=[
                tf.keras.metrics.BinaryAccuracy(threshold=0.0),
                tf.keras.metrics.AUC(from_logits=True),
                tf.keras.metrics.Precision(thresholds=0.0),
                tf.keras.metrics.Recall(thresholds=0.0),
            ],
        )

        model.fit(
            train.iloc[train_fold], label.iloc[train_fold],
            validation_data=(train.iloc[test_fold], label.iloc[test_fold]),
            batch_size=BATCH_SIZE,
            epochs=epochs
        )

        # scores = [loss, binary_accuracy, auc, precision, recall]
        scores = model.evaluate(train.iloc[test_fold], label.iloc[test_fold], verbose=0)
        precision = scores[3]
        recall = scores[4]
        # Guarded: precision and recall are both 0 when nothing is predicted positive.
        F1_score = _f1_from_precision_recall(precision, recall)
        print(f'Score for fold {fold_no}: F1 score of {F1_score}; {model.metrics_names[0]} of {scores[0]}; {model.metrics_names[1]} of {scores[1]*100}%')

        if F1_score > best_f1:
            best_f1 = F1_score
            tf.keras.models.save_model(
                model,
                f'{model_root_path}/best_{short_name}_model',
            )
            print(f'Current best model for {display_name} has an F1 score of {F1_score}')

In [11]:
def train_on_datasets(get_model, model_name):
    """Run `train_model` once for every entry of `train_data_dict`.

    Args:
        get_model: zero-argument callable returning a new model; forwarded
            to `train_model`.
        model_name: name of the subdirectory the best models are saved under.
    """
    for dataset_name, data in train_data_dict.items():
        # Use the fold count from the config rather than a hard-coded 10.
        train_model(get_model, dataset_name, model_name, data, num_folds, BATCH_SIZE)

In [None]:
def get_model_1():
    """Return a new, uncompiled ResNet1D with `num_labels` outputs."""
    # Pass the notebook-level constant explicitly instead of relying on the
    # class default happening to match it.
    return ResNet1D(num_classes=num_labels)

# Cross-validate a ResNet1D on every dataset in `train_data_dict`; the best model
# of each is saved under the 'ResNet1D_1' subdirectory of the configured models directory.
train_on_datasets(get_model_1, 'ResNet1D_1')