## Собственный датасет

In [None]:
!unzip /content/drive/MyDrive/slovo.zip

## Kaggle

In [None]:
! pip install -q kaggle
from google.colab import files
files.upload()

In [None]:
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/

In [None]:
# Restrict the token file to owner read/write only.
! chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Pull the kleinsbotle/usage-example kernel with the Kaggle CLI.
!kaggle kernels pull kleinsbotle/usage-example

In [None]:
# Download the kapitanov/slovo dataset archive (the recorded run saved slovo.zip to /content).
!kaggle datasets download -d kapitanov/slovo

Downloading slovo.zip to /content
100% 14.8G/14.8G [02:52<00:00, 153MB/s]
100% 14.8G/14.8G [02:52<00:00, 91.7MB/s]


In [None]:
! unzip /content/slovo.zip

## Обучаем

Для работы со свежей версией torch (если надо сохранять в ONNX)

In [None]:
# Print the installed torch version and the value of torch.version.cuda.
!python -c 'import torch;print(torch.__version__);print(torch.version.cuda)'

2.2.1+cu121
12.1


In [None]:
# Install the OpenMMLab stack on top of the torch that is already present, then install
# mmaction2 from a source checkout in editable mode.
!pip install -U openmim
!mim install mmengine
# mmcv is installed from a pinned prebuilt wheel below instead of through mim:
#!mim install mmcv
# NOTE(review): this wheel index targets cu121/torch2.1, while the version check recorded in
# this notebook printed torch 2.2.1+cu121 -- confirm the wheel works with that torch build.
!pip install mmcv==2.1.0 -f https://download.openmmlab.com/mmcv/dist/cu121/torch2.1/index.html
! git clone https://github.com/open-mmlab/mmaction2.git
# Later cells use paths relative to the checkout (e.g. tools/train.py).
%cd mmaction2
! pip install -v -e .
!pip install timm

Для работы с версией torch==1.12.0 (если надо обучать)

In [None]:
# Alternative environment: pin torch 1.12.0 (cu113 wheel index), then install the OpenMMLab
# stack and mmaction2 from a source checkout in editable mode.
# NOTE(review): this cell and the other install cell both clone mmaction2 and %cd into it;
# presumably only one of the two is meant to be run per session -- confirm.
!pip install torch==1.12.0 torchvision --extra-index-url https://download.pytorch.org/whl/cu113
!pip install -U openmim
!mim install mmengine
!mim install 'mmcv >= 2.0.0, <2.2.0'
! git clone https://github.com/open-mmlab/mmaction2.git
# Later cells use paths relative to the checkout (e.g. tools/train.py).
%cd mmaction2
! pip install -v -e .
!pip install timm

In [None]:
import os
import cv2
import pandas as pd
from tqdm import tqdm
from glob import glob
import matplotlib.pyplot as plt
import torch

import warnings
warnings.filterwarnings('ignore')

# Dataset locations: everything lives under DATA_DIR.
DATA_DIR = '/content/slovo'
TRAIN_DIR = os.path.join(DATA_DIR, 'train')
TEST_DIR = os.path.join(DATA_DIR, 'test')
# NOTE(review): ANNOTATIONS_DIR is not referenced by any other visible cell.
ANNOTATIONS_DIR = os.path.join(DATA_DIR, 'annotations')

# Tab-separated annotation table; later cells read its 'attachment_id', 'text' and 'train' columns.
ann = pd.read_csv(os.path.join(DATA_DIR, 'annotations.csv'), sep='\t')

In [None]:
# Collect every file in the train/ and test/ folders in a stable (sorted) order.
train_files = sorted(glob(os.path.join(TRAIN_DIR, '*')))
test_files = sorted(glob(os.path.join(TEST_DIR, '*')))

# Map each distinct label to an integer id, in order of first appearance in the table.
labels = ann['text'].unique()
NUM_CLASSES = len(labels)  # Including "no-action" class
classes = {label: label_id for label_id, label in enumerate(labels)}

# Build the per-video lookups once instead of filtering the whole table twice per file.
# Reading the raw cell values also avoids round-tripping the label through
# Series.to_string() and avoids the deprecated Series.bool().
# If an id occurs more than once, its first row is used.
ann_by_id = ann.drop_duplicates(subset='attachment_id').set_index('attachment_id')
label_by_id = ann_by_id['text'].to_dict()
is_train_by_id = ann_by_id['train'].to_dict()

ann_train = []
ann_test = []

