### *MNIST数据集的读取*
*传出的数据为图像的标签 train_label和图像的像素值 train_image*

In [2]:
import numpy as np
import struct
import matplotlib.pyplot as plt
import cv2
import random
import time

# 训练集文件
train_images_idx3_ubyte_file = 'dataset/mnist_dataset/train-images.idx3-ubyte'
# 训练集标签文件
train_labels_idx1_ubyte_file = 'dataset/mnist_dataset/train-labels.idx1-ubyte'

# 测试集文件
test_images_idx3_ubyte_file = 'dataset/mnist_dataset/t10k-images.idx3-ubyte'
# 测试集标签文件
test_labels_idx1_ubyte_file = 'dataset/mnist_dataset/t10k-labels.idx1-ubyte'


def decode_idx3_ubyte(idx3_ubyte_file):
    """
    解析idx3文件的通用函数
    :param idx3_ubyte_file: idx3文件路径
    :return: 数据集
    """
    # 读取二进制数据
    bin_data = open(idx3_ubyte_file, 'rb').read()

    # 解析文件头信息，依次为魔数、图片数量、每张图片高、每张图片宽
    offset = 0
    fmt_header = '>iiii' #因为数据结构中前4行的数据类型都是32位整型，所以采用i格式，但我们需要读取前4行数据，所以需要4个i。我们后面会看到标签集中，只使用2个ii。
    magic_number, num_images, num_rows, num_cols = struct.unpack_from(fmt_header, bin_data, offset)
    print('魔数:%d, 图片数量: %d张, 图片大小: %d*%d' % (magic_number, num_images, num_rows, num_cols))

    # 解析数据集
    image_size = num_rows * num_cols
    offset += struct.calcsize(fmt_header)  #获得数据在缓存中的指针位置，从前面介绍的数据结构可以看出，读取了前4行之后，指针位置（即偏移位置offset）指向0016。
    print(offset)
    fmt_image = '>' + str(image_size) + 'B'  #图像数据像素值的类型为unsigned char型，对应的format格式为B。这里还有加上图像大小784，是为了读取784个B格式数据，如果没有则只会读取一个值（即一副图像中的一个像素值）
    print(fmt_image,offset,struct.calcsize(fmt_image))
    images = np.empty((num_images, num_rows, num_cols))
    #plt.figure()
    for i in range(num_images):
        if (i + 1) % 10000 == 0:
            print('已解析 %d' % (i + 1) + '张')
            print(offset)
        images[i] = np.array(struct.unpack_from(fmt_image, bin_data, offset)).reshape((num_rows, num_cols))
        #print(images[i])
        offset += struct.calcsize(fmt_image)
#        plt.imshow(images[i],'gray')
#        plt.pause(0.00001)
#        plt.show()
    #plt.show()

    return images


def decode_idx1_ubyte(idx1_ubyte_file):
    """
    解析idx1文件的通用函数
    :param idx1_ubyte_file: idx1文件路径
    :return: 数据集
    """
    # 读取二进制数据
    bin_data = open(idx1_ubyte_file, 'rb').read()

    # 解析文件头信息，依次为魔数和标签数
    offset = 0
    fmt_header = '>ii'
    magic_number, num_images = struct.unpack_from(fmt_header, bin_data, offset)
    print('魔数:%d, 图片数量: %d张' % (magic_number, num_images))

    # 解析数据集
    offset += struct.calcsize(fmt_header)
    fmt_image = '>B'
    labels = np.empty(num_images)
    for i in range(num_images):
        if (i + 1) % 10000 == 0:
            print ('已解析 %d' % (i + 1) + '张')
        labels[i] = struct.unpack_from(fmt_image, bin_data, offset)[0]
        offset += struct.calcsize(fmt_image)
    return labels


def load_train_images(idx_ubyte_file=train_images_idx3_ubyte_file):
    """
    TRAINING SET IMAGE FILE (train-images-idx3-ubyte):
    [offset] [type]          [value]          [description]
    0000     32 bit integer  0x00000803(2051) magic number
    0004     32 bit integer  60000            number of images
    0008     32 bit integer  28               number of rows
    0012     32 bit integer  28               number of columns
    0016     unsigned byte   ??               pixel
    0017     unsigned byte   ??               pixel
    ........
    xxxx     unsigned byte   ??               pixel
    Pixels are organized row-wise. Pixel values are 0 to 255. 0 means background (white), 255 means foreground (black).

    :param idx_ubyte_file: idx文件路径
    :return: n*row*col维np.array对象，n为图片数量
    """
    return decode_idx3_ubyte(idx_ubyte_file)


