## Modelling

* count real occlusions over the whole dataset

```
Average length of occlusions: 2.92 frames
Average number of occluded frames per video: 24.15 frames
Average percentage of occluded frames per video: 3.17%
Total number of occluded frames: 2994 out of total for the dataset 101156 
```

* create X (three?) times as many fake occlusions as there are real ones, placed where ground-truth data is available

* "Target" for real occlusions: 

1. random numbers
2. linear interpolations
3. --> cubic interpolations

* ✅ add noise
* ✅ create DataLoader object
* define train-val-test split
* create bi-LSTM object
* create training loop
* create testing loop
* create predict function for one video
* save sample .csv of blendshapes for all models (3 so far)

In [1]:
import os
import datetime
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import ArtistAnimation
from sklearn.preprocessing import StandardScaler
from scipy import interpolate
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader


# Standardization functions
def fit_and_standardize(data):
    """Fit a StandardScaler on ``data`` and return ``(scaled_data, scaler)``.

    All leading axes are collapsed into rows so that the last axis is treated
    as the feature axis; the scaled result is reshaped back to the input shape.
    """
    original_shape = data.shape
    n_features = original_shape[-1]
    rows = data.reshape((-1, n_features))

    scaler = StandardScaler()
    scaler.fit(rows)

    scaled_rows = scaler.transform(rows)
    return scaled_rows.reshape(original_shape), scaler


def standardize(data, scaler):
    """Apply an already-fitted ``scaler`` to ``data``, preserving its shape.

    The data is flattened to 2-D (rows x last-axis features) for
    ``scaler.transform`` and the result is reshaped back to the input shape.
    """
    original_shape = data.shape
    n_features = original_shape[-1]
    rows = data.reshape((-1, n_features))
    scaled_rows = scaler.transform(rows)
    return scaled_rows.reshape(original_shape)


def inv_standardize(data, scaler):
    """Undo standardization of ``data`` with ``scaler``, preserving its shape.

    The data is flattened to 2-D (rows x last-axis features) for
    ``scaler.inverse_transform`` and the result is reshaped back to the input
    shape.
    """
    original_shape = data.shape
    n_features = original_shape[-1]
    rows = data.reshape((-1, n_features))
    restored_rows = scaler.inverse_transform(rows)
    return restored_rows.reshape(original_shape)


# Data processing function
def process_data(data):
    """Filter out samples with missing values; return ``(data, nan_mask)``.

    ``data`` is indexed as a 3-D array (sample, axis 1, axis 2). Filtering is
    done in two passes:

    1. drop samples in which more than 80% of the values are NaN;
    2. of the remainder, keep only samples whose total sum is not NaN.

    NOTE(review): pass 2 removes every sample that still contains a NaN, so
    the mask returned alongside the data has no True entries and pass 1 is
    subsumed by pass 2 - confirm this is intended.
    """
    nan_mask = np.isnan(data)

    # Pass 1: samples that are (almost) entirely missing.
    values_per_sample = nan_mask.shape[1] * nan_mask.shape[2]
    nan_counts = np.sum(np.sum(nan_mask, axis=1), axis=1)
    mostly_present = ~(nan_counts > values_per_sample * 0.8)
    data = data[mostly_present, :, :]
    nan_mask = nan_mask[mostly_present, :, :]

    # Pass 2: a NaN anywhere in a sample makes its sum NaN.
    sample_totals = np.sum(data, axis=(1, 2))
    complete = ~np.isnan(sample_totals)
    return data[complete, :, :], nan_mask[complete, :, :]

In [5]:
import os
import datetime
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import ArtistAnimation
from sklearn.preprocessing import StandardScaler
from scipy import interpolate
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader


# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class TimeSeriesDataset(Dataset):
    """Per-video blendshape sequences with real and synthetic occlusion labels.

    On construction the frame dicts are converted into one
    ``(num_frames, 51)`` float array per video, Gaussian noise is added, and
    synthetic ("fake") occlusions are marked on frames whose smoothed velocity
    is unusually high. Synthetic occlusions only change ``labels`` and
    ``mask``; the blendshape values are left untouched so that they can serve
    as ground truth.

    Attributes set by ``__init__``:
        data: list with one float array per video.
        labels: list of per-video lists; 1 where a frame is occluded (real or
            synthetic), otherwise the original label.
        mask: list of per-video lists; 0 where a synthetic occlusion was
            inserted, 1 everywhere else.
    """

    # Number of blendshape coefficients expected in every frame.
    NUM_BLENDSHAPES = 51
    # Synthetic occlusion length is drawn from Normal(mean, std) and clipped
    # to [min, max] frames (per the original author's note the mean and std
    # come from the dataset statistics).
    OCC_LEN_MEAN = 2.92
    OCC_LEN_STD = 12.05
    OCC_LEN_MIN = 2
    OCC_LEN_MAX = 24

    def __init__(self, data, labels=None, smoothed_velocities=None, occ_prob=0.3, noise_std=0.01):
        """
        Args:
            data: Sequence of videos. Each video is a sequence of frames and
                each frame is a dict mapping blendshape name -> value
                (51 entries expected per frame).
            labels: Per-video sequences of binary occlusion flags, one per
                frame (1 = occluded). ``None`` means "no real occlusions".
            smoothed_velocities: Per-video sequences with one smoothed
                velocity per frame, used to decide where synthetic occlusions
                may start. ``None`` disables synthetic occlusions.
            occ_prob (float): Probability of starting a synthetic occlusion at
                a frame whose velocity exceeds the video's mean + 1 std.
            noise_std (float): Standard deviation of the Gaussian noise added
                to the blendshape values.

        Raises:
            ValueError: If ``labels`` or ``smoothed_velocities`` does not have
                one entry per frame of every video in ``data``.
        """
        self.occ_prob = occ_prob
        self.noise_std = noise_std

        if labels is None:
            labels = [[0] * len(video) for video in data]
        else:
            self._check_aligned("labels", labels, data)
        # Copy each per-video sequence: copying only the outer container would
        # let the in-place label edits below leak into the caller's lists.
        self.labels = [list(video_labels) for video_labels in labels]

        if smoothed_velocities is None:
            self.smoothed_velocities = None
        else:
            self._check_aligned("smoothed_velocities", smoothed_velocities, data)
            self.smoothed_velocities = [list(video_vel) for video_vel in smoothed_velocities]

        # Convert frame dicts to arrays and add Gaussian noise to every video.
        self.data = self.add_noise(data)

        self.data, self.labels, self.mask = self.insert_fake_occlussions()

    @staticmethod
    def _check_aligned(name, per_video, data):
        """Raise ValueError unless ``per_video`` has one value per frame of ``data``."""
        if len(per_video) != len(data):
            raise ValueError(
                f"{name} has {len(per_video)} videos but data has {len(data)}"
            )
        for video_i, (values, video) in enumerate(zip(per_video, data)):
            if len(values) != len(video):
                raise ValueError(
                    f"{name}[{video_i}] has {len(values)} entries but the video "
                    f"has {len(video)} frames"
                )

    def __len__(self):
        """Return the number of videos."""
        return len(self.data)

    def insert_fake_occlussions(self):
        """Mark synthetic occlusions on high-velocity frames.

        For every frame whose smoothed velocity exceeds the video's
        mean + 1 std, an occlusion starting at that frame is inserted with
        probability ``occ_prob``. Frames already labelled 1 keep mask value 1,
        so the mask is 0 only on purely synthetic occlusions.

        Returns:
            tuple: ``(data, labels, mask)`` where ``data`` is unchanged and
            ``labels`` / ``mask`` are new per-video lists.
        """
        data = list(self.data)
        labels = [list(video_labels) for video_labels in self.labels]
        # 1 marks untouched frames, 0 marks frames given a synthetic occlusion.
        mask = [[1] * len(video) for video in data]

        total_frames = sum(len(video) for video in data)

        def fraction(count):
            # Avoid ZeroDivisionError on a dataset without any frames.
            return count / total_frames if total_frames else 0.0

        occ_p_init = fraction(sum(sum(video_labels) for video_labels in labels))
        print(f"\nPercentage of REAL occlusions: {occ_p_init}\n")

        for video_i, video in enumerate(data):
            len_video = len(video)
            print(f"Video {video_i} length: {len_video}")
            if self.smoothed_velocities is None or len_video == 0:
                continue

            velocities = np.asarray(self.smoothed_velocities[video_i], dtype=float)
            threshold = np.mean(velocities) + np.std(velocities)
            video_labels = labels[video_i]
            video_mask = mask[video_i]

            # Candidate start frames depend only on the velocities, so they
            # can be located up front; random draws happen in frame order.
            for start in np.flatnonzero(velocities > threshold):
                start = int(start)
                if np.random.rand() >= self.occ_prob:
                    continue
                occ_len = int(np.clip(
                    np.random.normal(self.OCC_LEN_MEAN, self.OCC_LEN_STD),
                    self.OCC_LEN_MIN,
                    self.OCC_LEN_MAX,
                ))
                # `end` is inclusive and never reaches the last frame of the
                # video. NOTE(review): the span covers occ_len + 1 frames -
                # confirm that matches how occlusion length was measured.
                end = min(start + occ_len, len_video - 2)
                for frame_n in range(start, end + 1):
                    # Leave frames that are already occluded untouched so the
                    # mask keeps distinguishing real from synthetic ones.
                    if video_labels[frame_n] != 1:
                        video_labels[frame_n] = 1
                        video_mask[frame_n] = 0
                # `data` is deliberately left as is: it is the ground truth.

        # Share of occluded frames after inserting the synthetic ones.
        occ_p = fraction(sum(sum(video_labels) for video_labels in labels))
        print(f"\nPercentage of occlusions: {occ_p}\n")
        # Share of frames that carry a synthetic occlusion.
        mask_p = 1 - fraction(sum(sum(video_mask) for video_mask in mask)) if total_frames else 0.0
        print(f"\nPercentage of masked data: {mask_p}\n")
        return data, labels, mask

    def add_noise(self, data):
        """Convert frame dicts to arrays and add Gaussian noise.

        Args:
            data: Sequence of videos, each a sequence of frame dicts.

        Returns:
            list: One ``(num_frames, 51)`` float array per video, with
            zero-mean Gaussian noise of std ``noise_std`` added.
        """
        n_features = self.NUM_BLENDSHAPES
        all_blendshapes = []
        for video in data:
            rows = []
            for frame in video:
                # Sort by key so every frame lists its features in the same order.
                values = [frame[key] for key in sorted(frame)]
                if len(values) != n_features:
                    # Malformed frame: report it and substitute an all-zero row.
                    print(f"ERROR: Number of blendshapes is not {n_features}")
                    values = [0.0] * n_features
                rows.append(values)
            # reshape keeps an empty video two-dimensional: (0, 51).
            blendshapes_in_video = np.array(rows, dtype=float).reshape(-1, n_features)
            print(blendshapes_in_video.shape)
            noise = np.random.normal(0, self.noise_std, blendshapes_in_video.shape)
            all_blendshapes.append(blendshapes_in_video + noise)
        return all_blendshapes

    def __getitem__(self, idx):
        """Return ``(blendshapes, labels, mask)`` float32 tensors for video ``idx``."""
        return (
            torch.tensor(self.data[idx], dtype=torch.float32),
            torch.tensor(self.labels[idx], dtype=torch.float32),
            torch.tensor(self.mask[idx], dtype=torch.float32),
        )
    

