In [2]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader


import matplotlib.pyplot as plt



# set the super parameters
batch_size_train = 32
batch_size_test = 1000


def prepare_data():
    """
    Prepare the MNIST dataset for training and testing.

    Returns:
    train_loader (DataLoader): DataLoader object for training data.
    test_loader (DataLoader): DataLoader object for testing data.
    """
    # 1.data preparation
    transform = transforms.Compose(
        [
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,)),
        ]
    )
    # 2.load train and test data
    trainset = datasets.MNIST(
        "./data/MNIST_data/", download=True, train=True, transform=transform
    )
    testset = datasets.MNIST(
        "./data/MNIST_data/", download=True, train=False, transform=transform
    )

    # 3.create data loader
    train_loader = DataLoader(trainset, batch_size=batch_size_train, shuffle=True)
    test_loader = DataLoader(testset, batch_size=batch_size_test, shuffle=True)

    return train_loader, test_loader

def save_img(imgs, labels, num=0):
    # save image[0]  using plt
    plt.imshow(imgs[num].squeeze(0), cmap="gray")
    plt.savefig(f"./imgs/{labels[num]}.png")
    return labels[num]


In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter


# 1.full connected network
class Full_Net(nn.Module):
    def __init__(self):
        """
        Initialize the neural network model.
        """
        super(Full_Net, self).__init__()
        self.fc1 = nn.Linear(784, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)

    def forward(self, x):
        x = x.view(-1, 784)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


# 2.convolutional neural network for number recognition
class Conv_Net(nn.Module):
    def __init__(self):
        super(Conv_Net, self).__init__()

        self.conv_1=nn.Sequential(
            # 1. input 1*28*28 output 16*28*28
            nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1),
            # 2. batch normalization
            nn.BatchNorm2d(16),
            # 3. activation function
            nn.ReLU(),
            # 4. max pooling and output 16*14*14
            nn.MaxPool2d(kernel_size=2, stride=2)

        )
        self.conv_2=nn.Sequential(
            # 卷积 输入：bs*16*14*14  输出：bs*32*14*14
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1),
            # 归一化
            nn.BatchNorm2d(32),
            # 激活函数
            nn.ReLU(),
            # 最大池化：输入:bs*32*14*14  输出：bs*32*7*7
            nn.MaxPool2d(2)
        )
        # 第三层卷积，输入：bs*32*7*7 输出：bs*64*3*3
        self.conv_3 = nn.Sequential(
            # 卷积 输入：bs*32*7*7  输出：bs*64*3*3
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),
            # 归一化
            nn.BatchNorm2d(64),
            # 激活函数
            nn.ReLU(),
            # 最大池化：输入：bs*64*7*7 输出：bs*64*3*3
            nn.MaxPool2d(2)
        )
        # 自适应池化，将bs*64*3*3映射为bs*64*1*1
        self.advpool = nn.AdaptiveAvgPool2d((1, 1))
         # 全连接层
        self.fc = nn.Linear(64, 10)

    def forward(self, x):
        # 1. conv layer
        x = self.conv_1(x)
        x = self.conv_2(x)
        x = self.conv_3(x)
        # 2. adaptive pooling
        x = self.advpool(x)
        # 3. flatten
        x = x.view(x.size(0), -1)
        # 4. fc layer
        x = self.fc(x)
        return x


In [4]:
import torch
from torch import nn
import torch.optim as optim
# from prepare_data import prepare_data
from torch.utils.tensorboard import SummaryWriter

logger = SummaryWriter("./pytorch_tb/train_test")
# super parameters
n_epochs = 10
learning_rate = 0.01


def net_train(net: nn.Module, trainloader: torch.utils.data.DataLoader):
    """
    Train the neural network model.
    """
    # set the optimizer and loss function

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(net.parameters(), lr=learning_rate)

    print("Start training...")
    for epoch in range(n_epochs):  # loop over the dataset multiple times
        running_loss = 0.0
        # ensure the network is in training mode
        net.train()
        for i, data in enumerate(trainloader, 0):

            inputs, labels = data
            # detect the device
            device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
            inputs ,labels= inputs.to(device), labels.to(device)

            # 1.input data
            outputs = net(inputs)  # forward
            # 2.calculate loss
            loss = criterion(outputs, labels)  # calculate the loss
            # 3.gradient to zero
            optimizer.zero_grad()  # zero the parameter gradients
            # 4.backpropagation
            loss.backward()  # backpropagation
            # 5.update parameters
            optimizer.step()  # update parameters

            # Get the predicted labels
            _, predicted = torch.max(outputs, 1)

            # Calculate the number of correct predictions
            correct = (predicted == labels).sum().item()

            # Calculate the accuracy
            accuracy = correct / labels.size(0)

            # print statistics
            running_loss += loss.item()
            if i % 100 == 99:  # print every 2000 mini-batches
                print("[%d, %5d] loss: %.3f" % (epoch + 1, i + 1, running_loss / 100))
                logger.add_scalar('training loss', running_loss / 100, epoch * len(trainloader) + i)
                logger.add_scalar('training accuracy', accuracy, epoch * len(trainloader) + i)
                running_loss = 0.0
    print("Finished Training")