def load_train_labels(idx_ubyte_file=train_labels_idx1_ubyte_file):
    """
    TRAINING SET LABEL FILE (train-labels-idx1-ubyte):
    [offset] [type]          [value]          [description]
    0000     32 bit integer  0x00000801(2049) magic number (MSB first)
    0004     32 bit integer  60000            number of items
    0008     unsigned byte   ??               label
    0009     unsigned byte   ??               label
    ........
    xxxx     unsigned byte   ??               label
    The labels values are 0 to 9.

    :param idx_ubyte_file: idx文件路径
    :return: n*1维np.array对象，n为图片数量
    """
    return decode_idx1_ubyte(idx_ubyte_file)


def load_test_images(idx_ubyte_file=test_images_idx3_ubyte_file):
    """
    TEST SET IMAGE FILE (t10k-images-idx3-ubyte):
    [offset] [type]          [value]          [description]
    0000     32 bit integer  0x00000803(2051) magic number
    0004     32 bit integer  10000            number of images
    0008     32 bit integer  28               number of rows
    0012     32 bit integer  28               number of columns
    0016     unsigned byte   ??               pixel
    0017     unsigned byte   ??               pixel
    ........
    xxxx     unsigned byte   ??               pixel
    Pixels are organized row-wise. Pixel values are 0 to 255. 0 means background (white), 255 means foreground (black).

    :param idx_ubyte_file: idx文件路径
    :return: n*row*col维np.array对象，n为图片数量
    """
    return decode_idx3_ubyte(idx_ubyte_file)


def load_test_labels(idx_ubyte_file=test_labels_idx1_ubyte_file):
    """
    TEST SET LABEL FILE (t10k-labels-idx1-ubyte):
    [offset] [type]          [value]          [description]
    0000     32 bit integer  0x00000801(2049) magic number (MSB first)
    0004     32 bit integer  10000            number of items
    0008     unsigned byte   ??               label
    0009     unsigned byte   ??               label
    ........
    xxxx     unsigned byte   ??               label
    The labels values are 0 to 9.

    :param idx_ubyte_file: idx文件路径
    :return: n*1维np.array对象，n为图片数量
    """
    return decode_idx1_ubyte(idx_ubyte_file)



if __name__ == '__main__':
    # train_labels存储标签信息（一位小数）
    # train_images则存储图片的像素信息（每张图像以28*28数组存储，元素数量为图像总数量）
    train_image = load_train_images()

    train_label = load_train_labels()
    # test_images = load_test_images()
    # test_labels = load_test_labels()

    # 查看前十个数据及其标签以读取是否正确
    for i in range(10):
        print(train_label[i])
        plt.imshow(train_image[i], cmap='gray')
        plt.pause(0.000001)
        plt.show()
    print('done')


魔数:2051, 图片数量: 60000张, 图片大小: 28*28
16
>784B 16 784
已解析 10000张
7839232
已解析 20000张
15679232
已解析 30000张
23519232
已解析 40000张
31359232
已解析 50000张
39199232
已解析 60000张
47039232
魔数:2049, 图片数量: 60000张
已解析 10000张
已解析 20000张
已解析 30000张
已解析 40000张
已解析 50000张
已解析 60000张
5.0


<Figure size 640x480 with 1 Axes>

0.0


<Figure size 640x480 with 1 Axes>

4.0


<Figure size 640x480 with 1 Axes>

1.0


<Figure size 640x480 with 1 Axes>

9.0


<Figure size 640x480 with 1 Axes>

2.0


<Figure size 640x480 with 1 Axes>

1.0


<Figure size 640x480 with 1 Axes>

3.0


<Figure size 640x480 with 1 Axes>

1.0


<Figure size 640x480 with 1 Axes>

4.0


<Figure size 640x480 with 1 Axes>

done


### *验证数据的读取结果*

In [3]:
print(len(train_image))

