In [1]:
import cv2
import numpy as np
import torch

import matplotlib.pyplot as plt

from tqdm import tqdm

import sys
import copy
import os

from tensorly.decomposition import tucker
from tensorly.tenalg import multi_mode_dot

In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
from utils import get_video_names
from sklearn.model_selection import train_test_split
from sklearn.metrics import root_mean_squared_error
from sklearn.preprocessing import MinMaxScaler

from mytypes import VideoNumpy, CoordinatesNumpy

## Конвертация видео в 3х мерный тензор

In [4]:
def get_frames_seq(path: str) -> VideoNumpy:

    cap = cv2.VideoCapture(path)

    frames = []

    ret = True
    while ret:
        ret, frame = cap.read()
        if ret:
            # Normalization
            frame = frame.astype(np.float32) / 255.0

            frames.append(frame)

    # Release the video capture object
    cap.release()
    video_tensor = np.stack(frames, axis=0)

    return video_tensor

In [4]:
user = 1
video_num = 1
video_tensor = get_frames_seq(f'data/LPW/{user}/{video_num}.avi')

In [12]:
video_tensor.shape

(2000, 480, 640, 3)

## Обучение сверточной сети для извлечения признаков

In [5]:
user_ids = [str(i) for i in range(1, 23)]

In [10]:
data_path = 'data/LPW/'
train_tensor = None
test_tensor = None
train_gt = None
test_gt = None

for user_id in tqdm(user_ids):
    user_path = os.path.join(data_path, user_id)
    video_names = get_video_names(user_path)

    for video in video_names:
        video_path = os.path.join(user_path, video + '.avi')
        gt_path = os.path.join(user_path, video + '.txt')
        video_tensor = get_frames_seq(video_path)
        gt = np.genfromtxt(gt_path)
        _train_frames, _test_frames, _train_gt, _test_gt= train_test_split(video_tensor, gt)
        del video_tensor
        del gt

        if train_tensor is None:
            train_tensor = _train_frames
            test_tensor = _test_frames
            train_gt = _train_gt
            test_gt = _test_gt

        else:
            train_tensor = np.concatenate((train_tensor, _train_frames), axis=0)
            test_tensor = np.concatenate((test_tensor, _test_frames), axis=0)
            train_gt = np.concatenate((train_gt, _train_gt), axis=0)
            test_gt = np.concatenate((test_gt, _test_gt), axis=0)
        
        del _train_frames, _test_frames, _train_gt, _test_gt

np.save('data/train_frames.npy', train_tensor)
np.save('data/test_frames.npy', test_tensor)
np.save('data/train_gt.npy', train_gt)
np.save('data/test_gt.npy', test_gt)
    

  0%|          | 0/22 [00:00<?, ?it/s]

: 

## Работа с полученными признаками

In [7]:
from sklearn.preprocessing import StandardScaler

In [8]:
new_features = np.load('new_features.npy')
ground_truth = np.load('ground_truth.npy')

In [9]:
scaler = StandardScaler()
# features_scaler = MinMaxScaler()

In [10]:
ground_truth = scaler.fit_transform(ground_truth)
# new_features = scaler.fit_transform(new_features)

