The following cell simply prevents Colab from disconnecting the session too quickly.

In [None]:
%%javascript
/**
 * Clicks Colab's "connect" toolbar button so the session keeps looking active.
 * Does nothing when the button cannot be found in the page, instead of
 * throwing a TypeError on a null element.
 */
function ClickConnect() {
  console.log("Working");
  const connectButton = document.querySelector("colab-toolbar-button#connect");
  if (connectButton !== null) {
    connectButton.click();
  }
}

// Repeat the click every 60000 ms (once a minute).
setInterval(ClickConnect, 60000);

Install the packages we need

In [None]:
!pip install -U keras-tuner
!pip install -q tensorflow-model-optimization

Here, you should upload your data for training and testing. (Note: if the following cell fails, you can use the file uploader in the left-hand side panel of Colab instead.)

In [None]:
from google.colab import files

# Open Colab's upload widget; the result is keyed by the name of each uploaded file.
uploaded = files.upload()

# Report the name and size of everything that was received.
for fn, contents in uploaded.items():
    message = 'User uploaded file "{name}" with length {length} bytes'.format(
        name=fn, length=len(contents)
    )
    print(message)

Import the packages we need and fix the random seeds

In [None]:
import pandas as pd
from tensorflow.keras.models import Sequential
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow import keras
import random
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental import preprocessing
from sklearn import metrics
from tensorflow import keras
from tensorflow.keras import layers
from kerastuner.tuners import RandomSearch
from kerastuner import HyperModel
import kerastuner
from tensorflow.data import Dataset
from typing import Optional

# Fixed seeds to make runs more repeatable. Each library keeps its own random
# state, so each one is seeded separately below.
SEED = 13370        # Seed for Python's built-in `random` module
NUMPY_SEED = 1337   # Seed for NumPy's global random state
TF_SEED = 133       # Global TensorFlow seed

random.seed(SEED)
np.random.seed(NUMPY_SEED)
tf.random.set_seed(TF_SEED)

Use the built in function for creating a `tf.data.Dataset` from a collection of CSV files.

In [None]:
BATCH_SIZE = 16       # Number of windows per batch fed to the model
NUM_TIMESTEPS = 128   # Number of consecutive CSV rows grouped into one window
BUFFER_SIZE = 500     # Shuffle buffer size (counted in windows) for the training split

# For how to write the glob patterns, see: https://www.tensorflow.org/api_docs/python/tf/io/gfile/glob
# For documentation on the dataloader, see: https://www.tensorflow.org/api_docs/python/tf/data/experimental/make_csv_dataset

# Reader options shared by the train/valid/test splits.
kwargs = {
    "batch_size": 1,                  # Read one row per element; windows are assembled later with `.batch()`
    "label_name": "is_air",           # Column used as the label
    "select_columns": ["x-axis (g)", "y-axis (g)", "z-axis (g)", "is_air"],
    "header": True,
    "num_epochs": 1,                  # Will set num_epochs within model.fit()
    "shuffle": False                  # Must be false to sample windows as they appear in the input
}

# One dataset per split, each built from every CSV file matching the split's glob pattern.
# NOTE(review): rows are windowed later without regard to file boundaries, so a window
# may presumably span two files - confirm this is acceptable.
train_dataset = tf.data.experimental.make_csv_dataset("jump_detection/train/*.csv", **kwargs)
valid_dataset = tf.data.experimental.make_csv_dataset("jump_detection/valid/*.csv", **kwargs)
test_dataset = tf.data.experimental.make_csv_dataset("jump_detection/test/*.csv", **kwargs)

Here, we perform a series of steps:

- Reshape the data
- Implement best practices for fast data loading
- Batch the data

In [None]:
def pack_features_vector(features, labels):
  """Collapse a dictionary of feature columns into one flat feature tensor.

  The column tensors in `features` are stacked along axis 1 and the result is
  flattened to a single dimension; `labels` is flattened in the same way.
  Batching is set up by the caller, not here.

  Adapted from: https://www.tensorflow.org/tutorials/customization/custom_training_walkthrough#create_a_tfdatadataset
  """
  columns = [column for column in features.values()]
  stacked = tf.stack(columns, axis=1)
  # Keep only a single (leading) dimension for both outputs.
  flat_features = tf.reshape(stacked, shape=(-1,))
  flat_labels = tf.reshape(labels, shape=(-1,))
  return flat_features, flat_labels

def squeeze_labels_vector(features, labels):
  """Drop the last axis of `labels`; `features` is passed through unchanged."""
  return features, tf.squeeze(labels, axis=-1)

