In [5]:
import os
import torch
import numpy as np
import torchvision as tv
import torch.utils.data as Data
import torch.nn.functional as F

"""
1.torch.nn / torch.nn.functional:
(1).__init__函数初始化使用 torch.nn
(2).forward计算使用 torch.nn.functional

2.torch.nn.Sequentail()
构建模型存储序列

3.基于torch.nn.Sequentail()动态加载torch模块
self.mlp = torch.nn.Sequential()
self.mlp.add_module（model_name(str),module）

4.torch中的优化函数与损失函数需要实例化后可正常使用
# select optimizer function
self.optim_f = self.get_optimization_function(opt)(self.mlp.parameters(), lr=0.001)  # 优化函数实例化
# select loss function
self.loss_f = self.get_loss_function(loss)()                                         # 损失函数实例化

5.torch.util.data.TensorDataset(x, y)
(1).TensorDataset 对 x 与 y 进行数据打包,类似zip功能
(2).x,y tensor第一个维度必须相同

6.torch.utils.data.DataLoader(TensorDataset,batch_size=10)
(1).与TensorDataset连用,将TensorDataset类型的数进行分批次


"""
class MLP(torch.nn.Module):

    def __init__(self,
                 input_dim,
                 output_dim,
                 hidden_dim_list=[],
                 activate_list=["relu"],
                 opt="adam",
                 loss='CrossEntropyLoss'):

        super(MLP, self).__init__()
        self.mlp = torch.nn.Sequential()

        # construct mlp model
        if (len(hidden_dim_list) == 0):
            self.mlp.add_module("linear_layer 1", torch.nn.Linear(input_dim, output_dim))
            if activate_list[0] == "linear":
                pass
            else:
                self.mlp.add_module("linear_layer 1 act", self.get_act(activate_list[0]))

        for i in range(len(hidden_dim_list)):
            if i == 0:
                self.mlp.add_module("linear_layer " + str(i + 1), torch.nn.Linear(input_dim, hidden_dim_list[i]))
                if activate_list[i] == "linear":
                    pass
                else:
                    self.mlp.add_module("linear_layer " + str(i + 1) + "act", self.get_act(activate_list[i]))
            else:
                self.mlp.add_module("linear_layer " + str(i + 1),
                                    torch.nn.Linear(hidden_dim_list[i - 1], hidden_dim_list[i]))
                if activate_list[i] == "linear":
                    pass
                else:
                    self.mlp.add_module("linear_layer " + str(i + 1) + "act", self.get_act(activate_list[i]))

        if (len(hidden_dim_list) != 0):
            self.mlp.add_module("linear_layer " + str(len(hidden_dim_list) + 2), torch.nn.Linear(hidden_dim_list[i], output_dim))

        # select optimizer function
        self.optim_f = self.get_optimization_function(opt)(self.mlp.parameters(), lr=0.001)  # 优化函数实例化
        # select loss function
        self.loss_f = self.get_loss_function(loss)()                                         # 损失函数实例化

    def forward(self, x):

        output = self.mlp(x)

        pred_y = F.softmax(output, dim=1)

        return pred_y

    def get_optimization_function(self, opt):

        optim_f = None

        if opt == "sgd":                       # 随机梯度下降
            optim_f = torch.optim.SGD
        elif opt == "asgd":                    # 平均随机梯度下降
            optim_f = torch.optim.ASGD
        elif opt == "adagrad":
            optim_f = torch.optim.Adagrad
        elif opt == "adadelta":
            optim_f = torch.optim.Adadelta
        elif opt == "rmsprop":
            optim_f = torch.optim.RMSprop
        elif opt == "adam":
            optim_f = torch.optim.Adam
        elif opt == "adamax":
            optim_f = torch.optim.Adamax
        elif opt == "sparseadam":
            optim_f = torch.optim.SparseAdam
        elif opt == "lbfgs":
            optim_f = torch.optim.LBFGS

        return optim_f

    def get_loss_function(self, loss):

        loss_f = None
        # 多分类损失函数
        if loss == "CrossEntropyLoss":          # 交叉熵损失
            loss_f = torch.nn.CrossEntropyLoss
        elif loss == "NLLLoss":                 # 负对数似然损失
            loss_f = torch.nn.NLLLoss

        return loss_f

    def get_act(self, act):

        if act == "elu":
            act = torch.nn.ELU()
        elif act == "leaky_relu":
            act = torch.nn.LeakyReLU()
        elif act == "relu":
            act = torch.nn.ReLU()
        elif act == "relu6":
            act = torch.nn.ReLU6()
        elif act == "sigmoid":
            act = torch.nn.Sigmoid()
        elif act == "softplus":
            act = torch.nn.Softplus()
        elif act == "tanh":
            act = torch.nn.Tanh()
        else:
            raise ("wrong act type:", act)

        return act

