## Dataset
For this part of the assignment, you will be working with the CIFAR100 dataset (already loaded above). This dataset consists of 60K 32x32 color images from 100 classes, with 600 images per class. There are 50K training images and 10K test images. The images in CIFAR100 are of size 3x32x32, i.e. 3-channel color images of 32x32 pixels.
## BaseNet
We provide a BaseNet that you can run to obtain a baseline accuracy.

## Goal
Your goal is to edit the BaseNet class, or create new classes, to devise **an effective (in terms of accuracy, FLOPs and parameter count) deep net architecture** using what you have learned in this course.

## Submission
**Before the due date**, submit your work to **aleeyanger@163.com**.
Attention:
YOUR FILE SHOULD BE STRUCTURED LIKE THIS

```
  FINAL_(YOUR_TEAM_NUMBER).zip:
      --Report.pdf
      --code.zip
```
  EXAMPLE:
`  T1G1.zip`




## BASELINE MODEL

In [1]:
#Device check and load model into device
def get_default_device():
    """Return the CUDA device when CUDA is available, otherwise the CPU device."""
    backend = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(backend)
    
def to_device(data, device):
    """Recursively move a tensor, or a list/tuple of tensors, to `device`.

    Lists and tuples are both returned as lists; any other object is moved
    with ``data.to(device, non_blocking=True)``.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)
    return [to_device(item, device) for item in data]

class DeviceDataLoader:
    """Thin wrapper that moves every batch of an underlying loader to a device."""

    def __init__(self, dl, device):
        # Keep both the wrapped loader and the target device for later iteration.
        self.dl, self.device = dl, device

    def __iter__(self):
        """Lazily yield each batch of the wrapped loader after moving it to the device."""
        yield from (to_device(batch, self.device) for batch in self.dl)

    def __len__(self):
        """Return the number of batches reported by the wrapped loader."""
        return len(self.dl)

In [2]:
# Install thop, which is used later to count parameters and FLOPs
! pip install thop



In [2]:
import pandas as pd
import os
import torch
import time
import torchvision
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
from torchvision.datasets.utils import download_url
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import torchvision.transforms as tt
from torch.utils.data import random_split
from torchvision.utils import make_grid
import torchvision.models as models
import matplotlib.pyplot as plt
from sklearn.metrics import *


##HYPER-PARAM
# Mini-batch size of the training loader; the test loader uses twice this value.
batch_size = 400
# Total number of training epochs; the training cell splits them into four equal stages.
epochs = 120
# Peak learning rate handed to fit_one_cycle for the first stage (later stages divide it).
max_lr = 0.001
# Threshold passed to nn.utils.clip_grad_value_ inside fit_one_cycle.
grad_clip = 0.01
# Weight decay forwarded to the optimizer constructor.
weight_decay =0.001
#weight_decay =5e-4*batch_size
# Optimizer class instantiated by fit_one_cycle.
opt_func = torch.optim.Adam

##DOWNLOAD dataset
# Untransformed training split, used only to measure per-channel statistics.
train_data = torchvision.datasets.CIFAR100('./', train=True, download=True)
# Stack every image along the row axis, giving a 1600000 x 32 x 3 array.
x = np.concatenate([np.asarray(train_data[idx][0]) for idx in range(len(train_data))])
# Per-channel mean and std over rows and columns, rescaled from [0, 255] to
# [0, 1] and converted to plain Python lists for tt.Normalize.
mean = (np.mean(x, axis=(0, 1)) / 255).tolist()
std = (np.std(x, axis=(0, 1)) / 255).tolist()

##TRANSFORM
# Training pipeline: random crop with reflect padding, random horizontal flip,
# tensor conversion, then in-place normalisation with the measured statistics.
transform_train = tt.Compose([
    tt.RandomCrop(32, padding=4, padding_mode='reflect'),
    tt.RandomHorizontalFlip(),
    tt.ToTensor(),
    tt.Normalize(mean, std, inplace=True),
])
# Test pipeline: no augmentation, only tensor conversion and normalisation.
transform_test = tt.Compose([
    tt.ToTensor(),
    tt.Normalize(mean, std),
])
##DATASET and DATALOADER
# Device every batch is moved to before it reaches the model.
device = get_default_device()

# Training split: augmented, shuffled, batch_size samples per batch.
trainset = torchvision.datasets.CIFAR100(
    "./", train=True, download=True, transform=transform_train)
trainloader = DataLoader(
    trainset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)

# Test split: not augmented, not shuffled, twice the training batch size.
testset = torchvision.datasets.CIFAR100(
    "./", train=False, download=True, transform=transform_test)
testloader = DataLoader(
    testset, batch_size=batch_size * 2, num_workers=2, pin_memory=True)

#LOADER
# Rebind both names to wrappers that move each batch to `device`.
trainloader = DeviceDataLoader(trainloader, device)
testloader = DeviceDataLoader(testloader, device)


  from .autonotebook import tqdm as notebook_tqdm


Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified


In [3]:
##TRAINING SETUP
def accuracy(outputs, labels):
    """Return, as a 0-d tensor, the fraction of rows whose arg-max over dim 1 equals the label."""
    preds = torch.max(outputs, dim=1).indices
    n_correct = (preds == labels).sum().item()
    return torch.tensor(n_correct / len(preds))


class ImageClassificationBase(nn.Module):
    """Mixin-style base class adding training/validation helpers to a classifier.

    Subclasses provide ``forward``; the helpers below compute cross-entropy
    loss and accuracy from the subclass's outputs.
    """

    def training_step(self, batch):
        """Return the cross-entropy loss of the model on one ``(images, labels)`` batch."""
        images, labels = batch
        out = self(images)                      # Generate predictions
        return F.cross_entropy(out, labels)     # Calculate loss

    def validation_step(self, batch):
        """Return detached loss, accuracy and sample count for one ``(images, labels)`` batch."""
        images, labels = batch
        out = self(images)                      # Generate predictions
        loss = F.cross_entropy(out, labels)     # Calculate loss
        acc = accuracy(out, labels)             # Calculate accuracy
        # The batch size lets validation_epoch_end weight a smaller final
        # batch correctly instead of counting it like a full one.
        return {'val_loss': loss.detach(), 'val_acc': acc, 'batch_size': len(labels)}

    @staticmethod
    def _batch_mean(values, sizes):
        """Average 0-d tensors, weighting each by its batch size when sizes are given."""
        stacked = torch.stack(values)
        if sizes is None:
            return stacked.mean()
        weights = torch.tensor(sizes, dtype=stacked.dtype, device=stacked.device)
        return (stacked * weights).sum() / weights.sum()

    def validation_epoch_end(self, outputs):
        """Combine per-batch validation results into epoch-level floats.

        Each entry of `outputs` is a dict with ``'val_loss'`` and ``'val_acc'``
        tensors. When every entry also carries ``'batch_size'`` the means are
        weighted by it, so they equal the per-sample means over the whole
        set; otherwise a plain mean over batches is used.
        """
        sizes = [x.get('batch_size') for x in outputs]
        if any(size is None for size in sizes):
            sizes = None
        epoch_loss = self._batch_mean([x['val_loss'] for x in outputs], sizes)
        epoch_acc = self._batch_mean([x['val_acc'] for x in outputs], sizes)
        return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}

    def epoch_end(self, epoch, result):
        """Print one summary line for `epoch` from the `result` dict.

        `result` must contain ``'lrs'`` (a non-empty sequence), ``'train_loss'``,
        ``'val_loss'`` and ``'val_acc'``.
        """
        print("Epoch [{}], last_lr: {:.5f}, train_loss: {:.4f}, val_loss: {:.4f}, val_acc: {:.4f}".format(
            epoch, result['lrs'][-1], result['train_loss'], result['val_loss'], result['val_acc']))


In [4]:
class BatchNorm(nn.BatchNorm2d):
    """BatchNorm2d with scale reset to 1, shift reset to 0, and optional freezing.

    `weight` and `bias` are flags: they become the ``requires_grad`` setting
    of the scale and shift parameters respectively.
    """

    def __init__(self, num_features, eps=1e-05, momentum=0.1, weight=True, bias=True):
        super().__init__(num_features, eps=eps, momentum=momentum)
        with torch.no_grad():
            self.weight.fill_(1.0)
            self.bias.fill_(0.0)
        self.weight.requires_grad_(weight)
        self.bias.requires_grad_(bias)


class GhostBatchNorm(BatchNorm):
    """Batch norm that keeps separate statistics for `num_splits` slices of a batch.

    While training, the batch is viewed as ``(-1, C * num_splits, H, W)`` so
    that ``F.batch_norm`` normalises each of the `num_splits` slots on its
    own; the view requires the batch size to be a multiple of `num_splits`.
    In eval mode only the first `num_features` running statistics are used.
    """

    def __init__(self, num_features, num_splits, **kw):
        super().__init__(num_features, **kw)
        self.num_splits = num_splits
        # Replace the parent's per-channel buffers with one entry per (split, channel).
        n_stats = num_features * self.num_splits
        self.register_buffer('running_mean', torch.zeros(n_stats))
        self.register_buffer('running_var', torch.ones(n_stats))

    def train(self, mode=True):
        """Switch mode; when leaving training, average the running stats over the splits."""
        leaving_training = (self.training is True) and (mode is False)
        if leaving_training:
            # Lazily collate the statistics right before they are going to be
            # used, then tile them back to the full buffer length.
            for buffer_name in ('running_mean', 'running_var'):
                per_split = getattr(self, buffer_name).view(self.num_splits, self.num_features)
                collated = torch.mean(per_split, dim=0).repeat(self.num_splits)
                setattr(self, buffer_name, collated)
        return super().train(mode)

    def forward(self, input):
        """Normalise `input`, a 4-D tensor unpacked as ``(N, C, H, W)``."""
        batch, channels, height, width = input.shape
        if self.track_running_stats and not self.training:
            # Inference: ordinary batch norm using the first num_features statistics.
            return F.batch_norm(
                input, self.running_mean[:self.num_features], self.running_var[:self.num_features],
                self.weight, self.bias, False, self.momentum, self.eps)
        # Training (or no tracked stats): fold the splits into the channel
        # axis and repeat scale/shift so each split reuses the same parameters.
        folded = input.view(-1, channels * self.num_splits, height, width)
        normalised = F.batch_norm(
            folded, self.running_mean, self.running_var,
            self.weight.repeat(self.num_splits), self.bias.repeat(self.num_splits),
            True, self.momentum, self.eps)
        return normalised.view(batch, channels, height, width)


In [5]:
#NET
# Dropout probability shared by every conv_block.
dropout_value = 0.3


def conv_block(in_channels, out_channels, pool=False):
    """Build a 3x3 conv -> GhostBatchNorm -> Dropout -> [MaxPool(2)] -> CELU stack.

    The max-pool layer is inserted only when `pool` is truthy.
    """
    celu_alpha = 0.3
    conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
    norm = GhostBatchNorm(out_channels, num_splits=16)
    drop = nn.Dropout(dropout_value)
    downsample = [nn.MaxPool2d(2)] if pool else []
    return nn.Sequential(conv, norm, drop, *downsample, nn.CELU(celu_alpha))

class Ghost_ResNet9(ImageClassificationBase):
    """ResNet-9 style classifier assembled from ``conv_block`` stages.

    Five conv stages (four of them pooling) are interleaved with three
    residual pairs and followed by a pool + flatten + linear head. With a
    32x32 input the spatial size is halved five times down to 1x1, which is
    what the 1028-feature linear layer expects.
    """

    def __init__(self, in_channels, num_classes):
        super().__init__()

        def residual_pair(width):
            # Two same-width conv blocks whose output is added to their input in forward().
            return nn.Sequential(conv_block(width, width), conv_block(width, width))

        self.conv1 = conv_block(in_channels, 64)
        self.conv2 = conv_block(64, 128, pool=True)
        self.res1 = residual_pair(128)

        self.conv3 = conv_block(128, 256, pool=True)
        self.conv4 = conv_block(256, 512, pool=True)
        self.res2 = residual_pair(512)
        self.conv5 = conv_block(512, 1028, pool=True)
        self.res3 = residual_pair(1028)

        head = [
            nn.MaxPool2d(2),               # 1028 x 1 x 1
            nn.Flatten(),                  # 1028
            nn.Linear(1028, num_classes),  # 1028 -> num_classes
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, xb):
        """Return class logits for the image batch `xb`."""
        feats = self.conv2(self.conv1(xb))
        feats = self.res1(feats) + feats
        feats = self.conv4(self.conv3(feats))
        feats = self.res2(feats) + feats
        feats = self.conv5(feats)
        feats = self.res3(feats) + feats
        return self.classifier(feats)

# Instantiate the network for 3-channel images and 100 classes, then move it to the chosen device.
GNmodel = to_device(Ghost_ResNet9(in_channels=3, num_classes=100), device)


In [6]:
#Training Setup
def evaluate(model, test_loader):
    """Run `model` in eval mode over `test_loader` with gradients disabled.

    Every batch goes through ``model.validation_step`` and the collected
    results are reduced by ``model.validation_epoch_end``.
    """
    with torch.no_grad():
        model.eval()
        step_results = []
        for batch in test_loader:
            step_results.append(model.validation_step(batch))
        return model.validation_epoch_end(step_results)

def get_lr(optimizer):
    """Return the ``'lr'`` of the optimizer's first parameter group (None if there are none)."""
    return next((group['lr'] for group in optimizer.param_groups), None)

