In [None]:
from pathlib import Path
import numpy as np
import glob
import pandas as pd

# Root of the raw recordings; walked below as <human type>/<hand type>/<experiment folder>.
data_folder = Path(r"F:\Dropbox (Personal)\BCII\BCI Challenges\2024 ALVI EMG Decoding\dataset_v2_blocks\dataset_v2_blocks")

# Folder that receives the generated CSV files.
dest_folder = Path(r"F:\Dropbox (Personal)\BCII\BCI Challenges\2024 ALVI EMG Decoding\kaggle_files")

# Sub-folder names used to walk the dataset tree.
human_types = ["health", "amputant"]
hand_types = ["left", "right"]

# Number of EMG feature columns (myo_*) and target columns (ang_*) per sample.
N_MYO = 8
N_ANGLES = 20

# Column order applied to the EMG array of left-hand recordings when loading.
LEFT_TO_RIGHT_HAND = [6, 5, 4, 3, 2, 1, 0, 7]


### Files per condition

In [None]:
# Running total of experiment folders per human type
by_human_types = dict.fromkeys(human_types, 0)

for ht in human_types:
    # Count the entries found under <data_folder>/<human type>/<hand type>
    by_hand_types = {
        hand_type: len(glob.glob(str(data_folder / ht / hand_type / "*")))
        for hand_type in hand_types
    }
    by_human_types[ht] += sum(by_hand_types.values())

    print(f"{ht}: {by_hand_types}")
print(f"Total: {by_human_types}")

# Load data

For each "human type" and "hand type", load all `.npz` files found in the `train` and `test` folders of every experiment.
This creates one large dataframe with all the EMG data and target angles. Each row is one sample and carries some metadata to identify it.

In [None]:
def add_to_data(data:dict, folder:Path, name:str, tset:str,  human_type:str, hand_type:str):
    """
    Load every .npz file in `folder` and append its samples to `data`.

    `data` is a dict of lists that grows in place: each row of the arrays
    stored in a file adds one entry to every metadata column, to each
    `ang_<i>` target column and to each `myo_<i>` feature column.

    Args:
        data: dict of lists with the keys 'subject_type', 'subject_name',
            'exp_id', 'trial_id', 'condition', 'ang_<i>' for i < N_ANGLES and
            'myo_<i>' for i < N_MYO. Mutated in place.
        folder: directory holding the .npz files (str or Path).
        name: experiment folder name; the part before the first "_" is
            stored as the subject name.
        tset: name of the split folder; only used in the progress message.
        human_type: "health" is stored as "healty", anything else as "amputee".
        hand_type: "left" reorders the EMG columns with LEFT_TO_RIGHT_HAND.

    Raises:
        ValueError: if 'data_myo' and 'data_angles' in a file have a
            different number of rows.

    Note:
        trial_id is the position of the file inside `folder`, so it is only
        unique within a single call, not across experiments or splits.
    """
    # glob order depends on the filesystem; sort so that trial_id is the
    # same on every run and on every machine.
    files = sorted(glob.glob(str(Path(folder) / "*.npz")))
    print(f"{human_type}/{hand_type}/{name}/{tset}: {len(files)} files")

    # NOTE(review): "healty" is misspelled, but the value is written to the
    # output data; kept unchanged so existing consumers are not broken.
    ht = "healty" if human_type == "health" else "amputee"
    subject_name = name.split("_")[0]

    for tid, fl in enumerate(files):
        # The context manager closes the archive; arrays read inside the
        # block stay valid after it exits.
        with np.load(fl) as npz:
            emg_data = npz['data_myo']
            angles = npz['data_angles']

        T = emg_data.shape[0]
        if angles.shape[0] != T:
            # Fail here with the file name rather than later, when the
            # column lists no longer line up.
            raise ValueError(
                f"{fl}: data_myo has {T} rows but data_angles has {angles.shape[0]}"
            )

        if hand_type == 'left':
            emg_data = emg_data[:, LEFT_TO_RIGHT_HAND]

        data['subject_type'] += [ht] * T
        data['subject_name'] += [subject_name] * T
        data['exp_id'] += [name] * T
        data['trial_id'] += [tid] * T
        # NOTE(review): 'condition' duplicates 'subject_type' - confirm this is intended.
        data['condition'] += [ht] * T

        for i in range(N_ANGLES):
            data[f"ang_{i}"] += angles[:, i].tolist()

        for i in range(N_MYO):
            data[f"myo_{i}"] += emg_data[:, i].tolist()

In [None]:

