In [None]:
# Frame by Frame
import cv2
# Decode the input video and dump every frame as a numbered JPEG
# (input/frames/frame0.jpg, frame1.jpg, ...).
vidcap = cv2.VideoCapture("input/YDXJ0098.mp4")
if not vidcap.isOpened():
    # VideoCapture does not raise on a bad path or unsupported codec; without
    # this check the loop below would silently extract zero frames.
    raise IOError("Could not open video: input/YDXJ0098.mp4")

count = 0
try:
    success, image = vidcap.read()
    while success:
        frame_path = "input/frames/frame%d.jpg" % count
        # cv2.imwrite reports failure (e.g. missing output directory) through
        # its return value, so it has to be checked explicitly.
        if not cv2.imwrite(frame_path, image):
            raise IOError("Could not write frame: " + frame_path)
        success, image = vidcap.read()
        print('Read a new frame: ', success)
        count += 1
        print (count)
finally:
    # Always free the decoder / file handle, even if writing a frame failed.
    vidcap.release()

In [None]:
# Frame Recognition
import os
import cv2
import time
import torch
import argparse
import numpy as np

from Detection.Utils import ResizePadding
from DetectorLoader import TinyYOLOv3_onecls

from PoseEstimateLoader import SPPE_FastPose
from fn import draw_single

from Track.Tracker import Detection, Tracker
from ActionsEstLoader import TSSTG

def preproc(image):
    """Prepare a frame for the detector.

    The frame is first passed through the module-level ``resize_fn`` and the
    result is then converted from BGR to RGB channel order.

    Args:
        image: Frame as loaded by OpenCV (BGR channel order).

    Returns:
        The resized frame in RGB channel order.
    """
    resized = resize_fn(image)
    return cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)


def kpt2bbox(kpt, ex=20):
    """Compute a padded bounding box that encloses a set of keypoints.

    Args:
        kpt: 2-D array whose column 0 holds x coordinates and column 1 holds
            y coordinates; any further columns are ignored.
        ex: Margin added on every side of the tight box.

    Returns:
        ``np.ndarray`` of four values: (x_min - ex, y_min - ex,
        x_max + ex, y_max + ex).
    """
    x_coords = kpt[:, 0]
    y_coords = kpt[:, 1]
    left, top = x_coords.min() - ex, y_coords.min() - ex
    right, bottom = x_coords.max() + ex, y_coords.max() + ex
    return np.array((left, top, right, bottom))


# ---- Run configuration -------------------------------------------------
source = './input/YDXJ0098.mp4'
save_out = "./output/out28122021YDXJ0098.avi"
device = 'cuda'
detection_input_size = 1024
pose_input_size = '224x160'
pose_backbone = 'resnet50'
max_age = 30
outvid = True

# ---- Detection model ---------------------------------------------------
inp_dets = detection_input_size
detect_model = TinyYOLOv3_onecls(inp_dets, device=device)

# ---- Pose model --------------------------------------------------------
# The pose input size is configured as two 'x'-separated integers; they are
# forwarded to SPPE_FastPose in the order given.
inp_pose = tuple(int(side) for side in pose_input_size.split('x')[:2])
pose_model = SPPE_FastPose(pose_backbone, inp_pose[0], inp_pose[1], device=device)

# ---- Tracker -----------------------------------------------------------
tracker = Tracker(max_age=max_age, n_init=3)

# ---- Action estimator --------------------------------------------------
action_model = TSSTG()

# Resizer used by preproc(); built with the detector input size on both axes.
resize_fn = ResizePadding(inp_dets, inp_dets)

# ---- Output video writer -----------------------------------------------
# NOTE(review): nothing in the visible notebook writes to or releases
# `writer`, and `source` / `outvid` are not read anywhere visible - confirm
# whether they are still needed.
codec = cv2.VideoWriter_fourcc(*'MJPG')
writer = cv2.VideoWriter(save_out, codec, 30, (inp_dets * 2, inp_dets * 2))

# Number of extracted frames to process. Frames are read from
# ./input/frames/frame<N>.jpg for N in [0, num_frames).
num_frames = 1998

