In [1]:
import math
import os

import cv2
import moviepy
import numpy as np
import torch
from matplotlib import pyplot as plt
from tqdm import tqdm

from my_alphaction.config import cfg
from my_alphaction.modeling.detector import build_detection_model
from my_alphaction.utils.checkpoint import ActionCheckpointer
from my_alphaction.utils.comm import get_world_size

### 1. CONFIG
#### 1.1 Main Parameters

In [2]:
model_name = 'VMAEv2'

# Values stored under exp_dict['model_params'] / exp_dict['orig_post_processing'].
person_threshold = 0.6  # confidence threshold on actor (0.6 is the default)
sampling_rate = 3       # sampling rate (4 is the default)
top_k = 5               # number of actions per person
video_path = '../input_dir/markt2_fight.mp4'

# Values stored under exp_dict['slicing_params'].
slice_height, slice_width = 800, 1000
overlap_ratio = 0.1

# Values stored under exp_dict['video_params'].
starting_frame_index = 100
length_input = 200

# Single record bundling every parameter chosen above.
exp_dict = dict(
    model_name=model_name,
    model_params=dict(
        person_threshold=person_threshold,
        sampling_rate=sampling_rate,
    ),
    orig_post_processing=dict(top_k=top_k),
    aggregation=dict(method={}, params={}),
    video_path=video_path,
    slicing_params=dict(
        slice_height=slice_height,
        slice_width=slice_width,
        overlap_ratio=overlap_ratio,
    ),
    video_params=dict(
        st_frame_index=starting_frame_index,
        length_input=length_input,
    ),
)



In [3]:
# Map every supported model name to its config file. An unknown name fails
# here with a clear message instead of leaving `config_file` undefined and
# raising a NameError in the cell that merges it.
config_files = {
    'VMAEv2': '../config_files/VMAEv2-ViTB-16x4.yaml',
    'VMAE': '../config_files/VMAE-ViTB-16x4.yaml',
}
if model_name not in config_files:
    raise ValueError(
        f"Unsupported model_name {model_name!r}; expected one of {sorted(config_files)}"
    )
config_file = config_files[model_name]


In [4]:
cfg.merge_from_file(config_file)

In [5]:
# change model weight path
# An unknown model name fails here instead of silently keeping whatever
# MODEL.WEIGHT the config file happened to contain.
model_weights = {
    'VMAEv2': "../checkpoints/VMAEv2_ViTB_16x4.pth",
    'VMAE': "../checkpoints/VMAE_ViTB_16x4.pth",
}
if model_name not in model_weights:
    raise ValueError(
        f"Unsupported model_name {model_name!r}; expected one of {sorted(model_weights)}"
    )
cfg.merge_from_list(["MODEL.WEIGHT", model_weights[model_name]])

# change output dir
cfg.merge_from_list(["OUTPUT_DIR", "../output_dir/"])

# change person threshold
cfg.merge_from_list(["MODEL.STM.PERSON_THRESHOLD", person_threshold])

# change sampling rate
cfg.merge_from_list(["DATA.SAMPLING_RATE", sampling_rate])

# change path for data_dir
cfg.merge_from_list(["DATA.PATH_TO_DATA_DIR", "/work/ava"])

# folder name of annotations
cfg.merge_from_list(["AVA.ANNOTATION_DIR", "annotations/"])

# file name of frame_lists
cfg.merge_from_list(["AVA.TRAIN_LISTS", ['sample.csv']])
cfg.merge_from_list(["AVA.TEST_LISTS", ['sample.csv']])

# file name of predicted_bboxes
cfg.merge_from_list(["AVA.TRAIN_GT_BOX_LISTS", ['ava_sample_predicted_boxes.csv']])
cfg.merge_from_list(["AVA.TEST_GT_BOX_LISTS", ['ava_sample_predicted_boxes.csv']])

# file name of exclusions
cfg.merge_from_list(["AVA.EXCLUSION_FILE", 'ava_sample_train_excluded_timestamps_v2.2.csv'])

# number of videos per batch in test scenario
cfg.merge_from_list(["TEST.VIDEOS_PER_BATCH", 1])

