In [9]:
from keras import optimizers
from keras import layers
from keras.utils import to_categorical
import tensorflow as tf
import numpy as np
from sklearn.model_selection import train_test_split

tf.__version__

In [11]:
WINDOW_SIZE = 300 # 60hz * 5sec
SENSOR_NUM = 4 
SENSOR_NAME = ["Gravity", "Linear_accelerator", "Gyroscope", "Magnetic"]

SENSOR_FEATURES = 3 # each sensor has x, y, z features
INPUT_SHAPE = (300, 3)

DROP_RATE = 0.2
CLASS_NUM = 7

CLASS_NAME_7 = ["Still", "Walking", "Manual", "Electric", "Bus", "Subway", "Car"]
# CLASS_NAME_5 = ["Still", "Manual", "Electric", "Subway", "Car"]

BATCH_SIZE = 128
NUM_EPOCHS = 30

class MultiCNN(tf.keras.Model):

    def __init__(self):
        super(MultiCNN, self).__init__()
        
        self.conv_layers = []
        
        for _ in range(SENSOR_NUM):
            conv_layers = [
                layers.Conv1D(filters=32, kernel_size=3, strides=1, padding='Same', activation="relu", input_shape=INPUT_SHAPE),
                layers.MaxPooling1D(pool_size=2, strides=2),
                layers.Conv1D(filters=64, kernel_size=3, strides=1, padding='Same', activation='relu'),
                layers.MaxPooling1D(pool_size=2, strides=2),
                layers.Conv1D(filters=128, kernel_size=3, strides=1, padding='Same', activation='relu'),
                layers.MaxPooling1D(pool_size=2, strides=2),
                layers.Flatten()
            ]
            self.conv_layers.append(conv_layers)
        
        self.concat = layers.Concatenate(axis=-1)
        self.dense1 = layers.Dense(256, activation="relu")
        self.dropout = layers.Dropout(DROP_RATE)
        self.output_layer = layers.Dense(CLASS_NUM, activation='softmax')
    
    # Define forward procedures
    def call(self, inputs, training=False):
        x = []
        for i in range(SENSOR_NUM):
            xi = inputs[i]
            for layer in self.conv_layers[i]:
                xi = layer(xi)
            x.append(xi)
        x = self.concat(x)
        x = self.dense1(x)
        
        if training:
            x = self.dropout(x, training=training)
        return self.output_layer(x)
    
    # TFlite에서 학습을 진행하려면 tf.function으로 감싸줘야 함.
    @tf.function(input_signature=[
        tf.TensorSpec(shape=[SENSOR_NUM, None, WINDOW_SIZE, SENSOR_FEATURES], dtype=tf.float32),
        tf.TensorSpec(shape=[None, CLASS_NUM], dtype=tf.float32)
    ])
    def train(self, x, y):
        with tf.GradientTape() as tape:
            inputs = [tf.gather(x, i, axis=0) for i in range(SENSOR_NUM)]
            prediction = self(inputs, training=True)
            loss = tf.keras.losses.CategoricalCrossentropy()(y, prediction)
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        result = {"loss": loss}
        return result
    
    @tf.function(input_signature=[
        tf.TensorSpec(shape=[SENSOR_NUM, None, WINDOW_SIZE, SENSOR_FEATURES], dtype=tf.float32),
        tf.TensorSpec(shape=[None, CLASS_NUM], dtype=tf.float32)
    ])
    def valid(self, x, y):
        inputs = [tf.gather(x, i, axis=0) for i in range(SENSOR_NUM)]
        prediction = self(inputs, training=False)  
        loss = tf.keras.losses.CategoricalCrossentropy()(y, prediction)
        result = {"val_loss": loss}
        return result
    
    @tf.function(input_signature=[
        tf.TensorSpec(shape=[SENSOR_NUM, None, WINDOW_SIZE, SENSOR_FEATURES], dtype=tf.float32)
    ])
    def infer(self, x):
        inputs = [tf.gather(x, i, axis=0) for i in range(SENSOR_NUM)]
        logits = self(inputs, training=False)
        probabilities = tf.nn.softmax(logits, axis=-1)
        return {
            "output": probabilities,
            "logits": logits
        }
        
    @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.string)])
    def save(self, checkpoint_path):
        tensor_names = [weight.name for weight in self.weights]
        tensors_to_save = [weight.read_value() for weight in self.weights]
        tf.raw_ops.Save(
            filename=checkpoint_path, tensor_names=tensor_names,
            data=tensors_to_save, name='save')
        return {
            "checkpoint_path": checkpoint_path
        }

    @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.string)])
    def restore(self, checkpoint_path):
        restored_tensors = {}
        for var in self.weights:
            restored = tf.raw_ops.Restore(
                file_pattern=checkpoint_path, tensor_name=var.name, dt=var.dtype,
                name='restore')
            var.assign(restored)
            restored_tensors[var.name] = restored
        return restored_tensors

In [12]:
# load data
x_total = np.load('../CRC_data/CRC_2nd _random/crc2nd_round1_300_data_object.npy')
y_total = np.load('../CRC_data/CRC_2nd _random/crc2nd_round1_300_label_object.npy')

x_12features = x_total[:, :, :12]
x_train, x_test, y_train, y_test = train_test_split(x_12features, y_total, test_size=0.2, random_state=1004)

y_train_one_hot = to_categorical(y_train, 7)
y_test_one_hot = to_categorical(y_test, 7)