for file in tqdm(train_files + test_files):
    # The video id is the file name without its extension.
    video_id = os.path.splitext(os.path.basename(file))[0]
    if video_id not in label_by_id:
        raise KeyError(f'No annotation row for video {video_id!r} ({file})')
    # One "<path> <class id>" line per video.
    line = f'{file} {classes[label_by_id[video_id]]}\n'
    # The split is decided by the table's 'train' flag, not by the folder the file is in.
    if is_train_by_id[video_id]:
        ann_train.append(line)
    else:
        ann_test.append(line)

100%|██████████| 1400/1400 [00:02<00:00, 642.69it/s]


In [None]:
# Write the two annotation lists to text files in the current working directory.
# NOTE(review): the training config in this notebook reads /content/mmaction2/ann_train.txt
# and /content/mmaction2/ann_test.txt, so this presumably has to run with the working
# directory set to /content/mmaction2 -- confirm.
with open('ann_train.txt', 'w') as train_file, open('ann_test.txt', 'w') as test_file:
    train_file.writelines(ann_train)
    test_file.writelines(ann_test)

In [None]:
%%writefile mvit-slovo.py

# Model settings
# Recognizer3D with an MViT 'small' backbone and an MViT classification head.
model = dict(
    type='Recognizer3D',
    backbone=dict(
        type='MViT',
        arch='small',
        drop_path_rate=0.1,
        # Backbone weights are initialised from a Kinetics-400 MViT-small checkpoint;
        # the 'backbone.' prefix names the part of that checkpoint to take.
        init_cfg=dict(
            type='Pretrained',
            checkpoint=
            'https://download.openmmlab.com/mmaction/v1.0/recognition/mvit/converted/mvit-small-p244_16x4x1_kinetics400-rgb_20221021-9ebaaeed.pth',
            prefix='backbone.')),
    # Per-channel normalisation; the inference cells of this notebook reuse the same mean/std.
    data_preprocessor=dict(
        type='ActionDataPreprocessor',
        mean=[123.675, 116.28, 103.53],
        std=[58.395, 57.12, 57.375],
        format_shape='NCTHW'),
    cls_head=dict(
        type='MViTHead',
        in_channels=768,
        # NOTE(review): hard-coded; the annotation cell computes NUM_CLASSES from
        # annotations.csv -- confirm the two values agree for the dataset in use.
        num_classes=51,
        label_smooth_eps=0.1,
        average_clips='prob'))

# Logging settings
default_scope = 'mmaction'
default_hooks = dict(
    runtime_info=dict(type='RuntimeInfoHook'),
    timer=dict(type='IterTimerHook'),
    logger=dict(type='LoggerHook', interval=525, ignore_last=False),
    param_scheduler=dict(type='ParamSchedulerHook'),
    # Checkpoint at interval 1, track the best one, keep at most 5 on disk.
    checkpoint=dict(
        type='CheckpointHook', interval=1, save_best='auto', max_keep_ckpts=5),
    sampler_seed=dict(type='DistSamplerSeedHook'),
    sync_buffers=dict(type='SyncBuffersHook'))
env_cfg = dict(
    cudnn_benchmark=False,
    mp_cfg=dict(mp_start_method='fork', opencv_num_threads=0),
    dist_cfg=dict(backend='nccl'))
log_processor = dict(type='LogProcessor', window_size=20, by_epoch=True)
# Scalars go both to TensorBoard and to the local backend.
vis_backends = [dict(type='TensorboardVisBackend'), dict(type='LocalVisBackend')]
# NOTE(review): save_dir is a Kaggle path, while every other path in this config is under
# /content (Colab) -- confirm which environment this config is meant for.
visualizer = dict(
    type='ActionVisualizer',
    vis_backends=vis_backends,
    name='visualizer',
    save_dir='/kaggle/working/visualization_dir'
    )
log_level = 'INFO'
# No checkpoint is loaded and no previous run is resumed.
load_from = None
resume = False

# Specify dataset paths
dataset_type = 'VideoDataset'
data_root = '/content/slovo/train'
data_root_val = '/content/slovo/test'
# Annotation lists written by the notebook; validation and test share the same file.
ann_file_train = '/content/mmaction2/ann_train.txt'
ann_file_val = '/content/mmaction2/ann_test.txt'
ann_file_test = '/content/mmaction2/ann_test.txt'

