## 实时量化推理demo
-------

In [12]:
import torch 
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
import time
import numpy as np
import random

In [13]:
# All tensors and the model stay on the CPU in this notebook.
device = 'cpu'

# Shared settings for the demo. The training-related entries (valid_ratio,
# n_epochs, learning_rate, early_stop) are not read by the cells visible here.
config = dict(
    seed=42,
    valid_ratio=0.3,
    n_epochs=10,
    batch_size=1,
    learning_rate=5e-3,
    early_stop=3,
    save_path='./models/model.ckpt',  # model will be saved here.
    data_path='./DataSet/npz/',
    # Base names of the .npz files found under data_path.
    data_file=[
        "cat_eval", "cat_train",
        "dog_eval", "dog_train",
        "other_eval", "other_train",
    ],
    backend='fbgemm',
)

# Input mode switch; a later cell overrides it with 'DEBUG'.
ENV = 'REAL'

### 加载模型

In [14]:
class TinyM2NetDataset(Dataset):
    '''
    Paired audio/image dataset with optional targets.

    x: audio MFCC vectors  (44x13x1 per sample, per the original notes).
    y: image vectors       (32x32x3 per sample, per the original notes).
    z: targets, one row per sample; pass None (the default) for
       prediction-only use, in which case items are (x, y) pairs.
       NOTE(review): the original notes list the classes as
       (cat, dog, duck, rabbit) -- confirm against the label set in use.
    '''

    def __init__(self, x, y, z=None):
        # Only the targets are optional, so the guard must test z (the
        # previous code tested y and crashed on the default z=None).
        if z is None:
            self.z = None
        else:
            self.z = torch.FloatTensor(z)
        self.x = torch.FloatTensor(x)
        self.y = torch.FloatTensor(y)

    def __getitem__(self, idx):
        '''Return (x, y) when there are no targets, otherwise (x, y, z).'''
        if self.z is None:
            return self.x[idx], self.y[idx]
        return self.x[idx], self.y[idx], self.z[idx]

    def __len__(self):
        '''Number of samples, taken from the audio tensor.'''
        return len(self.x)




class SeparableConv2d(nn.Module):
    """Per-channel (grouped) convolution, then a second convolution, then ReLU.

    Both convolutions use ``kernel_size`` with ``padding=1``.

    NOTE(review): ``bias`` is accepted but never forwarded, so both
    convolutions keep ``nn.Conv2d``'s default bias. The second stage is
    named "pointwise" yet uses the full ``kernel_size`` rather than 1x1.
    Changing either would alter the parameter set of saved checkpoints.
    """

    def __init__(self, in_channels, out_channels, kernel_size, bias=False):
        super().__init__()
        # groups == in_channels gives every input channel its own filter.
        self.depthwise = nn.Conv2d(
            in_channels,
            in_channels,
            kernel_size,
            padding=1,
            groups=in_channels,
        )
        self.pointwise = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            padding=1,
        )
        self.outlayer = nn.ReLU()

    def forward(self, x):
        """Apply depthwise conv -> pointwise conv -> ReLU to ``x``."""
        return self.outlayer(self.pointwise(self.depthwise(x)))
    
class myConv2d(nn.Module):
    """Convolutional feature extractor for a single modality.

    Pipeline: Conv2d + BatchNorm2d + ReLU, two SeparableConv2d stages (each
    followed by 2x2 max-pooling and dropout), then flatten and a
    Linear + ReLU + Dropout head producing ``output_channels`` features.

    Args:
        input_channels: channels of the input tensor.
        output_channels: channels of the first conv, of the second separable
            stage, and the size of the returned feature vector.
        kernel_size: kernel size shared by every convolution.
        dense_dim: flattened feature size fed to the linear head; it has to
            match the input's spatial size after the two 2x2 poolings.
        bn_dim: feature count for BatchNorm2d; it follows the first conv, so
            it needs to equal ``output_channels``.
    """

    def __init__(self, input_channels, output_channels, kernel_size, dense_dim, bn_dim):
        super().__init__()
        hidden_channels = 32  # width between the two separable stages

        self.conv2d = nn.Sequential(
            nn.Conv2d(input_channels, output_channels, kernel_size, padding=1),
            nn.BatchNorm2d(bn_dim),
            nn.ReLU(),
        )
        self.spconv2d1 = nn.Sequential(
            SeparableConv2d(output_channels, hidden_channels, kernel_size),
            nn.MaxPool2d((2, 2)),
            nn.Dropout(0.2),
        )
        self.spconv2d2 = nn.Sequential(
            SeparableConv2d(hidden_channels, output_channels, kernel_size),
            nn.MaxPool2d((2, 2)),
            nn.Dropout(0.2),
        )
        self.outlayer = nn.Sequential(
            nn.Linear(dense_dim, output_channels),
            nn.ReLU(),
            nn.Dropout(0.2),
        )

    def forward(self, x):
        """Return a (batch, output_channels) feature tensor for ``x``."""
        features = self.conv2d(x)
        features = self.spconv2d2(self.spconv2d1(features))
        # Keep the batch axis and collapse everything else for the linear head.
        flat = torch.flatten(features, start_dim=1)
        return self.outlayer(flat)
    
