/
forward.py
114 lines (97 loc) · 3.66 KB
/
forward.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
# 加载数据集
# x: [60k,28,28] , [10,28,28]
# y: [60k],[10k]
(x, y), (x_test, y_test) = datasets.mnist.load_data()
# 转换数据类型
x = tf.convert_to_tensor(x, dtype=tf.float32) / 255. # x:[0~255] => [0~1.]
y = tf.convert_to_tensor(y, dtype=tf.int32)
x_test = tf.convert_to_tensor(x_test, dtype=tf.float32) / 255. # x:[0~255] => [0~1.]
y_test = tf.convert_to_tensor(y_test, dtype=tf.int32)
print(x.shape, y.shape, x.dtype, y.dtype)
print(tf.reduce_min(x), tf.reduce_max(x))
print(tf.reduce_min(y), tf.reduce_max(y))
# 处理数据集
train_db = tf.data.Dataset.from_tensor_slices((x, y)).batch(128)
test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(128)
train_iter = iter(train_db)
sample = next(train_iter)
print('batch: ', sample[0].shape, sample[1].shape) # 输入样例
# [b,784]=>[b,512]=>[b,128]=>[b,10]
# [dim_in,dim_out],[dim,out]
w1 = tf.Variable(tf.random.truncated_normal([784, 256], stddev=0.1)) # 均值为0,方差为0.1
b1 = tf.Variable(tf.zeros([256]))
w2 = tf.Variable(tf.random.truncated_normal([256, 128], stddev=0.1))
b2 = tf.Variable(tf.zeros([128]))
w3 = tf.Variable(tf.random.truncated_normal([128, 10], stddev=0.1))
b3 = tf.Variable(tf.zeros([10]))
lr = 1e-3 # 学习率
for epoch in range(100): # iterate db for 10
# 前向计算
for step, (x, y) in enumerate(train_db): # for every batch
# x:[128,28,28]
# y:[128]
# [b,28,28]
x = tf.reshape(x, [-1, 28 * 28])
with tf.GradientTape() as tape: # tf.Variable
# x:[b,28*28]
# h1=x@x1+b1
# [b,784]@[784,256]+[256] => [b,256]+[256] => [b,256]+[b,256]
h1 = x @ w1 + tf.broadcast_to(b1, [x.shape[0], 256])
h1 = tf.nn.relu(h1)
# [b,256] => [b,128]
h2 = h1 @ w2 + b2
h2 = tf.nn.relu(h2)
# [b,128] => [b,10]
out = h2 @ w3 + b3
# compute loss
# out: [b,10]
# y:[b]
y_onehot = tf.one_hot(y, depth=10)
# 均方差 mse=mean(sum(y-out)^2)
# [b,10]
loss = tf.square(y_onehot - out)
# mean:scalar
loss = tf.reduce_mean(loss)
# compute gradients
grads = tape.gradient(loss, [w1, b1, w2, b2, w3, b3])
# print(grads)
# w1=w1-lr*w1_grad
w1.assign_sub(lr * grads[0]) # 原地更新,引用保持不变
# w1 = w1 - lr * grads[0] #赋给新的对象
b1.assign_sub(lr * grads[1])
w2.assign_sub(lr * grads[2])
b2.assign_sub(lr * grads[3])
w3.assign_sub(lr * grads[4])
b3.assign_sub(lr * grads[5])
if step % 100 == 0:
print(epoch, step, 'loss: ', float(loss))
# test/evaluation
# [w1,b1,w2,b2,w3,b3]
total_correct, total_num = 0, 0
for step, (x, y) in enumerate(test_db):
# [b,28,28]=>[b,28*28]
x = tf.reshape(x, [-1, 28 * 28])
# [b,784]=>[b,256]=>[b,128]=>[b,10]
h1 = tf.nn.relu(x @ w1 + b1)
h2 = tf.nn.relu(h1 @ w2 + b2)
out = h2 @ w3 + b3
# out:[b,10]~R
# prob:[b,10]~[0,1]
prob = tf.nn.softmax(out, axis=1)
# [b,10]=>[b]
# int64!!!
pred = tf.argmax(prob, axis=1)
pred = tf.cast(pred, dtype=tf.int32)
# y:[b]
correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
correct = tf.reduce_sum(correct)
total_correct += int(correct)
total_num += x.shape[0]
acc = total_correct / total_num
print('test acc: ', acc)