In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
from datetime import datetime
import numpy as np
import random
import os

# limit VRAM usage
gpus = tf.config.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

# enable mixed precision training (NVIDIA GPU only)
keras.mixed_precision.set_global_policy('mixed_float16')

# enable XLA compilation (GPU or CPU)
tf.config.optimizer.set_jit(True)

# preserve GPU threads for better performance
os.environ['TF_GPU_THREAD_MODE'] = 'gpu_private'

In [None]:
input_size = 96

In [None]:
pre_trained_model = keras.applications.MobileNetV2(
    input_shape=(input_size, input_size, 3), 
    include_top=False,
    pooling='avg',
    weights='imagenet')

for layer in pre_trained_model.layers:
    layer.trainable = False

NAME = 'classifier'
x = layers.Dense(3, activation="softmax", name="classification")(pre_trained_model.output)
model_classifier = keras.Model(pre_trained_model.input, x, name=NAME)

NAME = 'keypoint_estimator'
x = layers.Dense(26, name="regression")(pre_trained_model.output)
model_keypoint_estimator = keras.Model(pre_trained_model.input, x, name=NAME)

model_classifier.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'])

model_keypoint_estimator.compile(
    optimizer='adam',
    loss='mean_squared_error')

NAME = 'combined'
model_combined = keras.Model(pre_trained_model.input, [model_classifier.output, model_keypoint_estimator.output])

#model_combined.summary()
#keras.utils.plot_model(model_combined, to_file=NAME+'.png', show_shapes=True)

In [None]:
# gather classification data
training_path = "./data_classification/training"
validation_path = "./data_classification/validation"
label_to_int = {'cycling': '0', 'jogging': '1', 'sitting': '2'}
training_list = []
validation_list = []

for label in os.listdir(training_path):
    image_path = os.path.join(training_path, label)
    for filename in os.listdir(image_path):
        training_list.append((os.path.join(image_path, filename), label_to_int[label]))
        
for label in os.listdir(validation_path):
    image_path = os.path.join(validation_path, label)
    for filename in os.listdir(image_path):
        validation_list.append((os.path.join(image_path, filename), label_to_int[label]))

random.shuffle(training_list)
random.shuffle(validation_list)
training_list = tf.data.Dataset.from_tensor_slices(training_list)
validation_list = tf.data.Dataset.from_tensor_slices(validation_list)
print("Number of training data:", len(training_list))
print("Number of validation data:", len(validation_list))

In [None]:
# load classification data
def preprocess_data(sample):
    
    image = tf.io.read_file(sample[0])
    image = tf.io.decode_image(image, channels=3, expand_animations=False)
    image = tf.image.resize(image, [input_size, input_size], antialias=True)
    image = keras.applications.mobilenet_v2.preprocess_input(image)
    
    label = sample[1]
    label = tf.strings.to_number(label, out_type='int32')
    
    return image, label

# apply data augmentation
def data_augmentation(image, label):
    
    image = tf.image.random_brightness(image, 0.125)
    image = tf.image.random_hue(image, 0.1)
    image = tf.image.random_flip_left_right(image)
    image = tf.clip_by_value(image, -1., 1.)
    
    return image, label

In [None]:
# create classification dataset
BATCH_SIZE = 16
training_data = training_list.map(
    preprocess_data,
    num_parallel_calls=tf.data.AUTOTUNE).map(
    data_augmentation,
    num_parallel_calls=tf.data.AUTOTUNE).cache().shuffle(10000).batch(BATCH_SIZE, drop_remainder=True).prefetch(tf.data.AUTOTUNE)
validation_data = validation_list.map(
    preprocess_data,
    num_parallel_calls=tf.data.AUTOTUNE).batch(BATCH_SIZE).cache().prefetch(tf.data.AUTOTUNE)

In [None]:
# train classifier
EPOCHS = 100

history = model_classifier.fit(
    training_data,
    epochs=EPOCHS,
    validation_data=validation_data,
    verbose=2)

plt.plot(history.history['loss'], label='training loss')
plt.plot(history.history['val_loss'], label='validation loss')
plt.title('Loss over epoch')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid()
plt.show()

plt.plot(history.history['accuracy'], label='training accuracy')
plt.plot(history.history['val_accuracy'], label='validation accuracy')
plt.title('Accuracy over epoch')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.grid()
plt.show()

In [None]:
# gather regression data
training_images_path = "./data_regression/training/images"
training_labels_path = "./data_regression/training/labels"
validation_images_path = "./data_regression/validation/images"
validation_labels_path = "./data_regression/validation/labels"

