# Activation

In [69]:
# import torch.nn as nn
# import time

# def benchmark_activation(activation_function):
#     start_time = time.time()
#     # nn.Sequential(nn.Linear(10, 10), nn.Activation(activation_function)).train()
#     nn.Sequential(nn.Linear(10, 10), activation_function).train()
#     end_time = time.time()
#     return end_time - start_time

# activation_functions = [nn.ReLU(), nn.LeakyReLU(), nn.SiLU()]
# training_times = [benchmark_activation(activation_function)*1000 for activation_function in activation_functions]

# print(f"Training times (in mili seconds): {training_times}")

In [1]:
import torch.nn as nn
import torch
import time

def benchmark_activation(activation_function, input_size):
    model = nn.Sequential(nn.Linear(input_size, 10), activation_function)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters())

    # Generate random tensor data
    batch_size = 32
    x_train = torch.randn(batch_size, input_size)
    y_train = torch.randn(batch_size, 10)

    start_time = time.time()

    # Train the model for a few iterations
    for _ in range(100):
        optimizer.zero_grad()
        outputs = model(x_train)
        loss = criterion(outputs, y_train)
        loss.backward()
        optimizer.step()

    end_time = time.time()
    return (end_time - start_time) * 1000  # Convert to milliseconds

input_size = 10
# activation_functions = [nn.ReLU(), nn.LeakyReLU(), nn.SiLU()]
# training_times = [benchmark_activation(activation_function, input_size) for activation_function in activation_functions]

# print(f"Training times (in milliseconds): {training_times}")

activation_function = nn.ReLU()
training_times = benchmark_activation(activation_function, input_size)
print(f"Training times of ReLU Activation (in milliseconds): {training_times}")
print()

activation_function = nn.LeakyReLU()
training_times = benchmark_activation(activation_function, input_size)
print(f"Training times of LeakyReLU Activation (in milliseconds): {training_times}")
print()

activation_function = nn.SiLU()
training_times = benchmark_activation(activation_function, input_size)
print(f"Training times of SiLU Activation (in milliseconds): {training_times}")
print()



  from .autonotebook import tqdm as notebook_tqdm


Training times of ReLU Activation (in milliseconds): 514.5785808563232

Training times of LeakyReLU Activation (in milliseconds): 42.48785972595215

Training times of SiLU Activation (in milliseconds): 38.384437561035156



In [71]:
# import torch
# import torch.nn as nn
# import torch.optim as optim
# import time

# # Define your neural network architecture
# class Model(nn.Module):
#     def __init__(self, activation):
#         super(Model, self).__init__()
#         self.fc1 = nn.Linear(1000, 64)
#         self.fc2 = nn.Linear(64, 64)
#         self.fc3 = nn.Linear(64, 10)
#         self.activation = activation

#     def forward(self, x):
#         x = self.activation(self.fc1(x))
#         x = self.activation(self.fc2(x))
#         x = self.fc3(x)
#         return x

# # Generate random tensor data
# input_size = 1000
# batch_size = 32
# num_batches = 100
# x_train = torch.randn(batch_size * num_batches, input_size)
# y_train = torch.randint(5, 10, (batch_size * num_batches,))

# # Define the activation functions to benchmark
# # activation_functions = ['relu', 'sigmoid', 'tanh']
# activation_functions = [nn.ReLU(), nn.LeakyReLU(), nn.SiLU()]

# # Benchmark training time for each activation function
# for activation in activation_functions:
#     model = Model(activation)
#     criterion = nn.CrossEntropyLoss()
#     optimizer = optim.Adam(model.parameters())

#     start_time = time.time()

#     # Train the model
#     for epoch in range(5):
#         for batch in range(num_batches):
#             batch_start = batch * batch_size
#             batch_end = (batch + 1) * batch_size
#             inputs = x_train[batch_start:batch_end]
#             labels = y_train[batch_start:batch_end]

#             optimizer.zero_grad()
#             outputs = model(inputs)
#             loss = criterion(outputs, labels)
#             loss.backward()
#             optimizer.step()

#     end_time = time.time()
#     training_time = end_time - start_time

#     print(f"Activation: {activation} | Training Time: {training_time:.2f} seconds")


# Norm

In [2]:
import torch
import torch.nn as nn

