In [None]:
import tensorflow as tf
from tensorflow.keras.models import load_model

# load model with cuostmized metrics
def load_model_custom(model_name):
    model = load_model(model_name, custom_objects={"f1_m": f1_m})
    return model

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
path = "/content/gdrive/MyDrive/"
path_model = path + "Yue_Net/best_model_cnn-allrun5_c8b_mix4-SG0-ST20-WS40-MU[0, 1, 2, 3]_1644222946_f.h5"

In [None]:
# calculate f1 score
def f1_m(y_true, y_pred):
    #     precision = precision_m(y_true, y_pred)
    #     recall = recall_m(y_true, y_pred)
    y_pred_binary = tf.where(y_pred >= 0.5, 1., 0.)
    true_positives = K.sum(K.round(K.clip(y_true * y_pred_binary, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred_binary, 0, 1)))

    precision = true_positives / (predicted_positives + K.epsilon())
    recall = true_positives / (possible_positives + K.epsilon())
    return 2 * ((precision * recall) / (precision + recall + K.epsilon()))

In [None]:
model = load_model_custom(path_model)

In [None]:
model.summary()

Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 EMG (InputLayer)               [(None, 40, 64)]     0           []                               
                                                                                                  
 conv1d_12 (Conv1D)             (None, 38, 128)      24704       ['EMG[0][0]']                    
                                                                                                  
 conv1d_13 (Conv1D)             (None, 36, 128)      49280       ['conv1d_12[0][0]']              
                                                                                                  
 max_pooling1d_6 (MaxPooling1D)  (None, 18, 128)     0           ['conv1d_13[0][0]']              
                                                                                            

In [None]:
# dummy X_test
X_test = tf.ones((100,40,64))

In [None]:
# convert model to tflite
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()



In [None]:
import numpy as np

In [None]:
# Run the model with TensorFlow to get expected results.
TEST_CASES = 100

# Run the model with TensorFlow Lite
interpreter = tf.lite.Interpreter(model_content=tflite_model)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

for i in range(TEST_CASES):
  expected = model.predict(np.expand_dims(X_test[i:i+1][0],axis=0))
  interpreter.set_tensor(input_details[0]["index"], np.array(np.expand_dims(X_test[i:i+1][0],axis=0),dtype=np.float32))
  interpreter.invoke()
  result_1 = interpreter.get_tensor(output_details[0]["index"])
  result_2 = interpreter.get_tensor(output_details[1]["index"])
  result_3 = interpreter.get_tensor(output_details[2]["index"])
  result_4 = interpreter.get_tensor(output_details[3]["index"])

  result = [result_2,result_4,result_3,result_1]

  # Assert if the result of TFLite model is consistent with the TF model.
  np.testing.assert_almost_equal(expected, result, decimal=5)
  print("Done. The result of TensorFlow matches the result of TensorFlow Lite.")

  # Please note: TfLite fused Lstm kernel is stateful, so we need to reset
  # the states.
  # Clean up internal states.
  interpreter.reset_all_variables()

Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of 

In [None]:
# used to reorder the tflite outputs:
expected

[array([[0.5679825]], dtype=float32),
 array([[0.5114401]], dtype=float32),
 array([[0.41835022]], dtype=float32),
 array([[0.38279408]], dtype=float32)]

In [None]:
result

[array([[0.5679825]], dtype=float32),
 array([[0.5114401]], dtype=float32),
 array([[0.41835025]], dtype=float32),
 array([[0.38279408]], dtype=float32)]

In [None]:
expected = model.predict(np.expand_dims(X_test[i:i+1][0],axis=0))




In [None]:
expected

[array([[0.5679825]], dtype=float32),
 array([[0.5114401]], dtype=float32),
 array([[0.41835022]], dtype=float32),
 array([[0.38279408]], dtype=float32)]

In [None]:
interpreter

<tensorflow.lite.python.interpreter.Interpreter at 0x7d3b3bfcc760>

In [None]:
interpreter.get_output_details()[0]

