In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torch.nn.functional as F
import torch.optim.lr_scheduler as lr_scheduler

# 定义卷积神经网络模型
class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()

        # 第一个卷积层：输入通道数为1（因为MNIST是灰度图像），输出通道数为32，
        # 卷积核大小为3x3，步长为1，填充为1
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)

        # 第二个卷积层：输入通道数为32，输出通道数为64，
        # 卷积核大小为3x3，步长为1，填充为1
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)

        # 最大池化层：池化核大小为2x2，步长为2，无填充
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)

        # 全连接层1：输入特征数为64 * 7 * 7（通过两次池化后的特征图大小为7x7）
        # 输出特征数为128
        self.fc1 = nn.Linear(64 * 7 * 7, 128)

        self.fc2 = nn.Linear(128, 10)

        self.relu = nn.ReLU()

    def forward(self, x):
        # 前向传播函数，定义数据在网络中的流动路径

        x = self.relu(self.conv1(x))

        x = self.pool(x)

        x = self.relu(self.conv2(x))

        x = self.pool(x)

        x = x.view(-1, 64 * 7 * 7)
        #逆向
        #tensor.view(batch_size,64,7,7)

        x = self.relu(self.fc1(x))

        x = self.fc2(x)

        return x  # 使用LogSoftmax作为最后一层的激活函数


# 数据预处理和加载
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = datasets.MNIST(root='./data', train=False, transform=transform, download=True)

train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)

# 检查 GPU 是否可用
if torch.cuda.is_available():
    device = torch.device("cuda")
    #model=Net_digits().cuda()#同model.to(device)
    print("CUDA is available. Using GPU.")
else:
    device = torch.device("cpu")
    print("CUDA is not available. Using CPU.")


# 初始化模型、损失函数和优化器
model = CNNModel().to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=4, factor=0.5,min_lr=0,verbose=True)

# 训练模型
epochs = 3
for epoch in range(epochs):
    model.train()
    for data, target in train_loader:
        data,target=data.to(device),target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

    # 在测试集上评估模型性能
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in test_loader:
            data,target=data.to(device),target.to(device)
            output = model(data)
            loss_test=criterion(output,target.squeeze().long())
            _, predicted = torch.max(output.data, 1)
            predicted=predicted
            total += target.size(0)
            correct += (predicted == target).sum().item()

    accuracy = correct / total
    scheduler.step(accuracy)
    print(f'Epoch {epoch+1}/{epochs}, Loss: {loss_test}, Test Accuracy: {accuracy * 100:.2f}%')
    if accuracy>=0.997: #早停机制
        print(f'Early stopping. Test Accuracy reached {accuracy * 100:.2f}%')
        break

# 输出最终模型在测试集上的准确率
print(f'Final Test Accuracy: {accuracy * 100:.2f}%')

CUDA is available. Using GPU.
Epoch 1/3, Loss: 0.0022243682760745287, Test Accuracy: 98.54%
Epoch 2/3, Loss: 0.000849008618388325, Test Accuracy: 98.53%
Epoch 3/3, Loss: 3.4717733797151595e-05, Test Accuracy: 97.95%
Final Test Accuracy: 97.95%


# 适配任务

In [35]:
import pandas as pd
import numpy as np
#展示所有行和列
pd.set_option('display.max_rows',None)
pd.set_option('display.max_columns',None)
predict=pd.read_csv(r'/kaggle/input/digit-recognizer/test.csv',encoding='utf8')
train=pd.read_csv(r'/kaggle/input/digit-recognizer/train.csv',encoding='utf8')

In [36]:
from sklearn.model_selection import train_test_split
train,test=train_test_split(train,test_size=0.2,random_state=42)
print(train.shape,test.shape)

(33600, 785) (8400, 785)


In [37]:
x_train=train.drop(['label'],axis=1)
y_train=train['label']
x_test=test.drop(['label'],axis=1)
y_test=test['label']

In [38]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset,DataLoader

xtrain_tensor=torch.tensor(x_train.to_numpy(),dtype=torch.float32)
ytrain_tensor=torch.tensor(y_train.to_numpy(),dtype=torch.float32)
ytrain_tensor=ytrain_tensor.unsqueeze(1)

xtest_tensor=torch.tensor(x_test.to_numpy(),dtype=torch.float32)
ytest_tensor=torch.tensor(y_test.to_numpy(),dtype=torch.float32)
ytest_tensor=ytest_tensor.unsqueeze(1)

class CustomDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels
        
    def __len__(self):
        return len(self.features)
    
    def __getitem__(self, index):
        x = self.features[index]
        y = self.labels[index]
        return x, y
    
