In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.functional as F

In [2]:
# Read the labelled data and the unlabelled prediction set from the working directory.
data_df = pd.read_csv('./train.csv')
predict_df = pd.read_csv('./test.csv')

# Split by position: rows 0..6999 are used for training, every later row for validation.
# .copy() detaches each split so the pop() calls below do not touch data_df.
train_df = data_df.iloc[:7000].copy()
val_df = data_df.iloc[7000:].copy()

# Remove the 'label' column from each split and keep it as a NumPy array.
train_label = train_df.pop('label').to_numpy()
val_label = val_df.pop('label').to_numpy()

# Displayed as the cell result: shapes of both feature frames and both label arrays.
tuple(part.shape for part in (train_df, val_df, train_label, val_label))


((7000, 784), (35000, 784), (7000,), (35000,))

# Data Preprocessing

In [11]:
import albumentations as A
import numpy as np
import gc

def data_transform(df, target_size: int, source_size: int = 28, out_dtype=np.uint8):
    """
    Resize every flattened square image in a DataFrame to target_size x target_size.

    Parameters:
    - df: DataFrame containing the image data, each row is a flattened
      source_size x source_size image. Values are cast to uint8 before resizing.
    - target_size: int, the width and height to resize each image to.
    - source_size: int, side length of the stored images (default 28).
    - out_dtype: dtype of the returned arrays (default uint8). The previous
      implementation called ``astype(None)``, which resolves to NumPy's default
      float64 and makes every image 8x larger in memory and on disk; pass
      ``np.float64`` to get that representation back. Pixel values are the
      same either way.

    Returns:
    - array_list: list of transformed images, each a flattened 1-D array of
      length target_size * target_size.
    """
    transform = A.Compose([
        A.Resize(width=target_size, height=target_size),
    ])

    # Convert the whole frame once instead of going through df.iloc for every row.
    # The explicit row count keeps an empty frame valid (it yields an empty list).
    n_images = df.shape[0]
    images = df.to_numpy().astype(np.uint8).reshape(n_images, source_size, source_size)

    array_list = []
    for i, img in enumerate(images):
        resized = transform(image=img)["image"]
        # astype() copies, so each stored image owns its own buffer.
        array_list.append(resized.reshape(target_size * target_size).astype(out_dtype))

        # Print progress every 1000 iterations
        if i > 0 and i % 1000 == 0:
            print(f"Processed {i} images out of {n_images}")
    # Perform garbage collection after processing
    gc.collect()
    return array_list


import pickle

# Resize both splits to 224x224; the training split is processed first.
train_array_list = data_transform(train_df, target_size=224)
val_array_list = data_transform(val_df, target_size=224)

# Cache the resized images on disk so later cells can reload them
# without repeating the resize step.
with open('train_data.pkl', mode='wb') as f:
    pickle.Pickler(f).dump(train_array_list)

with open('val_data.pkl', mode='wb') as f:
    pickle.Pickler(f).dump(val_array_list)



Processed 1000 images out of 7000
Processed 2000 images out of 7000
Processed 3000 images out of 7000
Processed 4000 images out of 7000
Processed 5000 images out of 7000
Processed 6000 images out of 7000
Processed 1000 images out of 35000
Processed 2000 images out of 35000
Processed 3000 images out of 35000
Processed 4000 images out of 35000
Processed 5000 images out of 35000
Processed 6000 images out of 35000
Processed 7000 images out of 35000
Processed 8000 images out of 35000
Processed 9000 images out of 35000
Processed 10000 images out of 35000
Processed 11000 images out of 35000
Processed 12000 images out of 35000
Processed 13000 images out of 35000
Processed 14000 images out of 35000
Processed 15000 images out of 35000
Processed 16000 images out of 35000
Processed 17000 images out of 35000
Processed 18000 images out of 35000
Processed 19000 images out of 35000
Processed 20000 images out of 35000
Processed 21000 images out of 35000
Processed 22000 images out of 35000
Processed 230

In [3]:
import pickle

# Load the cached validation images written by the preprocessing cell.
# NOTE: unpickling can execute arbitrary code; only open pickle files that
# were produced locally by this notebook.
with open('val_data.pkl', mode='rb') as f:
    val_array_list = pickle.Unpickler(f).load()

In [None]:
# Stack the cached flattened images into one 2-D array (one row per image).
ttf = np.array(val_array_list)

# Trim the labels to the number of cached images so images and labels stay
# aligned even if the cache holds fewer rows than the validation split.
y_val = val_label[:ttf.shape[0]]

# Convert to float32 first and scale in place: dividing the NumPy array by 255
# would build a full-size float64 temporary before the float32 cast.
# copy=True guarantees the in-place division never writes back into `ttf`.
x_val_tensor = torch.from_numpy(ttf).to(dtype=torch.float32, copy=True).div_(255)

# Shape expected by the network: (batch, 1 channel, 224, 224).
x_val_tensor = torch.reshape(x_val_tensor, (-1, 1, 224, 224))

# Labels are class indices, so keep them as int64 rather than floats.
y_val = torch.from_numpy(y_val).long()
x_val_tensor.shape, y_val.shape


