In [None]:
import os
import time
import onnx
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
import onnxruntime as ort
import torch.nn.functional as F
from sklearn.metrics import accuracy_score
from torch.utils.data import Dataset, DataLoader

# Param Config

In [None]:
# --- Training hyper-parameters -------------------------------------------
N_EPOCH = 1        # number of passes over the training split
N_BATCH = 128      # mini-batch size (also the dummy batch size used for ONNX export)
N_BATCH_NUM = 250  # training batches; rows past N_BATCH * N_BATCH_NUM become the test split
N_SEED = 42        # fixed seed so weight init and shuffling repeat on "Restart & Run All"

# --- File locations ------------------------------------------------------
S_DATA_PATH = r"mnist_train.csv"
S_TORCH_MODEL_FULL_PATH = r"cnn_model.pth"
S_TORCH_MODEL_PARAMS_PATH = r"cnn_model_state.pth"
S_TORCH_MODEL_SCRIPT_PATH = r"cnn_model.torch_script.pt"
S_ONNX_MODEL_PATH = r"cnn_model_batch%d.onnx" % N_BATCH

# --- Device --------------------------------------------------------------
# Use the GPU when one is visible to torch, otherwise fall back to the CPU so
# the notebook still runs end-to-end on CPU-only machines.
# To force CPU on a GPU machine, replace the expression below with "cpu".
N_DEVICE_ID = 0
S_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Fully qualified device name, derived instead of being duplicated by hand.
S_DEVICE_FULL = "%s:%d" % (S_DEVICE, N_DEVICE_ID) if S_DEVICE == "cuda" else "cpu"

# Seed the libraries that drive parameter initialisation and DataLoader shuffling.
np.random.seed(N_SEED)
torch.manual_seed(N_SEED)

# Read Data

In [None]:
# Load the CSV without a header row and view it as one dense matrix.
df = pd.read_csv(S_DATA_PATH, header=None)
print(df.shape)
np_mat = np.array(df)
print(np_mat.shape)

# Column 0 holds the label; every remaining column is a pixel value, scaled by
# 1/255 (presumably raw 0-255 intensities -- confirm against the data file).
Y = np_mat[:, 0]
X = np_mat[:, 1:].astype(np.float32) / 255

# The first N_BATCH * N_BATCH_NUM rows are the training split, the rest is the test split.
n_train = N_BATCH * N_BATCH_NUM
X_train, X_test = X[:n_train], X[n_train:]
Y_train, Y_test = Y[:n_train], Y[n_train:]

# Flat pixel rows -> (N, 1, 28, 28) single-channel images for the Conv2d front-end.
X_train = X_train.reshape(len(X_train), 1, 28, 28)
X_test = X_test.reshape(len(X_test), 1, 28, 28)

for np_part in (X_train, Y_train, X_test, Y_test):
    print(np_part.shape)


class MnistDataSet(Dataset):
    """Map-style dataset that pairs each sample of ``X`` with its label in ``Y``.

    Args:
        X: 4-D array indexed as ``X[i, :, :, :]``; the first axis enumerates samples.
        Y: sequence of labels, indexed by the same sample position as ``X``.

    Attributes:
        l_data: list holding one ``X[i]`` slice per sample.
        l_label: list holding the matching ``Y[i]`` entries.
    """

    def __init__(self, X, Y):
        n_samples = X.shape[0]
        # Split the arrays into per-sample entries up front so lookups are plain list indexing.
        self.l_data = [X[k, :, :, :] for k in range(n_samples)]
        self.l_label = [Y[k] for k in range(n_samples)]

    def __getitem__(self, index):
        """Return the ``(sample, label)`` pair stored at ``index``."""
        sample = self.l_data[index]
        label = self.l_label[index]
        return sample, label

    def __len__(self):
        """Return the number of stored samples."""
        return len(self.l_data)


# Wrap each split in a dataset, then batch it. Only the training loader shuffles,
# so the test batches come out in the same order on every pass.
train_dataset = MnistDataSet(X_train, Y_train)
test_dataset = MnistDataSet(X_test, Y_test)
train_loader = DataLoader(dataset=train_dataset, batch_size=N_BATCH, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=N_BATCH, shuffle=False)


# Build Model