class ActNorm(nn.Module):
    def __init__(self, num_features, logdet=False, affine=True,
                 allow_reverse_init=False):
        assert affine
        super().__init__()
        self.logdet = logdet
        self.loc = nn.Parameter(torch.zeros(1, num_features, 1, 1))
        self.scale = nn.Parameter(torch.ones(1, num_features, 1, 1))
        self.allow_reverse_init = allow_reverse_init

        self.register_buffer('initialized', torch.tensor(0, dtype=torch.uint8))

    def initialize(self, input):
        with torch.no_grad():
            flatten = input.permute(1, 0, 2, 3).contiguous().view(input.shape[1], -1)
            mean = (
                flatten.mean(1)
                .unsqueeze(1)
                .unsqueeze(2)
                .unsqueeze(3)
                .permute(1, 0, 2, 3)
            )
            std = (
                flatten.std(1)
                .unsqueeze(1)
                .unsqueeze(2)
                .unsqueeze(3)
                .permute(1, 0, 2, 3)
            )

            self.loc.data.copy_(-mean)
            self.scale.data.copy_(1 / (std + 1e-6))

    def forward(self, input, reverse=False):
        if reverse:
            return self.reverse(input)
        if len(input.shape) == 2:
            input = input[:,:,None,None]
            squeeze = True
        else:
            squeeze = False

        _, _, height, width = input.shape

        if self.training and self.initialized.item() == 0:
            self.initialize(input)
            self.initialized.fill_(1)

        h = self.scale * (input + self.loc)

        if squeeze:
            h = h.squeeze(-1).squeeze(-1)

        if self.logdet:
            log_abs = torch.log(torch.abs(self.scale))
            logdet = height*width*torch.sum(log_abs)
            logdet = logdet * torch.ones(input.shape[0]).to(input)
            return h, logdet

        return h

    def reverse(self, output):
        if self.training and self.initialized.item() == 0:
            if not self.allow_reverse_init:
                raise RuntimeError(
                    "Initializing ActNorm in reverse direction is "
                    "disabled by default. Use allow_reverse_init=True to enable."
                )
            else:
                self.initialize(output)
                self.initialized.fill_(1)

        if len(output.shape) == 2:
            output = output[:,:,None,None]
            squeeze = True
        else:
            squeeze = False

        h = output / self.scale - self.loc

        if squeeze:
            h = h.squeeze(-1).squeeze(-1)
        return h

In [82]:
import torch.nn as nn
import time
import numpy as np


def benchmark_activation(activation_function, input_size):
    model = nn.Sequential(nn.Linear(input_size, 10), activation_function)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters())

    # Generate random tensor data
    batch_size = 32
    x_train = torch.randn(batch_size, input_size)
    y_train = torch.randn(batch_size, 10)

    start_time = time.time()

    # Train the model for a few iterations
    for _ in range(100):
        optimizer.zero_grad()
        outputs = model(x_train)
        loss = criterion(outputs, y_train)
        loss.backward()
        optimizer.step()

    end_time = time.time()
    return (end_time - start_time) * 1000  # Convert to milliseconds

input_size = 10

activation_functions = [ActNorm(input_size)]*10
training_times = [benchmark_activation(activation_function, input_size) for activation_function in activation_functions]
print(f"Training times ActNorm (in mili seconds): {np.mean(training_times)}")

activation_functions = [nn.InstanceNorm1d(input_size)]*10
training_times = [benchmark_activation(activation_function, input_size) for activation_function in activation_functions]
print(f"Training times InstanceNorm2d (in mili seconds): {np.mean(training_times)}")

activation_functions = [nn.BatchNorm1d(input_size)]*10
training_times = [benchmark_activation(activation_function, input_size) for activation_function in activation_functions]
print(f"Training times BatchNorm2d (in mili seconds): {np.mean(training_times)}")

Training times ActNorm (in mili seconds): 72.14639186859131
Training times InstanceNorm2d (in mili seconds): 52.314162254333496
Training times BatchNorm2d (in mili seconds): 63.10415267944336


# Attention

In [5]:
def make_divisible(v, divisor=8, min_value=None, round_limit=.9):
    min_value = min_value or divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_v < round_limit * v:
        new_v += divisor
    return new_v

class SEModule(nn.Module):
    """ SE Module as defined in original SE-Nets with a few additions
    Additions include:
        * divisor can be specified to keep channels % div == 0 (default: 8)
        * reduction channels can be specified directly by arg (if rd_channels is set)
        * reduction channels can be specified by float rd_ratio (default: 1/16)
        * global max pooling can be added to the squeeze aggregation
        * customizable activation, normalization, and gate layer
    """
    def __init__(
            self, channels, rd_ratio=1. / 16, rd_channels=None, rd_divisor=8, add_maxpool=True,
            bias=True, act_layer=nn.ReLU, norm_layer=ActNorm, gate_layer='sigmoid'):
        super(SEModule, self).__init__()
        self.add_maxpool = add_maxpool
        if not rd_channels:
            rd_channels = make_divisible(channels * rd_ratio, rd_divisor, round_limit=0.)
        self.fc1 = nn.Conv2d(channels, rd_channels, kernel_size=1, bias=bias)
        self.bn = norm_layer(rd_channels) if norm_layer else nn.Identity()
        self.act = nn.SiLU()
        self.fc2 = nn.Conv2d(rd_channels, channels, kernel_size=1, bias=bias)
        self.gate = nn.Sigmoid()

    def forward(self, x):
        x_se = x.mean((2, 3), keepdim=True)
        if self.add_maxpool:
            # experimental codepath, may remove or change
            x_se = 0.5 * x_se + 0.5 * x.amax((2, 3), keepdim=True)
        x_se = self.fc1(x_se)
        x_se = self.act(self.bn(x_se))
        x_se = self.fc2(x_se)
        return x * self.gate(x_se)

