In [12]:
import cv2
import mediapipe as mp
import os
import csv
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import copy

In [13]:
class ECA(tf.keras.layers.Layer):
    """Channel-attention layer that rescales each channel by a learned gate.

    The input is averaged with ``GlobalAveragePooling1D`` (honouring the mask),
    a single-filter 1-D convolution is run across the resulting per-channel
    values, and a sigmoid turns the result into one gate per channel. The gates
    are broadcast over axis 1 and multiplied into the input.

    Args:
        kernel_size: Width of the convolution applied across the channel axis.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, kernel_size=5, **kwargs):
        super().__init__(**kwargs)
        self.supports_masking = True
        self.kernel_size = kernel_size
        # Built once here instead of on every forward pass; the layer has no weights.
        self.pool = tf.keras.layers.GlobalAveragePooling1D()
        self.conv = tf.keras.layers.Conv1D(1, kernel_size=kernel_size, strides=1, padding="same", use_bias=False)

    def call(self, inputs, mask=None):
        """Return ``inputs`` multiplied by per-channel sigmoid gates."""
        nn = self.pool(inputs, mask=mask)
        # Add a trailing axis so the channel axis plays the role of "steps" for Conv1D.
        nn = tf.expand_dims(nn, -1)
        nn = self.conv(nn)
        nn = tf.squeeze(nn, -1)
        nn = tf.nn.sigmoid(nn)
        # Re-insert axis 1 so the gates broadcast against the input.
        nn = nn[:, None, :]
        return inputs * nn

    def get_config(self):
        """Return the constructor arguments so Keras can serialise and rebuild the layer."""
        config = super().get_config()
        config.update({"kernel_size": self.kernel_size})
        return config

class LateDropout(tf.keras.layers.Layer):
    """Dropout that stays inactive until a number of training calls have happened.

    An internal counter is incremented on every call made with a truthy
    ``training`` flag. While the counter is below ``start_step`` the inputs are
    returned unchanged; afterwards they go through a regular ``Dropout`` layer.

    Args:
        rate: Drop probability handed to ``tf.keras.layers.Dropout``.
        noise_shape: Optional ``noise_shape`` handed to the ``Dropout`` layer.
        start_step: Counter value from which dropout becomes active.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, rate, noise_shape=None, start_step=0, **kwargs):
        super().__init__(**kwargs)
        self.supports_masking = True
        self.rate = rate
        # Kept so that get_config can reproduce the constructor call.
        self.noise_shape = noise_shape
        self.start_step = start_step
        self.dropout = tf.keras.layers.Dropout(rate, noise_shape=noise_shape)

    def build(self, input_shape):
        """Create the non-trainable int64 counter of training calls."""
        super().build(input_shape)
        agg = tf.VariableAggregation.ONLY_FIRST_REPLICA
        self._train_counter = tf.Variable(0, dtype="int64", aggregation=agg, trainable=False)

    def call(self, inputs, training=False):
        """Apply dropout only once the counter has reached ``start_step``."""
        x = tf.cond(self._train_counter < self.start_step, lambda:inputs, lambda:self.dropout(inputs, training=training))
        if training:
            self._train_counter.assign_add(1)
        return x

    def get_config(self):
        """Return the constructor arguments so Keras can serialise and rebuild the layer."""
        config = super().get_config()
        config.update({
            "rate": self.rate,
            "noise_shape": self.noise_shape,
            "start_step": self.start_step,
        })
        return config