{'name': 'StatefulPartitionedCall:3',
 'index': 62,
 'shape': array([1, 1], dtype=int32),
 'shape_signature': array([-1,  1], dtype=int32),
 'dtype': numpy.float32,
 'quantization': (0.0, 0),
 'quantization_parameters': {'scales': array([], dtype=float32),
  'zero_points': array([], dtype=int32),
  'quantized_dimension': 0},
 'sparsity_parameters': {}}

In [None]:
for i in range(TEST_CASES):
  expected = model.predict(np.expand_dims(X_test[i:i+1][0],axis=0))
  interpreter.set_tensor(input_details[0]["index"], np.array(np.expand_dims(X_test[i:i+1][0],axis=0),dtype=np.float32))
  interpreter.invoke()
  result_1 = interpreter.get_tensor(output_details[0]["index"])
  result_2 = interpreter.get_tensor(output_details[1]["index"])
  result_3 = interpreter.get_tensor(output_details[2]["index"])
  result_4 = interpreter.get_tensor(output_details[3]["index"])

  result = [result_2,result_4,result_3,result_1]



In [None]:
np.expand_dims(X_test[0],axis=0).shape

(1, 40, 64)

# Check time with dummy data:

In [None]:
%%timeit
expected = model.predict(np.expand_dims(X_test[0:1][0],axis=0),verbose=0)

59.7 ms ± 12.6 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [None]:
%%timeit
interpreter.set_tensor(input_details[0]["index"], np.array(np.expand_dims(X_test[0:1][0],axis=0),dtype=np.float32))
interpreter.invoke()
result_1 = interpreter.get_tensor(output_details[0]["index"])
result_2 = interpreter.get_tensor(output_details[1]["index"])
result_3 = interpreter.get_tensor(output_details[2]["index"])
result_4 = interpreter.get_tensor(output_details[3]["index"])
result = [result_2,result_4,result_3,result_1]

1.41 ms ± 173 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


# Check performance with real data

In [None]:
import scipy.io as sio

In [None]:
trial = '5_50_GM'
train_seg = 0
test_seg = 2
step_size = 20
window_size = 120
mu = [0, 1, 2, 3]
mucnt = len(mu)

In [None]:
def load_data_mat(TR, SG=0, ST=10, MU=1, WS=120, TF=0, MutiSeg=0):
    # TR - trial name (e.g., 1_30_GM)
    # SG - segment ID (e.g., 0-2)
    # ST - step size (5, 10, 20, 30, 40, 50)
    # MU - motor unit index (0-N, N is the number)
    # WS - window size (e.g., 120)
    # TF = 0, no shuffle; TF = 1, shuffle;  0<TF<1, seperate data
    # MutiSeg - 0: train with one segment of data; 1: train with two segments of data

    seg = [1, 2, 3]
    # load train data set
    segment = seg[SG]
    # construct mat file name based on parameters
    prefix = "{}-SG{}-WS{}-ST{}".format(TR, segment, WS, ST)
    matfile = "{}.mat".format(prefix)
    #if not path.exists(matfile):

    pathstr = path+"Yue_Net/"  # data folder for emg
    matfile = "{}{}".format(pathstr, matfile)

    vnames = ['EMGs', 'Spikes']
    # load mat file
    data = sio.loadmat(matfile, variable_names=vnames)
    x_data = data['EMGs']
    spikes = data['Spikes']

    # load second segment if MutiSeg is 1
    if MutiSeg:
        seg2 = [2, 3, 1]
        segment = seg2[SG]
        prefix = "{}-SG{}-WS{}-ST{}".format(TR, segment, WS, ST)
        matfile = "{}.mat".format(prefix);
        if not path.exists(matfile):
            pathstr = 'D:\\emg_data\\'
            matfile = "{}{}".format(pathstr, matfile)
        #     print(matfile)
        data_2 = sio.loadmat(matfile, variable_names=vnames)
        x_data_2 = data_2['EMGs']
        spikes_2 = data_2['Spikes']
        x_data = np.concatenate((x_data, x_data_2))
        spikes = np.concatenate((spikes, spikes_2))

    #     x_data.shape
    # exactract spikes for given motor units
    if type(MU) is list:
        y_data = []
        for c in MU:
            if c < spikes.shape[1]:
                y_data.append(spikes[:, c])
            else:
                y_data.append(spikes[:, -1] * 0)
    else:
        y_data = []
        y_data.append(spikes[:, MU])

    ## shuffle the data based on TF flag
    y_data = np.array(y_data)
    y_data = y_data.T
    if TF == 1:
        x_data, y_data = tf.random.shuffle(x_data, y_data)
    elif TF > 0:
        x_data, _, y_data, _ = tf.split(x_data, y_data, test_size=1.0 - TF)
    else:
        print('no shuffle')
    y_data = y_data.T
    y_data = list(y_data)

    return x_data, y_data

