In [1]:
import warnings
warnings.filterwarnings('ignore')
import tensorflow as tf
import numpy as np
import os
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.layers import Input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.preprocessing.image import load_img
from sklearn.model_selection import train_test_split

In [2]:
# Map each class folder under ./dataset to its one-hot label.
LABEL_BY_CLASS = {'with_mask' : [1 , 0] ,
                  'without_mask' : [0 , 1]}

# Collect the image paths as ./dataset/<class folder>/<file name>.
# The listings are sorted because os.listdir() returns entries in arbitrary
# order, which would make the random_state = 42 split below differ from one
# machine to another.
imagePaths = []
for files in sorted(os.listdir('./dataset')):
    class_dir = './dataset/{}'.format(files)
    # Skip stray files and folders that are not one of the two known classes,
    # so every loaded image is guaranteed to get exactly one label.
    if files not in LABEL_BY_CLASS or not os.path.isdir(class_dir):
        continue
    for image in sorted(os.listdir(class_dir)):
        imagePaths.append('{}/{}'.format(class_dir , image))

# Load the images and build the matching labels
data , labels = [] , []
for imagePath in imagePaths:
    # Read the image resized to 224 x 224, then apply the MobileNetV2 preprocessing
    image = load_img(imagePath , target_size = (224 , 224))
    image = img_to_array(image)
    image = preprocess_input(image)
    data.append(image)

    # The class folder is the third path component: ./dataset/<class>/<file>
    labels.append(LABEL_BY_CLASS[imagePath.split('/')[2]])
data = np.array(data , dtype = 'float32')
labels = np.array(labels).astype(np.float32)

# Split into training (80%) and test (20%) sets
trainX , testX , trainY , testY =\
train_test_split(data , labels , test_size = 0.20 , random_state = 42)

In [3]:
# MobileNetV2 with ImageNet weights and without its classification head;
# it takes 224 x 224 RGB images and is the base of the mask classifier.
backbone_input = Input(shape = (224 , 224 , 3))
mobilenet = MobileNetV2(input_tensor = backbone_input ,
                        include_top = False ,
                        weights = 'imagenet')


# Hand-written fully connected layer, used in this notebook in place of
# tf.keras.layers.Dense
class add_layer(tf.keras.layers.Layer):
    """Affine map ``inputs @ Weights + biases`` with an optional activation.

    Args:
        in_size: number of input features (rows of ``Weights``).
        out_size: number of output units (columns of ``Weights``).
        name_: prefix of the variable names (``<name_>_w`` and ``<name_>_b``).
        activation_function: callable applied to the affine output, or None
            to return the affine output as is.
    """

    def __init__(self, in_size , out_size , name_ , activation_function = None):
        super().__init__()
        glorot = tf.initializers.GlorotUniform()
        weight_init = glorot([in_size , out_size])
        # Biases start at a small positive constant (1e-5) rather than at zero
        bias_init = tf.zeros([1 , out_size]) + 0.00001
        self.Weights = tf.Variable(initial_value = weight_init , name = '{}_w'.format(name_))
        self.biases = tf.Variable(bias_init , name = '{}_b'.format(name_))
        self.activation_function = activation_function

    def call(self, inputs):
        """Apply the affine map, then the activation when one was given."""
        affine = tf.matmul(inputs , self.Weights) + self.biases
        if self.activation_function is not None:
            return self.activation_function(affine)
        return affine