def fit_one_cycle(epochs, max_lr, model, train_loader, test_loader,
                  weight_decay=0, grad_clip=None, opt_func=torch.optim.SGD):
    """Train `model` for `epochs` epochs under a one-cycle learning-rate schedule.

    Args:
        epochs: number of passes over `train_loader`.
        max_lr: peak learning rate given to the optimizer and to OneCycleLR.
        model: module exposing ``training_step`` and ``epoch_end`` plus the
            methods ``evaluate`` relies on.
        train_loader: iterable of training batches; must support ``len()``.
        test_loader: iterable of batches evaluated after every epoch.
        weight_decay: forwarded to the optimizer constructor.
        grad_clip: if truthy, gradients are clipped by value to this threshold.
        opt_func: optimizer class, called as
            ``opt_func(params, max_lr, weight_decay=...)``.

    Returns:
        A list with one dict per epoch: the result of ``evaluate`` extended
        with ``'train_loss'`` (mean batch loss) and ``'lrs'`` (the learning
        rate recorded after every optimizer step).
    """
    torch.cuda.empty_cache()
    history = []

    # Set up custom optimizer with weight decay
    optimizer = opt_func(model.parameters(), max_lr, weight_decay=weight_decay)
    # Set up one-cycle learning rate scheduler (stepped once per batch below)
    sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epochs,
                                                steps_per_epoch=len(train_loader))

    for epoch in range(epochs):
        # Training Phase
        model.train()
        train_losses = []
        lrs = []
        for batch in train_loader:
            loss = model.training_step(batch)
            # Store a detached copy: only the value is needed for the epoch
            # average, and the live tensor would carry its autograd history
            # for the rest of the epoch.
            train_losses.append(loss.detach())
            loss.backward()

            # Gradient clipping
            if grad_clip:
                nn.utils.clip_grad_value_(model.parameters(), grad_clip)

            optimizer.step()
            optimizer.zero_grad()

            # Record & update learning rate
            lrs.append(get_lr(optimizer))
            sched.step()

        # Validation phase
        result = evaluate(model, test_loader)
        result['train_loss'] = torch.stack(train_losses).mean().item()
        result['lrs'] = lrs
        model.epoch_end(epoch, result)
        history.append(result)
    return history

