In [1]:
import os
import sys
project_path = os.path.expanduser('~/emg2qwerty')
sys.path.append(project_path)

from emg2qwerty.data import EMGSessionData, LabelData, WindowedEMGDataset
import numpy as np
import torch

# First import test data

In [2]:
# Import test data
import yaml
user_name = 'user0'
yaml_path = os.path.expanduser(f'~/emg2qwerty/config/user/{user_name}.yaml')
yaml = yaml.safe_load(open(yaml_path, 'r'))
session_list = yaml['dataset']['test']

# Later, we will need to iterate
# for session in session_list:
#     filename = session['session']
filename = session_list[0]['session'] + '.hdf5'

hdf5_dir = os.path.join(project_path, 'data') 
hdf5_path = os.path.join(hdf5_dir, filename)

# What should our stride & window size be?
- Let's assume we are keeping window size <- the legnth of prediction (4 seconds)
- Let's assume we are also using the same padding, 1000 ms in the past. This part is given as input, but keystrokes inside this padding is not predicted
- What should our stride be?

## Note) Later Steps
- window size should decrease
- window size + padding should remain the same. (increase padding, reduce window size)
- decrease stride to an appropriate sampling rate

### Experiment 1)
- How long does it take for the baseline model to produce inference for one window?

In [3]:
# Import the personalized model for the user
from emg2qwerty.lightning import TDSConvCTCModule
model_path = os.path.expanduser(f'~/emg2qwerty/models/personalized-finetuned/{user_name}.ckpt')
model = TDSConvCTCModule.load_from_checkpoint(model_path)
model.eval()