In [None]:
# don't have this specific dataset:
x_test, y_test = load_data_mat(TR=trial, SG=test_seg, ST=step_size, WS=window_size, MU=mu)

In [None]:
# test with the specific dataset:
trial = "allex0_c8b_d1L"
test_seg = 2
step_size=20
window_size=40

In [None]:
# load data:
x_test, y_test = load_data_mat(TR=trial, SG=test_seg, ST=step_size, WS=window_size, MU=mu)

no shuffle


In [None]:
x_test.shape

(20471, 40, 64)

In [72]:
# Run the model with TensorFlow to get expected results.
TEST_CASES = 100

# Run the model with TensorFlow Lite
interpreter = tf.lite.Interpreter(model_content=tflite_model)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

for i in range(TEST_CASES):
  j = np.random.randint(0, x_test.shape[0]-1)
  expected = model.predict(np.expand_dims(x_test[j:j+1][0],axis=0))
  interpreter.set_tensor(input_details[0]["index"], np.array(np.expand_dims(x_test[j:j+1][0],axis=0),dtype=np.float32))
  interpreter.invoke()
  result_1 = interpreter.get_tensor(output_details[0]["index"])
  result_2 = interpreter.get_tensor(output_details[1]["index"])
  result_3 = interpreter.get_tensor(output_details[2]["index"])
  result_4 = interpreter.get_tensor(output_details[3]["index"])

  result = [result_2,result_4,result_3,result_1]

  # Assert if the result of TFLite model is consistent with the TF model.
  np.testing.assert_almost_equal(expected, result, decimal=5)
  print("Done. The result of TensorFlow matches the result of TensorFlow Lite.")

  # Please note: TfLite fused Lstm kernel is stateful, so we need to reset
  # the states.
  # Clean up internal states.
  interpreter.reset_all_variables()

Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of TensorFlow matches the result of TensorFlow Lite.
Done. The result of 

# Check time with real data:

In [None]:
%%timeit
j = np.random.randint(0, x_test.shape[0]-1)
expected = model.predict(np.expand_dims(x_test[j:j+1][0],axis=0),verbose=0)

66.7 ms ± 32.7 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [None]:
%%timeit
j = np.random.randint(0, x_test.shape[0]-1)
interpreter.set_tensor(input_details[0]["index"], np.array(np.expand_dims(x_test[j:j+1][0],axis=0),dtype=np.float32))
interpreter.invoke()
result_1 = interpreter.get_tensor(output_details[0]["index"])
result_2 = interpreter.get_tensor(output_details[1]["index"])
result_3 = interpreter.get_tensor(output_details[2]["index"])
result_4 = interpreter.get_tensor(output_details[3]["index"])
result = [result_2,result_4,result_3,result_1]

418 µs ± 10.9 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [71]:
path

'/content/gdrive/MyDrive/'

# Save model in tflite:

In [None]:
# Save the model.
with open(path + 'Yue_Net/best_model_cnn-allrun5_c8b_mix4-SG0-ST20-WS40-MU[0, 1, 2, 3]_1644222946_f.tflite', 'wb') as f:
  f.write(tflite_model)