In [200]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import cv2

In [201]:
# Data preparation: load the recorded trace sequences, the window length the
# sequence model works on, and the detector's class names.
df = pd.read_csv('./traces_seq.csv')

with open('./window_size.txt', 'r') as f:
    window_size = int(f.read())

with open('../data/classes.txt', 'r') as f:
    classes = f.read().splitlines()
classes_len = len(classes)

# class index -> class name, in file order
class_mapping = dict(enumerate(classes))

print('window size: ', window_size)
print('classes len: ', classes_len)
print(class_mapping)

window size:  20
classes len:  5
{0: 'left', 1: 'noise', 2: 'one', 3: 'right', 4: 'thumbup'}


In [202]:
# Use the first CUDA GPU when one is available; otherwise fall back to the CPU
# so the notebook still runs on machines without a GPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda', index=0)

In [203]:
# Load the 'pose' entry point from the local yolov5 checkout and move it to
# the selected device; the last line displays the model summary.
yolo = torch.hub.load('../yolov5', 'pose', source='local').to(device)
yolo

YOLOv5 🚀 2022-11-2 Python-3.9.12 torch-1.13.0 CUDA:0 (NVIDIA Graphics Device, 24248MiB)

Fusing layers... 
Model summary: 157 layers, 1765930 parameters, 0 gradients, 4.1 GFLOPs
Adding AutoShape... 


