In [None]:
# The words we want to use as keywords, as one comma-separated string.
# It is passed verbatim to train.py and freeze.py via --wanted_words and is
# split on ',' wherever a Python list is needed. Keep it free of spaces and
# stray commas: the number of labels is derived by counting the commas.
WANTED_WORDS = "send,help"

In [None]:
# Training schedule as comma-separated lists; they are passed to train.py via
# --how_many_training_steps and --learning_rate.
# Both entries are *training* steps: TOTAL_STEPS sums them to find the final
# checkpoint (3000 + 1000 = 4000).
# NOTE(review): presumably the two lists are paired stage by stage, i.e.
# 3000 steps at 0.001 followed by 1000 steps at 0.0001 — confirm in train.py.
TRAINING_STEPS = "3000,1000"  

# Learning rate for each training stage listed in TRAINING_STEPS
# (first stage 0.001, second stage 0.0001).
LEARNING_RATE = "0.001,0.0001"

In [None]:
# Name of the model architecture; passed to train.py and freeze.py via
# --model_architecture and also used as the checkpoint file-name prefix.
MODEL_ARCHITECTURE = 'tiny_conv'

In [None]:
# The final checkpoint file is named after the total number of training
# steps, so add up every comma-separated stage of TRAINING_STEPS. Kept as a
# string because it is spliced directly into the checkpoint path.
TOTAL_STEPS = str(sum(int(stage_steps) for stage_steps in TRAINING_STEPS.split(",")))

In [None]:
# Give every label the same share of the training samples. The label set is
# the wanted words plus two extra labels, 'silence' and 'unknown', so both
# extra labels receive 100 / (wanted words + 2) percent, truncated to an int.
number_of_labels = len(WANTED_WORDS.split(','))
number_of_total_labels = 2 + number_of_labels # 'silence' and 'unknown'
equal_percentage_of_training_samples = int(100.0 / number_of_total_labels)
SILENT_PERCENTAGE = UNKNOWN_PERCENTAGE = equal_percentage_of_training_samples

# Constants used during training only (forwarded to train.py as flags)
VERBOSITY = 'DEBUG'  # --verbosity
EVAL_STEP_INTERVAL = '1000'  # --eval_step_interval
SAVE_STEP_INTERVAL = '1000'  # --save_step_interval

# Constants for training directories and filepaths
LOGS_DIR = 'logs/'  # --summaries_dir for train.py and --logdir for TensorBoard
# The trailing slash matters: the checkpoint path is built by concatenating
# TRAIN_DIR and MODEL_ARCHITECTURE without a separator.
TRAIN_DIR = 'train/' # for training checkpoints and other files.

# Constants for inference directories and filepaths
import os
MODELS_DIR = 'models'
# Create the output directory up front. exist_ok=True replaces the former
# "check, then mkdir" sequence, which could fail if the directory appeared
# between the two calls, and keeps the cell safe to re-run.
os.makedirs(MODELS_DIR, exist_ok=True)
MODEL_TF = os.path.join(MODELS_DIR, 'KWS_custom.pb')
MODEL_TFLITE = os.path.join(MODELS_DIR, 'KWS_custom.tflite')  # quantized TFLite model
MODEL_TFLITE_MICRO = os.path.join(MODELS_DIR, 'KWS_custom.cc')
SAVED_MODEL = os.path.join(MODELS_DIR, 'KWS_custom_saved_model')  # freeze.py output

In [None]:
# Constants which are shared during training and inference
PREPROCESS = 'micro'  # --preprocess flag; also given to prepare_model_settings
WINDOW_STRIDE = 20  # in milliseconds (freeze.py receives it as --window_stride_ms)

# Constants for Quantization
# NOTE(review): these three values are not referenced by any cell visible
# here — confirm whether they are still needed.
QUANT_INPUT_MIN = 0.0
QUANT_INPUT_MAX = 26.0
QUANT_INPUT_RANGE = QUANT_INPUT_MAX - QUANT_INPUT_MIN

# Constants for audio process during Quantization and Evaluation
SAMPLE_RATE = 16000  # presumably in Hz — TODO confirm
CLIP_DURATION_MS = 1000
WINDOW_SIZE_MS = 30.0
FEATURE_BIN_COUNT = 40
BACKGROUND_FREQUENCY = 0.8
BACKGROUND_VOLUME_RANGE = 0.1
TIME_SHIFT_MS = 100.0

