In [None]:
import tensorflow as tf
from tensorflow import keras
import datetime

In [None]:
# Load the MNIST dataset, split into training and test (images, labels) pairs.
(train_image, train_labels), (test_image, test_labels) = (
    tf.keras.datasets.mnist.load_data()
)

In [None]:
# Display the shape of the training images as loaded, before any preprocessing.
train_image.shape

In [None]:
# Append a trailing axis of size 1 to both image sets.
# NOTE: the names are rebound in place, so re-running this cell adds yet
# another axis.
train_image = tf.expand_dims(train_image, axis=-1)
test_image = tf.expand_dims(test_image, axis=-1)

In [None]:
# Display the shape again, now that the trailing axis has been added.
train_image.shape

In [None]:
# Divide the pixel values by 255 and store the result as float32.
train_image = tf.cast(train_image / 255, dtype=tf.float32)
test_image = tf.cast(test_image / 255, dtype=tf.float32)

In [None]:
# Convert both label sets to int64.
train_labels = tf.cast(train_labels, dtype=tf.int64)
test_labels = tf.cast(test_labels, dtype=tf.int64)

In [None]:
# Slice the tensors along their first axis so that each dataset element is
# one (image, label) pair.
dataset = tf.data.Dataset.from_tensor_slices(
    (train_image, train_labels)
)
test_dataset = tf.data.Dataset.from_tensor_slices(
    (test_image, test_labels)
)

In [None]:
# Display the training dataset object before repeat/shuffle/batch are applied.
dataset

In [None]:
# 注意，这里repeat没有参数，因此会一直产生数据
dataset = dataset.repeat().shuffle(60000).batch(128)
test_dataset = test_dataset.repeat().batch(128)

In [None]:
# Display both dataset objects after repeat/shuffle/batch have been applied.
dataset, test_dataset

In [None]:
# Small convolutional classifier: two 3x3 ReLU convolutions, a global max
# pool, and a 10-way softmax layer. The input height and width are left
# unspecified (None); only the single channel is fixed.
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(
        filters=16,
        kernel_size=(3, 3),
        activation='relu',
        input_shape=(None, None, 1),
    ),
    tf.keras.layers.Conv2D(
        filters=32,
        kernel_size=(3, 3),
        activation='relu',
    ),
    tf.keras.layers.GlobalMaxPooling2D(),
    tf.keras.layers.Dense(units=10, activation='softmax'),
])

model.summary()

In [None]:
# Configure training: Adam optimizer, sparse categorical cross-entropy loss
# (labels are integers, not one-hot), and accuracy as the reported metric.
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

In [None]:
import os

