In [8]:
!rm -rf sample_data

!git clone https://github.com/gabbiurlaro/aml22-ego.git aml22-ego
!cd aml22-ego && git checkout vae

Cloning into 'aml22-ego'...
remote: Enumerating objects: 3462, done.[K
remote: Counting objects: 100% (114/114), done.[K
remote: Compressing objects: 100% (48/48), done.[K
remote: Total 3462 (delta 74), reused 94 (delta 66), pack-reused 3348[K
Receiving objects: 100% (3462/3462), 1.43 GiB | 24.84 MiB/s, done.
Resolving deltas: 100% (2540/2540), done.
Updating files: 100% (45/45), done.
Updating files: 100% (150/150), done.
Branch 'vae' set up to track remote branch 'vae' from 'origin'.
Switched to a new branch 'vae'


In [9]:
# Mount google drive 
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [10]:
# Install conda

!pip install -q condacolab
import condacolab
condacolab.install()

[0m✨🍰✨ Everything looks OK!


In [11]:

!mkdir -p /usr/local/envs/egovision
!tar xf /content/drive/MyDrive/egovision.tar.gz --directory=/usr/local/envs/egovision

In [3]:
%%bash
cd aml22-ego

In [15]:
import pickle 
import numpy as np
import pandas as pd
import pywt
import torch
import os
import torch.nn.functional as F
from scipy.interpolate import CubicSpline
import random
from torch.utils.data import Dataset
from utils.loaders import ActionNetDataset

dataset_conf = {
  'annotations_path': '../drive/MyDrive/train_val_EMG',
  'shift': 'ActionNet-ActionNet',
  'workers': 4,
  'stride': 2,
  'resolution': 224
}
train =  ActionNetDataset('ActionNet', ['EMG'],'train',
                                   dataset_conf, {'EMG': 32}, 5, {'EMG': False},
                                   None, load_feat=False, additional_info=True)

test =  ActionNetDataset('ActionNet', ['EMG'],' test',
                                   dataset_conf, {'EMG': 32}, 5, {'EMG': False},
                                   None, load_feat=False, additional_info=True)
                                     
train

ModuleNotFoundError: ignored

In [None]:
def wavelet_decomposition(signal, wavelet_name, decomposition_level, detail_factor):
    coeffs = pywt.wavedec(signal, wavelet=wavelet_name, level=decomposition_level)
    cA, cD = coeffs[0], coeffs[1:]  # Approximation and detail coefficients
    
    # Modify detail coefficients
    cD_modified = [detail_factor * cd for cd in cD]
    
    # Reconstruct the augmented signal
    augmented_coeffs = [cA] + cD_modified
    augmented_signal = np.array(pywt.waverec(augmented_coeffs, wavelet=wavelet_name))
    
    return augmented_signal


class WaveletDecompositionTransform:
    def __init__(self, wavelet_name, decomposition_level, detail_factor, num_clips):
        self.wavelet_name = wavelet_name
        self.decomposition_level = decomposition_level
        self.detail_factor = detail_factor
        self.num_clips = num_clips
    
    def __call__(self, sample):
        signals = sample['features_EMG']
        augmented_signals = []
        for i in range(self.num_clips):
          augmented_signals.append(wavelet_decomposition(torch.Tensor(signals[i]), self.wavelet_name, self.decomposition_level, self.detail_factor))
        # Create a new augmented sample
        augmented_sample = {
            'features_EMG': np.array(augmented_signals),
            'label': sample['label'],
            'uid': sample['uid'],
            'untrimmed_video_name': sample['untrimmed_video_name']
        }
        
        return augmented_sample

In [None]:
def magnitude_warping(signal, variance=0.01):
    T = signal.size(0)
    t = torch.linspace(0, 1, T)  # Equidistant time points
    r = torch.randn(T)  # Random numbers from a normal distribution
    r = torch.clamp(r, -2.0, 2.0)  # Limit the range of random numbers to avoid extreme warping
    
    # Generate a smooth curve using cubic splines
    spline = CubicSpline(t, r)
    cubic_spline = torch.from_numpy(spline(t)).float()
    
    # Elementwise product of the interpolated curve with the signal
    warped_signal = np.array(signal * (1.0 + variance * cubic_spline))

    return warped_signal

class MagnitudeWarpingTransform:
    def __init__(self, variance, num_clips):
        self.variance= variance
        self.num_clips = num_clips
    
    def __call__(self, sample):
        signals = sample['features_EMG']
        augmented_signals = []
        for i in range(self.num_clips):
          augmented_signals.append(magnitude_warping(torch.Tensor(signals[i]), variance=self.variance))
        # Create a new augmented sample
        augmented_sample = {
            'features_EMG': np.array(augmented_signals),
            'label': sample['label'],
            'uid': sample['uid'],
            'untrimmed_video_name': sample['untrimmed_video_name']
        }
        
        return augmented_sample

# Example usage
#signal = torch.randn(1024)  # Assuming input signal of size 1024
#warped_signal = magnitude_warping(signal, variance=0.01)

In [None]:
class sEMGSimulationSS1Transform:
    def __init__(self, fl, fh, num_electrodes=1):
        self.fl = fl
        self.fh = fh
        self.num_electrodes = num_electrodes

    def __call__(self, x):
        batch_size, signal_length = x.size()

        # Generate random vector w from standard normal distribution
        w = torch.randn(batch_size, self.num_electrodes, signal_length)

        # Apply shaping filter g
        g = torch.sqrt((self.fh ** 2 * self.fl ** 2) / ((self.fl ** 2 + self.f ** 2) * (self.fh ** 2 + self.f ** 2) ** 2))
        w_filtered = torch.fft.irfft(torch.fft.rfft(w, signal_length) * g.unsqueeze(1), signal_length, signal_ndim=1)

        # Generate Gaussian noise
        noise = torch.randn(batch_size, self.num_electrodes, signal_length)

        # Generate lowpass filtered (LPF) signal
        lp_filtered_signal = F.avg_pool1d(x.unsqueeze(1), kernel_size=15, stride=1).squeeze(1)

        # Generate synthetic sEMG signal
        xi_star = (w_filtered * lp_filtered_signal.unsqueeze(1)) + noise

        return xi_star


class sEMGSimulationSS2Transform:
    def __init__(self, alpha, beta, num_electrodes=1):
        self.alpha = alpha
        self.beta = beta
        self.num_electrodes = num_electrodes

    def __call__(self, x):
        batch_size, signal_length = x.size()

        # Generate random vector w from standard normal distribution
        w = torch.randn(batch_size, self.num_electrodes, signal_length)

        # Apply shaping filter g
        g = torch.sqrt((fh ** 2 * fl ** 2) / ((fl ** 2 + f ** 2) * (fh ** 2 + f ** 2) ** 2))
        w_filtered = torch.fft.irfft(torch.fft.rfft(w, signal_length) * g.unsqueeze(1), signal_length, signal_ndim=1)

        # Generate sEMG variance sigma^2 from inverse gamma distribution
        sigma_sq = torch.randn(batch_size, self.num_electrodes, signal_length).abs().pow(-2 * self.alpha)

        # Generate Gaussian noise
        noise = torch.randn(batch_size, self.num_electrodes, signal_length)

        # Generate synthetic sEMG signal
        xi_star = (w_filtered * torch.sqrt(sigma_sq)) + noise

        return xi_star


class sEMGDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        sample = self.data[index]
        return sample


# Example usage
fl = 10  # Low frequency cutoff
fh = 100  # High frequency cutoff
alpha = 2  # Alpha parameter for inverse gamma distribution
beta = 3  # Beta parameter for inverse gamma distribution

# Create synthetic sEMG dataset using SS1 transform
ss1_transform = sEMGSimulationSS1Transform(fl, fh)
synthetic_dataset_ss1 = sEMGDataset([ss1_transform(real_data) for real_data in real_dataset])

# Create synthetic sEMG dataset using SS2 transform
ss2_transform = sEMGSimulationSS2Transform(alpha, beta)
synthetic_dataset_ss2 = sEMGDataset([ss2_transform(real_data) for real_data in real_dataset])


In [None]:
class SlidingWindowTransform:
    def __init__(self, window_length, overlap=False):
        self.window_length = window_length
        self.overlap = overlap

    def __call__(self, x):
        num_segments = len(x) // self.window_length
        if self.overlap:
            stride = self.window_length // 2
        else:
            stride = self.window_length

        segments = []
        for i in range(num_segments):
            start = i * stride
            end = start + self.window_length
            segment = x[start:end]
            segments.append(segment)

        return torch.stack(segments)


In [None]:
class AugmentorOneTransform:
    def __init__(self, augmentations):
        self.augmentations = augmentations

    def __call__(self, x):
        augmentation = random.choice(self.augmentations)
        return augmentation(x)

class AugmentorAllTransform:
    def __init__(self, augmentations):
        self.augmentations = augmentations

    def __call__(self, x):
        for augmentation in self.augmentations:
            x = augmentation(x)
        return x

class AugmentorRandomTransform:
    def __init__(self, augmentations, p):
        self.augmentations = augmentations
        self.p = p

    def __call__(self, x):
        for augmentation in self.augmentations:
            if random.random() > self.p:
                x = augmentation(x)
        return x

class AugmentedDataset(Dataset):
    def __init__(self, dataset, augmentation_transform):
        self.dataset = dataset
        self.augmentation_transform = augmentation_transform

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index):
        x = self.dataset[index]
        x_augmented = self.augmentation_transform(x)
        return x_augmented

# Example usage



In [None]:
variance = 0.01
wavelet_name = 'db4'  # Wavelet name (e.g., Daubechies 4)
decomposition_level = 3  # Number of decomposition levels
detail_factor = 0.5  # Scaling factor for modifying detail coefficients
num_clips = 5


wavelet_transform = WaveletDecompositionTransform(wavelet_name, decomposition_level, detail_factor, num_clips)
magnitude_warp_transform = MagnitudeWarpingTransform(variance, num_clips),
ss1_transform = sEMGSimulationSS1Transform(fl=10, fh=100)
ss2_transform = sEMGSimulationSS2Transform(alpha=1.0, beta=2.0)
sw_transform = SlidingWindowTransform(window_length=100, overlap=True)

augmentations = [wavelet_transform, magnitude_warp_transform, ss1_transform, ss2_transform]
augmentor_one_transform = AugmentorOneTransform(augmentations=augmentations)
augmentor_all_transform = AugmentorAllTransform(augmentations=augmentations)
augmentor_random_transform = AugmentorRandomTransform(augmentations=augmentations, p=0.5)

augmented_dataset_ao = AugmentedDataset(train, augmentation_transform=augmentor_one_transform)
augmented_dataset_aa = AugmentedDataset(train, augmentation_transform=augmentor_all_transform)
augmented_dataset_ar = AugmentedDataset(train, augmentation_transform=augmentor_random_transform)
augmented_dataset_sw = AugmentedDataset(train, augmentation_transform=sw_transform)


augmented_dataset_aa
# augmented_samples_train = [transform(train['features'][i]) for i in range(len(train['features']))]
# out_train = {'features': list(augmented_samples_train)}

# augmented_samples_test = [transform(test['features'][i]) for i in range(len(test['features']))]
# out_test = {'features': list(augmented_samples_test)}

# filename = './ActionNet_augmented_clips'
# with open(f"{filename}_train.pkl", "wb") as file:
#             pickle.dump(out_train, file)

# with open(f"{filename}_test.pkl", "wb") as file:
#             pickle.dump(out_test, file)