## SBU Kinect Interaction Dataset v2.0

In [24]:
%reset -f
import pickle
import numpy as np
from scipy import ndimage
from sklearn.model_selection import train_test_split
from pathlib import Path
import shutil

---

In [25]:
def extract_skeleton_txts(ds_directory="../datasets/SBUKId/", debug=False, cleanup=False):
    """Flatten the nested skeleton ``.txt`` files of the dataset into one sibling directory.

    Every ``*.txt`` file found below ``ds_directory`` is moved (not copied) into
    ``<ds_directory.parent>/<ds_directory.stem>.txts/`` and renamed to
    ``<set>-a<activity>-sq<sequence>.txt``. The three name parts are taken from the
    three directory levels between ``ds_directory`` and the file
    (``<set>/<activity>/<sequence>/<file>.txt``).

    Args:
        ds_directory: Root directory of the extracted dataset.
        debug: If True, move only the first file found, print its old and new path, and stop.
        cleanup: If True, delete ``ds_directory`` after the files have been moved.
            Ignored while ``debug`` is True, because only one file has been moved at that
            point and deleting the root would destroy every remaining file.

    Returns:
        The path of the flat ``.txts`` directory, as a string.

    Raises:
        KeyError: If a set directory name is missing from the set-name mapping.
        ValueError: If a ``.txt`` file is not exactly three directory levels below
            ``ds_directory``, or its activity/sequence directory name is not an integer.
    """
    ds_directory = Path(ds_directory)
    txts_directory = ds_directory.parent.joinpath(f"{ds_directory.stem}.txts/")
    txts_directory.mkdir(exist_ok=True, parents=True)

    # raw set directory name -> "st<running set number>-p<person A>p<person B>"
    sets_transformation = {
        "s01s02": "st01-p1p2",
        "s01s03": "st02-p1p3",
        "s01s07": "st03-p1p7",
        "s02s01": "st04-p2p1",
        "s02s03": "st05-p2p3",
        "s02s06": "st06-p2p6",
        "s02s07": "st07-p2p7",
        "s03s02": "st08-p3p2",
        "s03s04": "st09-p3p4",
        "s03s05": "st10-p3p5",
        "s03s06": "st11-p3p6",
        "s04s02": "st12-p4p2",
        "s04s03": "st13-p4p3",
        "s04s06": "st14-p4p6",
        "s05s02": "st15-p5p2",
        "s05s03": "st16-p5p3",
        "s06s02": "st17-p6p2",
        "s06s03": "st18-p6p3",
        "s06s04": "st19-p6p4",
        "s07s01": "st20-p7p1",
        "s07s03": "st21-p7p3",
    }

    # ---
    for f in ds_directory.rglob("*.txt"):
        # Resolve <set>/<activity>/<sequence> relative to the dataset root so the lookup
        # does not depend on how many path components precede ds_directory.
        _set, _activity, _sequence = f.relative_to(ds_directory).parts[:-1]
        n_f = txts_directory.joinpath(f"{sets_transformation[_set]}-a{int(_activity):02}-sq{int(_sequence):02}.txt")
        f.replace(n_f)

        if debug:
            print(f"{f=} \n{n_f = }")
            break

    # ---
    l_dataset = len(list(txts_directory.iterdir()))
    print(f"<{l_dataset=}> skeleton text files extracted to <{txts_directory}> dataset directory.\n")

    # Never delete the source tree after a partial (debug) extraction.
    if cleanup and not debug:
        shutil.rmtree(path=ds_directory, ignore_errors=True)
    return str(txts_directory)

In [26]:
# functions reused from outsiders17711-3d-dynamic-hgr parse-data-*.ipynb
# ---
def _resize_gestures(in_gest_seqs, target_length=250):
    out_gest_seqs = []
    for sequence in in_gest_seqs:
        zoomed_skeletons = []
        for skeleton in range(np.size(sequence, 1)):
            _zoom_skel = ndimage.zoom(sequence.T[skeleton], target_length / len(sequence), mode="reflect")
            zoomed_skeletons.append(_zoom_skel)

        out_gest_seqs.append(np.array(zoomed_skeletons).T)

    return np.array(out_gest_seqs)

# ---
def _write_data(data, filepath):
    with open(filepath, "wb") as output_file: pickle.dump(data, output_file)