def model_train(model,dataset,train_epoch):
    train_set = None

    if "train_set" in dataset.keys():
        train_set = dataset["train_set"]

    for epoch in range(train_epoch):

        train_loss = 0.0

        for i, batch_train_data in enumerate(train_set):

            model.optim_f.zero_grad()  # 每一个batch训练前清空梯度

            x, y = batch_train_data

            x = torch.autograd.Variable(x).cuda()
            y = torch.autograd.Variable(y).cuda()

            pred_y = model(x)
            loss = model.loss_f(pred_y, y)
            loss.backward()
            model.optim_f.step()

            train_loss += loss.item()

        print('the epoch:' + str(epoch+1) + ' loss:', train_loss)

    return model

def model_test(model, dataset):

    if "test_set" in dataset.keys():
        test_set = dataset["test_set"]
    performance = 0.0

    for i, data in enumerate(test_set):

        x, y = data

        x = torch.autograd.Variable(x).cuda()
        y = torch.autograd.Variable(y).cuda()

        pred_y = model(x)
        performance += evaluate(pred_y, y)

    performance = performance/(i+1)

    print("test performace,", performance)


def get_data():

    current_path = os.getcwd()
    train_set = tv.datasets.MNIST(current_path + "/mnist/train",
                                  train=True,
                                  transform=tv.transforms.ToTensor(),
                                  download=True)

    train_x = train_set.data.view(-1, 28 * 28).float()/255
    train_y = train_set.targets.to(torch.int64)
    train_set = Data.TensorDataset(train_x, train_y)

    test_set = tv.datasets.MNIST(current_path + "/mnist/test",
                                 train=False,
                                 transform=tv.transforms.ToTensor(),
                                 download=True)

    test_x = test_set.data.view(-1, 28 * 28).float()/255
    test_y = test_set.targets.to(torch.int64)
    test_set = Data.TensorDataset(test_x, test_y)

    train_dataset = torch.utils.data.DataLoader(train_set, batch_size=100)
    test_dataset = torch.utils.data.DataLoader(test_set, batch_size=100)
    
    input_dim = 784
    output_dim = 10

    return train_dataset, test_dataset, input_dim, output_dim

def evaluate(pred, y):

    pred = pred.cpu().data.numpy()
    label = y.cpu().data.numpy()
    test_np = (np.argmax(pred, 1) == label)
    test_np = np.float32(test_np)
    return np.mean(test_np)

if __name__ == "__main__":

    train_dataset, test_dataset, input_dim, output_dim = get_data()

    data_set = {"train_set": train_dataset,
                "test_set": test_dataset}

    mlp = MLP(input_dim, output_dim, hidden_dim_list=[512, 128], activate_list=["elu", "elu"]).cuda()

    model = model_train(mlp, data_set, 10)

    model_test(model, data_set)

the epoch:1 loss: 950.8309353590012
the epoch:2 loss: 917.8038918972015
the epoch:3 loss: 908.8398736715317
the epoch:4 loss: 903.2410755157471
the epoch:5 loss: 900.4795634746552
the epoch:6 loss: 897.5223001241684
the epoch:7 loss: 895.9417276382446
the epoch:8 loss: 893.301366686821
the epoch:9 loss: 891.5964286327362
the epoch:10 loss: 891.133666396141
test performace, 0.9687000089883804
