In [None]:
import tensorflow as tf
import os
import numpy as np
import matplotlib.image as mpimg # mpimg 用于读取图片

# Root directory of the image data, relative to the notebook's working directory.
data_path = '../data'
# Fixed seed so the shuffle below, and therefore any split taken from the
# resulting order, is identical on every run.
SHUFFLE_SEED = 0

# Image paths: folder '1' holds the digits 1-9, folder '0' holds only zeros.
# os.listdir returns entries in arbitrary order, so sort first to obtain a
# platform-independent starting order before shuffling.
path_1 = sorted(os.listdir(os.path.join(data_path, '1')))
# Only the mixed-digit folder is shuffled -- presumably so that a slice of it
# contains a mix of digits; confirm against how the lists are consumed.
# A local RandomState keeps the global NumPy RNG untouched.
np.random.RandomState(SHUFFLE_SEED).shuffle(path_1)
path_1 = [os.path.join(data_path, '1', name) for name in path_1]
path_0 = sorted(os.listdir(os.path.join(data_path, '0')))
path_0 = [os.path.join(data_path, '0', name) for name in path_0]


def parse_image(image_path):
    """Load one image and return it as a flat, normalized vector.

    Args:
        image_path: path of the image file to read.
    Returns:
        img: 1-D array of 28 * 28 pixel values scaled to the range [0, 1].
    """
    t = mpimg.imread(image_path)
    # Flatten; this also fails loudly if the image does not hold 28 * 28 values.
    flat = np.reshape(t, (28 * 28))
    # mpimg.imread returns integer pixels (0-255 for 8-bit files) for most
    # formats but floats already in [0, 1] for PNG. Rescale only the integer
    # case so PNG input is not divided by 255 a second time.
    if np.issubdtype(flat.dtype, np.integer):
        return flat / 255.
    return flat


def get_label(paths, labels):
    """Load every image in `paths` and pair it with a one-hot label.

    Args:
        paths: image file paths.
        labels: class flag; 1 selects the one-hot vector [1, 0], any other
            value selects [0, 1].
    Returns:
        x: list of parsed images, one per path.
        y: list of one-hot labels with the same length as `paths`.
    """
    x = [parse_image(path) for path in paths]
    one_hot = [1, 0] if labels == 1 else [0, 1]
    # Copy the template for every sample so the label rows are independent lists.
    y = [list(one_hot) for _ in range(len(paths))]
    return x, y


# Images from the all-zeros folder get the [1, 0] one-hot label (flag 1).
data_0 = get_label(path_0, labels=1)
# Preview the first ten labels as a quick sanity check.
print(data_0[1][:10])


# Images of the digits 1-9 get the [0, 1] one-hot label (flag 0).
data_1 = get_label(path_1, labels=0)

In [None]:
# Number of samples per class held out for evaluation.
TEST_SIZE = 500

# Training images: everything except the last TEST_SIZE samples of each class.
data_X_train = np.concatenate((data_0[0][:-TEST_SIZE], data_1[0][:-TEST_SIZE]))
# Training labels, in the same order as the images.
data_Y_train = np.concatenate((data_0[1][:-TEST_SIZE], data_1[1][:-TEST_SIZE]))
# One row per sample: the pixel columns followed by the label columns.
# The concatenated rows are grouped by class (all of data_0, then all of
# data_1) and gen_batch slices them sequentially, so without shuffling every
# mini-batch would contain a single class. Permute the rows once with a fixed
# seed; images and labels stay aligned because they share a row.
training_set = np.random.RandomState(0).permutation(
    np.concatenate([data_X_train, data_Y_train], axis=1))
# Held-out evaluation set: the last TEST_SIZE samples of each class.
data_X_test = np.concatenate((data_0[0][-TEST_SIZE:], data_1[0][-TEST_SIZE:]))
data_Y_test = np.concatenate((data_0[1][-TEST_SIZE:], data_1[1][-TEST_SIZE:]))

