# **😎 TensorFlow 年齡與性別估計（活頁簿 2：性別分類）**

* 欲進行年齡估計，請至 [活頁簿 1（年齡估計）](https://colab.research.google.com/drive/1to2iolQGIVZgXFWRmRKZGrny5kfXk40h?usp=sharing)

在本筆記本中，我們將訓練一個 Keras 模型，根據 *已裁剪人臉* 圖片來分類一個人的性別。我們使用著名的 [UTKFace 資料集](https://susanqq.github.io/UTKFace/)，該資料集包含 23,000 張影像，每張影像都標註了性別、年齡和族裔。

> **注意：請確認您已連接到 Google Colab 的 GPU 執行環境，否則訓練可能需要非常長的時間。請至「執行階段 > 變更執行類型 > 硬體加速器」中啟用 GPU。**


## 1) **下載 UTKFace 資料集** 💻


UTKFace 資料集可透過以下 Google 雲端硬碟資料夾取得：[連結](https://drive.google.com/drive/folders/0BxYys69jI14kU0I1YUQyY1ZDRUE)。請下載並將其放入您的 Google 雲端硬碟中。

完成後，請在本筆記本中掛載您的 Google 雲端硬碟。  

In [None]:

from google.colab import drive

drive.mount('/content/drive')


將 `utkface_23k.zip` 檔案解壓縮到本地目錄。這樣可以大幅提升影像解析的速度。詳情請參考 StackOverflow 的 [這篇回答](https://stackoverflow.com/a/60802388/10878733)。


In [None]:

# Replace with your path!
!unzip -q /content/drive/MyDrive/Datasets/utkface_23k.zip -d data


將資料夾名稱從 `utkace_23k` 更改為 `utkface23k`，以移除底線，避免在拆分檔名（解析年齡）時發生錯誤。

In [None]:

!mv data/utkface_23k data/utkface23k


In [2]:

# # To download checkpoints, Keras models, TFLite models
# from google.colab import files

# Life is incomplete without this statement!
import tensorflow as tf

# And this as well!
import numpy as np

# To visualize results
import matplotlib.pyplot as plt

import os
import datetime

from pathlib import Path


## 2) **資料處理** 🦾

下載資料集後，我們需要對其執行下列操作，以便用於模型訓練：

* 👉🏻 讀取影像檔並轉成 3 維 NumPy 陣列。注意，我們將使用三通道 RGB 影像進行訓練，因此每個陣列的形狀為 `[影像寬度, 影像高度, 3]`。  
* 👉🏻 拆分檔名以解析影像中人物的性別。我們使用 `tf.strings.split()` 方法來完成此任務。  
* 👉🏻 將性別進行 one-hot 編碼，因為我們要執行**二分類**任務。

完成上述步驟後，將得到 N 個樣本，每個樣本包含影像陣列（形狀 `[128, 128, 3]`）及其對應的 one-hot 編碼標籤（形狀 `[1, 2]`）。

我們使用 `tf.data.Dataset` 來加速資料處理，以利平行運算。上述操作會透過 `tf.data.Dataset.map` 方法，映射到每個檔名上進行處理。  


In [3]:

# ----------------------------------------
# 參數設定
# ----------------------------------------
MODEL_INPUT_IMAGE_SIZE = [128, 128]
TRAIN_TEST_SPLIT       = 0.3
NUM_SAMPLES            = 20000

# ----------------------------------------
# 建立資料夾路徑（參考第二段程式）
# ----------------------------------------
try:
    BASE_DIR = Path(__file__).resolve().parent
except NameError:
    BASE_DIR = Path.cwd()

PROJECT_ROOT = BASE_DIR.parent
UTKFACE_DIR  = PROJECT_ROOT / "dataset" / "archive" / "UTKFace"

if not UTKFACE_DIR.exists():
    raise FileNotFoundError(f"找不到資料夾：{UTKFACE_DIR}")

# ----------------------------------------
# One-hot 編碼向量
# ----------------------------------------
y1 = tf.constant([1., 0.], dtype='float32')
y2 = tf.constant([0., 1.], dtype='float32')

# ----------------------------------------
# 影像解析函式（依原第一段程式）
# ----------------------------------------
def parse_image(filename):
    image_raw = tf.io.read_file(filename)
    image     = tf.image.decode_jpeg(image_raw, channels=3)
    image     = tf.image.resize(image, MODEL_INPUT_IMAGE_SIZE) / 255.0

    # 取出純檔名
    fname = tf.strings.split(filename, os.sep)[-1]
    parts = tf.strings.split(fname, "_")
    # parts[1] 是性別 (0 or 1)
    gender = tf.strings.to_number(parts[1], out_type=tf.float32)
    gender_onehot = (gender * y2) + ((1 - gender) * y1)

    return image, gender_onehot

# ----------------------------------------
# 建立 Dataset
# ----------------------------------------
pattern = str(UTKFACE_DIR / "*.jpg")   # 或 "*.chip.jpg" 視你資料命名而定
list_ds = tf.data.Dataset.list_files(pattern, shuffle=True)

dataset = (
    list_ds
    .map(parse_image, num_parallel_calls=tf.data.AUTOTUNE)
    .take(NUM_SAMPLES)
)

# ----------------------------------------
# 範例：列印前 5 筆
# ----------------------------------------
for idx, (img, lbl) in enumerate(dataset.take(5)):
    print(f"[{idx:02d}] image.shape = {img.shape}, gender_onehot = {lbl.numpy()}")


[00] image.shape = (128, 128, 3), gender_onehot = [0. 1.]
[01] image.shape = (128, 128, 3), gender_onehot = [0. 1.]
[02] image.shape = (128, 128, 3), gender_onehot = [0. 1.]
[03] image.shape = (128, 128, 3), gender_onehot = [0. 1.]
[04] image.shape = (128, 128, 3), gender_onehot = [1. 0.]



我們會從資料集中建立兩個切分，一個用於訓練模型，另一個用於測試模型。測試資料的比例由 `TRAIN_TEST_SPLIT` 決定。  



In [4]:

# Create train and test splits of the dataset.
num_examples_in_test_ds = int( dataset.cardinality().numpy() * TRAIN_TEST_SPLIT )

test_ds = dataset.take( num_examples_in_test_ds )
train_ds = dataset.skip( num_examples_in_test_ds )

print( 'Num examples in train ds {}'.format( train_ds.cardinality() ) )
print( 'Num examples in test ds {}'.format( test_ds.cardinality() ) )


Num examples in train ds 14000
Num examples in test ds 6000


## 3) **CNN 模型** 👨‍🎓

我們的目標是設計一個參數更少（意味著推論時間與模型大小更小），但仍具有強大表現力以提升泛化能力的模型。

* 👉🏻 模型接受形狀為 `[None, 128, 128, 3]` 的輸入批次，並依據 `num_blocks` 執行多層捲積運算。  
* 👉🏻 每個 Block 包含以下層序列：`Conv2D → BatchNorm → LeakyReLU`  


```
# Define the conv block.
if lite_model:
        x = tf.keras.layers.SeparableConv2D( num_filters ,
                                            kernel_size=kernel_size ,
                                            strides=strides
                                            , use_bias=False ,
                                            kernel_initializer=tf.keras.initializers.HeNormal() ,
                                            kernel_regularizer=tf.keras.regularizers.L2( 1e-5 )
                                             )( x )
    else:
        x = tf.keras.layers.Conv2D( num_filters ,
                                   kernel_size=kernel_size ,
                                   strides=strides ,
                                   use_bias=False ,
                                   kernel_initializer=tf.keras.initializers.HeNormal() ,
                                   kernel_regularizer=tf.keras.regularizers.L2( 1e-5 )
                                    )( x )

    x = tf.keras.layers.BatchNormalization()( x )
    x = tf.keras.layers.LeakyReLU( leaky_relu_alpha )( x )
```

* 若將 `lite_model` 設為 `True`，則使用[可分離捲積（Separable Convolutions）](https://towardsdatascience.com/a-basic-introduction-to-separable-convolutions-b99ec3102728)，可減少參數數量，以換取更快速的推論速度，代價是可能略微犧牲效能。

* 👉🏻 我們依序堆疊 `num_blocks` 個此類 Block，各層的過濾器數量由 `num_filters` 決定。

* 👉🏻 接著加入多層 `Dense` 全連接層，以學習由捲積層所提取的特徵；同時插入 `Dropout` 層以減少過擬合。每層 `Dropout` 的 `rate` 會逐層遞減，確保單位數較少的全連接層仍具備良好可學習性。


```
def dense( x , filters , dropout_rate ):
    x = tf.keras.layers.Dense( filters , kernel_regularizer=tf.keras.regularizers.L2( 0.1 ) , bias_regularizer=tf.keras.regularizers.L2( 0.1 ) )( x )
    x = tf.keras.layers.LeakyReLU( alpha=leaky_relu_alpha )( x )
    x = tf.keras.layers.Dropout( dropout_rate )( x )
    return x
```

* 👉🏻 最後一個 `Dense` 層使用 softmax 激活函數，輸出 `male` 與 `female` 兩類的機率分布。

> 如需選擇前述兩個區塊中所使用的權重衰減值，請參考 [這篇部落格](https://machinelearningmastery.com/how-to-reduce-overfitting-in-deep-learning-with-weight-regularization/)。

* 👉🏻 模型的輸出為形狀 `[None, 2]` 的張量。



In [5]:

# Negative slope coefficient for LeakyReLU.
leaky_relu_alpha = 0.2

lite_model = True

# Define the conv block.
def conv( x , num_filters , kernel_size=( 3 , 3 ) , strides=1 ):
    if lite_model:
        x = tf.keras.layers.SeparableConv2D( num_filters ,
                                            kernel_size=kernel_size ,
                                            strides=strides,
                                            use_bias=False ,
                                            kernel_initializer=tf.keras.initializers.HeNormal() ,
                                            kernel_regularizer=tf.keras.regularizers.L2( 1e-5 )
                                             )( x )
    else:
        x = tf.keras.layers.Conv2D( num_filters ,
                                   kernel_size=kernel_size ,
                                   strides=strides ,
                                   use_bias=False ,
                                   kernel_initializer=tf.keras.initializers.HeNormal() ,
                                   kernel_regularizer=tf.keras.regularizers.L2( 1e-5 )
                                    )( x )

    x = tf.keras.layers.BatchNormalization()( x )
    x = tf.keras.layers.LeakyReLU( leaky_relu_alpha )( x )
    return x

def dense( x , filters , dropout_rate ):
    x = tf.keras.layers.Dense( filters , kernel_regularizer=tf.keras.regularizers.L2( 0.1 ) , bias_regularizer=tf.keras.regularizers.L2( 0.1 ) )( x )
    x = tf.keras.layers.LeakyReLU( alpha=leaky_relu_alpha )( x )
    x = tf.keras.layers.Dropout( dropout_rate )( x )
    return x


# No. of convolution layers to be added.
num_blocks = 5
# Num filters for each conv layer.
num_filters = [ 16 , 32 , 64 , 128 , 256 , 256 ]
# Kernel sizes for each conv layer.
kernel_sizes = [ 3 , 3 , 3 , 3 , 3 , 3 ]

# Init a Input Layer.
inputs = tf.keras.layers.Input( shape=MODEL_INPUT_IMAGE_SIZE + [ 3 ] )

# Add conv blocks sequentially
x = inputs
for i in range( num_blocks ):
    x = conv( x , num_filters=num_filters[ i ] , kernel_size=kernel_sizes[ i ] )
    x = tf.keras.layers.MaxPooling2D()( x )

# Flatten the output of the last Conv layer.
x = tf.keras.layers.Flatten()( x )
conv_output = x

# Add Dense layers ( Dense -> LeakyReLU -> Dropout )
x = dense( conv_output , 256 , 0.6 )
x = dense( x , 64 , 0.4 )
x = dense( x , 32 , 0.2 )
outputs = tf.keras.layers.Dense( 2 , activation='softmax' )( x )

# Build the Model
model = tf.keras.models.Model( inputs , outputs )

# Uncomment the below to view the summary of the model.
# model.summary()
# tf.keras.utils.plot_model( model , to_file='architecture.png' )


執行此程式區塊，以在本筆記本中直接使用 TensorBoard 可視化模型訓練過程。


In [6]:

%load_ext tensorboard
%tensorboard --logdir tb_logs/


Reusing TensorBoard on port 6006 (pid 34752), started 1:28:29 ago. (Use '!kill 34752' to kill it.)

```markdown
## 4) **編譯模型（及其他 Callback）** 🧱

定義完模型架構後，我們將編譯 Keras 模型並初始化一些有用的 Callback。

* 👉🏻 由於我們進行的是分類任務，因此使用「分類交叉熵（Categorical Crossentropy）」作為損失函數。詳情請參考 [`tf.keras.losses.CategoricalCrossentropy`](https://www.tensorflow.org/api_docs/python/tf/keras/losses/CategoricalCrossentropy)。

* 👉🏻 使用 Adam 優化器來訓練模型。詳情請參考 [`tf.keras.optimizers.Adam`](https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/Adam)。

* 👉🏻 為了評估模型效能，我們使用「準確率（Accuracy）」作為評估指標。詳情請參考 [`tf.keras.metrics.Accuracy`](https://www.tensorflow.org/api_docs/python/tf/keras/metrics/Accuracy)。

#### Callback：

* 👉🏻 [`tf.keras.callbacks.ModelCheckpoint`](https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/ModelCheckpoint)：在每個 epoch 結束後，將模型儲存為 H5 檔案。

* 👉🏻 [`tf.keras.callbacks.TensorBoard`](https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/TensorBoard)：使用 TensorBoard 可視化訓練過程。

* 👉🏻 [`tf.keras.callbacks.EarlyStopping`](https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/EarlyStopping)：當測試資料集上的評估指標（即準確率）不再改善時，停止訓練。
```


In [7]:

learning_rate = 0.0001
num_epochs = 10
batch_size = 128

train_ds = train_ds.batch( batch_size ).repeat( num_epochs )
test_ds = test_ds.batch( batch_size ).repeat( num_epochs )

save_dir = 'train-1/cp.ckpt'
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint( save_dir )

logdir = os.path.join( "tb_logs" , datetime.datetime.now().strftime("%Y%m%d-%H%M%S") )
tensorboard_callback = tf.keras.callbacks.TensorBoard( logdir )

early_stopping_callback = tf.keras.callbacks.EarlyStopping( monitor='val_accuracy' , patience=3 )

model.compile(
    loss=tf.keras.losses.categorical_crossentropy ,
    optimizer = tf.keras.optimizers.Adam( learning_rate ) ,
    metrics =[ 'accuracy' ]
)


## 5) **訓練與評估模型** 🏋🏻‍♂️

將所有 Callback 一次打包後，開始執行訓練迴圈。

In [8]:

model.fit(
    train_ds,
    epochs=num_epochs,
    validation_data=test_ds, callbacks=[ checkpoint_callback , tensorboard_callback , early_stopping_callback ]
)


Epoch 1/10
INFO:tensorflow:Assets written to: train-1\cp.ckpt\assets
Epoch 2/10
INFO:tensorflow:Assets written to: train-1\cp.ckpt\assets
Epoch 3/10
INFO:tensorflow:Assets written to: train-1\cp.ckpt\assets
Epoch 4/10
INFO:tensorflow:Assets written to: train-1\cp.ckpt\assets
Epoch 5/10
INFO:tensorflow:Assets written to: train-1\cp.ckpt\assets
Epoch 6/10
INFO:tensorflow:Assets written to: train-1\cp.ckpt\assets
Epoch 7/10
INFO:tensorflow:Assets written to: train-1\cp.ckpt\assets
Epoch 8/10
INFO:tensorflow:Assets written to: train-1\cp.ckpt\assets
Epoch 9/10
INFO:tensorflow:Assets written to: train-1\cp.ckpt\assets
Epoch 10/10
INFO:tensorflow:Assets written to: train-1\cp.ckpt\assets


<keras.callbacks.History at 0x1f645ac1a90>

評估模型。  

In [22]:

# Load pretrained model ( if required )
model = tf.keras.models.load_model( 'model_gender.h5' )


In [11]:

test_images = []
test_labels = []

# Unbatch the dataset first.
test_ds = test_ds.unbatch()
for image , label in test_ds.as_numpy_iterator():
    test_images.append( image )
    test_labels.append( label )

# Converts lists to ndarrays
test_images = np.array( test_images )
test_labels = np.array( test_labels )


In [23]:

p = model.evaluate( test_images , test_labels )
print( 'loss is {} \n accuracy is {} %'.format( p[0] , p[1] * 100 ) )


InternalError: Failed copying input tensor from /job:localhost/replica:0/task:0/device:CPU:0 to /job:localhost/replica:0/task:0/device:GPU:0 in order to run _EagerConst: Dst tensor is not initialized.

In [None]:

from sklearn.metrics import classification_report , confusion_matrix , ConfusionMatrixDisplay

# Fetch model predictions for test_x
pred_y = model.predict( test_images )

# Print the classification report
report = classification_report( np.argmax( test_labels , axis=1 ) , np.argmax( pred_y , axis=1 ) , target_names=[ 'male' , 'female' ] )
print( report )


InternalError: Failed copying input tensor from /job:localhost/replica:0/task:0/device:CPU:0 to /job:localhost/replica:0/task:0/device:GPU:0 in order to run _EagerConst: Dst tensor is not initialized.

In [None]:

# Plot the confusion matrix
conf_matrix = confusion_matrix( np.argmax( test_labels , axis=1 ) ,np.argmax( pred_y , axis=1 ) )
disp = ConfusionMatrixDisplay( conf_matrix , display_labels=[ 'male' , 'female' ] )
disp.plot()



將 Keras 模型儲存到本機磁碟，以便日後需要時能夠續訓。  




In [21]:

model_name = 'model_gender' #@param {type: "string"}
model_name_ = model_name + '.h5'

model.save( model_name_ )
# files.download( model_name_ )


## 6) **可視化結果**

我們將從 `test_ds` 中取出一些影像進行性別預測，並使用 `matplotlib` 繪製結果。  


In [24]:

fig = plt.figure( figsize=( 12 , 15 ) )
classes = [ 'Male' , 'Female' ]
rows = 5
columns = 2

i = 1
for image , label in zip( test_images[ 0 : 10 ] , test_labels[ 0 : 10 ] ):
    image = image.numpy()
    fig.add_subplot( rows , columns , i )
    plt.imshow( image )
    label_ = classes[ np.argmax( model.predict( np.expand_dims( image , 0 ) ) ) ]
    plt.axis( 'off' )
    plt.title( 'Predicted gender : {} , actual gender : {}'.format( label_ , classes[ np.argmax( label ) ] ) )
    i += 1


AttributeError: 'numpy.ndarray' object has no attribute 'numpy'

<Figure size 1200x1500 with 0 Axes>

## 7) **轉換為 TensorFlow Lite 格式** 📡

我們的模型將部署在 Android 應用程式中，會使用 [TF Lite Android](https://bintray.com/google/tensorflow/tensorflow-lite) 套件來載入模型並進行推論。

我們使用 `TFLiteConverter` API 將 Keras 模型（`.h5`）轉換為 TF Lite 緩衝區（`.tflite`）。詳見官方文件：[TFLiteConverter](https://www.tensorflow.org/api_docs/python/tf/lite/TFLiteConverter/)。  
最終會產出兩個 TF Lite 檔案：  
1. 使用 float16 量化的版本  
2. 未量化的版本  


In [25]:

converter = tf.lite.TFLiteConverter.from_keras_model( model )
converter.optimizations = [ tf.lite.Optimize.DEFAULT ]
converter.target_spec.supported_types = [ tf.float16 ]
buffer = converter.convert()

open( '{}_q.tflite'.format( model_name ) , 'wb' ).write( buffer )
# files.download( '{}_q.tflite'.format( model_name ) )


INFO:tensorflow:Assets written to: C:\Users\chenk\AppData\Local\Temp\tmp5ve30i1p\assets


669312

將模型轉換為未量化的 TF Lite 緩衝區。  



In [26]:

converter = tf.lite.TFLiteConverter.from_keras_model( model )
buffer = converter.convert()

open( '{}_nonq.tflite'.format( model_name ) , 'wb' ).write( buffer )
# files.download( '{}_nonq.tflite'.format( model_name ) )


INFO:tensorflow:Assets written to: C:\Users\chenk\AppData\Local\Temp\tmp4q8s40h8\assets


INFO:tensorflow:Assets written to: C:\Users\chenk\AppData\Local\Temp\tmp4q8s40h8\assets


1319560


## Utility Methods

Use these methods to automate some of the tasks.


In [None]:

#@title Utility to zip and download a directory
#@markdown Use this method to zip and download a directory. For ex. a TB logs
#@markdown directory or a checkpoint(s) directory.

dir_to_zip = 'tb_logs' #@param {type: "string"}
output_filename = 'logs.zip' #@param {type: "string"}
delete_dir_after_download = "No"  #@param ['Yes', 'No']

os.system( "zip -r {} {}".format( output_filename , dir_to_zip ) )

if delete_dir_after_download == "Yes":
    os.system( "rm -r {}".format( dir_to_zip ) )

files.download( output_filename )


In [None]:

#@title Utility to delete a directory
#@markdown Use this method to delete a directory.

dir_path = ''  #@param {type: "string"}
os.system( f'rm -r {dir_path}')



# LICENSE

```
  
MIT License

Copyright (c) 2021 Shubham Panchal

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
```