In [6]:
from torch.utils.data import DataLoader,TensorDataset

# Pair every validation image with its label.
val_dataset = TensorDataset(x_val_tensor, y_val)
# Evaluation data is served in dataset order: shuffling only helps training and
# would make the order of collected predictions differ from the source rows.
val_dataload = DataLoader(val_dataset, batch_size=100, shuffle=False)

In [7]:
def test(img_tensorData,model):
    model.eval()
    with torch.no_grad():
        prediction = model(img_tensorData)
    return prediction.cpu().numpy()

In [8]:
import torch
import torch.nn as nn

class VGGNet(nn.Module):
    """VGG-style classifier for single-channel 224x224 images.

    The feature extractor is five conv stages (2, 2, 3, 3, 3 convolutions),
    each ending in a 2x2 max-pool, taking a 1x224x224 input to 512x7x7.
    The head is two fully connected layers: 512*7*7 -> 4096 -> num_classes.
    """

    def __init__(self, num_classes=10, init_weights=True):
        super().__init__()

        # (num_convs, in_channels, out_channels) for each stage, in order.
        # Spatial size per stage: 224 -> 112 -> 56 -> 28 -> 14 -> 7.
        stage_specs = (
            (2, 1, 64),
            (2, 64, 128),
            (3, 128, 256),
            (3, 256, 512),
            (3, 512, 512),
        )
        self.features = nn.Sequential(*[
            self.vgg_block(num_convs=depth, in_channels=c_in, out_channels=c_out)
            for depth, c_in, c_out in stage_specs
        ])

        # Two-layer head; there is no second 4096 -> 4096 hidden layer.
        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        )

        if init_weights:
            self._init_weights()

    def vgg_block(self, num_convs, in_channels, out_channels):
        """Build one stage: num_convs x (3x3 conv -> batch norm -> ReLU), then a 2x2 max-pool."""
        stack = []
        channels = in_channels
        for _ in range(num_convs):
            stack += [
                nn.Conv2d(channels, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ]
            # Every convolution after the first maps out_channels -> out_channels.
            channels = out_channels
        stack.append(nn.MaxPool2d(kernel_size=2, stride=2))
        return nn.Sequential(*stack)

    def forward(self, x):
        """Apply the conv stages, flatten everything but the batch axis, then classify."""
        feature_maps = self.features(x)
        flat = torch.flatten(feature_maps, start_dim=1)
        return self.classifier(flat)

    def _init_weights(self):
        """Kaiming-normal (fan_out, ReLU) weights and zero biases for conv and linear layers."""
        for module in self.modules():
            is_conv = isinstance(module, nn.Conv2d)
            if not (is_conv or isinstance(module, nn.Linear)):
                continue
            nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
            # Only convolutions are allowed to lack a bias here.
            if is_conv and module.bias is None:
                continue
            nn.init.constant_(module.bias, 0)


In [9]:
# The checkpoint is a whole pickled model object (it is called and put in eval
# mode later), so the VGGNet class must already be defined in this session.
# - weights_only=False: recent PyTorch releases default to weights_only=True,
#   which refuses to unpickle a full nn.Module.
# - map_location: place the model on the GPU when one is usable, otherwise on
#   the CPU, regardless of the device it was saved from.
# NOTE: full unpickling can execute arbitrary code; only load trusted files.
model1 = torch.load(
    './VGG_net.pkl',
    map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu'),
    weights_only=False,
)


# # save result
# import numpy as np
# import pandas as pd

# def save_result(y_pred):
#     df = pd.DataFrame(y_pred, columns=['label'])
#     df.index.name = 'id'
#     df.to_csv('./result.csv')

# y_pred = test(x_val_tensor, model1)
# save_result(y_pred)


  model1 = torch.load('./VGG_net.pkl')


In [23]:
import torch
import torch.nn as nn

class VGGNet(nn.Module):
    """Reduced VGG-style classifier: two conv stages and a two-layer head.

    NOTE: this redefines ``VGGNet`` and, once executed, shadows the five-stage
    definition from the earlier cell.

    Args:
        num_classes: size of the output layer.
        init_weights: when True, apply Kaiming-normal initialisation.
        input_size: side length of the square input images (default 224). The
            first linear layer is sized from it; the previous hard-coded
            ``64*32*32`` did not match the 128-channel output of the two active
            stages, so ``forward`` failed with a shape error on 224x224 input.
    """

    def __init__(self, num_classes=10, init_weights=True, input_size=224):
        super(VGGNet, self).__init__()

        self.features = nn.Sequential(
            # Stage 1: 2 conv + 1 max-pool, 1*224*224 -> 64*112*112
            self.vgg_block(num_convs=2, in_channels=1, out_channels=64),
            # Stage 2: 2 conv + 1 max-pool, 64*112*112 -> 128*56*56
            self.vgg_block(num_convs=2, in_channels=64, out_channels=128),
            # Stages 3-5 of the full layout (256, 512, 512 channels) are disabled here.
        )

        # Each of the two stages halves height and width, and the last one
        # outputs 128 channels.
        feature_side = input_size // 4
        self.classifier = nn.Sequential(
            nn.Linear(128 * feature_side * feature_side, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, num_classes),
        )

        if init_weights:
            self._init_weights()  # initialize weights

    def vgg_block(self, num_convs, in_channels, out_channels):
        """Build one stage: num_convs x (3x3 conv -> batch norm -> ReLU), then a 2x2 max-pool."""
        layers = []
        for _ in range(num_convs):
            layers.append(nn.Conv2d(in_channels, out_channels,
                                    kernel_size=3, padding=1))
            layers.append(nn.BatchNorm2d(out_channels))  # Batch Normalization
            layers.append(nn.ReLU(inplace=True))
            in_channels = out_channels
        layers.append(nn.MaxPool2d(kernel_size=2, stride=2))  # 1 MaxPool
        return nn.Sequential(*layers)

    def forward(self, x):
        """Apply the conv stages, flatten all non-batch axes, then the two-layer head."""
        x = self.features(x)  # 2 conv stages
        x = torch.flatten(x, start_dim=1)  # flatten
        x = self.classifier(x)  # 2 FC layers
        return x

    def _init_weights(self):
        """Kaiming-normal (fan_out, ReLU) weights and zero biases for conv and linear layers."""
        for layer in self.modules():
            if isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')
                if layer.bias is not None:
                    nn.init.constant_(layer.bias, 0)
            elif isinstance(layer, nn.Linear):
                nn.init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')
                nn.init.constant_(layer.bias, 0)

    

  model1 = torch.load('./net.pkl')


In [10]:
import torch
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

def test(img_tensorData, model):
    model.eval()
    with torch.no_grad():
        prediction = model(img_tensorData)
    return prediction.cpu().numpy()

def evaluate_model(test_dataloader, model):
    """Score a classifier on a labelled dataloader and print the metrics.

    Args:
        test_dataloader: iterable yielding ``(images, labels)`` batches.
        model: a ``torch.nn.Module`` producing one score per class; it is
            switched to eval mode and left there.

    Returns:
        ``(accuracy, precision, recall, f1, y_pred, y_true)``. Precision,
        recall and F1 use the 'weighted' average. ``y_pred`` and ``y_true``
        are flat lists with one entry per sample, in dataloader order.
    """
    model.eval()

    # Feed batches to the device that holds the model's weights rather than
    # assuming the model was moved to the notebook-level `device`; that global
    # is only the fallback for parameter-free models.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else device

    y_true = []
    y_pred = []

    with torch.no_grad():
        for images, labels in test_dataloader:
            outputs = model(images.to(run_device))
            # Predicted class = index of the largest score in each row.
            predicted = outputs.argmax(dim=1)

            # Labels are only compared on the host, so they never need to
            # visit the compute device.
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())

    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='weighted')
    recall = recall_score(y_true, y_pred, average='weighted')
    f1 = f1_score(y_true, y_pred, average='weighted')

    print(f'Accuracy: {accuracy:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')

    return accuracy, precision, recall, f1, y_pred, y_true