def create_dataloader(all_data, batch_size=2, shuffle=True, collate_fn=None):
    """Build a DataLoader over a TimeSeriesDataset from per-video frame dicts.

    The input is not modified: features are copied into new dicts instead of
    popping the metadata keys from the caller's frames, so the function can be
    called repeatedly on the same data.

    Args:
        all_data (dict): Maps video name -> list of frame dicts. Every frame
            must contain "occluded" and "smoothed_velocity"; "Timecode",
            "BlendshapeCount" and "velocity" are dropped when present; every
            remaining entry is treated as a blendshape feature.
        batch_size (int): Number of videos per batch.
        shuffle (bool): Whether the DataLoader shuffles the videos.
        collate_fn (callable, optional): Forwarded to ``DataLoader``. The
            default collation stacks the samples of a batch, which requires
            all videos in the batch to have the same number of frames; pass a
            padding collate function (or use ``batch_size=1``) otherwise.

    Returns:
        DataLoader: Yields ``(blendshapes, labels, mask)`` batches.

    Raises:
        KeyError: If a frame lacks "occluded" or "smoothed_velocity".
    """
    # Per-frame fields that are labels or bookkeeping, not blendshape features.
    non_feature_keys = {"occluded", "smoothed_velocity", "Timecode", "BlendshapeCount", "velocity"}

    stacked_data = []          # videos x frames x feature dict
    all_labels = []            # videos x frames
    smoothed_velocities = []   # videos x frames
    for video_frames in all_data.values():
        features = []
        labels = []
        velocities = []
        for frame in video_frames:
            labels.append(frame["occluded"])
            # The smoothed velocity steers synthetic occlusions towards frames
            # where the face is moving rather than resting.
            velocities.append(frame["smoothed_velocity"])
            features.append(
                {key: value for key, value in frame.items() if key not in non_feature_keys}
            )
        stacked_data.append(features)
        all_labels.append(labels)
        smoothed_velocities.append(velocities)

    if stacked_data:
        print(len(stacked_data[0]))
    print(len(stacked_data))
    print(len(all_labels))
    dataset = TimeSeriesDataset(stacked_data, all_labels, smoothed_velocities)
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, collate_fn=collate_fn)

