In [1]:
from model.dataloader import NinaProDatasetLoader, NearlabDatasetLoader
from model.nina_helper import *
import torch
import pandas as pd
from sklearn.utils import shuffle
from scipy import signal
from model.utils import list_files

In [2]:
class NinaProDatasetLoader:
    """
    Load one NinaPro subject and turn it into windowed train/test tensors.

    ``load_data`` runs the whole pipeline: import the recording, split the
    repetitions into train/test, normalise, band-pass filter, cut sliding
    windows, shuffle and convert to PyTorch tensors.

    Parameters:
    ----------
    folder_path : str
        Path to the folder containing NinaPro database files
    subject : int
        Subject number to load (1-27 for DB1, 1-40 for DB2)
    database : int
        Which NinaPro database to use (1 or 2)
    window_length : int
        Length of the sliding window in samples
    window_increment : int
        Increment between consecutive windows in samples
    rest_length_cap : int, optional
        Number of seconds of rest data to keep before/after movement (default: 5)
    """

    # Label columns that travel next to the EMG channels in the DataFrames
    # handed to / returned by ``filter_data``. They are never filtered.
    _LABEL_COLUMNS = ("stimulus", "repetition")

    def __init__(self, folder_path, subject, database, window_length, window_increment, rest_length_cap = 5):
        """Store the configuration; no data is read until ``load_data`` is called."""
        self.folder_path = folder_path
        self.subject = subject
        self.database = database
        self.window_length = window_length
        self.window_increment = window_increment
        self.rest_length_cap = rest_length_cap

    def load_data(self, split_method = "repetition_wise", test_reps = 2):
        """
        Load and preprocess the NinaPro dataset.

        Parameters:
        ----------
        split_method : str
            Method to split the data ("repetition_wise" or "balanced")
        test_reps : int
            Number of repetitions to use for testing

        Returns:
        -------
        tuple
            ``(X_train, y_train, X_test, y_test)``. The ``X_*`` entries are
            ``torch.FloatTensor`` windows with a trailing singleton dimension
            (if any) squeezed away; the ``y_*`` entries are ``torch.LongTensor``
            labels.

        Raises:
        ------
        ValueError
            If ``self.database`` is not 1 or 2, or ``split_method`` is unknown.
        """
        # Load in Ninapro data based on database
        if self.database == 1:
            data = import_db1(self.folder_path, self.subject, self.rest_length_cap)
        elif self.database == 2:
            data = import_db2(self.folder_path, self.subject, self.rest_length_cap)
        else:
            raise ValueError("Database must be 1 or 2")

        # Repetition id 0 is excluded from the split candidates.
        rep_ids = np.unique(data["rep"])
        rep_ids = rep_ids[rep_ids > 0]

        # Split into train / test repetitions. The results get their own names
        # so the integer ``test_reps`` argument is not overwritten.
        if split_method == "repetition_wise":
            # NOTE(review): 12 is forwarded positionally to gen_split_rand;
            # presumably the number of candidate splits - confirm in nina_helper.
            train_splits, test_splits = gen_split_rand(rep_ids, test_reps, 12, base=[2, 5])
        elif split_method == "balanced":
            train_splits, test_splits = gen_split_balanced(rep_ids, test_reps, base=[2, 5])
        else:
            raise ValueError("Split not included")

        # Use first split if multiple were generated
        train_rep_ids = train_splits[0]
        test_rep_ids = test_splits[0]

        # Normalisation is driven by the training repetitions only.
        normalized_emg = normalise_emg(data["emg"], data["rep"], train_rep_ids)

        # Convert to Dataframe for the filter function
        channel_names = [f"channel_{i+1}" for i in range(normalized_emg.shape[1])]
        emg_df = pd.DataFrame(normalized_emg, columns=channel_names)
        emg_df["stimulus"] = data["move"]
        emg_df["repetition"] = data["rep"]

        # NOTE(review): filter_data defaults to a 2000 Hz sampling rate. The
        # NinaPro DB1 recordings are documented at a much lower rate, so this
        # 10-450 Hz band is presumably only meaningful for database 2 - confirm.
        filtered_emg = self.filter_data(emg_df, f=(10, 450), butterworth_order=4, btype='bandpass')

        # Keep every EMG channel (however many the database has) and drop the
        # label columns, instead of assuming exactly 12 channels.
        emg_filtered = filtered_emg.drop(columns=list(self._LABEL_COLUMNS)).to_numpy()

        # Get windowed data for training set
        X_train, y_train, _ = get_windows(
            train_rep_ids,
            self.window_length,
            self.window_increment,
            emg_filtered,
            data["move"],
            data["rep"]
        )

        # Get windowed data for test set
        X_test, y_test, _ = get_windows(
            test_rep_ids,
            self.window_length,
            self.window_increment,
            emg_filtered,
            data["move"],
            data["rep"]
        )

        # Shuffle the data (fixed seed keeps runs reproducible)
        X_train, y_train = shuffle(X_train, y_train, random_state=42)
        X_test, y_test = shuffle(X_test, y_test, random_state=42)

        # Convert to PyTorch tensors
        X_train = torch.FloatTensor(X_train).squeeze(-1)
        y_train = torch.LongTensor(y_train)
        X_test = torch.FloatTensor(X_test).squeeze(-1)
        y_test = torch.LongTensor(y_test)

        return X_train, y_train, X_test, y_test

    # from https://github.com/parasgulati8/NinaPro-Helper-Library/blob/master/NinaPro_Utility.py
    def filter_data(self, data, f, butterworth_order = 4, btype = 'lowpass', f_sampling = 2000):
        """
        Apply a causal Butterworth filter to every EMG channel of ``data``.

        Parameters:
        ----------
        data : pandas.DataFrame
            One column per EMG channel plus the label columns "stimulus" and
            "repetition". The number of channels is not fixed.
        f : int, float or sequence of two numbers
            Cut-off frequency (scalar) or band edges (pair), in Hz.
        butterworth_order : int, optional
            Order of the Butterworth filter (default: 4)
        btype : str, optional
            Filter type understood by ``scipy.signal.butter`` (default: 'lowpass')
        f_sampling : int or float, optional
            Sampling rate of ``data`` in Hz (default: 2000)

        Returns:
        -------
        pandas.DataFrame
            Filtered channels in integer-named columns ``0..n_channels-1``,
            followed by the unchanged "stimulus" and "repetition" columns, on
            the same index as ``data``.
        """
        label_columns = list(self._LABEL_COLUMNS)

        # Select channels by name rather than by a fixed column count so the
        # label columns can never be filtered by accident.
        emg_data = data.drop(columns=label_columns).to_numpy(dtype=float)

        # Passing fs lets scipy normalise scalar and sequence cut-offs alike.
        b, a = signal.butter(butterworth_order, f, btype=btype, fs=f_sampling)

        # axis=0 filters each channel (column) along time in a single call.
        filtered = pd.DataFrame(signal.lfilter(b, a, emg_data, axis=0), index=data.index)

        # Copy labels positionally so a non-default index cannot misalign them.
        for column in label_columns:
            filtered[column] = data[column].to_numpy()

        return filtered

