# Import Packages

In [72]:
import numpy as np
import pandas as pd
# import keras
import sklearn
import pickle
import time
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn import svm
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import GridSearchCV

# 导入神经网络所需包
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils import data
import torchvision.transforms as T
import torchvision.models as models


# Miscellaneous

In [73]:
class Timer(object):
    """Record one or more elapsed-time intervals, in seconds.

    The clock starts as soon as the timer is created. Every call to
    :meth:`stop` appends the time elapsed since the most recent
    :meth:`start` to ``self.times``. ``stop`` does not restart the clock, so
    calling it repeatedly without an intervening ``start`` records
    cumulative durations measured from the same starting point.
    """

    def __init__(self):
        self.times = []
        self.start()

    def start(self):
        """Start (or restart) the clock."""
        # perf_counter is monotonic and high resolution, so intervals are not
        # distorted by system clock adjustments the way time.time() can be.
        self.tik = time.perf_counter()

    def stop(self):
        """Record the time elapsed since the last start and return it."""
        elapsed = time.perf_counter() - self.tik
        self.times.append(elapsed)
        return elapsed

    def avg(self):
        """Return the mean recorded interval, or 0.0 if nothing was recorded."""
        if not self.times:
            return 0.0
        return sum(self.times) / len(self.times)

    def sum(self):
        """Return the sum of all recorded intervals."""
        return sum(self.times)

    def cumsum(self):
        """Return the running (cumulative) sum of the recorded intervals as a list."""
        return np.array(self.times).cumsum().tolist()
    
def Binarization(x):
    """Binarize a grayscale value.

    Returns 0 when ``x`` compares equal to 0, and 1 for every other value.
    """
    return 0 if x == 0 else 1
    
class Accumulator:
    """Keep running totals for a fixed number of quantities."""

    def __init__(self, n):
        """Create ``n`` totals, each starting at 0.0."""
        self.data = [0.0 for _ in range(n)]

    def add(self, *args):
        """Add each positional value (converted to float) to the total at the same position.

        Totals and arguments are paired with ``zip``, so the stored list ends
        up as long as the shorter of the two.
        """
        updated = []
        for total, increment in zip(self.data, args):
            updated.append(total + float(increment))
        self.data = updated

    def reset(self):
        """Set every total back to 0.0, keeping the current number of totals."""
        self.data = [0.0 for _ in self.data]

    def __getitem__(self, idx):
        """Return the total stored at position ``idx``."""
        return self.data[idx]
    

# Load dataset

In [74]:
# Relative paths of the pickled dataset files
dataset_train_dir = './EMNIST_Byclass_Small/emnist_train.pkl'
dataset_test_dir = './EMNIST_Byclass_Small/emnist_test.pkl'

# Unpickle each file into a dictionary (layout as described in
# dataset_description.txt). The files are opened with context managers so the
# handles are closed as soon as loading finishes.
# NOTE: pickle.load can execute arbitrary code embedded in the file; only load
# dataset files that come from a trusted source.
with open(dataset_train_dir, 'rb') as train_file:
    dataset_train_dict = pickle.load(train_file)
with open(dataset_test_dir, 'rb') as test_file:
    dataset_test_dict = pickle.load(test_file)


# SVM

In [75]:
# Flatten every training image into a 784-element row vector. The number of
# samples is inferred (-1) rather than hard-coded, so the cell keeps working
# if the dataset size changes.
x_train = dataset_train_dict['data'].reshape(-1, 28**2).astype('float32')
y_train = dataset_train_dict['labels']

# Same flattening for the test images
x_test = dataset_test_dict['data'].reshape(-1, 28**2).astype('float32')
y_test = dataset_test_dict['labels']

# Normalise the features by dividing by 255
# (assumes 8-bit pixel values in [0, 255] — TODO confirm against the dataset description)
x_train = x_train / 255
x_test = x_test / 255

# Build the SVM classifier
# C: regularisation parameter
# kernel: kernel function used by the SVM; 'rbf' is the radial basis function kernel
# gamma: kernel coefficient (an earlier variant used C=100.0, kernel='rbf', gamma=0.03)
SVMmodel = svm.SVC(C=100.0, kernel='rbf', gamma='scale')

# Fit on the training set; the first stop() records the training time
timer = Timer()
SVMmodel.fit(x_train, y_train)
timer.stop()