# Per-run TensorBoard log directory: logs/<YYYYMMDD-HHMMSS>.
# The timestamp deliberately contains no ':' characters: colons are not
# allowed in Windows file names, so "%H:%M:%S" would make the directory
# impossible to create there.
log_dir = os.path.join(
    'logs', datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

In [None]:
# Keras callback that writes TensorBoard logs under log_dir.
# histogram_freq=1 asks for histograms to be computed every epoch.
tensorboard_callback = tf.keras.callbacks.TensorBoard(
    histogram_freq=1,
    log_dir=log_dir,
)

In [None]:
# Create a summary file writer under <log_dir>/lr and install it as the
# default writer, so tf.summary calls made outside an explicit writer context
# are recorded there.
file_writer = tf.summary.create_file_writer(f"{log_dir}/lr")
file_writer.set_as_default()

In [None]:
# 控制学习速率
def lr_sche(epoch):
    learning_rate = 0.2
    if epoch > 5:
        learning_rate = 0.02
    if epoch > 10:
        learning_rate = 0.01
    if epoch > 20:
        learning_rate = 0.005
    # 将leaning-rate变化写入磁盘
    tf.summary.scalar('leaning_rate', data=learning_rate, step=epoch)
    return learning_rate

In [None]:
# Wrap the schedule function in a Keras callback so fit() applies it.
lr_callback = tf.keras.callbacks.LearningRateScheduler(schedule=lr_sche)

In [None]:
# Both datasets repeat forever, so the number of batches per epoch must be
# given explicitly. validation_steps is derived from the size of the TEST set;
# using the training-set size here made every validation pass cycle through
# the test images several times.
model.fit(
    dataset,
    epochs=30,
    steps_per_epoch=60000 // 128,
    validation_data=test_dataset,
    validation_steps=int(test_image.shape[0]) // 128,
    callbacks=[tensorboard_callback, lr_callback],
)

In [None]:
%load_ext tensorboard
%matplotlib inline
%tensorboard - -logdir logs

# 自定义训练中使用Tensorboard

In [None]:
# Optimizer used by the custom training loop.
optimizer = tf.keras.optimizers.Adam()
# Loss function used by the custom training loop.
loss_func = tf.keras.losses.SparseCategoricalCrossentropy()

In [None]:
def loss(model, x, y):
    """Compute the loss of ``model`` on one batch.

    Args:
        model: Callable that maps the inputs ``x`` to predictions.
        x: Batch of model inputs.
        y: Ground-truth labels for ``x``.

    Returns:
        The value of ``loss_func(y, model(x))``.
    """
    y_ = model(x)
    # loss_func takes the true labels first, then the predictions.
    return loss_func(y, y_)

In [None]:
# 创建汇总模块
train_loss = tf.keras.metrics.Mean('train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy('train_accuracy')

test_loss = tf.keras.metrics.Mean('test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy('test_accuracy')

In [None]:
def train_step(model, images, labels):
    """Run one optimisation step on a batch and update the training metrics.

    Args:
        model: Keras model to train; its ``trainable_variables`` are updated.
        images: Batch of input images.
        labels: Integer class labels for ``images``.
    """
    with tf.GradientTape() as tape:
        # training=True makes layers that behave differently during training
        # (e.g. dropout, batch normalisation) run in training mode.
        pred = model(images, training=True)
        loss_step = loss_func(labels, pred)
    grads = tape.gradient(loss_step, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    # Accumulate into the running metrics; they are read and reset elsewhere.
    train_loss(loss_step)
    train_accuracy(labels, pred)

In [None]:
def test_step(model, images, labels):
    """Evaluate one batch and fold the result into the test metrics.

    Args:
        model: Callable that maps ``images`` to predictions.
        images: Batch of input images.
        labels: Integer class labels for ``images``.
    """
    predictions = model(images)
    batch_loss = loss_func(labels, predictions)
    # No gradients are computed here; only the running metrics are updated.
    test_loss(batch_loss)
    test_accuracy(labels, predictions)

In [None]:
# Timestamp for this run. The previous format string ended in "%%SVG", which
# strftime renders as the literal text "%SVG" instead of the seconds.
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
# Separate sub-directories per run and per split:
#   logs/gradient_tape/<timestamp>/train and logs/gradient_tape/<timestamp>/test
train_log_dir = 'logs/gradient_tape/' + current_time + '/train'
test_log_dir = 'logs/gradient_tape/' + current_time + '/test'

# One summary writer per split, used by the custom training loop.
train_writer = tf.summary.create_file_writer(train_log_dir)
test_writer = tf.summary.create_file_writer(test_log_dir)

In [None]:
def train(epochs=10, steps_per_epoch=None, validation_steps=None):
    """Run the custom training loop and log per-epoch metrics to TensorBoard.

    Uses the notebook-level ``model``, ``dataset``, ``test_dataset``, metric
    objects and summary writers.

    Args:
        epochs: Number of epochs to run.
        steps_per_epoch: Training batches per epoch. Defaults to the number
            of training images divided by the batch size.
        validation_steps: Test batches per epoch. Defaults to the number of
            test images divided by the batch size.
    """
    # Must match the batch size used when the datasets were built.
    batch_size = 128
    if steps_per_epoch is None:
        steps_per_epoch = int(train_image.shape[0]) // batch_size
    if validation_steps is None:
        validation_steps = int(test_image.shape[0]) // batch_size

    for epoch in range(epochs):
        # Both datasets were built with repeat() and never end on their own,
        # so each pass is bounded with take(); otherwise the first inner loop
        # would never finish.
        for batch, (images, labels) in enumerate(dataset.take(steps_per_epoch)):
            train_step(model, images, labels)
            print("\rstep:{}, loss:{:.4f}, acc:{:.4f}".format(batch,
                                                              train_loss.result(),
                                                              train_accuracy.result()), end="")
        with train_writer.as_default():
            tf.summary.scalar('loss', train_loss.result(), step=epoch)
            tf.summary.scalar('acc', train_accuracy.result(), step=epoch)

        for batch, (images, labels) in enumerate(test_dataset.take(validation_steps)):
            test_step(model, images, labels)
            print("\rstep:{}, test_loss:{:.4f}, test_acc:{:.4f}".format(batch,
                                                                        test_loss.result(),
                                                                        test_accuracy.result()), end="")
        # Test metrics go to the test writer (they were previously written
        # through train_writer, leaving the test log directory empty).
        with test_writer.as_default():
            tf.summary.scalar('test_loss', test_loss.result(), step=epoch)
            tf.summary.scalar('test_acc', test_accuracy.result(), step=epoch)

        template = 'Epoch{},loss:{},acc:{},test_loss:{},test_acc:{}'
        print(template.format(epoch+1,
                              train_loss.result(),
                              train_accuracy.result(),
                              test_loss.result(),
                              test_accuracy.result()))

        # Start the next epoch with fresh running metrics.
        train_loss.reset_states()
        train_accuracy.reset_states()
        test_loss.reset_states()
        test_accuracy.reset_states()

In [None]:
 # Run the custom training loop defined above.
 train()