<a href="https://colab.research.google.com/github/RexSword/1112-New-Learning-Algorithm/blob/main/hw3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# HW3
## weight-tuning_LG

Model 6 \
hidden nodes: 11 \
epochs: 300 \
init: xavier \
active: relu \
optimize: sgd \
schedule: None \
weight decay: 0.0

## Define the functions

In [None]:
import torch.nn as nn

### Model

In [None]:
class TwoLayerNetwork(nn.Module):
    """Fully connected network with one hidden layer: Linear -> activation -> Linear.

    Args:
        input_size: number of features of one flattened input sample.
        hidden_size: number of nodes in the hidden layer.
        num_classes: number of output scores produced per sample.
        init_method: callable applied in place to every parameter tensor of
            the network (weights and biases alike); it decides itself which
            tensors it wants to touch.
        active_func: activation module class (e.g. ``nn.ReLU``); it is
            instantiated here without arguments.
    """

    def __init__(self, input_size: int, hidden_size: int, num_classes: int, init_method: Callable, active_func: nn.modules.module.Module) -> None:
        super().__init__()
        # Kept as attributes so callers can flatten batches to the right width.
        self.input_size = input_size
        self.hidden_size = hidden_size
        # fully connected first layer
        self.fc1 = nn.Linear(input_size, hidden_size)
        # activation
        self.active_func = active_func()
        # fully connected second layer
        self.fc2 = nn.Linear(hidden_size, num_classes)
        # Initialize only after every layer exists: parameters() lists just
        # the sub-modules registered so far, so running this loop before fc2
        # is created would leave fc2 with its default initialization.
        for param in self.parameters():
            init_method(param)

    def forward(self, x):
        """Return the output-layer scores for a batch ``x`` of flattened samples."""
        out = self.fc1(x)
        out = self.active_func(out)
        out = self.fc2(out)
        return out


### Training

In [None]:
from array import array