In [27]:
def load_txt_actions(txts_directory="../datasets/SBUKId.txts/", resize_actions=True):
    """Load the flattened skeleton ``.txt`` files as normalized and de-normalized arrays.

    Files are read in sorted path order, so the sample order (and any seeded split made
    from it) does not depend on the filesystem's listing order. The first column of each
    file is dropped (presumably the frame index -- confirm against the dataset README);
    the remaining columns are grouped into ``(x, y, z)`` triples.

    Args:
        txts_directory: Directory holding files named ``<set>-<persons>-a<NN>-sq<NN>.txt``.
        resize_actions: If True, resample every sequence to
            ``int(mean sequence length * 3.5)`` frames with ``_resize_gestures``.
            If False, all sequences must already have the same number of frames.

    Returns:
        A tuple ``(actions, o_actions, labels)``:
        ``actions`` holds the coordinates as stored in the files, shaped
        ``(n_sequences, n_frames, n_points, 3)``; ``o_actions`` is the same data mapped
        back to original coordinates; ``labels`` is a list of strings such as
        ``"1.Approaching-st01-p1p2-sq01"``, aligned with the first axis.

    Raises:
        ValueError: If no ``.txt`` file is found, or if ``resize_actions`` is False
            while the sequences differ in length.
        KeyError: If a file name carries an activity id outside ``a01``..``a08``.
    """
    activity_labels = {
        "a01": "1.Approaching",
        "a02": "2.Departing",
        "a03": "3.Kicking",
        "a04": "4.Pushing",
        "a05": "5.ShakingHands",
        "a06": "6.Hugging",
        "a07": "7.Exchanging",
        "a08": "8.Punching",
    }

    # ---
    files = sorted(Path(txts_directory).rglob("*.txt"))
    if not files:
        raise ValueError(f"No skeleton .txt files found under <{txts_directory}>.")

    # atleast_2d keeps single-frame files two-dimensional so the column slice is valid
    sequences = [np.atleast_2d(np.genfromtxt(f, delimiter=","))[:, 1:] for f in files]
    action_lengths = np.array([len(a) for a in sequences])
    print(f"{action_lengths.min()=}, {action_lengths.mean()=:.1f}, {action_lengths.max()=}")

    # ---
    if resize_actions:
        # NOTE(review): 3.5 looks like an empirically chosen up-sampling factor -- confirm
        target_length = int(action_lengths.mean() * 3.5)
        actions = _resize_gestures(sequences, target_length)
        print(f"Action sequences resized: {target_length=} | {actions.shape=}")
    elif action_lengths.min() != action_lengths.max():
        raise ValueError(
            "Sequences have different lengths "
            f"({action_lengths.min()}..{action_lengths.max()} frames) and cannot be stacked "
            "into one array; call with resize_actions=True."
        )
    else:
        actions = np.stack(sequences)

    # ---
    l_dataset, n_frames, _ = actions.shape
    actions = actions.reshape(l_dataset, n_frames, -1, 3)
    print(f"Action sequences reshaped: {actions.shape=} | {actions[0].shape=}")
    # print("Normalized skeleton coordinates: \n\t", actions[0, 0, :3, :])

    # --- map the stored (normalized) coordinates back to the original coordinates
    o_actions = actions.copy()
    x, y, z = 0, 1, 2
    o_actions[..., x] = 1280 - (o_actions[..., x] * 2560)
    o_actions[..., y] = 960 - (o_actions[..., y] * 1920)
    o_actions[..., z] = o_actions[..., z] * 10000 / 7.8125
    # print("Original skeleton coordinates: \n", o_actions[0, 0, :3, :])

    # ---
    labels = []

    for f in files:
        # stem layout: st<NN>-p<A>p<B>-a<NN>-sq<NN>; the activity id is the third field
        a_idx = f.stem.split("-")[2]
        a_lbl = f"{activity_labels[a_idx]}-{f.stem.replace(f'-{a_idx}', '')}"
        labels.append(a_lbl)

    print(f"Action labels generated: {len(labels)=} | {labels[0]=}\n")

    # ---
    assert len(actions) == len(o_actions) == len(labels)
    return actions, o_actions, labels

In [28]:
def _load_data(filepath):
    with open(filepath, "rb") as f:
        data = pickle.load(f, encoding="latin1")

    return (
        data["X_norm_train"], data["X_norm_valid"],
        data["X_orig_train"], data["X_orig_valid"],
        data["labels_train"], data["labels_valid"],
    )

