In [1]:
# Print the current working directory (shell escape) to confirm where the notebook runs.
!pwd

/content


In [2]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Clone the Extract-Features-X3D repository into the current working directory.
!git clone https://github.com/longphamkhac/Extract-Features-X3D.git

Cloning into 'Extract-Features-X3D'...
remote: Enumerating objects: 15, done.[K
remote: Counting objects: 100% (15/15), done.[K
remote: Compressing objects: 100% (15/15), done.[K
remote: Total 15 (delta 4), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (15/15), 10.70 MiB | 5.73 MiB/s, done.


In [4]:
!pip install "git+https://github.com/facebookresearch/pytorchvideo.git"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting git+https://github.com/facebookresearch/pytorchvideo.git
  Cloning https://github.com/facebookresearch/pytorchvideo.git to /tmp/pip-req-build-c3tvxeol
  Running command git clone --filter=blob:none --quiet https://github.com/facebookresearch/pytorchvideo.git /tmp/pip-req-build-c3tvxeol
  Resolved https://github.com/facebookresearch/pytorchvideo.git to commit 702f9f42569598c5cce8c5e2dd7e37c3d6c46efd
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting fvcore
  Downloading fvcore-0.1.5.post20221221.tar.gz (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.2/50.2 KB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting av
  Downloading av-10.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (31.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[

In [5]:
"""
Mostly from torchvision
"""
import torch
from typing import Iterable
import numpy as np
from PIL import Image
import random


def crop(vid, i, j, h, w):
    """Slice a window out of the last two dimensions of ``vid``.

    Args:
        vid: indexable tensor; only its last two dimensions are sliced.
        i: first index kept along the second-to-last dimension.
        j: first index kept along the last dimension.
        h: number of entries kept along the second-to-last dimension.
        w: number of entries kept along the last dimension.

    Returns:
        ``vid[..., i:i + h, j:j + w]``; all leading dimensions are untouched.
    """
    rows = slice(i, i + h)
    cols = slice(j, j + w)
    return vid[..., rows, cols]


def center_crop(vid, output_size):
    """Crop the central ``output_size`` window from the last two dims of ``vid``.

    Args:
        vid: tensor whose last two dimensions are cropped.
        output_size: pair ``(th, tw)`` giving the size of the window.

    Returns:
        The result of ``crop`` anchored at the rounded midpoint offsets.
    """
    in_h, in_w = vid.shape[-2:]
    out_h, out_w = output_size

    # Python's round() is used on the half-difference, exactly as before.
    top = int(round((in_h - out_h) / 2.))
    left = int(round((in_w - out_w) / 2.))
    return crop(vid, top, left, out_h, out_w)


def hflip(vid):
    """Return ``vid`` reversed along its last dimension."""
    last_dim_only = (-1,)
    return torch.flip(vid, dims=last_dim_only)


def pad(vid, padding, fill=0, padding_mode="constant"):
    """Pad ``vid`` by forwarding to ``torch.nn.functional.pad``.

    Args:
        vid: tensor to pad.
        padding: padding specification, passed through unchanged.
        fill: value used for the padded entries (passed as ``value``).
        padding_mode: passed through as ``mode``.

    Returns:
        The padded tensor.
    """
    # NOTE: the input is kept un-batched (4-D) so that the temporal dimension
    # is not padded.
    functional_pad = torch.nn.functional.pad
    return functional_pad(vid, padding, mode=padding_mode, value=fill)


def to_normalized_float_tensor(vid):
    """Move the last axis of a 4-D tensor to the front and divide by 255.

    Returns:
        A ``float32`` tensor with dimensions permuted as ``(3, 0, 1, 2)``.
    """
    last_axis_first = vid.permute(3, 0, 1, 2)
    as_float = last_axis_first.to(torch.float32)
    return as_float / 255


def normalize(vid, mean, std):
    """Standardise ``vid`` along its first dimension.

    ``mean`` and ``std`` are converted to tensors and reshaped to
    ``(-1, 1, ..., 1)`` so that they broadcast over every dimension of ``vid``
    except the first.

    Returns:
        ``(vid - mean) / std``.
    """
    trailing_dims = vid.dim() - 1
    stat_shape = (-1,) + (1,) * trailing_dims
    mean_t = torch.as_tensor(mean).reshape(stat_shape)
    std_t = torch.as_tensor(std).reshape(stat_shape)
    centered = vid - mean_t
    return centered / std_t


# Class interface

class RandomCrop(object):
    """Callable that crops a randomly positioned window of a fixed size."""

    def __init__(self, size):
        # ``size`` is the (height, width) pair handed to ``get_params``.
        self.size = size

    @staticmethod
    def get_params(vid, output_size):
        """Pick the window passed to ``crop`` for a random crop.

        Returns:
            ``(i, j, h, w)``: offsets and size of the window. When the input
            already has the requested size the whole frame is returned and no
            random numbers are drawn.
        """
        in_h, in_w = vid.shape[-2:]
        out_h, out_w = output_size
        if in_w == out_w and in_h == out_h:
            return 0, 0, in_h, in_w
        # Row offset is drawn before column offset (keeps the RNG sequence).
        top = random.randint(0, in_h - out_h)
        left = random.randint(0, in_w - out_w)
        return top, left, out_h, out_w

    def __call__(self, vid):
        top, left, height, width = self.get_params(vid, self.size)
        return crop(vid, top, left, height, width)


class CenterCrop(object):
    """Callable wrapper around ``center_crop`` with a fixed output size."""

    def __init__(self, size):
        self.size = size

    def __call__(self, vid):
        target_size = self.size
        return center_crop(vid, target_size)


class Resize(object):
    """Callable that bilinearly resizes the last two dimensions of a tensor.

    ``size`` may be an ``int`` (the shorter of the last two dimensions is
    scaled to it via a scale factor) or an explicit size passed straight to
    ``torch.nn.functional.interpolate``.
    """

    def __init__(self, size):
        self.size = size

    def __call__(self, vid):
        # NOTE: the input is deliberately kept un-batched so that interpolate
        # treats it like a 4-D image batch; the transformation is therefore
        # applied in the spatial domain only, which is also why 'bilinear'
        # is the interpolation mode.
        if isinstance(self.size, int):
            shorter_edge = min(vid.shape[-2:])
            target = {"size": None, "scale_factor": float(self.size) / shorter_edge}
        else:
            target = {"size": self.size, "scale_factor": None}
        return torch.nn.functional.interpolate(
            vid,
            mode='bilinear',
            align_corners=False,
            recompute_scale_factor=False,
            **target
        )


class ToFloatTensorInZeroOne(object):
    """Callable wrapper around ``to_normalized_float_tensor``."""

    def __call__(self, vid):
        converted = to_normalized_float_tensor(vid)
        return converted


class Normalize(object):
    """Callable wrapper around ``normalize`` with fixed statistics."""

    def __init__(self, mean, std):
        self.mean, self.std = mean, std

    def __call__(self, vid):
        return normalize(vid, mean=self.mean, std=self.std)


class RandomHorizontalFlip(object):
    """Callable that applies ``hflip`` with probability ``p``."""

    def __init__(self, p=0.5):
        self.p = p

    def __call__(self, vid):
        # One draw from the global ``random`` generator per call.
        should_flip = random.random() < self.p
        return hflip(vid) if should_flip else vid


class Pad(object):
    """Callable wrapper around ``pad`` with a fixed padding and fill value."""

    def __init__(self, padding, fill=0):
        self.padding, self.fill = padding, fill

    def __call__(self, vid):
        padding, fill = self.padding, self.fill
        return pad(vid, padding, fill)


class TensorCenterCrop(object):
    """Callable that crops a centred square from the last two dimensions."""

    def __init__(self, crop_size: int) -> None:
        self.crop_size = crop_size

    def __call__(self, tensor: torch.FloatTensor) -> torch.FloatTensor:
        side = self.crop_size
        # Floor division places the extra row/column (if any) after the window.
        top = (tensor.size(-2) - side) // 2
        left = (tensor.size(-1) - side) // 2
        return tensor[..., top:top + side, left:left + side]


class ScaleTo1_1(object):
    """Callable computing ``2 * tensor / 255 - 1`` (maps 0 -> -1 and 255 -> 1)."""

    def __call__(self, tensor: torch.FloatTensor) -> torch.FloatTensor:
        doubled_fraction = 2 * tensor / 255
        return doubled_fraction - 1


class PermuteAndUnsqueeze(object):
    """Callable that swaps the first two axes of a 4-D tensor and prepends a size-1 axis."""

    def __call__(self, tensor: torch.FloatTensor) -> torch.FloatTensor:
        swapped = tensor.permute(1, 0, 2, 3)
        return torch.unsqueeze(swapped, 0)


class Clamp(object):
    """Callable that clamps a tensor to ``[min_val, max_val]``."""

    def __init__(self, min_val, max_val) -> None:
        self.min_val, self.max_val = min_val, max_val

    def __call__(self, tensor):
        lower, upper = self.min_val, self.max_val
        return torch.clamp(tensor, min=lower, max=upper)


class ToUInt8(object):
    """Callable that rescales flow values with ``128 + 255 / 40 * x`` and rounds.

    Only ``round()`` is applied; the result is not cast to an integer dtype
    and is not clamped.
    """

    def __call__(self, flow_tensor: torch.FloatTensor) -> torch.FloatTensor:
        # Preprocessing as in
        # https://github.com/deepmind/kinetics-i3d/issues/61#issuecomment-506727158
        # but for PyTorch: [-20, 20] -> [0, 255].
        scale = 255 / 40
        shifted = 128 + scale * flow_tensor
        return torch.round(shifted)


class ToCFHW_ToFloat(object):
    """Callable that moves the last axis of a 4-D tensor to the front and casts to float."""

    def __call__(self, tensor_fhwc: torch.Tensor) -> torch.Tensor:
        tensor_cfhw = tensor_fhwc.permute(3, 0, 1, 2)
        return tensor_cfhw.float()


class ToFCHW(object):
    """Callable that swaps the first two axes of a 4-D tensor."""

    def __call__(self, tensor_cfhw: torch.Tensor) -> torch.Tensor:
        axis_order = (1, 0, 2, 3)
        return tensor_cfhw.permute(*axis_order)


def resize(img, size, resize_to_smaller_edge=True, interpolation=Image.BILINEAR):
    r"""
    (v-iashin): this is almost the same implementation as in PyTorch except it has no _is_pil_image() check
    and has an extra argument governing what happens if `size` is `int`.
    Reference: https://pytorch.org/docs/1.6.0/_modules/torchvision/transforms/functional.html#resize
    Resize the input PIL Image to the given size.
    Args:
        img (PIL Image): Image to be resized.
        size (sequence or int): Desired output size. If size is a sequence like
            (h, w), the output size will be matched to this. If size is an int,
            the smaller (bigger depending on `resize_to_smaller_edge`) edge of the image will be matched
            to this number maintaining
            the aspect ratio. i.e, if height > width, then image will be rescaled to
            :math:`\left(\text{size} \times \frac{\text{height}}{\text{width}}, \text{size}\right)`
        resize_to_smaller_edge (bool, optional): if True the smaller edge is matched to number in `size`,
            if False, the bigger edge is matched to it.
        interpolation (int, optional): Desired interpolation. Default is
            ``PIL.Image.BILINEAR``
    Returns:
        PIL Image: Resized image. The input object itself is returned when the
        edge being matched already equals `size`.
    Raises:
        TypeError: if `size` is neither an int nor a 2-element iterable.
    """
    if not (isinstance(size, int) or (isinstance(size, Iterable) and len(size) == 2)):
        raise TypeError('Got inappropriate size arg: {}'.format(size))

    if not isinstance(size, int):
        # `size` is (h, w) while PIL's resize expects (w, h).
        return img.resize(size[::-1], interpolation)

    w, h = img.size
    # The "already the right size" shortcut must look at the edge that is
    # actually being matched. Checking the smaller edge unconditionally made
    # resize_to_smaller_edge=False return the image unresized whenever its
    # smaller edge happened to equal `size`.
    matched_edge = min(w, h) if resize_to_smaller_edge else max(w, h)
    if matched_edge == size:
        return img

    if (w < h) == resize_to_smaller_edge:
        # Width is the edge to match; height follows the aspect ratio.
        ow = size
        oh = int(size * h / w)
    else:
        # Height is the edge to match; width follows the aspect ratio.
        oh = size
        ow = int(size * w / h)
    return img.resize((ow, oh), interpolation)


class ResizeImproved(object):
    """Callable wrapper around ``resize`` that stores its keyword configuration."""

    def __init__(self, size: int, resize_to_smaller_edge: bool = True, interpolation=Image.BILINEAR):
        self.size = size
        self.resize_to_smaller_edge = resize_to_smaller_edge
        self.interpolation = interpolation

    def __call__(self, img):
        # Arguments are forwarded positionally in the order ``resize`` declares them.
        settings = (self.size, self.resize_to_smaller_edge, self.interpolation)
        return resize(img, *settings)


class ToTensorWithoutScaling(object):
    """Callable converting a 3-D NumPy array to a float tensor with its last axis moved first.

    Values are cast with ``.float()`` only; no rescaling is applied.
    """

    def __call__(self, np_img):
        as_tensor = torch.from_numpy(np_img)
        last_axis_first = as_tensor.permute(2, 0, 1)
        return last_axis_first.float()


class ToFloat(object):
    """Callable that returns ``byte_img.float()``."""

    def __call__(self, byte_img):
        as_float = byte_img.float()
        return as_float


class PILToTensor:
    """Convert a ``PIL Image`` to a tensor of the same type. This transform does not support torchscript.
    Converts a PIL Image (H x W x C) to a Tensor of shape (C x H x W).
    Reference: https://github.com/pytorch/vision/blob/610c9d2a06/torchvision/transforms/functional.py#L107
    """

    def __call__(self, pic):
        """
        Args:
            pic (PIL Image): Image to be converted to tensor.
        Returns:
            Tensor: Converted image.
        """
        # Copy the pixel data so the tensor does not alias the image buffer.
        pixels = np.array(pic, copy=True)
        width, height = pic.size[0], pic.size[1]
        n_bands = len(pic.getbands())
        hwc = torch.from_numpy(pixels).view(height, width, n_bands)
        # HWC -> CHW
        return hwc.permute((2, 0, 1))

    def __repr__(self):
        return '{}()'.format(self.__class__.__name__)

In [6]:
import math
import warnings
warnings.filterwarnings('ignore')
from typing import Tuple
from typing import Callable

import numpy as np
import pandas as pd
import torch
import torch, gc
import torch.nn as nn
from pytorchvideo.layers.accelerator.mobile_cpu.attention import SqueezeExcitation
from torchvision.io.video import read_video

def round_width(width, multiplier, min_width = 8, divisor = 8, ceil = False):
  """Scale a channel count by ``multiplier`` and round it to a multiple of ``divisor``.

  Args:
    width: base channel count.
    multiplier: scaling factor; a falsy value returns ``width`` unchanged.
    min_width: lower bound of the result (falls back to ``divisor`` if falsy).
    divisor: the result is a multiple of this value (unless ``min_width`` wins).
    ceil: round up instead of to the nearest multiple.

  Returns:
    The rounded width as an ``int``; one extra ``divisor`` is added when
    rounding would otherwise drop below 90% of the scaled width.
  """
  if not multiplier:
    return width

  scaled = width * multiplier
  lower_bound = min_width or divisor
  if ceil:
    n_units = int(math.ceil(scaled / divisor))
    rounded = n_units * divisor
  else:
    rounded = int(scaled + divisor / 2) // divisor * divisor
  rounded = max(lower_bound, rounded)
  if rounded < 0.9 * scaled:
    rounded += divisor
  return int(rounded)

def create_x3d_stem(
    # Conv configs.
    in_channels: int,
    out_channels: int,
    conv_kernel_size: Tuple[int] = (5, 3, 3),
    conv_stride: Tuple[int] = (1, 2, 2),
    conv_padding: Tuple[int] = (2, 1, 1),
    # BN configs.
    norm: Callable = nn.BatchNorm3d,
    norm_eps: float = 1e-5,
    norm_momentum: float = 0.1,
    # Activation configs.
    activation: Callable = nn.ReLU,
) -> nn.Module:
  """Build the X3D stem: a factorised (2+1)D convolution, then norm and activation.

  Args:
    in_channels: channels of the input clip.
    out_channels: channels produced by the stem.
    conv_kernel_size: (T, H, W) kernel; index 0 is used by the temporal conv,
      indices 1 and 2 by the spatial conv.
    conv_stride: (T, H, W) stride, split between the two convs the same way.
    conv_padding: (T, H, W) padding, split between the two convs the same way.
    norm: normalization layer constructor, or None to skip normalization.
    norm_eps: ``eps`` passed to ``norm``.
    norm_momentum: ``momentum`` passed to ``norm``.
    activation: activation constructor, or None to skip the activation.

  Returns:
    A ``ResNetBasicStem`` wrapping the stacked convolution, with no pooling.
  """

  # Spatial convolution: kernel/stride/padding of 1/1/0 along time.
  conv_xy_module = nn.Conv3d(
      in_channels = in_channels,
      out_channels = out_channels,
      kernel_size = (1, conv_kernel_size[1], conv_kernel_size[2]),
      stride=(1, conv_stride[1], conv_stride[2]),
      padding=(0, conv_padding[1], conv_padding[2]),
      bias=False,
  )

  # Temporal convolution: one group per channel (groups=out_channels).
  conv_t_module = nn.Conv3d(
      in_channels = out_channels,
      out_channels = out_channels,
      kernel_size=(conv_kernel_size[0], 1, 1),
      stride=(conv_stride[0], 1, 1),
      padding=(conv_padding[0], 0, 0),
      bias=False,
      groups=out_channels,
  )

  # The spatial conv is passed as `conv_t` and the temporal conv as `conv_xy`.
  # With conv_xy_first left at False, Conv2plus1d applies `conv_t` first, so
  # the spatial convolution runs before the temporal one.
  # NOTE(review): the swapped keyword names look intentional (the attribute
  # names become state_dict keys) - confirm against the checkpoint being loaded.
  stacked_conv_module = Conv2plus1d(
      conv_t=conv_xy_module,
      norm=None,
      activation=None,
      conv_xy=conv_t_module,
  )

  norm_module = (
      None
      if norm is None
      else norm(num_features=out_channels, eps=norm_eps, momentum=norm_momentum)
  )

  activation_module = None if activation is None else activation()

  return ResNetBasicStem(
      conv = stacked_conv_module,
      norm = norm_module,
      activation = activation_module,
      pool = None
  )


class Conv2plus1d(nn.Module):
  """Two convolutions applied back to back with an optional norm/activation in between.

  ``conv_xy_first`` selects which of ``conv_t`` / ``conv_xy`` runs first.
  """

  def __init__(
      self,
      conv_t: nn.Module = None,
      norm: nn.Module = None,
      activation: nn.Module = None,
      conv_xy: nn.Module = None,
      conv_xy_first: bool = False,
  ) -> None:
    super().__init__()
    # Assignment order is kept: it fixes the submodule registration order.
    self.conv_t = conv_t
    self.norm = norm
    self.activation = activation
    self.conv_xy = conv_xy
    self.conv_xy_first = conv_xy_first

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    if self.conv_xy_first:
      first_conv, second_conv = self.conv_xy, self.conv_t
    else:
      first_conv, second_conv = self.conv_t, self.conv_xy

    x = first_conv(x)
    # Truthiness (not `is not None`) decides whether norm/activation run.
    if self.norm:
      x = self.norm(x)
    if self.activation:
      x = self.activation(x)
    return second_conv(x)

class ResNetBasicStem(nn.Module):
  """Stem module: ``conv`` followed by optional ``norm``, ``activation`` and ``pool``."""

  def __init__(self,
               conv: nn.Module = None,
               norm: nn.Module = None,
               activation: nn.Module = None,
               pool: nn.Module = None
  ):
    super().__init__()
    self.conv = conv
    self.norm = norm
    self.activation = activation
    self.pool = pool

  def forward(self, x):
    x = self.conv(x)
    # Each remaining stage is applied only when it was provided.
    for optional_layer in (self.norm, self.activation, self.pool):
      if optional_layer is not None:
        x = optional_layer(x)
    return x

class Swish(nn.Module):
    """
    Module form of the Swish activation; delegates to ``SwishFunction``.
    """

    def forward(self, x):
        activated = SwishFunction.apply(x)
        return activated


class SwishFunction(torch.autograd.Function):
    """
    Implementation of the Swish activation function: x * sigmoid(x).
    Searching for activation functions. Ramachandran, Prajit and Zoph, Barret
    and Le, Quoc V. 2017

    Only the input is saved for the backward pass; the sigmoid is recomputed
    there instead of being stored.
    """

    @staticmethod
    def forward(ctx, x):
        """Return ``x * sigmoid(x)`` and stash ``x`` for ``backward``."""
        result = x * torch.sigmoid(x)
        ctx.save_for_backward(x)
        return result

    @staticmethod
    def backward(ctx, grad_output):
        """Return ``grad_output * d/dx [x * sigmoid(x)]``."""
        # `saved_tensors` is the supported accessor for what
        # `save_for_backward` stored; `saved_variables` is deprecated.
        (x,) = ctx.saved_tensors
        sigmoid_x = torch.sigmoid(x)
        return grad_output * (sigmoid_x * (1 + x * (1 - sigmoid_x)))


def create_x3d_bottleneck_block(
    dim_in: int,
    dim_inner: int,
    dim_out: int,
    conv_kernel_size: Tuple[int] = (3, 3, 3),
    conv_stride: Tuple[int] = (1, 2, 2),
    # Norm configs.
    norm: Callable = nn.BatchNorm3d,
    norm_eps: float = 1e-5,
    norm_momentum: float = 0.1,
    se_ratio: float = 0.0625,
    # Activation configs.
    activation: Callable = nn.ReLU,
    inner_act: Callable = Swish,
) -> nn.Module:
  """
  Bottleneck block for X3D: a sequence of Conv, Normalization with optional SE block,
  and Activations.

  Layout: 1x1x1 conv (dim_in -> dim_inner), norm, activation; a
  ``conv_kernel_size`` conv with one group per channel (dim_inner ->
  dim_inner, strided by ``conv_stride``), norm followed by SqueezeExcitation
  or Identity, inner activation; 1x1x1 conv (dim_inner -> dim_out), norm.

  Args:
    dim_in: input channels.
    dim_inner: channels inside the bottleneck.
    dim_out: output channels.
    conv_kernel_size: kernel of the middle conv; its padding is ``size // 2``
      per dimension.
    conv_stride: stride of the middle conv.
    norm: normalization constructor, or None to skip normalization.
    norm_eps: ``eps`` passed to ``norm``.
    norm_momentum: ``momentum`` passed to ``norm``.
    se_ratio: when > 0, a SqueezeExcitation module with
      ``round_width(dim_inner, se_ratio)`` reduced channels follows the second
      norm; otherwise ``nn.Identity`` is used.
    activation: constructor of the activation after the first conv, or None.
    inner_act: constructor of the activation after the second norm, or None.

  Returns:
    A ``BottleneckBlock`` holding the modules described above.
  """
  # 1x1x1 Conv: dim_in -> dim_inner
  conv_a = nn.Conv3d(
      in_channels = dim_in,
      out_channels = dim_inner,
      kernel_size = (1, 1, 1),
      bias = False
  )
  norm_a = (
      None 
      if norm is None 
      else norm(num_features = dim_inner, eps = norm_eps, momentum = norm_momentum)
  )
  act_a = None if activation is None else activation()

  # 3x3x3 Conv (Separable Convolution)
  # groups = dim_inner gives every channel its own filter.
  conv_b = nn.Conv3d(
      in_channels = dim_inner,
      out_channels = dim_inner,
      kernel_size = conv_kernel_size,
      stride = conv_stride,
      padding = [size // 2 for size in conv_kernel_size],
      bias = False,
      groups = dim_inner,
      dilation = (1, 1, 1)
  )
  se = (
      SqueezeExcitation(
          num_channels = dim_inner,
          num_channels_reduced = round_width(dim_inner, se_ratio),
          is_3d = True
      )
      if se_ratio > 0.0
      else nn.Identity()
  )
  # norm_b is always a Sequential of (norm or Identity, SE or Identity), so it
  # is never None even when `norm` is None.
  norm_b = nn.Sequential(
      (
          nn.Identity()
          if norm is None
          else norm(num_features = dim_inner, eps = norm_eps, momentum = norm_momentum)    
      ),
      se
  )
  act_b = None if inner_act is None else inner_act()

  # 1x1x1 Conv (Separable Convolution)
  conv_c = nn.Conv3d(
      in_channels = dim_inner,
      out_channels = dim_out,
      kernel_size = (1, 1, 1),
      bias = False
  )
  norm_c = (
      None
      if norm is None
      else norm(num_features = dim_out, eps = norm_eps, momentum = norm_momentum)
  )

  return BottleneckBlock(
      conv_a=conv_a,
      norm_a=norm_a,
      act_a=act_a,
      conv_b=conv_b,
      norm_b=norm_b,
      act_b=act_b,
      conv_c=conv_c,
      norm_c=norm_c
  )

class BottleneckBlock(nn.Module):
  """Three conv stages (a, b, c); each conv is followed by an optional norm,
  and stages a and b additionally by an optional activation."""

  def __init__(
      self,
      conv_a: nn.Module = None,
      norm_a: nn.Module = None,
      act_a: nn.Module = None,
      conv_b: nn.Module = None,
      norm_b: nn.Module = None,
      act_b: nn.Module = None,
      conv_c: nn.Module = None,
      norm_c: nn.Module = None,
  ):
    super().__init__()
    # Assignment order is kept: it fixes the submodule registration order.
    self.conv_a = conv_a
    self.norm_a = norm_a
    self.act_a = act_a

    self.conv_b = conv_b
    self.norm_b = norm_b
    self.act_b = act_b

    self.conv_c = conv_c
    self.norm_c = norm_c

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    stages = (
        (self.conv_a, self.norm_a, self.act_a),
        (self.conv_b, self.norm_b, self.act_b),
        (self.conv_c, self.norm_c, None),  # stage c has no activation
    )
    for conv, norm, act in stages:
      x = conv(x)
      if norm is not None:
        x = norm(x)
      if act is not None:
        x = act(x)
    return x

class ResBlock(nn.Module):
  """Residual block: fuses a shortcut branch with ``branch2(x)``.

  The shortcut is ``x`` itself when ``branch1_conv`` is None, otherwise
  ``branch1_conv(x)`` followed by the optional ``branch1_norm``.
  """

  def __init__(
      self,
      branch1_conv: nn.Module = None,
      branch1_norm: nn.Module = None,
      branch2: nn.Module = None,
      activation: nn.Module = None,
      branch_fusion: Callable = None
  ) -> nn.Module:
    super().__init__()
    self.branch1_conv = branch1_conv
    self.branch1_norm = branch1_norm
    self.branch2 = branch2
    self.activation = activation
    self.branch_fusion = branch_fusion

  def forward(self, x) -> torch.Tensor:
    shortcut = x
    if self.branch1_conv is not None:
      shortcut = self.branch1_conv(x)
      if self.branch1_norm is not None:
        shortcut = self.branch1_norm(shortcut)

    # The shortcut is computed before branch2, as in the original ordering.
    fused = self.branch_fusion(shortcut, self.branch2(x))
    if self.activation is None:
      return fused
    return self.activation(fused)

def create_x3d_res_block(
    # Bottleneck Block configs.
    dim_in: int,
    dim_inner: int,
    dim_out: int,
    bottleneck: Callable = create_x3d_bottleneck_block,
    use_shortcut: bool = True,
    # Conv configs
    conv_kernel_size: Tuple[int] = (3, 3, 3),
    conv_stride: Tuple[int] = (1, 2, 2),
    # Norm configs.
    norm: Callable = nn.BatchNorm3d,
    norm_eps: float = 1e-5,
    norm_momentum: float = 0.1,
    se_ratio: float = 0.0625,
    # Activation configs.
    activation: Callable = nn.ReLU,
    inner_act: Callable = Swish
) -> nn.Module:
  """Build one X3D residual block: an optional shortcut projection plus a bottleneck.

  A 1x1x1 strided shortcut conv is created when ``use_shortcut`` is set and
  either the channel count changes or the stride product exceeds 1. The
  shortcut norm is created only when the channel count changes; it is built
  with ``num_features`` only, so ``norm_eps`` / ``norm_momentum`` apply to the
  bottleneck norms but not to it.

  Returns:
    A ``ResBlock`` whose branches are fused by element-wise addition and
    followed by ``activation()`` when ``activation`` is not None.
  """
  channels_change = dim_in != dim_out

  # Construction order (norm, shortcut conv, bottleneck, activation) is kept.
  norm_model = None
  if norm is not None and channels_change:
    norm_model = norm(num_features = dim_out)

  shortcut_conv = None
  if use_shortcut and (channels_change or np.prod(conv_stride) > 1):
    shortcut_conv = nn.Conv3d(
        dim_in, dim_out, kernel_size = (1, 1, 1), stride = conv_stride, bias = False
    )
  shortcut_norm = norm_model if (channels_change and use_shortcut) else None

  residual_branch = bottleneck(
      dim_in = dim_in,
      dim_inner = dim_inner,
      dim_out = dim_out,
      conv_kernel_size = conv_kernel_size,
      conv_stride = conv_stride,
      norm = norm,
      norm_eps = norm_eps,
      norm_momentum = norm_momentum,
      se_ratio = se_ratio,
      activation = activation,
      inner_act = inner_act
  )
  output_activation = None if activation is None else activation()

  return ResBlock(
      branch1_conv = shortcut_conv,
      branch1_norm = shortcut_norm,
      branch2 = residual_branch,
      activation = output_activation,
      branch_fusion = lambda x, y: x + y
  )

class ResStage(nn.Module):
  """Applies the modules in ``res_blocks`` one after another."""

  def __init__(self, res_blocks: nn.ModuleList) -> nn.Module:
    super().__init__()
    self.res_blocks = res_blocks

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    for block in self.res_blocks:
      x = block(x)
    return x

def round_repeats(repeats, multiplier):
  """Scale a repeat count by ``multiplier`` and round up.

  A falsy ``multiplier`` returns ``repeats`` unchanged.
  """
  if multiplier:
    return int(math.ceil(repeats * multiplier))
  return repeats

def create_x3d_res_stage(
    # Stage configs
    depth: int,
    # Bottle Block Configs
    dim_in: int,
    dim_inner: int,
    dim_out: int,
    bottleneck: Callable = create_x3d_bottleneck_block,
    # Conv Configs
    conv_kernel_size: Tuple[int] = (3, 3, 3),
    conv_stride: Tuple[int] = (1, 2, 2),
    # Norm Configs
    norm: Callable = nn.BatchNorm3d,
    norm_eps: float = 1e-5,
    norm_momentum: float = 0.1,
    se_ratio: float = 0.0625,
    # Activation configs.
    activation: Callable = nn.ReLU,
    inner_act: Callable = Swish,
) -> nn.Module:
  """Build one X3D stage made of ``depth`` residual blocks.

  Only the first block changes the channel count (``dim_in`` -> ``dim_out``)
  and applies ``conv_stride``; later blocks map ``dim_out`` -> ``dim_out`` with
  stride (1, 1, 1). ``se_ratio`` is passed to blocks at even positions
  (0, 2, ...); blocks at odd positions get 0.0.

  Returns:
    A ``ResStage`` wrapping the blocks in an ``nn.ModuleList``.
  """

  def _build_block(position: int) -> nn.Module:
    is_first = position == 0
    return create_x3d_res_block(
        dim_in = dim_in if is_first else dim_out,
        dim_inner = dim_inner,
        dim_out = dim_out,
        bottleneck = bottleneck,
        conv_kernel_size = conv_kernel_size,
        conv_stride = conv_stride if is_first else (1, 1, 1),
        norm = norm,
        norm_eps = norm_eps,
        norm_momentum = norm_momentum,
        se_ratio = se_ratio if position % 2 == 0 else 0.0,
        activation = activation,
        inner_act = inner_act,
    )

  # Blocks are created in positional order, first to last.
  stage_blocks = [_build_block(position) for position in range(depth)]
  return ResStage(res_blocks = nn.ModuleList(stage_blocks))


class Net(nn.Module):
  """Sequential container: feeds the input through every module in ``blocks`` in order."""

  def __init__(self, blocks: nn.ModuleList):
    super().__init__()
    self.blocks = blocks

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    for block in self.blocks:
      x = block(x)
    return x

def create_x3d(
    input_channel: int = 3,
    input_clip_length: int = 13,
    input_crop_size: int = 160,
    # Model Configs
    model_num_class: int = 400,
    dropout_rate: float = 0.5,
    width_factor: float = 2.0,
    depth_factor: float = 2.2,
    # Normalization configs.
    norm: Callable = nn.BatchNorm3d,
    # NOTE(review): 0.1 equals the norm_momentum default below and differs from
    # the 1e-5 used by every other factory in this file - looks like a
    # copy/paste slip; confirm before relying on pretrained weights.
    norm_eps: float = 0.1,
    norm_momentum: float = 0.1,
    # Activation configs.
    activation: Callable = nn.ReLU,
    # Stem Configs
    stem_dim_in: int = 12,
    stem_conv_kernel_size: Tuple[int] = (5, 3, 3),
    stem_conv_stride: Tuple[int] = (1, 2, 2),
    # Stage configs.
    stage_conv_kernel_size: Tuple[Tuple[int]] = (
        (3, 3, 3),
        (3, 3, 3),
        (3, 3, 3),
        (3, 3, 3),
    ),
    stage_spatial_stride: Tuple[int] = (2, 2, 2, 2),
    stage_temporal_stride: Tuple[int] = (1, 1, 1, 1),
    bottleneck: Callable = create_x3d_bottleneck_block,
    bottleneck_factor: float = 2.25,
    se_ratio: float = 0.0625,
    inner_act: Callable = Swish,
    # Head configs.
    head_dim_out: int = 2048,
    head_pool_act: Callable = nn.ReLU,
    head_bn_lin5_on: bool = False,
    head_activation: Callable = nn.Softmax,
    head_output_with_global_average: bool = True,
) -> nn.Module:
  """Assemble an X3D network: stem, four residual stages and a head.

  Channel widths are derived from ``stem_dim_in`` (doubled per stage and then
  scaled by ``width_factor``); the per-stage block counts [1, 2, 5, 3] are
  scaled by ``depth_factor``. The trailing ``#`` comments show the values
  obtained with the default arguments.

  The head is built by ``create_x3d_head``, which in this file returns only
  the projected pooling module, so the network's output is the
  ``head_dim_out``-channel pooled feature rather than class scores.
  ``model_num_class``, ``dropout_rate``, ``head_activation`` and
  ``head_output_with_global_average`` are still forwarded to it.

  Args:
    input_channel: channels of the input clip.
    input_clip_length: number of frames; sets the temporal pooling kernel.
    input_crop_size: spatial size; sets the spatial pooling kernel.
    width_factor / depth_factor: scaling of channel widths / block counts.
    norm, norm_eps, norm_momentum: normalization constructor and its settings.
    activation: activation constructor used in the stem and stages.
    stem_*: stem channel base, kernel and stride.
    stage_*: per-stage kernel and strides (four entries each).
    bottleneck, bottleneck_factor, se_ratio, inner_act: bottleneck settings;
      ``dim_inner = int(bottleneck_factor * dim_out)``.
    head_*: head settings forwarded to ``create_x3d_head``.

  Returns:
    A ``Net`` that applies the stem, the stages and the head in order.

  Raises:
    AssertionError: if the clip length / crop size is smaller than the total
      temporal / spatial stride.
  """

  # stem_dim_in = 12
  blocks = []
  stem_dim_out = round_width(stem_dim_in, width_factor) # 24
  stem = create_x3d_stem(
      in_channels = input_channel,
      out_channels = stem_dim_out,
      conv_kernel_size = stem_conv_kernel_size,
      conv_stride = stem_conv_stride,
      conv_padding=[size // 2 for size in stem_conv_kernel_size],
      norm=norm,
      norm_eps=norm_eps,
      norm_momentum=norm_momentum,
      activation=activation,
  )

  # return stem

  blocks.append(stem)

  # Compute the depth and dimension for each stage
  stage_depths = [1, 2, 5, 3]
  exp_stage = 2.0
  stage_dim1 = stem_dim_in # 12
  stage_dim2 = round_width(stage_dim1, exp_stage, divisor = 8) # 24
  stage_dim3 = round_width(stage_dim2, exp_stage, divisor = 8) # 48
  stage_dim4 = round_width(stage_dim3, exp_stage, divisor=8) # 96
  stage_dims = [stage_dim1, stage_dim2, stage_dim3, stage_dim4] # 12, 24, 48, 96

  # print(stage_dim1, stage_dim2, stage_dim3, stage_dim4)

  # Input width of the first stage is the stem's output width.
  dim_in = stem_dim_out

  for idx in range(len(stage_dims)):
    dim_out = round_width(stage_dims[idx], width_factor) # 24, 48, 96, 192
    # print(dim_out)
    dim_inner = int(bottleneck_factor * dim_out) # 54, 108, 216, 432
    # print(dim_inner)
    depth = round_repeats(stage_depths[idx], depth_factor) # 3, 5, 11, 7
    # print(depth)

    stage_conv_stride = (
        stage_temporal_stride[idx],
        stage_spatial_stride[idx],
        stage_spatial_stride[idx],
    ) # (1, 2, 2), (1, 2, 2), (1, 2, 2), (1, 2, 2)
    # print(stage_conv_stride)

    stage = create_x3d_res_stage(
        depth=depth,
        dim_in=dim_in,
        dim_inner=dim_inner,
        dim_out=dim_out,
        bottleneck=bottleneck,
        conv_kernel_size=stage_conv_kernel_size[idx],
        conv_stride=stage_conv_stride,
        norm=norm,
        norm_eps=norm_eps,
        norm_momentum=norm_momentum,
        se_ratio=se_ratio,
        activation=activation,
        inner_act=inner_act,
    )

    blocks.append(stage)
    # The next stage consumes this stage's output width.
    dim_in = dim_out
  
  # return nn.ModuleList(blocks)

  # Create head for X3D.
  total_spatial_stride = stem_conv_stride[1] * np.prod(stage_spatial_stride) # 32
  total_temporal_stride = stem_conv_stride[0] * np.prod(stage_temporal_stride) # 1
  
  assert (
      input_clip_length >= total_temporal_stride
  ), "Clip length doesn't match temporal stride!"
  
  assert (
      input_crop_size >= total_spatial_stride
  ), "Crop size doesn't match spatial stride!"

  # Pooling kernel = size of the feature map left after all strides.
  head_pool_kernel_size = (
      input_clip_length // total_temporal_stride,
      int(math.ceil(input_crop_size / total_spatial_stride)),
      int(math.ceil(input_crop_size / total_spatial_stride))
  ) # (13, 5, 5)

  # dim_out / dim_inner still hold the values of the last stage here.
  head = create_x3d_head(
      dim_in = dim_out,
      dim_inner = dim_inner,
      dim_out = head_dim_out,
      num_classes = model_num_class,
      pool_act = head_pool_act,
      pool_kernel_size = head_pool_kernel_size,
      norm = norm,
      norm_eps = norm_eps,
      norm_momentum = norm_momentum,
      bn_lin5_on = head_bn_lin5_on,
      dropout_rate = dropout_rate,
      activation = head_activation,
      output_with_global_average = head_output_with_global_average
  )

  # blocks.append(head)
  # block_head = []
  # block_head.append(head)

  # return nn.ModuleList(block_head)

  blocks.append(head)
  # return nn.ModuleList(blocks)
  return Net(blocks = nn.ModuleList(blocks))

def create_x3d_head(
    dim_in: int,
    dim_inner: int,
    dim_out: int,
    num_classes: int,
    # Pooling Configs
    pool_act: Callable = nn.ReLU,
    pool_kernel_size: Tuple[int] = (13, 5, 5),
    # BN Configs
    norm: Callable = nn.BatchNorm3d,
    norm_eps: float = 1e-5,
    norm_momentum: float = 0.1,
    bn_lin5_on = False,
    # Dropout configs.
    dropout_rate: float = 0.5,
    # Activation configs.
    activation: Callable = nn.Softmax,
    # Output configs.
    output_with_global_average: bool = True,
) -> nn.Module:
  """Build the feature-extraction head of X3D (projected pooling only).

  The head is: 1x1x1 conv (dim_in -> dim_inner), norm, ``pool_act``, average
  pooling, 1x1x1 conv (dim_inner -> dim_out) and, if ``bn_lin5_on``, a norm.
  The classification part of the usual X3D head (dropout, linear projection,
  output activation, global averaging) is deliberately not attached, so the
  module outputs ``dim_out``-channel features.

  Args:
    dim_in: input channels.
    dim_inner: channels after the first 1x1x1 conv.
    dim_out: channels of the returned features.
    num_classes: accepted for interface compatibility; unused.
    pool_act: activation constructor applied before pooling, or None.
    pool_kernel_size: kernel of ``nn.AvgPool3d`` (stride 1); None selects
      ``nn.AdaptiveAvgPool3d((1, 1, 1))``.
    norm: normalization constructor, or None to skip normalization.
    norm_eps: ``eps`` passed to ``norm``.
    norm_momentum: ``momentum`` passed to ``norm``.
    bn_lin5_on: add a norm after the second conv.
    dropout_rate: accepted for interface compatibility; unused.
    activation: only validated (None, ``nn.Softmax`` or ``nn.Sigmoid``); no
      activation module is attached.
    output_with_global_average: accepted for interface compatibility; unused.

  Returns:
    A ``ResNetBasicHead`` containing only the ``ProjectedPool`` module.

  Raises:
    NotImplementedError: if ``activation`` is not one of the supported values.
  """
  # Validate up front so that an unsupported activation is still rejected even
  # though no activation module is attached to this head.
  if activation not in (None, nn.Softmax, nn.Sigmoid):
    raise NotImplementedError(
        "{} is not supported as an activation function.".format(activation)
    )

  pre_conv_module = nn.Conv3d(
      in_channels = dim_in, out_channels = dim_inner, kernel_size = (1, 1, 1), bias = False
  )
  # `norm=None` is accepted here the same way the other factories accept it.
  pre_norm_module = (
      None
      if norm is None
      else norm(num_features = dim_inner, eps = norm_eps, momentum = norm_momentum)
  )
  pre_act_module = None if pool_act is None else pool_act()

  if pool_kernel_size is None:
    pool_module = nn.AdaptiveAvgPool3d((1, 1, 1))
  else:
    pool_module = nn.AvgPool3d(pool_kernel_size, stride = 1)

  # Projection to the final feature width (2048 with the create_x3d defaults).
  post_conv_module = nn.Conv3d(
      in_channels = dim_inner, out_channels=dim_out, kernel_size=(1, 1, 1), bias=False
  )
  if bn_lin5_on and norm is not None:
    post_norm_module = norm(
      num_features = dim_out, eps = norm_eps, momentum = norm_momentum
    )
  else:
    post_norm_module = None
  # Modified here: no activation after the projection, so the raw projected
  # features are returned.
  post_act_module = None

  projected_pool_module = ProjectedPool(
    pre_conv = pre_conv_module,
    pre_norm = pre_norm_module,
    pre_act = pre_act_module,
    pool = pool_module,
    post_conv = post_conv_module,
    post_norm = post_norm_module,
    post_act = post_act_module,
  )

  # Modified here: the head keeps only the projected pooling; the dropout,
  # linear projection, output activation and global average of the full
  # classification head are intentionally left out.
  return ResNetBasicHead(
      pool = projected_pool_module
  )

class ProjectedPool(nn.Module):
  """``pre_conv`` -> [pre_norm] -> [pre_act] -> ``pool`` -> ``post_conv`` -> [post_norm] -> [post_act].

  Bracketed stages are skipped when they are None.
  """

  def __init__(
      self,
      pre_conv: nn.Module = None,
      pre_norm: nn.Module = None,
      pre_act: nn.Module = None,
      pool: nn.Module = None,
      post_conv: nn.Module = None,
      post_norm: nn.Module = None,
      post_act: nn.Module = None,
  ):
    super().__init__()
    # Assignment order is kept: it fixes the submodule registration order.
    self.pre_conv = pre_conv
    self.pre_norm = pre_norm
    self.pre_act = pre_act

    self.pool = pool

    self.post_conv = post_conv
    self.post_norm = post_norm
    self.post_act = post_act

  def forward(self, x):
    x = self.pre_conv(x)
    for optional_layer in (self.pre_norm, self.pre_act):
      if optional_layer is not None:
        x = optional_layer(x)

    x = self.pool(x)

    x = self.post_conv(x)
    for optional_layer in (self.post_norm, self.post_act):
      if optional_layer is not None:
        x = optional_layer(x)
    return x

class ResNetBasicHead(nn.Module):
  """Head applying, in order and each only if provided: pool, dropout,
  projection (on a channels-last view), activation and a flattening output pool."""

  def __init__(
    self,
    pool: nn.Module = None,
    dropout: nn.Module = None,
    proj: nn.Module = None,
    activation: nn.Module = None,
    output_pool: nn.Module = None,
  ):
    super().__init__()
    self.pool = pool
    self.dropout = dropout
    self.proj = proj
    self.activation = activation
    self.output_pool = output_pool

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    for optional_layer in (self.pool, self.dropout):
      if optional_layer is not None:
        x = optional_layer(x)

    if self.proj is not None:
      # Move dim 1 last for the projection, then move it back.
      channels_last = x.permute((0, 2, 3, 4, 1))
      x = self.proj(channels_last).permute((0, 4, 1, 2, 3))

    if self.activation is not None:
      x = self.activation(x)

    if self.output_pool is None:
      return x

    # Performs global averaging, then flattens everything after dim 0.
    pooled = self.output_pool(x)
    return pooled.view(pooled.shape[0], -1)

"""
import json
import urllib
from pytorchvideo.data.encoded_video import EncodedVideo

from torchvision.transforms import Compose, Lambda
from torchvision.transforms._transforms_video import (
    CenterCropVideo,
    NormalizeVideo,
)
from pytorchvideo.transforms import (
    ApplyTransformToKey,
    ShortSideScale,
    UniformTemporalSubsample
)

mean = [0.45, 0.45, 0.45]
std = [0.225, 0.225, 0.225]
frames_per_second = 30
model_transform_params  = {
        "side_size": 256,
        "crop_size": 256,
        "num_frames": 16,
        "sampling_rate": 5,
}

# Get transform parameters based on model
transform_params = model_transform_params

transform =  ApplyTransformToKey(
    key="video",
    transform=Compose(
        [
            UniformTemporalSubsample(transform_params["num_frames"]),
            Lambda(lambda x: x/255.0),
            NormalizeVideo(mean, std),
            ShortSideScale(size=transform_params["side_size"]),
            CenterCropVideo(
                crop_size=(transform_params["crop_size"], transform_params["crop_size"])
            )
        ]
    ),
)
"""

import csv
from pytorchvideo.data.encoded_video import EncodedVideo
from torchvision.transforms import Compose, Lambda
from torchvision.transforms._transforms_video import (
    CenterCropVideo,
    NormalizeVideo,
)
from pytorchvideo.transforms import (
    ApplyTransformToKey,
    ShortSideScale,
    UniformTemporalSubsample
)

# Per-channel statistics handed to NormalizeVideo (applied after the /255 step).
mean = [0.45] * 3
std = [0.225] * 3
frames_per_second = 10

# Preprocessing hyper-parameters, keyed by model name.
model_transform_params = dict(
    x3d_m=dict(
        side_size=224,
        crop_size=224,
        num_frames=16,
        sampling_rate=5,
    ),
)

# Select the parameter set for the model used in this notebook.
transform_params = model_transform_params["x3d_m"]

# Preprocessing applied to the "video" entry of a clip dict, using the x3d_m
# parameters selected above: subsample frames, divide by 255, normalise,
# rescale the short side, then take a square centre crop.
transform = ApplyTransformToKey(
    key="video",
    transform=Compose([
        UniformTemporalSubsample(transform_params["num_frames"]),
        Lambda(lambda x: x / 255.0),
        NormalizeVideo(mean, std),
        ShortSideScale(size=transform_params["side_size"]),
        CenterCropVideo(crop_size=(transform_params["crop_size"],) * 2),
    ]),
)

In [None]:
import os
from torchvision.io.video import read_video
import warnings
warnings.filterwarnings('ignore')
import torch
import pandas as pd
import numpy as np
import torch, gdalconst
import cv2
import math
import time

def video_info(infilename):
    """Print the frame count, frame size and frame rate OpenCV reports for a video.

    Args:
        infilename: Path of the video file to inspect.

    Raises:
        OSError: If OpenCV cannot open ``infilename``.
    """
    cap = cv2.VideoCapture(infilename)
    try:
        if not cap.isOpened():
            # Raise instead of calling exit(0): status 0 would signal success,
            # and inside an IPython kernel exit() does not interrupt the
            # running cell, so the reads below would run on an unopened capture.
            raise OSError("could not open : " + str(infilename))

        # 'length' and 'count' are both the container's frame count; query once.
        length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        count = length
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
    finally:
        # Always free the underlying decoder/file handle.
        cap.release()

    print('length : ', length)
    print('width : ', width)
    print('height : ', height)
    print('count : ', count)
    print('fps : ', fps)

def get_fps(filename):
    """Return the frame rate OpenCV reports for ``filename``, truncated to an int.

    Args:
        filename: Path of the video file to probe.

    Returns:
        ``int(CAP_PROP_FPS)``. Fractional rates are truncated toward zero
        (e.g. 29.97 becomes 29), so callers deriving durations from this value
        should expect some drift.

    Raises:
        OSError: If OpenCV cannot open ``filename``.
    """
    cap = cv2.VideoCapture(filename)
    try:
        if not cap.isOpened():
            # Raise instead of calling exit(0): status 0 would signal success,
            # and inside an IPython kernel exit() does not interrupt the
            # running cell, so a bogus value would be returned to the caller.
            raise OSError("could not open : " + str(filename))
        return int(cap.get(cv2.CAP_PROP_FPS))
    finally:
        # Always free the underlying decoder/file handle.
        cap.release()


def get_count(filename):
    """Return the number of frames OpenCV reports for ``filename``.

    Args:
        filename: Path of the video file to probe.

    Returns:
        ``int(CAP_PROP_FRAME_COUNT)`` as reported by the container.

    Raises:
        OSError: If OpenCV cannot open ``filename``.
    """
    cap = cv2.VideoCapture(filename)
    try:
        if not cap.isOpened():
            # Raise instead of calling exit(0): status 0 would signal success,
            # and inside an IPython kernel exit() does not interrupt the
            # running cell, so a bogus value would be returned to the caller.
            raise OSError("could not open : " + str(filename))
        return int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        # Always free the underlying decoder/file handle.
        cap.release()



if __name__ == "__main__":
    # Extract one feature vector per 16-frame segment of every video found
    # under dir_path and save each vector as its own .npy file.
    #video_info('/content/drive/MyDrive/normal/sun3.mp4')

    model = create_x3d(input_clip_length = 16, input_crop_size = 224, depth_factor = 2.2) # X3D_M
    pretrained_path = "/content/Extract-Features-X3D/X3D_M_extract_features.pth"
    # The model is kept on the CPU below, so map the checkpoint tensors to the
    # CPU as well; otherwise loading fails on a runtime without CUDA if the
    # checkpoint was saved from a GPU.
    # NOTE(review): strict=False silently ignores missing/unexpected keys.
    model.load_state_dict(torch.load(pretrained_path, map_location="cpu"), strict = False)
    print("Load model successfully!!!")

    # Inference only; the model is never moved to a GPU in this cell.
    model = model.eval()

    dir_path = "/content/drive/MyDrive/extra_resize/"
    save_dir = "/content/drive/MyDrive/extra_resize_result/"
    frames_per_segment = 16
    os.makedirs(save_dir, exist_ok=True)

    # Collect every file first so the progress line shows the real total
    # instead of a hard-coded number.
    video_paths = [
        os.path.join(root, name)
        for root, _directories, names in os.walk(dir_path)
        for name in names
    ]
    total = len(video_paths)

    # For each video
    for i, video_path in enumerate(video_paths, start=1):
        print(video_path)
        print("[" + str(i) + " / " + str(total) + "] , " + str(i/total) + "\n")

        fps = get_fps(video_path)
        if fps <= 0:
            # No usable frame rate: the segment length cannot be computed.
            print("skipping (no fps reported) :", video_path)
            continue

        # Duration of one 16-frame segment in seconds, and how many whole
        # segments the video holds.
        # NOTE(review): get_fps truncates to int, so for fractional frame
        # rates the windows drift slightly relative to exact 16-frame groups.
        stack_sec = frames_per_segment / fps
        count = get_count(video_path) // frames_per_segment

        # Output files are named after the video file itself (without
        # extension), independent of path length or sub-directory depth.
        stem = os.path.splitext(os.path.basename(video_path))[0]

        video = EncodedVideo.from_path(video_path)
        try:
            # For every segment (16 frames)
            for j in range(count):
                # Consecutive, non-overlapping windows:
                # [j * stack_sec, (j + 1) * stack_sec).
                start_sec = j * stack_sec
                end_sec = start_sec + stack_sec

                video_data = video.get_clip(start_sec=start_sec, end_sec=end_sec)
                if video_data["video"] is None:
                    # The window lies past the last decodable frame.
                    break
                video_data = transform(video_data)
                x = video_data["video"].unsqueeze(0)
                del video_data

                # Turn off autograd to reduce memory use and speed up computation.
                with torch.no_grad():
                    out = model(x)
                del x

                # Drop the three size-1 axes that follow the channel axis.
                features = out.squeeze(dim=2).squeeze(dim=2).squeeze(dim=2)
                del out

                print('one video percent : ' + str(j) + ' / ' + str(count))

                # tolist() keeps the saved array format unchanged from
                # earlier runs; np.save appends the ".npy" suffix.
                savepath = os.path.join(save_dir, stem + '_' + str(j))
                np.save(savepath, features.tolist())

                torch.cuda.empty_cache()
                del features
        finally:
            # Release the underlying container even if a segment fails.
            video.close()
            del video


Load model successfully!!!
/content/drive/MyDrive/extra_resize/b06.mp4
[1 / 37] , 0.02702702702702703

one video percent : 0 / 110
one video percent : 1 / 110
one video percent : 2 / 110
one video percent : 3 / 110
one video percent : 4 / 110
one video percent : 5 / 110
one video percent : 6 / 110
one video percent : 7 / 110
one video percent : 8 / 110
one video percent : 9 / 110
one video percent : 10 / 110
one video percent : 11 / 110
one video percent : 12 / 110
one video percent : 13 / 110
one video percent : 14 / 110
one video percent : 15 / 110
one video percent : 16 / 110
one video percent : 17 / 110
one video percent : 18 / 110
one video percent : 19 / 110
one video percent : 20 / 110
one video percent : 21 / 110
one video percent : 22 / 110
one video percent : 23 / 110
one video percent : 24 / 110
one video percent : 25 / 110
one video percent : 26 / 110
one video percent : 27 / 110
one video percent : 28 / 110
one video percent : 29 / 110
one video percent : 30 / 110
one vide