In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch.optim.lr_scheduler import StepLR, MultiStepLR

from torchvision import transforms
import numpy as np
import torch.nn.functional as F
import cv2

from tqdm import tqdm
from operator import add

import os
import sys
from collections import OrderedDict

import csv
import json
import math
import os
import random
import glob
import torch.utils.data as data_utl
import numbers

# Using the I3D model:

In [None]:
class Identity(torch.nn.Module):
    def __init__(self):
        super(Identity, self).__init__()

    def forward(self, x):
        return x

class MaxPool3dSamePadding(nn.MaxPool3d):

    def compute_pad(self, dim, s):
        if s % self.stride[dim] == 0:
            return max(self.kernel_size[dim] - self.stride[dim], 0)
        else:
            return max(self.kernel_size[dim] - (s % self.stride[dim]), 0)

    def forward(self, x):
        # compute 'same' padding
        (batch, channel, t, h, w) = x.size()
        #print t,h,w
        out_t = np.ceil(float(t) / float(self.stride[0]))
        out_h = np.ceil(float(h) / float(self.stride[1]))
        out_w = np.ceil(float(w) / float(self.stride[2]))
        #print out_t, out_h, out_w
        pad_t = self.compute_pad(0, t)
        pad_h = self.compute_pad(1, h)
        pad_w = self.compute_pad(2, w)
        #print pad_t, pad_h, pad_w

        pad_t_f = pad_t // 2
        pad_t_b = pad_t - pad_t_f
        pad_h_f = pad_h // 2
        pad_h_b = pad_h - pad_h_f
        pad_w_f = pad_w // 2
        pad_w_b = pad_w - pad_w_f

        pad = (pad_w_f, pad_w_b, pad_h_f, pad_h_b, pad_t_f, pad_t_b)
        #print x.size()
        #print pad
        x = F.pad(x, pad)
        return super(MaxPool3dSamePadding, self).forward(x)


class Unit3D(nn.Module):

    def __init__(self, in_channels,
                 output_channels,
                 kernel_shape=(1, 1, 1),
                 stride=(1, 1, 1),
                 padding=0,
                 activation_fn=F.relu,
                 use_batch_norm=True,
                 use_bias=False,
                 name='unit_3d'):

        """Initializes Unit3D module."""
        super(Unit3D, self).__init__()

        self._output_channels = output_channels
        self._kernel_shape = kernel_shape
        self._stride = stride
        self._use_batch_norm = use_batch_norm
        self._activation_fn = activation_fn
        self._use_bias = use_bias
        self.name = name
        self.padding = padding

        self.conv3d = nn.Conv3d(in_channels=in_channels,
                                out_channels=self._output_channels,
                                kernel_size=self._kernel_shape,
                                stride=self._stride,
                                padding=0, # we always want padding to be 0 here. We will dynamically pad based on input size in forward function
                                bias=self._use_bias)

        if self._use_batch_norm:
            self.bn = nn.BatchNorm3d(self._output_channels, eps=0.001, momentum=0.01)

    def compute_pad(self, dim, s):
        if s % self._stride[dim] == 0:
            return max(self._kernel_shape[dim] - self._stride[dim], 0)
        else:
            return max(self._kernel_shape[dim] - (s % self._stride[dim]), 0)


    def forward(self, x):
        # compute 'same' padding
        (batch, channel, t, h, w) = x.size()
        #print t,h,w
        out_t = np.ceil(float(t) / float(self._stride[0]))
        out_h = np.ceil(float(h) / float(self._stride[1]))
        out_w = np.ceil(float(w) / float(self._stride[2]))
        #print out_t, out_h, out_w
        pad_t = self.compute_pad(0, t)
        pad_h = self.compute_pad(1, h)
        pad_w = self.compute_pad(2, w)
        #print pad_t, pad_h, pad_w

        pad_t_f = pad_t // 2
        pad_t_b = pad_t - pad_t_f
        pad_h_f = pad_h // 2
        pad_h_b = pad_h - pad_h_f
        pad_w_f = pad_w // 2
        pad_w_b = pad_w - pad_w_f

        pad = (pad_w_f, pad_w_b, pad_h_f, pad_h_b, pad_t_f, pad_t_b)
        #print x.size()
        #print pad
        x = F.pad(x, pad)
        #print x.size()

        x = self.conv3d(x)
        if self._use_batch_norm:
            x = self.bn(x)
        if self._activation_fn is not None:
            x = self._activation_fn(x)
        return x



class InceptionModule(nn.Module):
    def __init__(self, in_channels, out_channels, name):
        super(InceptionModule, self).__init__()

        self.b0 = Unit3D(in_channels=in_channels, output_channels=out_channels[0], kernel_shape=[1, 1, 1], padding=0,
                         name=name+'/Branch_0/Conv3d_0a_1x1')
        self.b1a = Unit3D(in_channels=in_channels, output_channels=out_channels[1], kernel_shape=[1, 1, 1], padding=0,
                          name=name+'/Branch_1/Conv3d_0a_1x1')
        self.b1b = Unit3D(in_channels=out_channels[1], output_channels=out_channels[2], kernel_shape=[3, 3, 3],
                          name=name+'/Branch_1/Conv3d_0b_3x3')
        self.b2a = Unit3D(in_channels=in_channels, output_channels=out_channels[3], kernel_shape=[1, 1, 1], padding=0,
                          name=name+'/Branch_2/Conv3d_0a_1x1')
        self.b2b = Unit3D(in_channels=out_channels[3], output_channels=out_channels[4], kernel_shape=[3, 3, 3],
                          name=name+'/Branch_2/Conv3d_0b_3x3')
        self.b3a = MaxPool3dSamePadding(kernel_size=[3, 3, 3],
                                stride=(1, 1, 1), padding=0)
        self.b3b = Unit3D(in_channels=in_channels, output_channels=out_channels[5], kernel_shape=[1, 1, 1], padding=0,
                          name=name+'/Branch_3/Conv3d_0b_1x1')
        self.name = name

    def forward(self, x):
        b0 = self.b0(x)
        b1 = self.b1b(self.b1a(x))
        b2 = self.b2b(self.b2a(x))
        b3 = self.b3b(self.b3a(x))
        return torch.cat([b0,b1,b2,b3], dim=1)