In [29]:
# pyright: reportGeneralTypeIssues=false
def create_train_valid_data(ds_directory="../datasets/SBUKId/", resize_actions=True, seed=17711):
    """Extract the dataset, split it 70/30 into train/validation subsets and pickle the result.

    The skeleton files are first flattened with ``extract_skeleton_txts`` (called with
    ``cleanup=True``, so ``ds_directory`` is deleted afterwards), then loaded with
    ``load_txt_actions`` and split with ``train_test_split`` using ``seed`` as the random
    state. The six subsets are written as one dictionary to
    ``<ds_directory.parent>/<ds_directory.stem>_3D_dictTVS_s<n_samples>.pckl``.

    Args:
        ds_directory: Root directory of the extracted dataset.
        resize_actions: Forwarded to ``load_txt_actions``.
        seed: Random state of the split.
    """
    txts_directory = extract_skeleton_txts(ds_directory, cleanup=True, debug=False)
    norm_actions, orig_actions, labels = load_txt_actions(txts_directory, resize_actions)
    print("> <norm_actions, orig_actions, labels> loaded successfully! \n", 
         f" {type(norm_actions)=}, {type(orig_actions)=}, {type(labels)=}"
    )

    # --- train_test_split returns a train/valid pair per input, in input order
    subsets = train_test_split(norm_actions, orig_actions, labels, test_size=0.30, random_state=seed)
    X_norm_train, X_norm_valid, X_orig_train, X_orig_valid, labels_train, labels_valid = subsets
    print(f"> Training/validation subsets created: \n",
          f" {X_norm_train.shape=} | {X_norm_valid.shape=} | {len(labels_valid)=} \n",
          f" {type(X_norm_valid)=}, {type(X_orig_valid)=}, {type(labels_valid)=}",
    )

    # --- bundle the subsets and write them next to the dataset directory
    ds_root = Path(ds_directory)
    data_path = f"{ds_root.parent}/{ds_root.stem}_3D_dictTVS_s{len(labels)}.pckl"
    data = dict(
        X_norm_train=X_norm_train, X_norm_valid=X_norm_valid,
        X_orig_train=X_orig_train, X_orig_valid=X_orig_valid,
        labels_train=labels_train, labels_valid=labels_valid,
    )

    _write_data(data, filepath=data_path)
    print(f"> {ds_root.stem} TVS train-valid data written to <{data_path}> successfully!\n")