class Tiny2Net(nn.Module):
    """Two-branch audio + image classifier.

    Each modality passes through its own ``myConv2d`` branch (64 features
    each); the two feature vectors are concatenated and classified by a
    128 -> 64 -> ``labels`` MLP followed by a softmax.

    Args:
        labels: number of output classes.
        device: stored on the instance; not used inside ``forward``.
    """

    def __init__(self, labels, device):
        super().__init__()
        self.videoNet = myConv2d(3, 64, (3, 3), 4096, 64)
        self.audioNet = myConv2d(1, 64, (3, 3), 2112, 64)
        # Quantization boundary markers; the same QuantStub instance is
        # applied to both inputs in forward().
        self.quant = torch.quantization.QuantStub()
        self.dequant = torch.quantization.DeQuantStub()

        self.layer1 = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
        )
        self.layer2 = nn.Sequential(
            nn.Linear(64, labels),
        )
        self.softmax = nn.Softmax(dim=-1)
        self.device = device

    def forward(self, x, y):
        """Classify one batch of paired inputs.

        Args:
            x: MFCC input (44x13x1 per sample per the original notes;
               presumably laid out as (N, 1, 44, 13) -- TODO confirm).
            y: image input (32x32x3 per sample per the original notes;
               presumably laid out as (N, 3, 32, 32) -- TODO confirm).

        Returns:
            Softmax probabilities over the last dimension.
        """
        audio_in = self.quant(x)
        image_in = self.quant(y)

        audio_feat = self.audioNet(audio_in)
        image_feat = self.videoNet(image_in)
        fused = torch.cat((audio_feat, image_feat), 1)

        logits = self.layer2(self.layer1(fused))
        return self.softmax(self.dequant(logits))

In [15]:
def same_seed(seed):
    '''Fix the random number generator seeds for reproducibility.

    Seeds Python's ``random`` module, NumPy and PyTorch (including every
    CUDA device when CUDA is available), and switches cuDNN to
    deterministic, non-benchmarking mode.

    Args:
        seed: integer seed applied to every generator.
    '''
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    # The notebook samples with the stdlib `random` module as well, so it
    # must be seeded alongside NumPy and PyTorch.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
# Apply the configured seed before the model and data cells run.
same_seed(config['seed'])

In [20]:
# Build the float model and restore the trained weights.
model_fp32 = Tiny2Net(3, device)
# map_location remaps the stored tensors onto `device`, so a checkpoint saved
# from a CUDA session still loads on a CPU-only machine.
state_dict = torch.load(
    r'./models/seed_42_1205_073217_entropy_standardnoise_.pth',
    map_location=device,
)
model_fp32.load_state_dict(state_dict)
# Inference-only demo: disable dropout and use the BatchNorm running statistics.
model_fp32.eval()


########### try static
# model_fp32.eval()
# model_fp32.qconfig = torch.quantization.get_default_qat_qconfig('fbgemm')
# # model_fp32_fused = torch.quantization.fuse_modules(model_fp32,
# #     [])
# model_fp32_prepared = torch.quantization.prepare_qat(model_fp32.train())
# model_fp32_prepared.eval()
# model_int8 = torch.quantization.convert(model_fp32_prepared)


########## no quantization
# model_int8 = model_fp32


######### dynamic quantization (only nn.Linear is supported in our model)
# Conv2d and MaxPool2d have no dynamically quantized counterpart, so listing
# them had no effect; only the Linear layers are requested here.
model_int8 = torch.quantization.quantize_dynamic(
    model_fp32,  # the original model
    {torch.nn.Linear},  # the layer types to dynamically quantize
    dtype=torch.qint8)  # the target dtype for quantized weights




In [21]:
# Print the module tree to check which layers were swapped for quantized versions.
print(model_int8)