class InceptionI3d(nn.Module):
    """Inception-v1 I3D architecture.
    The model is introduced in:
        Quo Vadis, Action Recognition? A New Model and the Kinetics Dataset
        Joao Carreira, Andrew Zisserman
        https://arxiv.org/pdf/1705.07750v1.pdf.
    See also the Inception architecture, introduced in:
        Going deeper with convolutions
        Christian Szegedy, Wei Liu, Yangqing Jia, Pierre Sermanet, Scott Reed,
        Dragomir Anguelov, Dumitru Erhan, Vincent Vanhoucke, Andrew Rabinovich.
        http://arxiv.org/pdf/1409.4842v1.pdf.
    """

    # Endpoints of the model in order. During construction, all the endpoints up
    # to a designated `final_endpoint` are returned in a dictionary as the
    # second return value.
    VALID_ENDPOINTS = (
        'Conv3d_1a_7x7',
        'MaxPool3d_2a_3x3',
        'Conv3d_2b_1x1',
        'Conv3d_2c_3x3',
        'MaxPool3d_3a_3x3',
        'Mixed_3b',
        'Mixed_3c',
        'MaxPool3d_4a_3x3',
        'Mixed_4b',
        'Mixed_4c',
        'Mixed_4d',
        'Mixed_4e',
        'Mixed_4f',
        'MaxPool3d_5a_2x2',
        'Mixed_5b',
        'Mixed_5c',
        'Logits',
        'Predictions',
    )

    def __init__(self, num_classes=400, spatial_squeeze=True,
                 final_endpoint='Logits', name='inception_i3d', in_channels=3, dropout_keep_prob=0.5):
        """Initializes I3D model instance.
        Args:
          num_classes: The number of outputs in the logit layer (default 400, which
              matches the Kinetics dataset).
          spatial_squeeze: Whether to squeeze the spatial dimensions for the logits
              before returning (default True).
          final_endpoint: The model contains many possible endpoints.
              `final_endpoint` specifies the last endpoint for the model to be built
              up to. In addition to the output at `final_endpoint`, all the outputs
              at endpoints up to `final_endpoint` will also be returned, in a
              dictionary. `final_endpoint` must be one of
              InceptionI3d.VALID_ENDPOINTS (default 'Logits').
          name: A string (optional). The name of this module.
        Raises:
          ValueError: if `final_endpoint` is not recognized.
        """

        if final_endpoint not in self.VALID_ENDPOINTS:
            raise ValueError('Unknown final endpoint %s' % final_endpoint)

        super(InceptionI3d, self).__init__()
        self._num_classes = num_classes
        self._spatial_squeeze = spatial_squeeze
        self._final_endpoint = final_endpoint
        self.logits = None

        if self._final_endpoint not in self.VALID_ENDPOINTS:
            raise ValueError('Unknown final endpoint %s' % self._final_endpoint)

        self.end_points = {}
        end_point = 'Conv3d_1a_7x7'
        self.end_points[end_point] = Unit3D(in_channels=in_channels, output_channels=64, kernel_shape=[7, 7, 7],
                                            stride=(2, 2, 2), padding=(3,3,3),  name=name+end_point)
        if self._final_endpoint == end_point: return

        end_point = 'MaxPool3d_2a_3x3'
        self.end_points[end_point] = MaxPool3dSamePadding(kernel_size=[1, 3, 3], stride=(1, 2, 2),
                                                             padding=0)
        if self._final_endpoint == end_point: return

        end_point = 'Conv3d_2b_1x1'
        self.end_points[end_point] = Unit3D(in_channels=64, output_channels=64, kernel_shape=[1, 1, 1], padding=0,
                                       name=name+end_point)
        if self._final_endpoint == end_point: return

        end_point = 'Conv3d_2c_3x3'
        self.end_points[end_point] = Unit3D(in_channels=64, output_channels=192, kernel_shape=[3, 3, 3], padding=1,
                                       name=name+end_point)
        if self._final_endpoint == end_point: return

        end_point = 'MaxPool3d_3a_3x3'
        self.end_points[end_point] = MaxPool3dSamePadding(kernel_size=[1, 3, 3], stride=(1, 2, 2),
                                                             padding=0)
        if self._final_endpoint == end_point: return

        end_point = 'Mixed_3b'
        self.end_points[end_point] = InceptionModule(192, [64,96,128,16,32,32], name+end_point)
        if self._final_endpoint == end_point: return

        end_point = 'Mixed_3c'
        self.end_points[end_point] = InceptionModule(256, [128,128,192,32,96,64], name+end_point)
        if self._final_endpoint == end_point: return

        end_point = 'MaxPool3d_4a_3x3'
        self.end_points[end_point] = MaxPool3dSamePadding(kernel_size=[3, 3, 3], stride=(2, 2, 2),
                                                             padding=0)
        if self._final_endpoint == end_point: return

        end_point = 'Mixed_4b'
        self.end_points[end_point] = InceptionModule(128+192+96+64, [192,96,208,16,48,64], name+end_point)
        if self._final_endpoint == end_point: return

        end_point = 'Mixed_4c'
        self.end_points[end_point] = InceptionModule(192+208+48+64, [160,112,224,24,64,64], name+end_point)
        if self._final_endpoint == end_point: return

        end_point = 'Mixed_4d'
        self.end_points[end_point] = InceptionModule(160+224+64+64, [128,128,256,24,64,64], name+end_point)
        if self._final_endpoint == end_point: return

        end_point = 'Mixed_4e'
        self.end_points[end_point] = InceptionModule(128+256+64+64, [112,144,288,32,64,64], name+end_point)
        if self._final_endpoint == end_point: return

        end_point = 'Mixed_4f'
        self.end_points[end_point] = InceptionModule(112+288+64+64, [256,160,320,32,128,128], name+end_point)
        if self._final_endpoint == end_point: return

        end_point = 'MaxPool3d_5a_2x2'
        self.end_points[end_point] = MaxPool3dSamePadding(kernel_size=[2, 2, 2], stride=(2, 2, 2),
                                                             padding=0)
        if self._final_endpoint == end_point: return

        end_point = 'Mixed_5b'
        self.end_points[end_point] = InceptionModule(256+320+128+128, [256,160,320,32,128,128], name+end_point)
        if self._final_endpoint == end_point: return

        end_point = 'Mixed_5c'
        self.end_points[end_point] = InceptionModule(256+320+128+128, [384,192,384,48,128,128], name+end_point)
        if self._final_endpoint == end_point: return

        end_point = 'Logits'
        self.avg_pool = nn.AvgPool3d(kernel_size=[2, 7, 7],
                                     stride=(1, 1, 1))
        self.dropout = nn.Dropout(dropout_keep_prob)
        self.logits  = Unit3D(in_channels=384+384+128+128, output_channels=self._num_classes,
                             kernel_shape=[1, 1, 1],
                             padding=0,
                             activation_fn=None,
                             use_batch_norm=False,
                             use_bias=True,
                             name='logits')

        self.build()


    def replace_logits(self, num_classes):
        self._num_classes = num_classes
        self.logits = Unit3D(in_channels=384+384+128+128, output_channels=self._num_classes,
                             kernel_shape=[1, 1, 1],
                             padding=0,
                             activation_fn=None,
                             use_batch_norm=False,
                             use_bias=True,
                             name='logits')
    def remove_last(self):
        self.logits = Identity()

    def build(self):
        for k in self.end_points.keys():
            self.add_module(k, self.end_points[k])

    def forward(self, x, pretrained=False, n_tune_layers=-1):
        if pretrained:
            assert n_tune_layers >= 0

            freeze_endpoints = self.VALID_ENDPOINTS[:-n_tune_layers]
            tune_endpoints = self.VALID_ENDPOINTS[-n_tune_layers:]
        else:
            freeze_endpoints = []
            tune_endpoints = self.VALID_ENDPOINTS

        # backbone, no gradient part
        with torch.no_grad():
            for end_point in freeze_endpoints:
                if end_point in self.end_points:
                    x = self._modules[end_point](x) # use _modules to work with dataparallel

        # backbone, gradient part
        for end_point in tune_endpoints:
            if end_point in self.end_points:
                x = self._modules[end_point](x) # use _modules to work with dataparallel

        # head
        x = self.logits(self.dropout(self.avg_pool(x)))
        if self._spatial_squeeze:
            logits = x.squeeze(3).squeeze(3)
        # logits is batch X time X classes, which is what we want to work with
        return logits


    def extract_features(self, x):
        for end_point in self.VALID_ENDPOINTS:
            if end_point in self.end_points:
                x = self._modules[end_point](x)
        return self.avg_pool(x)