class new_layer(tf.Module):
    """Classification head applied to the MobileNetV2 output.

    Pipeline: 7 x 7 average pooling -> flatten -> 1280-to-128 layer with ReLU
    -> dropout (rate 0.1) -> 128-to-2 layer -> softmax -> clipped log.
    """

    def __init__(self , name = None):
        super().__init__(name = name)
        with self.name_scope:  # equivalent to `with tf.name_scope('new_layer')`
            # NOTE: despite its name, this attribute holds an *average* pooling layer
            self.maxpooling = tf.keras.layers.AveragePooling2D(pool_size = (7 , 7))
            self.flatten = tf.keras.layers.Flatten()
            self.add_layer_1 = add_layer(1280 , 128 , 'layer_1' , tf.nn.relu)
            self.add_layer_2 = add_layer(128 , 2 , 'layer_2')
            self.dropout = tf.keras.layers.Dropout(0.1)

    def __call__(self , inputs , training = True):
        """Return the log of the softmax class probabilities for ``inputs``.

        Args:
            inputs: tensor fed to the pooling layer (the MobileNetV2 output
                in this notebook).
            training: forwarded to the dropout layer; defaults to True.
        """
        pooled = self.flatten(self.maxpooling(inputs))
        hidden = self.dropout(self.add_layer_1(pooled) , training = training)
        probabilities = tf.nn.softmax(self.add_layer_2(hidden) , axis = 1)
        # Floor the probabilities at 1e-8 so the log stays finite; the upper
        # bound is the current maximum, so nothing is clipped from above.
        floored = tf.clip_by_value(probabilities , 1e-8 , tf.reduce_max(probabilities))
        return tf.math.log(floored)

In [4]:
# Classification head that is trained on top of the MobileNetV2 output
mask_model = new_layer()

# Random image augmentation, used to reduce overfitting
augmentation_options = dict(
    rotation_range = 25 ,
    width_shift_range = 0.15 ,
    height_shift_range = 0.2 ,
    shear_range = 0.2 ,
    zoom_range = 0.15 ,
    horizontal_flip = True ,
    fill_mode = 'nearest' ,
)
aug = ImageDataGenerator(**augmentation_options)

optimizer = tf.keras.optimizers.Adam(learning_rate = 0.0001)

@tf.function
def train_step(x , y):
    """Run one optimisation step of the classification head on one batch.

    Args:
        x: batch of preprocessed images fed to MobileNetV2.
        y: matching one-hot labels.

    Returns:
        (cross_entropy, accuracy): mean cross-entropy and mean accuracy of
        the batch, both as scalar tensors.
    """
    # MobileNetV2 is not updated here, so its forward pass runs outside the
    # tape: nothing is differentiated through it, and recording it would only
    # keep the backbone activations alive. training = False keeps its
    # BatchNormalization layers in inference mode.
    mobilenet_output = mobilenet(x , training = False)

    with tf.GradientTape() as tape:
        # mask_model returns log-probabilities, so this is the cross-entropy
        prediction = mask_model(mobilenet_output , training = True)
        cross_entropy_temp = -tf.reduce_sum(y * prediction , axis = 1)
        cross_entropy = tf.reduce_mean(cross_entropy_temp)

    # Only the variables of new_layer are trained; MobileNetV2 is left untouched
    var_list = mask_model.trainable_variables
    grads = tape.gradient(cross_entropy , var_list)
    optimizer.apply_gradients(grads_and_vars = zip(grads , var_list))

    # The accuracy is a metric only, so it is computed outside the tape
    correct = tf.equal(tf.math.argmax(prediction , 1) , tf.argmax(y , 1))
    accuracy = tf.reduce_mean(tf.cast(correct , tf.float32))
    return cross_entropy , accuracy

@tf.function
def test_step(x , y):
    """Evaluate the backbone plus classification head on one batch.

    Args:
        x: batch of preprocessed images fed to MobileNetV2.
        y: matching one-hot labels.

    Returns:
        (cross_entropy, accuracy): mean cross-entropy and mean accuracy of
        the batch, both as scalar tensors.
    """
    # Both models are called in inference mode explicitly, so the result does
    # not depend on the ambient Keras learning phase.
    mobilenet_output = mobilenet(x , training = False)
    prediction = mask_model(mobilenet_output , training = False)

    # mask_model returns log-probabilities, so this is the cross-entropy
    cross_entropy = tf.reduce_mean(-tf.reduce_sum(y * prediction , axis = 1))

    correct = tf.equal(tf.math.argmax(prediction , 1) , tf.argmax(y , 1))
    accuracy = tf.reduce_mean(tf.cast(correct , tf.float32))
    return cross_entropy , accuracy
    