In [19]:
#epochs=40
# Training (multi-stage learning rate: four one-cycle runs of epochs/4 epochs each)
# Initial evaluation of the untrained network. The network is GNmodel; the
# name `model` is only bound later, in the quantization cell.
history = [evaluate(GNmodel, testloader)]
current_time = time.time()
stage_epochs = epochs // 4
# Each stage divides max_lr by the given factor.
# NOTE(review): stages 3 and 4 both use max_lr/100, as in the original cell —
# confirm that the last stage was not meant to be smaller.
for lr_divisor in (1, 10, 100, 100):
    history += fit_one_cycle(stage_epochs, max_lr / lr_divisor, GNmodel, trainloader, testloader,
                             grad_clip=grad_clip,
                             weight_decay=weight_decay,
                             opt_func=opt_func)
# Print training time
time_train = time.time() - current_time
print('Training time: {:.2f} s'.format(time_train))

In [None]:
# Measure the inference (test) time of one full pass over the test set
current_time = time.time()
result = evaluate(GNmodel, testloader)
time_inference = time.time() - current_time
# A bare `result` expression is only echoed when it is the last line of a
# notebook cell, so print the metrics explicitly.
print(result)
print('Inference time: {:.2f} s'.format(time_inference))


In [None]:
# Save the model weights twice with torch.save. Despite its extension, the
# ".h5" file is an ordinary PyTorch checkpoint, not an HDF5 file.
Path = './drive/My Drive/GNmodel1.pth'
Path1 = './drive/My Drive/GNmodel1.h5'
for checkpoint_path in (Path, Path1):
    torch.save(GNmodel.state_dict(), checkpoint_path)