class ResidualBlock(nn.Module):
    def __init__(self, in_features):
        super(ResidualBlock, self).__init__()

        self.block = nn.Sequential(
            nn.Conv2d(in_features, in_features, 3, padding=1),
            nn.LeakyReLU(),
            nn.Conv2d(in_features, in_features, 3, padding=1),
        )

    def forward(self, x):
        return x + self.block(x)

In [9]:
import torch.nn as nn
import time
import numpy as np


def benchmark_residual_network(activation_function, input_size):
    model = nn.Sequential(activation_function, activation_function, activation_function)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters())

    # Generate random tensor data
    batch_size = 32
    x_train = torch.randn(batch_size, input_size, input_size, input_size)
    # x_train = torch.randn(batch_size, input_size)
    y_train = torch.randn(batch_size, input_size, input_size, input_size)
    # y_train = torch.randn(batch_size, input_size)

    start_time = time.time()

    # Train the model for a few iterations
    for _ in range(100):
        optimizer.zero_grad()
        outputs = model(x_train)
        loss = criterion(outputs, y_train)
        loss.backward()
        optimizer.step()

    end_time = time.time()
    return (end_time - start_time) * 1000  # Convert to milliseconds

input_size = 10

training_times = benchmark_residual_network(ResidualBlock(input_size), input_size)
print(f"Training times ResidualBlock (in mili seconds): {(training_times)}")
print()

training_times = benchmark_residual_network(SEModule(input_size), input_size)
print(f"Training times SEModule (in mili seconds): {(training_times)}")
print()


Training times ResidualBlock (in mili seconds): 82454.54573631287

Training times SEModule (in mili seconds): 17923.808336257935



# All components

In [6]:
import torch.nn as nn
import time
import numpy as np


def benchmark_all_component(CNN_block, activation_function, norm_layer, input_size):
    model = nn.Sequential( \
                        CNN_block,\
                        activation_function, \
                        norm_layer, \
                        CNN_block,\
                        activation_function, \
                        norm_layer
                            )
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters())

    # Generate random tensor data
    batch_size = 32
    x_train = torch.randn(batch_size, input_size, input_size, input_size)
    # x_train = torch.randn(batch_size, input_size)
    y_train = torch.randn(batch_size, input_size, input_size, input_size)
    # y_train = torch.randn(batch_size, input_size)

    start_time = time.time()

    # Train the model for a few iterations
    for _ in range(100):
        optimizer.zero_grad()
        outputs = model(x_train)
        loss = criterion(outputs, y_train)
        loss.backward()
        optimizer.step()

    end_time = time.time()
    return (end_time - start_time) * 1000  # Convert to milliseconds

input_size = 10

training_times = benchmark_all_component(ResidualBlock(input_size), \
                                            nn.LeakyReLU(), \
                                            nn.BatchNorm2d(input_size), \
                                            input_size)
print(f"Training times old components (in milliseconds): {(training_times)}")
print()

training_times = benchmark_all_component(SEModule(input_size), \
                                            nn.SiLU(), \
                                            ActNorm(input_size), \
                                            input_size)
print(f"Training times new components (in milliseconds): {(training_times)}")
print()


Training times old components (in mili seconds): 772.8569507598877

Training times new components (in mili seconds): 351.3188362121582



In [6]:
import argparse
from utils.options import dict2str, parse
from utils.dist_util import get_dist_info, init_dist
import random
from utils.misc import set_random_seed


def parse_options(is_train=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-opt', type=str, default='test_speed_FiveK.yml', help='Path to option YAML file.')
    parser.add_argument(
        '--launcher',
        choices=['none', 'pytorch', 'slurm'],
        default='none',
        help='job launcher')
    parser.add_argument('--local_rank', type=int, default=0)
    args = parser.parse_args()
    opt = parse(args.opt, is_train=is_train)

    # distributed settings
    if args.launcher == 'none':
        opt['dist'] = False
        print('Disable distributed.', flush=True)
    else:
        opt['dist'] = True
        if args.launcher == 'slurm' and 'dist_params' in opt:
            init_dist(args.launcher, **opt['dist_params'])
        else:
            init_dist(args.launcher)

    opt['rank'], opt['world_size'] = get_dist_info()

    # random seed
    seed = opt.get('manual_seed')
    if seed is None:
        seed = random.randint(1, 10000)
        opt['manual_seed'] = seed
    set_random_seed(seed + opt['rank'])

    return opt

In [None]:
def main():
    # parse options, set distributed setting, set ramdom seed
    opt = parse_options(is_train=False)
    