In [1]:
#import and construct hepler function
from util_main import load_dance_dance_data_set
from util_main import test_model_nn
from util_main import load_model as load_model_keras
from util_main import one_hot_encode_labels
import torch 
import numpy as np
import torch.nn as nn
import torch.nn.functional as F

data_set_path = "Data-sets/Dance_Data"
sampling_rate = 5
window_length = 2.56  # Seconds

#load data set
x_data, y_data = load_dance_dance_data_set(data_set_path, sampling_rate=sampling_rate, window_length=window_length)

y_data = one_hot_encode_labels(y_data)

x_tensor = torch.from_numpy(x_data)
x_tensor = x_tensor.double()
y_tensor = torch.from_numpy(y_data)
y_tensor = y_tensor.double()
input_size = x_data.shape[1]
output_size = y_data.shape[1]

def hidden_init(layer):
    fan_in = layer.weight.data.size()[0]
    lim = 1. / np.sqrt(fan_in)
    return (-lim, lim) 

#3 linear layer with ReLu activation for hidden and softmax activation for output
class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.hidden1 = nn.Linear(input_size, 256)
        self.hidden2 = nn.Linear(256, 256)
        self.hidden3 = nn.Linear(256, 128)
        self.output = nn.Linear(128, output_size)
    
    def reset_parameters(self):
        self.hidden1.weight.data.uniform_(*hidden_init(self.fc1))
        self.hidden2.weight.data.uniform_(*hidden_init(self.fc2))
        self.hidden3.weight.data.uniform_(*hidden_init(self.fc3))
        self.output.weight.data.uniform_(-3e-3, 3e-3)
        self.reset_parameters()
    
    
    def forward(self, x):
        x = self.hidden1(x)
        x = self.hidden2(F.relu(x))
        x = self.hidden3(F.relu(x))
        x = self.output(F.relu(x))
        return F.softmax(x, dim=0)

class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self, name, fmt=':f'):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
        return fmtstr.format(**self.__dict__)

def prepare_data_loaders():

    inputs, targets = load_dance_dance_data_set(data_set_path, sampling_rate=sampling_rate, window_length=window_length)

    train_sampler = torch.utils.data.RandomSampler(dataset)

    data_loader = torch.utils.data.DataLoader(
        dataset, batch_size=8,
        sampler=train_sampler)

    return data_loader
    
def evaluate(model, criterion, data, target, neval_batches):
    model.eval()
    top1 = AverageMeter('Acc@1', ':6.2f')
    top5 = AverageMeter('Acc@5', ':6.2f')
    cnt = 0
    with torch.no_grad():
        for i in range(data.shape[0]):
            output = model(data[i])
            #loss = criterion(output, target[i])
            cnt += 1
            #acc1, acc5 = accuracy(output, target[i], topk=(1, 5))
            #print('.', end = '')
            #top1.update(acc1[0], data.size(0))
            #top5.update(acc5[0], data.size(0))
            if cnt >= neval_batches:
                 return top1, top5

    return top1, top5


def accuracy(output, target, topk=(1,)):
        """Computes the accuracy over the k top predictions for the specified values of k"""
        with torch.no_grad():
            maxk = max(topk)
            batch_size = target.size(0)

            _, pred = output.topk(maxk, 1, True, True)
            pred = pred.t()
            correct = pred.eq(target.view(1, -1).expand_as(pred))

            res = []
            for k in topk:
                correct_k = correct[:k].view(-1).float().sum(0, keepdim=True)
                res.append(correct_k.mul_(100.0 / batch_size))
            return res


def load_model(model_file):
    model = Model()
    state_dict = torch.load(model_file)
    model.load_state_dict(state_dict)
    model.to('cpu')
    return model

def print_size_of_model(model):
    torch.save(model.state_dict(), "temp.p")
    print('Size (MB):', os.path.getsize("temp.p")/1e6)
    os.remove('temp.p')

Using TensorFlow backend.


Processing: dumbbells.txt
(151, 75)
Processing: face_wipe.txt
(163, 75)
Processing: left.txt
(199, 75)
Processing: muscle.txt
(159, 75)
Processing: nothing.txt
(225, 75)
Processing: pac_man.txt
(79, 75)
Processing: right.txt
(225, 75)
Processing: shooting_star.txt
(73, 75)
Processing: shout_out.txt
(23, 75)
Processing: tornado.txt
(9, 75)
Processing: weight_lifting.txt
(65, 75)


In [3]:
model = Model()
model_file = 'model_pytorch.pt'
model_file_keras = 'trained_models/neural_network_model'

model_keras = load_model_keras(model_file_keras)
weights=model_keras.get_weights()

model.hidden1.weight.data=torch.from_numpy(np.transpose(weights[0])).double()
model.hidden1.bias.data=torch.from_numpy(weights[1]).double()
model.hidden2.weight.data=torch.from_numpy(np.transpose(weights[2])).double()
model.hidden2.bias.data=torch.from_numpy(weights[3]).double()
model.hidden3.weight.data=torch.from_numpy(np.transpose(weights[4])).double()
model.hidden3.bias.data=torch.from_numpy(weights[5]).double()
model.output.weight.data=torch.from_numpy(np.transpose(weights[6])).double()
model.output.bias.data=torch.from_numpy(weights[7]).double()

torch.save(model.state_dict(), "neural_network_model_state_dict_pytorch.pt")
torch.save(model, "neural_network_model_pytorch.pth")

Loaded model from disk


In [63]:
#y_data.astype(np.float)
criterion = nn.CrossEntropyLoss()

num_calibration_batches = 2000
# Specify quantization configuration
model.eval()
model.qconfig = torch.quantization.default_qconfig
print(model.qconfig)
torch.quantization.prepare(model, inplace=True)

# Calibration
print('Post Training Quantization Prepare: Inserting Observers')
evaluate(model, criterion, x_tensor, y_tensor, neval_batches=num_calibration_batches)
print('Post Training Quantization: Calibration done')

# Convert to quantized model
torch.quantization.convert(model, inplace=True)
print('Post Training Quantization: Convert done')

print("Size of model after quantization")
print_size_of_model(model)

#test if quantized success
y_quantized = model(x_tensor[0])
print(y_quantized)

QConfig(activation=functools.partial(<class 'torch.quantization.observer.MinMaxObserver'>, reduce_range=True), weight=functools.partial(<class 'torch.quantization.observer.MinMaxObserver'>, dtype=torch.qint8, qscheme=torch.per_tensor_symmetric))
Post Training Quantization Prepare: Inserting Observers
Post Training Quantization: Calibration done


RuntimeError: Didn't find engine for operation quantized::linear_prepack NoQEngine (operator () at ..\aten\src\ATen\native\quantized\cpu\qlinear_prepack.cpp:202)
(no backtrace available)