In [1]:
import torchaudio
import pandas as pd
import glob
import numpy as np
from tqdm import tqdm
from torchaudio import transforms
import torch
from typing import Any, Callable, Dict, Sequence, Tuple, Union
SequenceOrTensor = Union[Sequence, torch.Tensor]



In [2]:
if __name__ == '__main__':
    # TODO: MAKE THIS READ TEH PATH FROM ARGPARSE
    file_path = os.path.abspath("").replace("data", "docs") + "/train_val_test_dist.xlsx"
    train_test_dist = pd.read_excel(file_path)
    

In [3]:
train_files = train_test_dist[train_test_dist.set == "Train"].video.unique()
test_files = train_test_dist[train_test_dist.set == "Test"].video.unique()
val_files = train_test_dist[train_test_dist.set == "Val"].video.unique()

In [4]:
def assemble_path(file_name):
    file_path = os.path.abspath("").replace("data", "raw_videos/") + file_name + "/"
    audio_file = file_path + "audio/" + file_name + ".wav"
    keypoints_folder = file_path + "keypoints_reduced/" 

    return audio_file, keypoints_folder

def find_keypoints(keypoints_folder):
    keypoints = []
    # open the files while sorting as the order matters
    frames_list = sorted(glob.glob(keypoints_folder + "*.txt"))
    # read each txt with the samples
    for frame in frames_list:
        keypoints.append(read_keypoint_txt(frame))
    
    # transform to numpy array to make processing easier
    keypoints = np.array(keypoints) 

    return keypoints

def read_keypoint_txt(keypoints_folder):
    keypoints = np.loadtxt(keypoints_folder, delimiter=",")
    return keypoints

def read_audio_file(path=""):
    """
    Reads the specifies file in path and returns it in Tensor format
    """
    # fix to work on windows
    path = path.replace("\\", "\\")
    return torchaudio.load(path)

def extract_mfcc(audio_file_path):
    audio, sr =  read_audio_file(audio_file_path)
    mfcc = transforms.MFCC(sample_rate=sr, melkwargs={"n_mels": 40})
    coefs = mfcc(audio)
    return coefs

def assemble_set(train_test_dist, set_name="Train"):
    file_names = train_test_dist[train_test_dist.set == set_name].video.unique()
    audio_files = []
    keypoints_list = []
    ds = []
    file_names = tqdm(file_names)
    for file_name in file_names:
        # get  the files to process
        audio_file_path, keypoints_folder = assemble_path(file_name)
        # extract keypoints
        keypoints = find_keypoints(keypoints_folder)
        # extract mfccs
        mfccs = extract_mfcc(audio_file_path)
        # append to the dataset list
        ds.append([mfccs, keypoints] )
    return np.array(ds, dtype=object)

for set_dist in ["Train", "Val", "Test"]:
    dataset_folder = os.path.abspath("").replace("data", "dataset/")

    ds = assemble_set(train_test_dist, set_name=set_dist)
    np.save(f"{dataset_folder}{set_dist}", ds, allow_pickle=True)
    



100%|██████████| 80/80 [00:23<00:00,  3.35it/s]
100%|██████████| 21/21 [00:06<00:00,  3.45it/s]
100%|██████████| 21/21 [00:06<00:00,  3.48it/s]


torch.Size([1, 40, 2869])

In [5]:
np.load(f"{dataset_folder}{set_dist}.npy", allow_pickle=True)[0]