def create_batches(x_data, y_data, batch_size):
    batches_x = []
    batches_y = []

    for i in range(0, len(x_data), batch_size):
        x_batch = x_data[i:i + batch_size]
        y_batch = y_data[i:i + batch_size]

        x_batch_set = []
        for sensor_i in range(SENSOR_NUM):
            x_batch_set.append(x_batch[:, :, (sensor_i * 3):(sensor_i * 3) + 3].astype('float32'))

        batches_x.append(x_batch_set)
        batches_y.append(y_batch.astype('float32'))

    return batches_x, batches_y

batch_x_train_set, batch_y_train_set = create_batches(x_train, y_train_one_hot, BATCH_SIZE)

model = MultiCNN()
model.compile(optimizer=optimizers.Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

# 실제 데이터를 사용하여 모델 빌드
dummy_input = [tf.random.uniform((1, WINDOW_SIZE, SENSOR_FEATURES)) for _ in range(SENSOR_NUM)]
model(dummy_input)

In [53]:
epochs = np.arange(1, NUM_EPOCHS + 1, 1)
losses = np.zeros([NUM_EPOCHS])

for i in range(NUM_EPOCHS):
    for num, x in enumerate(batch_x_train_set):
        y = batch_y_train_set[num]
        x_input = np.stack(x, axis=0)
        result = model.train(x_input, y)

    losses[i] = result['loss']

    print(f"Finished {i+1} epochs")
    print(f"  loss: {losses[i]:.3f}")

# Save the trained weights to a checkpoint.
model.save(r'/home/chang/Desktop/tensor_lite/model_test.ckpt')

batch_x_test_set, batch_y_test_set = create_batches(x_test, y_test_one_hot, BATCH_SIZE)

count = 0
number = 0
for num, x in enumerate(batch_x_test_set):
    y = batch_y_test_set[num]
    number += y.shape[0]
    x_input = np.stack(x, axis=0)
    result = model.infer(x_input)

    for i in range(result['output'].shape[0]):
        if tf.argmax((result['output'][i])) == y[i].argmax():
            count += 1

Finished 1 epochs
  loss: 0.616
Finished 2 epochs
  loss: 0.368
Finished 3 epochs
  loss: 0.229
Finished 4 epochs
  loss: 0.308
Finished 5 epochs
  loss: 0.191
Finished 6 epochs
  loss: 0.220
Finished 7 epochs
  loss: 0.118
Finished 8 epochs
  loss: 0.122
Finished 9 epochs
  loss: 0.120
Finished 10 epochs
  loss: 0.094
Finished 11 epochs
  loss: 0.176
Finished 12 epochs
  loss: 0.084
Finished 13 epochs
  loss: 0.049
Finished 14 epochs
  loss: 0.166
Finished 15 epochs
  loss: 0.131
Finished 16 epochs
  loss: 0.076
Finished 17 epochs
  loss: 0.049
Finished 18 epochs
  loss: 0.023
Finished 19 epochs
  loss: 0.164
Finished 20 epochs
  loss: 0.105
Finished 21 epochs
  loss: 0.035
Finished 22 epochs
  loss: 0.114
Finished 23 epochs
  loss: 0.049
Finished 24 epochs
  loss: 0.070
Finished 25 epochs
  loss: 0.024
Finished 26 epochs
  loss: 0.056
Finished 27 epochs
  loss: 0.022
Finished 28 epochs
  loss: 0.114
Finished 29 epochs
  loss: 0.037
Finished 30 epochs
  loss: 0.012


In [58]:
SAVED_MODEL_DIR = "saved_model_test"

tf.saved_model.save(
    model,
    SAVED_MODEL_DIR,
    signatures={
        'train':
            model.train.get_concrete_function(),
        'infer':
            model.infer.get_concrete_function(),
        'save':
            model.save.get_concrete_function(),
        'restore':
            model.restore.get_concrete_function()
    })

# Convert the model
converter = tf.lite.TFLiteConverter.from_saved_model(SAVED_MODEL_DIR)
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS,  # enable TensorFlow Lite ops.
    tf.lite.OpsSet.SELECT_TF_OPS  # enable TensorFlow ops.
]
converter.experimental_enable_resource_variables = True
tflite_model = converter.convert()

# Save the TFLite model to a file
TFLITE_MODEL_FILE = "multi_cnn_test.tflite"
with open(TFLITE_MODEL_FILE, "wb") as f:
    f.write(tflite_model)

INFO:tensorflow:Assets written to: saved_model_test/assets


INFO:tensorflow:Assets written to: saved_model_test/assets
2024-09-23 17:17:09.888078: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:378] Ignored output_format.
2024-09-23 17:17:09.888095: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:381] Ignored drop_control_dependency.
2024-09-23 17:17:09.888197: I tensorflow/cc/saved_model/reader.cc:83] Reading SavedModel from: saved_model_test
2024-09-23 17:17:09.894833: I tensorflow/cc/saved_model/reader.cc:51] Reading meta graph with tags { serve }
2024-09-23 17:17:09.894844: I tensorflow/cc/saved_model/reader.cc:146] Reading SavedModel debug info (if present) from: saved_model_test
2024-09-23 17:17:09.910984: I tensorflow/cc/saved_model/loader.cc:233] Restoring SavedModel bundle.
2024-09-23 17:17:10.025237: I tensorflow/cc/saved_model/loader.cc:217] Running initialization op on SavedModel bundle at path: saved_model_test
2024-09-23 17:17:10.080084: I tensorflow/cc/saved_model/loader.cc:316] SavedModel