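**About**: This notebook prepares the data. It loads each landmark parquet file, runs it through the `Preprocessing` module (frame filtering, landmark selection, normalization) and optionally saves the result as a `.npy` file.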

In [None]:
# %load_ext nb_black
# Reload edited project modules automatically before each cell is executed.
%load_ext autoreload
%autoreload 2

In [None]:
cd ../src/

## Initialization

### Imports

In [None]:
import os

# Pick the GPU before torch is imported, so the restriction is guaranteed to
# be in place when CUDA gets initialized.
os.environ['CUDA_VISIBLE_DEVICES'] = "1"

import torch

print(torch.__version__)
# Last expression of the cell: name of the selected GPU, or an explicit message
# instead of an exception when no CUDA device is visible.
torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CUDA is not available"

In [None]:
import os
import re
import cv2
import sys
import glob
import json
import time
import torch
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from tqdm import tqdm
from sklearn.metrics import *

warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=UserWarning)

In [None]:
from params import *
from data.preparation import prepare_data
from utils.plots import plot_sample, plot_sample_with_edges

### Landmarks setup

In [None]:
# Raw landmark indices kept as individual points, one sub-list per group.
KEPT_LANDMARKS = [
    list(range(468, 489)),  # left hand
    list(range(522, 543)),  # right hand
    [10, 54, 67, 132, 150, 152, 162, 172, 176, 234, 284, 297, 361, 379, 389, 397, 400, 454],  # silhouette
    [13, 37, 40, 61, 78, 81, 84, 87, 88, 91, 191, 267, 270, 291, 308, 311, 314, 317, 318, 321, 415],  # lips
    list(range(500, 512)),  # arms
    [205, 425],  # cheeks
]
# Type id of each group above: 1, 2, ..., len(KEPT_LANDMARKS).
MAPPING = list(range(1, len(KEPT_LANDMARKS) + 1))

# Groups of landmarks that are each reduced to a single averaged point.
TO_AVG = [
    [466, 387, 385, 398, 263, 390, 374, 381, 362],  # left_eye
    [246, 160, 158, 173, 33, 163, 145, 154, 133],  # presumably right_eye (unlabelled in the original) - TODO confirm
    [383, 293, 296, 285],  # left_eyebrow
    [156, 63, 66, 55],  # right_eyebrow
    [1, 2, 98, 327, 168],  # nose
]

In [None]:
# Flat array of every landmark index that is kept as-is.
landmarks = np.concatenate(KEPT_LANDMARKS)

# One type id per kept landmark, laid out group after group.
per_group = []
for subset, idx in zip(KEPT_LANDMARKS, MAPPING):
    print(subset, idx)
    per_group.append(np.full(len(subset), idx, dtype=np.float64))

# `idx` still holds the id of the last group at this point: every averaged
# landmark from TO_AVG is tagged with that same type id.
per_group.append(np.full(len(TO_AVG), idx, dtype=np.float64))
type_embed = np.concatenate(per_group)

print("\nn_landmarks :", len(type_embed))

### Utils

In [None]:
ROWS_PER_FRAME = 543  # number of landmarks per frame

def load_relevant_data_subset(pq_path):
    """
    Loads a Parquet file and extracts its x, y, z columns as a per-frame array.

    The file is expected to hold ROWS_PER_FRAME consecutive rows per frame.

    Args:
        pq_path (str): Path to the Parquet file.

    Returns:
        Tuple[pd.DataFrame, np.ndarray]: The full DataFrame as read from disk, and
            a float32 array of shape (n_frames, ROWS_PER_FRAME, 3) with the x, y, z values.

    Raises:
        ValueError: If the number of rows is not a multiple of ROWS_PER_FRAME.
    """
    df = pd.read_parquet(pq_path)

    # Exact integer arithmetic; a truncated file is reported explicitly
    # instead of surfacing as an opaque reshape error.
    n_frames, leftover = divmod(len(df), ROWS_PER_FRAME)
    if leftover:
        raise ValueError(
            f"{pq_path}: {len(df)} rows is not a multiple of ROWS_PER_FRAME ({ROWS_PER_FRAME})"
        )

    coords = df[['x', 'y', 'z']].to_numpy(dtype=np.float32)
    return df, coords.reshape(n_frames, ROWS_PER_FRAME, 3)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class Preprocessing(nn.Module):
    """
    Preprocessing module for data preparation.
    Module is compatible with onnx conversion, hence some weird operations.

    Attributes:
        type_embed (torch.Tensor): Type embeddings, one row per frame slot (max_frames rows).
        landmark_embed (torch.Tensor): Landmark embeddings (values 1..120), one row per frame slot.
            Kept as an attribute but not used by forward.
        ids (torch.Tensor): IDs of the landmarks kept as-is (flattened KEPT_LANDMARKS).
        to_avg (List[torch.Tensor]): IDs of landmarks to average (one tensor per TO_AVG group).
        hands (torch.Tensor): IDs of hand landmarks.
        frames (torch.Tensor): Frame numbers, 1..max_frames.
        max_len (torch.Tensor): Maximum length.

    Methods:
        filter_sign(self, x): Filters sign data by removing frames with missing hand landmarks,
            and striding the sequence to a small enough dimension.
        forward(self, x): Preprocesses the input sign data.
    """
    def __init__(self, type_embed, max_len=50, max_frames=1000):
        """
        Preprocessing module for data preparation.

        Args:
            type_embed (numpy array): Type embeddings, one value per output landmark.
            max_len (int, optional): Maximum length. Defaults to 50.
            max_frames (int, optional): Size of the pre-allocated per-frame tables, i.e. the
                largest number of frames (after hand filtering) that can be handled.
                Defaults to 1000.
        """
        super(Preprocessing, self).__init__()

        self.type_embed = torch.from_numpy(type_embed[None, :].astype(np.float32))
        self.type_embed = self.type_embed.repeat(max_frames, 1)

        self.landmark_embed = torch.tensor(np.arange(120)).float().unsqueeze(0) + 1
        self.landmark_embed = self.landmark_embed.repeat(max_frames, 1)

        self.ids = torch.from_numpy(np.concatenate(KEPT_LANDMARKS))
        self.to_avg = [torch.tensor(avg) for avg in TO_AVG]

        # Left hand (468..488) followed by right hand (522..542).
        self.hands = torch.tensor(list(range(468, 489)) + list(range(522, 543)))
        self.frames = torch.arange(1, max_frames + 1)
        self.max_len = torch.tensor([max_len])

    def filter_sign(self, x):
        """
        Filters sign data by removing frames with missing hand landmarks,
        and striding the sequence to a small enough dimension.

        Args:
            x (torch.Tensor): Sign data, indexed as [frame, landmark, coordinate].

        Returns:
            torch.Tensor: Filtered sign data.
        """
        # Drop the frames in which every hand landmark is NaN (checked on the first coordinate).
        hands = x[:, self.hands, 0]
        nan_prop = torch.isnan(hands).float().mean(-1)
        x = x[torch.where(nan_prop < 1)[0]]

        # Number of remaining frames, read from the frame-number table.
        length = self.frames[:x.size(0)].max().unsqueeze(0)
        sz = torch.cat([length, self.max_len]).max()

        # Stride: 1 when the sequence fits in max_len, floor(length / max_len) + 1 otherwise.
        divisor = (((sz - self.max_len) > 0) * (sz / self.max_len) + 1).int()
        # Keep the frames whose 1-based number is a multiple of the stride.
        ids = (self.frames[:x.size(0)] % divisor) == 0

        return x[ids]

    def forward(self, x):
        """
        Forward pass of the preprocessing module.

        Filters the frames, keeps the selected landmarks plus one averaged point per
        to_avg group, normalizes each coordinate with the mean / std of the non-NaN
        values, replaces NaNs with 0 and prepends the landmark type as first channel.

        Args:
            x (torch.Tensor): Input sign data, indexed as [frame, landmark, coordinate].

        Returns:
            torch.Tensor: Processed sign data, indexed as [frame, channel, landmark],
                with the type embedding in channel 0 followed by the coordinates.
        """
        x = self.filter_sign(x)

        n_frames = x.shape[0]

        # One averaged landmark per group; a NaN in the group makes the average NaN.
        avg_ids = [x[:, ids].mean(1, keepdim=True) for ids in self.to_avg]

        x = torch.cat([x[:, self.ids]] + avg_ids, 1)

        type_embed = self.type_embed[:n_frames]

        # Normalize & fill nans
        # The view assumes NaNs always cover a whole landmark (all coordinates at once).
        nonan = x[~torch.isnan(x)].view(-1, x.shape[-1])
        x = x - nonan.mean(0)[None, None, :]
        x = x / nonan.std(0, unbiased=False)[None, None, :]
        x[torch.isnan(x)] = 0

        # Concat
        x = torch.cat([type_embed.unsqueeze(-1).to(x.device), x], -1).transpose(1, 2)

        return x

### Main

In [None]:
# Metadata table; the processing loop further down reads its "path" column.
df = prepare_data(DATA_PATH, "")

In [None]:
# Preprocessing module fed with the per-landmark type ids; max_len=25 drives
# the frame striding done in Preprocessing.filter_sign.
prepro = Preprocessing(type_embed, max_len=25)

In [None]:
# When False, the processing loop stops after its first sample and nothing is written to disk.
SAVE = False

# Destination of the preprocessed .npy files (the folder is created even when SAVE is False).
SAVE_FOLDER = "../input/torch_12/"
os.makedirs(SAVE_FOLDER, exist_ok=True)

In [None]:
# Run the preprocessing on the GPU when one is available, on the CPU otherwise.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Iterate over the values themselves: no label-based lookup, so the loop does
# not depend on the DataFrame having a default 0..n-1 index.
for i, path in enumerate(tqdm(df['path'].values)):
    # Output name: "<parent folder>_<file name up to its first dot>.npy"
    name = f"{path.split('/')[-2]}_{path.split('/')[-1].split('.')[0]}.npy"
    save_path = os.path.join(SAVE_FOLDER, name)

    # Resume support: skip the samples that were already written.
    if SAVE and os.path.exists(save_path):
        continue

    # Only the coordinate array is needed here, not the DataFrame.
    raw = load_relevant_data_subset(path)[1]

    out_torch = prepro(torch.from_numpy(raw).to(device)).cpu()

    if SAVE:
        np.save(save_path, out_torch.numpy())

    # Visual sanity check on the first sample, then every 30000th one.
    if not (i % 30000):
        sample = {
            "x": out_torch[:, 1],
            "y": out_torch[:, 2],
            "z": out_torch[:, 3],
            "type": out_torch[:, 0],
        }

        plot_sample_with_edges(sample, n_frames=4, figsize=(10, 10))

    # Dry run: a single sample is enough when nothing is saved.
    if not SAVE:
        break

Done!