In [1]:
import os
import cv2
import json
import time
import math
import copy
import numpy as np
import pandas as pd
import mediapipe as mp
import tensorflow as tf
from tensorflow import keras

In [2]:
def find_pose(img, pose, mp_pose):
    imgRGB = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    results = pose.process(imgRGB)
    coords = {}
    
    if results.pose_landmarks:            
        # Knee
        right_knee_x = results.pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_KNEE].x
        right_knee_y = results.pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_KNEE].y
        left_knee_x = results.pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_KNEE].x
        left_knee_y = results.pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_KNEE].y
        
        right_knee_cor = right_knee_x, right_knee_y
        left_knee_cor = left_knee_x, left_knee_y
        
        coords.update({
            "knee coordinates": {
                "right knee": (right_knee_x, right_knee_y),
                "left knee": (left_knee_x, left_knee_y)
            }
        })
        
    imgBGR = cv2.cvtColor(imgRGB, cv2.COLOR_RGB2BGR)
    return img, results, coords

In [3]:
def find_position(img, results):
    pose_list = np.array([[res.x, res.y] for res in results.pose_landmarks.landmark] if results.pose_landmarks else np.zeros((32, 2)))              
    return pose_list

In [4]:
def find_angle(img, p1, p2, p3, pose_list):
    
    # Get the landmarks
    x1, y1 = pose_list[p1]
    x2, y2 = pose_list[p2]
    x3, y3 = pose_list[p3]
    
    # Calculate the Angle
    angle = math.degrees(math.atan2(y3 - y2, x3 - x2) - math.atan2(y1 - y2, x1 - x2))
    if angle < 0:
        angle += 360    
    return angle

In [5]:
def pose_detector(vid, num_frames):
    mp_drawing = mp.solutions.drawing_utils
    mp_drawing_styles = mp.solutions.drawing_styles
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)
    
    count_frames = 0
    cap = cv2.VideoCapture(vid)
    joint_angles = {
        "right knee": [],
        "left knee": []
                        }
    
    i = 0
    with pose:
        while True:
            success, image = cap.read()
            image = cv2.resize(image, (649, 400))
            if not success:
                print("Ignoring empty camera frame.")
                break             
            
            image, results, coords = find_pose(image, pose, mp_pose)
            pose_list = find_position(image, results)
           
            if len(pose_list) != 0:                
                # Right leg
                right_knee_angles = find_angle(image, 24, 26, 28, pose_list)           
                joint_angles["right knee"].append(int(right_knee_angles))
                
                # Left leg
                left_knee_angles = find_angle(image, 23, 25, 27, pose_list)
                joint_angles["left knee"].append(int(left_knee_angles))           
        
            count_frames += 1
            i += 1
            
            if num_frames == count_frames:                
                break            
            
            # Show to screen - Uncomment to see video
            # cv2.imshow('Feed', image)
            
            # Break - End video with letter q on keyboard - Uncomment if you are watching video
            # if cv2.waitKey(10) & 0xFF == ord('q'):
            #     break      
           
    
    cap.release()
    cv2.destroyAllWindows()
    
    return joint_angles, coords

In [6]:
num_frames = 15

In [7]:
def get_joint_angles_and_coords(path):    
    players = {}
    x_y_cors = {}
    dir_list = os.listdir(path)
    
    for i, vid in enumerate(dir_list):    
        joint_angles, coords = pose_detector(f"{path}/{vid}", num_frames=num_frames)
        players[f"joint_angles_{i+1}"] = joint_angles
        x_y_cors[f"joint_cors_{i+1}"] = coords
        
    return players, x_y_cors

In [8]:
def get_joint_angle_sequences(joint_angles):    
    players_joint_angles = []
    
    for i, joint_angle in enumerate(joint_angles):
        joint_angle = joint_angles[joint_angle]
        player_joint_angles = []
   
        for j in range(num_frames):            
            player_joint_angles.extend([joint_angle['right knee'][j], joint_angle['left knee'][j]])
        players_joint_angles.append(player_joint_angles) 
        
    return np.array(players_joint_angles)

In [9]:
def extract_tag(video_path):
    return video_path.split("_")[1]

In [10]:
# Create a dataframe having video names
def create_dataframe(videos):    
    df = pd.DataFrame()
    df['video_name'] = videos   
    df["tag"] = df["video_name"].apply(extract_tag)
    return df

In [11]:
def process_label(data):    
    label_processor = keras.layers.StringLookup(
        num_oov_indices=0, vocabulary=np.unique(data)
    )
    
    return label_processor