# Wrap the validation tensors and serve them in order, 100 samples per batch.
test_dataset = TensorDataset(x_val_tensor, y_val)
test_dataloader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=100)

# Evaluate the loaded model; keep the metrics and the per-sample predictions and labels.
(accuracy, precision, recall, f1, y_pred, y_true) = evaluate_model(
    test_dataloader, model1
)

# save result
import numpy as np
import pandas as pd

# save prediction result and true label
def save_result(y_pred, y_true):
    """Write predictions to ./result.csv and true labels to ./true_label.csv.

    Each file has a single 'label' column and an index column named 'id'.
    """
    outputs = (
        (y_pred, './result.csv'),
        (y_true, './true_label.csv'),
    )
    for values, path in outputs:
        frame = pd.DataFrame(values, columns=['label'])
        frame.rename_axis('id').to_csv(path)


# save evaluation result

def save_evaluation_result(accuracy, precision, recall, f1):
    """Write the four metrics to ./evaluation_result.csv as Metric/Value rows (no index column)."""
    metric_names = ['Accuracy', 'Precision', 'Recall', 'F1 Score']
    metric_values = [accuracy, precision, recall, f1]
    report = pd.DataFrame({'Metric': metric_names, 'Value': metric_values})
    report.to_csv('./evaluation_result.csv', index=False)

# Write the per-sample predictions and labels, then the metric summary, to CSV.
save_result(y_pred=y_pred, y_true=y_true)
save_evaluation_result(accuracy=accuracy, precision=precision, recall=recall, f1=f1)

# Accuracy: 0.9651
# Precision: 0.9662
# Recall: 0.9651
# F1 Score: 0.9651
# (0.9651428571428572,
#  0.9662066456472299,
#  0.9651428571428572,
#  0.9650539494495448)


Accuracy: 0.9651
Precision: 0.9662
Recall: 0.9651
F1 Score: 0.9651


(0.9651428571428572,
 0.9662066456472299,
 0.9651428571428572,
 0.9650539494495448)