# Use the custom local dataset (empty DATA_URL) and set the test/val/train
# split: the two percentages below go to validation and testing.
DATA_URL = ''
VALIDATION_PERCENTAGE = 10
TESTING_PERCENTAGE = 10

In [None]:
# Load the TensorBoard notebook extension and open it on LOGS_DIR, the
# directory train.py receives as --summaries_dir, to visualize the training
# process.
%load_ext tensorboard
%tensorboard --logdir {LOGS_DIR}

In [None]:
# Train the model by running the speech_commands training script with the
# notebook constants interpolated into its command-line flags. Checkpoints go
# to TRAIN_DIR and summaries to LOGS_DIR.
# NOTE(review): DATASET_DIR is not defined in the cells visible here —
# presumably set in an earlier cell; verify before running.
!python tensorflow/tensorflow/examples/speech_commands/train.py \
  --data_dir={DATASET_DIR} \
  --data_url={DATA_URL} \
  --wanted_words={WANTED_WORDS} \
  --silence_percentage={SILENT_PERCENTAGE} \
  --unknown_percentage={UNKNOWN_PERCENTAGE} \
  --preprocess={PREPROCESS} \
  --window_stride={WINDOW_STRIDE} \
  --model_architecture={MODEL_ARCHITECTURE} \
  --how_many_training_steps={TRAINING_STEPS} \
  --learning_rate={LEARNING_RATE} \
  --train_dir={TRAIN_DIR} \
  --summaries_dir={LOGS_DIR} \
  --verbosity={VERBOSITY} \
  --eval_step_interval={EVAL_STEP_INTERVAL} \
  --save_step_interval={SAVE_STEP_INTERVAL}

In [None]:
# Generate a tensorflow model for inference. We do this by combining relevant
# results into a single file for inference. Basically we create a frozen model.
!rm -rf {SAVED_MODEL}
!python tensorflow/tensorflow/examples/speech_commands/freeze.py \
--wanted_words=$WANTED_WORDS \
--window_stride_ms=$WINDOW_STRIDE \
--preprocess=$PREPROCESS \
--model_architecture=$MODEL_ARCHITECTURE \
--start_checkpoint=$TRAIN_DIR$MODEL_ARCHITECTURE'.ckpt-'{TOTAL_STEPS} \
--save_format=saved_model \
--output_file={SAVED_MODEL}

In [None]:
# Build the two helper objects that the conversion and evaluation cells rely
# on: `model_settings` and `audio_processor`. No model conversion happens in
# this cell.
# NOTE(review): `models`, `input_data` and DATASET_DIR are not defined in the
# cells visible here — presumably imported/set in an earlier cell; verify.
model_settings = models.prepare_model_settings(
    len(input_data.prepare_words_list(WANTED_WORDS.split(','))),
    SAMPLE_RATE, CLIP_DURATION_MS, WINDOW_SIZE_MS,
    WINDOW_STRIDE, FEATURE_BIN_COUNT, PREPROCESS)
# Audio loader configured with the same label set and the
# validation/testing split percentages defined above.
audio_processor = input_data.AudioProcessor(
    DATA_URL, DATASET_DIR,
    SILENT_PERCENTAGE, UNKNOWN_PERCENTAGE,
    WANTED_WORDS.split(','), VALIDATION_PERCENTAGE,
    TESTING_PERCENTAGE, model_settings, LOGS_DIR)

In [None]:
# Convert the SavedModel into an int8-quantized TensorFlow Lite model and
# write it to MODEL_TFLITE.