Tiny2Net(
  (videoNet): myConv2d(
    (conv2d): Sequential(
      (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (spconv2d1): Sequential(
      (0): SeparableConv2d(
        (depthwise): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=64)
        (pointwise): Conv2d(64, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (outlayer): ReLU()
      )
      (1): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
      (2): Dropout(p=0.2, inplace=False)
    )
    (spconv2d2): Sequential(
      (0): SeparableConv2d(
        (depthwise): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32)
        (pointwise): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (outlayer): ReLU()
      )
      (1): MaxPool2d(kernel_size=(2, 2), stride=(2, 

### 加载数据

In [None]:
ENV = 'DEBUG' # run the test on stored data instead of real-time input
# from CameraAudioRead.AudioRead import read_audio
# from CameraAudioRead.CameraRead import read_camera

In [None]:
def loadData(path,dataList):
    """Open ``<path><name>.npz`` for every name in ``dataList``.

    ``path`` is used as a plain string prefix, so it has to end with a path
    separator. Returns a dict mapping each name to the object returned by
    ``np.load`` for that file, in ``dataList`` order.
    """
    return {name: np.load(path + name + '.npz') for name in dataList}

# Module-level sample arrays; they stay None unless ENV == 'DEBUG'.
_x = None
_y = None
_z = None

if ENV == 'DEBUG':
    dataset = loadData(config['data_path'], config['data_file'])

    # Every file whose name contains 'other' is left out of the test set.
    _kept = [v for k, v in dataset.items() if 'other' not in k]
    if _kept:
        # One concatenate per field instead of growing the arrays pairwise,
        # which recopied everything accumulated so far on each iteration.
        _x = np.concatenate([v["x"] for v in _kept], axis=0)
        _y = np.concatenate([v["y"] for v in _kept], axis=0)
        _z = np.concatenate([v["z"] for v in _kept], axis=0)

In [None]:
def load_data_item():
    """Return one randomly chosen sample as a batch of size 1.

    In DEBUG mode the sample is drawn from the module-level arrays ``_x``,
    ``_y`` and ``_z`` and returned as an ``(x, y, z)`` tuple of tensors, each
    with a leading axis of length 1 added.

    In any other mode the live-capture branch is disabled (kept below as
    comments), so the function returns None.
    """
    if ENV == 'DEBUG':
        # randrange excludes its upper bound; randint(0, n) could return n
        # and index one past the last sample.
        idx = random.randrange(_x.shape[0])
        return (
            torch.from_numpy(_x[idx]).unsqueeze(0),
            torch.from_numpy(_y[idx]).unsqueeze(0),
            torch.from_numpy(_z[idx]).unsqueeze(0),
        )
    # Live capture path, disabled together with the CameraAudioRead imports:
    #     camera_frames = read_camera(resize_h=32, resize_w=32, frames_number=1)
    #     audio_frames = read_audio(record_second=1,frames_number=1)
    #     y = torch.from_numpy(camera_frames[0]).unsqueeze(0)
    #     x = torch.from_numpy(audio_frames[0]).unsqueeze(0)
    #     return x,y,None
    return None

In [None]:
# Quick check of the loaded array's shape; _x is only populated when ENV == 'DEBUG'.
print(_x.shape)

##  运行

In [None]:
run_time = 1  # iteration budget for the (disabled) real-time loop below
config['batch_size'] = 64
# Kept for later cells; the accuracy loop below does not use it.
criterion = nn.CrossEntropyLoss()
animal_dict = {0: "cat", 1: "dog", 2: "other animal"}
test_dataset = TinyM2NetDataset(_x, _y, _z)
test_loader = DataLoader(
    test_dataset,
    batch_size=config['batch_size'],
    shuffle=False,
    # Pinned host memory only helps host-to-GPU copies; skip it on a CPU run.
    pin_memory=(device != 'cpu'),
)
model_int8.eval()

acc_record = []  # per-batch accuracies
n_correct = 0
n_seen = 0
with torch.no_grad():
    for x, y, z in test_loader:
        x, y, z = x.to(device), y.to(device), z.to(device)
        pred = model_int8(x, y)
        # The targets go through argmax too, i.e. z holds one score per class.
        val_correct = torch.argmax(pred, dim=1) == torch.argmax(z, dim=1)
        val_accuracy = torch.mean(val_correct.float())
        acc_record.append(val_accuracy)
        n_correct += int(val_correct.sum())
        n_seen += val_correct.numel()

# Weight by sample count: averaging the per-batch accuracies would give a
# short final batch the same weight as a full one. An empty loader reports
# NaN instead of dividing by zero.
mean_acc = n_correct / n_seen if n_seen else float('nan')
print(f'Test accuracy: {mean_acc:.4f}')

# Real-time / single-sample demo loop (disabled):
# while(run_time>=0):
#     x,y,z=load_data_item()
#     x=x.to(device)
#     y=y.to(device)
# #     print(x.shape)
# #     print(y.shape)
#     with torch.no_grad():
#         pred_quantized = model_int8(x.float(), y.float())

#         target_list_quantized = pred_quantized.cpu().numpy().tolist()[0]
#         target_quantized = target_list_quantized.index(max(target_list_quantized))
#         print("Quantized:the prediction is {} ,corresponding animal is {}".format(target_quantized,animal_dict[target_quantized]))
# #     print(run_time)
#     run_time=run_time-1