training_images_list = sorted([os.path.join(training_images_path, filename) for filename in os.listdir(training_images_path)])
training_labels_list = sorted([os.path.join(training_labels_path, filename) for filename in os.listdir(training_labels_path)])
validation_images_list = sorted([os.path.join(validation_images_path, filename) for filename in os.listdir(validation_images_path)])
validation_labels_list = sorted([os.path.join(validation_labels_path, filename) for filename in os.listdir(validation_labels_path)])

training_zip_list = list(zip(training_images_list, training_labels_list))
validation_zip_list = list(zip(validation_images_list, validation_labels_list))
random.shuffle(training_zip_list)
random.shuffle(validation_zip_list)
training_list = tf.data.Dataset.from_tensor_slices(training_zip_list)
validation_list = tf.data.Dataset.from_tensor_slices(validation_zip_list)

print("Number of training data:", len(training_list))
print("Number of validation data:", len(validation_list))

In [None]:
# load regression data
def preprocess_data(sample):
    
    image = tf.io.read_file(sample[0])
    image = tf.io.decode_image(image, channels=3, expand_animations=False)
    image = tf.image.resize(image, [input_size, input_size], antialias=True)
    image = keras.applications.mobilenet_v2.preprocess_input(image)
    
    label = tf.io.read_file(sample[1])
    label = tf.strings.split(label, sep=', ')
    label = tf.strings.to_number(label, out_type='float32')
    
    return image, label

# apply data augmentation
def data_augmentation(image, label):
    
    image = tf.image.random_brightness(image, 0.125)
    image = tf.image.random_hue(image, 0.1)
    image = tf.clip_by_value(image, -1., 1.)
    
    return image, label

In [None]:
# create regression dataset
BATCH_SIZE = 128
training_data = training_list.map(
    preprocess_data,
    num_parallel_calls=tf.data.AUTOTUNE).cache().shuffle(10000).map(
    data_augmentation,
    num_parallel_calls=tf.data.AUTOTUNE).batch(BATCH_SIZE, drop_remainder=True).prefetch(tf.data.AUTOTUNE)
validation_data = validation_list.map(
    preprocess_data,
    num_parallel_calls=tf.data.AUTOTUNE).batch(BATCH_SIZE).cache().prefetch(tf.data.AUTOTUNE)

In [None]:
# train keypoint estimator
EPOCHS = 100

history = model_keypoint_estimator.fit(
    training_data,
    epochs=EPOCHS,
    validation_data=validation_data,
    verbose=2)

plt.plot(history.history['loss'], label='training loss')
plt.plot(history.history['val_loss'], label='validation loss')
plt.title('Loss over epoch')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid()
plt.show()

In [None]:
# save combined model
#save_path = "./checkpoints/combined"
#model_combined.save(save_path)

In [None]:
# load and access regression branch
#model_combined = keras.models.load_model("./checkpoints/combined")
#model_keypoint_estimator = keras.Model(model_combined.inputs, model_combined.outputs[1])

In [None]:
# create test dataset
BATCH_SIZE = 1
test_data = validation_list.map(
    preprocess_data,
    num_parallel_calls=tf.data.AUTOTUNE).shuffle(10000).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

In [None]:
# test on batch size 1
for sample in test_data.take(1):
    image = (np.squeeze(sample[0]) + 1.) / 2
    H, W, _ = image.shape
    plt.imshow(image)
    prediction = model_keypoint_estimator.predict(sample[0]).reshape(-1, 2)
    #prediction = sample[1].numpy().reshape(-1, 2)
    Xs, Ys = prediction[:, 0] * W, prediction[:, 1] * H
    plt.plot(
        [Xs[0], Xs[1]], [Ys[0], Ys[1]], 
        [Xs[0], Xs[2]], [Ys[0], Ys[2]],
        [Xs[1], Xs[2]], [Ys[1], Ys[2]],
        [Xs[1], Xs[3]], [Ys[1], Ys[3]],
        [Xs[2], Xs[4]], [Ys[2], Ys[4]],
        [Xs[3], Xs[5]], [Ys[3], Ys[5]],
        [Xs[4], Xs[6]], [Ys[4], Ys[6]],
        [Xs[1], Xs[7]], [Ys[1], Ys[7]],
        [Xs[2], Xs[8]], [Ys[2], Ys[8]],
        [Xs[7], Xs[8]], [Ys[7], Ys[8]],
        [Xs[7], Xs[9]], [Ys[7], Ys[9]],
        [Xs[8], Xs[10]], [Ys[8], Ys[10]],
        [Xs[9], Xs[11]], [Ys[9], Ys[11]],
        [Xs[10], Xs[12]], [Ys[10], Ys[12]]
    )
    plt.scatter(Xs, Ys, c='white')
    plt.show()