### a. Feature Extraction

In [150]:
import cv2
import random
import numpy as np
import matplotlib.pyplot as plt

def draw_pose(points,
              image_size=(135, 135),
              padding=1,
              point_radius=2,
              line_thickness=2,
              color=(0, 0, 255),
              file_name='test'):
    """
    Draws 2D pose points on a white background image and saves it as a PNG.

    :param points: Iterable of keypoints. Each keypoint is a sequence whose first
        two components are (x, y); any further components (e.g. z) are ignored.
    :param image_size: Tuple (width, height) of the output image.
    :param padding: Currently unused; kept so existing callers keep working.
    :param point_radius: Radius of the circle to draw for each point.
    :param line_thickness: Currently unused (the line-drawing code below is disabled).
    :param color: Currently unused; every keypoint is drawn in a random colour.
    :param file_name: Output path without extension; '.png' is appended.
    :return: Image with pose drawn, as a (height, width, 3) uint8 array.
    """
    # Create a white background image
    image = np.full((image_size[1], image_size[0], 3), 255, dtype=np.uint8)

    # Map coordinates to pixels with (v + 1) * 0.3 * size, i.e. -1 -> 0 and 1 -> 0.6 * size.
    # NOTE(review): a full [-1, 1] -> [0, size] mapping would use 0.5; confirm 0.3 is intended.
    scaled_points = [
        (int((point[0] + 1) * 0.3 * image_size[0]), int((point[1] + 1) * 0.3 * image_size[1]))
        for point in points
    ]

    # Add custom connections
    # Draw lines between points (optional, depends on the structure of your points)
    # for i in range(len(scaled_points) - 1):
    #     rand_color = (random.randint(0,255), random.randint(0,255),random.randint(0,255))
    #     cv2.line(image, scaled_points[i], scaled_points[i+1], rand_color, line_thickness)

    # Draw points
    for point in scaled_points:
        rand_color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
        cv2.circle(image, point, point_radius, rand_color, -1)  # -1 fills the circle

    plt.imsave(f'{file_name}.png', image)

    return image

In [151]:
# # Example pose points (you can update this with real data)
# pose_points = [(100, 100), (150, 200), (200, 300), (250, 100), (300, 200)]

# # Generate the image
# draw_pose(pose_points)

In [152]:
import os
import glob

# Directory searched for .npy pose files (relative to the working directory).
POSE_DATA_PATH = 'toy-dataset/pose/'
# glob returns paths in arbitrary, OS-dependent order; sort them so positional
# lookups such as FILES[1] select the same file on every machine and every run.
FILES = sorted(glob.glob(os.path.join(POSE_DATA_PATH, '*.npy')))

In [153]:
import numpy as np
import pandas as pd

def open_poses(pose_file=None):
    """
    Load a pose array from a .npy file into a DataFrame with NaN replaced by 0.

    :param pose_file: Path of the .npy file to read. When omitted, FILES[1] is
        used; it is looked up at call time so that defining this function does
        not fail when fewer than two files were found.
    :return: pandas.DataFrame built from the loaded array, with NaN values set to 0.
    :raises IndexError: If pose_file is omitted and FILES has fewer than two entries.
    """
    if pose_file is None:
        if len(FILES) < 2:
            raise IndexError(
                f"open_poses() defaults to FILES[1], but only {len(FILES)} pose file(s) were found"
            )
        pose_file = FILES[1]

    with open(pose_file, 'rb') as f:
        pose_array = np.load(f)

    return pd.DataFrame(pose_array).replace(np.nan, 0)
    

In [154]:
# Load the default pose file (open_poses' default argument) as a DataFrame.
df = open_poses()