AutoShape(
  (model): DetectMultiBackend(
    (model): DetectionModel(
      (model): Sequential(
        (0): Conv(
          (conv): Conv2d(3, 16, kernel_size=(6, 6), stride=(2, 2), padding=(2, 2))
          (act): SiLU(inplace=True)
        )
        (1): Conv(
          (conv): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
          (act): SiLU(inplace=True)
        )
        (2): C3(
          (cv1): Conv(
            (conv): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1))
            (act): SiLU(inplace=True)
          )
          (cv2): Conv(
            (conv): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1))
            (act): SiLU(inplace=True)
          )
          (cv3): Conv(
            (conv): Conv2d(32, 32, kernel_size=(1, 1), stride=(1, 1))
            (act): SiLU(inplace=True)
          )
          (m): Sequential(
            (0): Bottleneck(
              (cv1): Conv(
                (conv): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1))
  

In [204]:
class PoseDetectNN(nn.Module):
    """Score a window of per-frame detections with a linear -> LSTM -> linear stack.

    Each time step is a vector of ``pos_dimension`` box coordinates followed by
    a ``num_classes`` one-hot class encoding. The LSTM output at step
    ``window_size - 1`` is projected to a single logit and squashed with a
    sigmoid, giving one score in [0, 1] per sequence.

    Note: the LSTM's initial hidden/cell state is drawn from a normal
    distribution on every forward pass, so repeated calls on the same input
    can return different scores.

    Args:
        num_classes: width of the one-hot class part of each time step.
        pos_dimension: number of positional features per time step.
        window_size: number of time steps in a sequence.
        batch_size: nominal batch size; stored for reference, the forward pass
            derives the actual batch size from its input.
        bidirection: whether the LSTM is bidirectional.
        hidden_dim: output width of the first linear layer.
        rnn_hidden: LSTM hidden size.
        rnn_layers: number of stacked LSTM layers.
    """

    def __init__(self, num_classes, pos_dimension, window_size, batch_size, bidirection=False,hidden_dim=10, rnn_hidden=10, rnn_layers=2) -> None:
        super().__init__()

        self.batch_size = batch_size
        self.rnn_layers = rnn_layers
        self.rnn_hidden = rnn_hidden
        self.num_classes = num_classes
        self.pos_dimension = pos_dimension
        self.window_size = window_size
        self.bidirection = bidirection

        self.linear = nn.Linear(num_classes + pos_dimension, out_features=hidden_dim)
        self.rnn = nn.LSTM(hidden_dim, rnn_hidden, rnn_layers, bidirectional=bidirection, batch_first=True)
        self.linear2 = nn.Linear((1+bidirection)*rnn_hidden, 1)
        # Not used by forward(); kept because removing it would change the
        # module's state_dict keys (presumably present in saved checkpoints).
        self.linear3 = nn.Linear(window_size, 1)

    def forward(self, x):
        """Return one sigmoid score per sequence.

        Args:
            x: tensor of shape (batch, window_size, num_classes + pos_dimension).

        Returns:
            1-D tensor of length ``batch`` with values in [0, 1].
        """
        # Take batch size and device from the input instead of the constructor
        # argument / a hard-coded CUDA device, so any batch on any device works.
        batch = x.shape[0]
        hidden = self.linear(x)

        h0, c0 = self.init_hidden(batch, self.rnn_layers, self.rnn_hidden, self.bidirection, device=x.device)

        out, _ = self.rnn(hidden, (h0, c0))

        # Use the instance's window size rather than a notebook-level global.
        out = self.linear2(out[:, self.window_size - 1, :]).reshape(batch)
        return torch.sigmoid(out).view(-1)

    def init_hidden(self, batch_size, rnn_layers, rnn_hidden, bidirection=False, device=torch.device("cuda")):
        """Draw random initial hidden and cell states for the LSTM.

        Returns:
            ``(h0, c0)``, each of shape
            ``((1 + bidirection) * rnn_layers, batch_size, rnn_hidden)`` on ``device``.
        """
        state_shape = ((1+bidirection)*rnn_layers, batch_size, rnn_hidden)
        h0 = torch.randn(state_shape, device=device)
        c0 = torch.randn(state_shape, device=device)
        return h0, c0
        

In [205]:
# Rebuild the sequence classifier (4 box coordinates per step, one sequence at
# a time) and restore its trained weights.
# NOTE: torch.load unpickles the file - only load checkpoints you trust.
pdnn = PoseDetectNN(classes_len, pos_dimension=4, window_size=window_size, batch_size=1)
pdnn.load_state_dict(torch.load('./pdnn.pt', map_location=device))
# The model is only used for inference below, so switch it to eval mode.
pdnn.to(device).eval()

PoseDetectNN(
  (linear): Linear(in_features=9, out_features=10, bias=True)
  (rnn): LSTM(10, 10, num_layers=2, batch_first=True)
  (linear2): Linear(in_features=10, out_features=1, bias=True)
  (linear3): Linear(in_features=20, out_features=1, bias=True)
)

In [206]:
# NOTE(review): this rebinds the notebook-level name `classes` to an empty set
# and nothing in this cell adds to it; kept as-is because a later cell displays it.
classes = set()


def processDetection(im, xyxy, traces=None, max_jump=20):
    """Annotate one frame with its detections and extend the per-class centre traces.

    Detections with confidence below 0.8 are ignored. Detections whose class id
    is 0, 2, 3 or 4 are labelled and boxed on the image and are all tracked
    under the single trace key 3; any other class id is tracked under its own
    key but not drawn. The trace stored under key 3 is drawn as a polyline.

    Args:
        im: image array passed to the OpenCV drawing calls.
        xyxy: pandas DataFrame with one row per detection and the columns
            ``xmin``, ``ymin``, ``xmax``, ``ymax``, ``confidence``, ``class``
            and ``name``.
        traces: dict mapping a class id to the list of box centres recorded so
            far. It is updated in place; a fresh dict is created when omitted.
        max_jump: a new centre is appended to a trace only if its Euclidean
            distance (in pixels) to the trace's last point is below this value.
            A trace holding a single point is restarted at the new centre
            instead; longer traces are left unchanged.

    Returns:
        ``(im, traces)`` - the annotated image and the updated traces dict.
    """
    # A mutable default would be shared by every call that omits `traces`.
    if traces is None:
        traces = {}

    if xyxy.shape[0] == 0:
        return im, traces

    gesture_classes = (0, 2, 3, 4)

    for _, row in xyxy.iterrows():
        if row['confidence'] < 0.8:
            continue

        xmin, ymin = row['xmin'], row['ymin']
        xmax, ymax = row['xmax'], row['ymax']
        cls = row['class']

        is_gesture = cls in gesture_classes
        if is_gesture:
            im = cv2.putText(im, row['name'], (int(xmin), int(ymin)), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)
            # All gesture classes share one trace.
            cls = 3

        rect_start = np.array([xmin, ymin])
        rect_end = np.array([xmax, ymax])
        center = rect_start + (rect_end - rect_start) / 2

        trace = traces.setdefault(cls, [])
        if not trace:
            # first point of the trace
            trace.append(center)
        elif np.linalg.norm(center - trace[-1]) < max_jump:
            # Plausible continuation. This is a true Euclidean distance; the
            # square root of the summed absolute differences is not one.
            trace.append(center)
        elif len(trace) == 1:
            # A lone point followed by a jump: restart the trace here.
            trace[0] = center

        if is_gesture:
            # Plain int tuples are accepted by every OpenCV version and do not
            # wrap around for negative coordinates like an unsigned cast would.
            im = cv2.rectangle(im, (int(xmin), int(ymin)), (int(xmax), int(ymax)), color=(0, 255, 0), thickness=2)

    if 3 in traces:
        trace = np.array(traces[3]).reshape(-1, 1, 2).astype(np.int32)
        im = cv2.polylines(im, [trace], False, (0, 0, 255), 6)

    return im, traces

In [207]:
def row_tranform(row, frame_width=1080, frame_height=720, num_classes=None):
    """Turn a window of raw detections into the model's per-step feature rows.

    Args:
        row: 2-D array-like, one line per time step, laid out as
            ``[xmin, ymin, xmax, ymax, class]``. It is not modified.
        frame_width: divisor applied to the x coordinates (columns 0 and 2).
        frame_height: divisor applied to the y coordinates (columns 1 and 3).
            NOTE(review): 1080x720 are the original hard-coded values -
            confirm they match the frame size used for training.
        num_classes: width of the one-hot class encoding; defaults to
            ``len(class_mapping)`` from the notebook scope.

    Returns:
        float array of shape ``(steps, 4 + num_classes)``: the scaled
        coordinates followed by the one-hot encoded class.
    """
    if num_classes is None:
        num_classes = len(class_mapping)

    # Work on a float copy: slicing the caller's array would hand back a view,
    # so scaling in place would overwrite the input (and truncate to zero for
    # integer dtypes).
    window = np.array(row, dtype=np.float64)

    poses = window[:, :4]
    poses[:, [0, 2]] = poses[:, [0, 2]] / frame_width
    poses[:, [1, 3]] = poses[:, [1, 3]] / frame_height

    cls_ids = window[:, 4].astype(np.int32)
    one_hot = np.zeros((len(cls_ids), num_classes))
    one_hot[np.arange(len(cls_ids)), cls_ids] = 1

    return np.concatenate([poses, one_hot], axis=1)

def onehot_encode(cls, mapping):
    """Return a one-hot float vector of length ``len(mapping)`` with position ``cls`` set to 1."""
    encoded = np.zeros(len(mapping), dtype=np.float64)
    encoded[cls] = 1.0
    return encoded


In [208]:
# Open the input video and read the properties needed to write a matching output.
cap = cv2.VideoCapture('../videos/video_partB.avi')
if not cap.isOpened():
    # Fail here with a clear message instead of silently processing zero frames.
    raise IOError("Could not open video '../videos/video_partB.avi'")

fps = cap.get(cv2.CAP_PROP_FPS)
# cap.get() returns floats; frame dimensions are whole pixel counts.
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))