In [None]:
# Write the state dict of GNmodel to both checkpoint paths (same targets as
# the previous cell). Both files are produced by torch.save, including the
# one with the ".h5" extension.
Path, Path1 = './drive/My Drive/GNmodel1.pth', './drive/My Drive/GNmodel1.h5'
for checkpoint_path in (Path, Path1):
    torch.save(GNmodel.state_dict(), checkpoint_path)
# Generate testing accuracy, predicted label, confusion matrix, and table for classification report
def test_label_predictions(model, device, test_loader):
    """Collect true and predicted class indices over `test_loader`.

    Args:
        model: callable module; put into eval mode before predicting.
        device: device every batch is moved to before the forward pass.
        test_loader: iterable of ``(data, target)`` tensor pairs.

    Returns:
        ``(actuals, predictions)``: two lists of Python ints, one entry per
        sample, in loader order. The prediction is the arg-max over dim 1 of
        the model output.
    """
    model.eval()
    actuals = []
    predictions = []
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # One .tolist() per batch instead of one .item() per sample keeps
            # host/device transfers to a single copy per tensor.
            predictions.extend(output.argmax(dim=1).tolist())
            actuals.extend(target.reshape(-1).tolist())
    return actuals, predictions


# The "test_" prefix would make pytest collect this helper as a test case.
test_label_predictions.__test__ = False