## Extract Train Data

In [12]:
train_videos = os.listdir("soccer_train")
train_df = create_dataframe(train_videos)
train_df.head()

Unnamed: 0,video_name,tag
0,v_SoccerDribbling_g24_c01.mp4,SoccerDribbling
1,v_SoccerDribbling_g24_c010.mp4,SoccerDribbling
2,v_SoccerDribbling_g24_c012.mp4,SoccerDribbling
3,v_SoccerDribbling_g24_c013.mp4,SoccerDribbling
4,v_SoccerDribbling_g24_c016.mp4,SoccerDribbling


In [13]:
train_players, train_coords = get_joint_angles_and_coords("soccer_train")

In [14]:
train_joint_sequences = get_joint_angle_sequences(train_players)
train_joint_sequences.shape

(40, 30)

In [15]:
train_joint_sequences[0]

array([178, 193, 195, 211, 203, 200, 204, 209, 218, 194, 218, 193, 204,
       191, 214, 193, 212, 194, 202, 184, 185, 188, 172, 198, 177, 203,
       179, 192, 200, 184])

In [16]:
train_label_processor = process_label(train_df["tag"])
train_label_processor.get_vocabulary()

['SoccerDribbling', 'SoccerJuggling']

In [17]:
labels = train_df["tag"]
labels = train_label_processor(labels).numpy()
labels

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], dtype=int64)

In [18]:
labels.shape

(40,)

In [19]:
train_data, train_labels = train_joint_sequences, labels

In [20]:
train_data.shape, train_labels.shape

((40, 30), (40,))

## Extract Test Data

In [21]:
val_videos = os.listdir("soccer_val")
val_df = create_dataframe(val_videos)
val_df.head()

Unnamed: 0,video_name,tag
0,v_SoccerDribbling_g24_c014.mp4,SoccerDribbling
1,v_SoccerDribbling_g24_c017.mp4,SoccerDribbling
2,v_SoccerDribbling_g24_c02.mp4,SoccerDribbling
3,v_SoccerDribbling_g24_c023.mp4,SoccerDribbling
4,v_SoccerDribbling_g24_c024.mp4,SoccerDribbling


In [22]:
val_players, val_coords = get_joint_angles_and_coords("soccer_val")

In [23]:
val_joint_sequences = get_joint_angle_sequences(val_players)
val_joint_sequences.shape

(17, 30)

In [24]:
val_label_processor = process_label(val_df["tag"])
val_label_processor.get_vocabulary()

['SoccerDribbling', 'SoccerJuggling']

In [25]:
labels = val_df["tag"]
labels = val_label_processor(labels).numpy()
labels

array([0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], dtype=int64)

In [26]:
labels.shape

(17,)

In [27]:
val_data, val_labels = val_joint_sequences, labels

In [28]:
val_data.shape, val_labels.shape

((17, 30), (17,))

In [32]:
# Make directory to save numpy files
try:
    print("Creating train folder...")
    os.mkdir(f"train_data/")
    print("Creating val folder...")
    os.mkdir(f"val_data/")
except FileExistsError:
    print("Directory already exist")
except:
    print("Unforseen circumstance")

Creating train folder...
Creating val folder...


In [29]:
# Save train data as npy file
np.save('train_data/encoded_data/pose_train_data', train_data)
np.save('train_data/normalized_data/pose_train_data', train_data)
np.save('train_data/pose_train_labels', train_labels)

In [30]:
# Save val data as npy file
np.save('val_data/encoded_data/pose_val_data', val_data)
np.save('val_data/normalized_data/pose_val_data', val_data)
np.save('val_data/pose_val_labels', val_labels)

In [31]:
try:
    print("Creating sequence coordinates folder...")
    os.mkdir(f"sequence_coordinates/")
except FileExistsError:
    print("Directory already exist")
except:
    print("Unforseen circumstance")

Creating sequence coordinates folder...
Directory already exist


## Store Sequence (x, y) Coordinates for Visualization

In [32]:
# Serializing json for train sequence coordinates
json_object = json.dumps(train_coords, indent=4)
 
# Writing to train_sequence_coords.json
with open("sequence_coordinates/train_sequence_coords.json", "w") as outfile:
    outfile.write(json_object)

In [33]:
# Serializing json for val sequence coordinates
json_object = json.dumps(val_coords, indent=4)
 
# Writing to train_sequence_coords.json
with open("sequence_coordinates/val_sequence_coords.json", "w") as outfile:
    outfile.write(json_object)