In [None]:
class Net(nn.Module):
    """Small CNN classifier: Conv2d -> MaxPool2d -> Flatten -> Linear -> ReLU -> Linear.

    All layers live in a single ``encoder`` Sequential. For a ``(N, 1, 28, 28)``
    input it produces ``(N, 10)`` raw logits (no softmax is applied).
    """

    def __init__(self):
        super().__init__()
        layers = [
            nn.Conv2d(1, 16, 3, 1),  # 1 -> 16 channels, 3x3 kernel, stride 1
            nn.MaxPool2d(2),
            nn.Flatten(1),           # keep the batch axis, flatten the rest
            nn.Linear(2704, 128),    # 2704 = 16 * 13 * 13 for a 28x28 input
            nn.ReLU(),
            nn.Linear(128, 10),
        ]
        self.encoder = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the encoder and return the logits."""
        return self.encoder(x)


# Cross-entropy is applied directly to the raw logits returned by Net.
loss_fun = nn.CrossEntropyLoss()

# Instantiate the network on the configured device and attach an Adam optimizer to it.
net = Net().to(S_DEVICE)
optimizer = torch.optim.Adam(params=net.parameters(), lr=1e-3)

print(net)

# Model Train

In [None]:
# Train for N_EPOCH passes over train_loader and report, per epoch, the summed
# per-batch loss and the accuracy over all training samples seen in that epoch.
print("model train")
for i in range(N_EPOCH):
    net.train()
    # Sum of the per-batch mean losses, kept as a plain Python float.
    # Accumulating the loss *tensor* would keep every batch's autograd graph
    # alive until the end of the epoch; .item() detaches the value.
    t_loss = 0.
    # Per-batch predictions/labels are collected in lists and concatenated once
    # after the loop instead of re-copying a growing array every batch.
    l_pred, l_y = [], []
    for t_x_b, t_y_b in train_loader:
        t_y_b = t_y_b.long().to(S_DEVICE)
        t_x_b = t_x_b.float().to(S_DEVICE)

        t_logits_b = net(t_x_b)
        t_loss_b = loss_fun(t_logits_b, t_y_b)

        optimizer.zero_grad()
        t_loss_b.backward()
        optimizer.step()

        t_loss += t_loss_b.item()
        l_pred.append(torch.argmax(t_logits_b, -1).detach().cpu().numpy())
        l_y.append(t_y_b.cpu().numpy())

    np_pred = np.concatenate(l_pred, 0)
    np_y = np.concatenate(l_y, 0)
    f_acc = accuracy_score(np_y, np_pred)
    print("train ", t_loss, f_acc)
    print()

# Model Predict

In [None]:
# Evaluate the trained network once on the held-out split.
# The model is not updated here, so a single pass is enough; repeating it
# N_EPOCH times would only recompute identical numbers.
net.eval()
t_loss = 0.            # sum of per-batch mean losses, as a Python float
l_pred, l_y = [], []   # per-batch predictions / labels, concatenated once below
with torch.no_grad():
    for t_x_b, t_y_b in test_loader:
        t_y_b = t_y_b.long().to(S_DEVICE)
        t_x_b = t_x_b.float().to(S_DEVICE)

        t_logits_b = net(t_x_b)
        t_loss_b = loss_fun(t_logits_b, t_y_b)

        t_loss += t_loss_b.item()
        l_pred.append(torch.argmax(t_logits_b, -1).cpu().numpy())
        l_y.append(t_y_b.cpu().numpy())

np_pred = np.concatenate(l_pred, 0)
np_y = np.concatenate(l_y, 0)
f_acc = accuracy_score(np_y, np_pred)
print("test ", t_loss, f_acc)
print()

# Model Save

In [None]:
# Persist the model in two forms: the whole pickled module object, and its
# parameter dictionary (state_dict) alone.
for obj_to_save, s_target_path in (
        (net, S_TORCH_MODEL_FULL_PATH),
        (net.state_dict(), S_TORCH_MODEL_PARAMS_PATH),
):
    torch.save(obj_to_save, s_target_path)

# Model Load and Loaded Model Use

In [None]:
print("load torch model and pred test data")
# Rebuild the architecture and restore the saved parameters from the state_dict
# file. Unpickling the whole module object (S_TORCH_MODEL_FULL_PATH) is rejected
# by torch.load's restricted unpickler, which is the default in recent PyTorch
# releases; a state_dict of tensors loads under both old and new defaults.
# Tensors are mapped to the CPU first, then the module is moved to S_DEVICE.
net_load = Net()
net_load.load_state_dict(torch.load(S_TORCH_MODEL_PARAMS_PATH, map_location="cpu"))
net_load = net_load.to(S_DEVICE)
print("load model ok")

# Single evaluation pass over the test split with the restored model.
net_load.eval()
t_loss = 0.            # sum of per-batch mean losses, as a Python float
l_pred, l_y = [], []   # per-batch predictions / labels, concatenated once below
with torch.no_grad():
    for t_x_b, t_y_b in test_loader:
        t_y_b = t_y_b.long().to(S_DEVICE)
        t_x_b = t_x_b.float().to(S_DEVICE)

        t_logits_b = net_load(t_x_b)
        t_loss_b = loss_fun(t_logits_b, t_y_b)

        t_loss += t_loss_b.item()
        l_pred.append(torch.argmax(t_logits_b, -1).cpu().numpy())
        l_y.append(t_y_b.cpu().numpy())

np_pred = np.concatenate(l_pred, 0)
np_y = np.concatenate(l_y, 0)
f_acc = accuracy_score(np_y, np_pred)
print("load torch model ", t_loss, f_acc)
print()

# Export Torch Script

In [None]:
# Trace with an explicit example input rather than whatever batch tensor a
# previous cell's loop happened to leave behind. The example is created on the
# device the model currently lives on, so the cell works regardless of which
# cells ran before it.
net_load.eval()
trace_device = next(net_load.parameters()).device
t_trace_example = torch.zeros(N_BATCH, 1, 28, 28, device=trace_device)
torch_script_trace = torch.jit.trace(net_load, t_trace_example)
print(torch_script_trace)
torch_script_trace.save(S_TORCH_MODEL_SCRIPT_PATH)

# Load Torch Script and Use Script

In [None]:
# Reload the traced module from disk and move it to the configured device.
torch_script_load = torch.jit.load(S_TORCH_MODEL_SCRIPT_PATH)
torch_script_load = torch_script_load.to(S_DEVICE)
print(torch_script_load)
print(torch_script_load.code)
print("load scirpt model ok")

# Single evaluation pass over the test split with the TorchScript module.
torch_script_load.eval()
t_loss = 0.            # sum of per-batch mean losses, as a Python float
l_pred, l_y = [], []   # per-batch predictions / labels, concatenated once below
with torch.no_grad():
    for t_x_b, t_y_b in test_loader:
        t_y_b = t_y_b.long().to(S_DEVICE)
        t_x_b = t_x_b.float().to(S_DEVICE)

        t_logits_b = torch_script_load(t_x_b)
        t_loss_b = loss_fun(t_logits_b, t_y_b)

        t_loss += t_loss_b.item()
        l_pred.append(torch.argmax(t_logits_b, -1).cpu().numpy())
        l_y.append(t_y_b.cpu().numpy())

np_pred = np.concatenate(l_pred, 0)
np_y = np.concatenate(l_y, 0)
f_acc = accuracy_score(np_y, np_pred)
print("load scirpt torch model ", t_loss, f_acc)
print()

# Export ONNX

In [None]:
# Export the loaded model to ONNX from the CPU, using a random dummy batch.
# Module.cpu() moves the module in place, so remember where it lived and put it
# back afterwards; otherwise re-running an earlier cell that feeds net_load
# S_DEVICE tensors would hit a device mismatch.
export_origin_device = next(net_load.parameters()).device
dummy_in = torch.randn(N_BATCH, 1, 28, 28)
try:
    torch.onnx.export(
        net_load.cpu(),
        dummy_in,
        S_ONNX_MODEL_PATH,
        verbose=True,
        input_names=["data"],
        output_names=["output"],
        # Axis 0 of both tensors is declared dynamic, so the exported graph is
        # not tied to the dummy batch size.
        dynamic_axes={
            'data': {0: 'batch_size'},
            'output': {0: 'batch_size'}
        })
finally:
    net_load.to(export_origin_device)

# ONNX Run

In [None]:
# Load the exported file and validate it. check_model raises on a malformed
# model and returns nothing on success, so report success explicitly.
model = onnx.load(S_ONNX_MODEL_PATH)
onnx.checker.check_model(model)
print("onnx model check ok")
print(onnx.helper.printable_graph(model.graph))  # human readable representation of the graph

# Collect the graph's declared input / output names.
ls_input_name = [graph_in.name for graph_in in model.graph.input]
ls_output_name = [graph_out.name for graph_out in model.graph.output]
print("input name ", ls_input_name)
print("output name ", ls_output_name)
s_input_name = ls_input_name[0]

# Use twice the export batch size, which exercises the dynamic batch axis.
x_input = X_train[:N_BATCH*2, :, :, :].astype(np.float32)
# Place the input on the configured device as an OrtValue and show its metadata.
ort_val = ort.OrtValue.ortvalue_from_numpy(x_input, S_DEVICE, N_DEVICE_ID)
print("val device ", ort_val.device_name())
print("val shape ", ort_val.shape())
print("val data type ", ort_val.data_type())
print("is_tensor ", ort_val.is_tensor())
print("array_equal ", np.array_equal(ort_val.numpy(), x_input))

# Pick the execution provider that matches the configured device.
# A provider list is tried in order: for example
# ['CUDAExecutionProvider', 'CPUExecutionProvider'] means "run a node on
# CUDAExecutionProvider if capable, otherwise on CPUExecutionProvider".
providers = 'CUDAExecutionProvider' if S_DEVICE == "cuda" else 'CPUExecutionProvider'
print("providers ", providers)
# The provider list is passed at construction time; calling set_providers with
# the same list afterwards would only rebuild the session.
ort_session = ort.InferenceSession(S_ONNX_MODEL_PATH,
                                   providers=[providers])
outputs = ort_session.run(None, {s_input_name: ort_val})
print("sess env ", ort_session.get_providers())
print(type(outputs))
print(outputs[0])