In [161]:
# Take row 35 of the pose table and regroup its flat values into rows of three.
frame_values = df.iloc[35].to_numpy()
arr = frame_values.reshape(frame_values.shape[0] // 3, 3)

# Rendering options for the saved 'test.png'.
render_options = dict(
    image_size=(2120, 2120),
    padding=1,
    point_radius=5,
    line_thickness=5,
    color=(0, 0, 255),
    file_name='test',
)
draw_pose(points=arr, **render_options)

### b. Wav2Vec2 Test

In [4]:
# !pip3 install 'urllib3<2' soundfile librosa torch transformers torchaudio
!pip3 install datasets

Collecting datasets
  Downloading datasets-2.19.1-py3-none-any.whl.metadata (19 kB)
Collecting pyarrow-hotfix (from datasets)
  Downloading pyarrow_hotfix-0.6-py3-none-any.whl.metadata (3.6 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.4.1-cp311-cp311-macosx_11_0_arm64.whl.metadata (12 kB)
Collecting multiprocess (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Downloading datasets-2.19.1-py3-none-any.whl (542 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m542.0/542.0 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading multiprocess-0.70.16-py311-none-any.whl (143 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m143.5/143.5 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━

In [4]:
import torch
from transformers import AutoFeatureExtractor, Wav2Vec2ForPreTraining
from transformers.models.wav2vec2.modeling_wav2vec2 import _compute_mask_indices, _sample_negative_indices
from datasets import load_dataset

def sample_infer():
    """
    Run the Wav2Vec2 pre-training example on one dummy LibriSpeech sample.

    Loads the "facebook/wav2vec2-base" feature extractor and pre-training model,
    builds time-mask and negative-sample indices for the first validation sample,
    runs one forward pass under ``torch.no_grad()`` and a second one after the
    model has been switched to train mode.

    :return: Tuple ``(feature_extractor, model, ds, input_values, mask_time_indices,
        sampled_negative_indices, outputs, cosine_sim, output)`` where ``outputs`` is
        the no-grad forward result, ``cosine_sim`` the cosine similarity between its
        projected and projected-quantized states, and ``output`` the train-mode
        forward result that also received the sampled negatives. The returned
        ``model`` is left in train mode.
    """
    feature_extractor = AutoFeatureExtractor.from_pretrained("facebook/wav2vec2-base")
    model = Wav2Vec2ForPreTraining.from_pretrained("facebook/wav2vec2-base")

    ds = load_dataset("hf-internal-testing/librispeech_asr_dummy", "clean", split="validation")
    input_values = feature_extractor(ds[0]["audio"]["array"], return_tensors="pt").input_values  # Batch size 1

    # compute masked indices
    batch_size, raw_sequence_length = input_values.shape
    sequence_length = model._get_feat_extract_output_lengths(raw_sequence_length).item()
    mask_time_indices = _compute_mask_indices(
        shape=(batch_size, sequence_length), mask_prob=0.2, mask_length=2
    )
    sampled_negative_indices = _sample_negative_indices(
        features_shape=(batch_size, sequence_length),
        num_negatives=model.config.num_negatives,
        mask_time_indices=mask_time_indices,
    )
    mask_time_indices = torch.tensor(data=mask_time_indices, device=input_values.device, dtype=torch.long)
    sampled_negative_indices = torch.tensor(
        data=sampled_negative_indices, device=input_values.device, dtype=torch.long
    )

    with torch.no_grad():
        outputs = model(input_values, mask_time_indices=mask_time_indices)

    # compute cosine similarity between predicted (=projected_states) and target (=projected_quantized_states)
    # (the upstream example then checks that the mean similarity over masked steps exceeds 0.5;
    # that comparison was a discarded expression here, so it has been dropped)
    cosine_sim = torch.cosine_similarity(outputs.projected_states, outputs.projected_quantized_states, dim=-1)

    # for contrastive loss training model should be put into train mode
    model = model.train()

    output = model(
        input_values, mask_time_indices=mask_time_indices, sampled_negative_indices=sampled_negative_indices
    )

    return feature_extractor, model, ds, input_values, mask_time_indices, sampled_negative_indices, outputs, cosine_sim, output

In [5]:
feature_extractor, model, ds, input_values, mask_time_indices, sampled_negative_indices, outputs, cosine_sim, output = sample_infer()

Some weights of the model checkpoint at facebook/wav2vec2-base were not used when initializing Wav2Vec2ForPreTraining: ['wav2vec2.encoder.pos_conv_embed.conv.weight_g', 'wav2vec2.encoder.pos_conv_embed.conv.weight_v']
- This IS expected if you are initializing Wav2Vec2ForPreTraining from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing Wav2Vec2ForPreTraining from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of Wav2Vec2ForPreTraining were not initialized from the model checkpoint at facebook/wav2vec2-base and are newly initialized: ['wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should prob

In [6]:
from lib.utils.sign2vec import Sign2VecFeatureEncoder
from transformers.models.wav2vec2.modeling_wav2vec2 import Wav2Vec2FeatureProjection

In [7]:
# Video conv 3d layer to process the video input
# (B x C x T x H x W) -> (B x C x T x H x W)
import torch
import torch.nn as nn

def conv3x3x3(in_planes, out_planes, stride=1):
    """
    Build a bias-free 3x3x3 3D convolution with padding 1.

    :param in_planes: Number of input channels.
    :param out_planes: Number of output channels.
    :param stride: Stride passed to ``nn.Conv3d``.
    :return: The configured ``nn.Conv3d`` module.
    """
    return nn.Conv3d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)


def _demo_conv3d_shapes(batch_size=1, channels=3, time=512, height=75, width=75):
    """
    Shape experiment: push a random video through three Conv3d layers and flatten.

    This code used to sit after the ``return`` in ``conv3x3x3`` and could never
    run. It is kept as a separate helper that is not called automatically; with
    the default sizes it is expensive to execute.

    :param batch_size: Batch dimension of the random input.
    :param channels: Channel dimension of the random input.
    :param time: Number of frames of the random input.
    :param height: Frame height of the random input.
    :param width: Frame width of the random input.
    :return: Tensor of shape (batch, time', channels' * height' * width').
    """
    input_video = torch.randn(
        batch_size, channels, time, height, width
    )

    print('INPUT_SHAPE', input_video.shape)

    output = nn.Conv3d(
        in_channels=channels,
        out_channels=10,
        kernel_size=(10, 10, 10),
        stride=(1, 1, 1),
        padding=(0, 1, 1),
        dilation=(1, 1, 1),
        groups=1,
        bias=True,
        padding_mode='zeros'
    )(input_video)

    print('LAYER_0 OUTPUT_SHAPE', output.shape)

    # The spatial kernel spans the whole incoming feature map.
    output = nn.Conv3d(
        in_channels=10,
        out_channels=20,
        kernel_size=(10, output.shape[3], output.shape[4]),
        stride=(1, 1, 1),
        padding=(0, 1, 1),
        dilation=(1, 1, 1),
        groups=1,
        bias=True,
        padding_mode='zeros'
    )(output)

    print('LAYER_1 OUTPUT_SHAPE', output.shape)

    output = nn.Conv3d(
        in_channels=20,
        out_channels=50,
        kernel_size=(10, output.shape[3], output.shape[4]),
        stride=(1, 1, 1),
        padding=(0, 1, 1),
        dilation=(1, 1, 1),
        groups=1,
        bias=True,
        padding_mode='zeros'
    )(output)

    print('LAYER_2 OUTPUT_SHAPE', output.shape)

    # (B, C, T, H, W) -> (B, T, C, H, W)
    output = output.transpose(1, 2)
    # merge the channel and (height, width) dimensions into one feature axis
    output = output.reshape(output.shape[0], output.shape[1], -1)

    print('FLATTENED OUTPUT_SHAPE', output.shape)

    return output

In [8]:
# Reuse the loaded model's config object. This binds a reference, not a copy,
# so attributes set on `config` afterwards are also visible via `model.config`.
config = model.config

In [9]:
# Specification of the 3D convolution stack, one dict per layer, in order.
# Each layer's 'in_channels' must equal the previous layer's 'out_channels'
# (the last entry previously declared 30 although the layer before it outputs 5).
# NOTE(review): the code that copies this list into the config reads only
# 'out_channels', 'kernel_size' and 'stride'; confirm whether 'padding' is meant to be applied.
conv_3d_layer_dict = [
    { 'in_channels': 3,  'out_channels': 10, 'kernel_size': ( 5,  5,  5 ), 'stride': (1, 1, 1), 'padding': (0, 1, 1) }, 
    { 'in_channels': 10, 'out_channels': 20, 'kernel_size': ( 2,  5,  5 ), 'stride': (1, 2, 2), 'padding': (0, 1, 1) }, 
    { 'in_channels': 20, 'out_channels': 20, 'kernel_size': ( 2,  2,  2 ), 'stride': (1, 2, 2), 'padding': (0, 1, 1) }, 
    { 'in_channels': 20, 'out_channels': 20, 'kernel_size': ( 2,  2,  2 ), 'stride': (1, 2, 2), 'padding': (0, 1, 1) }, 
    { 'in_channels': 20, 'out_channels': 20, 'kernel_size': ( 1,  2,  2 ), 'stride': (1, 1, 1), 'padding': (0, 1, 1) }, 
    { 'in_channels': 20, 'out_channels': 30, 'kernel_size': ( 1,  2,  1 ), 'stride': (1, 1, 1), 'padding': (0, 1, 1) }, 
    { 'in_channels': 30, 'out_channels': 5,  'kernel_size': ( 1,  2,  2 ), 'stride': (1, 1, 1), 'padding': (0, 1, 1) }, 
    { 'in_channels': 5,  'out_channels': 5,  'kernel_size': ( 2,  2,  2 ), 'stride': (1, 1, 1), 'padding': (0, 1, 1) }, 
]

In [10]:
# Pull one value per 3D conv layer for each hyper-parameter stored on the config.
conv_channels = [spec['out_channels'] for spec in conv_3d_layer_dict]
conv_kernel = [spec['kernel_size'] for spec in conv_3d_layer_dict]
conv_stride = [spec['stride'] for spec in conv_3d_layer_dict]

# Attach the 3D-stack settings to the config object as extra attributes.
config.conv_3d_channels = conv_channels
config.conv_3d_kernel = conv_kernel
config.conv_3d_stride = conv_stride
config.num_3d_feat_extract_layers = len(conv_3d_layer_dict)

In [24]:
from transformers.activations import ACT2FN
from transformers.models.wav2vec2.modeling_wav2vec2 import (
    Wav2Vec2GroupNormConvLayer,
    Wav2Vec2LayerNormConvLayer,
    Wav2Vec2NoLayerNormConvLayer,
)

from lib.utils.sign2vec import (
    Sign2VecGroupNormConvLayer,
    Sign2VecLayerNormConvLayer,
    Sign2VecNoLayerNormConvLayer,
)

class Sign2VecFeatureEncoder(nn.Module):
    """
    Construct frame-level features from a raw video tensor.

    The input first passes through a stack of 3D convolution layers
    (``conv_3d_layers``). Channels and spatial positions are then flattened
    into a single feature axis and the result is passed through a stack of
    Wav2Vec2 1D convolution layers (``conv_layers``).
    """

    def __init__(self, config):
        super().__init__()

        norm = config.feat_extract_norm
        if norm == "group":
            # 3D Convolutional Layers - to spatio-temporally downsample the input
            conv_3d_layers = [Sign2VecGroupNormConvLayer(config, layer_id=0)] + [
                Sign2VecNoLayerNormConvLayer(config, layer_id=i + 1) for i in range(config.num_3d_feat_extract_layers - 1)
            ]
            # 1D layers start at layer_id=1: no 1D layer with layer_id=0 is built in this mode.
            conv_1d_layers = [
                Wav2Vec2NoLayerNormConvLayer(config, layer_id=i + 1) for i in range(config.num_feat_extract_layers - 1)
            ]
        elif norm == "layer":
            conv_3d_layers = [
                Sign2VecLayerNormConvLayer(config, layer_id=i) for i in range(config.num_3d_feat_extract_layers)
            ]
            # NOTE(review): unlike the "group" branch this builds 1D layer 0 as well;
            # confirm that its input width matches the flattened 3D output.
            conv_1d_layers = [
                Wav2Vec2LayerNormConvLayer(config, layer_id=i) for i in range(config.num_feat_extract_layers)
            ]
        else:
            raise ValueError(
                f"`config.feat_extract_norm` is {config.feat_extract_norm}, but has to be one of ['group', 'layer']"
            )

        self.conv_3d_layers = nn.ModuleList(conv_3d_layers)
        self.conv_layers = nn.ModuleList(conv_1d_layers)

        self.gradient_checkpointing = False
        self._requires_grad = True

    def _freeze_parameters(self):
        """Disable gradients for every parameter of this encoder."""
        for param in self.parameters():
            param.requires_grad = False
        self._requires_grad = False

    def _run_layers(self, layers, hidden_states):
        """Apply ``layers`` in order, using gradient checkpointing when it is enabled."""
        use_checkpointing = self._requires_grad and self.gradient_checkpointing and self.training
        for conv_layer in layers:
            if use_checkpointing:
                # Imported here so the submodule is loaded even if only `import torch` was run.
                from torch.utils.checkpoint import checkpoint

                hidden_states = checkpoint(conv_layer, hidden_states)
            else:
                hidden_states = conv_layer(hidden_states)
        return hidden_states

    def forward(self, hidden_states):
        """
        Encode a video tensor.

        :param hidden_states: Tensor of shape (batch_size, channels, time_steps, height, width).
        :return: Output of the last 1D convolution layer.
        """
        # make sure hidden_states require grad for gradient_checkpointing
        if self._requires_grad and self.training:
            hidden_states.requires_grad = True

        hidden_states = self._run_layers(self.conv_3d_layers, hidden_states)

        # (B, C, T, H, W) -> (B, T, C, H, W)
        hidden_states = hidden_states.transpose(1, 2)
        # merge (channel) and (height, width) dimensions -> (B, T, C * H * W)
        hidden_states = hidden_states.reshape(hidden_states.shape[0], hidden_states.shape[1], -1)
        # the 1D convolutions take the feature axis before the time axis -> (B, C * H * W, T)
        hidden_states = hidden_states.transpose(1, 2)

        hidden_states = self._run_layers(self.conv_layers, hidden_states)

        return hidden_states

In [27]:
# NOTE(review): 660 is presumably the flattened feature width leaving the 3D stack
# (channels * height * width) for the 128x128 test input — confirm, and recompute
# it whenever the input size or the 3D conv specs change.
# The assignment edits the list in place, so every holder of this config object sees it.
config.conv_dim[0] = 660

In [28]:
# NOTE: this rebinds `feature_extractor`, which until now held the
# AutoFeatureExtractor returned by sample_infer().
feature_extractor = Sign2VecFeatureEncoder(config)
# Wav2Vec2 feature projection built from the same config object.
feature_projection = Wav2Vec2FeatureProjection(config)

In [29]:
# Dimensions of the random test clip fed to the video feature encoder.
batch_size = 1
channels = 3
time = 256
height = 128
width = 128

# Random tensor laid out as (batch_size, channels, time, height, width).
input_video = torch.randn(
    batch_size, channels, time, height, width
)

extract_features = feature_extractor(input_video)
extract_features.shape  # 3-D output; the recorded run printed torch.Size([1, 512, 3])
# extract_features = extract_features.transpose(1, 2)

# extract_features.shape

3d LAYER: 0
LAYER_INPUT: torch.Size([1, 3, 256, 128, 128])
CONV_LAYER: Sign2VecGroupNormConvLayer(
  (conv): Conv3d(3, 10, kernel_size=(5, 5, 5), stride=(1, 1, 1), bias=False)
  (activation): GELUActivation()
  (layer_norm): GroupNorm(10, 10, eps=1e-05, affine=True)
)
-------------------
3d LAYER: 1
LAYER_INPUT: torch.Size([1, 10, 252, 124, 124])
CONV_LAYER: Sign2VecNoLayerNormConvLayer(
  (conv): Conv3d(10, 20, kernel_size=(2, 5, 5), stride=(1, 2, 2), bias=False)
  (activation): GELUActivation()
)
-------------------
3d LAYER: 2
LAYER_INPUT: torch.Size([1, 20, 251, 60, 60])
CONV_LAYER: Sign2VecNoLayerNormConvLayer(
  (conv): Conv3d(20, 20, kernel_size=(2, 2, 2), stride=(1, 2, 2), bias=False)
  (activation): GELUActivation()
)
-------------------
3d LAYER: 3
LAYER_INPUT: torch.Size([1, 20, 250, 30, 30])
CONV_LAYER: Sign2VecNoLayerNormConvLayer(
  (conv): Conv3d(20, 20, kernel_size=(2, 2, 2), stride=(1, 2, 2), bias=False)
  (activation): GELUActivation()
)
-------------------
3d LAYER: 

torch.Size([1, 512, 3])

In [None]:
from transformers.models.wav2vec2.modeling_wav2vec2 import Wav2Vec2FeatureEncoder

# Audio feature encoder built from `config`, for comparing its output shape
# with the video encoder's.
# NOTE(review): it is constructed from the config object rather than loaded with
# from_pretrained, and `config` was modified in earlier cells (e.g. conv_dim[0]) —
# confirm this is the intended baseline.
feature_extractor_audio = Wav2Vec2FeatureEncoder(config)

# Inference only: no gradients are needed for a shape check.
with torch.no_grad():
    extract_features_audio = feature_extractor_audio(input_values)

extract_features_audio.shape

torch.Size([1, 512, 292])

In [None]:
# `sequence_length` is a local variable of sample_infer() and is not returned,
# so it is not defined at notebook level. Recover it from the time mask, which
# was created with shape (batch_size, sequence_length).
sequence_length = mask_time_indices.shape[1]
sequence_length

292

In [None]:

# Runs the video feature encoder on `input_video` again and shows the output shape.
extract_features = feature_extractor(input_video)
extract_features.shape
# extract_features = extract_features.transpose(1, 2)

# extract_features.shape

In [11]:
# Shapes of the raw audio input, the time mask and the sampled negatives, one per line.
print(
    input_values.shape,
    mask_time_indices.shape,
    sampled_negative_indices.shape,
    sep='\n',
)

torch.Size([1, 93680])
torch.Size([1, 292])
torch.Size([1, 292, 100])


In [19]:
# Summarise every field of the train-mode forward result: tensors with at least
# one dimension print their size, scalar tensors print their value, and anything
# else (e.g. fields that are None) prints 'no shape'. An explicit type check
# replaces the former bare `except:`, which would also have hidden real errors.
for val, field_value in output.__dict__.items():
    if isinstance(field_value, torch.Tensor):
        print(val, field_value.size() if field_value.dim() > 0 else field_value.item())
    else:
        print(val, 'no shape')
    print('---')

loss 30.22791290283203
---
projected_states torch.Size([1, 292, 256])
---
projected_quantized_states torch.Size([1, 292, 256])
---
codevector_perplexity 100.37718200683594
---
hidden_states no shape
---
attentions no shape
---
contrastive_loss 25.59052848815918
---
diversity_loss 46.37383270263672
---