# True and predicted labels of GNmodel over the test loader.
y_test, y_pred = test_label_predictions(GNmodel, device, testloader)
# Confusion matrix and text report (scikit-learn).
cm=confusion_matrix(y_test, y_pred)
cr=classification_report(y_test, y_pred)
# F1 and recall are averaged over classes with average='weighted'.
fs=f1_score(y_test,y_pred,average='weighted')
rs=recall_score(y_test, y_pred,average='weighted')
# NOTE(review): this rebinds the global name `accuracy`, which is also the
# helper called by ImageClassificationBase.validation_step; evaluate() will
# fail if it is run again after this cell. A later cell prints this value
# under the same name, so both places must be renamed together.
accuracy=accuracy_score(y_test, y_pred)
print('Confusion matrix:')
print(cm)
print(cr)
print('F1 score: %f' % fs)
print('Recall score: %f' % rs)
print('Accuracy score: %f' % accuracy)

In [None]:
# Train time: wall-clock seconds stored in time_train by the training cell
print('Training time: {:.2f} s'.format(time_train))

In [None]:
# Inference time (test time): wall-clock seconds stored in time_inference by the evaluation cell
print('Inference time: {:.2f} s'.format(time_inference))

In [7]:
# Parameter count and operation count
from thop import profile

# One random 3x32x32 image on the model's device. It is named dummy_input so
# that the builtin input() is not shadowed for the rest of the notebook.
dummy_input = torch.randn(1, 3, 32, 32).to(device)
# NOTE(review): thop's own log lines speak of "Macs", so the first returned
# value is presumably multiply-accumulates rather than FLOPs — confirm
# against the thop documentation before reporting it.
flops, params = profile(GNmodel, inputs=(dummy_input,))
print(flops)
print(params)