In [30]:
def create_cross_validation_data(ds_directory="../datasets/SBUKId/", resize_actions=True, seed=17711):
    """Extract the dataset and write one train/validation pickle per cross-validation fold.

    The skeleton files are flattened with ``extract_skeleton_txts`` (called with
    ``cleanup=True``, so ``ds_directory`` is deleted afterwards) and loaded with
    ``load_txt_actions``. Each sample is assigned to one of five folds by the set number
    embedded in its label (``...-st<NN>-...``). For every fold, that fold becomes the
    validation subset and the other four are concatenated into the training subset; the
    result is written to
    ``<ds_directory.parent>/<ds_directory.stem>_3D_dictCVS_f<NN>_s<n_samples>.pckl``.

    Args:
        ds_directory: Root directory of the extracted dataset.
        resize_actions: Forwarded to ``load_txt_actions``.
        seed: Not used -- the fold assignment is fixed. Kept so the signature matches
            ``create_train_valid_data``.

    Raises:
        ValueError: If a label carries a set number that belongs to none of the folds.
    """
    txts_directory = extract_skeleton_txts(ds_directory, cleanup=True, debug=False)
    norm_actions, orig_actions, labels = load_txt_actions(txts_directory, resize_actions)
    print("> <norm_actions, orig_actions, labels> loaded successfully! \n", 
         f" {type(norm_actions)=}, {type(orig_actions)=}, {type(labels)=}"
    )

    # --- set numbers per fold
    # NOTE(review): presumably the dataset's published 5-fold protocol -- confirm
    cvs_folds = [
        [1, 9, 15, 19],        # Fold 1
        [5, 7, 10, 16],        # Fold 2
        [2, 3, 20, 21],        # Fold 3
        [4, 6, 8, 11],         # Fold 4
        [12, 13, 14, 17, 18],  # Fold 5
    ]
    # set number -> fold key ("f01".."f05"); every fold, including the last, is looked up
    # explicitly so an unexpected set number cannot silently land in fold 5
    fold_of_set = {
        set_no: f"f{fold_no:02}"
        for fold_no, fold_sets in enumerate(cvs_folds, start=1)
        for set_no in fold_sets
    }
    # fold key -> [normalized actions, original actions, labels]
    cvs_data_folds = {f"f{fold_no:02}": [[], [], []] for fold_no in range(1, len(cvs_folds) + 1)}
    ds_directory = Path(ds_directory)

    # ---
    for (n_action, o_action, lbl) in zip(norm_actions, orig_actions, labels):
        _set = int(lbl.split("-")[1].replace("st", ""))
        if _set not in fold_of_set:
            raise ValueError(f"Set number {_set} (label <{lbl}>) is not part of any cross-validation fold.")

        fold_norm, fold_orig, fold_labels = cvs_data_folds[fold_of_set[_set]]
        fold_norm.append(n_action)
        fold_orig.append(o_action)
        fold_labels.append(lbl)

    # ---
    for valid_fold in cvs_data_folds:
        X_norm_valid, X_orig_valid, labels_valid = cvs_data_folds[valid_fold]
        X_norm_train, X_orig_train, labels_train = [], [], []

        # --- everything outside the validation fold is training data
        for train_fold, (fold_norm, fold_orig, fold_labels) in cvs_data_folds.items():
            if train_fold == valid_fold:
                continue
            X_norm_train.extend(fold_norm)
            X_orig_train.extend(fold_orig)
            labels_train.extend(fold_labels)

        # ---
        X_norm_train, X_norm_valid = np.array(X_norm_train), np.array(X_norm_valid)
        X_orig_train, X_orig_valid = np.array(X_orig_train), np.array(X_orig_valid)
        print(f"> @Fold {valid_fold[1:]}: Training/validation subsets created: \n",
            f" {X_norm_train.shape=} | {X_norm_valid.shape=} | {len(labels_valid)=} \n",
            f" {type(X_norm_valid)=}, {type(X_orig_valid)=}, {type(labels_valid)=}",
        )

        # ---
        data_path = f"{ds_directory.parent}/{ds_directory.stem}_3D_dictCVS_{valid_fold}_s{len(labels)}.pckl"
        data = {
            "X_norm_train": X_norm_train, "X_norm_valid": X_norm_valid,
            "X_orig_train": X_orig_train, "X_orig_valid": X_orig_valid,
            "labels_train": labels_train, "labels_valid": labels_valid,
        }
        _write_data(data, filepath=data_path)

    # --- build the wildcard form directly; replacing "05" in the last path would also
    # rewrite any "05" in the directory name or the sample count
    data_pattern = f"{ds_directory.parent}/{ds_directory.stem}_3D_dictCVS_f*_s{len(labels)}.pckl"
    print(f"> {ds_directory.stem} CVS train-valid data folds written to <{data_pattern}> successfully!\n")

---

In [32]:
# NOTE: this call is destructive. create_cross_validation_data runs extract_skeleton_txts with
# cleanup=True, which moves the skeleton .txt files out of ../datasets/SBUKId/ into the sibling
# ../datasets/SBUKId.txts/ directory and then deletes ../datasets/SBUKId/ itself.
create_cross_validation_data(ds_directory="../datasets/SBUKId/")

<l_dataset=282> skeleton text files extracted to <..\datasets\SBUKId.txts> dataset directory.

action_lengths.min()=10, action_lengths.mean()=24.2, action_lengths.max()=46
Action sequences resized: target_length=84 | actions.shape=(282, 84, 90)
Action sequences reshaped: actions.shape=(282, 84, 30, 3) | actions[0].shape=(84, 30, 3)
Action labels generated: len(labels)=282 | labels[0]='1.Approaching-st01-p1p2-sq01'

> <norm_actions, orig_actions, labels> loaded successfully! 
  type(norm_actions)=<class 'numpy.ndarray'>, type(orig_actions)=<class 'numpy.ndarray'>, type(labels)=<class 'list'>
> @Fold 01: Training/validation subsets created: 
  X_norm_train.shape=(227, 84, 30, 3) | X_norm_valid.shape=(55, 84, 30, 3) | len(labels_valid)=55 
  type(X_norm_valid)=<class 'numpy.ndarray'>, type(X_orig_valid)=<class 'numpy.ndarray'>, type(labels_valid)=<class 'list'>
> @Fold 02: Training/validation subsets created: 
  X_norm_train.shape=(230, 84, 30, 3) | X_norm_valid.shape=(52, 84, 30, 3) | le