# 打印最后一张图像的像素值信息
# print(list(train_images[len(train_images) - 1]))

#打印第一张图像的标签
print(train_label[len(train_label) - 1])

print(np.shape(train_image))

a = []
a.append(0)
a

60000
8.0
(60000, 28, 28)


[0]

### *清洗数据集便于实现二分类*
* 其中5为1，8为-1

In [4]:
import numpy as np
#搜索可进行优化

train_images = []
labels = []
for i in range(len(train_label)):
    if (train_label[i] == 5):
        #train_images = np.delete(train_images,i,axis=0)
        labels.append(1)
        train_images.append(train_image[i])
    if (train_label[i] == 8):
        labels.append(-1)
        train_images.append(train_image[i])

print(len(labels))
print(len(train_images))    

11272
11272


### *图像特征空间的选择*
* 整个图像作为特征向量：图像像素数值为特征空间 规模：28 * 28
* HOG特征
* 提取灰度（利用黑白特征）直方图的相关信息作为特征空间

In [5]:
import warnings
warnings.filterwarnings("ignore")
# 整个图像像素作为特征向量，即直接使用train_images数组即可

def get_full_features(train_images):
    features = []
    
    for img in train_images:
        img = np.reshape(img,(28,28))
        img = img.astype(np.uint8)

        features.append(img)

    features = np.array(features)
    features = np.reshape(features,(-1,28 * 28))

    return features
    

In [6]:
#利用库函数提取hog特征
def get_hog_features(train_images):
    features = []
 
#hog函数的参数封装于hog.xml配置文件
    hog = cv2.HOGDescriptor('./hog.xml')
 
    for img in train_images:
        img = np.reshape(img,(28,28))
        cv_img = img.astype(np.uint8)
 
        hog_feature = hog.compute(cv_img)
        # hog_feature = np.transpose(hog_feature)
        features.append(hog_feature)
 
    features = np.array(features)
    features = np.reshape(features,(-1,324))
 
    return features

### *基于感知器准则的线性分类器*

In [7]:
def classify_sense(features, train_labels, size):

    feature_size = size * size 
    study_step = 1e-3

    train_size = len(train_labels)

    # 初始化参数矩阵，此处方便计算选择将截距b放于矩阵外
    w = np.ones((feature_size,1))
    b = 10
    
    correct_count = 1
    count_max = 1e6
    count = 1
    result = 0

#感知器算法设计两层迭代，实现w更新后对判别result的计算更新
    while True:
        index = 0
        for index in range(train_size):

            x = features[index]
            label = train_labels[index]

        #判别函数，若判断错误则置为负（二分类判断是否准确）
            result = (np.dot(x.T,w) + b) * label

            count += 1
        #若当前判别错误，即函数值 < 0，则判断错误，需要更新w和b
            if (result < 0):
                #判定
                x = x.reshape((feature_size,1))
                #result = abs(np.dot(x.T,w) + b)
                #x = np.reshape(features[index], (feature_size, 1))
                #更新参数
                w += x * study_step * label  
                b += study_step * label
                #由于b对应的增广矩阵元素为1
                correct_count = 0
                break
            
            correct_count += 1

            if count > count_max:
                return w,b

        if correct_count >= train_size:
            break

    #print('最小误差为%lf' %result)
    return w,b 

### *基于LMS准则的线性分类器*

In [8]:
# 首先处理训练样本
def data_matrix(train_features,train_labels):
    matrix = []
    for i in range(len(train_labels)):
        temp = list(train_features[i])
        temp.append(1)
        temp = np.array(temp)
        if(train_labels[i] == 1):
            matrix.append(np.array(temp))
        if(train_labels[i] == -1):
            temp = [-item for item in temp]
            matrix.append(np.array(temp))
    matrix = np.array(matrix)
    return matrix

def classify_LMS(train_features,train_labels,size):
    feature_size  = size * size
    study_step = 1e-4
    study_total = 1e3

    b = np.ones((len(train_labels),1))
    w = np.zeros((feature_size,1))

    matrix = data_matrix(train_features,train_labels)
    # 求伪逆
    matrix_t = np.linalg.pinv(matrix)

    count = 0
    while(count < study_total):
        w = np.dot(matrix_t,b)
        err = np.dot(matrix,w) - b 
        if (err.any() < 0):
            print("ERROR!")
            return 0

        if (err.any() == 0):
            print("训练完成!")
            return w
            
        b += study_step * (err + abs(err))
        count += 1
        if count % 1000 == 0:
            print(count / 1000)
    print("训练未达到最优结果退出!")

    return w