# Training pipeline: one clip of 16 frames with frame_interval 4, resize, and a horizontal
# flip with ratio 0.5.
train_pipeline = [
    dict(type='DecordInit', io_backend='disk'),
    dict(
        type='SampleFrames',
        clip_len=16,
        frame_interval=4,
        num_clips=1,
        out_of_bound_opt='repeat_last'),
    dict(type='DecordDecode'),
    dict(type='Resize', scale=(224, 224)),
    dict(type='Flip', flip_ratio=0.5, direction='horizontal'),
    dict(type='FormatShape', input_format='NCTHW'),
    dict(type='PackActionInputs')
]
# Validation pipeline: same sampling parameters with test_mode=True and no flip.
val_pipeline = [
    dict(type='DecordInit', io_backend='disk'),
    dict(
        type='SampleFrames',
        clip_len=16,
        frame_interval=4,
        num_clips=1,
        test_mode=True,
        out_of_bound_opt='repeat_last'),
    dict(type='DecordDecode'),
    dict(type='Resize', scale=(224, 224)),
    dict(type='FormatShape', input_format='NCTHW'),
    dict(type='PackActionInputs')
]
# Test pipeline: as validation, but two clips per video (num_clips=2).
test_pipeline = [
    dict(type='DecordInit', io_backend='disk'),
    dict(
        type='SampleFrames',
        clip_len=16,
        frame_interval=4,
        num_clips=2,
        test_mode=True,
        out_of_bound_opt='repeat_last'),
    dict(type='DecordDecode'),
    dict(type='Resize', scale=(224, 224)),
    dict(type='FormatShape', input_format='NCTHW'),
    dict(type='PackActionInputs')
]

# Training loader: shuffled, batches of 2.
train_dataloader = dict(
    batch_size=2,
    num_workers=2,
    persistent_workers=True,
    sampler=dict(type='DefaultSampler', shuffle=True),
    dataset=dict(
        type='VideoDataset',
        ann_file=ann_file_train,
        data_prefix=dict(video=data_root),
        pipeline=train_pipeline))
# Validation loader: fixed order, batches of 2.
val_dataloader = dict(
    batch_size=2,
    num_workers=2,
    persistent_workers=True,
    sampler=dict(type='DefaultSampler', shuffle=False),
    dataset=dict(
        type='VideoDataset',
        ann_file=ann_file_val,
        data_prefix=dict(video=data_root_val),
        pipeline=val_pipeline,
        test_mode=True))
# Test loader: fixed order, one video per batch; reads the same root as validation.
test_dataloader = dict(
    batch_size=1,
    num_workers=2,
    persistent_workers=True,
    sampler=dict(type='DefaultSampler', shuffle=False),
    dataset=dict(
        type='VideoDataset',
        ann_file=ann_file_test,
        data_prefix=dict(video=data_root_val),
        pipeline=test_pipeline,
        test_mode=True))

# Training settings
val_evaluator = dict(type='AccMetric')
test_evaluator = dict(type='AccMetric')
# 25 epochs, validating after every epoch starting from the first.
train_cfg = dict(
    type='EpochBasedTrainLoop', max_epochs=25, val_begin=1, val_interval=1)
val_cfg = dict(type='ValLoop')
test_cfg = dict(type='TestLoop')
# Adam; the decay multiplier is set to 0 for norm layers and biases.
optim_wrapper = dict(
    optimizer=dict(
        type='Adam', lr=0.0001, weight_decay=0.0001),
    paramwise_cfg=dict(norm_decay_mult=0.0, bias_decay_mult=0.0))
# Step schedule: learning rate multiplied by 0.1 at epochs 10 and 20.
param_scheduler = [
    dict(
        type='MultiStepLR',
        by_epoch=True,
        begin=0,
        end=25,
        milestones=[10, 20],
        gamma=0.1)
]
auto_scale_lr = dict(enable=False, base_batch_size=64)
# NOTE(review): dist_params and launcher look like distributed-launch settings, while the
# training cell runs tools/train.py without a launcher argument -- confirm they are needed.
dist_params = dict(backend='nccl')
launcher = 'pytorch'
work_dir = 'work_dirs/mvit-slovo'
# No fixed seed: runs are not reproducible bit-for-bit.
randomness = dict(seed=None, diff_rank_seed=False, deterministic=False)

In [None]:
# Train with the config file written by the previous cell (path is relative to the cwd).
! python tools/train.py ./mvit-slovo.py

In [None]:
# Evaluate a saved checkpoint with the same config.
# NOTE(review): the checkpoint file name (best epoch 3) is hard-coded; it presumably has to
# be updated to whatever "best" checkpoint the latest training run produced.
! python tools/test.py mvit-slovo.py work_dirs/mvit-slovo/best_acc_top1_epoch_3.pth

Сохраняем checkpoint, если нужно