# Column store filled by add_to_data: metadata first, then targets, then features
data = {
    'subject_type': [],
    'subject_name': [],
    'exp_id': [],
    'trial_id': [],
    'condition': [],
}
data.update({f"ang_{i}": [] for i in range(N_ANGLES)})
data.update({f"myo_{i}": [] for i in range(N_MYO)})

for ht in human_types:
    for hand_type in hand_types:
        base_folder = data_folder / ht / hand_type

        # One entry per experiment. glob order depends on the filesystem, so
        # sort to make the row order (and hence sample_id) reproducible.
        folders = sorted(glob.glob(str(base_folder / "*")))

        for folder in folders:
            name = Path(folder).name

            for tset in ("train", "test"):
                # .npz files live in <experiment>/preproc_angles/<split>
                complete_folder = Path(folder) / "preproc_angles" / tset
                if not complete_folder.exists():
                    continue  # this experiment has no such split

                # append all samples of this split to the shared dict
                add_to_data(data, complete_folder, name, tset, ht, hand_type)




### Create the DataFrame

In [None]:
# Build the frame in one go, then drop the dict of lists to free memory
df = pd.DataFrame.from_dict(data)
del data

# sample_id is the running row number, used as a unique key per sample
n_samples = len(df)
df['sample_id'] = np.arange(n_samples)
print(df.shape)

### Split TRAIN vs TEST

The TEST set contains only data from fedya: half of the data from this subject is included in the training data, and the remainder is kept for testing.

In [None]:
# get a subset of fedya's trials
fedya = df.loc[df.subject_name == "fedya"]
fedya_trials = fedya.trial_id.unique()
fedya_trials_test = fedya_trials[::2]
fedya_trials_train = fedya_trials[1::2]


In [None]:
# Row masks used to build the two splits
from_fedya = df["subject_name"] == "fedya"
in_train_trials = df["trial_id"].isin(fedya_trials_train)
in_test_trials = df["trial_id"].isin(fedya_trials_test)

# train: every other subject, plus fedya's training trials
train = df.loc[~from_fedya | in_train_trials]
# test: only fedya's held-out trials
test_data = df.loc[from_fedya & in_test_trials]

for frame in (train, test_data):
    print(frame.shape)
for frame in (train, test_data):
    print(frame.tail(5))

# the full frame is no longer needed
del df

### Prepare submission data

In [None]:
target_cols = list(map('ang_{}'.format, range(N_ANGLES)))
inputs_cols = list(map('myo_{}'.format, range(N_MYO)))

# select column subsets
solution_cols = ['sample_id'] + target_cols
test_cols = ['sample_id', 'trial_id'] + inputs_cols

# ground truth: sample id plus the target angles
solution = test_data[solution_cols]
print(solution.head())

# what participants receive: ids plus the EMG inputs, no targets
test = test_data[test_cols]
print(test.head())

# sample solution: same layout as the solution, with every target set to 0.0
sample_solution = solution.assign(**{col: 0.0 for col in target_cols})
print(sample_solution.head())


Add a `Usage` column to the solution to split rows between the public and private leaderboards

In [None]:
# test and solution must have one row per held-out sample
assert len(test) == len(solution)

In [None]:
# Every other test trial goes to the public leaderboard, the rest to the private one
trials = list(test.trial_id.unique())
public = trials[::2]

# test and solution are both column subsets of test_data, so their rows line
# up one-to-one and a mask computed on test can label the solution rows.
# A single vectorised pass replaces the per-trial scan of the whole frame.
is_public = test.trial_id.isin(public).to_numpy()
usage = np.where(is_public, 'Public', 'Private')

# assign() returns a new frame instead of writing into a slice of test_data,
# which avoids pandas' SettingWithCopyWarning and keeps the cell re-runnable.
solution = solution.assign(Usage=usage)
solution.tail(3)

### Save

In [None]:
# Create the output folder if needed so the export does not fail after the
# whole dataset has been processed.
dest_folder.mkdir(parents=True, exist_ok=True)

# index=False keeps the pandas index out of the file; sample_id is the key
train.to_csv(dest_folder / 'train.csv', index=False)
print(train.shape)
train.head()

In [None]:
# Inputs handed to participants: ids plus EMG channels
test_csv = dest_folder / 'test.csv'
test.to_csv(test_csv, index=False)
print(test.shape)
test.head()

In [None]:
# Ground-truth file for the held-out rows
solution_csv = dest_folder / 'solution.csv'
solution.to_csv(solution_csv, index=False)
print(solution.shape)
solution.head()

In [None]:
# Example submission with all targets set to zero
sample_solution_csv = dest_folder / 'sample_solution.csv'
sample_solution.to_csv(sample_solution_csv, index=False)
print(sample_solution.shape)
sample_solution.head()