class Predict_set(Dataset):
    def __init__(self, features):
        self.features = torch.tensor(np.array(features),dtype=torch.float32)
        
    def __len__(self):
        return len(self.features)
    
    def __getitem__(self, index):
        x = self.features[index]
        return x
    
predict_set = Predict_set(predict)
predict_loader = DataLoader(predict_set,batch_size=64)
    
predict_set = Predict_set(predict)
predict_loader = DataLoader(predict_set,batch_size=64)

train_set=CustomDataset(xtrain_tensor,ytrain_tensor)
test_set=CustomDataset(xtest_tensor,ytest_tensor)

train_loader=DataLoader(train_set,batch_size=64,shuffle=True)
test_loader=DataLoader(test_set,batch_size=64,shuffle=False)

In [55]:
import torch
import torch.nn as nn
import torch.optim.lr_scheduler as lr_scheduler


class DigitNet(nn.Module):
    
    def __init__(self):
        super(DigitNet,self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        
        self.pool = nn.MaxPool2d(kernel_size=2,stride=2,padding=0)
        
        self.fc1=nn.Linear(64*7*7,128)
        
        self.fc2=nn.Linear(128,10)
        
        self.relu=nn.ReLU()
        
        self.norm = nn.BatchNorm1d(64 * 7 * 7) # 对数据进行归一化
        
    def forward(self,x):
        x = x.view(-1,1,28,28)
#         x = x.reshape(-1,1,28,28)
        
        x = self.relu(self.conv1(x))
        
        x = self.pool(x)
        
        x = self.relu(self.conv2(x))
        
        x = self.pool(x)
        
        x = x.view(-1, 64 * 7 * 7)
        
        x = self.norm(x)
        
        #逆向
        #tensor.view(batch_size,64,7,7)
        
        x = self.relu(self.fc1(x))
        
        x = self.fc2(x)
        
        return x
    
if torch.cuda.is_available():
    device=torch.device("cuda")
    print('CUDA is available,using GPU')
else:
    device=torch.device("cpu")
    print('CUDA is notfind,using CPU')
    
model=DigitNet().to(device)
# criterion=nn.BCELoss().to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer=torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=4, factor=0.5,min_lr=0,verbose=True)


epochs=30

for epoch in range(epochs):
    model.train
    for data,target in train_loader:
        data,target=data.to(device),target.to(device)
        optimizer.zero_grad()
        output=model(data)
#         target=target.squeeze(1)
        loss=criterion(output,target.squeeze(1).long())
        loss.backward()
        optimizer.step()
        
    with torch.no_grad():
        correct = 0
        total = 0
        # 设置模型为评估模式
        model.eval()
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss_test=criterion(output,target.squeeze(1).long())
            _, predicted = torch.max(output, dim=1)
            total += target.size(0)
            correct += (predicted == target.squeeze()).sum().item()
        accuracy = correct / total
        scheduler.step(accuracy) # 学习率调度器

    print(f'Epoch {epoch+1}/{epochs}, Loss: {loss_test}, Test Accuracy: {accuracy * 100:.2f}%')
    if accuracy>=0.997: #早停机制
        print(f'Early stopping. Test Accuracy reached {accuracy * 100:.2f}%')
        break

# 输出最终模型在测试集上的准确率
print(f'Final Test Accuracy: {accuracy * 100:.2f}%')

CUDA is available,using GPU
Epoch 1/30, Loss: 0.017569325864315033, Test Accuracy: 97.25%
Epoch 2/30, Loss: 0.011772358790040016, Test Accuracy: 98.39%
Epoch 3/30, Loss: 0.001014163251966238, Test Accuracy: 98.37%
Epoch 4/30, Loss: 0.01566229574382305, Test Accuracy: 98.49%
Epoch 5/30, Loss: 0.024510174989700317, Test Accuracy: 98.42%
Epoch 6/30, Loss: 0.022524051368236542, Test Accuracy: 98.62%
Epoch 7/30, Loss: 0.005927287507802248, Test Accuracy: 98.25%
Epoch 8/30, Loss: 5.051389962318353e-06, Test Accuracy: 98.76%
Epoch 9/30, Loss: 0.03502592071890831, Test Accuracy: 98.60%
Epoch 10/30, Loss: 0.0003023294557351619, Test Accuracy: 98.54%
Epoch 11/30, Loss: 9.996775770559907e-05, Test Accuracy: 98.71%
Epoch 12/30, Loss: 0.0006100759492255747, Test Accuracy: 98.45%
Epoch 13/30, Loss: 5.990127192490036e-06, Test Accuracy: 98.82%
Epoch 14/30, Loss: 0.00017315434524789453, Test Accuracy: 98.52%
Epoch 15/30, Loss: 7.4204590418958105e-06, Test Accuracy: 98.87%
Epoch 16/30, Loss: 2.77902995

In [56]:
result_df = pd.DataFrame()
model.eval()
for i in predict_loader:
    input_p = i.to(device)
    output_p = model(input_p)
    _, output_p = torch.max(output_p, dim=1)
    output_p_df = pd.DataFrame(np.array(output_p.detach().cpu()))
    result_df = pd.concat([result_df,output_p_df],axis=0)
result_df.head()

Unnamed: 0,0
0,2
1,0
2,9
3,9
4,3


In [60]:
result_df.shape

(28000, 10)

# 直接从for_ward中返回网络输出结果和索引

In [61]:
import torch
import torch.nn as nn
import torch.optim.lr_scheduler as lr_scheduler
import torch.nn.functional as F


class DigitNet(nn.Module):
    
    def __init__(self):
        super(DigitNet,self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        
        self.pool = nn.MaxPool2d(kernel_size=2,stride=2,padding=0)
        
        self.fc1=nn.Linear(64*7*7,128)
        
        self.fc2=nn.Linear(128,10)
        
        self.relu=nn.ReLU()
        
        self.norm = nn.BatchNorm1d(64 * 7 * 7) # 对数据进行归一化
        
    def forward(self,x):
        x = x.view(-1,1,28,28)
#         x = x.reshape(-1,1,28,28)
        
        x = self.relu(self.conv1(x))
        
        x = self.pool(x)
        
        x = self.relu(self.conv2(x))
        
        x = self.pool(x)
        
        x = x.view(-1, 64 * 7 * 7)
        
        x = self.norm(x)
        
        #逆向
        #tensor.view(batch_size,64,7,7)
        
        x = self.relu(self.fc1(x))
        
        x = self.fc2(x)
        
#         output = F.log_softmax(x, dim=1)
        
        _, predicted = torch.max(x, dim=1)
        
        return x, predicted
    
if torch.cuda.is_available():
    device=torch.device("cuda")
    print('CUDA is available,using GPU')
else:
    device=torch.device("cpu")
    print('CUDA is notfind,using CPU')
    
model=DigitNet().to(device)
# criterion=nn.BCELoss().to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer=torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=4, factor=0.5,min_lr=0,verbose=True)


epochs=10

for epoch in range(epochs):
    model.train
    for data,target in train_loader:
        data,target = data.to(device),target.to(device)
        optimizer.zero_grad()
        output,_ = model(data)
        loss=criterion(output,target.squeeze(1).long())
        loss.backward()
        optimizer.step()
        
    with torch.no_grad():
        correct = 0
        total = 0
        # 设置模型为评估模式
        model.eval()
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output,predicted = model(data)
            loss_test=criterion(output,target.squeeze(1).long())
            total += target.size(0)
            correct += (predicted == target.squeeze()).sum().item()
        accuracy = correct / total
        scheduler.step(accuracy) # 学习率调度器

    print(f'Epoch {epoch+1}/{epochs}, Loss: {loss_test}, Test Accuracy: {accuracy * 100:.2f}%')
    if accuracy>=0.997: #早停机制
        print(f'Early stopping. Test Accuracy reached {accuracy * 100:.2f}%')
        break

# 输出最终模型在测试集上的准确率
print(f'Final Test Accuracy: {accuracy * 100:.2f}%')

CUDA is available,using GPU
Epoch 1/10, Loss: 0.006916174665093422, Test Accuracy: 97.42%
Epoch 2/10, Loss: 0.06900906562805176, Test Accuracy: 97.85%
Epoch 3/10, Loss: 0.20799463987350464, Test Accuracy: 98.26%
Epoch 4/10, Loss: 0.0036278641782701015, Test Accuracy: 98.75%
Epoch 5/10, Loss: 0.04211093857884407, Test Accuracy: 98.54%
Epoch 6/10, Loss: 0.0015080865705385804, Test Accuracy: 98.76%
Epoch 7/10, Loss: 0.0036886176094412804, Test Accuracy: 98.39%
Epoch 8/10, Loss: 0.00016015541041269898, Test Accuracy: 98.73%
Epoch 9/10, Loss: 0.04492902755737305, Test Accuracy: 98.54%
Epoch 10/10, Loss: 0.002489992883056402, Test Accuracy: 98.76%
Final Test Accuracy: 98.76%


In [62]:
result_df = pd.DataFrame()
model.eval()
for i in predict_loader:
    input_p = i.to(device)
    _,output_p = model(input_p)
    output_p_df = pd.DataFrame(np.array(output_p.detach().cpu()))
    result_df = pd.concat([result_df,output_p_df],axis=0)
result_df.head()

Unnamed: 0,0
0,2
1,0
2,9
3,0
4,3


In [66]:
result_df.to_csv("Digit_Recognizer.csv",sep=',',encoding='utf8')