TDSConvCTCModule(
  (model): Sequential(
    (0): SpectrogramNorm(
      (batch_norm): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): MultiBandRotationInvariantMLP(
      (mlps): ModuleList(
        (0-1): 2 x RotationInvariantMLP(
          (mlp): Sequential(
            (0): Linear(in_features=528, out_features=384, bias=True)
            (1): ReLU()
          )
        )
      )
    )
    (2): Flatten(start_dim=2, end_dim=-1)
    (3): TDSConvEncoder(
      (tds_conv_blocks): Sequential(
        (0): TDSConv2dBlock(
          (conv2d): Conv2d(24, 24, kernel_size=(1, 32), stride=(1, 1))
          (relu): ReLU()
          (layer_norm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        )
        (1): TDSFullyConnectedBlock(
          (fc_block): Sequential(
            (0): Linear(in_features=768, out_features=768, bias=True)
            (1): ReLU()
            (2): Linear(in_features=768, out_features=768, bias=True)
         

# Small window version

In [14]:
from emg2qwerty import transforms
# test_transforms = transforms.Compose([transforms.ToTensor()])

# window_data = WindowedEMGDataset(hdf5_path, window_length=8000, padding=(2000, 0), transform = test_transforms)
# print(f'batch :{window_data.__len__()}, input shape: {window_data[0][0].shape}, label shape : {window_data[0][1].shape}')

test_transforms = transforms.Compose([transforms.ToTensor(), transforms.LogSpectrogram()])

window_data = WindowedEMGDataset(hdf5_path, window_length=100, padding=(9900, 0), transform = test_transforms)
print(f'batch :{window_data.__len__()}, input shape: {window_data[2][0].shape}, label shape : {window_data[0][1].shape}')
# 622 ~= (8000 + 2000) / 16 = 625 (note 16 = hop length)
# Anyway, one window is size [622, 2, 16, 33] after transformation (except the first one <- no padding (no past info))

one_frame = window_data[2][0]; one_label = window_data[2][1]
one_frame = one_frame.unsqueeze(1); one_label = one_label.unsqueeze(1)

one_frame.shape, one_label.shape

input_formatted = {'inputs': one_frame, 'targets': one_label,
 'input_lengths': torch.tensor([622], dtype=torch.int32),
 'target_lengths': torch.tensor([16], dtype=torch.int32)}

batch :23641, input shape: torch.Size([622, 2, 16, 33]), label shape : torch.Size([0])


In [26]:
window_data.__len__()
# window_data[10][0].shape
window_data[2009][1].shape[0]

0

In [27]:
# Let's count how many inputs will have label that is not empty
# If most datapoints are empty, we should not use them
empty_count = 0 
for i in range(window_data.__len__()):
    if window_data[i][1].shape[0] == 0:
        empty_count += 1

empty_count / window_data.__len__()

0.8015735374984138

# Only past data version (~30ms before keystroke)

In [None]:
from emg2qwerty import transforms
# test_transforms = transforms.Compose([transforms.ToTensor()])

# window_data = WindowedEMGDataset(hdf5_path, window_length=8000, padding=(2000, 0), transform = test_transforms)
# print(f'batch :{window_data.__len__()}, input shape: {window_data[0][0].shape}, label shape : {window_data[0][1].shape}')

test_transforms = transforms.Compose([transforms.ToTensor(), transforms.LogSpectrogram()])
window_data = WindowedEMGDataset(hdf5_path, window_length=100, padding=(9960, -60), transform = test_transforms)

# Testing inference time

In [7]:
import time

start_time = time.time()
for i in range(window_data.__len__()):
    one_frame = window_data[i][0]; one_label = window_data[i][1]
    one_frame = one_frame.unsqueeze(1); one_label = one_label.unsqueeze(1)
    input_length = len(one_frame)
    target_length = len(one_label)

    input_formatted = {'inputs': one_frame, 'targets': one_label,
    'input_lengths': torch.tensor([input_length], dtype=torch.int32),
    'target_lengths': torch.tensor([target_length], dtype=torch.int32)}
    model.test_step(input_formatted)
    end_time = time.time()

print(f"Execution time: {end_time - start_time:.6f} seconds")
print(f"Average prediction time : {(end_time - start_time)/i:.6f} seconds")

ValueError: not enough values to unpack (expected 5, got 3)

## DATASET FORMULATION
- Given execution time ~= 0.04, the highest possible sampling rate is 25Hz.
- Stride length minimum = 0.04 seconds which is stride = 80 (baseline is 8000 (stride == window length))
- But that is minimum, so let's use stride = 160 as safety, with window size = 160, padding = 9840
- So the system will wait input for 0.08 seconds then make prediction

## How good is this?
- The use will experience 120 ms lag.
- How can we reduce this?
- EMG precede visible movement by about 30~100 ms
- If we augment the dataset to predict keystroke 30 ms before the actual timing and train model on that, would it work?

- If the augmentation goes successful, the delay will be ~90ms


## DATASET FORMULATION 2
- Given execution time ~= 0.04, the highest possible sampling rate is 25Hz.
- Stride length minimum = 0.04 seconds which is stride = 80 (baseline is 8000 (stride == window length))
- Use near minimum, stride = 100, with window size = 100, padding = 9900
- So the system will wait input for 0.05 seconds then make prediction

## How good is this?
- The use will experience 90 ms lag.
- How can we reduce this?
- EMG precede visible movement by about 30~100 ms
- If we augment the dataset to predict keystroke 30 ms before the actual timing and train model on that, would it work?
- If the augmentation goes successful, the delay will be ~60ms


## Hardware will introduce another source of delay, magnitude ???
## Language model will introduce another source of delay
- My bluetooth keyboard had 30 ms delay total
- The delay will be noticed if ~100ms.


# Experiment: what happens if I do negative padding, so that I only use past information, ~30ms before keystroke?
- 30 ms is kind of arbitrary. If we do a better correlation analysis (?) we may get a better insight into what this number should be

In [9]:
# Test the small window version!
test_transforms = transforms.Compose([transforms.ToTensor(), transforms.LogSpectrogram()])
window_data = WindowedEMGDataset(hdf5_path, window_length=100, padding=(9900, 0), transform = test_transforms)
print(f'batch :{window_data.__len__()}, input shape: {window_data[0][0].shape}, label shape : {window_data[0][1].shape}')

# DataLoader
from torch.utils.data import DataLoader
test_dataloader = DataLoader(
            window_data,
            batch_size=1,
            shuffle=False,
            num_workers=3,
            collate_fn=window_data.collate,
            pin_memory=True,
            persistent_workers=True)


batch = next(iter(test_dataloader))

del test_dataloader

import gc
gc.collect()
torch.cuda.empty_cache()

pred, loss, metrics = model.test_step(batch[100])
print(pred[0].__str__())

from emg2qwerty.metrics import CharacterErrorRates
cer = CharacterErrorRates()
cer.update(pred[0], GT)
cer.compute()

batch :23641, input shape: torch.Size([3, 2, 16, 33]), label shape : torch.Size([0])


KeyError: 100

In [11]:
test_dataloader = DataLoader(
            window_data,
            batch_size=1,
            shuffle=False,
            num_workers=3,
            collate_fn=window_data.collate,
            pin_memory=True,
            persistent_workers=True)

In [21]:
window_data.__getitem__(4999)

(tensor([[[[-0.8206,  0.8627,  1.9767,  ...,  0.2005, -0.4596, -2.1619],
           [-1.3654,  1.1617,  2.0713,  ..., -0.0701, -0.5394, -0.7612],
           [ 0.4448,  0.9819,  2.0513,  ..., -1.0993, -0.9447, -0.2359],
           ...,
           [-2.8162,  1.0822,  1.4743,  ..., -0.2251, -0.2153,  0.4482],
           [-0.2417,  0.7476,  1.7175,  ..., -1.6421, -1.1337, -1.4545],
           [-0.5485,  0.5717,  1.6467,  ...,  0.2941, -1.5640, -0.2922]],
 
          [[ 0.8723,  1.5427,  2.1978,  ...,  0.3729,  0.3015, -0.2534],
           [ 1.3964,  1.5111,  2.5604,  ...,  0.0914, -0.6499, -0.0425],
           [ 1.8263,  1.9207,  3.1723,  ..., -0.1532,  0.2833, -0.0295],
           ...,
           [ 0.9804,  1.5796,  1.7248,  ..., -0.5702, -0.0725, -0.6250],
           [ 0.5530,  1.5083,  1.7351,  ...,  0.1335,  0.5246,  0.6864],
           [ 0.6851,  1.1693,  0.1112,  ..., -0.1514,  0.0616, -0.2400]]],
 
 
         [[[-0.9315,  0.6049,  1.6967,  ...,  0.2184, -0.6759, -2.1156],
          