[![open in colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1DSSHN_lRd0A_tPBwYBi6zlOd_9N1DBJ3#scrollTo=dpz7yKFTYXPZ)

## HW Requirement

• Make the code of tuning_1 module.\
• Make the code of tuning_2 module.\
• Make the code of tuning_3 module.\
• Make the code of tuning_4 module.

## Model

In [101]:
import torch
from torch import nn, optim, Generator
from torch.utils.data import DataLoader, Dataset, random_split
from numpy.random import choice
from typing import Iterable, Callable, Type, Optional, Union, Tuple, List


In [2]:
from operator import mul

def product(nums: Iterable[Type], func: Callable[[Type, Type], Type] = mul):
    """
    return product of iterable multiplicable
    """
    def _product(nums):
        nonlocal func
        if len(nums) == 1:
            return nums[0]
        return func(nums[-1], _product(nums[:-1]))
    try:
        return _product(nums)
    except Exception as e:
        raise e

In [3]:
from collections import deque


class TwoLayerNetwork(nn.Module):
    storage: deque[nn.Module]

    def __init__(self, input_size: int, hidden_size: int, num_classes: int, init_method: Callable, active_func: Callable[[], nn.modules.module.Module],
                 DO: float, if_BN: bool, store_size: int = 1):
        super(TwoLayerNetwork, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.if_BN = if_BN
        # dropout
        self.do = nn.Dropout(DO)
        # first layer
        self.fc1 = nn.Linear(input_size, hidden_size)
        # batch norm
        self.bn1 = nn.BatchNorm1d(hidden_size)
        # activation
        self.active_func = active_func()
        # second layer
        self.fc2 = nn.Linear(hidden_size, num_classes)
        # initialize
        for param in self.parameters():
            init_method(param)
        self.storage = deque(maxlen=store_size)

    def forward(self, x: Iterable[Union[torch.Tensor, float]]) -> torch.Tensor:
        out = self.do(x)
        out = self.fc1(out)
        if self.if_BN:
            out = self.bn1(out)
        out = self.active_func(out)
        out = self.fc2(out)
        return out


In [4]:
class WD_Regularization(nn.Module):
    def __init__(self):
        super(WD_Regularization, self).__init__()


class L2_Regularization(WD_Regularization):
    def __init__(self, weight_decay: float):
        super(L2_Regularization, self).__init__()
        if weight_decay <= 0:
            raise ValueError("param weight_decay can not <=0!!")
        self.weight_decay = weight_decay

    def forward(self, model: nn.Module) -> Union[torch.Tensor, float]:
        reg = 0
        for name, parameter in model.named_parameters():
            if "weight" in name:
                reg += torch.sum(parameter**2)
        return self.weight_decay * reg


In [5]:
def validate(model: nn.Module, device: str, valloader: DataLoader[Dataset[torch.Tensor]], criterion: nn.modules.loss._Loss) \
        -> Tuple[float, float]:
    """return loss, accuracy"""
    # Validate the model
    model.to(device)
    val_loss = 0.0
    val_correct = 0
    model.eval()
    with torch.no_grad():
        for X, y in valloader:
            X = X.view(-1, model.input_size).to(device)
            y = y.to(device)
            outputs = model(X)
            loss = criterion(outputs, y)
            val_loss += loss.item() * X.size(0)
            _, predicted = torch.max(outputs.data, 1)
            val_correct += (predicted == y).sum().item()
        val_loss /= len(valloader.dataset)
        val_accuracy = val_correct / len(valloader.dataset)
    return val_loss, val_accuracy


In [6]:
def train(model: TwoLayerNetwork, opt: Callable[..., optim.Optimizer], device: str, epochs: float, learning_rate: float, trainloader: DataLoader[Dataset[torch.Tensor]], valloader: DataLoader[Dataset[torch.Tensor]], criterion: nn.modules.loss._Loss,
          sched: Optional[Callable[[optim.Optimizer], optim.lr_scheduler._LRScheduler]], wd_reg: Optional[WD_Regularization], learning_goal: float, min_lr: float, if_lr_adjust: bool, if_BN: bool, drop_out: float) \
        -> List[Tuple[float, float, float, float]]:
    """
    Params:
        model
        opt
        device
        epochs
        learing_rate
        criterion
        y: label of data
        wd_reg, BN, DO: regularization
    Results:
        history: train_loss, train_accuracy, val_loss, val_accuracy of each epochs
    """
    def forward_backward(optimizer: optim.Optimizer, criterion: nn.modules.loss._Loss, wd_reg: Optional[WD_Regularization], model: TwoLayerNetwork, y: torch.Tensor,
                         BN: Optional[nn.modules.batchnorm._BatchNorm], DO: Optional[nn.modules.dropout._DropoutNd]) \
            -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Params:
            optimizer
            criterion
            model
            y: label of data
            wd_reg, BN, DO: regularization
        Results:
            ouputs: f(x)
            loss_all: f(x) - y
        """
        optimizer.zero_grad()
        outputs = model(X)
        outputs = outputs if not DO else DO(outputs)
        loss_all = criterion(
            outputs, y) + wd_reg(model) if wd_reg else criterion(outputs, y)
        loss_all.backward()
        optimizer.step()
        return loss_all, outputs
    if epochs < 1:
        raise ValueError("Invalid epoch!!")
    if drop_out >= 1 or drop_out < 0:
        raise ValueError("Invalid dropout rate!!")
    # init
    epoch = 0
    init_lr = learning_rate
    origin_if_BN = model.if_BN
    model.if_BN = if_BN
    pre_loss = float("inf") if if_lr_adjust else None
    BN = nn.BatchNorm1d(model.hidden_size).to(device) if if_BN else None
    DO = nn.Dropout(drop_out).to(device) if drop_out != 0. else None
    model.to(device)
    # if not model.storage[-1]
    model.storage.append(list(model.parameters()))
    optimizer = opt(model.storage[-1], lr=learning_rate)
    scheduler = sched(optimizer) if sched else None
    history = []
    # Train the model
    while epoch < epochs:
        # Train the model
        train_loss = 0.0
        train_correct = 0
        model.train()
        for X, y in trainloader:
            X = X.view(-1, model.input_size).to(device)
            y = y.to(device)
            loss_all, outputs = forward_backward(
                optimizer, criterion, wd_reg, model, y, BN, DO)
            if pre_loss:
                while pre_loss <= loss_all.item():
                    if learning_rate < min_lr:
                        # return history
                        learning_rate = init_lr
                        optimizer = opt(model.storage[-1], lr=learning_rate)
                        loss_all, outputs = forward_backward(
                            optimizer, criterion, wd_reg, model, y, BN, DO)
                        # raise ValueError(f"{learning_rate} < {min_lr}")
                        break
                    learning_rate *= 0.7
                    optimizer = opt(model.storage[-1], lr=learning_rate)
                    loss_all, outputs = forward_backward(
                        optimizer, criterion, wd_reg, model, y, BN, DO)
                learning_rate *= 1.2
                pre_loss = loss_all.item()
            train_loss += loss_all.item() * X.size(0)
            _, predicted = torch.max(outputs.data, 1)
            train_correct += (predicted == y).sum().item()
            model.storage.append(list(model.parameters()))
        train_loss /= len(trainloader.dataset)
        train_accuracy = train_correct / len(trainloader.dataset)
        # Validate the model
        val_loss, val_accuracy = validate(
            model=model, device=device, valloader=valloader, criterion=criterion)
        # Log Statics
        history.append((train_loss, train_accuracy, val_loss, val_accuracy))
        # Stopping criteria
        if learning_goal < val_accuracy:
            return history
        # Update loop
        if scheduler:
            scheduler.step()
        epoch += 1
    # restore model
    model.if_BN = origin_if_BN
    return history


In [7]:
def test(model: nn.Module, device: str, testloader: DataLoader[Dataset[torch.Tensor]]) -> float:
    """return accuracy"""
    return validate(model=model, device=device, valloader=testloader, criterion=nn.CrossEntropyLoss())[1]


In [81]:
def generate_reduced_model(model: TwoLayerNetwork, node_id: int):
    new_model = TwoLayerNetwork(model.input_size, model.hidden_size - 1, product(
        model.fc2.bias.size()), lambda _: None, lambda: model.active_func, model.do.p, model.if_BN)
    for name, param in model.named_parameters():
        layer_name, variable_type = name.split(".")
        if layer_name in ("fc1", "bn1"):
            # node size of specified hidden layer, node size of input layer
            setattr(getattr(new_model, layer_name), variable_type, nn.Parameter(torch.cat(
                (param[:node_id], param[node_id + 1:]), 0)))
        elif layer_name == "fc2" and variable_type == "weight":
            # node size of output layer, node size of specified hidden layer
            setattr(getattr(new_model, layer_name), variable_type, nn.Parameter(torch.cat(
                (param[:, :node_id], param[:, node_id + 1:]), 1)))
        else:
            pass
    return new_model


In [92]:
def tunning1(model: TwoLayerNetwork, device: str, valloader: DataLoader[Dataset[torch.Tensor]], criterion: nn.modules.loss._Loss,
            reg_method: Callable, learning_goal: float):
    test_index = 0
    cur = pre = model
    while test_index < pre.hidden_size:
        cur = generate_reduced_model(pre, test_index)
        loss, acc = validate(cur, device, valloader, criterion)
        if acc >= learning_goal:
            reg_method(cur)
            pre = cur
        else:
            test_index += 1
    return cur


In [99]:
def tunning3(model: TwoLayerNetwork, device: str, valloader: DataLoader[Dataset[torch.Tensor]], criterion: nn.modules.loss._Loss,
             reg_method: Callable, learning_goal: float, fail_tolarance: int = 3):
    cur = pre = model
    fail_count = 0
    # may choose same node but I don't care because it's hard to implement
    while fail_count < fail_tolarance and pre.hidden_size > 1:
        test_index = choice(1, pre.hidden_size)[0]
        cur = generate_reduced_model(pre, test_index)
        _, acc = validate(cur, device, valloader, criterion)
        if acc >= learning_goal:
            reg_method(cur)
            pre = cur
        else:
            fail_count += 1
    return cur


In [143]:
def tunning4(model: TwoLayerNetwork, device: str, valloader: DataLoader[Dataset[torch.Tensor]], criterion: nn.modules.loss._Loss,
             reg_method: Callable, learning_goal: float):
    cur = pre = model
    try_count = 0
    while pre.hidden_size > 1 and pre.hidden_size > try_count:
        test_index = sorted(enumerate(pre.get_parameter(
            "fc2.weight").abs().sum(0)), key=lambda x: x[1])[0 + try_count][0]
        cur = generate_reduced_model(pre, test_index)
        _, acc = validate(cur, device, valloader, criterion)
        if acc >= learning_goal:
            reg_method(cur)
            pre = cur
            try_count = 0
        else:
            try_count += 1
    return cur


# Dataset

### pytorch dataset

In [8]:
# load pytorch dataset
from torchvision import datasets, transforms


def getPytorchData(train: float = 0.8, remain: float = 0.1) \
    -> tuple[DataLoader[Dataset[torch.Tensor]], DataLoader[Dataset[torch.Tensor]], DataLoader[Dataset[torch.Tensor]], int, int]:
    """
    Params:
        train: train_amount / total_amount or 1 - valid_amount / total_amount
        remain: reduce data amount to save time
    Results:
        trainloader, valloader, testloader: dataloader
        datum_size: size of datum
        class_amount: amount of types
    """
    # preprocess: flatten, normalize, drop 90%, split
    transform = transforms.transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    if 0 >= train or train >= 1:
        raise ValueError()
    if 0 > remain or remain > 1:
        raise ValueError()
    # Split the training set into training and validation sets
    trainset = datasets.FashionMNIST(
        root="./data/", train=True, download=False, transform=transform)
    train_count = int(train * remain * len(trainset))
    valid_count = int((1 - train) * remain * len(trainset))
    if train_count * valid_count == 0:
        raise ValueError()
    datum_size = product(trainset[0][0].size())
    class_amount = len(trainset.classes)
    testset = datasets.FashionMNIST(
        root="./data/", train=False, download=False, transform=transform)
    print(train_count, valid_count, len(testset))
    trainset, valset, _ = random_split(
        trainset, (train_count, valid_count, len(trainset) - train_count - valid_count), Generator().manual_seed(42))
    # Create dataloaders to load the data in batches
    trainloader = DataLoader(trainset, batch_size=32, shuffle=True)
    valloader = DataLoader(valset, batch_size=32, shuffle=True)
    testloader = DataLoader(testset, batch_size=32, shuffle=True)
    return trainloader, valloader, testloader, datum_size, class_amount


# Network Tunning

### setting

In [88]:
device = "cuda" if torch.cuda.is_available(
) else "mps" if torch.backends.mps.is_available() else "cpu"
trainloader, valloader, testloader, input_size, output_size = getPytorchData()
learning_rate = 0.001
weight_decay = 0.001
criterion = nn.CrossEntropyLoss()
lg = 0.82

4800 1199 10000


In [94]:
model_path = r"./data/rg_eb_lg_ua_bn_do"
model = torch.load(model_path)
model

TwoLayerNetwork(
  (do): Dropout(p=0.2, inplace=False)
  (fc1): Linear(in_features=784, out_features=11, bias=True)
  (bn1): BatchNorm1d(11, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (active_func): ReLU()
  (fc2): Linear(in_features=11, out_features=10, bias=True)
)

### tunning 1

In [93]:
epochs = 50
optimize = optim.SGD
schedule = None
learning_goal = 1. #
min_lr = learning_rate * 1e-5
l2_reg = None
r_BN = lambda x: train(x, optimize, device, epochs, learning_rate,
                   trainloader, valloader, criterion, schedule, l2_reg, learning_goal, min_lr, False, True, 0.0)
model = torch.load(model_path)
model = tunning1(model, device, valloader, criterion, r_BN, lg)
model

TwoLayerNetwork(
  (do): Dropout(p=0.2, inplace=False)
  (fc1): Linear(in_features=784, out_features=10, bias=True)
  (bn1): BatchNorm1d(10, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (active_func): ReLU()
  (fc2): Linear(in_features=10, out_features=10, bias=True)
)

### tunning 2

In [95]:
epochs = 50
optimize = optim.SGD
schedule = None
learning_goal = 1.
min_lr = learning_rate * 1e-5
l2_reg = None
r_BN = lambda x: train(x, optimize, device, epochs, learning_rate,
                       trainloader, valloader, criterion, schedule, l2_reg, learning_goal, min_lr, False, True, 0.0)
epochs = 300
optimize = optim.SGD
schedule = None
learning_goal = lg
min_lr = learning_rate * 1e-5
l2_reg = L2_Regularization(weight_decay)
rG_EB_LG_UA_BN = lambda x: train(x, optimize, device, epochs, learning_rate,
                                 trainloader, valloader, criterion, schedule, l2_reg, learning_goal, min_lr, True, True, 0.0)

model = torch.load(model_path)
model = tunning1(model, device, valloader, criterion,
                 lambda x: r_BN(rG_EB_LG_UA_BN(x)), lg)
model


TwoLayerNetwork(
  (do): Dropout(p=0.2, inplace=False)
  (fc1): Linear(in_features=784, out_features=10, bias=True)
  (bn1): BatchNorm1d(10, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (active_func): ReLU()
  (fc2): Linear(in_features=10, out_features=10, bias=True)
)

### tunning 3

In [102]:
epochs = 50
optimize = optim.SGD
schedule = None
learning_goal = 1.
min_lr = learning_rate * 1e-5
l2_reg = None
r_BN = lambda x: train(x, optimize, device, epochs, learning_rate,
                       trainloader, valloader, criterion, schedule, l2_reg, learning_goal, min_lr, False, True, 0.0)
epochs = 300
optimize = optim.SGD
schedule = None
learning_goal = lg
min_lr = learning_rate * 1e-5
l2_reg = L2_Regularization(weight_decay)
rG_EB_LG_UA_BN = lambda x: train(x, optimize, device, epochs, learning_rate,
                                 trainloader, valloader, criterion, schedule, l2_reg, learning_goal, min_lr, True, True, 0.0)

model = torch.load(model_path)
model = tunning3(model, device, valloader, criterion,
                 lambda x: r_BN(rG_EB_LG_UA_BN(x)), lg)
model


TwoLayerNetwork(
  (do): Dropout(p=0.2, inplace=False)
  (fc1): Linear(in_features=784, out_features=10, bias=True)
  (bn1): BatchNorm1d(10, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (active_func): ReLU()
  (fc2): Linear(in_features=10, out_features=10, bias=True)
)

### tunning 4

In [144]:
epochs = 50
optimize = optim.SGD
schedule = None
learning_goal = 1.
min_lr = learning_rate * 1e-5
l2_reg = None
r_BN = lambda x: train(x, optimize, device, epochs, learning_rate,
                       trainloader, valloader, criterion, schedule, l2_reg, learning_goal, min_lr, False, True, 0.0)
epochs = 300
optimize = optim.SGD
schedule = None
learning_goal = lg
min_lr = learning_rate * 1e-5
l2_reg = L2_Regularization(weight_decay)
rG_EB_LG_UA_BN = lambda x: train(x, optimize, device, epochs, learning_rate,
                                 trainloader, valloader, criterion, schedule, l2_reg, learning_goal, min_lr, True, True, 0.0)

model = torch.load(model_path)
model = tunning4(model, device, valloader, criterion,
                 lambda x: r_BN(rG_EB_LG_UA_BN(x)), lg)
model

TwoLayerNetwork(
  (do): Dropout(p=0.2, inplace=False)
  (fc1): Linear(in_features=784, out_features=10, bias=True)
  (bn1): BatchNorm1d(10, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (active_func): ReLU()
  (fc2): Linear(in_features=10, out_features=10, bias=True)
)