# Introduction

In this notebook we will train a model that will take a beat vector, a list of (hit time, hit strength) tuples, and classify it as one of the four predefined classes.

We'll convert the model into something that can be used by tflite and then run it on the ESP32.

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras import Input
from tensorflow.data import Dataset
import numpy as np
import itertools
import pandas as pd
import random

# Creating a Dataset to Train Our Model
We use the data generator to generate samples by applying variations on the class beat vectors.

Next, each sample beat vector is preprocessed into a fixed-length array with one entry per small time interval (the interval length is T/N): 0 where there is no hit, 1 for a soft hit, and 2 for a loud hit.

The TensorFlow dataset is then used to train our model.

In [None]:
def data_generator():
  """Endlessly yield synthetic ``(X, Y)`` training pairs for the beat classifier.

  Every iteration picks one of the four reference beats uniformly at random,
  shifts all of its hits except the first by one common random offset of
  ``inc * num`` milliseconds (at most 216 ms in either direction), applies
  one of four randomly chosen variations and quantises the result onto N
  time bins.

  Variations:
    1. shift only
    2. shift, then drop one hit (never the first)
    3. shift, invert the strength of four hits, then drop one hit
    4. shift, randomise every strength and insert one spurious hit

  Yields:
    X: list of N ints, one per ``T / N`` ms bin: 0 = no hit, 1 = soft hit,
       2 = loud hit. Each hit is smeared over up to five neighbouring bins.
       The first hit (time 0) is the reference point and is never written.
    Y: one-hot list of length 4 identifying the reference beat.
  """

  # Class definitions: lists of (hit time in ms, strength); strength 1 = loud, 0 = soft
  classes = {1: [(0, 1), (575, 1), (1110, 0), (1379, 0), (1651, 0), (2193, 1), (2723, 1), (3290, 1)],
             2: [(0, 1), (564, 0), (806, 1), (1161, 0), (1563, 1), (1807, 0), (2184, 0), (2404, 1), (2784, 0)],
             3: [(0, 1), (386, 1), (758, 0), (1759, 1), (2154, 0), (2386, 1)],
             4: [(0, 1), (364, 0), (698, 0), (1050, 1), (1269, 0), (1734, 0), (2105, 1)]}

  T = 4000 # the total amount of time of a beat (milliseconds)
  N = 500 # the number of intervals that the total time is split into
  time_interval = T/N

  while True:
    # choose a random number between 1,2,3,4 uniformly
    # this selects which class the sample will emulate
    num_class = np.random.randint(1, 5) # 1 inclusive, 5 exclusive

    # store the label of the sample that will be generated now
    label = [0] * 4
    label[num_class-1] = 1

    class_data = classes[num_class]
    inc = np.random.randint(2, 10) # increment - how much to add to each hit time
    num = np.random.randint(0, 25)  # num_iterations - number to multiply inc by
    variation = np.random.randint(1, 5)

    neg = np.random.uniform(0, 1) # sets if inc should be neg or pos, i.e. subtract or add
    if neg < 0.5:
      inc *= -1
    # plain Python int so every hit time stays a plain int as well
    shift = int(inc * num)

    # every variation starts from the shifted beat;
    # we make sure to never change the first hit time (time=0)
    sample_arr = [(hit_time + shift, strength) if hit_time != 0 else (hit_time, strength)
                  for hit_time, strength in class_data]

    if variation == 2:
      # drop a hit randomly (not the first one), in case any weren't recorded
      sample_arr.pop(random.randrange(1, len(sample_arr)))

    elif variation == 3:
      # randomly select four hits (not the first one) and invert their strength
      for idx in random.sample(range(1, len(sample_arr)), 4):
        hit_time, strength = sample_arr[idx]
        sample_arr[idx] = (hit_time, 1 - strength)
      # drop a hit randomly, not the first tuple
      sample_arr.pop(random.randrange(1, len(sample_arr)))

    elif variation == 4:
      # in case that a random hit was recorded: randomise every strength ...
      sample_arr = [(hit_time, 1 if random.random() < 0.5 else 0)
                    for hit_time, _ in sample_arr]
      # ... and add one new hit between the first and last hit times
      newHitTime = random.randint(sample_arr[0][0], sample_arr[-1][0])
      new_hit = (newHitTime, 1 if random.random() < 0.5 else 0)
      # Insert the new hit exactly once, keeping the list sorted by time.
      # The search starts at index 1 so the reference hit always stays first.
      for j in range(1, len(sample_arr)):
        if sample_arr[j][0] >= newHitTime:
          sample_arr.insert(j, new_hit)
          break
      else:
        # only reached if no later hit exists; kept as a safety net
        sample_arr.append(new_hit)

    # variation == 1 needs nothing beyond the shift

    # we have the sample as an array of (hit time, hit strength) tuples
    # preprocessing of the sample to be in the expected form
    sample = [0] * N
    for hit_time, strength in sample_arr[1:]:  # skip the reference hit
      n = int(hit_time / time_interval)
      # mark bins n-2 .. n+2, clipped to the valid range [0, N)
      for k in range(max(0, n - 2), min(N, n + 3)):
        sample[k] = strength + 1 # for loud store 2, soft store 1, no hit already have 0

    # our input data is an array of length N and our label is
    # [1,0,0,0] or [0,1,0,0] or [0,0,1,0] or [0,0,0,1]
    yield sample, label