# number of workers
cfg.merge_from_list(["DATALOADER.NUM_WORKERS", 1])


In [6]:
cfg.ViT.USE_CHECKPOINT

True

In [7]:
cfg.merge_from_list(["ViT.USE_CHECKPOINT", False])

In [8]:
cfg.ViT.USE_CHECKPOINT

False

In [9]:
debug = True
if debug:
    # Model input shape should be divisible by this value; otherwise it is
    # zero-padded (left and bottom, per the original note).
    print("cfg.DATALOADER.SIZE_DIVISIBILITY: ", cfg.DATALOADER.SIZE_DIVISIBILITY)

    # Sampling rate used when constructing clips.
    self_sample_rate = cfg.DATA.SAMPLING_RATE
    print("cfg.DATA.SAMPLING_RATE: ", cfg.DATA.SAMPLING_RATE)

    # Clip length in frames.
    self_video_length = cfg.DATA.NUM_FRAMES
    print("cfg.DATA.NUM_FRAMES: ", cfg.DATA.NUM_FRAMES)

    # Length of the frame sequence from which one clip is constructed.
    self_seq_len = self_video_length * self_sample_rate
    print("self_seq_len: ", self_seq_len)

    self_num_classes = cfg.MODEL.STM.ACTION_CLASSES
    print("cfg.MODEL.STM.ACTION_CLASSES: ", self_num_classes)

    # Augmentation parameters.
    self_data_mean, self_data_std = cfg.DATA.MEAN, cfg.DATA.STD
    self_use_bgr = cfg.AVA.BGR
    print("Augmentation params: ", self_data_mean, self_data_std, self_use_bgr)

    # Test-time scale and flip parameters.
    self_jitter_min_scale = cfg.DATA.TEST_MIN_SCALES
    self_jitter_max_scale = cfg.DATA.TEST_MAX_SCALE
    self_test_force_flip = cfg.AVA.TEST_FORCE_FLIP

    print("scale and flip params", self_jitter_min_scale, self_jitter_max_scale, self_test_force_flip)

cfg.DATALOADER.SIZE_DIVISIBILITY:  32
cfg.DATA.SAMPLING_RATE:  3
cfg.DATA.NUM_FRAMES:  16
self_seq_len:  48
cfg.MODEL.STM.ACTION_CLASSES:  80
Augmentation params:  [0.45, 0.45, 0.45] [0.225, 0.225, 0.225] False
scale and flip params [256] 1333 False


In [10]:
model = build_detection_model(cfg)

In [11]:
checkpointer = ActionCheckpointer(cfg, model, save_dir="../output_dir/")
checkpointer.load(cfg.MODEL.WEIGHT)

{}

In [12]:
# Switch the backbone sub-module to evaluation mode and keep a handle to it
# (`eval()` returns the module itself).
backbone_module = model._modules['backbone'].eval()

In [13]:
import torch.onnx
import torch

In [19]:
slow_video = torch.randn(1, 3, 16, 256+64, 320+64).to('cpu')

In [20]:
model(slow_video)

x forward input shape: torch.Size([1, 3, 16, 320, 384])
x after patch_embed: torch.Size([1, 768, 8, 20, 24])
x after flatten: torch.Size([1, 3840, 768])
pos_embed shape before if: torch.Size([1, 1568, 768])
pos_embed shape after if: torch.Size([8, 196, 768])
gird_size: [14, 14]
ws_s 20 24
pos_embed shape after all: torch.Size([1, 3840, 768])


