# 基础模型训练和预测

In [6]:
import tensorflow as tf
import numpy as np
#from tensorflow.keras import *

"""
自定义指标只需继承Metric类， 并重写一下函数
_init_(self)，初始化。
update_state(self，y_true，y_pred，sample_weight = None)，它使用目标y_true和模型预测y_pred来更新状态变量。
result(self)，它使用状态变量来计算最终结果。
reset_states(self)，重新初始化度量的状态。
"""
# 这是一个简单的示例，显示如何实现CatgoricalTruePositives指标，该指标计算正确分类为属于给定类的样本数量
class CatgoricalTruePostives(tf.keras.metrics.Metric):
    def __init__(self, name='binary_true_postives', **kwargs):
        super().__init__(name=name, **kwargs)
        self.true_postives = self.add_weight(name='tp', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_pred = tf.argmax(y_pred)
        y_true = tf.equal(tf.cast(y_pred, tf.int32), tf.cast(y_true, tf.int32))

        y_true = tf.cast(y_true, tf.float32)

        if sample_weight is not None:
            sample_weight = tf.cast(sample_weight, tf.float32)
            y_true = tf.multiply(sample_weight, y_true)

        return self.true_postives.assign_add(tf.reduce_sum(y_true))

    def result(self):
        return tf.identity(self.true_postives)

    def reset_states(self):
        self.true_postives.assign(0.)

class LossHistory(tf.keras.callbacks.Callback):
    def on_train_begin(self, logs):
        self.losses = []
    def on_epoch_end(self, batch, logs):
        self.losses.append(logs.get('loss'))

# 定义损失函数
def custom_loss(y_actual,y_pred):
    custom_loss= (y_actual-y_pred)**2
    return custom_loss

model = tf.keras.Sequential()
model.add(tf.keras.layers.Flatten(input_shape=(28, 28)))
model.add(tf.keras.layers.Dense(128, activation='relu'))
model.add(tf.keras.layers.Dropout(0.2))
model.add(tf.keras.layers.Dense(10, activation='softmax'))
model.summary()


#model.add_metric(keras.backend.std(inputs), name='std_of_activation', aggregation='mean')
#model.add_loss(tf.reduce_sum(h1)*0.1)

model.compile(optimizer=tf.keras.optimizers.RMSprop(1e-3),
             loss=custom_loss,
             metrics=[CatgoricalTruePostives()])

#checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=model_name, monitor='val_IOU',mode='auto' ,save_best_only='True')
#reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='loss', patience=5, mode='auto')
#early_stop = tf.keras.callbacks.EarlyStopping(patience=2, monitor='val_loss')
#tensor_board = tf.keras.callbacks.TensorBoard(log_dir='./logs')
callbacks = [LossHistory()]
x_train, y_train = np.zeros((1000, 28, 28)), np.zeros((1000, 10))
model.fit(x_train, y_train, batch_size=64, callbacks=callbacks, epochs=3)


print(model.evaluate(x_train, y_train))

print(model.predict(x_train))

Model: "sequential_5"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
flatten_4 (Flatten)          (None, 784)               0         
_________________________________________________________________
dense_8 (Dense)              (None, 128)               100480    
_________________________________________________________________
dropout_4 (Dropout)          (None, 128)               0         
_________________________________________________________________
dense_9 (Dense)              (None, 10)                1290      
Total params: 101,770
Trainable params: 101,770
Non-trainable params: 0
_________________________________________________________________
Train on 1000 samples
Epoch 1/3
Epoch 2/3
Epoch 3/3
[0.009999994248151779, 10000.0]
[[0.1 0.1 0.1 ... 0.1 0.1 0.1]
 [0.1 0.1 0.1 ... 0.1 0.1 0.1]
 [0.1 0.1 0.1 ... 0.1 0.1 0.1]
 ...
 [0.1 0.1 0.1 ... 0.1 0.1 0.1]
 [0.1 0.1 0.1 ... 0.1 0.1 0.1]
 [

# 高级模型训练与预测

In [18]:
import tensorflow as tf
import numpy as np

# 训练并验证
# 获取模型
inputs = tf.keras.Input(shape=(28,28), name='digits')
x = tf.keras.layers.Flatten()(inputs)
x = tf.keras.layers.Dense(64, activation='relu', name='dense_1')(x)
x = tf.keras.layers.Dense(64, activation='relu', name='dense_2')(x)
outputs = tf.keras.layers.Dense(10, activation='softmax', name='predictions')(x)
model = tf.keras.Model(inputs=inputs, outputs=outputs)

# sgd优化器
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
# 分类损失函数
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()

# 设定统计参数
train_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy() 
val_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy()


(x_train, y_train), (x_val, y_val) = tf.keras.datasets.mnist.load_data()
x_train, x_val = (x_train / 255.0).astype(np.float32), (x_val / 255.0).astype(np.float32)

# 准备训练数据
batch_size = 64
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

# 准备验证数据
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(64)


# 迭代训练
for epoch in range(3):
    print('Start of epoch %d' % (epoch,))

 
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            logits = model(x_batch_train)
            loss_value = loss_fn(y_batch_train, logits)
        grads = tape.gradient(loss_value, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

    # 更新统计传输
    train_acc_metric(y_batch_train, logits)

    # 输出
    if step % 200 == 0:
        print('Training loss (for one batch) at step %s: %s' % (step, float(loss_value)))
        print('Seen so far: %s samples' % ((step + 1) * 64))

    # 输出统计参数的值
    train_acc = train_acc_metric.result()
    print('Training acc over epoch: %s' % (float(train_acc),))
    # 重置统计参数
    train_acc_metric.reset_states()

    # 用模型进行验证
    for x_batch_val, y_batch_val in val_dataset:
        val_logits = model(x_batch_val)
        val_acc_metric(y_batch_val, val_logits)
        
    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print('Validation acc: %s' % (float(val_acc),))


Start of epoch 0
Training acc over epoch: 0.65625
Validation acc: 0.4359999895095825
Start of epoch 1
Training acc over epoch: 0.75
Validation acc: 0.692300021648407
Start of epoch 2


KeyboardInterrupt: 