<a href="https://www.kaggle.com/code/eupphh/eupph-dr-cnn?scriptVersionId=292522812" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset

# 加载数据
train = pd.read_csv('/kaggle/input/digit-recognizer/train.csv')
test = pd.read_csv('/kaggle/input/digit-recognizer/test.csv')

# 分离特征和标签
y_train = train['label'].values
X_train = train.drop('label', axis=1).values
X_test = test.values

# 归一化
X_train = X_train / 255.0
X_test = X_test / 255.0

In [None]:
#训练集大小42000  测试集大小28000
print(X_train.shape)
print(X_test.shape)

In [None]:
#划分训练集和验证集
from sklearn.model_selection import train_test_split
#随机划分，80%训练，20%验证
X_train_split, X_val_split, y_train_split, y_val_split = train_test_split(
    X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
)

In [None]:
# 转换为tensor格式
X_train_tensor = torch.FloatTensor(X_train_split).reshape(-1, 1, 28, 28)
y_train_tensor = torch.tensor(y_train_split)
X_val_tensor = torch.FloatTensor(X_val_split).reshape(-1, 1, 28, 28)
y_val_tensor = torch.tensor(y_val_split)
X_test_tensor = torch.FloatTensor(X_test).reshape(-1, 1, 28, 28)

In [None]:
print(X_train_tensor.shape)

* 输入: (batch, 1, 28, 28)
* ↓ Conv1: 1→32通道, 3×3卷积, padding=1 → (batch, 32, 28, 28)
* ↓ ReLU激活
* ↓ MaxPool 2×2 → (batch, 32, 14, 14)
* ↓ Conv2: 32→64通道, 3×3卷积, padding=1 → (batch, 64, 14, 14)
* ↓ ReLU激活
* ↓ MaxPool 2×2 → (batch, 64, 7, 7)
* ↓ 展平 → (batch, 64×7×7=3136)
* ↓ 全连接: 3136→256 → ReLU
* ↓ 全连接: 256→128 → ReLU
* ↓ 全连接: 128→10
* 输出: (batch, 10)

In [None]:
#定义模型
class CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1=nn.Sequential(
            nn.Conv2d(1,32,3,1,1),      #输入(1,28,28)  输出(32,28,28)
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)#池化减小一半 输出(32,14,14)
        )
        self.conv2=nn.Sequential(
            nn.Conv2d(32,64,3,1,1),      #输入(32,14,14)  输出(64,14,14)
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)#池化减小一半 输出(64,7,7)
        )
        self.out = nn.Sequential(
            nn.Linear(64*49,256),   #256个  展平不要忘记通道数
            nn.ReLU(),
            nn.Linear(256,128),
            nn.ReLU(),
            nn.Linear(128,10)
        )
    def forward(self,x):
         #x (batch,1,28,28)
         x = self.conv1(x)
         x = self.conv2(x)
         x = x.view(x.size(0),-1)
         x = self.out(x)
         return x

In [None]:
#数据
batch_size = 64
#分别创建 DataLoader
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=128, shuffle=False)  # 验证不需要打乱

In [None]:
import torch.optim as optim
net = CNN()
#损失函数
criterion = nn.CrossEntropyLoss()
#优化器
optimizer = optim.Adam(net.parameters(), lr=0.001)

In [None]:
import numpy as np
def fit(epochs,model,cr,opt,train,val):
    for e in range(epochs):
        model.train()
        #计算每个batch损失
        for x1,y1 in train:
            loss_batch(model,cr,x1,y1,opt)
        
        model.eval()
        with torch.no_grad():
            total_loss = []
            total_len=[]
            for x1,y1 in val:
                loss,len = loss_batch(model,cr,x1,y1)
                total_loss.append(loss)
                total_len.append(len)
            val_loss = np.sum(np.multiply(total_loss, total_len)) / np.sum(total_len)
            print('当前epoch:'+str(e), '验证集损失：'+str(val_loss))
                
        


In [None]:
def loss_batch(model,cr,x,y,opt=None):
    #计算损失
    loss = cr(model(x),y)
    if opt is not None:
        loss.backward()
        opt.step()
        opt.zero_grad()

    return loss.item(),len(x)
    

In [None]:
fit(10,net,criterion,optimizer,train_loader,val_loader)

In [None]:
def acc(model,val):
    total=0
    right=0
    for x,y in val:
        out = model(x)
        p,pred = torch.max(out,1)
        total+=y.size(0)
        right+=(pred==y).sum().item()
    print(f"{right}/{total}")    
    print(f"acc:{right/total*100}%")
        

In [None]:
acc(net,val_loader)

In [None]:
# 切换到评估模式
net.eval()

# 预测整个测试集
with torch.no_grad():
    test_outputs = net(X_test_tensor)
    test_preds = torch.argmax(test_outputs, 1)
    
print(f'测试集预测完成，共 {len(test_preds)} 个预测结果')
# 创建提交文件
submission = pd.DataFrame({
    'ImageId': range(1, len(test_preds) + 1),  # Kaggle要求从1开始
    'Label': test_preds.numpy()
})

# 保存为CSV
submission.to_csv('submission.csv', index=False)

# 查看前几行确认格式
print('提交文件已保存: submission.csv')
print('前10行预览:')
print(submission.head(10))