In [None]:
import torch
from mmaction.apis import inference_recognizer, init_recognizer
# Config and checkpoint are read from copies stored on Google Drive.
config_path = '/content/drive/MyDrive/mvit-slovo.py'
checkpoint_path = '/content/drive/MyDrive/best_acc_top1_epoch_3.pth'
# build the model from a config file and a checkpoint file
torch_model = init_recognizer(config_path, checkpoint_path)  # device can be 'cuda:0'

In [None]:
# Save the whole model object (not just a state_dict) to Drive; the ONNX export section of
# this notebook reads the same path back with torch.load.
torch.save(torch_model, '/content/drive/MyDrive/my_model')

In [None]:
# Unpack the archived training logs (vis_data/) into the current directory.
!unzip /content/drive/MyDrive/vis_data.zip

Archive:  /content/drive/MyDrive/vis_data.zip
  inflating: vis_data/20240522_155651.json  
  inflating: vis_data/config.py      
  inflating: vis_data/events.out.tfevents.1716393413.4fd840a43d67.190.0  
  inflating: vis_data/scalars.json   


In [None]:
# NOTE(review): SummaryWriter is not used by any visible cell.
from torch.utils.tensorboard import SummaryWriter
# Load the TensorBoard notebook extension and point it at the extracted logs.
%load_ext tensorboard
%tensorboard --logdir /content/vis_data

# Проверка работоспособности

In [None]:
# Install the ONNX tooling used by the inference and export cells (versions are not pinned).
!pip install onnxruntime
!pip install onnxscript
!pip install onnx

In [None]:
from IPython import display
import sys
sys.path.append("../")

import onnxruntime as ort
import torch
import numpy as np
import cv2
from PIL import Image
from drive.MyDrive.Diploma.constants import classes

# True selects the ONNX Runtime branch of the model-loading cell, False the TorchScript branch.
# NOTE(review): the frame loop in this notebook only calls the ONNX Runtime session.
isOnnx = True

In [None]:
# Video file to run recognition on.
path_to_input_video = "/content/drive/MyDrive/Diploma/f17a6060-6ced-4bd1-9886-8578cfbb864f.mp4"

In [None]:
# Load the recognition model in the format selected by isOnnx and set window_size,
# the number of sampled frames collected before each prediction.
if not isOnnx:
  # TorchScript branch: the window length is fixed by hand.
  path_to_model = "/content/drive/MyDrive/Diploma/mvit16-4.pt"
  model = torch.jit.load(path_to_model)
  window_size = 16
else:
  # ONNX Runtime branch: input/output names and the window length come from the graph.
  path_to_model = "/content/drive/MyDrive/Diploma/mvit16-4.onnx"
  session = ort.InferenceSession(path_to_model)
  input_name = session.get_inputs()[0].name
  input_shape = session.get_inputs()[0].shape
  # Axis 3 of the input is used as the number of frames per window. A dynamic axis is not
  # reported as an int, so fall back to the 16 frames used by the TorchScript branch.
  temporal_dim = input_shape[3]
  window_size = temporal_dim if isinstance(temporal_dim, int) else 16
  output_names = [output.name for output in session.get_outputs()]

In [None]:
# Every frame_interval-th frame of the video is fed to the model.
frame_interval = 4
# A prediction is accepted only when its top score exceeds this value.
threshold = 0.5
# Per-channel standardisation constants; same values as data_preprocessor in the training config.
mean = [123.675, 116.28, 103.53]
std = [58.395, 57.12, 57.375]

In [None]:
def resize(im, new_shape=(224, 224)):
    """Letterbox an image to ``new_shape`` and standardise it.

    The image is scaled with its aspect ratio preserved so that it fits inside
    ``new_shape``, padded evenly on both sides with a constant gray border, and
    then standardised with the notebook-level ``mean`` and ``std``.

    Args:
        im: Image array whose first two axes are (height, width). The padding
            colour and ``mean``/``std`` each have three components, so a
            three-channel image is expected.
        new_shape: Target (height, width), or a single int for a square target.

    Returns:
        Floating-point array equal to ``(padded_image - mean) / std``.
    """
    shape = im.shape[:2]  # current size as (height, width)
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)
    # Scale factor (new / old): the smaller ratio makes the whole image fit.
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    # Scaled size as (width, height), the order cv2.resize expects.
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    # Total padding per axis, split in half between the two sides.
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]
    dw /= 2
    dh /= 2
    if shape[::-1] != new_unpad:  # resize only when the size actually changes
        im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
    # The -0.1 / +0.1 offsets send the odd pixel of an odd total padding to bottom/right.
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    # Add the border. The colour must be passed by keyword: the positional slot after
    # borderType is ``dst``, so a positional colour would not be used as the fill value.
    value = (114, 114, 114)
    bConst = cv2.BORDER_CONSTANT
    im = cv2.copyMakeBorder(im, top, bottom, left, right, bConst, value=value)
    im = (im - mean) / std  # standardisation
    return im