In [209]:
# Writer for the annotated video, same frame rate and size as the input.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
# frameSize is passed as a (width, height) tuple of plain ints rather than a
# numpy unsigned array, which not every OpenCV build accepts.
output = cv2.VideoWriter('./output.mp4', fourcc, fps, (int(width), int(height)))
if not output.isOpened():
    raise IOError("Could not open './output.mp4' for writing")

In [210]:
# Run the detector on every frame, keep a sliding window of the chosen
# detection from consecutive frames, score each full window with the sequence
# model and write the annotated frame to the output video.
consecutive_frames = []  # [xmin, ymin, xmax, ymax, class] of the chosen detection per frame
consecutive_fnos = []    # frame numbers matching consecutive_frames
fno = 0
traces = {}

while True:
    # Read inside the loop only: an extra read before the loop would discard
    # the first frame of the video.
    success, img = cap.read()
    if not success:
        break

    # OpenCV decodes frames as BGR; the channels are swapped to RGB before the
    # frame is handed to the detector and the drawing helper.
    rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    results = yolo(rgb)
    # columns: xmin, ymin, xmax, ymax, confidence, class, name
    xyxy = results.pandas().xyxy[0]

    processed, traces = processDetection(rgb.copy(), xyxy, traces)
    # Swap back to BGR, the channel order in which the frame was read.
    processed = cv2.cvtColor(processed, cv2.COLOR_RGB2BGR)

    # Drop low-confidence detections and the noise class (id 1).
    candidates = xyxy[(xyxy['confidence'] > 0.8) & (xyxy['class'] != 1)]
    if len(candidates) > 0:
        # Keep the most confident detection of the frame.
        # NOTE(review): an ascending sort would pick the least confident one;
        # confirm the training data was not built that way.
        best = (
            candidates
            .sort_values(by='confidence', ascending=False)
            .drop(['confidence', 'name'], axis=1)
            .iloc[0, :]
            .to_numpy()
        )

        if consecutive_fnos and fno - consecutive_fnos[-1] != 1:
            # A frame without a usable detection broke the run: restart the
            # window, beginning with the current frame.
            consecutive_fnos.clear()
            consecutive_frames.clear()

        consecutive_frames.append(best)
        consecutive_fnos.append(fno)

        while len(consecutive_fnos) > window_size:
            consecutive_fnos.pop(0)
            consecutive_frames.pop(0)

    # NOTE(review): a full window is still scored on frames that produced no
    # detection, until the next detection resets it - confirm this is intended.
    if len(consecutive_frames) == window_size:
        raw = np.array(consecutive_frames).reshape((window_size, 5))
        x = torch.tensor(row_tranform(raw), dtype=torch.float32).reshape((1, window_size, 4+classes_len)).to(device)
        # Inference only: skip autograd bookkeeping.
        with torch.no_grad():
            pred = pdnn(x)

        if pred.item() > 0.8:
            processed = cv2.putText(processed, 'waving', (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)

    output.write(processed)

    fno += 1
        
    

In [211]:
# Close the input video, then finalise the output file.
for stream in (cap, output):
    stream.release()

In [212]:
# Inspect which class ids ended up with a trace after processing the video.
traces.keys()

dict_keys([1, 3])

In [213]:
# NOTE(review): at this point `classes` is the empty set assigned in the
# processDetection cell, not the label list read from classes.txt.
classes

set()