def cache_shuffle_batch_prefetch(
    dataset: Dataset, batch_size: int, shuffle: bool = False, buffer_size: Optional[int] = None
):
    """Given a `dataset`, returns a new `dataset` which generates batches of size `batch_size`.
    If `shuffle`, the data is shuffled by randomly choosing items from `buffer_size` number of
    examples. Follows best practices for optimized data loading by caching and prefetching the data.

    Args:
        dataset: The dataset to cache, optionally shuffle, batch and prefetch.
        batch_size: Number of elements per batch. Incomplete trailing batches are dropped.
        shuffle: Whether to shuffle the cached elements (reshuffled on every iteration).
        buffer_size: Size of the shuffle buffer. Required when `shuffle` is True,
            ignored otherwise.

    Returns:
        The transformed dataset.

    Raises:
        ValueError: If `shuffle` is True but no `buffer_size` was given.

    See the individual tf.data.Dataset methods for more details:

    - [cache](https://www.tensorflow.org/api_docs/python/tf/data/Dataset#cache)
    - [shuffle](https://www.tensorflow.org/api_docs/python/tf/data/Dataset#shuffle)
    - [batch](https://www.tensorflow.org/api_docs/python/tf/data/Dataset#batch)
    - [prefetch](https://www.tensorflow.org/api_docs/python/tf/data/Dataset#prefetch)
    """
    # Fail early with a clear message rather than deep inside `Dataset.shuffle`.
    if shuffle and buffer_size is None:
        raise ValueError("`buffer_size` must be provided when `shuffle` is True.")

    # Cache before shuffling so that every iteration sees a fresh ordering.
    dataset = dataset.cache()
    if shuffle:
        dataset = dataset.shuffle(buffer_size=buffer_size, reshuffle_each_iteration=True)
    dataset = dataset.batch(batch_size=batch_size, drop_remainder=True)
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    return dataset