array([tensor([[[-3.5612e+02, -3.5448e+02, -2.9898e+02,  ..., -3.5612e+02,
          -3.5612e+02, -3.5612e+02],
         [ 3.8560e-05,  1.4992e+00,  3.1084e+01,  ...,  3.8560e-05,
           3.8560e-05,  3.8560e-05],
         [-1.8065e-05, -3.5047e-01, -1.9789e+01,  ..., -1.8065e-05,
          -1.8065e-05, -1.8065e-05],
         ...,
         [ 6.1035e-05, -1.0891e-01,  1.3865e+00,  ...,  6.1035e-05,
           6.1035e-05,  6.1035e-05],
         [ 1.7593e-05,  2.6186e-02, -3.1844e+00,  ...,  1.7593e-05,
           1.7593e-05,  1.7593e-05],
         [ 1.5259e-04,  6.3477e-02,  1.4752e+00,  ...,  1.5259e-04,
           1.5259e-04,  1.5259e-04]]]),
       array([[[ 67., 130.],
        [ 67., 145.],
        [ 71., 158.],
        ...,
        [135., 181.],
        [129., 181.],
        [123., 181.]],

       [[ 67., 130.],
        [ 69., 145.],
        [ 70., 158.],
        ...,
        [135., 181.],
        [129., 181.],
        [123., 179.]],

       [[ 66., 130.],
        [ 66., 145.],
 

In [6]:
def scale_keypoints(keypoints, max_size=(256, 256)):
    # divide all x and y coordinates by the corresponding axis max
    # this is defined on the image processing stage
    keypoints[:, :, 0] = keypoints[:, :, 0]/max_size[0]
    keypoints[:, :, 1] = keypoints[:, :, 1]/max_size[1]
    
    return keypoints

In [7]:
import torch


class BaseDataset(torch.utils.data.Dataset):
    """
    Base Dataset class that simply processes data and targets through optional transforms.

    Read more: https://pytorch.org/docs/stable/data.html#torch.utils.data.Dataset

    Parameters
    ----------
    data
        commonly these are torch tensors, numpy arrays, or PIL Images
    targets
        commonly these are torch tensors or numpy arrays
    transform
        function that takes a datum and returns the same
    target_transform
        function that takes a target and returns the same
    """

    def __init__(
        self,
        data: SequenceOrTensor,
        targets: SequenceOrTensor,
        transform: Callable = None,
        target_transform: Callable = None,
        
    ) -> None:
        if len(data) != len(targets):
            raise ValueError("Data and targets must be of equal length")
        super().__init__()
        self.data = data
        self.targets = targets
        self.transform = transform
        self.target_transform = target_transform


    def __len__(self) -> int:
        """Return length of the dataset."""
        return len(self.data)

    def __getitem__(self, index: int) -> Tuple[Any, Any]:
        """
        Return a datum and its target, after processing by transforms.

        Parameters
        ----------
        index

        Returns
        -------
        (datum, target)
        """
        datum, target = self.data[index], self.targets[index]
        
        if self.transform is not None:
            datum = self.transform(datum)

        if self.target_transform is not None:
            target = self.target_transform(target)

        return datum, target



In [8]:
dataset = np.load(f"{dataset_folder}{set_dist}.npy", allow_pickle=True)
data = dataset[:, 0]
targets = dataset[:, 1]
train_set  = BaseDataset(data=data, targets=targets, target_transform=scale_keypoints)

In [9]:
aud, kp = train_set[0]
kp

array([[[0.26171875, 0.5078125 ],
        [0.26171875, 0.56640625],
        [0.27734375, 0.6171875 ],
        ...,
        [0.52734375, 0.70703125],
        [0.50390625, 0.70703125],
        [0.48046875, 0.70703125]],

       [[0.26171875, 0.5078125 ],
        [0.26953125, 0.56640625],
        [0.2734375 , 0.6171875 ],
        ...,
        [0.52734375, 0.70703125],
        [0.50390625, 0.70703125],
        [0.48046875, 0.69921875]],

       [[0.2578125 , 0.5078125 ],
        [0.2578125 , 0.56640625],
        [0.26953125, 0.6171875 ],
        ...,
        [0.5234375 , 0.70703125],
        [0.5       , 0.70703125],
        [0.4765625 , 0.70703125]],

       ...,

       [[0.25390625, 0.5078125 ],
        [0.26171875, 0.56640625],
        [0.26953125, 0.6171875 ],
        ...,
        [0.52734375, 0.70703125],
        [0.5       , 0.69921875],
        [0.4765625 , 0.69921875]],

       [[0.25390625, 0.5078125 ],
        [0.26171875, 0.56640625],
        [0.26953125, 0.62890625],
        .

In [10]:
sample = targets[0]

In [11]:
sample[:,:, 1].shape

(261, 68)