# Number of 'testing' samples handed to the converter as representative data.
REP_DATA_SIZE = 22
with tf.Session() as sess:
  # with tf.compat.v1.Session() as sess: #replaces the above line for use with TF2.x
  converter = tf.lite.TFLiteConverter.from_saved_model(SAVED_MODEL)
  converter.optimizations = [tf.lite.Optimize.DEFAULT]
  converter.inference_input_type = tf.lite.constants.INT8
  # converter.inference_input_type = tf.compat.v1.lite.constants.INT8 #replaces the above line for use with TF2.x   
  converter.inference_output_type = tf.lite.constants.INT8
  # converter.inference_output_type = tf.compat.v1.lite.constants.INT8 #replaces the above line for use with TF2.x
  def representative_dataset_gen():
    """Yield REP_DATA_SIZE single-sample float32 batches from the test split."""
    for i in range(REP_DATA_SIZE):
      # Request one sample per call; the second argument advances with i
      # (presumably the offset into the test split — verify in input_data).
      data, _ = audio_processor.get_data(1, i, model_settings,
                                         BACKGROUND_FREQUENCY,
                                         BACKGROUND_VOLUME_RANGE,
                                         TIME_SHIFT_MS,
                                         'testing',
                                         sess)
      # Flatten to one row and let NumPy infer the feature count, so the cell
      # keeps working when the window or feature settings change (the width
      # was previously hard-coded to 1960).
      flattened_data = np.array(data, dtype=np.float32).reshape(1, -1)
      if flattened_data.size == 0:
        # The fixed-size reshape used to fail here; keep failing, but clearly.
        raise ValueError(
            "get_data returned no sample for index %d; lower REP_DATA_SIZE "
            "or provide more test data." % i)
      print(i)
      yield [flattened_data]
  converter.representative_dataset = representative_dataset_gen
  tflite_model = converter.convert()
  # Write through a context manager so the file handle is flushed and closed
  # before the model file is read back by later cells.
  with open(MODEL_TFLITE, "wb") as model_file:
    tflite_model_size = model_file.write(tflite_model)
  print("Quantized model is %d bytes" % tflite_model_size)


In [None]:
# Helper function to run inference
def run_tflite_inference_testSet(tflite_model_path):
  """Print the accuracy of a TFLite model on the 'testing' split.

  Loads the test samples through the notebook-level `audio_processor`,
  converts them to the model's input dtype, runs the interpreter one sample
  at a time and prints the percentage of samples whose arg-max output equals
  the label.

  Args:
    tflite_model_path: Path of the .tflite file to evaluate.

  Returns:
    None. The accuracy (or a notice that no test samples exist) is printed.
  """

  # Load test data
  np.random.seed(0) # set random seed for reproducible test results.
  with tf.Session() as sess:
    # with tf.compat.v1.Session() as sess: #replaces the above line for use with TF2.x
    test_data, test_labels = audio_processor.get_data(
        -1, 0, model_settings, BACKGROUND_FREQUENCY, BACKGROUND_VOLUME_RANGE,
        TIME_SHIFT_MS, 'testing', sess)
  # Add a batch axis of size 1 so each sample can be fed on its own.
  test_data = np.expand_dims(test_data, axis=1).astype(np.float32)

  num_samples = len(test_data)
  if num_samples == 0:
    # Avoid dividing by zero in the accuracy computation below.
    print('No test samples available; cannot compute quantized model accuracy.')
    return

  # Initialize the interpreter
  interpreter = tf.lite.Interpreter(tflite_model_path)
  interpreter.allocate_tensors()
  input_details = interpreter.get_input_details()[0]
  output_details = interpreter.get_output_details()[0]

  # Quantize the input data from float to integer
  input_dtype = np.dtype(input_details["dtype"])
  input_scale, input_zero_point = input_details["quantization"]
  if np.issubdtype(input_dtype, np.integer) and input_scale != 0:
    # Round to the nearest quantized level and saturate to the dtype's range;
    # a bare astype() truncates toward zero and wraps out-of-range values.
    dtype_limits = np.iinfo(input_dtype)
    test_data = np.round(test_data / input_scale) + input_zero_point
    test_data = np.clip(test_data, dtype_limits.min, dtype_limits.max)
  # Float-input models (scale 0) skip the scaling and are only cast.
  test_data = test_data.astype(input_dtype)

  # Evaluate the predictions
  correct_predictions = 0
  for sample, label in zip(test_data, test_labels):
    interpreter.set_tensor(input_details["index"], sample)
    interpreter.invoke()
    output = interpreter.get_tensor(output_details["index"])[0]
    correct_predictions += int(output.argmax() == label)

  print('Quantized model accuracy is %f%% (Number of test samples=%d)' % (
      (correct_predictions * 100.0) / num_samples, num_samples))

In [None]:
# Compute the accuracy of the quantized model stored at MODEL_TFLITE; the
# helper prints the result rather than returning it.
run_tflite_inference_testSet(MODEL_TFLITE)