# These are defined at module level as well so the cells below can use them.
# They must stay in sync with the values hard-coded inside data_generator().
T = 4000 # the total amount of time of a beat (milliseconds)
N = 500 # the number of intervals that the total time is split into
time_interval = T/N

# Create a dataset from our generator.
# output_signature replaces the deprecated output_types/output_shapes pair;
# the shapes are written as real 1-tuples ((N) would just be the int N).
train_dataset = tf.data.Dataset.from_generator(
    data_generator,
    output_signature=(
        tf.TensorSpec(shape=(N,), dtype=tf.int32),  # X: quantised beat
        tf.TensorSpec(shape=(4,), dtype=tf.int32),  # Y: one-hot label
    ),
)

# Setting the batch size for the training
train_dataset = train_dataset.batch(batch_size=40)

# Training the Model

We define a small neural network with an input layer, an output layer, and one hidden layer. The sigmoid activation function will output a value between 0 and 1 for each of the four classes. This value is the probability that the given input should be classified as this class. For the loss function we use binary cross entropy. The classification is then the class with the maximum probability value.

In [None]:
# Small fully connected classifier:
# N inputs -> N // 4 hidden ReLU units -> 4 sigmoid outputs (one per class).
model = Sequential([
    Input(shape=(N,)),                 # shape must be a tuple; (N) is just the int N
    Dense(N // 4, activation='relu'),  # integer division: the unit count must be an int
    Dense(4, activation='sigmoid')
])

In [None]:
# Configure training: Adam optimiser, binary cross-entropy over the four
# sigmoid outputs, and accuracy reported as the metric.
model.compile(
    loss=tf.keras.losses.BinaryCrossentropy(),
    metrics=["accuracy"],
    optimizer="adam",
)

# Print the layer-by-layer overview of the compiled network.
model.summary()

Model: "sequential_17"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_34 (Dense)            (None, 125)               62625     
                                                                 
 dense_35 (Dense)            (None, 4)                 504       
                                                                 
Total params: 63,129
Trainable params: 63,129
Non-trainable params: 0
_________________________________________________________________


In [None]:
# Fit on the generated stream. The generator never ends, so steps_per_epoch
# sets how many batches make up one epoch: 8 epochs of 10 batches each.
model.fit(train_dataset, epochs=8, steps_per_epoch=10)

Epoch 1/8
Epoch 2/8
Epoch 3/8
Epoch 4/8
Epoch 5/8
Epoch 6/8
Epoch 7/8
Epoch 8/8


<keras.callbacks.History at 0x7f3d7056bd60>

## Explanation of the Model

The neural network consists of three layers. All of the layers are fully connected. The first layer is the input layer and has 500 neurons, where each neuron receives one entry of the quantized array. The values are 0 for no hit, 1 for a soft hit, and 2 for a loud hit. The second layer is the hidden layer and has a ReLU activation function for adding non-linearity. The third layer has four neurons, where each neuron represents a class. We use a sigmoid activation function on this layer, so each output represents the probability that the input belongs to that class. Binary cross entropy is used as the loss function during training. In the end, the predicted class is the one with the maximum output probability.

# Testing our model
We create a small test to make sure the model predicts and works as expected.

In [None]:
# Hand-written beats used to sanity-check the trained model.
test_exs = [[(0, 1), (321, 1)], 
            [(0, 1), (321, 1), (642, 1), (963, 1), (1285, 1), (1606, 1), (1927, 1), (2248, 1), (2569, 1), (2890, 1), (3211, 1), (3532, 1), (3853, 1)],
            [(0, 1), (188, 1), (823, 1), (874, 1), (1093, 1), (1558, 1), (1929, 0)],
            [(0, 1), (321, 1), (503, 1), (1038, 0), (2154, 0), (2386, 1), (2404, 1)],
            [(0, 1), (503, 1), (1038, 0), (1307, 0), (1579, 0), (2121, 1), (2651, 1), (3218, 1)],
            [(0, 1), (700, 0), (942, 1), (1297, 0), (1699, 1), (1943, 0), (2320, 0), (2920, 0)],
            [(0, 1), (410, 1), (782, 0), (2178, 0), (2410, 1)],
            [(0, 1), (188, 1), (522, 1), (823, 1), (874, 1), (1093, 1), (1558, 1), (1929, 0)],
            [(0, 1), (575, 1), (1110, 0), (1379, 0), (1651, 0), (2193, 1), (2723, 1), (3290, 1)],
            [(0, 1), (564, 0), (806, 1), (1161, 0), (1563, 1), (1807, 0), (2184, 0), (2404, 1), (2784, 0)],
            [(0, 1), (386, 1), (758, 0), (1759, 1), (2154, 0), (2386, 1)],
            [(0, 1), (364, 0), (698, 0), (1050, 1), (1269, 0), (1734, 0), (2105, 1)],
            [(0, 1), (321, 1), (642, 1), (963, 1), (1284, 1), (1605, 1), (1926, 1)]]

# Quantise every example onto N bins: each hit marks its own bin and up to
# two neighbours on either side, clipped at the array boundaries.
preproc_list = []
for beat in test_exs:
  bins = [0] * N
  # the first hit (time 0) is the reference point and is skipped
  for hit_time, strength in beat[1:]:
    centre = int(hit_time / time_interval)
    lo = max(centre - 2, 0)
    hi = min(centre + 3, N)
    # for loud store 2, soft store 1, no hit already have 0
    bins[lo:hi] = [strength + 1] * (hi - lo)
  preproc_list.append(bins)
  print(bins)
  print()

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 

In [None]:
# Show floats with two decimals so the probability rows are easy to compare.
np.set_printoptions(formatter={'float': "{0:0.2f}".format})

# Run all preprocessed examples through the model as a single batch;
# each output row holds the four per-class sigmoid outputs.
test_X = tf.convert_to_tensor(preproc_list)
Y = model.predict_on_batch(test_X)
print(Y)

[[0.16 0.11 0.76 0.50]
 [0.63 0.12 0.01 0.01]
 [0.01 0.29 0.05 0.87]
 [0.06 0.11 0.67 0.08]
 [0.98 0.04 0.00 0.02]
 [0.08 0.41 0.09 0.33]
 [0.05 0.21 0.84 0.05]
 [0.01 0.17 0.04 0.86]
 [0.99 0.04 0.00 0.01]
 [0.03 0.95 0.07 0.01]
 [0.01 0.06 0.95 0.04]
 [0.03 0.03 0.03 0.94]]


# Converting our Model into a TFLite Model

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]


def representative_dataset_gen(num_samples=200):
    """Yield calibration inputs for post-training integer quantisation.

    The converter runs these samples through the float model to measure
    the value ranges it needs for choosing int8 scales, so they should look
    like real inputs. They are therefore drawn from data_generator(), the
    same source the model was trained on, instead of from uniform noise.

    Args:
      num_samples: how many calibration samples to produce.

    Yields:
      A one-element list (one entry per model input) holding a float32
      array of shape (1, N): a single quantised beat with a batch dimension.
    """
    for sample, _label in itertools.islice(data_generator(), num_samples):
        yield [np.array([sample], dtype=np.float32)]


converter.representative_dataset = representative_dataset_gen
# restrict the converted graph to ops that have int8 implementations
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
tflite_quant_model = converter.convert()

# the context manager makes sure the file is flushed and closed
with open("converted_model.tflite", "wb") as model_file:
    model_file.write(tflite_quant_model)



65864

# Converting the TFLite Model File to a C++ Source File

In [None]:
# !apt-get update && apt-get install xxd

In [None]:
# Dump the converted .tflite file as a C array definition so it can be
# compiled into the firmware source.
!xxd -i converted_model.tflite > model_data.cc