In [1]:
import importlib
import numpy as np
import torch
from collections import OrderedDict
import utils

In [2]:
def import_class(name):
    components = name.rsplit('.', 1)
    print(components)
    mod = importlib.import_module(components[0])
    mod = getattr(mod, components[1])
    print(mod)
    return mod

In [3]:
model_args = {"num_classes": 1296, "c2d_type": "resnet18", "conv_type": 2, "use_bn": 1}
loss_weights = {"ConvCTC": 1.0, "SeqCTC": 1.0, "Dist": 10.0}
gloss_dict = np.load('/home/aayush/Thesis/VAC_CSLR/preprocess/phoenix2014/gloss_dict.npy', allow_pickle=True)\
                .item()


model_class = import_class("slr_network.SLRModel")
model = model_class(**model_args,
                    gloss_dict=gloss_dict,
                    loss_weights=loss_weights)

['slr_network', 'SLRModel']
<class 'slr_network.SLRModel'>


In [4]:
def modified_weights(state_dict, modified=False):
    state_dict = OrderedDict([(k.replace('.module', ''), v) for k, v in state_dict.items()])
    if not modified:
        return state_dict
    modified_dict = dict()
    return modified_dict

In [5]:
state_dict = torch.load('resnet18_slr_pretrained_distill25.pt', map_location=torch.device('cpu'))
weights = modified_weights(state_dict['model_state_dict'], False)
model.load_state_dict(weights, strict=True)

<All keys matched successfully>

In [6]:
model.eval()

SLRModel(
  (conv2d): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_run

# Input format modification

In [7]:
import torch.utils.data as data
from utils import video_augmentation
import os
import glob
import cv2

In [8]:
class SampleBaseFeeder(data.Dataset):
    
    def __init__(self, prefix, gloss_dict):
        self.prefix = prefix # image frames path : ./01April_2010_Thursday_heute_default-5/1/*.png
        self.dict = gloss_dict
        self.data_aug = self.transform()
        print("")
        
    def __getitem__(self, idx):
        
        input_data, label, fi = self.read_video(idx)
        input_data, label = self.normalize(input_data, label)
        
        return input_data, torch.LongTensor(label), ''#self.inputs_list[idx]['original_info']

    
    def read_video(self, index, num_glosses=-1):
        # load file info
        fi = {
                "fileid": "01April_2010_Thursday_heute_default-5",
                 "folder": "test/01April_2010_Thursday_heute_default-5/1/*.png",
                 "signer": "Signer04",
                 "label": "",#"ABER FREUEN  MORGEN SONNE SELTEN REGEN",
                 "num_frames": 132,
                 "original_info": "01April_2010_Thursday_heute_default-5|01April_2010_Thursday_heute_default-5/1/*.png|Signer04|ABER FREUEN  MORGEN SONNE SELTEN REGEN"
             }
        
        img_folder = os.path.join(self.prefix)
        img_list = sorted(glob.glob(img_folder))
        label_list = []
        
        for phase in fi['label'].split(" "):
            if phase == '':
                continue
            if phase in self.dict.keys():
                label_list.append(self.dict[phase][0])
                
        return [cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB) for img_path in img_list],\
            label_list, fi

    
    def normalize(self, video, label, file_id=None):
        video, label = self.data_aug(video, label, file_id)
        video = video.float() / 127.5 - 1
        #print(typvideo[0][0])
        #print(label)
        #plt.imshow(cv2.cvtColor(video[0][0], cv2.COLOR_BGR2RGB))
        return video, label

    
    def transform(self):
        print("Apply testing transform.")
        return video_augmentation.Compose([
            video_augmentation.CenterCrop(224),
            #video_augmentation.Resize(0.5),
            video_augmentation.ToTensor(),
        ]) 
    
    
    @staticmethod
    def collate_fn(batch):
        #print("In collate_fn")
        batch = [item for item in sorted(batch, key=lambda x: len(x[0]), reverse=True)]
        video, label, info = list(zip(*batch))
        if len(video[0].shape) > 3:
            max_len = len(video[0])
            video_length = torch.LongTensor([np.ceil(len(vid) / 4.0) * 4 + 12 for vid in video])
            left_pad = 6
            right_pad = int(np.ceil(max_len / 4.0)) * 4 - max_len + 6
            max_len = max_len + left_pad + right_pad
            padded_video = [torch.cat(
                (
                    vid[0][None].expand(left_pad, -1, -1, -1),
                    vid,
                    vid[-1][None].expand(max_len - len(vid) - left_pad, -1, -1, -1),
                )
                , dim=0)
                for vid in video]
            padded_video = torch.stack(padded_video)
        else:
            max_len = len(video[0])
            video_length = torch.LongTensor([len(vid) for vid in video])
            padded_video = [torch.cat(
                (
                    vid,
                    vid[-1][None].expand(max_len - len(vid), -1),
                )
                , dim=0)
                for vid in video]
            padded_video = torch.stack(padded_video).permute(0, 2, 1)
        label_length = torch.LongTensor([len(lab) for lab in label])
        if max(label_length) == 0:
            return padded_video, video_length, [], [], info
        else:
            padded_label = []
            for lab in label:
                padded_label.extend(lab)
            padded_label = torch.LongTensor(padded_label)
            return padded_video, video_length, padded_label, label_length, info
        
    def __len__(self):
        return 1 # as for prediction we just have one folder/video