### *基于Fisher准则的线性分类器*

In [9]:
# 将数据拆分为类
def devide_matrix(train_features,train_labels):
    #对类中样本数量进行统计
    count_1 = 0
    count_2 = 0
    
    matrix_1 = []
    matrix_2 = []
    for i in range(len(train_labels)):
        if (train_labels[i] == 1):
            matrix_1.append(train_features[i])
            count_1 += 1
        else:
            matrix_2.append(train_features[i])
            count_2 += 1

    return matrix_1,matrix_2,count_1,count_2

# 求矩阵的均值向量
def get_average(matrix,count):
    matrix_t = np.transpose(matrix)
    temp = np.ones((count,1))
    temp = np.dot(temp,1/count)
    ans = np.dot(matrix_t,temp)
    return ans

def get_divergence(vector,matrix,count,size):
    feature_size = size * size
    ans = np.zeros((feature_size,feature_size))
    for i in range(count):
        temp = np.array(matrix[i] - vector)
        ans_k = np.dot(temp,temp.T)
       #print(ans_k.shape)
        ans += ans_k

    return ans

def classify_Fisher(train_features,train_labels,size):
    feature_size = size * size
    matrix_1,matrix_2,count_1,count_2 = devide_matrix(train_features,train_labels)
    # 求类内聚类指标
    # 1.求均值向量
    vector_1 = get_average(matrix_1,count_1)
    vector_2 = get_average(matrix_2,count_2)

    # 2.计算类内散度矩阵
    divergence_1 = get_divergence(vector_1,matrix_1,count_1,size)
    divergence_2 = get_divergence(vector_2,matrix_2,count_2,size)

    # 3.计算总类内散度矩阵
    divergence = []
    divergence = np.array(divergence)
    divergence = divergence_1 + divergence_2

    # 计算最优投影方向和决策分界面
    w = []
    w = np.dot(np.linalg.pinv(divergence),(vector_1 - vector_2))
    # 将两类样本的中间值作为分界点
    w_a = 0.5 * (vector_1 + vector_2)

    return w,w_a


### *预测结果*
* 正确则为object_num，错误则为-1

In [10]:
def Predict_Sence(testset, w, b):
    predict = []
    for i in range(len(testset)):
        img = testset[i]
        #img = np.array(img).reshape(-1,1)
        result = np.dot(w,img) + b
        if result < 0:
            result = -1
        else:
            result = 1
        predict.append(result)
    
    return np.array(predict).reshape(-1,1)

def Predict_LMS(testset, w):
    predict = []
    for i in range(len(testset)):
        img = testset[i]
        temp = list(img)
        temp.append(1)
        img = np.array(temp)
        img = img.reshape(-1,1)
        result = np.dot(w.T,img)
        if result < 0:
            result = -1
        else:
            result = 1
        predict.append(result)

    return np.array(predict).reshape(-1,1)

def Predict_Fisher(testset, w ,w_a):
    predict = []
    # 投影至一维的决策面
    res = np.dot(w_a.T,w)
    for i in range (len(testset)):
        img = testset[i]
        temp = np.dot(img,w)
        if (temp > res):
            predict.append(1)
        else:
            predict.append(-1)
    return np.array(predict).reshape(-1,1)

### *主程序*

In [11]:
from sklearn.cross_validation import train_test_split
from sklearn.metrics import accuracy_score
import warnings
warnings.filterwarnings("ignore")

size = 0
full_size = 28
hog_size = 18

p = int(input("请输入模式：1为hog，2为全像素"))
if p == 1:
    size = hog_size
    feature = get_hog_features(train_images)
elif p == 2:
    size = full_size
    feature = get_full_features(train_images)
else:
    print("invalid input")


#分割数据集
train_features, test_features, train_labels, test_labels = train_test_split(feature,labels, test_size=0.2)
#print(len(train_features))
#print(len(train_labels))

#feature = get_full_features(train_images)
#print(feature[1])