# Turn each split from a stream of single rows into a stream of windows:
#   1. pack the per-column features of every row into a single tensor,
#   2. group NUM_TIMESTEPS consecutive rows into one window (this first batch
#      dimension corresponds to timesteps; an incomplete trailing window is dropped),
#   3. drop the unnecessary trailing label dimension introduced by that grouping.
train_dataset = (
    train_dataset
    .map(pack_features_vector, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(NUM_TIMESTEPS, drop_remainder=True)
    .map(squeeze_labels_vector, num_parallel_calls=tf.data.AUTOTUNE)
)
valid_dataset = (
    valid_dataset
    .map(pack_features_vector, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(NUM_TIMESTEPS, drop_remainder=True)
    .map(squeeze_labels_vector, num_parallel_calls=tf.data.AUTOTUNE)
)
test_dataset = (
    test_dataset
    .map(pack_features_vector, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(NUM_TIMESTEPS, drop_remainder=True)
    .map(squeeze_labels_vector, num_parallel_calls=tf.data.AUTOTUNE)
)

# Traditional batching comes last, through the helper that also caches and prefetches.
# Only the training split is shuffled.
train_dataset = cache_shuffle_batch_prefetch(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True, buffer_size=BUFFER_SIZE
)
valid_dataset = cache_shuffle_batch_prefetch(
    valid_dataset, batch_size=BATCH_SIZE, shuffle=False, buffer_size=BUFFER_SIZE
)
test_dataset = cache_shuffle_batch_prefetch(
    test_dataset, batch_size=BATCH_SIZE, shuffle=False, buffer_size=BUFFER_SIZE
)

In [None]:
# Sanity check: pull a single batch from the training split and report its shapes.
features, labels = next(iter(train_dataset))
print(f"Features are of shape (batch_size, timesteps, feature_dim): {features.shape}")
print(f"Labels are of shape: {labels.shape}")

Next, we will run several important preprocessing steps:

1. Compute a reasonable bias for our classifier (to favour the more popular class)
2. Compute appropriate class weights in order to weight our loss function (to favour the least popular class). Note: the cell below does not currently compute these weights.
3. Compute additional features, like `sum(x, y, z) = x + y + z` and `norm(x, y, z) = sqrt(x^2 + y^2 + z^2)`.

In [None]:
# Per-batch pieces of the training data, flattened to one row per timestep.
all_features = []
all_labels = []

# Reference scores of two trivial predictors, to compare the trained model against:
# one that guesses at random and one that always predicts the positive class.
random_baseline_precision = tf.keras.metrics.Precision()
random_baseline_recall = tf.keras.metrics.Recall()
always_jumping_precision = tf.keras.metrics.Precision()
always_jumping_recall = tf.keras.metrics.Recall()

for features, labels in train_dataset:
    features = features.numpy()
    labels = labels.numpy()

    # Flatten the batch dimension
    all_features.append(features.reshape(-1, features.shape[-1]))
    all_labels.append(labels.reshape(-1))

    # Compute baseline precision and recalls to compare against
    random_prediction = np.random.randint(2, size=labels.shape)
    all_ones_prediction = np.ones_like(labels)

    random_baseline_precision.update_state(labels, random_prediction)
    random_baseline_recall.update_state(labels, random_prediction)
    always_jumping_precision.update_state(labels, all_ones_prediction)
    always_jumping_recall.update_state(labels, all_ones_prediction)

# Every batch is dropped when the split holds fewer than one full batch of windows.
if not all_features:
    raise ValueError(
        "train_dataset yielded no batches; cannot compute the classifier bias "
        "or adapt the normalization layer."
    )

all_features = np.concatenate(all_features, axis=0)
all_labels = np.concatenate(all_labels, axis=0)

# Compute a more reasonable initial bias value for our classifier.
# With a sigmoid output, a bias of log(pos/neg) makes the initial prediction
# equal to the fraction of positive labels.
pos = all_labels.sum()
neg = all_labels.shape[0] - pos
if pos == 0 or neg == 0:
    # log(pos/neg) would be -inf/+inf here and would poison the model's bias.
    raise ValueError(
        f"Training labels contain a single class (pos={pos}, neg={neg}); "
        "cannot compute a finite classifier bias."
    )
classifier_bias = np.log(pos / neg)

# Compute the sum and norm of our accelerometer data.
# Train a normalization layer on all features.
sum_ = np.sum(all_features, axis=-1, keepdims=True)
norm = np.linalg.norm(all_features, axis=-1, keepdims=True)
all_features = np.concatenate((all_features, sum_, norm), axis=-1)
normalizer = preprocessing.Normalization(-1)
normalizer.adapt(all_features)

print(f"Classifier bias: {classifier_bias}")
print(f"Precision of random baseline: {random_baseline_precision.result().numpy()}")
print(f"Recall of random baseline: {random_baseline_recall.result().numpy()}")
print(f"Precision of always jumping baseline: {always_jumping_precision.result().numpy()}")
print(f"Recall of always jumping baseline: {always_jumping_recall.result().numpy()}")

Here, we actually define and build the model

In [None]:
# Reset the state Keras has accumulated so far before (re)defining the model.
tf.keras.backend.clear_session()

# Metrics handed to `compile()`; each one is given an explicit name.
# NOTE(review): the same metric instances are reused by every model compiled
# with this list - confirm that sharing them between models is intended.
METRICS = [
      keras.metrics.BinaryAccuracy(name='accuracy'),
      keras.metrics.Precision(name='precision'),
      keras.metrics.Recall(name='recall'),
      keras.metrics.AUC(name='auc'),
]

class FeatureExtractor(layers.Layer):
    """Layer that appends the sum and the norm over the last axis as two extra features."""
    def call(self, inputs):
        # Both reductions keep the last axis so they can be concatenated to the inputs.
        extra_features = [
            tf.math.reduce_sum(inputs, axis=-1, keepdims=True, name="sum"),
            tf.norm(inputs, axis=-1, keepdims=True, name="norm"),
        ]
        return tf.concat([inputs, *extra_features], axis=-1)

def build_model():
    """Build and compile the per-timestep binary classifier.

    The model takes sequences of any length with 3 values per timestep, adds
    the sum and norm features, normalizes them, and passes them through a
    causal 1D convolution, dropout and two bidirectional LSTMs. A final
    sigmoid unit is applied at every timestep.

    Relies on the module-level `normalizer`, `classifier_bias` and `METRICS`,
    which must be defined before this function is called.

    Returns:
        The compiled `keras.Model`.
    """
    # `None` leaves the number of timesteps unspecified.
    inputs = keras.Input(shape=(None, 3))
    feature_extractor = FeatureExtractor()
    conv_1d = layers.Conv1D(
        filters=32,
        kernel_size=3,
        strides=1,
        padding="causal",
        activation="relu"
    )
    dropout = layers.Dropout(0.2)
    # Both LSTMs return the full sequence so that a prediction is made for every timestep.
    lstm_1 = layers.Bidirectional(layers.LSTM(128, return_sequences=True, dropout=0.1))
    lstm_2 = layers.Bidirectional(layers.LSTM(32, return_sequences=True, dropout=0.2))
    dense = layers.Dense(
        1,
        activation="sigmoid",
        # Start from the bias computed from the training labels instead of zero.
        bias_initializer=tf.keras.initializers.Constant(value=classifier_bias)
    )
    # Flatten everything after the batch dimension, removing the size-1 axis left by the dense layer.
    reshape = layers.Reshape((-1,))


    x = feature_extractor(inputs)
    x = normalizer(x)
    x = conv_1d(x)
    x = dropout(x)
    x = lstm_1(x)
    x = lstm_2(x)
    x = dense(x)
    outputs = reshape(x)

    model = keras.Model(inputs=inputs, outputs=outputs)

    model.compile(
        optimizer=keras.optimizers.Adam(5e-4),
        loss=keras.losses.BinaryCrossentropy(),
        metrics=METRICS,
    )

    return model

In [None]:
# Build the model, print a layer-by-layer summary and render a diagram of the architecture.
model = build_model()
model.summary()
tf.keras.utils.plot_model(model)

In [None]:
# Checkpointing for the main training run. `save_best_only` is off, so a file is
# written regardless of how the monitored value evolves; the file name embeds the
# precision and recall reported in the training logs.
callbacks = [
    keras.callbacks.ModelCheckpoint(
        filepath="weights.{precision:.2%}-{recall:.2%}.h5",
        monitor="val_loss",
        save_best_only=False,
    ),
]

In [None]:
# Train for 20 epochs on the training split, evaluating on the validation split as well.
history = model.fit(
    train_dataset,
    epochs=20,
    callbacks=callbacks,
    validation_data=valid_dataset
)

# Keep the trained weights on disk; the optional pruning cell reloads them from this path.
pretrained_weights = "pretrained_weights.tf"
model.save_weights(pretrained_weights)

In [None]:
# Evaluate on the held-out test split. Results are requested as a dict keyed by
# metric name, so the lookups below do not depend on the order or number of
# entries in the METRICS list.
test_results = model.evaluate(test_dataset, return_dict=True)
loss = test_results["loss"]
precision = test_results["precision"]
recall = test_results["recall"]
print(f"Precision: {precision:.2%}")
print(f"Recall: {recall:.2%}")

__OPTIONAL__: The following code will prune the model during a short fine-tuning phase. See https://www.tensorflow.org/model_optimization/guide/pruning/comprehensive_guide#prune_some_layers_sequential_and_functional for more details

In [None]:
import tensorflow_model_optimization as tfmot
# Import from the public Keras API. The private `tensorflow.python.keras` path can
# refer to a different class than the one `tf.keras.layers.Bidirectional` creates,
# in which case the `isinstance` check below would never match.
from tensorflow.keras.layers import Bidirectional
from tensorflow.keras.layers.experimental.preprocessing import Normalization

# Layer types that are passed through unchanged instead of being wrapped for pruning.
no_prune = (FeatureExtractor, Normalization, Bidirectional)

def apply_pruning(layer):
  """Clone function for `clone_model`: wrap `layer` for magnitude pruning.

  Layers whose type is listed in `no_prune` are returned unchanged; every other
  layer is wrapped with `tfmot.sparsity.keras.prune_low_magnitude`.
  """
  if isinstance(layer, no_prune):
    return layer
  return tfmot.sparsity.keras.prune_low_magnitude(layer)


# Start from a freshly built model and restore the weights saved after training.
model = build_model()
model.load_weights(pretrained_weights)

# Clone the model, letting `apply_pruning` decide layer by layer what gets wrapped.
model_for_pruning = tf.keras.models.clone_model(model, clone_function=apply_pruning)
model_for_pruning.summary()

# Callback from tfmot that advances the pruning step during training.
callbacks = [tfmot.sparsity.keras.UpdatePruningStep()]

model_for_pruning.compile(
    # Learning rate is a full order of magnitude below the one used in `build_model`
    optimizer=keras.optimizers.Adam(5e-5),
    loss=keras.losses.BinaryCrossentropy(),
    metrics=METRICS,
)

# Fine-tune for a fraction of the original number of epochs
_ = model_for_pruning.fit(train_dataset, callbacks=callbacks, epochs=5)

Compare the performance of the model after pruning

In [None]:
# Evaluate the pruned model on the held-out test split. Results are requested as
# a dict keyed by metric name, so the lookups below do not depend on the order
# or number of entries in the METRICS list.
pruned_test_results = model_for_pruning.evaluate(test_dataset, return_dict=True)
loss = pruned_test_results["loss"]
precision = pruned_test_results["precision"]
recall = pruned_test_results["recall"]
print(f"Precision: {precision:.2%}")
print(f"Recall: {recall:.2%}")

Save the pruned model to disk

In [None]:
# Strip the pruning wrappers that were added for fine-tuning before exporting.
model_for_export = tfmot.sparsity.keras.strip_pruning(model_for_pruning)
# Save the stripped model without the optimizer state.
model_for_export.save("jump_detection.tf", include_optimizer=False)