# Training schedule
EPOCHS = 4
BATCH_SIZE = 32
LOG_EVERY = 5   # print the training metrics every LOG_EVERY batches

# Number of batches needed to go through trainX once (ceiling division)
steps_per_epoch = -(-len(trainX) // BATCH_SIZE)

for epoch_i in range(EPOCHS):
    train_batches = aug.flow(trainX , trainY , shuffle = True , batch_size = BATCH_SIZE)
    # aug.flow() keeps yielding batches forever, so the epoch is bounded by
    # zipping it with range(steps_per_epoch).
    for batch_i , (x_batch , y_batch) in zip(range(steps_per_epoch) , train_batches):
        train_loss , train_acc = train_step(x_batch , y_batch)

        if batch_i % LOG_EVERY == 0:
            print('=' * 30)
            print('epoch_i : {}'.format(epoch_i))
            print('batch_i : {}'.format(batch_i))
            print('training_loss : {:.2f}'.format(train_loss.numpy()))
            print('training_accuracy : {:.2%}'.format(train_acc.numpy()))

    # Evaluate on the whole test set at the end of every epoch
    test_loss , test_acc = test_step(testX , testY)
    print('*' * 30)
    print('epoch_i : {}'.format(epoch_i))
    # This line used to print the loss under the label 'testing_accuracy'
    print('testing_loss : {:.2f}'.format(test_loss.numpy()))
    print('testing_accuracy : {:.2%}\n'.format(test_acc.numpy()))

epoch_i : 0
batch_i : 0
training_loss : 0.63
training_accuracy : 53.12%
epoch_i : 0
batch_i : 5
training_loss : 0.47
training_accuracy : 81.25%
epoch_i : 0
batch_i : 10
training_loss : 0.25
training_accuracy : 93.75%
epoch_i : 0
batch_i : 15
training_loss : 0.32
training_accuracy : 87.50%
epoch_i : 0
batch_i : 20
training_loss : 0.16
training_accuracy : 100.00%
epoch_i : 0
batch_i : 25
training_loss : 0.10
training_accuracy : 100.00%
epoch_i : 0
batch_i : 30
training_loss : 0.09
training_accuracy : 96.88%
******************************
epoch_i : 0
testing_accuracy : 0.06
testing_accuracy : 100.00%

epoch_i : 1
batch_i : 0
training_loss : 0.07
training_accuracy : 100.00%
epoch_i : 1
batch_i : 5
training_loss : 0.05
training_accuracy : 100.00%
epoch_i : 1
batch_i : 10
training_loss : 0.13
training_accuracy : 96.88%
epoch_i : 1
batch_i : 15
training_loss : 0.05
training_accuracy : 100.00%
epoch_i : 1
batch_i : 20
training_loss : 0.03
training_accuracy : 100.00%
epoch_i : 1
batch_i : 25
tr

In [5]:
# Save the trained classification head.
# new_layer.__call__ is a plain Python method, so saving mask_model on its own
# stores the variables but no callable. The tf.function below is exported as
# the serving signature so that the restored model can run inference through
# restore_model.signatures['serving_default'].
@tf.function(input_signature = [tf.TensorSpec(mobilenet.output_shape , tf.float32)])
def serve_mask_model(mobilenet_features):
    """Return the log-probabilities of the head in inference mode (no dropout)."""
    return {'log_probabilities' : mask_model(mobilenet_features , training = False)}

tf.saved_model.save(mask_model , 'my_network' , signatures = serve_mask_model)

# Write the traced graph of the serving function as text. This replaces the
# graph of a new tf.compat.v1.Session, which was never closed and did not
# contain the model.
serving_graph = serve_mask_model.get_concrete_function().graph
tf.io.write_graph(serving_graph , './my_network' , 'mask_model.pbtxt')

restore_model = tf.saved_model.load('my_network')

Instructions for updating:
If using Keras pass *_constraint arguments to layers.
INFO:tensorflow:Assets written to: my_network\assets