In [None]:
i3d = InceptionI3d(400, in_channels=3)
i3d.replace_logits(2731)


In [None]:
import requests

def download_weights(link, filename):


    # Send a GET request to download the file
    response = requests.get(link)

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        # Open a local file to write the content
        with open(f"{filename}.zip", "wb") as f:
            f.write(response.content)
        print("File downloaded successfully!")
    else:
        print(f"Failed to download the file. Status code: {response.status_code}")


In [None]:
# Define the URL of the file in the GitHub release
github_release_url = "https://github.com/microsoft/ASL-citizen-code/releases/download/checkpoints_v1/ASL_citizen_I3D_weights.zip"
download_weights(github_release_url, 'I3D_weights')

File downloaded successfully!


In [None]:
!unzip -d /content/ /content/I3D_weights.zip

Archive:  /content/I3D_weights.zip
  inflating: /content/ASL_citizen_I3D_weights.pt  


In [None]:
i3d.load_state_dict(torch.load('/content/ASL_citizen_I3D_weights.pt'))
i3d.cuda()

  i3d.load_state_dict(torch.load('/content/ASL_citizen_I3D_weights.pt'))


RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.

In [None]:
#loads rgb frames from video path, centering and downsizing as needed
def load_rgb_frames_from_video(video_path, max_frames=64, resize=(256, 256)):
    vidcap = cv2.VideoCapture(video_path)
    frames = []
    total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    print("total frameS:",total_frames)

    start = 0
    frameskip = 1

    # Adjust FPS dynamically based on length of video
    frameskip = 1
    if total_frames >= 96:
        frameskip = 2
    if total_frames >= 160:
        frameskip = 3

    # Set start frame so the video is "centered" across frames
    if frameskip == 3:
        start = np.clip(int((total_frames - 192) // 2), 0, 160)
    elif frameskip == 2:
        start = np.clip(int((total_frames - 128) // 2), 0, 96)
    else:
        start = np.clip(int((total_frames - 64) // 2), 0, 64)
    vidcap.set(cv2.CAP_PROP_POS_FRAMES, start)

    for offset in range(0, min(max_frames * frameskip, int(total_frames - start))):
        success, img = vidcap.read()
        if offset % frameskip == 0:
            w, h, c = img.shape
            if w < 226 or h < 226:
                d = 226. - min(w, h)
                sc = 1 + d / min(w, h)
                img = cv2.resize(img, dsize=(0, 0), fx=sc, fy=sc)
            if w > 256 or h > 256:
                img = cv2.resize(img, (math.ceil(w * (256 / w)), math.ceil(h * (256 / h))))
            img = (img / 255.) * 2 - 1
            frames.append(img)
    return np.asarray(frames, dtype=np.float32)

def video_to_tensor(pic):
    """Convert a ``numpy.ndarray`` to tensor.
    Converts a numpy.ndarray (T x H x W x C)
    to a torch.FloatTensor of shape (C x T x H x W)

    Args:
         pic (numpy.ndarray): Video to be converted to tensor.
    Returns:
         Tensor: Converted video.
    """
    return torch.from_numpy(pic.transpose([3, 0, 1, 2]))


In [None]:


class RandomCrop(object):
    """Crop the given video sequences (t x h x w) at a random location.
    Args:
        size (sequence or int): Desired output size of the crop. If size is an
            int instead of sequence like (h, w), a square crop (size, size) is
            made.
    """

    def __init__(self, size):
        if isinstance(size, numbers.Number):
            self.size = (int(size), int(size))
        else:
            self.size = size

    @staticmethod
    def get_params(img, output_size):
        """Get parameters for ``crop`` for a random crop.
        Args:
            img (PIL Image): Image to be cropped.
            output_size (tuple): Expected output size of the crop.
        Returns:
            tuple: params (i, j, h, w) to be passed to ``crop`` for random crop.
        """
        t, h, w, c = img.shape
        th, tw = output_size
        if w == tw and h == th:
            return 0, 0, h, w

        i = random.randint(0, h - th) if h!=th else 0
        j = random.randint(0, w - tw) if w!=tw else 0
        return i, j, th, tw

    def __call__(self, imgs):

        i, j, h, w = self.get_params(imgs, self.size)

        imgs = imgs[:, i:i+h, j:j+w, :]
        return imgs

    def __repr__(self):
        return self.__class__.__name__ + '(size={0})'.format(self.size)

class CenterCrop(object):
    """Crops the given seq Images at the center.
    Args:
        size (sequence or int): Desired output size of the crop. If size is an
            int instead of sequence like (h, w), a square crop (size, size) is
            made.
    """

    def __init__(self, size):
        if isinstance(size, numbers.Number):
            self.size = (int(size), int(size))
        else:
            self.size = size

    def __call__(self, imgs):
        """
        Args:
            img (PIL Image): Image to be cropped.
        Returns:
            PIL Image: Cropped image.
        """
        t, h, w, c = imgs.shape
        th, tw = self.size
        i = int(np.round((h - th) / 2.))
        j = int(np.round((w - tw) / 2.))

        return imgs[:, i:i+th, j:j+tw, :]


    def __repr__(self):
        return self.__class__.__name__ + '(size={0})'.format(self.size)


class RandomHorizontalFlip(object):
    """Horizontally flip the given seq Images randomly with a given probability.
    Args:
        p (float): probability of the image being flipped. Default value is 0.5
    """

    def __init__(self, p=0.5):
        self.p = p

    def __call__(self, imgs):
        """
        Args:
            img (seq Images): seq Images to be flipped.
        Returns:
            seq Images: Randomly flipped seq images.
        """
        if random.random() < self.p:
            # t x h x w
            return np.flip(imgs, axis=2).copy()
        return imgs

    def __repr__(self):
        return self.__class__.__name__ + '(p={})'.format(self.p)

In [None]:
def pad(imgs, total_frames):
        if imgs.shape[0] < total_frames:
            num_padding = total_frames - imgs.shape[0]
            if num_padding:
                prob = np.random.random_sample()
                if prob > 0.5: #pad with first frame
                    pad_img = imgs[0]
                    pad = np.tile(np.expand_dims(pad_img, axis=0), (num_padding, 1, 1, 1))
                    padded_imgs = np.concatenate([imgs, pad], axis=0)
                else: #pad with last frame
                    pad_img = imgs[-1]
                    pad = np.tile(np.expand_dims(pad_img, axis=0), (num_padding, 1, 1, 1))
                    padded_imgs = np.concatenate([imgs, pad], axis=0)
        else:
            padded_imgs = imgs
        return padded_imgs

# Using the ST-GCN model:

In [None]:
def get_hop_distance(num_node, edge, max_hop=1):
    # link matrix
    A = np.zeros((num_node, num_node))
    for i, j in edge:
        A[j, i] = 1
        A[i, j] = 1

    # compute hop steps
    hop_dis = np.zeros((num_node, num_node)) + np.inf
    transfer_mat = [np.linalg.matrix_power(A, d) for d in range(max_hop + 1)]
    arrive_mat = np.stack(transfer_mat) > 0
    for d in range(max_hop, -1, -1):
        hop_dis[arrive_mat[d]] = d
    return hop_dis


def normalize_digraph(A):
    Dl = np.sum(A, 0)
    num_node = A.shape[0]
    Dn = np.zeros((num_node, num_node))
    for i in range(num_node):
        if Dl[i] > 0:
            Dn[i, i] = Dl[i] ** (-1)
    AD = np.dot(A, Dn)
    return AD


def edge2mat(link, num_node):
    A = np.zeros((num_node, num_node))
    for i, j in link:
        A[j, i] = 1
    return A


def get_spatial_graph(num_node, self_link, inward, outward):
    I = edge2mat(self_link, num_node)
    In = normalize_digraph(edge2mat(inward, num_node))
    Out = normalize_digraph(edge2mat(outward, num_node))
    A = np.stack((I, In, Out))
    return A


class GraphWithPartition:  # Unidirected, connections with hop limit
    """The Graph to model the skeletons
    Args:
        num_nodes (int): Number of spatial nodes in the graph.
        center (int): Index of the center node.
        inward_edges (list): List of spatial edges connecting the skeleton.
        strategy (string): must be one of the follow candidates
        - uniform: Uniform Labeling
        - distance: Distance Partitioning
        - spatial: Spatial Configuration
        For more information, please refer to the section 'Partition
        Strategies' in the ST-GCN paper (https://arxiv.org/abs/1801.07455).

        max_hop (int): the maximal distance between two connected nodes. Default: 1
        dilation (int): controls the spacing between the kernel points. Default: 1
    """

    def __init__(
            self,
            num_nodes,
            center,
            inward_edges,
            strategy="spatial",
            max_hop=1,
            dilation=1,
    ):
        self.num_nodes = num_nodes
        self.center = center
        self.self_edges = [[i, i] for i in range(self.num_nodes)]
        self.inward_edges = inward_edges
        self.edges = self.self_edges + self.inward_edges

        self.max_hop = max_hop
        self.dilation = dilation

        self.hop_dis = get_hop_distance(self.num_nodes, self.edges, max_hop=max_hop)
        self.get_adjacency(strategy)

    def get_adjacency(self, strategy):
        valid_hop = range(0, self.max_hop + 1, self.dilation)
        adjacency = np.zeros((self.num_nodes, self.num_nodes))
        for hop in valid_hop:
            adjacency[self.hop_dis == hop] = 1
        normalize_adjacency = normalize_digraph(adjacency)

        if strategy == "uniform":
            A = np.zeros((1, self.num_nodes, self.num_nodes))
            A[0] = normalize_adjacency
            self.A = A
        elif strategy == "distance":
            A = np.zeros((len(valid_hop), self.num_nodes, self.num_nodes))
            for i, hop in enumerate(valid_hop):
                A[i][self.hop_dis == hop] = normalize_adjacency[self.hop_dis == hop]
            self.A = A
        elif strategy == "spatial":
            A = []
            for hop in valid_hop:
                a_root = np.zeros((self.num_nodes, self.num_nodes))
                a_close = np.zeros((self.num_nodes, self.num_nodes))
                a_further = np.zeros((self.num_nodes, self.num_nodes))
                for i in range(self.num_nodes):
                    for j in range(self.num_nodes):
                        if self.hop_dis[j, i] == hop:
                            if (
                                    self.hop_dis[j, self.center]
                                    == self.hop_dis[i, self.center]
                            ):
                                a_root[j, i] = normalize_adjacency[j, i]
                            elif (
                                    self.hop_dis[j, self.center]
                                    > self.hop_dis[i, self.center]
                            ):
                                a_close[j, i] = normalize_adjacency[j, i]
                            else:
                                a_further[j, i] = normalize_adjacency[j, i]
                if hop == 0:
                    A.append(a_root)
                else:
                    A.append(a_root + a_close)
                    A.append(a_further)
            A = np.stack(A)
            self.A = A
        else:
            raise ValueError("This Graph construction strategy is not supported")


class SpatialGraph:
    """
    Graph construction with equal weight to all the nodes.
    Args:
        num_nodes (int): Number of spatial nodes in the graph.
        inward_edges (list): List of spatial edges connecting the skeleton.
    """

    def __init__(self, num_nodes, inward_edges, strategy="spatial"):
        self.num_nodes = num_nodes
        self.strategy = strategy
        self.self_edges = [(i, i) for i in range(num_nodes)]
        self.inward_edges = inward_edges
        self.outward_edges = [(j, i) for (i, j) in self.inward_edges]
        self.A = self.get_adjacency_matrix()

    def get_adjacency_matrix(self):
        if self.strategy == "spatial":
            return get_spatial_graph(
                self.num_nodes, self.self_edges, self.inward_edges, self.outward_edges
            )
        else:
            raise ValueError()

In [None]:
class ConvTemporalGraphical(nn.Module):
    """The basic module for applying a graph convolution.
    Args:
        in_channels (int): Number of channels in the input sequence data.
        out_channels (int): Number of channels produced by the convolution.
        kernel_size (int): Size of the graph convolving kernel.
        t_kernel_size (int): Size of the temporal convolving kernel.
        t_stride (int, optional): Stride of the temporal convolution. Default: 1.
        t_padding (int, optional): Temporal zero-padding added to both sides
            of the input. Default: 0.
        t_dilation (int, optional): Spacing between temporal kernel elements.
            Default: 1.
        bias (bool, optional): If ``True``, adds a learnable bias to the
            output. Default: ``True``.
    Shape:
        - Input[0]: Input graph sequence in :math:`(N, in_channels, T_{in}, V)`
            format
        - Input[1]: Input graph adjacency matrix in :math:`(K, V, V)` format
        - Output[0]: Output graph sequence in :math:`(N, out_channels, T_{out}
            , V)` format
        - Output[1]: Graph adjacency matrix for output data in :math:`(K, V, V)
            ` format
        where
            :math:`N` is a batch size,
            :math:`K` is the spatial kernel size, as :math:`K == kernel_size[1]
                `,
            :math:`T_{in}/T_{out}` is a length of input/output sequence,
            :math:`V` is the number of graph nodes.
    """

    def __init__(
            self,
            in_channels,
            out_channels,
            kernel_size,
            t_kernel_size=1,
            t_stride=1,
            t_padding=0,
            t_dilation=1,
            bias=True,
    ):
        super().__init__()

        self.kernel_size = kernel_size
        self.conv = nn.Conv2d(
            in_channels,
            out_channels * kernel_size,
            kernel_size=(t_kernel_size, 1),
            padding=(t_padding, 0),
            stride=(t_stride, 1),
            dilation=(t_dilation, 1),
            bias=bias,
        )

    def forward(self, x, A):
        assert A.size(0) == self.kernel_size

        x = self.conv(x)
        n, kc, t, v = x.size()
        x = x.view(n, self.kernel_size, kc // self.kernel_size, t, v)
        x = torch.einsum("nkctv,kvw->nctw", (x, A))

        return x.contiguous(), A


class STGCN_BLOCK(nn.Module):
    """
    Applies a spatial temporal graph convolution over an input graph
    sequence.

    Args:
        in_channels (int): Number of channels in the input sequence data.
        out_channels (int): Number of channels produced by the convolution.
        kernel_size (tuple): Size of the temporal convolving kernel and
            graph convolving kernel.
        stride (int, optional): Stride of the temporal convolution. Default: 1.
        dropout (int, optional): Dropout rate of the final output. Default: 0.
        residual (bool, optional): If ``True``, applies a residual mechanism. Default: ``True``.
    Shape:
        - Input[0]: Input graph sequence in :math:`(N, in_channels, T_{in}, V)`
            format.
        - Input[1]: Input graph adjacency matrix in :math:`(K, V, V)` format
        - Output[0]: Output graph sequence in :math:`(N, out_channels, T_{out},
            V)` format.
        - Output[1]: Graph adjacency matrix for output data in :math:`(K, V,
            V)` format.
        where
            :math:`N` is a batch size,
            :math:`K` is the spatial kernel size, as :math:`K == kernel_size[1]`,
            :math:`T_{in}/T_{out}` is a length of input/output sequence,
            :math:`V` is the number of graph nodes.
    """

    def __init__(
            self, in_channels, out_channels, kernel_size, stride=1, dropout=0, residual=True
    ):
        super().__init__()

        assert len(kernel_size) == 2
        assert kernel_size[0] % 2 == 1
        padding = ((kernel_size[0] - 1) // 2, 0)

        self.gcn = ConvTemporalGraphical(in_channels, out_channels, kernel_size[1])

        self.tcn = nn.Sequential(
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(
                out_channels,
                out_channels,
                (kernel_size[0], 1),
                (stride, 1),
                padding,
            ),
            nn.BatchNorm2d(out_channels),
            nn.Dropout(dropout, inplace=True),
        )

        if not residual:
            self.residual = lambda x: 0

        elif (in_channels == out_channels) and (stride == 1):
            self.residual = lambda x: x

        else:
            self.residual = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=(stride, 1)),
                nn.BatchNorm2d(out_channels),
            )

        self.relu = nn.ReLU(inplace=True)

    def forward(self, x, A):
        res = self.residual(x)
        x, A = self.gcn(x, A)
        x = self.tcn(x) + res

        return self.relu(x), A


class STGCN(nn.Module):
    """Spatial temporal graph convolutional network backbone

    This module is proposed in
    `Spatial Temporal Graph Convolutional Networks for Skeleton-Based Action Recognition
    <https://arxiv.org/pdf/1801.07455.pdf>`_

    Args:
        in_channels (int): Number of channels in the input data.
        graph_args (dict): The arguments for building the graph.
        edge_importance_weighting (bool): If ``True``, adds a learnable importance weighting to the edges of the graph. Default: True.
        n_out_features (int): Output Embedding dimension. Default: 256.
        kwargs (dict): Other parameters for graph convolution units.
    """

    def __init__(self, in_channels, graph_args, edge_importance_weighting, n_out_features=256, **kwargs):
        super().__init__()

        self.graph = GraphWithPartition(num_nodes=graph_args['num_nodes'], center=graph_args['center'],
                                        inward_edges=graph_args['inward_edges'])
        A = torch.tensor(self.graph.A, dtype=torch.float32, requires_grad=False)
        self.register_buffer("A", A)

        spatial_kernel_size = A.size(0)
        temporal_kernel_size = 9
        self.n_out_features = n_out_features
        kernel_size = (temporal_kernel_size, spatial_kernel_size)
        self.data_bn = nn.BatchNorm1d(in_channels * A.size(1))
        kwargs0 = {k: v for k, v in kwargs.items() if k != "dropout"}
        self.st_gcn_networks = nn.ModuleList(
            (
                STGCN_BLOCK(in_channels, 64, kernel_size, 1, residual=False, **kwargs0),
                STGCN_BLOCK(64, 64, kernel_size, 1, **kwargs),
                STGCN_BLOCK(64, 64, kernel_size, 1, **kwargs),
                STGCN_BLOCK(64, 64, kernel_size, 1, **kwargs),
                STGCN_BLOCK(64, 128, kernel_size, 2, **kwargs),
                STGCN_BLOCK(128, 128, kernel_size, 1, **kwargs),
                STGCN_BLOCK(128, 128, kernel_size, 1, **kwargs),
                STGCN_BLOCK(128, 256, kernel_size, 2, **kwargs),
                STGCN_BLOCK(256, 256, kernel_size, 1, **kwargs),
                STGCN_BLOCK(256, self.n_out_features, kernel_size, 1, **kwargs),
            )
        )

        if edge_importance_weighting:
            self.edge_importance = nn.ParameterList(
                [nn.Parameter(torch.ones(self.A.size())) for i in self.st_gcn_networks]
            )
        else:
            self.edge_importance = [1] * len(self.st_gcn_networks)

    def forward(self, x):
        """
        Args:
            x (torch.Tensor): Input tensor of shape :math:`(N, in\_channels, T_{in}, V_{in})`

        Returns:
            torch.Tensor: Output embedding of shape :math:`(N, n\_out\_features)`

        where
            - :math:`N` is a batch size,
            - :math:`T_{in}` is a length of input sequence,
            - :math:`V_{in}` is the number of graph nodes,
            - :math:`n\_out\_features` is the output embedding dimension.

        """
        N, C, T, V = x.size()
        x = x.permute(0, 3, 1, 2).contiguous()  # NCTV -> NVCT
        x = x.view(N, V * C, T)
        x = self.data_bn(x)
        x = x.view(N, V, C, T)
        x = x.permute(0, 2, 3, 1).contiguous()  # NVCT -> NCTV

        for gcn, importance in zip(self.st_gcn_networks, self.edge_importance):
            x, _ = gcn(x, self.A * importance)

        x = F.avg_pool2d(x, x.size()[2:])
        x = x.view(N, -1)
        return x

In [None]:
class Network(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [None]:
class FC(nn.Module):
    """
    Fully connected layer head
    Args:
        n_features (int): Number of features in the input.
        num_class (int): Number of class for classification.
        dropout_ratio (float): Dropout ratio to use. Default: 0.2.
        batch_norm (bool): Whether to use batch norm or not. Default: ``False``.
    """

    def __init__(self, n_features, num_class, dropout_ratio=0.2, batch_norm=False):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout_ratio)
        self.bn = batch_norm
        if batch_norm:
            self.bn = nn.BatchNorm1d(self.n_features)
            self.bn.weight.data.fill_(1)
            self.bn.bias.data.zero_()
        self.classifier = nn.Linear(n_features, num_class)
        nn.init.normal_(self.classifier.weight, 0, math.sqrt(2.0 / num_class))

    def forward(self, x):
        """
        Args:
            x (torch.Tensor): Input tensor of shape: (batch_size, n_features)

        returns:
            torch.Tensor: logits for classification.
        """

        x = self.dropout(x)
        if self.bn:
            x = self.bn(x)
        x = self.classifier(x)
        return x

In [None]:
class Compose:
    """
    Compose a list of pose transforms

    Args:
        transforms (list): List of transforms to be applied.
    """

    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, x: dict):
        """Applies the given list of transforms

        Args:
            x (dict): input data

        Returns:
            dict: data after the transforms
        """
        for transform in self.transforms:
            x = transform(x)
        return x


# Adopted from: https://github.com/AmitMY/pose-format/
class ShearTransform:
    """
    Applies `2D shear <https://en.wikipedia.org/wiki/Shear_matrix>`_ transformation

    Args:
        shear_std (float): std to use for shear transformation. Default: 0.2
    """
    def __init__(self, shear_std: float=0.2):
        self.shear_std = shear_std

    def __call__(self, data:dict):
        """
        Applies shear transformation to the given data.

        Args:
            data (dict): input data

        Returns:
            dict: data after shear transformation
        """

        x = data
        assert x.shape[0] == 2, "Only 2 channels inputs supported for ShearTransform"
        x = x.permute(1, 2, 0) #CTV->TVC
        shear_matrix = torch.eye(2)
        shear_matrix[0][1] = torch.tensor(
            np.random.normal(loc=0, scale=self.shear_std, size=1)[0])
        res = torch.matmul(x.float(), shear_matrix.float())
        data = res.permute(2, 0, 1) #TVC->CTV
        return data.double()


class RotatationTransform:
    """
    Applies `2D rotation <https://en.wikipedia.org/wiki/Rotation_matrix>`_ transformation.

    Args:
        rotation_std (float): std to use for rotation transformation. Default: 0.2
    """
    def __init__(self, rotation_std: float=0.2):
        self.rotation_std = rotation_std

    def __call__(self, data):
        """
        Applies rotation transformation to the given data.

        Args:
            data (dict): input data

        Returns:
            dict: data after rotation transformation
        """
        x = data
        assert x.shape[0] == 2, "Only 2 channels inputs supported for RotationTransform"
        x = x.permute(1, 2, 0) #CTV->TVC
        rotation_angle = torch.tensor(
            np.random.normal(loc=0, scale=self.rotation_std, size=1)[0]
        )
        rotation_cos = torch.cos(rotation_angle)
        rotation_sin = torch.sin(rotation_angle)
        rotation_matrix = torch.tensor(
            [[rotation_cos, -rotation_sin], [rotation_sin, rotation_cos]],
            dtype=torch.float32,
        )
        res = torch.matmul(x.float(), rotation_matrix.float())
        data = res.permute(2, 0, 1) #TVC->CTV
        return data.double()

In [None]:
# Define the URL of the file in the GitHub release
github_release_url = "https://github.com/microsoft/ASL-citizen-code/releases/download/checkpoints_v1/ASL_citizen_stgcn_weights.zip"
download_weights(github_release_url, 'stgcn_weights')
!!unzip -d /content/ /content/stgcn_weights.zip

File downloaded successfully!


['Archive:  /content/stgcn_weights.zip',
 '  inflating: /content/ASL_citizen_stgcn_weights.pt  ']

In [None]:
with open('/content/gloss_dict.json') as f:
    gloss2idx = json.load(f)

In [None]:
idx2gloss = {}
for g in gloss2idx:
    idx = gloss2idx[g]
    idx2gloss[idx] = g

In [None]:
#load model
n_features = 256
n_classes = len(gloss2idx)
graph_args = {'num_nodes': 27, 'center': 0,
              'inward_edges': [[2, 0], [1, 0], [0, 3], [0, 4], [3, 5],
                               [4, 6], [5, 7], [6, 17], [7, 8], [7, 9],
                               [9, 10], [7, 11], [11, 12], [7, 13], [13, 14],
                               [7, 15], [15, 16], [17, 18], [17, 19], [19, 20],
                               [17, 21], [21, 22], [17, 23], [23, 24], [17, 25], [25, 26]]}
stgcn = STGCN(in_channels=2, graph_args=graph_args, edge_importance_weighting=True)
fc = FC(n_features=n_features, num_class=n_classes, dropout_ratio=0.05)
pose_model = Network(encoder=stgcn, decoder=fc)

In [None]:
pose_model.load_state_dict(torch.load('/content/ASL_citizen_stgcn_weights.pt'))
pose_model.cuda()
pose_model.train(False)

  pose_model.load_state_dict(torch.load('/content/ASL_citizen_stgcn_weights.pt'))


RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.

In [2]:
!pip install mediapipe -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.6/35.6 MB[0m [31m58.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.3/18.3 MB[0m [31m104.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m294.9/294.9 kB[0m [31m25.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.1/69.1 MB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
ydf 0.13.0 requires protobuf<7.0.0,>=5.29.1, but you have protobuf 4.25.8 which is incompatible.
grpcio-status 1.71.2 requires protobuf<6.0dev,>=5.26.1, but you have protobuf 4.25.8 which is incompatible.
thinc 8.3.

In [3]:
import mediapipe as mp

def extract_mediapipe_keypoints(video_path, min_detection_confidence=0.5):

    # Initialize MediaPipe Holistic.
    mp_holistic = mp.solutions.holistic
    holistic = mp_holistic.Holistic(
        static_image_mode=False,
        min_detection_confidence=min_detection_confidence
    )

    # Initialize video capture.
    video = cv2.VideoCapture(video_path)
    if not video.isOpened():
        holistic.close()
        raise ValueError(f"Unable to open video file: {video_path}")

    # Get total number of frames.
    total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    if total_frames <= 0:
        holistic.close()
        video.release()
        raise ValueError(f"Video has no frames or cannot determine frame count: {video_path}")

    # Initialize the feature array.
    # 543 keypoints: 33 pose + 21 right hand + 21 left hand + 468 face
    feature =  np.zeros((int(total_frames), 543, 2))

    count = 0
    while count < total_frames:
        success, image = video.read()
        if not success:
            break  # Exit if no more frames are available.

        # Convert the BGR image to RGB.
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Process the image and extract landmarks.
        results = holistic.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

        for i in range(33):
           if results.pose_landmarks:
                feature[count][i][0] = results.pose_landmarks.landmark[i].x
                feature[count][i][1] = results.pose_landmarks.landmark[i].y

        j = 33
        for i in range(21):
            if results.right_hand_landmarks:
               feature[count][i+j][0] = results.right_hand_landmarks.landmark[i].x
               feature[count][i+j][1] = results.right_hand_landmarks.landmark[i].y

        j = 54
        for i in range(21):
            if results.left_hand_landmarks:
                feature[count][i+j][0] = results.left_hand_landmarks.landmark[i].x
                feature[count][i+j][1] = results.left_hand_landmarks.landmark[i].y

        j = 75
        for i in range(468):
            if results.face_landmarks:
                feature[count][i+j][0] = results.face_landmarks.landmark[i].x
                feature[count][i+j][1] = results.face_landmarks.landmark[i].y

        count += 1

    # Release resources.
    holistic.close()
    video.release()

    # If the actual number of processed frames is less than total_frames,
    # truncate the keypoints array accordingly.
    if count < total_frames:
        feature = feature[:count]

    return feature


In [4]:
#downsamples set of frames to get max frames
def downsample(frames, max_frames):
    length = frames.shape[0]
    # Adjust FPS dynamically based on length of video
    increment = max_frames / length
    if increment > 1.0:
        increment = 1.0
    curr_increment = 0
    curr_frame = 0
    new_frames = []
    for f in frames:
        curr_increment += increment
        if curr_increment > curr_frame:
            curr_frame += 1
            new_frames.append(f)
    if len(new_frames) > max_frames:
        new_frames = new_frames[:max_frames]
    return np.array(new_frames)

In [None]:
def preprocess_stgcn(video_path):

    max_frames = 128
    #load frames and downsample / pad as needed
    data0 = extract_mediapipe_keypoints(video_path)
    length = data0.shape[0]
    if length > max_frames:
        data0 = downsample(data0, max_frames)
    if length < max_frames:
        data0 = np.pad(data0, ((0, max_frames - length), (0, 0), (0, 0)))

    #normalize keypoints using distance between shoulders as reference
    shoulder_l = data0[:, 11, :]
    shoulder_r = data0[:, 12, :]

    center = np.zeros(2)
    for i in range(len(shoulder_l)):
        center_i = (shoulder_r[i] + shoulder_l[i]) / 2
        center = center + center_i
    center = center / shoulder_l.shape[0]

    mean_dist = np.mean(np.sqrt(((shoulder_l - shoulder_r) ** 2).sum(-1)))
    if mean_dist != 0:
        scale = 1.0 / mean_dist
        data0 = data0 - center
        data0 = data0 * scale

    #select subset of keypoints for graph
    keypoints = [0, 2, 5, 11, 12, 13, 14, 33, 37, 38, 41, 42, 45, 46, 49, 50, 53, 54,
                 58, 59, 62, 63, 66, 67, 70, 71, 74]
    data0 = data0[:, 0:75, :]
    posedata = data0[:, 0:33, :]
    rhdata = data0[:, 33:54, :]
    lhdata = data0[:, 54:, :]

    data = np.concatenate([posedata, lhdata, rhdata], axis=1)
    data = data[:, keypoints, :]
    data = np.transpose(data, (2, 0, 1))

    ret_img = torch.from_numpy(data)

    return ret_img.unsqueeze(0).float().cuda()


# Testing:

In [None]:
from IPython.display import display, Javascript,HTML
from google.colab.output import eval_js
from base64 import b64decode

def record_video(filename):
  js=Javascript("""
    async function recordVideo() {
      const options = { mimeType: "video/mp4; codecs=avc1.42E01E, mp4a.40.2" };
      const div = document.createElement('div');
      const capture = document.createElement('button');
      const stopCapture = document.createElement("button");

      capture.textContent = "Start Recording";
      capture.style.background = "orange";
      capture.style.color = "white";

      stopCapture.textContent = "Stop Recording";
      stopCapture.style.background = "red";
      stopCapture.style.color = "white";
      div.appendChild(capture);

      const video = document.createElement('video');
      const recordingVid = document.createElement("video");
      video.style.display = 'block';

      const stream = await navigator.mediaDevices.getUserMedia({audio:false, video: true});

      let recorder = new MediaRecorder(stream, options);
      document.body.appendChild(div);
      div.appendChild(video);

      video.srcObject = stream;
      video.muted = true;

      await video.play();

      google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

      await new Promise((resolve) => {
        capture.onclick = resolve;
      });
      recorder.start();
      capture.replaceWith(stopCapture);

      await new Promise((resolve) => stopCapture.onclick = resolve);
      recorder.stop();
      let recData = await new Promise((resolve) => recorder.ondataavailable = resolve);
      let arrBuff = await recData.data.arrayBuffer();

      // stop the stream and remove the video element
      stream.getVideoTracks()[0].stop();
      div.remove();

      let binaryString = "";
      let bytes = new Uint8Array(arrBuff);
      bytes.forEach((byte) => {
        binaryString += String.fromCharCode(byte);
      })
    return btoa(binaryString);
    }
  """)
  try:
    display(js)
    data=eval_js('recordVideo({})')
    binary=b64decode(data)
    with open(filename,"wb") as video_file:
      video_file.write(binary)
    print(f"Finished recording video at:{filename}")
  except Exception as err:
    print(str(err))

In [None]:
def predict_video_class(video_path, model, total_frames=64):
    """
    Predict the class label of a video.

    Args:
    - video_path (str): Path to the video.
    - model (torch.nn.Module): The pre-trained model (e.g., I3D).
    - total_frames (int): Number of frames to use from the video (default is 64).

    Returns:
    - predicted_class (str): The predicted class label.
    """


    # Load and preprocess the video frames
    frames = load_rgb_frames_from_video(video_path, total_frames)
    frames = pad(frames, total_frames)
    inputs = video_to_tensor(frames).cuda()
    print(inputs.unsqueeze(0).shape)
    # Model inference (disable gradient calculation)
    with torch.no_grad():
        per_frame_logits = model(inputs.unsqueeze(0), pretrained=False)

    # Apply softmax to get probabilities for each frame
    predictions = torch.max(per_frame_logits, dim=2)[0]
    y_pred_tag = torch.softmax(predictions, dim=1)

    # Get the top prediction class
    pred_class_index = torch.argmax(y_pred_tag, dim=1)[0][0][0].item()

    # Map the indices to class labels
    predicted_class = idx2gloss[pred_class_index]

    return predicted_class

In [None]:
def predict_STGCN_from_webcam(filename='recorded_video.mp4', del_end = True):
    """
    Records a video using the webcam, preprocesses it, and predicts the word using the ST-GCN model.

    Args:
        model (torch.nn.Module): The trained ST-GCN model.
        label_map (list): List mapping class indices to words.
        filename (str): Name of the file to save the recorded video.

    Returns:
        str: The predicted word.
    """
    # Step 1: Record the video
    record_video(filename)

    # Step 2: Preprocess the video
    input_tensor = preprocess_stgcn(filename)  # Ensure max_frames is passed if needed
    print(input_tensor.shape)
    # Step 4: Model prediction
    pose_model.eval()  # Ensure the model is in evaluation mode


    with torch.no_grad():
        predictions = pose_model(input_tensor)
        y_pred_tag = torch.softmax(predictions, dim=1)
        pred_args = torch.argsort(y_pred_tag, dim=1, descending=True)

    id_pred = pred_args[0][0].item()
    predicted_word = idx2gloss[id_pred]

    print(f"Predicted word: {predicted_word}")
    # Step 5: delete the video after prediction if del_end = True
    if del_end:
        os.remove(filename)

    return predicted_word

In [None]:
i3d.cuda()

InceptionI3d(
  (avg_pool): AvgPool3d(kernel_size=[2, 7, 7], stride=(1, 1, 1), padding=0)
  (dropout): Dropout(p=0.5, inplace=False)
  (logits): Unit3D(
    (conv3d): Conv3d(1024, 2731, kernel_size=(1, 1, 1), stride=(1, 1, 1))
  )
  (Conv3d_1a_7x7): Unit3D(
    (conv3d): Conv3d(3, 64, kernel_size=(7, 7, 7), stride=(2, 2, 2), bias=False)
    (bn): BatchNorm3d(64, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
  )
  (MaxPool3d_2a_3x3): MaxPool3dSamePadding(kernel_size=[1, 3, 3], stride=(1, 2, 2), padding=0, dilation=1, ceil_mode=False)
  (Conv3d_2b_1x1): Unit3D(
    (conv3d): Conv3d(64, 64, kernel_size=(1, 1, 1), stride=(1, 1, 1), bias=False)
    (bn): BatchNorm3d(64, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
  )
  (Conv3d_2c_3x3): Unit3D(
    (conv3d): Conv3d(64, 192, kernel_size=(3, 3, 3), stride=(1, 1, 1), bias=False)
    (bn): BatchNorm3d(192, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
  )
  (MaxPool3d_3a_3x3): MaxPool3dS

In [None]:
torch_input = torch.randn(torch.Size([1, 3, 64, 256, 256])).cuda()
onnx_program = torch.onnx.export(i3d, torch_input, 'i3d.onnx', verbose=True, opset_version=12, operator_export_type=torch.onnx.OperatorExportTypes.ONNX_ATEN_FALLBACK)

  if pretrained:
  out_t = np.ceil(float(t) / float(self._stride[0]))
  out_h = np.ceil(float(h) / float(self._stride[1]))
  out_w = np.ceil(float(w) / float(self._stride[2]))
  if s % self._stride[dim] == 0:
  out_t = np.ceil(float(t) / float(self.stride[0]))
  out_h = np.ceil(float(h) / float(self.stride[1]))
  out_w = np.ceil(float(w) / float(self.stride[2]))
  if s % self.stride[dim] == 0:


In [None]:
predict_STGCN_from_webcam()

<IPython.core.display.Javascript object>

Finished recording video at:recorded_video.mp4
torch.Size([1, 2, 128, 27])
Predicted word: SHORTDISTANCE


'SHORTDISTANCE'

In [5]:
import mediapipe as mp

def annotate_video(video_path, prediction, output_path='annotated_video.mp4'):
    """
    Annotates the video with MediaPipe landmarks and the prediction text using uniform size and color.

    Args:
        video_path (str): Path to the original video.
        prediction (str): The predicted word to overlay.
        output_path (str): Path to save the annotated video.

    Returns:
        None
    """
    # Initialize MediaPipe Holistic.
    mp_holistic = mp.solutions.holistic
    mp_drawing = mp.solutions.drawing_utils

    # Define unified DrawingSpecs for landmarks and connections.
    landmark_spec = mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=1, circle_radius=1)  # Green color
    connection_spec = mp_drawing.DrawingSpec(color=(255, 255, 255), thickness=1)

    # Open the original video.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Unable to open video file: {video_path}")

    # Get video properties.
    fps = cap.get(cv2.CAP_PROP_FPS)
    width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Define the codec and create VideoWriter object.
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Using 'mp4v' for MP4 format
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    with mp_holistic.Holistic(
        static_image_mode=False,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5
    ) as holistic:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Convert the BGR image to RGB.
            image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Process the image and extract landmarks.
            results = holistic.process(image_rgb)

            # Draw landmarks on the frame with unified specs.
            if results.pose_landmarks:
                mp_drawing.draw_landmarks(
                    frame, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                    landmark_spec, connection_spec)
            if results.left_hand_landmarks:
                mp_drawing.draw_landmarks(
                    frame, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                    landmark_spec, connection_spec)
            if results.right_hand_landmarks:
                mp_drawing.draw_landmarks(
                    frame, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                    landmark_spec, connection_spec)
            if results.face_landmarks:
                mp_drawing.draw_landmarks(
                    frame, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION,
                    landmark_spec, connection_spec)

            # Overlay the prediction text on the frame.
            cv2.putText(
                frame, f"Prediction: {prediction}", (30, 50),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

            # Write the annotated frame to the output video.
            out.write(frame)

    # Release resources.
    cap.release()
    out.release()

    print(f"Annotated video saved at: {output_path}")



In [None]:
def record_and_predict(model, label_map, device='cpu', filename='recorded_video.mp4',
                      annotated_filename='annotated_video.mp4'):
    """
    Records a video using the webcam, processes it to extract landmarks, predicts the word,
    overlays the landmarks and prediction on the video, and displays the annotated video.

    Args:
        model (torch.nn.Module): The trained ST-GCN model.
        label_map (list): List mapping class indices to words.
        device (str): Device to run the model on ('cpu' or 'cuda').
        filename (str): Name of the file to save the recorded video.
        annotated_filename (str): Name of the file to save the annotated video.
        max_frames (int): Maximum number of frames to process.

    Returns:
        str: The predicted word.
    """
    # Step 1: Record the video
    record_video(filename)

    i3d_pred = predict_video_class(filename,i3d)
    print("the i3d model predicted:", i3d_pred)

    # Step 2: Preprocess the video
    input_tensor = preprocess_stgcn(filename)  # Shape: (C, T, V)

    # Step 3: Add batch dimension and move to device
    input_tensor = input_tensor

    # Step 4: Model prediction
    pose_model.eval()  # Ensure the model is in evaluation mode
    with torch.no_grad():
        predictions = pose_model(input_tensor)
        y_pred_tag = torch.softmax(predictions, dim=1)
        pred_args = torch.argsort(y_pred_tag, dim=1, descending=True)

    id_pred = pred_args[0][0].item()

    # Step 5: Map the prediction to the corresponding word
    predicted_word = label_map[id_pred]

    print(f"ST-GCN model Predicted: {predicted_word}")

    # Step 6: Annotate the video with landmarks and prediction
    annotate_video(filename, predicted_word, output_path=annotated_filename)


    return predicted_word


In [None]:
    from IPython.display import HTML
    from base64 import b64encode

    def display_video(video_path):
        """
        Displays a video in the Colab notebook.

        Args:
            video_path (str): Path to the video file.

        Returns:
            None
        """
        mp4 = open(video_path,'rb').read()
        data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
        display(HTML(f"""
            <video width="640" height="480" controls>
                <source src="{data_url}" type="video/mp4">
            </video>
        """))



In [None]:
!ffmpeg -i /content/final_video.mp4

In [None]:
true_label = "THANK_YOU"
ypredicted_word = record_and_predict(
    model=pose_model,
    label_map=idx2gloss,
    device="cuda",
    filename= true_label + '_recorded.mp4',
    annotated_filename= true_label + '_annotated.mp4',
)
!ffmpeg -loglevel quiet -y -i /content/THANK_YOU_annotated.mp4 -c:v libx264 -c:a aac -strict experimental -b:a 192k /content/final_THANK_YOU_annotated.mp4
display_video("/content/final_THANK_YOU_annotated.mp4")

<IPython.core.display.Javascript object>

Finished recording video at:THANK_YOU_recorded.mp4
total frameS: 60
the i3d model predicted: JAWDROP
ST-GCN model Predicted: THANKYOU
Annotated video saved at: THANK_YOU_annotated.mp4


In [None]:
true_label = "FOR"
ypredicted_word = record_and_predict(
    model=pose_model,
    label_map=idx2gloss,
    device="cuda",
    filename= true_label + '_recorded.mp4',
    annotated_filename= true_label + '_annotated.mp4',
)
!ffmpeg -loglevel quiet -y -i /content/FOR_annotated.mp4 -c:v libx264 -c:a aac -strict experimental -b:a 192k /content/final_FOR_annotated.mp4
display_video("/content/final_FOR_annotated.mp4")

<IPython.core.display.Javascript object>

Finished recording video at:FOR_recorded.mp4
total frameS: 50
the i3d model predicted: CONCEPT
ST-GCN model Predicted: FOR
Annotated video saved at: FOR_annotated.mp4


In [None]:
true_label = "YOUR"
ypredicted_word = record_and_predict(
    model=pose_model,
    label_map=idx2gloss,
    device="cuda",
    filename= true_label + '_recorded.mp4',
    annotated_filename= true_label + '_annotated.mp4',
)
!ffmpeg -loglevel quiet -y -i /content/YOUR_annotated.mp4 -c:v libx264 -c:a aac -strict experimental -b:a 192k /content/final_YOUR_annotated.mp4
display_video("/content/final_YOUR_annotated.mp4")

<IPython.core.display.Javascript object>

Finished recording video at:YOUR_recorded.mp4
total frameS: 59
torch.Size([1, 3, 64, 256, 256])
the i3d model predicted: SNOB
ST-GCN model Predicted: HIS
Annotated video saved at: YOUR_annotated.mp4


In [None]:
true_label = "ATTENTION"
ypredicted_word = record_and_predict(
    model=pose_model,
    label_map=idx2gloss,
    device="cuda",
    filename= true_label + '_recorded.mp4',
    annotated_filename= true_label + '_annotated.mp4',
)
!ffmpeg -loglevel quiet -y -i /content/ATTENTION_annotated.mp4 -c:v libx264 -c:a aac -strict experimental -b:a 192k /content/final_ATTENTION_annotated.mp4
display_video("/content/final_ATTENTION_annotated.mp4")

<IPython.core.display.Javascript object>

Finished recording video at:ATTENTION_recorded.mp4
total frameS: 71
torch.Size([1, 3, 64, 256, 256])
the i3d model predicted: ATTENTION
ST-GCN model Predicted: ATTENTION
Annotated video saved at: ATTENTION_annotated.mp4