def train(model: TwoLayerNetwork, opt: nn.Module, device: str, epochs: int, learning_rate: float, trainloader: DataLoader, valloader: DataLoader, criterion: nn.modules.loss._Loss, sched: optim.lr_scheduler._LRScheduler, weight_decay: float, learning_goal: float):
    """Fit ``model`` and record per-epoch statistics, stopping early at the learning goal.

    Args:
        model: network to train; its ``input_size`` attribute is used to
            flatten every batch.
        opt: optimizer factory, called as
            ``opt(params, lr=..., weight_decay=...)``.
        device: device the model and every batch are moved to.
        epochs: maximum number of epochs; truncated with ``int``.
        learning_rate: forwarded to the optimizer as ``lr``.
        trainloader: batches used for the weight updates.
        valloader: batches evaluated after each training pass.
        criterion: loss function applied to ``(outputs, y)``.
        sched: scheduler factory called as ``sched(optimizer)``; a falsy
            value disables scheduling.
        weight_decay: forwarded to the optimizer as ``weight_decay``.
        learning_goal: desired ratio of [validation loss / first recorded
            validation loss]; training stops once the ratio drops below it.

    Returns:
        Tuple of four ``array("d")`` of length ``epochs``: training loss,
        training accuracy, validation loss, validation accuracy. Slots of
        epochs that never ran because of early stopping stay at 0.

    Raises:
        ValueError: if ``epochs`` is smaller than 1.
    """
    if epochs < 1:
        raise ValueError("Invalid epoch!!")
    epochs = int(epochs)

    model.to(device)
    optimizer = opt(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    scheduler = sched(optimizer) if sched else None
    history = tuple(array("d", [0]) * epochs for _ in range(4))
    train_losses, train_accs, val_losses, val_accs = history

    def run_pass(loader, update):
        """Run one pass over ``loader``; return (loss sum / dataset size, hits / dataset size)."""
        loss_sum = 0.0
        hits = 0
        for X, y in loader:
            X = X.view(-1, model.input_size).to(device)
            y = y.to(device)
            if update:
                optimizer.zero_grad()
            outputs = model(X)
            loss = criterion(outputs, y)
            if update:
                loss.backward()
                optimizer.step()
            # Weight the batch loss by the batch size before accumulating.
            loss_sum += loss.item() * X.size(0)
            predicted = torch.max(outputs.data, 1)[1]
            hits += (predicted == y).sum().item()
        sample_count = len(loader.dataset)
        return loss_sum / sample_count, hits / sample_count

    for epoch in range(epochs):
        # Train the model
        model.train()
        train_loss, train_accuracy = run_pass(trainloader, True)

        # Validate the model
        model.eval()
        with torch.no_grad():
            val_loss, val_accuracy = run_pass(valloader, False)

        if scheduler:
            scheduler.step()

        # Record epoch statistics
        train_losses[epoch] = train_loss
        train_accs[epoch] = train_accuracy
        val_losses[epoch] = val_loss
        val_accs[epoch] = val_accuracy

        # Early stop once the validation loss reached the learning goal.
        if learning_goal * val_losses[0] > val_loss:
            break
    return history

### Training_UA

In [None]:
from array import array

def train_UA(model: nn.Module, opt: nn.Module, device: str, epochs: int, learning_rate: float, trainloader: DataLoader, valloader: DataLoader, criterion: nn.modules.loss._Loss, sched: optim.lr_scheduler._LRScheduler, weight_decay: float, learning_goal: float):
    """Train ``model`` while adapting the learning rate to the validation loss.

    After every epoch the learning rate is multiplied by 1.2 when the
    validation loss improved on the best value seen so far (the first epoch
    always counts as an improvement) and by 0.7 otherwise. The new value is
    written into every parameter group of the optimizer, so it is the rate
    used by the next epoch.

    Args:
        model: network to train; it must expose an ``input_size`` attribute,
            which is used to flatten every batch.
        opt: optimizer class/factory (e.g. ``optim.SGD``), called as
            ``opt(params, lr=..., weight_decay=...)``.
        device: device the model and every batch are moved to.
        epochs: maximum number of epochs; truncated with ``int``.
        learning_rate: initial learning rate.
        trainloader: batches used for the weight updates.
        valloader: batches evaluated after each training pass.
        criterion: loss function applied to ``(outputs, y)``.
        sched: scheduler factory called as ``sched(optimizer)``; a falsy
            value disables scheduling. NOTE(review): the adaptive update
            below overwrites whatever rate the scheduler has just set.
        weight_decay: forwarded to the optimizer as ``weight_decay``.
        learning_goal: the desired ratio of [validation loss / first recorded
            validation loss]; training stops once the ratio drops below it.

    Returns:
        Tuple of four ``array("d")`` of length ``epochs``: training loss,
        training accuracy, validation loss, validation accuracy. Slots of
        epochs that never ran because of early stopping stay at 0.

    Raises:
        ValueError: if ``epochs`` is smaller than 1.
    """
    if epochs < 1:
        raise ValueError("Invalid epoch!!")
    epochs = int(epochs)
    model.to(device)
    optimizer = opt(model.parameters(), lr=learning_rate,
                    weight_decay=weight_decay)
    scheduler = sched(optimizer) if sched else None
    history = tuple(array("d", [0] * epochs) for _ in range(4))

    # Lowest validation loss seen so far; None until the first epoch is done.
    prev_loss = None

    for epoch in range(epochs):
        # Train the model
        train_loss = 0.0
        train_correct = 0
        model.train()
        for X, y in trainloader:
            X = X.view(-1, model.input_size).to(device)
            y = y.to(device)
            optimizer.zero_grad()
            outputs = model(X)
            loss = criterion(outputs, y)
            loss.backward()
            optimizer.step()
            # Weight the batch loss by the batch size before accumulating.
            train_loss += loss.item() * X.size(0)
            _, predicted = torch.max(outputs.data, 1)
            train_correct += (predicted == y).sum().item()
        train_loss /= len(trainloader.dataset)
        train_accuracy = train_correct / len(trainloader.dataset)

        # Validate the model
        val_loss = 0.0
        val_correct = 0
        model.eval()
        with torch.no_grad():
            for X, y in valloader:
                X = X.view(-1, model.input_size).to(device)
                y = y.to(device)
                outputs = model(X)
                loss = criterion(outputs, y)
                val_loss += loss.item() * X.size(0)
                _, predicted = torch.max(outputs.data, 1)
                val_correct += (predicted == y).sum().item()
        val_loss /= len(valloader.dataset)
        val_accuracy = val_correct / len(valloader.dataset)
        if scheduler:
            scheduler.step()

        # Adjust the learning rate based on the validation loss: grow it
        # while the loss keeps improving, shrink it otherwise.
        if prev_loss is None or val_loss < prev_loss:
            learning_rate *= 1.2
            prev_loss = val_loss
        else:
            learning_rate *= 0.7
        # The optimizer keeps its own copy of the rate, so the new value has
        # to be written into its parameter groups to take effect.
        for group in optimizer.param_groups:
            group["lr"] = learning_rate
        print(f"Adjusting learning rate to {learning_rate:.6f}")

        # Record epoch statistics
        history[0][epoch] = train_loss
        history[1][epoch] = train_accuracy
        history[2][epoch] = val_loss
        history[3][epoch] = val_accuracy
        # Early stop once the validation loss reached the learning goal.
        if learning_goal * history[2][0] > val_loss:
            return history
    return history

### Testing

In [None]:
def test(model: nn.Module, device: str, testloader: DataLoader):
    """Return the fraction of ``testloader`` samples that ``model`` classifies correctly.

    Args:
        model: network to evaluate; its ``input_size`` attribute is used to
            flatten every batch.
        device: device the model and every batch are moved to.
        testloader: batches of ``(X, y)`` to evaluate; the hit count is
            divided by ``len(testloader.dataset)``.
    """
    model.to(device)
    model.eval()
    hits = 0
    with torch.no_grad():
        for X, y in testloader:
            flat = X.view(-1, model.input_size).to(device)
            labels = y.to(device)
            # The index of the largest output score is the predicted class.
            predicted = torch.max(model(flat).data, 1)[1]
            hits += (predicted == labels).sum().item()
    return hits / len(testloader.dataset)

## Dataset

In [None]:
# load pytorch dataset
from torchvision import datasets, transforms

def getPytorchData(train: float = 0.8, remain: float = 0.1):
    """Build FashionMNIST train/validation/test loaders from a reduced training set.

    Args:
        train: train_amount / kept_amount, i.e. 1 - valid_amount / kept_amount.
            Must satisfy 0 < train < 1.
        remain: fraction of the original training set that is kept at all
            (reduces the data amount to save time). Must satisfy
            0 <= remain <= 1.

    Returns:
        ``(trainloader, valloader, testloader, datum_size, class_amount)``
        where ``datum_size`` is the number of elements of one transformed
        sample and ``class_amount`` is the number of dataset classes.

    Raises:
        ValueError: if ``train`` or ``remain`` is out of range, or if the
            resulting training or validation subset would be empty.

    Note:
        The datasets are opened with ``download=False``, so they are
        presumably expected to exist under ``./data/`` already.
    """
    if not 0 < train < 1:
        raise ValueError(f"train must be in (0, 1), got {train!r}")
    if not 0 <= remain <= 1:
        raise ValueError(f"remain must be in [0, 1], got {remain!r}")
    # preprocess: convert to tensor and normalize (flattening is left to the
    # consumer of the batches)
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    # Split the training set into training and validation sets
    trainset = datasets.FashionMNIST(
        root="./data/", train=True, download=False, transform=transform)
    total = len(trainset)
    # Round instead of truncating: e.g. (1 - 0.8) * 0.1 * 60000 evaluates to
    # 1199.99... in floating point and int() would silently drop a sample.
    # Deriving the validation count from the kept count also guarantees the
    # two subsets never exceed the dataset size.
    kept_count = round(remain * total)
    train_count = round(train * kept_count)
    valid_count = kept_count - train_count
    if train_count * valid_count == 0:
        raise ValueError(
            "train/remain leave an empty training or validation subset")
    datum_size = trainset[0][0].numel()
    class_amount = len(trainset.classes)
    testset = datasets.FashionMNIST(
        root="./data/", train=False, download=False, transform=transform)
    print(train_count, valid_count, len(testset))
    # The third part holds the dropped samples; the fixed seed keeps the
    # split reproducible.
    trainset, valset, _ = random_split(
        trainset, (train_count, valid_count, total - kept_count), Generator().manual_seed(42))
    # Create dataloaders to load the data in batches
    trainloader = DataLoader(trainset, batch_size=32, shuffle=True)
    valloader = DataLoader(valset, batch_size=32, shuffle=True)
    testloader = DataLoader(testset, batch_size=32, shuffle=True)
    return trainloader, valloader, testloader, datum_size, class_amount

## Training process

In [None]:
# Prefer CUDA, then the MPS backend, and fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

# model spec
(trainloader, valloader, testloader,
 input_size, output_size) = getPytorchData()
hidden_size = 11
learning_rate = 0.001
criterion = nn.CrossEntropyLoss()


# set the hyperparameter
def init(x):
    """Xavier-uniform initialize tensors with more than one dimension; skip the rest."""
    if len(x.shape) > 1:
        return nn.init.xavier_uniform_(tensor=x)
    return None


active = nn.ReLU
optimize = optim.SGD
schedule = None
weight_decay = 0.0


### weight-tuning_LG

In [None]:
# Upper bound on the number of epochs; training may stop earlier once the
# learning goal is reached.
# NOTE(review): the notebook header lists "epochs: 300" - confirm which value is intended.
epochs = 500
# Stop once the validation loss is below 30% of the first recorded validation loss.
learning_goal = 0.3

model = TwoLayerNetwork(
    input_size=input_size,
    hidden_size=hidden_size,
    num_classes=output_size,
    init_method=init,
    active_func=active,
)
# Accuracy of the untrained network, as a reference point.
LG_baseline = test(model=model, device=device, testloader=testloader)
LG_history = train(
    model=model,
    opt=optimize,
    device=device,
    epochs=epochs,
    learning_rate=learning_rate,
    trainloader=trainloader,
    valloader=valloader,
    criterion=criterion,
    sched=schedule,
    weight_decay=weight_decay,
    learning_goal=learning_goal,
)
# Accuracy of the same network after training.
LG_result = test(model=model, device=device, testloader=testloader)
print(LG_baseline, LG_history, LG_result, sep="\n")


### weight-tuning_LG_UA