In [None]:
# Run sliding-window recognition over the input video with the ONNX Runtime session.
# NOTE(review): only `session` is used here, so the TorchScript branch of the loading cell
# is not supported by this loop.
cap = cv2.VideoCapture(path_to_input_video)
if not cap.isOpened():
    raise IOError(f"Cannot open video: {path_to_input_video}")

tensors_list = []
# "---" is a placeholder first entry so prediction_list[-1] is always defined.
prediction_list = ["---"]

frame_counter = 0
try:
    # The first frame is read and dropped, so sampling starts from the second frame.
    _, frame = cap.read()
    while True:
        ok, frame = cap.read()
        if not ok or frame is None:
            break
        frame_counter += 1
        # Keep only every frame_interval-th frame.
        if frame_counter == frame_interval:
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image = resize(image, (224, 224))
            image = np.transpose(image, [2, 0, 1])  # HWC -> CHW
            tensors_list.append(image)
            if len(tensors_list) == window_size:
                # Stack frames along a new axis 1, then add two leading singleton axes.
                input_tensor = np.stack(tensors_list, axis=1)[None][None]
                outputs = session.run(output_names, {input_name: input_tensor.astype(np.float32)})[0]
                gloss = str(classes[outputs.argmax()])
                # Report a gloss only if it is confident, differs from the last one, and is
                # not the "---" placeholder.
                if outputs.max() > threshold and gloss != prediction_list[-1] and gloss != "---":
                    prediction_list.append(gloss)
                    print(gloss)
                # Windows do not overlap: start collecting a fresh one.
                tensors_list.clear()
            frame_counter = 0
finally:
    # Release the capture even if preprocessing or inference raises.
    cap.release()

Привет!


In [None]:
# Shape of the last window that was passed to the model.
input_tensor.shape

(1, 1, 3, 16, 224, 224)

## Если надо сохранить в ONNX

In [None]:
# Source (pickled torch model) and destination (ONNX file) for the export.
# NOTE(review): the next cell assigns different values to these same two names.
income_model_path = '/content/drive/MyDrive/my_model'
output_model_path = "/content/drive/MyDrive/Diploma/my_model.onnx"

In [None]:
# Install the ONNX tooling (the same three packages are also installed by an earlier cell).
!pip install onnxruntime
!pip install onnxscript
!pip install onnx
import torch
# NOTE(review): these relative paths replace the Google Drive paths assigned to the same
# names in the previous cell -- confirm which pair is intended.
income_model_path = '../my_model'
output_model_path = '../my_model.onnx'
def export_to_onnx(income_model_path, output_model_path, example_input=None):
  """Load a pickled torch model and export it to ONNX.

  Args:
    income_model_path: Path to a model saved as a whole object with ``torch.save``.
    output_model_path: Destination path of the ``.onnx`` file.
    example_input: Example model input, as a tensor or array. If omitted, a zero
      tensor of shape (1, 1, 3, 16, 224, 224) is used, which is the window shape
      recorded by this notebook's inference cell.
  """
  # Use the GPU when there is one, otherwise export on the CPU.
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  # NOTE: torch.load unpickles arbitrary objects -- only use it on files you created.
  # NOTE(review): recent torch releases default to weights_only=True, which rejects a
  # fully pickled model -- confirm against the installed torch version.
  torch_model = torch.load(income_model_path, map_location=device)
  torch_model = torch_model.to(device)
  if example_input is None:
    example_input = torch.zeros(1, 1, 3, 16, 224, 224)
  # The example has to be a local float32 tensor on the same device as the model. The
  # previous code read a local `input_tensor` before assigning it (UnboundLocalError).
  example_input = torch.as_tensor(example_input, dtype=torch.float32).to(device)
  torch.onnx.export(torch_model, example_input, output_model_path)

In [None]:
# Export the in-memory model, using the last inference window as the example input.
device = torch.device("cuda")
torch_model = torch_model.to(device)
# input_tensor is the NumPy array built by the inference loop; it has no .to() method, so
# convert it to a float32 tensor first. A separate name keeps input_tensor unchanged and
# the cell safe to re-run.
example_input = torch.as_tensor(input_tensor, dtype=torch.float32).to(device)
torch.onnx.export(torch_model, example_input, "/content/drive/MyDrive/Diploma/my_model.onnx")