In [None]:

# Predict the test-set labels and convert each one to a plain Python int
preds = list(map(int, SVMmodel.predict(x_test)))
# Second stop(): time elapsed since the timer was created, through prediction
timer.stop()
# Print the recorded timings
print(f'Training Time:{timer.times[0]:.3f} s, Total Time:{timer.times[1]:.3f} s')
# Prediction accuracy on the test set
print('accuracy:', accuracy_score(y_test, preds))
# Confusion matrix, printed and drawn as an annotated heat map
print("Confusion Matrix:")
cm = confusion_matrix(y_test, preds)
print(cm)
plt.figure(figsize=(30, 30))
ax = sns.heatmap(cm, annot=True, fmt="d", cmap="Reds")
plt.title('SVM: Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('Actual Label')
# Per-class precision, recall and f1-score
print('Classfication Report:')
print(classification_report(y_test, np.array(preds)))

# 决策树分类

In [None]:
# Flatten every training image into a 784-element row vector and turn the
# labels into a column vector. The sample count is inferred (-1) instead of
# being hard-coded.
x_train = dataset_train_dict['data'].reshape(-1, 28**2).astype('float32')
y_train = dataset_train_dict['labels'].reshape(-1, 1)

# Same reshaping for the test set
x_test = dataset_test_dict['data'].reshape(-1, 28**2).astype('float32')
y_test = dataset_test_dict['labels'].reshape(-1, 1)

# Build a DataFrame whose last column is the label, then split it back into
# features (all columns but the last) and labels (the last column)
train_data = np.hstack((x_train, y_train))
train_data = pd.DataFrame(train_data)
x_train = train_data.iloc[:, :-1]
print('x_train Shape:', x_train.shape)
y_train = train_data.iloc[:,-1]
print("y_train Shape: ", y_train.shape)

test_data = np.hstack((x_test, y_test))
test_data = pd.DataFrame(test_data)
x_test = test_data.iloc[:, :-1]
print('x_test Shape:', x_test.shape)
y_test = test_data.iloc[:,-1]
print("y_test Shape: ", y_test.shape)

In [None]:
# Create a DecisionTreeClassifier with default hyper-parameters
DTmodel = DecisionTreeClassifier()
# Fit the model; the first stop() records the training time
timer = Timer()
DTmodel.fit(x_train, y_train)
timer.stop()
# Predict on the test set; the second stop() records the time elapsed since
# the timer was created (training plus prediction)
prediction = DTmodel.predict(x_test)
timer.stop()
# Print the recorded timings
print(f'Training Time:{timer.times[0]:.3f} s, Total Time:{timer.times[1]:.3f} s')
# Accuracy
acc = accuracy_score(y_test, prediction)
print(f"Accuracy:{acc * 100:.3f}%")
# Classification report for this model's own predictions. `prediction` is
# used here; the SVM cell's `preds` would describe a different model.
print(classification_report(y_test, prediction))
# Confusion matrix drawn as an annotated heat map
plt.figure(figsize=(30, 30))
cm = confusion_matrix(y_test, prediction)
ax = sns.heatmap(cm, annot=True, fmt="d", cmap="Reds")
plt.title('DecisionTreeClassifier: Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('Actual Label')

In [None]:
def show_image(index, dataset):
    """Display one row of ``dataset`` as a 28x28 image.

    Args:
        index: Positional row index passed to ``dataset.iloc``.
        dataset: Object exposing ``.iloc[...]`` whose selected row has a
            ``.values`` array that can be reshaped to (28, 28).
    """
    # Pick the row by position and fold its values into a 28x28 grid
    pixels = dataset.iloc[index].values.reshape(28, 28)
    # Draw it with the 'binary' colour map
    plt.imshow(pixels, 'binary')


show_image(999, x_train)

In [None]:
# Hyper-parameter grid explored by the search
params = {
    'splitter': ('best', 'random'),
    'criterion': ("gini", "entropy"),
    "max_depth": list(range(1, 15)),
    'min_samples_leaf': list(range(1, 20)),
}
# Base DecisionTreeClassifier handed to the search
DTmodel = DecisionTreeClassifier()
# Run a 3-fold grid search scored by accuracy, using all available cores;
# the timer covers the whole search
timer = Timer()
clf = GridSearchCV(
    estimator=DTmodel,
    param_grid=params,
    cv=3,
    scoring='accuracy',
    verbose=3,
    n_jobs=-1,
).fit(x_train, y_train)
timer.stop()

In [None]:
# Print how long the parameter search took and the best parameters it found
print(f"Params Search Time:{timer.times[0]:.3f} s")
print("Best Params:", clf.best_params_)

# Result:
# {'criterion': 'entropy',
#  'max_depth': 9,
#  'min_samples_leaf': 2,
#  'splitter': 'best'}

In [None]:
# Build a new tree from the best parameters found by the grid search
model = DecisionTreeClassifier(
    **{
        name: clf.best_params_[name]
        for name in ("criterion", "max_depth", "min_samples_leaf", "splitter")
    }
)
# First stop(): training time. Second stop(): time elapsed since the timer
# was created, through prediction.
timer = Timer()
model.fit(x_train, y_train)
timer.stop()
prediction = model.predict(x_test)
timer.stop()
# Print the recorded timings
print(f'Training Time:{timer.times[0]:.3f} s, Total Time:{timer.times[1]:.3f} s')
# `acc` still holds the accuracy computed before this cell ran, so it is
# printed first and only then overwritten with the tuned model's accuracy
print(f"Accuracy(Before Params Search):{acc * 100:0.3f}%")
acc = accuracy_score(y_test, prediction)
print(f"Accuracy(After Params Search):{acc * 100:0.3f}%")


# 卷积神经网络

In [None]:
# 创建EMNIST数据集
class EMNIST(data.Dataset):
    """Map-style dataset pairing samples ``X`` with labels ``y``.

    Args:
        X: Indexable collection of samples.
        y: Indexable collection of labels, aligned with ``X`` by index.
        transform: Optional callable applied to each sample (never to the
            label) when it is fetched.
    """

    def __init__(self, X, y, transform=None):
        self.transform = transform
        self.X = X
        self.y = y

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, index):
        """Return ``(sample, label)`` at ``index``, with the transform applied to the sample."""
        # Work on locals: fetching an item must not leave per-item state on
        # the dataset object, which is shared by every consumer of it.
        sample = self.X[index]
        label = self.y[index]
        if self.transform:
            sample = self.transform(sample)
        return sample, label
# Mini-batch size used by both data loaders
batch_size = 256

# Training samples as float32 and labels as a flat int64 tensor
# NOTE(review): T.ToTensor interprets a 3-D ndarray as H x W x C — confirm the
# per-sample array shape gives the channel layout the networks expect.
x_train = dataset_train_dict['data'].astype('float32')
y_train = torch.tensor(
    dataset_train_dict['labels'].reshape(-1).astype('int64'),
    dtype=torch.int64,
)
# EMNIST training dataset and its shuffling data loader
emnist_train = EMNIST(x_train, y_train, transform=T.Compose([T.ToTensor()]))
train_iter = data.DataLoader(emnist_train, shuffle=True, batch_size=batch_size)

# Test samples and labels, prepared the same way
x_test = dataset_test_dict['data'].astype('float32')
y_test = torch.tensor(
    dataset_test_dict['labels'].reshape(-1).astype('int64'),
    dtype=torch.int64,
)
# EMNIST test dataset and its data loader (also shuffled)
emnist_test = EMNIST(x_test, y_test, transform=T.Compose([T.ToTensor()]))
test_iter = data.DataLoader(emnist_test, shuffle=True, batch_size=batch_size)


In [None]:
def astype(x, *args, **kwargs):
    """Cast ``x`` by forwarding all arguments to ``x.type``."""
    return x.type(*args, **kwargs)


def reduce_sum(x, *args, **kwargs):
    """Sum ``x`` by forwarding all arguments to ``x.sum``."""
    return x.sum(*args, **kwargs)


def size(x, *args, **kwargs):
    """Return the total number of elements of ``x`` via ``x.numel``."""
    return x.numel(*args, **kwargs)


def argmax(x, *args, **kwargs):
    """Return the index of the maximum value via ``x.argmax``."""
    return x.argmax(*args, **kwargs)

def init_weights(m):
    """Initialise the weights of a plain ``nn.Linear`` module in place.

    The weight is drawn from a normal distribution with standard deviation
    0.01. Modules of any other type (subclasses of ``nn.Linear`` included)
    are left untouched, as is the bias.
    """
    if type(m) is not nn.Linear:
        return
    nn.init.normal_(m.weight, std=0.01)
        
def accuracy(y_hat, y):
    """Count the correct predictions in a batch.

    Args:
        y_hat: Either per-class scores with the classes along dimension 1, or
            a 1-D tensor of already-predicted class indices.
        y: Tensor of true class indices.

    Returns:
        The number of correct predictions, as a float.
    """
    # Reduce score matrices to class indices; 1-D input is taken to be class
    # indices already and is compared as-is.
    if y_hat.ndim > 1:
        y_hat = y_hat.argmax(dim=1)
    matches = y_hat.type(y.dtype) == y
    return float(matches.type(y.dtype).sum())

def evaluate_accuracy(net, data_iter):
    """Return the fraction of correctly classified samples over all batches.

    Args:
        net: Callable model; if it is an ``nn.Module`` it is switched to
            evaluation mode first.
        data_iter: Iterable yielding ``(X, y)`` batches.
    """
    if isinstance(net, nn.Module):
        net.eval()
    correct, seen = 0.0, 0.0
    # Gradients are not needed while evaluating
    with torch.no_grad():
        for X, y in data_iter:
            correct += float(accuracy(net(X), y))
            seen += float(y.numel())
    return correct / seen

def train(net, train_iter, loss, updater):
    """Train ``net`` for one pass over ``train_iter``.

    Args:
        net: Model to train; an ``nn.Module`` is switched to training mode.
        train_iter: Iterable yielding ``(X, y)`` batches.
        loss: Callable returning the loss tensor for ``(y_hat, y)``.
        updater: Either a ``torch.optim.Optimizer`` or a custom callable that
            takes the batch size.

    Returns:
        Tuple ``(summed loss / samples seen, correct predictions / samples seen)``.
    """
    if isinstance(net, torch.nn.Module):
        net.train()
    uses_torch_optimizer = isinstance(updater, torch.optim.Optimizer)
    # Running totals: summed loss, correct predictions, samples seen
    loss_total, correct_total, sample_total = 0.0, 0.0, 0.0
    for X, y in train_iter:
        y_hat = net(X)
        l = loss(y_hat, y)
        if uses_torch_optimizer:
            # Built-in optimizer: clear old gradients, back-propagate the
            # mean loss, then take a step
            updater.zero_grad()
            l.mean().backward()
            updater.step()
        else:
            # Custom updater: back-propagate the summed loss and pass the
            # batch size to the updater
            l.sum().backward()
            updater(X.shape[0])
        loss_total += float(l.sum())
        correct_total += float(accuracy(y_hat, y))
        sample_total += float(y.numel())
    return loss_total / sample_total, correct_total / sample_total

def trainer(net, train_iter, test_iter, loss, num_epochs, optim):
    """Train ``net`` for ``num_epochs`` epochs, printing progress after each one.

    After every epoch the training loss and the accuracy on ``test_iter`` are
    printed; the training accuracy is computed but not reported.
    """
    for epoch in range(num_epochs):
        train_loss, _train_acc = train(net, train_iter, loss, optim)
        test_acc = evaluate_accuracy(net, test_iter)
        print(f"epoch:{epoch+1}, train loss:{train_loss:.4f}, test_acc:{test_acc}")


In [None]:
# Multilayer perceptron: flatten -> 784-256 -> 256-256 -> 256-62,
# with ReLU and dropout after each hidden layer
net = nn.Sequential(
    nn.Flatten(),
    # First hidden layer
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Dropout(0.2),
    # Second hidden layer
    nn.Linear(256, 256),
    nn.ReLU(),
    nn.Dropout(0.5),
    # Output layer: one score per class
    nn.Linear(256, 62),
)

# Apply the weight initialisation to every sub-module
net.apply(init_weights)
# Hyper-parameters: number of epochs, data-loader batch size, learning rate
num_epochs, batch_size, learning_rate = 10, 256, 0.0005
# Per-sample loss (no reduction); the training loop reduces it itself
loss = nn.CrossEntropyLoss(reduction='none')
# Plain SGD optimizer
optim = torch.optim.SGD(net.parameters(), lr=learning_rate)
# Train and report the elapsed time
timer = Timer()
trainer(net, train_iter, test_iter, loss, num_epochs, optim)
timer.stop()
print(f"训练耗时：{timer.times[0]}")


In [None]:
# Evaluate the final model on the whole test set.
# Predictions and true labels are collected batch by batch so they stay
# aligned with each other whatever order the loader yields them in.
net.eval()  # evaluation mode: dropout disabled
y_true, y_preds = [], []
with torch.no_grad():  # no gradients needed for inference
    for X, y in data.DataLoader(emnist_test, batch_size=batch_size, shuffle=False):
        y_preds.append(net(X).argmax(dim=1).cpu())
        y_true.append(y.cpu())
y_true = torch.cat(y_true).numpy()
y_preds = torch.cat(y_preds).numpy()
# Classification report for this model's own predictions
print(classification_report(y_true, y_preds))
# Confusion matrix drawn as an annotated heat map
cm = confusion_matrix(y_true, y_preds)
plt.figure(figsize=(30, 30))
ax = sns.heatmap(cm, annot=True, fmt="d", cmap="Reds")
plt.title('MLP: Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('Actual Label')

In [None]:
# 残差块
class Residual(nn.Module):
    """Residual block: two 3x3 convolutions with batch norm plus a shortcut.

    Args:
        input_channels: Channels of the incoming feature map.
        num_channels: Channels produced by the block.
        use_1x1conv: When True, the shortcut goes through a 1x1 convolution
            (using ``stride``) instead of being the identity.
        stride: Stride of the first convolution and of the 1x1 shortcut.
    """

    def __init__(self, input_channels, num_channels, use_1x1conv=False, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(
            input_channels, num_channels, kernel_size=3, padding=1, stride=stride
        )
        self.conv2 = nn.Conv2d(num_channels, num_channels, kernel_size=3, padding=1)
        # Optional projection of the shortcut; None means identity shortcut
        self.conv3 = (
            nn.Conv2d(input_channels, num_channels, kernel_size=1, stride=stride)
            if use_1x1conv
            else None
        )
        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)

    def forward(self, X):
        """Return ``relu(main_path(X) + shortcut(X))``."""
        out = self.conv1(X)
        out = F.relu(self.bn1(out))
        out = self.bn2(self.conv2(out))
        shortcut = self.conv3(X) if self.conv3 else X
        out += shortcut
        return F.relu(out)
    
def resnet_block(in_channels, out_channels, num_residual, first_block=False):
    """Build the list of ``Residual`` blocks that make up one stage.

    Unless ``first_block`` is True, the first block of the stage uses a 1x1
    shortcut convolution with stride 2 and maps ``in_channels`` to
    ``out_channels``. Every other block maps ``out_channels`` to
    ``out_channels`` with the default settings.
    """

    def build(position):
        if position == 0 and not first_block:
            return Residual(in_channels, out_channels, use_1x1conv=True, stride=2)
        return Residual(out_channels, out_channels)

    return [build(position) for position in range(num_residual)]

# Stem: 7x7 convolution, batch norm, ReLU and 3x3 max pooling
b1 = nn.Sequential(
    nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
)
# Four stages of two residual blocks each; only the first stage is built
# with first_block=True
b2 = nn.Sequential(*resnet_block(64, 64, 2, first_block=True))
b3, b4, b5 = (
    nn.Sequential(*resnet_block(c_in, c_out, 2))
    for c_in, c_out in ((64, 128), (128, 256), (256, 512))
)
# ResNet-18: stem, four stages, global average pooling and a 62-way classifier
net = nn.Sequential(
    b1,
    b2,
    b3,
    b4,
    b5,
    nn.AdaptiveAvgPool2d((1, 1)),
    nn.Flatten(),
    nn.Linear(512, 62),
)

# Apply the weight initialisation to every sub-module
net.apply(init_weights)
# Hyper-parameters: number of epochs, batch size, learning rate
num_epochs, batch_size, learning_rate = 10, 256, 0.0005
# Per-sample loss (no reduction); the training loop reduces it itself
loss = nn.CrossEntropyLoss(reduction='none')
# Plain SGD optimizer
optim = torch.optim.SGD(net.parameters(), lr=learning_rate)
# Train and report the elapsed time
timer = Timer()
trainer(net, train_iter, test_iter, loss, num_epochs, optim)
timer.stop()
print(f"训练耗时：{timer.times[0]}")

# Evaluate the final model on the whole test set.
# Predictions and true labels are collected batch by batch so they stay
# aligned with each other whatever order the loader yields them in.
net.eval()  # evaluation mode: batch norm uses its running statistics
y_true, y_preds = [], []
with torch.no_grad():  # no gradients needed for inference
    for X, y in data.DataLoader(emnist_test, batch_size=batch_size, shuffle=False):
        y_preds.append(net(X).argmax(dim=1).cpu())
        y_true.append(y.cpu())
y_true = torch.cat(y_true).numpy()
y_preds = torch.cat(y_preds).numpy()
# Classification report for this model's own predictions
print(classification_report(y_true, y_preds))
# Confusion matrix drawn as an annotated heat map
cm = confusion_matrix(y_true, y_preds)
plt.figure(figsize=(30, 30))
ax = sns.heatmap(cm, annot=True, fmt="d", cmap="Reds")
plt.title('ResNet18: Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('Actual Label')

In [None]:
# 预训练的Resnet18
def _to_single_channel_tensor(image):
    """Convert one sample to a tensor shaped (1, H, W).

    Both (H, W) and (1, H, W) samples are accepted: the last two dimensions
    are kept and a single leading channel dimension is used.
    """
    tensor = torch.as_tensor(image)
    return tensor.reshape(1, *tensor.shape[-2:])


def _repeat_to_three_channels(tensor):
    """Copy the single channel three times, giving a (3, H, W) tensor."""
    return tensor.repeat(3, 1, 1)


# The samples are NumPy arrays, which T.Resize does not accept, so they are
# converted to tensors first. The order of the remaining steps is unchanged:
# resize, expand to three channels, then normalise.
# NOTE(review): the Normalize constants look like statistics for pixel values
# scaled to [0, 1], while x_train / x_test are not rescaled in this file —
# confirm the intended input range.
my_transform = T.Compose([
    T.Lambda(_to_single_channel_tensor),
    T.Resize((224, 224)),
    T.Lambda(_repeat_to_three_channels),
    T.Normalize((0.1307,), (0.3081,)),
])

# Rebuild the datasets and loaders so the inputs have the shape expected by
# the pretrained ResNet-18
emnist_train = EMNIST(x_train, y_train, transform=my_transform)
train_iter = data.DataLoader(emnist_train, batch_size=batch_size, shuffle=True)
emnist_test = EMNIST(x_test, y_test, transform=my_transform)
test_iter = data.DataLoader(emnist_test, batch_size=batch_size, shuffle=True)

In [None]:
# Load the pretrained ResNet-18
model = models.resnet18(pretrained=True)
# Replace the fully connected head so it outputs the 62 EMNIST classes
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 62)
# Hyper-parameters
num_epochs = 10
batch_size = 256
learning_rate = 0.0001
# The optimizer must own the parameters of the model being trained (`model`).
# Built from any other network's parameters, it would never update `model`
# or clear its gradients.
optim = torch.optim.SGD(model.parameters(), lr=learning_rate)
# Per-sample loss (no reduction); the training loop reduces it itself
loss = nn.CrossEntropyLoss(reduction='none')
# Train and report the elapsed time
timer = Timer()
trainer(model, train_iter, test_iter, loss, num_epochs, optim)
timer.stop()
print(f"训练耗时：{timer.times[0]}")

# Evaluate the final model on the whole test set.
# The test set is processed in mini-batches rather than as one huge batch of
# 224x224 three-channel images, and predictions are collected together with
# their true labels so the two stay aligned.
model.eval()  # evaluation mode: batch norm uses its running statistics
y_true, y_preds = [], []
with torch.no_grad():  # no gradients needed for inference
    for X, y in data.DataLoader(emnist_test, batch_size=batch_size, shuffle=False):
        y_preds.append(model(X).argmax(dim=1).cpu())
        y_true.append(y.cpu())
y_true = torch.cat(y_true).numpy()
y_preds = torch.cat(y_preds).numpy()
# Classification report for this model's own predictions
print(classification_report(y_true, y_preds))
# Confusion matrix drawn as an annotated heat map
cm = confusion_matrix(y_true, y_preds)
plt.figure(figsize=(30, 30))
ax = sns.heatmap(cm, annot=True, fmt="d", cmap="Reds")
plt.title('Pretrained ResNet18: Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('Actual Label')

KeyboardInterrupt: 