In [14]:
class CausalDWConv1D(tf.keras.layers.Layer):
    """Depthwise 1-D convolution that only sees the current and earlier steps.

    The input is zero-padded on the left of axis 1 by
    ``dilation_rate * (kernel_size - 1)`` and then convolved with
    ``padding='valid'``, so the output has as many steps as the input.

    Args:
        kernel_size: Size of the depthwise kernel.
        dilation_rate: Dilation of the depthwise kernel.
        use_bias: Whether the depthwise convolution has a bias.
        depthwise_initializer: Initializer for the depthwise kernel.
        name: Layer name; also used as prefix for the two sub-layers.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self,
        kernel_size=17,
        dilation_rate=1,
        use_bias=False,
        depthwise_initializer='glorot_uniform',
        name='', **kwargs):
        super().__init__(name=name,**kwargs)
        # Constructor arguments are kept so get_config can reproduce them.
        self.kernel_size = kernel_size
        self.dilation_rate = dilation_rate
        self.use_bias = use_bias
        self.depthwise_initializer = depthwise_initializer
        # Left-only padding is what makes the convolution causal.
        self.causal_pad = tf.keras.layers.ZeroPadding1D((dilation_rate*(kernel_size-1),0),name=name + '_pad')
        self.dw_conv = tf.keras.layers.DepthwiseConv1D(
                            kernel_size,
                            strides=1,
                            dilation_rate=dilation_rate,
                            padding='valid',
                            use_bias=use_bias,
                            depthwise_initializer=depthwise_initializer,
                            name=name + '_dwconv')
        self.supports_masking = True

    def call(self, inputs):
        """Pad on the left, then apply the depthwise convolution."""
        x = self.causal_pad(inputs)
        x = self.dw_conv(x)
        return x

    def get_config(self):
        """Return the constructor arguments so Keras can serialise and rebuild the layer."""
        config = super().get_config()
        config.update({
            "kernel_size": self.kernel_size,
            "dilation_rate": self.dilation_rate,
            "use_bias": self.use_bias,
            # Normalise strings and Initializer instances to a serialisable form.
            "depthwise_initializer": tf.keras.initializers.serialize(
                tf.keras.initializers.get(self.depthwise_initializer)),
        })
        return config

def Conv1DBlock(channel_size,
          kernel_size,
          dilation_rate=1,
          drop_rate=0.0,
          expand_ratio=2,
          se_ratio=0.25,
          activation='swish',
          name=None):
    '''
    Efficient conv1d block, @hoyso48.

    Returns a function that, applied to a Keras tensor, builds:
    Dense expansion -> causal depthwise conv -> batch norm -> ECA ->
    Dense projection -> optional dropout -> optional residual add.

    Args:
        channel_size: Number of output channels of the projection Dense.
        kernel_size: Kernel size of the causal depthwise convolution.
        dilation_rate: Dilation of the causal depthwise convolution.
        drop_rate: Dropout rate after the projection; no Dropout layer is
            created when it is 0.
        expand_ratio: Multiplier applied to the input channel count for the
            expansion Dense.
        se_ratio: Accepted but not used anywhere in this function.
        activation: Activation of the expansion Dense.
        name: Prefix for the layer names; when None a unique id from
            tf.keras.backend.get_uid("mbblock") is used.

    Returns:
        A callable ``apply(inputs)`` that returns the block's output tensor.
    '''
    if name is None:
        name = str(tf.keras.backend.get_uid("mbblock"))
    # The layers are created lazily, when the returned closure is applied.
    def apply(inputs):
        '''Build the block's layers on ``inputs`` and return the output tensor.'''
        channels_in = tf.keras.backend.int_shape(inputs)[-1]
        channels_expand = channels_in * expand_ratio

        # Kept for the residual connection at the end of the block.
        skip = inputs

        # Expansion phase: widen the channel axis to channels_in * expand_ratio.
        x = tf.keras.layers.Dense(
            channels_expand,
            use_bias=True,
            activation=activation,
            name=name + '_expand_conv')(inputs)

        # Depthwise Convolution
        x = CausalDWConv1D(kernel_size,
            dilation_rate=dilation_rate,
            use_bias=False,
            name=name + '_dwconv')(x)

        x = tf.keras.layers.BatchNormalization(momentum=0.95, name=name + '_bn')(x)

        # Channel attention; this layer is unnamed, so Keras auto-names it.
        x  = ECA()(x)

        # Projection phase: map back to channel_size channels (no activation).
        x = tf.keras.layers.Dense(
            channel_size,
            use_bias=True,
            name=name + '_project_conv')(x)

        if drop_rate > 0:
            # noise_shape=(None,1,1) shares one dropout decision across axes 1 and 2 of each sample.
            x = tf.keras.layers.Dropout(drop_rate, noise_shape=(None,1,1), name=name + '_drop')(x)

        # The residual add is only possible when input and output widths match.
        if (channels_in == channel_size):
            x = tf.keras.layers.add([x, skip], name=name + '_add')
        return x

    return apply

In [15]:
class MultiHeadSelfAttention(tf.keras.layers.Layer):
    """Multi-head self-attention with a fused query/key/value projection.

    Args:
        dim: Width of the output projection; the fused projection is ``3 * dim`` wide.
        num_heads: Number of attention heads the projection is split into.
        dropout: Dropout rate applied to the attention weights.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, dim=256, num_heads=4, dropout=0, **kwargs):
        super().__init__(**kwargs)
        self.dim = dim
        # NOTE(review): the logits are scaled by dim ** -0.5 (the full width, not
        # the per-head width). Left untouched because changing it would change
        # the outputs of any model already trained with this layer.
        self.scale = self.dim ** -0.5
        self.num_heads = num_heads
        # Kept so that get_config can reproduce the constructor call.
        self.dropout_rate = dropout
        self.qkv = tf.keras.layers.Dense(3 * dim, use_bias=False)
        self.drop1 = tf.keras.layers.Dropout(dropout)
        self.proj = tf.keras.layers.Dense(dim, use_bias=False)
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Return the attended and projected sequence for ``inputs``.

        ``mask``, when given, is applied to the softmax over the last axis of
        the attention logits, i.e. over the positions being attended to.
        """
        qkv = self.qkv(inputs)
        # Split the fused projection into heads, then move the head axis in front
        # of the step axis: (batch, heads, steps, 3 * dim // heads).
        qkv = tf.keras.layers.Permute((2, 1, 3))(tf.keras.layers.Reshape((-1, self.num_heads, self.dim * 3 // self.num_heads))(qkv))
        q, k, v = tf.split(qkv, [self.dim // self.num_heads] * 3, axis=-1)

        attn = tf.matmul(q, k, transpose_b=True) * self.scale

        if mask is not None:
            # Add head and query axes so the mask broadcasts over both.
            mask = mask[:, None, None, :]

        attn = tf.keras.layers.Softmax(axis=-1)(attn, mask=mask)
        attn = self.drop1(attn)

        x = attn @ v
        # Undo the head split: back to (batch, steps, dim).
        x = tf.keras.layers.Reshape((-1, self.dim))(tf.keras.layers.Permute((2, 1, 3))(x))
        x = self.proj(x)
        return x

    def get_config(self):
        """Return the constructor arguments so Keras can serialise and rebuild the layer."""
        config = super().get_config()
        config.update({
            "dim": self.dim,
            "num_heads": self.num_heads,
            "dropout": self.dropout_rate,
        })
        return config


def TransformerBlock(dim=256, num_heads=4, expand=4, attn_dropout=0.2, drop_rate=0.2, activation='swish'):
    """Return a builder for a pre-norm attention + feed-forward block.

    Args:
        dim: Width of the attention output and of the final Dense.
        num_heads: Number of attention heads.
        expand: Width multiplier of the hidden feed-forward Dense.
        attn_dropout: Dropout rate applied inside the attention layer.
        drop_rate: Dropout rate applied to each branch before its residual add.
        activation: Activation of the hidden feed-forward Dense.

    Returns:
        A callable ``apply(inputs)`` that builds the layers and returns the output tensor.
    """
    def apply(inputs):
        # Attention branch: normalise, attend, drop, then add back onto the input.
        attended = tf.keras.layers.BatchNormalization(momentum=0.95)(inputs)
        attended = MultiHeadSelfAttention(
            dim=dim, num_heads=num_heads, dropout=attn_dropout)(attended)
        attended = tf.keras.layers.Dropout(drop_rate, noise_shape=(None, 1, 1))(attended)
        after_attention = tf.keras.layers.Add()([inputs, attended])

        # Feed-forward branch: normalise, expand, project back, drop, then add.
        hidden = tf.keras.layers.BatchNormalization(momentum=0.95)(after_attention)
        hidden = tf.keras.layers.Dense(
            dim * expand, use_bias=False, activation=activation)(hidden)
        hidden = tf.keras.layers.Dense(dim, use_bias=False)(hidden)
        hidden = tf.keras.layers.Dropout(drop_rate, noise_shape=(None, 1, 1))(hidden)
        return tf.keras.layers.Add()([after_attention, hidden])

    return apply
# Model input/output sizes.
MAX_LEN = 30        # default number of steps in the model input
CHANNELS = 258      # number of features per step in the model input
NUM_CLASSES = 20    # width of the softmax classifier


# ----------------------------------------- DEFINE MODEL -----------------------------
def get_model(max_len=MAX_LEN, dropout_step=0, dim=192):
    """Build the Conv1D + Transformer classifier.

    Args:
        max_len: Number of steps in the input sequence.
        dropout_step: Currently unused; the LateDropout layer that would
            consume it is disabled in this architecture.
        dim: Channel width of the convolution and transformer blocks.

    Returns:
        A ``tf.keras.Model`` mapping ``(max_len, CHANNELS)`` inputs to
        ``NUM_CLASSES`` softmax probabilities.
    """
    ksize = 3
    inp = tf.keras.Input((max_len, CHANNELS))

    x = inp
    # Two identical stages, each a conv block followed by a transformer block.
    for _ in range(2):
        x = Conv1DBlock(dim, ksize, drop_rate=0.3)(x)
        x = TransformerBlock(dim, expand=2)(x)

    x = tf.keras.layers.Dense(dim * 2, activation=None, name='top_conv')(x)
    x = tf.keras.layers.GlobalAveragePooling1D()(x)
    # LateDropout(0.2, start_step=dropout_step) is intentionally left out here.
    outputs = tf.keras.layers.Dense(NUM_CLASSES, name='classifier', activation="softmax")(x)
    return tf.keras.Model(inp, outputs)

In [16]:
# Load the trained classifier from the '1DCNN_transformer.tf' path and bind it
# to `model`, which the live-inference cell uses for `model.predict`.
# NOTE(review): the recorded run of this cell ended with KeyboardInterrupt, so
# `model` may be undefined in that session — rerun this cell to completion first.
from tensorflow.keras.models import  load_model
model = load_model('1DCNN_transformer.tf')

KeyboardInterrupt: 

In [None]:
from scipy import stats

In [None]:
# One bar colour per class (the same colour repeated 20 times).
colors = [(245, 117, 16)] * 20


def prob_viz(res, actions, input_frame, colors):
    """Draw one horizontal probability bar and label per entry of ``res``.

    Args:
        res: Iterable of probabilities; a bar's length is ``int(prob * 100)``.
        actions: Labels, indexed in the same order as ``res``.
        input_frame: Image to draw on; it is copied, not modified.
        colors: Fill colours for the bars, indexed in the same order as ``res``.

    Returns:
        A copy of ``input_frame`` with the bars and labels drawn on it.
    """
    output_frame = input_frame.copy()
    row_height = 40
    for num, prob in enumerate(res):
        offset = num * row_height
        bar_start = (0, 60 + offset)
        bar_end = (int(prob * 100), 90 + offset)
        cv2.rectangle(output_frame, bar_start, bar_end, colors[num], -1)
        cv2.putText(output_frame, actions[num], (0, 85 + offset),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA)

    return output_frame

In [None]:
# Short handles for the MediaPipe holistic solution and its drawing helpers.
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Frame size in pixels used when converting normalised keypoints to pixels.
width, height = 640, 480
def mediapipe_detection(image, model):
    """Run ``model`` on a BGR frame and return the frame with the raw results.

    Args:
        image: BGR image as delivered by OpenCV.
        model: Object exposing ``process(rgb_image)``.

    Returns:
        ``(image, results)``: the frame converted back to BGR, and whatever
        ``model.process`` returned (in its default format).
    """
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # The frame is read-only while the model processes it.
    rgb.flags.writeable = False
    results = model.process(rgb)
    rgb.flags.writeable = True
    return cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR), results
def draw_landmarks(image, results):
    """Draw the pose, left-hand and right-hand landmarks of ``results`` onto ``image``."""
    layers = (
        ("pose_landmarks", mp_holistic.POSE_CONNECTIONS),
        ("left_hand_landmarks", mp_holistic.HAND_CONNECTIONS),
        ("right_hand_landmarks", mp_holistic.HAND_CONNECTIONS),
    )
    for attribute, connections in layers:
        mp_drawing.draw_landmarks(image, getattr(results, attribute), connections)

In [None]:
# Earlier results object used as a fallback for missing hands; None disables the fallback.
last = None


def update_mpresult(res,results):
    """Copy x/y values from ``res`` back into the landmark objects of ``results``.

    Rows of ``res`` are consumed in the order pose, left hand, right hand. A
    missing pose skips 33 rows and a missing hand skips 21 rows. When a hand is
    missing and the module-level ``last`` holds landmarks for that hand, a deep
    copy of them is stored on ``results``.

    Args:
        res: Indexable rows whose first two entries are the x and y to write.
        results: Object with ``pose_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks`` attributes; it is modified in place.

    Returns:
        The same ``results`` object.
    """
    def _overwrite_xy(landmark_list, row):
        # Write consecutive rows of `res` into the landmarks; return the next free row.
        for point in landmark_list.landmark:
            point.x = res[row][0]
            point.y = res[row][1]
            row += 1
        return row

    row = 0
    if results.pose_landmarks:
        row = _overwrite_xy(results.pose_landmarks, row)
    else:
        row += 33

    if results.left_hand_landmarks:
        row = _overwrite_xy(results.left_hand_landmarks, row)
    else:
        if last and last.left_hand_landmarks:
            results.left_hand_landmarks = copy.deepcopy(last.left_hand_landmarks)
        row += 21

    if results.right_hand_landmarks:
        _overwrite_xy(results.right_hand_landmarks, row)
    elif last and last.right_hand_landmarks:
        results.right_hand_landmarks = copy.deepcopy(last.right_hand_landmarks)

    return results

def normalize_keypoint(res, img=None, frame_width=None, frame_height=None):
    """Centre and rescale keypoints around the midpoint of landmarks 11 and 12.

    Landmarks 11 and 12 are the two shoulders in MediaPipe's pose topology.
    Every keypoint is shifted so that their midpoint lands on (0.5, 0.5) and is
    then scaled about (0.5, 0.5) so that the distance between them, measured in
    pixels, becomes ``200 * frame_width / 640``. Keypoints whose x and y are
    both exactly 0 are treated as "not detected" and left untouched.

    If the two reference landmarks coincide or are missing (distance 0 or not
    a number), no meaningful scale exists; the keypoints are then returned
    unchanged instead of being turned into inf/NaN by a division by zero.

    Args:
        res: List of NumPy rows ``[x, y, ...]`` in normalised image
            coordinates; the rows are modified in place.
        img: Optional image on which the two reference landmarks are marked.
        frame_width: Frame width in pixels; defaults to the module-level ``width``.
        frame_height: Frame height in pixels; defaults to the module-level ``height``.

    Returns:
        The same ``res`` list.
    """
    # The globals are only read when no explicit frame size is given.
    frame_w = width if frame_width is None else frame_width
    frame_h = height if frame_height is None else frame_height

    x1, y1 = res[11][0] * frame_w, res[11][1] * frame_h
    x2, y2 = res[12][0] * frame_w, res[12][1] * frame_h

    if img is not None:
        # The markers are a debugging aid only; a drawing failure must not stop normalisation.
        try:
            cv2.circle(img, (int(x1), int(y1)), 4, (0, 255, 255), -1)
            cv2.circle(img, (int(x2), int(y2)), 4, (0, 255, 255), -1)
        except (cv2.error, TypeError, ValueError, OverflowError):
            pass

    dis = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
    if not dis > 0:
        # Also catches NaN: without a usable reference distance there is no scale.
        return res

    x_cen = (res[11][0] + res[12][0]) / 2
    y_cen = (res[11][1] + res[12][1]) / 2
    shift_x, shift_y = 0.5 - x_cen, 0.5 - y_cen
    scale = (200 * frame_w / 640) / dis

    for point in res:
        if point[0] == 0 and point[1] == 0:
            continue
        point[0] = 0.5 + (shift_x + point[0] - 0.5) * scale
        point[1] = 0.5 + (shift_y + point[1] - 0.5) * scale
    return res


def _landmark_rows(landmark_list, with_visibility=False):
    """Turn a landmark list into one NumPy row per landmark."""
    if with_visibility:
        return [np.array([p.x, p.y, p.z, p.visibility]) for p in landmark_list.landmark]
    return [np.array([p.x, p.y, p.z]) for p in landmark_list.landmark]


def _hand_rows(current, previous):
    """Rows for one hand: current detection, else the previous one, else 21 zero rows."""
    if current:
        return _landmark_rows(current)
    if previous:
        return _landmark_rows(previous)
    return [np.zeros(3) for _ in range(21)]


def extract_keypoint(results):
    """Collect pose and hand landmarks of ``results`` as a list of NumPy rows.

    The list holds the pose rows ``[x, y, z, visibility]`` first, then the
    left-hand rows ``[x, y, z]``, then the right-hand rows ``[x, y, z]``.
    A missing pose is replaced by 33 zero rows. A missing hand falls back to
    the same hand of the module-level ``last`` results when available, and to
    21 zero rows otherwise. Zero rows are float arrays, like detected rows.

    Args:
        results: Object with ``pose_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks`` attributes.

    Returns:
        List of 1-D NumPy arrays in the order described above.
    """
    previous_left = last.left_hand_landmarks if last is not None else None
    previous_right = last.right_hand_landmarks if last is not None else None

    if results.pose_landmarks:
        res = _landmark_rows(results.pose_landmarks, with_visibility=True)
    else:
        res = [np.zeros(4) for _ in range(33)]

    res.extend(_hand_rows(results.left_hand_landmarks, previous_left))
    res.extend(_hand_rows(results.right_hand_landmarks, previous_right))
    return res

def extract_keypoints_flatten(result,img = None):
    """Main entry point: extract, normalise and flatten the keypoints of one frame.

    The keypoints are extracted from ``result``, normalised (``img`` is passed
    on to the normalisation step), written back into ``result``, and finally
    concatenated into a single 1-D array.

    Args:
        result: Landmark results object for one frame.
        img: Optional image forwarded to ``normalize_keypoint``.

    Returns:
        1-D NumPy array containing all keypoint rows joined end to end.
    """
    rows = normalize_keypoint(extract_keypoint(result), img)
    update_mpresult(rows, result)
    return np.concatenate(list(rows))

def numpy_to_filecsv(data,filename):
    """Write ``data`` to ``filename`` as comma-separated rows.

    Args:
        data: Object with a ``tolist()`` method that yields an iterable of rows.
        filename: Path of the CSV file to create or overwrite.
    """
    with open(filename, mode="w", newline="") as handle:
        rows = data.tolist()
        csv.writer(handle, delimiter=",").writerows(rows)

def filecsv_to_numpy(filename,data):
    """Placeholder that is not implemented yet: it does nothing and returns None.

    NOTE(review): presumably meant as the inverse of ``numpy_to_filecsv``; the
    role of the ``data`` argument is not visible here — confirm before implementing.
    """
    pass

In [23]:
# ---------------------------------------------------------------------------
# Live webcam inference: keypoints of the most recent frames are fed to the
# classifier and accepted labels are shown in a banner on top of the video.
# Press "q" in the video window to stop.
# ---------------------------------------------------------------------------
sequence = []       # rolling window of flattened keypoint vectors, newest last
sentence = []       # accepted labels shown in the banner (last 5 are kept)
predictions = []    # recent predicted class indices (last 10 are kept)
threshold = 0.5     # minimum probability for a prediction to be accepted
num_frame = 30      # number of frames in one model input window
cooldown_frames = 30  # frames to skip predicting after a label was accepted
Data_path = os.path.join('./data_split/Train')
# NOTE(review): class names come from the directory listing and os.listdir
# order is arbitrary; presumably this must match the label order used for
# training — confirm.
actions = np.array(os.listdir(Data_path))


cap = cv2.VideoCapture(0)
delay = 0
try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():

            ret, frame = cap.read()
            if not ret:
                # No frame was delivered (camera unplugged / stream ended):
                # stop instead of passing None on to OpenCV.
                break

            image, results = mediapipe_detection(frame, holistic)

            keypoints = extract_keypoints_flatten(results, image)
            draw_landmarks(image=image, results=results)
            # `last` is never updated in this loop, so the previous-frame hand
            # fallback of the keypoint helpers stays inactive.
            sequence.append(keypoints)
            sequence = sequence[-num_frame:]
            image = cv2.flip(image, 1)

            if delay != 0:
                delay -= 1
            elif len(sequence) == num_frame:
                # The window is full: classify it.
                res = model.predict(np.expand_dims(sequence, axis=0), verbose=0)[0]
                best = np.argmax(res)
                print(actions[best])
                predictions.append(best)
                # Only the last 10 predictions are ever inspected; drop older ones.
                predictions = predictions[-10:]

                # np.unique returns sorted values, so this compares the smallest
                # recent class index with the current prediction.
                if np.unique(predictions)[0] == best and res[best] > threshold:
                    # Do not append the same label twice in a row.
                    if not sentence or actions[best] != sentence[-1]:
                        sentence.append(actions[best])
                        delay = cooldown_frames
                sentence = sentence[-5:]

            # Banner across the full width of the frame with the recent labels.
            cv2.rectangle(image, (0, 0), (image.shape[1], 40), (245, 117, 16), -1)
            cv2.putText(image, ' '.join(sentence), (3, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            cv2.imshow('OpenCV Feed', image)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, even after an error.
    cap.release()
    cv2.destroyAllWindows()

toi
toi
toi
toi
toi
toi
toi
ban
ban dem
ban dem
ban dem
ban dem
ban dem
xin loi
xin loi
toi
toi
toi
toi
toi
toi
toi
toi
di
ban dem
ban
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
ban
ban
ban
ban
ban
ban
ban
ban
ban
ban
ban
ban
ban
ban
ban
ban
ban
ban
ban
ban
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
moi ngay
ban
