## Загрузка данных

In [1]:
# Download the test videos from Google Drive.
# Layout of test.zip:
#     test:
#         1.mp4
#         2.mp4
#         ...
!gdown -O test.zip https://drive.google.com/uc?id=1xnxF7k-a0OuPElPSn7PUc6VfovGoPm4H
!unzip -u test.zip

Downloading...
From: https://drive.google.com/uc?id=1xnxF7k-a0OuPElPSn7PUc6VfovGoPm4H
To: /content/test.zip
100% 69.1M/69.1M [00:00<00:00, 135MB/s] 
Archive:  test.zip
  inflating: test/2.mp4              


In [2]:
# Recreate test/images from scratch so frames left over from an earlier run
# are not mixed with the ones extracted below.
!rm -rf test/images
!mkdir -p test/images

In [3]:
%%capture
# NOTE(review): CLIP is installed from the repository's default branch with no
# pinned commit or version.
!pip install git+https://github.com/openai/CLIP.git
!wget https://getfile.dokpub.com/yandex/get/https://disk.yandex.ru/d/CHnmggS2yBp02Q -O ViT_B_32_epoch_2.pth # download the file produced when training the model

## Загрузка модели

In [4]:
from tqdm.notebook import tqdm
import os
from collections import defaultdict
import random

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
from torchvision import datasets, models, transforms

from PIL import Image
import clip
import cv2
import json
import numpy as np

In [5]:
# For convenience, each video is unpacked into individual frame images.
def convert2img(path2read: str, path2save: str, n: int = 10):
    """Extract frames from a video and save them as PNG files.

    Frames are numbered from 1 in the order they are read. Each saved file is
    named ``{video name without extension}_{frame number}.png``.

    Args:
        path2read: Path of the video file to read.
        path2save: Directory that receives the PNG files; it is created when
            it does not exist yet.
        n: Sampling step. ``0`` saves every frame at its original resolution.
            A non-zero value saves only every ``n``-th frame (frames ``n``,
            ``2n``, ...) resized to 224x224.

    Returns:
        The base name of the video file, extension included.
    """
    filename = os.path.basename(path2read)
    # splitext copes with extensions of any length, unlike slicing off 4 chars.
    stem = os.path.splitext(filename)[0]
    size = (224, 224)

    os.makedirs(path2save, exist_ok=True)
    vidcap = cv2.VideoCapture(path2read)
    try:
        count = 0
        while True:
            success, image = vidcap.read()
            if not success:
                break
            count += 1
            if n:
                # Subsampling mode: skip every frame that is not a multiple of n.
                if count % n:
                    continue
                image = cv2.resize(image, size)
            # File name format: {source video name}_{frame number}.png
            cv2.imwrite(os.path.join(path2save, f"{stem}_{count}.png"), image)
    finally:
        # Release the capture handle even if reading or writing a frame fails.
        vidcap.release()

    print(f"Done: {filename[:100]}")
    return filename

In [6]:
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# clip.load returns the model together with its image preprocessing transform.
model, preprocess = clip.load("ViT-B/32", device=device)
# Keep only the visual sub-module, cast with .float(); note that `model` is
# rebound here and no longer refers to the full CLIP model.
model = model.visual.float()

100%|████████████████████████████████████████| 338M/338M [00:01<00:00, 221MiB/s]


In [7]:
class ViT_B_32(nn.Module):
    """Two-output classification head stacked on a visual encoder.

    The wrapped encoder's output goes through a 512 -> 384 linear projection,
    a LayerNorm over the 384 features and a final 384 -> 2 linear layer.
    Every sub-module is moved to the notebook-level ``device`` on construction.
    """

    def __init__(self, model):
        super().__init__()
        # The attribute names double as state_dict key prefixes, so they must
        # stay as they are for saved checkpoints to load.
        self.vit = model
        self.new_emb = nn.Linear(512, 384)
        self.norm = nn.LayerNorm(384)
        self.fc = nn.Linear(384, 2)
        # A single move covers the encoder and all three head layers.
        self.to(device)

    def forward(self, x):
        """Run ``x`` through encoder, projection, norm and classifier in turn."""
        out = x
        for layer in (self.vit, self.new_emb, self.norm, self.fc):
            out = layer(out).float()
        return out

In [8]:
# Seed every random number generator in use so the run is reproducible.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)

# Wrap the visual encoder in the classification head exactly once. The
# isinstance guard keeps this cell idempotent: re-running it does not nest an
# already wrapped model inside a second ViT_B_32.
if not isinstance(model, ViT_B_32):
    model = ViT_B_32(model)