In [3]:
# read occlusions
# NOTE(review): allow_pickle=True unpickles arbitrary objects - only load files from a trusted source.
with np.load("occlusions_results.npz", allow_pickle=True) as data:
    # .item() unwraps the single Python object stored in the array
    # (presumably a dict keyed by video name, since it is later queried with .get(video_name, ...)).
    video_occlusions_frames = data["video_occlusions"].item()
print(len(video_occlusions_frames))

# The string literal below is a disabled variant that reads the timecode-based occlusion file.
"""# read occlusions
with np.load("occlusions_results_timecodes.npz", allow_pickle=True) as data:
    video_occlusions = data["video_occlusions"].item()
print(len(video_occlusions))"""

125


'# read occlusions\nwith np.load("occlusions_results_timecodes.npz", allow_pickle=True) as data:\n    video_occlusions = data["video_occlusions"].item()\nprint(len(video_occlusions))'

In [4]:
# read blendshapes from npz
# NOTE(review): allow_pickle=True unpickles arbitrary objects - only load files from a trusted source.
with np.load("blendshapes_timecodes_velocities_cubic_interpolated.npz", allow_pickle=True) as data:
    # .item() unwraps the single Python object stored in the array
    # (used below as a dict mapping video name -> list of frame dicts).
    video_blendshapes_cubic = data["video_blendshapes"].item()
print(len(video_blendshapes_cubic))


def add_occlusion_labels(video_blendshapes, occlusions):
    """Write a binary ``'occluded'`` flag into every frame dict, in place.

    Args:
        video_blendshapes (dict): Maps video name -> list of frame dicts.
        occlusions (dict): Maps video name -> iterable of ``(start, end)``
            frame-index pairs; ``end`` is inclusive. Videos without an entry
            get all-zero flags.

    Returns:
        dict: The same ``video_blendshapes`` object, with its frames updated.
    """
    for name in video_blendshapes:
        frame_list = video_blendshapes[name]
        flags = np.zeros(len(frame_list), dtype=int)

        # Mark every frame covered by an occlusion interval (end inclusive).
        for first, last in occlusions.get(name, []):
            flags[first:last + 1] = 1

        for frame, flag in zip(frame_list, flags):
            frame['occluded'] = flag

    return video_blendshapes

# Add occlusion labels to video_blendshapes_cubic
# The frames are updated in place; the function returns the same dict object.
video_blendshapes_cubic = add_occlusion_labels(video_blendshapes_cubic, video_occlusions_frames)
# Spot-check: occlusion flag of the first frame of one video (shown as the cell output).
video_blendshapes_cubic["varg_002_2_pmil"][0]["occluded"]

125


0

In [6]:
# Build a loader over ALL videos. NOTE(review): despite the name this is not a held-out test split;
# also, the videos differ in length, so batching 32 of them with the default collation likely fails on iteration.
test_loader = create_dataloader(video_blendshapes_cubic, batch_size=32, shuffle=True)

354
125
125
(354, 51)
(824, 51)
(387, 51)
(1200, 51)
(583, 51)
(169, 51)
(1046, 51)
(1121, 51)
(1065, 51)
(778, 51)
(1319, 51)
(1169, 51)
(636, 51)
(521, 51)
(479, 51)
(381, 51)
(1295, 51)
(519, 51)
(531, 51)
(1358, 51)
(480, 51)
(456, 51)
(937, 51)
(437, 51)
(2060, 51)
(359, 51)
(838, 51)
(955, 51)
(951, 51)
(153, 51)
(205, 51)
(376, 51)
(937, 51)
(867, 51)
(338, 51)
(2118, 51)
(1136, 51)
(983, 51)
(344, 51)
(562, 51)
(936, 51)
(442, 51)
(656, 51)
(720, 51)
(504, 51)
(524, 51)
(974, 51)
(1282, 51)
(410, 51)
(563, 51)
(938, 51)
(942, 51)
(747, 51)
(438, 51)
(1057, 51)
(311, 51)
(1191, 51)
(328, 51)
(1557, 51)
ERROR: Number of blendshapes is not 51
ERROR: Number of blendshapes is not 51
ERROR: Number of blendshapes is not 51
ERROR: Number of blendshapes is not 51
ERROR: Number of blendshapes is not 51
ERROR: Number of blendshapes is not 51
ERROR: Number of blendshapes is not 51
ERROR: Number of blendshapes is not 51
ERROR: Number of blendshapes is not 51
ERROR: Number of blendshapes is 

In [7]:
# Peek at the first batch only: shapes of the blendshape, label and mask tensors.
for batch in test_loader:
    print(batch[0].shape)
    print(batch[1].shape)
    print(batch[2].shape)
    break

: 

In [None]:
# TODO: create a train/val/test split (not implemented yet; the cell only displays the data)
video_blendshapes_cubic