[INFO] Register count_convNd() for <class 'torch.nn.modules.conv.Conv2d'>.
[91m[WARN] Cannot find rule for <class '__main__.GhostBatchNorm'>. Treat it as zero Macs and zero Params.[00m
[INFO] Register zero_ops() for <class 'torch.nn.modules.dropout.Dropout'>.
[91m[WARN] Cannot find rule for <class 'torch.nn.modules.activation.CELU'>. Treat it as zero Macs and zero Params.[00m
[91m[WARN] Cannot find rule for <class 'torch.nn.modules.container.Sequential'>. Treat it as zero Macs and zero Params.[00m
[INFO] Register zero_ops() for <class 'torch.nn.modules.pooling.MaxPool2d'>.
[91m[WARN] Cannot find rule for <class 'torch.nn.modules.flatten.Flatten'>. Treat it as zero Macs and zero Params.[00m
[INFO] Register count_linear() for <class 'torch.nn.modules.linear.Linear'>.
[91m[WARN] Cannot find rule for <class '__main__.Ghost_ResNet9'>. Treat it as zero Macs and zero Params.[00m
531641968.0
30430880.0


In [None]:
# Accuracy: the accuracy_score value computed in the metrics cell
print('Accuracy score: %f' % accuracy)

## **(Optional) Some results you can use in your report**

In [None]:
# Plot the per-class classification scores and save the figure to a PDF
def plot_classification(precision, recall, f1_score):
    """Draw precision, recall and F1 as three stacked dot plots and save them to "result.pdf".

    Each argument is a sequence of values that is plotted against its index.
    """
    plt.rcParams['font.size'] = 12
    plt.rc('axes', linewidth=1.75)
    marker_size = 8
    figsize = 6
    plt.figure(figsize=(1.4 * figsize, figsize))

    panels = (('Precision', precision), ('Recall', recall), ('F1-score', f1_score))
    n_rows = len(panels)
    for row, (label, values) in enumerate(panels, start=1):
        plt.subplot(n_rows, 1, row)
        plt.plot(values, 'o', markersize=marker_size)
        plt.ylabel(label, fontsize=14)
        if row < n_rows:
            # Upper panels carry no x tick labels.
            plt.xticks([])
        else:
            # Only the bottom panel names the x axis.
            plt.xlabel('Class', fontsize=14)

    plt.subplots_adjust(hspace=0.001)
    plt.tight_layout()
    plt.savefig("result.pdf")
# Plot the confusion matrix and save the figure to a PDF
def plot_confusion_matrix(cm):
    """Show `cm` as a blue heat map, save it to "confusion_matrix.pdf" and display it."""
    plt.figure()
    blues = plt.get_cmap('Blues')
    plt.imshow(cm, interpolation='nearest', cmap=blues)
    plt.colorbar()
    for set_label, text in ((plt.ylabel, 'True label'), (plt.xlabel, 'Predicted label')):
        set_label(text, fontsize=14)
    plt.tight_layout()
    plt.savefig("confusion_matrix.pdf")
    plt.show()

In [None]:
# Compute precision, recall and F1 from the test labels/predictions, then plot
# and save them with plot_classification (the fourth return value is ignored)
precision, recall, f1,_= precision_recall_fscore_support(y_test, y_pred)
print(recall)
plot_classification(precision, recall, f1)

# Plot and save the confusion matrix computed in the metrics cell
plot_confusion_matrix(cm)

In [None]:
# Alias of the trained network used by the driver cell at the end of the notebook.
model=GNmodel
# Default quantization config factory and the FX graph-mode prepare/convert entry points.
from torch.quantization import get_default_qconfig
import copy
from torch.quantization.quantize_fx import prepare_fx, convert_fx
# Type that calib_quant_model asserts its `model` argument to be.
from torch.ao.quantization.fx.graph_module import ObservedGraphModule
def quant_fx(model):
    """Quantize `model` with PyTorch FX graph mode and save both state dicts.

    A deep copy of `model` is prepared and converted without any calibration
    pass in between. The original weights are written to "r9.pth" and the
    quantized ones to "r9_quant.pth". `model` itself is only switched to
    eval mode.
    """
    model.eval()
    # The default "fbgemm" qconfig means static quantization; the "" key
    # applies it to the whole model.
    qconfig_dict = {"": get_default_qconfig("fbgemm")}

    prepared_model = prepare_fx(copy.deepcopy(model), qconfig_dict)
    print("prepared model: ", prepared_model)

    quantized_model = convert_fx(prepared_model)
    print("quantized model: ", quantized_model)

    float_state = model.state_dict()
    torch.save(float_state, "r9.pth")
    int8_state = quantized_model.state_dict()
    torch.save(int8_state, "r9_quant.pth")


def calib_quant_model(model, calib_dataloader):
    """Calibrate a prepared FX model by running every calibration batch through it.

    `model` must be an ``ObservedGraphModule``. `calib_dataloader` yields
    ``(inputs, labels)`` pairs; the labels are ignored. The forward passes
    run in eval mode under ``torch.inference_mode()``.
    """
    is_observed = isinstance(model, ObservedGraphModule)
    assert is_observed, "model must be a perpared fx ObservedGraphModule."
    model.eval()
    with torch.inference_mode():
        for batch_inputs, _ in calib_dataloader:
            model(batch_inputs)
    print("calib done.")


def _print_classification_scores(model, device, loader):
    """Print weighted F1, weighted recall and accuracy of `model` over `loader`."""
    y_test, y_pred = test_label_predictions(model, device, loader)
    fs = f1_score(y_test, y_pred, average='weighted')
    rs = recall_score(y_test, y_pred, average='weighted')
    acc = accuracy_score(y_test, y_pred)
    print('F1 score: %f' % fs)
    print('Recall score: %f' % rs)
    print('Accuracy score: %f' % acc)
    print()


def quant_calib_and_eval(model):
    """Compare the float model with its int8 version, without and with calibration.

    Steps, all on the CPU:
      1. Build an int8 model and load the uncalibrated weights from
         "r9_quant.pth" (written by ``quant_fx``).
      2. Run one random 32x32 image through both models and print whether
         the outputs are close, then print both outputs.
      3. Print the test scores of the float model and of the uncalibrated
         int8 model.
      4. Prepare a fresh copy, calibrate it on the test loader, convert it,
         save it to "r9_quant_calib.pth" and print its test scores.

    Side effects: `model` is moved to the CPU and put into eval mode.
    """
    # test only on CPU
    cpu = torch.device("cpu")
    model.to(cpu)
    model.eval()

    qconfig = get_default_qconfig("fbgemm")
    qconfig_dict = {
        "": qconfig,
    }

    # --- int8 model without calibration ---------------------------------
    model_prepared = prepare_fx(copy.deepcopy(model), qconfig_dict)
    model_int8 = convert_fx(model_prepared)
    model_int8.load_state_dict(torch.load("r9_quant.pth"))
    model_int8.eval()

    # The classifier head only fits 32x32 inputs, so probe with that size.
    a = torch.randn([1, 3, 32, 32])
    o1 = model(a)
    o2 = model_int8(a)

    outputs_close = torch.allclose(o1, o2, 1e-4)
    print(outputs_close)
    print(o1.shape, o2.shape)
    print(o1, o2)

    # The models live on the CPU now, so batches must be moved there too
    # rather than to the global training device.
    print("model:")
    _print_classification_scores(model, cpu, testloader)

    print("Not calibration model_int8:")
    _print_classification_scores(model_int8, cpu, testloader)

    # --- int8 model with calibration ------------------------------------
    # Order matters: prepare -> calibrate the *prepared* module -> convert.
    model_prepared = prepare_fx(copy.deepcopy(model), qconfig_dict)
    cpu_batches = ((inputs.to(cpu), labels) for inputs, labels in testloader)
    calib_quant_model(model_prepared, cpu_batches)
    model_int8 = convert_fx(model_prepared)
    torch.save(model_int8.state_dict(), "r9_quant_calib.pth")
    print("Do calibration model_int8:")
    _print_classification_scores(model_int8, cpu, testloader)


In [None]:
import os

# Restore saved float weights when the checkpoint file exists; otherwise the
# weights currently held by `model` are used as they are.
checkpoint = "r9_row.pth"
if os.path.exists(checkpoint):
    model.load_state_dict(torch.load(checkpoint, map_location="cpu"))

# Quantize the model
quant_fx(model)
# Compare the results with and without calibration
quant_calib_and_eval(model)