In [11]:
def HOPLS(X: np.ndarray, 
          Y: np.ndarray, 
          R: int = None,
          rank: tuple = [2, 4, 5, 2]
          ) -> tuple[list[np.ndarray], list[np.ndarray]]:

    D = []
    G = []
    P = []
    Q = []

    for i in tqdm(range(R)):
        covariance_tensor = np.einsum('ijkl, im -> jklm', X, Y)
        core, factors = tucker(covariance_tensor, rank=rank, random_state=1)
        redeemed_tensor = multi_mode_dot(X, [np.eye(X.shape[0])] + [factor.T for factor in factors[:-1]])
        unw_cov = redeemed_tensor.reshape(redeemed_tensor.shape[0], -1)

        U, _, _ = np.linalg.svd(unw_cov, full_matrices=False)
        t_r = U[:, 0]
        t_r = t_r.reshape(-1,1)
        G_x = multi_mode_dot(X, [t_r.T] + [factor.T for factor in factors[:-1]])
        G_y = multi_mode_dot(Y, [t_r.T] + [factors[-1].T])
        X = X - multi_mode_dot(G_x, [t_r] + factors[:-1])
        Y = Y - multi_mode_dot(G_y, [t_r] + [factors[-1]])
        # t.append(t_r)
        P.append(factors[:-1])
        Q.append(factors[-1])

        G.append(G_x)
        D.append(G_y)


        if i % (R // 20) == 0:
            print('new X norm', np.linalg.norm(X))
            print('new Y norm', np.linalg.norm(Y))

    return G, D, P, Q

In [12]:
G, D, P, Q = HOPLS(new_features, ground_truth, 100, rank=[1,1,1,2])

  0%|          | 0/100 [00:00<?, ?it/s]

  8%|▊         | 8/100 [00:00<00:02, 34.31it/s]

new X norm 2032.2340221817942
new Y norm 61.988346902993705
new X norm 910.1339964582319
new Y norm 26.824757675938255


 16%|█▌        | 16/100 [00:00<00:02, 36.44it/s]

new X norm 709.5857445046669
new Y norm 19.987509204192914
new X norm 572.705582595102
new Y norm 18.623140760544555


 25%|██▌       | 25/100 [00:00<00:01, 38.51it/s]

new X norm 497.32286249205276
new Y norm 17.849721253828005
new X norm 469.81328535393084
new Y norm 16.924050680665545


 38%|███▊      | 38/100 [00:01<00:01, 37.00it/s]

new X norm 431.76675368084085
new Y norm 16.293559492400156
new X norm 411.36673626619245
new Y norm 15.65381797214087


 47%|████▋     | 47/100 [00:01<00:01, 38.91it/s]

new X norm 388.57456341620616
new Y norm 15.152213295040726
new X norm 366.9525342353712
new Y norm 14.712782328121575


 57%|█████▋    | 57/100 [00:01<00:00, 43.80it/s]

new X norm 355.0238899846087
new Y norm 14.246777876091963
new X norm 341.8653137072467
new Y norm 13.86104737608423


 67%|██████▋   | 67/100 [00:01<00:00, 43.47it/s]

new X norm 331.76809647786223
new Y norm 13.56037036820479
new X norm 321.3426539162459
new Y norm 13.337995581370757


 77%|███████▋  | 77/100 [00:01<00:00, 44.66it/s]

new X norm 311.61403107433324
new Y norm 13.18175971617756
new X norm 302.99432894836167
new Y norm 13.010105112329102


 87%|████████▋ | 87/100 [00:02<00:00, 43.77it/s]

new X norm 293.6447047649844
new Y norm 12.882495345365948
new X norm 287.34377260003606
new Y norm 12.755403025915747


 97%|█████████▋| 97/100 [00:02<00:00, 43.59it/s]

new X norm 280.2122294095965
new Y norm 12.657913744579444
new X norm 272.9350118568515
new Y norm 12.591602746510208


100%|██████████| 100/100 [00:02<00:00, 40.98it/s]


In [13]:
def predict(X, G, D, P, Q):

    Y = 0

    for i in tqdm(range(len(P))):
        redeemed_tensor = multi_mode_dot(X, [np.eye(X.shape[0])] + [factor.T for factor in P[i]])
        unw_cov = redeemed_tensor.reshape(redeemed_tensor.shape[0], -1)

        U, _, _ = np.linalg.svd(unw_cov, full_matrices=False)
        t_r = U[:, 0]
        t_r = t_r.reshape(-1,1)

        Y += multi_mode_dot(D[i], [t_r] + [Q[i]])

    return Y


In [14]:
prediction = predict(new_features, G, D, P, Q)

  6%|▌         | 6/100 [00:00<00:01, 52.11it/s]

100%|██████████| 100/100 [00:01<00:00, 61.12it/s]


In [15]:
scaled_back_gt = scaler.inverse_transform(ground_truth)
scaled_back_prediction = scaler.inverse_transform(prediction)

root_mean_squared_error(scaled_back_prediction, 
                        scaled_back_gt)

38.200368855616304

## Выводы:
1. Удалось достичь результата лучше чем с помощью простой CNN
2. HOPLS чувствителен к нормализации данных перед их использованием
3. Слишком большой ранг ведет к сильной деградации качества

In [5]:
video_tensor = get_frames_seq('data/LPW/1/1.avi')

In [19]:
num_frames, height, width, num_channels = video_tensor.shape

fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter('output_video.avi', fourcc, 20.0, (width, height))

for i in range(num_frames):
    frame = video_tensor[i]

    if frame.dtype != np.uint8:
        frame = (frame * 255).astype(np.uint8)

    x1, y1 = scaled_back_prediction[i].astype(np.uint16)
    x2, y2 = scaled_back_gt[i].astype(np.uint16)
    cv2.circle(frame, (x1, y1), 5, (0, 255, 0), -1)
    cv2.circle(frame, (x2, y2), 5, (0, 0, 255), -1)
    out.write(frame)

out.release()