In [None]:
gloss_dict


In [9]:
pred_dataset = SampleBaseFeeder('./dataset/dataset/01April_2010_Thursday_heute_default-5/1/*.png', gloss_dict)

Apply testing transform.



In [None]:
pred_dataset

In [10]:
data_loader = torch.utils.data.DataLoader(
            pred_dataset,
            batch_size=1,
            shuffle=False,
            drop_last=False,
            num_workers=4, 
            collate_fn=pred_dataset.collate_fn,
        )

In [None]:
data_loader

In [11]:
loader = data_loader
for batch_idx, l_data in enumerate(loader):
    print(batch_idx)
    device = utils.GpuDataParallel()
    vid = device.data_to_device(l_data[0])
    vid_lgt = device.data_to_device(l_data[1])
    label = device.data_to_device(l_data[2])
    label_lgt = device.data_to_device(l_data[3])

with torch.no_grad():
    ret_dict = model(vid, vid_lgt, label=label, label_lgt=label_lgt)


In ToTensor.


  video_length = torch.LongTensor([np.ceil(len(vid) / 4.0) * 4 + 12 for vid in video])


0
In slr_network.py forward:
torch.Size([1, 144, 3, 224, 224])
tensor([144])
<class 'torch.Tensor'>


  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)
To keep the current behavior, use torch.div(a, b, rounding_mode='trunc'), or for actual floor division, use torch.div(a, b, rounding_mode='floor'). (Triggered internally at  /pytorch/aten/src/ATen/native/BinaryOps.cpp:450.)
  feat_len //= 2


In [12]:
ret_dict

{'framewise_features': tensor([[[0.0794, 0.0794, 0.0794,  ..., 0.1176, 0.1176, 0.1176],
          [0.1410, 0.1410, 0.1410,  ..., 0.8128, 0.8128, 0.8128],
          [1.5184, 1.5184, 1.5184,  ..., 0.9420, 0.9420, 0.9420],
          ...,
          [0.2038, 0.2038, 0.2038,  ..., 0.5262, 0.5262, 0.5262],
          [0.4511, 0.4511, 0.4511,  ..., 0.5688, 0.5688, 0.5688],
          [0.0109, 0.0109, 0.0109,  ..., 0.9165, 0.9165, 0.9165]]]),
 'visual_features': tensor([[[1.0786, 0.0000, 0.1509,  ..., 0.6620, 0.0000, 0.0000]],
 
         [[1.4162, 0.0000, 0.3352,  ..., 1.0442, 0.0000, 0.0000]],
 
         [[0.5913, 0.6186, 0.0000,  ..., 0.7431, 0.0000, 0.0000]],
 
         ...,
 
         [[1.7912, 0.0000, 1.1050,  ..., 5.2073, 0.0000, 0.0000]],
 
         [[3.1430, 0.3526, 0.5447,  ..., 3.7436, 0.0000, 0.6845]],
 
         [[3.0035, 0.8708, 0.0000,  ..., 2.4030, 0.0000, 1.2275]]]),
 'feat_len': tensor([33]),
 'conv_logits': tensor([[[ -7.0832, -23.6861, -23.3820,  ..., -21.5323, -20.8931, -22.28

In [None]:
del model

# For prediction

In [None]:
!pip install tensorflow==2.4.1 tensorflow-gpu==2.4.1 opencv-python mediapipe sklearn matplotlib

In [None]:
import os
from matplotlib import pyplot as plt
import mediapipe as mp
import cv2

In [None]:
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

In [None]:
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image.flags.writeable = False
    results = model.process(image)
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    return image, results

In [None]:
def draw_styled_landmarks(image, results):
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_CONTOURS,
                              mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1), 
                             mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
                             )
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                             )
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                             )
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                             )

In [None]:
def extract_keypoints(results):
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose, face, lh, rh]), lh, rh

In [None]:
sequence = []
sentence = []
threshold = 0.6
count=0

cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()
        print(frame.shape)
        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        #print(image.shape)
        
        # Draw landmarks
        draw_styled_landmarks(image, results)
        
        # Prediction logic
        keypoints, lh_keypoints, rh_keypoints = extract_keypoints(results)        
        sequence.append(keypoints)
        sequence = sequence[-30:]
        
        if len(sequence) == 30 and not (np.sum(lh_keypoints) == 0 and np.sum(rh_keypoints) == 0):
            res = model(image, len(image))
            #print(actions[np.argmax(res)])
            print(res)
            cv2.imwrite("frame_collection/frame%d.jpg" % count, frame) 
            count += 1
            
            
        # Viz logic
            if res[np.argmax(res)] > threshold: 
                if len(sentence) > 0: 
                    if actions[np.argmax(res)] != sentence[-1]:
                        sentence.append(actions[np.argmax(res)])
                else:
                    sentence.append(actions[np.argmax(res)])

            if len(sentence) > 5: 
                sentence = sentence[-3:]

            
        #cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (3,30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        
        # Show to screen
        cv2.imshow('Live SLR Detection', image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()

In [None]:
cap.release()
cv2.destroyAllWindows()