[tensor([[[ -3.7434, -11.8778,  -4.3906,  ...,  -8.0285,  -0.9966,  -0.3563],
          [ -3.8643, -11.9442,  -4.3439,  ...,  -8.2044,  -1.0482,  -0.5466],
          [ -3.7065, -11.8679,  -4.3757,  ...,  -8.0272,  -1.0268,  -0.5813],
          ...,
          [ -3.7255, -11.8799,  -4.4662,  ...,  -8.0591,  -1.0265,  -0.5829],
          [ -3.7719, -12.0909,  -4.1032,  ...,  -8.0336,  -1.0050,  -0.4918],
          [ -3.7142, -12.0353,  -4.1871,  ...,  -8.0224,  -0.9460,  -0.5059]]],
        grad_fn=<ViewBackward0>),
 tensor([[[ -6.1144, -13.9819,  -6.8985,  ...,  -8.7508,  -2.1330,  -2.0613],
          [ -5.6654, -13.6685,  -6.7246,  ...,  -8.8098,  -2.3053,  -2.6272],
          [ -5.8000, -13.7061,  -6.7167,  ...,  -8.6228,  -2.2140,  -2.4665],
          ...,
          [ -5.9854, -13.8458,  -6.7762,  ...,  -8.7279,  -2.1971,  -2.3371],
          [ -5.5259, -13.3174,  -6.8426,  ...,  -8.7278,  -2.3362,  -2.4527],
          [ -5.8312, -13.7180,  -6.8018,  ...,  -8.6408,  -2.2620,  -2.4025]

In [16]:
import onnx


In [None]:
# Dummy input used to trace the model during export; everything in this cell
# lives on the CPU.
slow_video = torch.randn(1, 3, 16, 256, 320).to('cpu')
values = [[320., 256., 320., 256.]]

# Create the tensor. It is not passed to the export below, so it is kept on
# the CPU: requesting 'cuda' here crashed the cell on machines without a GPU.
whwh = torch.tensor(values, device='cpu')

model = model.eval().to('cpu')

# Export the model to ONNX with static batch size
torch.onnx.export(model,
                  args=slow_video,
                  f="model_decoder_stage_v2.onnx",
                  input_names=["slow_video"],
                  output_names=["output"],
                  opset_version=16,
                  verbose=False
                  # dynamic_axes={"slow_video": {0: "batch_size"}, "whwh": {0: "batch_size"}, "output": {0: "batch_size"}}
                  )

In [None]:
model.__dict__.keys()

In [None]:
model.__dict__['_modules'].keys()

In [None]:
model.__dict__['_modules']['stm_head'].__dict__['_modules'].keys()

In [None]:
model.__dict__['_modules']['stm_head'].__dict__['_modules']

In [None]:
mapped_features = model(slow_video)

In [None]:
import onnx
from onnx import helper
from onnx import numpy_helper

# Load the ONNX model under its own name. Rebinding `model` here would replace
# the PyTorch module that later cells (tracing, export, sub-module extraction)
# still rely on.
onnx_graph_model = onnx.load("whole_model_dyn_batch.onnx")

# Print a human-readable representation of the model's graph
print(onnx.helper.printable_graph(onnx_graph_model.graph))



In [None]:
# Optionally, visualize the graph using external tools such as Netron

# Perform inference on the model to ensure it produces expected outputs
# Example inference code using onnxruntime
import onnxruntime as ort
import numpy as np

ort_session = ort.InferenceSession("whole_model_dyn_batch.onnx")

# Use a dedicated name for the numpy feed. Rebinding `slow_video` to an
# ndarray would break the later cells that pass it to torch.jit.trace /
# torch.onnx.export as a torch tensor.
ort_input = np.random.randn(1, 3, 16, 256, 320).astype(np.float32)

# Run inference
outputs = ort_session.run(["output"], {"slow_video": ort_input})

# Print the outputs
print(outputs)


In [None]:
len(outputs)

In [None]:
outputs[0].shape

In [None]:
traced_model = torch.jit.trace(model, [slow_video])

In [None]:
# torch.jit.script compiles from source and does not need example inputs.
# The list previously passed as the second positional argument was bound to
# the deprecated `optimize` parameter, so it is dropped here.
with torch.jit.optimized_execution(True):
    scripted_model = torch.jit.script(model).to("cuda")

In [None]:
torch.onnx.export(model, slow_video, "model_v2.onnx", do_constant_folding=False, verbose=True)

In [None]:
torch.onnx.export(traced_model, slow_video, "traced_model.onnx", verbose=True)

In [None]:
my_model = onnx.load("your_model.onnx")
onnx.checker.check_model(my_model)

In [None]:

# Print one line per graph input: "<name>: <dim>, <dim>, ..." or
# "<name>: unknown rank" when the tensor type carries no shape.
# The loop variable is deliberately not called `input`, which would shadow the
# builtin for the rest of the kernel session.
for graph_input in my_model.graph.input:
    tensor_type = graph_input.type.tensor_type
    if tensor_type.HasField("shape"):
        dim_labels = []
        for d in tensor_type.shape.dim:
            # A dimension may have a definite (integer) value, a symbolic
            # identifier, or neither.
            if d.HasField("dim_value"):
                dim_labels.append(str(d.dim_value))  # known dimension
            elif d.HasField("dim_param"):
                dim_labels.append(d.dim_param)  # unknown dimension with symbolic name
            else:
                dim_labels.append("?")  # unknown dimension with no name
        description = "".join(f"{label}, " for label in dim_labels)
    else:
        description = "unknown rank"
    print(f"{graph_input.name}: {description}")

In [None]:
# Extract backbone and lateral_convs modules

import torch.nn as nn

backbone_module = model._modules['backbone']
lateral_convs_module = model._modules['lateral_convs']

# Assuming you want to combine them into a single model
class CombinedModel(nn.Module):
    """Chain two modules: the input goes through `backbone`, then `lateral_convs`."""

    def __init__(self, backbone, lateral_convs):
        super().__init__()
        self.backbone = backbone
        self.lateral_convs = lateral_convs

    def forward(self, x):
        """Return `lateral_convs(backbone(x))`."""
        return self.lateral_convs(self.backbone(x))

# Create the combined model instance
combined_model = CombinedModel(backbone_module, lateral_convs_module)

In [None]:
example_input = torch.randn(1, 3, 16, 256, 320)


In [None]:
combined_model([example_input])

In [None]:
model

In [None]:
import onnx
print("ONNX version:", onnx.__version__)

In [None]:
from typing import Any, Sequence
import numpy as np
import onnx
import onnxruntime
import subprocess
import sys
from copy import deepcopy
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import numpy as np

import onnx
from onnx.backend.test.case.test_case import TestCase
from onnx.backend.test.case.utils import import_recursive
from onnx.onnx_pb import (
    AttributeProto,
    FunctionProto,
    GraphProto,
    ModelProto,
    NodeProto,
    TensorProto,
    TypeProto,
)

_NodeTestCases = []
_TargetOpType = None
_DiffOpTypes = None


def _extract_value_info(
    input: Union[List[Any], np.ndarray, None],
    name: str,
    type_proto: Optional[TypeProto] = None,
) -> onnx.ValueInfoProto:
    """Build the ValueInfoProto called `name` for a graph input/output.

    When `type_proto` is supplied it is used as-is. Otherwise the type is
    derived from `input`:
      * list        -> sequence-of-tensor type; element dtype taken from the
                       first item, shape left unspecified
      * TensorProto -> tensor type from its `data_type` and `dims`
      * anything else is read through its `.dtype` and `.shape`

    Raises:
        NotImplementedError: if both `input` and `type_proto` are None.
    """
    # Caller already knows the type: nothing to infer.
    if type_proto is not None:
        return onnx.helper.make_value_info(name, type_proto)

    if input is None:
        raise NotImplementedError(
            "_extract_value_info: both input and type_proto arguments cannot be None."
        )

    make_tensor_type = onnx.helper.make_tensor_type_proto
    if isinstance(input, list):
        item_dtype = onnx.helper.np_dtype_to_tensor_dtype(input[0].dtype)
        inferred = onnx.helper.make_sequence_type_proto(make_tensor_type(item_dtype, None))
    elif isinstance(input, TensorProto):
        inferred = make_tensor_type(input.data_type, tuple(input.dims))
    else:
        inferred = make_tensor_type(
            onnx.helper.np_dtype_to_tensor_dtype(input.dtype), input.shape
        )

    return onnx.helper.make_value_info(name, inferred)

def expect(
    node: onnx.NodeProto,
    inputs: Sequence[np.ndarray],
    outputs: Sequence[np.ndarray],
    name: str,
    **kwargs: Any,
) -> Tuple[List[Any], Sequence[np.ndarray]]:
    """Run a single ONNX node through onnxruntime and check its outputs.

    A one-node model is built around `node`, executed on the CPU execution
    provider with `inputs`, and every produced output is compared against the
    matching entry of `outputs` with `np.testing.assert_allclose`.

    Args:
        node: the node under test.
        inputs: one value per entry of `node.input`.
        outputs: expected values, one per produced output.
        name: graph name.
        **kwargs: forwarded to `onnx.helper.make_model_gen_version`. The
            optional `input_type_protos` / `output_type_protos` entries are
            consumed here and override the inferred value-info types.

    Returns:
        `(results, outputs)`: the values produced by onnxruntime and the
        expected values that were passed in, so they can be inspected.

    Raises:
        AssertionError: if a produced output differs from its expected value.
    """
    # Builds the model. Empty names mark omitted optional inputs/outputs.
    present_inputs = [x for x in node.input if x != ""]
    present_outputs = [x for x in node.output if x != ""]
    input_type_protos = kwargs.pop("input_type_protos", [None] * len(inputs))
    output_type_protos = kwargs.pop("output_type_protos", [None] * len(outputs))
    inputs_vi = [
        _extract_value_info(arr, arr_name, input_type)
        for arr, arr_name, input_type in zip(inputs, present_inputs, input_type_protos)
    ]
    outputs_vi = [
        _extract_value_info(arr, arr_name, output_type)
        for arr, arr_name, output_type in zip(
            outputs, present_outputs, output_type_protos
        )
    ]
    graph = onnx.helper.make_graph(
        nodes=[node], name=name, inputs=inputs_vi, outputs=outputs_vi
    )
    kwargs["producer_name"] = "backend-test"

    if "opset_imports" not in kwargs:
        # To make sure the model will be produced with the same opset_version after opset changes
        # By default, it uses since_version as opset_version for produced models
        produce_opset_version = onnx.defs.get_schema(
            node.op_type, domain=node.domain
        ).since_version
        kwargs["opset_imports"] = [
            onnx.helper.make_operatorsetid(node.domain, produce_opset_version)
        ]

    model = onnx.helper.make_model_gen_version(graph, **kwargs)

    # Check that the produced values are the expected ones.
    sess = onnxruntime.InferenceSession(model.SerializeToString(),
                                        providers=["CPUExecutionProvider"])
    feeds = {input_name: value for input_name, value in zip(node.input, inputs)}
    results = sess.run(None, feeds)

    # The comparison used to sit behind an early `return` inside this loop and
    # never ran; compare every pair first, then hand the values back.
    for expected, actual in zip(outputs, results):
        np.testing.assert_allclose(expected, actual)
    return results, outputs

In [None]:
node = onnx.helper.make_node(
    "Squeeze",
    inputs=["x", "axes"],
    outputs=["y"],
)
x = np.random.randn(1, 3, 4, 5).astype(np.float32)
axes = np.array([0], dtype=np.int64)
y = np.squeeze(x, axis=0)

s = expect(node, inputs=[x, axes], outputs=[y], name="test_squeeze")

In [None]:
s

In [None]:
import numpy as np
import onnx
import onnxruntime as ort

# Step 1: Squeeze node taking `axes` as a second input, wrapped in a graph
# with inputs "x" (float, 1x3x4x5) and "axes" (int64, length 1).
node = onnx.helper.make_node("Squeeze", inputs=["x", "axes"], outputs=["y"])
graph = onnx.helper.make_graph(
    [node],
    "squeeze_graph",
    inputs=[
        onnx.helper.make_tensor_value_info("x", onnx.TensorProto.FLOAT, (1, 3, 4, 5)),
        onnx.helper.make_tensor_value_info("axes", onnx.TensorProto.INT64, (1,)),
    ],
    outputs=[
        onnx.helper.make_tensor_value_info("y", onnx.TensorProto.FLOAT, None),
    ],
)

# Step 2: Wrap the graph in a model and write it to disk.
onnx_model = onnx.helper.make_model(graph)
onnx.save(onnx_model, "squeeze_model.onnx")

# Step 3: Open the saved model with ONNX Runtime.
sess = ort.InferenceSession("squeeze_model.onnx")

# Step 4: Input data.
x = np.random.randn(1, 3, 4, 5).astype(np.float32)
axes = np.array([0], dtype=np.int64)

# Step 5: Run inference and show the result.
output = sess.run(["y"], {"x": x, "axes": axes})
print("Output:", output)

In [None]:
output[0].shape

In [None]:
axes

In [None]:
# Step 1: Define the ONNX node and create an ONNX graph.
# Here `axes` is given as a node *attribute*. That form of Squeeze exists only
# up to opset 11 (from opset 13 on, `axes` is an input), so the model below is
# pinned to opset 11. Only axis 0 of the (1, 3, 4, 5) input has size 1, so
# only that axis can be squeezed.
node = onnx.helper.make_node(
    "Squeeze",
    inputs=["x"],
    outputs=["y"],
    axes=[0],  # Squeeze along axis 0
)
graph = onnx.helper.make_graph(
    [node],
    "squeeze_graph",
    inputs=[onnx.helper.make_tensor_value_info("x", onnx.TensorProto.FLOAT, (1, 3, 4, 5))],
    outputs=[onnx.helper.make_tensor_value_info("y", onnx.TensorProto.FLOAT, None)],
)

# Step 2: Create an ONNX model with the graph, pinned to opset 11
onnx_model = onnx.helper.make_model(
    graph,
    opset_imports=[onnx.helper.make_operatorsetid("", 11)],
)

# Save the ONNX model to a file
onnx.save(onnx_model, "squeeze_model.onnx")

# Step 3: Load the ONNX model with ONNX Runtime
sess = ort.InferenceSession("squeeze_model.onnx")

# Step 4: Prepare input data
x = np.random.randn(1, 3, 4, 5).astype(np.float32)

# Step 5: Run inference
output = sess.run(["y"], {"x": x})

# Print the output
print("Output:", output)

In [None]:
import numpy as np
import torch

def get_variable_info(*args):
    """Summarise the positional arguments as a nested dict.

    Returns:
        A dict mapping "arg_1", "arg_2", ... (in call order) to a description:
          * torch.Tensor -> {"type": "Tensor", "shape": <tuple>}
          * list / tuple -> {"type": "list" | "tuple", "length": <int>} plus
            "item_shapes" (a list of per-item descriptions, built
            recursively) when the sequence is non-empty
          * anything else -> {"type": <class name>}
    """

    def get_info(arg):
        info = {}
        if isinstance(arg, torch.Tensor):
            info["type"] = "Tensor"
            info["shape"] = tuple(arg.shape)
        elif isinstance(arg, (list, tuple)):
            # Record the container type too, so every description carries a
            # "type" key and lists can be told apart from tuples.
            info["type"] = type(arg).__name__
            info["length"] = len(arg)
            if arg:  # only non-empty sequences get per-item descriptions
                info["item_shapes"] = [get_info(item) for item in arg]
        else:
            info["type"] = type(arg).__name__
        return info

    return {f"arg_{i}": get_info(value) for i, value in enumerate(args, start=1)}

# Example usage
x = torch.randn(1, 3, 16, 256, 320)
y = torch.randn(2, 3, 4)
z = [torch.randn(2, 3), torch.randn(3, 4, 5)]

info = get_variable_info(x, y, z)

In [None]:
info

In [None]:
import torch


class Foo(torch.nn.Module):
    """Toy module whose forward pass branches on the values of its input."""

    def forward(self, tensor):
        # Data-dependent control flow: tracing records only the branch taken
        # for the example input, whereas scripting keeps both.
        if tensor.max() > 0.5:
            result = tensor ** 2
        else:
            result = tensor
        return result


model = Foo()
traced = torch.jit.script(model) # No warnings


In [None]:
traced_v2= torch.jit.trace(model, torch.randn(10)) # Warning

In [None]:
test_input = torch.randn(10)

In [None]:
output = traced(test_input)


In [None]:
test_input

In [None]:
traced_v2(test_input)

In [22]:
m = torch.nn.Linear(20, 30)

In [29]:
m._parameters['weight'].shape

torch.Size([30, 20])