# 感知器算法
total = 0
w_1,b = classify_sense(train_features,train_labels,size)
w_1 = w_1.reshape(-1,(size * size)).astype(np.int)
#print(b)
#print(w)

# LMS算法
w_2 = classify_LMS(train_features,train_labels,size)
#print(np.transpose(w))

# Fisher算法
w_3, w_a = classify_Fisher(train_features,train_labels,size)

test_predict_1 = Predict_Sence(test_features,w_1,b)
score = accuracy_score(test_labels,test_predict_1)
print("感知器内核二分类器的预测准确率为%lf" %score)

test_predict_2 = Predict_LMS(test_features,w_2)
score = accuracy_score(test_labels,test_predict_2)
print("LMS内核二分类器的预测准确率为%lf" %score) 

test_predict_3 = Predict_Fisher(test_features,w_3,w_a)
score = accuracy_score(test_labels,test_predict_3)
print("Fisher内核二分类器的预测准确率为%lf" %score)



1.0
训练未达到最优结果退出!
感知器内核二分类器的预测准确率为0.933481
LMS内核二分类器的预测准确率为0.996009
Fisher内核二分类器的预测准确率为0.949889


In [None]:
import torch 
from torch.utils.data import DataLoader
import torchvision.datasets as dsets 
import torchvision.transforms as transforms

batch_size = 100
# MNIST dataset
train_dataset = dsets.MNIST(root='./pymnist', train=True, transform=transforms.ToTensor(), download=True)
test_dataset = dsets.MNIST(root='./pymnist', train=False, transform=transforms.ToTensor(), download=True)
# load_data
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=True)

In [13]:
import warnings
warnings.filterwarnings("ignore")
# original_data
print("train_data:", train_dataset.train_data.size())
print("train_labels:", train_dataset.train_labels.size())
print("test_data:", test_dataset.test_data.size())
print("test_labels:", test_dataset.test_labels.size())
# shuffle batch_size data
print("batch_size:", train_loader.batch_size)
print("load_train_data:", train_loader.dataset.train_data.shape)
print("load_train_labels:", train_loader.dataset.train_labels.shape)

train_data: torch.Size([60000, 28, 28])
train_labels: torch.Size([60000])
test_data: torch.Size([10000, 28, 28])
test_labels: torch.Size([10000])
batch_size: 100
load_train_data: torch.Size([60000, 28, 28])
load_train_labels: torch.Size([60000])


In [14]:
import torch.nn as nn

input_size = 784
hidden_size = 500
num_classes = 10

# #定义神经网络模型
class Neural_net(nn.Module):
    def __init__(self, input_num, hidden_size, output_num):
        super(Neural_net, self).__init__()
        self.layers1 = nn.Linear(input_num, hidden_size)
        self.layers2 = nn.Linear(hidden_size, output_num)

    def forward(self, x):
        out = self.layers1(x)
        out = torch.relu(out)
        out = self.layers2(out)
        return out
net = Neural_net(input_size, hidden_size, num_classes)
print(net)

Neural_net(
  (layers1): Linear(in_features=784, out_features=500, bias=True)
  (layers2): Linear(in_features=500, out_features=10, bias=True)
)


In [15]:
from torch.autograd import Variable

# training
learning_rate = 1e-1
num_epoches = 5
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(net.parameters(), lr=learning_rate)
for epoch in range(num_epoches):
    #print("current epoch = {}".format(epoch))
    for i, (images,labels) in enumerate(train_loader):
        images = Variable(images.view(-1, 28*28))
        labels = Variable(labels)

        outputs = net(images)
        loss = criterion(outputs, labels)  # calculate loss
        optimizer.zero_grad()  # clear net state before backward
        loss.backward()       
        optimizer.step()   # update parameters

        #if i%100 == 0:
            #print("current loss = %.5f" %loss.item())
print("finished training")

finished training


In [16]:
# prediction
total = 0
correct = 0
for images, labels in test_loader:
    images = Variable(images.view(-1, 28*28))
    labels = Variable(labels)
    outputs = net(images)

    _,predicts = torch.max(outputs.data, 1)
    total += labels.size(0)
    correct += (predicts == labels).sum()
print("Accuracy = %.2f" %(100*correct/total))

Accuracy = 96.14