def net_test(net: nn.Module, testloader: torch.utils.data.DataLoader):
    print("Start testing...")
    correct = 0
    total = 0
    # set model to evaluation mode
    net.eval()
    with torch.no_grad():
        for data in testloader:
            images, labels = data
             # detect the device
            device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
            images ,labels= images.to(device), labels.to(device)

            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        print(
            "Accuracy of the network on the test images: %d %%"
            % (100 * correct / total)
        )
    print("Finished Testing")


In [5]:
import torch
from PIL import Image
from torchvision import transforms

def predict_digit(image_path, model):
    # 1.定义转换
    transform = transforms.Compose([
        transforms.Resize((28, 28)), # 调整图片大小
        transforms.ToTensor(), # 将PIL图片转换为Tensor
        transforms.Normalize((0.5,), (0.5,)) # 标准化
    ])

    # 2.打开图片并应用转换
    image = Image.open(image_path).convert('L') # 转换为灰度图
    image = transform(image)

    # 添加一个批次维度
    image = image.unsqueeze(0)
    # print image shape
    print(image.shape)

    # 将模型设置为评估模式
    model.eval()

    # predict result and probability
    with torch.no_grad():
        output = model(image)
        _, predicted = torch.max(output.data, 1)
        print("Predicted digit:", predicted.item())
        print("Predicted probability:", torch.nn.functional.softmax(output.data, dim=1)[0][predicted.item()])


In [6]:

# multi-cpu training

# torch.set_num_threads(4)  # 设置为你想要的线程数


def print_data(data_loader):
    # Assuming `data_loader` is an instance of torch.utils.data.DataLoader
    flag = 0
    for i, data in enumerate(data_loader):
        if flag == 1:
            break
        inputs, labels = data
        # detect the device
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        inputs ,labels= inputs.to(device), labels.to(device)
        print(f"Batch {i+1}:")
        print("Inputs:", inputs)
        print("Labels:", labels)
        # print shape of inputs and labels
        print("Inputs shape:", inputs.shape)
        print("Labels shape:", labels.shape)
        flag += 1



In [7]:

import time
if __name__ == "__main__":
    # set random seed
    random_seed = 1
    torch.manual_seed(random_seed)

    # 1.get data
    trainloader, testloader = prepare_data()

    # # print data
    # print_data(trainloader)

    # 2.create network
    net = Conv_Net()
    # detect gpu
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(device)
    net.to(device)
    start=time.time()
    # 3.1 train network
    net_train(net, trainloader)
    end=time.time()
    print("consume time:",end-start)
    # # 3.2 load weights
    # net.load_state_dict(torch.load("./weights/CovNet__weights.pth"))

    # # 4.save weights
    # torch.save(net.state_dict(), "./weights/CovNet__weights.pth")


cuda
Start training...
[1,   100] loss: 0.832
[1,   200] loss: 0.253
[1,   300] loss: 0.167
[1,   400] loss: 0.147
[1,   500] loss: 0.118
[1,   600] loss: 0.088
[1,   700] loss: 0.106
[1,   800] loss: 0.106
[1,   900] loss: 0.082
[1,  1000] loss: 0.098
[1,  1100] loss: 0.079
[1,  1200] loss: 0.091
[1,  1300] loss: 0.078
[1,  1400] loss: 0.068
[1,  1500] loss: 0.072
[1,  1600] loss: 0.063
[1,  1700] loss: 0.065
[1,  1800] loss: 0.061
[2,   100] loss: 0.068
[2,   200] loss: 0.060
[2,   300] loss: 0.045
[2,   400] loss: 0.056
[2,   500] loss: 0.048
[2,   600] loss: 0.051
[2,   700] loss: 0.058
[2,   800] loss: 0.063
[2,   900] loss: 0.051
[2,  1000] loss: 0.061
[2,  1100] loss: 0.050
[2,  1200] loss: 0.045
[2,  1300] loss: 0.056
[2,  1400] loss: 0.056
[2,  1500] loss: 0.064
[2,  1600] loss: 0.055
[2,  1700] loss: 0.054
[2,  1800] loss: 0.051
[3,   100] loss: 0.040
[3,   200] loss: 0.041
[3,   300] loss: 0.038
[3,   400] loss: 0.037
[3,   500] loss: 0.035
[3,   600] loss: 0.045
[3,   700] 

In [8]:

    # 5.test network
    net_test(net, testloader)





Start testing...
Accuracy of the network on the test images: 99 %
Finished Testing