for x in range(num_frames):
    path = "./input/frames/frame" + str(x) + ".jpg"
    raw = cv2.imread(path)
    if raw is None:
        # cv2.imread returns None (it does not raise) when the file is missing
        # or unreadable; skip the frame instead of crashing inside preproc().
        print("Skipping unreadable frame: " + path)
        continue
    frame = preproc(raw)
    print ("shape ", frame.shape)

    # Detect humans bbox in the frame with detector model.
    detected = detect_model.detect(frame, need_resize=False, expand_bb=10)

    # Predict each tracks bbox of current frame from previous frames information with Kalman filter.
    tracker.predict()
    # Merge two source of predicted bbox together.
    for track in tracker.tracks:
        det = torch.tensor([track.to_tlbr().tolist() + [0.5, 1.0, 0.0]], dtype=torch.float32)
        detected = torch.cat([detected, det], dim=0) if detected is not None else det

    detections = []  # List of Detections object for tracking.
    if detected is not None:
        # Predict skeleton pose of each bboxs.
        poses = pose_model.predict(frame, detected[:, 0:4], detected[:, 4])

        # Create Detections object.
        detections = [Detection(kpt2bbox(ps['keypoints'].numpy()),
                                np.concatenate((ps['keypoints'].numpy(),
                                                ps['kp_score'].numpy()), axis=1),
                                ps['kp_score'].mean().numpy()) for ps in poses]
    tracker.update(detections)

    # Predict Actions of each track.
    for track in tracker.tracks:
        if not track.is_confirmed():
            continue
        # Use 30 frames time-steps to prediction.
        if len(track.keypoints_list) != 30:
            continue

        pts = np.array(track.keypoints_list, dtype=np.float32)
        out = action_model.predict(pts, frame.shape[:2])
        action_name = action_model.class_names[out[0].argmax()]

        # Annotate the original (un-resized) frame, re-read from disk.
        # NOTE(review): every qualifying track writes to the same output path,
        # so with several tracks only the last one's label is kept.
        img = cv2.imread(path)
        print("Load ", img.shape)
        name = str(action_name) + " : " + str(out[0].max() * 100)
        font = cv2.FONT_HERSHEY_DUPLEX
        cv2.putText(img, name, (15, 15), font, 0.5, (255, 255, 255), 1)

        out_path = "./input/framesRec2/frameRec" + str(x) + ".jpg"
        # cv2.imwrite signals failure through its return value; only report
        # success when the file was actually written.
        if cv2.imwrite(out_path, img):
            print(out_path)
            print("is stored")
        else:
            print("Failed to store " + out_path)

In [None]:
# Frames Building
import cv2

img_array = []

def toCheck(frame):
    """Placeholder frame check; currently a no-op.

    Args:
        frame: A loaded frame, or None when loading failed.

    Returns:
        Always None, whether or not ``frame`` is None.
    """
    # NOTE(review): the result is the same for a missing and a present frame,
    # so callers cannot learn anything from it - confirm the intended check.
    return None

def openFrame(x):
    """Load the annotated frame with index ``x`` from ./input/framesRec2/.

    Args:
        x: Frame index used to build the file name ``frameRec<x>.jpg``.

    Returns:
        Whatever ``cv2.imread`` returns for that path; the caller treats a
        None result as "frame not available".
    """
    return cv2.imread("./input/framesRec2/frameRec" + str(x) + ".jpg")
    
# Collect every annotated frame that exists on disk, in index order.
for x in range(1998):
    frame = openFrame(x)
    if frame is None:
        # Not every index has an annotated frame; log the index that is
        # actually missing and move on.
        print (x)
        continue
    img_array.append(frame)

if img_array:
    # Open the writer exactly once, sized from the collected frames, instead
    # of constructing (and never releasing) a new writer for every frame.
    height, width = img_array[-1].shape[:2]
    size = (width, height)
    output = cv2.VideoWriter('./output/28122021RecFrames.avi', cv2.VideoWriter_fourcc(*'DIVX'), 15, size)
    try:
        for recognised in img_array:
            output.write(recognised)
    finally:
        # Release even if a write fails so the file handle is not leaked.
        output.release()
else:
    # Without this guard the writer would never be created and the write
    # loop / release would fail on an undefined name.
    print("No annotated frames found; no video written")