In [9]:
# Load the downloaded checkpoint into the wrapped model and freeze it for
# inference.
# NOTE(review): torch.load relies on pickle and this file is fetched from an
# external URL - load it only if the source is trusted (consider
# weights_only=True where the installed torch supports it).
PATH = "ViT_B_32_epoch_2.pth"
model.load_state_dict(torch.load(PATH, map_location=torch.device(device)))
model.eval()
# No parameter needs gradients at inference time.
for param in model.parameters():
    param.requires_grad = False

### Подготовка теста

In [10]:
# Unpack every .mp4 located directly under test/ into test/images/.
for filename in tqdm(os.listdir("test")):
    if not filename.endswith(".mp4"):
        # Not a video (for example the images/ directory itself).
        continue
    convert2img(
        path2read=f"test/{filename}",
        path2save="test/images/",
        n=0,  # save every frame
    )

  0%|          | 0/2 [00:00<?, ?it/s]

Done: 2.mp4


In [11]:
# File names (not full paths) of everything in test/images/.
# os.listdir returns the entries in arbitrary order.
test_data = os.listdir("test/images/")

In [12]:
class AccidentDatasetTest(Dataset):
    """Inference dataset that yields ``(image, file name)`` pairs.

    ``data_prep`` is an indexable collection of image file names found in
    ``folder``. ``transform``, when truthy, is applied to every opened image.
    """

    def __init__(self, data_prep, transform=None,
                 folder: str = "/content/"):
        self.data_prep, self.transform, self.folder = data_prep, transform, folder

    def __len__(self):
        """Number of image file names held by the dataset."""
        return len(self.data_prep)

    def __getitem__(self, idx):
        """Open the ``idx``-th image and return it along with its file name."""
        image_name = self.data_prep[idx]
        opened = Image.open(os.path.join(self.folder, image_name))
        return (self.transform(opened) if self.transform else opened), image_name

In [13]:
# Frames are read from test/images/ and passed through the preprocessing
# transform returned by clip.load.
test_dataset = AccidentDatasetTest(
    data_prep=test_data,
    transform=preprocess,
    folder="test/images/",
)

# Batches of up to 128 frames, loaded by two worker processes, no shuffling.
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=128,
    shuffle=False,
    num_workers=2,
    pin_memory=True,
)

### Предсказание

In [14]:
# Per-frame predictions grouped by video: {video id: [[file name, label], ...]}.
test_pred = defaultdict(list)
model.eval()
with torch.no_grad():
    for imgs, image_name in tqdm(test_loader):
        logits = model(imgs.to(device))
        # Index of the larger of the two outputs is the predicted label.
        labels = logits.argmax(1).cpu().detach().numpy()
        for filename, pred_ in zip(image_name, labels):
            # The part of the file name before the first "_" is the video id.
            video_id = int(filename.split("_")[0])
            test_pred[video_id].append([filename, pred_])

  0%|          | 0/14 [00:00<?, ?it/s]

In [15]:
# Put each video's frames into frame-number order. Frame files are named
# "{video}_{frame number}.png", so the number sits between "_" and ".png".
for frames in test_pred.values():
    frames.sort(key=lambda item: int(item[0].split("_")[1][:-4]))

In [16]:
# Aggregate the per-frame labels into one verdict per video.
final_output = defaultdict(list)
threshold1 = 0.09  # minimum share of frames covered by the longest run of label-1 frames
threshold2 = 0.14  # minimum share of label-1 frames overall
for key in test_pred:
    # Cast the NumPy integer labels to plain ints up front. Otherwise the
    # scores and the verdict become NumPy scalars, and a numpy.bool_ verdict
    # cannot be serialised by json.dump.
    labels = [int(i[1]) for i in test_pred[key]]

    # Length of the longest uninterrupted run of label-1 frames.
    max_length = current_run = 0
    for label in labels:
        current_run = current_run + 1 if label else 0
        max_length = max(max_length, current_run)

    pred_targets = sum(labels)
    score1 = max_length / len(labels)
    score2 = pred_targets / len(labels)
    # final_output[video name without .mp4]["pred_by_img"] - prediction for every frame
    # final_output[video name without .mp4]["result"][2]   - prediction for the whole video
    final_output[int(key)] = {
        "pred_by_img": [[i[0], label] for i, label in zip(test_pred[key], labels)],
        "result": [score1, score2,
                   bool((score1 >= threshold1) or (score2 >= threshold2))],
    }

In [17]:
# Persist the per-video results. JSON object keys are always strings, so the
# integer video ids used as keys in final_output are written out as strings.
with open("results.json", "w") as f:
    json.dump(final_output, f)

In [28]:
# One summary line per video: [score1, score2, verdict].
for video_id, video_info in final_output.items():
    print(f"Имя файла: {video_id}, Результат: {video_info['result']}")

Имя файла: 2, Результат: [1.0, 1.0, True]