In [None]:
from model.model import CNet2D

# Value forwarded to CNet2D's ``version`` argument.
version = "Softmax"
folder_path = "/Users/dennisschielke/Desktop/Uni/Bachelor_Thesis/src/data/ninapro/DB2/person1/"

# Subject 1 of database 2, cut into 400-sample windows advanced by 40 samples.
ninapro = NinaProDatasetLoader(
    folder_path=folder_path,
    subject=1,
    database=2,
    window_length=400,
    window_increment=40,
)

# Two repetitions are held out for testing, using the balanced split.
X_train, y_train, X_test, y_test = ninapro.load_data(
    split_method="balanced",
    test_reps=2,
)

# Single-epoch fit on the training windows.
current_model = CNet2D(
    version=version,
    epochs=1,
    num_classes=50,
    batch_size=128,
    dataset_type="NinaPro",
)
history = current_model.fit(X_train, y_train)


In [5]:
# Sanity check: list the distinct class labels present in the training targets
# (the model above is built with num_classes=50).
print(np.unique(y_train))

[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49]


In [46]:
# Inspect the shape of the training windows.
# NOTE(review): the saved output shows a trailing dimension of size 1, which
# load_data's squeeze(-1) would remove - the output looks stale; re-run from a
# fresh kernel to confirm.
print(X_train.shape)

torch.Size([88272, 400, 12, 1])


In [42]:
# Folder with the Nearlab CSV recordings for one subject.
# NOTE(review): absolute, machine-specific path - consider a configurable data directory.
path_nearlab = "/Users/dennisschielke/Desktop/Uni/Bachelor_Thesis/src/data/nearlab/8features/person1"
file_paths_nearlab = list_files(path_nearlab, "csv")

# The first two files go to the loader's first argument and the remaining files
# to its second (presumably train vs. test recordings - confirm against
# NearlabDatasetLoader). This depends on the order list_files returns.
data = NearlabDatasetLoader(file_paths_nearlab[:2], file_paths_nearlab[2:])
# Rebinds X_train / y_train / X_test / y_test, replacing the NinaPro tensors
# assigned to the same names earlier in the notebook.
X_train, y_train, X_test, y_test = data.load_data(split_method="repetition_wise")

In [43]:
# Inspect the shape of the Nearlab training tensor loaded in the previous cell.
print(X_train.shape)

torch.Size([4944, 10, 512])