In [None]:
def gen_batch(dataset, batchsize, label_dim=2):
    """Yield consecutive mini-batches from a 2-D dataset.

    Each row of `dataset` holds the input features followed by `label_dim`
    label columns. Batches are taken in row order; when the row count is not a
    multiple of `batchsize`, the final batch is smaller and holds the leftover
    rows.

    Args:
        dataset: 2-D array-like of shape (n_samples, n_features + label_dim).
        batchsize: maximum number of rows per batch; must be positive.
        label_dim: number of trailing label columns (default 2).
    Yields:
        x: input columns of the batch.
        y: label columns of the batch.
    Raises:
        ValueError: if `batchsize` is not positive. Because this is a
            generator, the error surfaces on the first iteration.
    """
    if batchsize <= 0:
        raise ValueError(
            "batchsize must be a positive integer, got {!r}".format(batchsize))
    # Accept nested lists as well as arrays; the 2-D slicing below needs an ndarray.
    dataset = np.asarray(dataset)
    # Stepping by batchsize covers the full batches and the trailing partial one.
    for pos in range(0, dataset.shape[0], batchsize):
        batch = dataset[pos:pos + batchsize]
        # Column index where the labels start.
        split = batch.shape[1] - label_dim
        yield batch[:, :split], batch[:, split:]

In [None]:
epoch = 100      # number of full passes over the training set
lr = 0.01        # learning rate handed to the gradient-descent optimizer
batchsize = 64   # rows per mini-batch

In [None]:
# Build the computation graph: a single linear layer (no bias term) mapping a
# flattened 28*28 image to two outputs, trained with sigmoid cross-entropy.
graph = tf.Graph()
with graph.as_default():
    # Placeholders: X takes batches of flattened images, Y the matching
    # two-column one-hot labels. The leading None leaves the batch size open.
    X = tf.placeholder(shape=(None, 28*28), dtype=tf.float32, name="X")
    Y = tf.placeholder(shape=(None, 2), dtype=tf.float32, name="Y")

    # Weight matrix (784 inputs x 2 outputs), initialised from a truncated normal.
    W = tf.Variable(tf.truncated_normal(shape=[784, 2]), name="WeightMatrix")
    # Raw scores (logits) and their element-wise sigmoid.
    lgt = tf.matmul(X, W)
    output = tf.nn.sigmoid(lgt, name="Apply_Sigmoid")
    # Loss: mean of the element-wise sigmoid cross-entropy, computed from the
    # logits rather than from `output`.
    loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=Y, logits=lgt), name="calculate_loss")

    with tf.name_scope("SGD"):
        # Plain gradient descent with learning rate `lr`, updating only W.
        opt = tf.train.GradientDescentOptimizer(lr).minimize(loss, var_list=[W])

    # Error rate of the current batch.
    with tf.name_scope("calculate_error_rate"):
        # The predicted class is the index of the larger of the two sigmoid
        # outputs (an argmax, not a 0.5 threshold); the true class is the index
        # of the 1 in the one-hot label.
        pred = tf.argmax(output, axis=1)
        true = tf.argmax(Y, axis=1)
        accuracy = tf.reduce_mean(tf.cast(tf.equal(pred, true), tf.float32))



        # Fraction of samples whose predicted class differs from the true class.
        error_rate = 1 - accuracy

In [None]:
with tf.Session(graph=graph) as sess:
    # Initialise every variable in the graph before training starts.
    init = tf.global_variables_initializer()
    sess.run(init)
    step = 0
    for epc in range(epoch):
        for x, y in gen_batch(training_set, batchsize):
            # Reshape so the batch matches the placeholder shapes exactly.
            train_feed = {
                X: np.reshape(x, (-1, 784)),
                Y: np.reshape(y, (-1, 2)),
            }
            # One optimisation step; also fetch the batch loss and error rate.
            l, error, _ = sess.run([loss, error_rate, opt], feed_dict=train_feed)
            # Log every 50th step.
            if not step % 50:
                print("Step: {:>4}, Loss: {:.4f}, Error Rate: {:.4%}".format(step, l, error))
            step += 1
    print("Training finished.")
    # Evaluate on the held-out set and fetch the trained weights in the same run.
    test_feed = {X: data_X_test, Y: data_Y_test}
    l, error, weight_matrix = sess.run([loss, error_rate, W], feed_dict=test_feed)
    print("Testing Loss: {:.4f}, Testing Error Rate: {:.4%}".format(l, error))
    # Fetch the final weights once more into W_value.
    W_value = sess.run(W)