In [None]:
!python -m pip install mediapipe
!pip install tensorflow scikit-learn
!wget https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_lite/float16/latest/pose_landmarker_lite.task

In [None]:
#KEYPOINT GENERATION WITHOUT Z COORDINATE

import cv2
import mediapipe as mp
import json
import os

# Define the indices of the desired landmarks (11-16 and 23-32)
desired_landmark_indices = list(range(11, 17)) + list(range(23, 33))

def process_video(video_path, output_path):
    # Initialize MediaPipe Pose for each video
    mp_pose = mp.solutions.pose.Pose()

    # Open video capture
    cap = cv2.VideoCapture(video_path)

    # List to store selected landmarks
    selected_landmarks = []

    while True:
        # Read a frame
        ret, frame = cap.read()
        if not ret:
            break

        # Convert the frame to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Process the frame with MediaPipe Pose
        results = mp_pose.process(rgb_frame)

        # Extract and store selected landmarks if available
        if results.pose_landmarks:
            # Create a new list for each frame to store the landmarks
            landmarks_frame = []

            for idx in desired_landmark_indices:
                landmark = results.pose_landmarks.landmark[idx]

                landmarks_frame.append({
                    "index": idx,
                    "name": mp.solutions.pose.PoseLandmark(idx).name,
                    "x": landmark.x,
                    "y": landmark.y
                })

            # Append the landmarks of the current frame to the main list
            selected_landmarks.append(landmarks_frame)

    # Release the video capture
    cap.release()

    # Save selected landmarks in JSON format
    json_data = json.dumps(selected_landmarks, indent=2)
    with open(output_path, 'w') as json_file:
        json_file.write(json_data)

    # Close the MediaPipe Pose instance
    mp_pose.close()

# Specify the directory containing your videos
videos_directory = '/content/drive/MyDrive/RAW FRISBEE FOOTAGE/NO_PIVOT'

# Specify the directory to save the output JSON files
output_directory = '/content/drive/MyDrive/RAW FRISBEE FOOTAGE/NO_PIVOT_JSON'

# Loop through each video file in the specified directory
for video_file in os.listdir(videos_directory):
    if video_file.endswith('.mp4'):  # Adjust the file extension as needed
        video_path = os.path.join(videos_directory, video_file)
        output_path = os.path.join(output_directory, f'{video_file[:-4]}_keypoints.json')  # Create a unique output file for each video
        process_video(video_path, output_path)

print("Processing completed.")


In [None]:
#USE THIS FOR KEYPOINT GENERATION IF WANT INCLUDE Z COORDINATE

import cv2
import mediapipe as mp
import json
import os

# Define the indices of the desired landmarks (11-16 and 23-32)
desired_landmark_indices = list(range(11, 17)) + list(range(23, 33))

def process_video(video_path, output_path, label):
    # Initialize MediaPipe Pose for each video
    mp_pose = mp.solutions.pose.Pose()

    # Open video capture
    cap = cv2.VideoCapture(video_path)

    # List to store selected landmarks
    selected_landmarks = []

    while True:
        # Read a frame
        ret, frame = cap.read()
        if not ret:
            break

        # Convert the frame to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Process the frame with MediaPipe Pose
        results = mp_pose.process(rgb_frame)

        # Extract and store selected landmarks if available
        if results.pose_landmarks:
            # Create a new list for each frame to store the landmarks
            landmarks_frame = []

            for idx in desired_landmark_indices:
                landmark = results.pose_landmarks.landmark[idx]

                landmarks_frame.append({
                    "index": idx,
                    "name": mp.solutions.pose.PoseLandmark(idx).name,
                    "x": landmark.x,
                    "y": landmark.y,
                    "z": landmark.z
                })

            # Append the landmarks of the current frame to the main list
            selected_landmarks.append(landmarks_frame)

    # Release the video capture
    cap.release()

    # Save selected landmarks in JSON format
    json_data = json.dumps(selected_landmarks, indent=2)
    with open(output_path, 'w') as json_file:
        json_file.write(json_data)

    # Close the MediaPipe Pose instance
    mp_pose.close()

def label_videos_by_folder(videos_directory):
    labels = []
    videos = []
    for label_folder in os.listdir(videos_directory):
        label_path = os.path.join(videos_directory, label_folder)
        if os.path.isdir(label_path):
            for video_file in os.listdir(label_path):
                if video_file.endswith('.mp4'):
                    video_path = os.path.join(label_path, video_file)
                    labels.append(label_folder)
                    videos.append(video_path)
    return videos, labels

# Specify the directory containing your videos
videos_directory = '/content/onedrive/FYP/AUGMENTED'

# Specify the directory to save the output JSON files
output_directory = '/content/onedrive/FYP/JSON'

# Label videos based on the folder they are in
video_paths, labels = label_videos_by_folder(videos_directory)

#USE THIS FOR UNAUGMENTED
# Loop through each video file in the specified directory
#for video_path, label in zip(video_paths, labels):
    # Get the video name without the extension
    #video_name = os.path.basename(video_path)[:-4]
    # Include the folder name (label) as part of the output file name
    #output_path = os.path.join(output_directory, f'{label}_{video_name}_keypoints.json')

#USE THIS FOR AUGMENTED FOLDER
# Loop through each video file in the specified directory
for video_path, label in zip(video_paths, labels):
    # Split the original video name into label and name
    original_label, video_name = os.path.basename(video_path)[:-4].split('_', 1)
    # Include the folder name (label) in between the original label and the video name
    output_path = os.path.join(output_directory, f'{original_label}_{label}_{video_name}_keypoints.json')
    process_video(video_path, output_path, label)

    # Check if the file already exists
    if os.path.exists(output_path):
        print(f"Skipping {output_path} as it already exists.")
        continue

    process_video(video_path, output_path, label)

print("Processing completed.")


In [None]:
#pre-processing 3 coordinate keypoint data

import numpy as np
from keras.preprocessing.sequence import pad_sequences
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import json
import os
import joblib

# Initialize lists to hold data and labels
X_data = []
y_data = []

# Specify the directory where the JSON files are located
directory = '/content/onedrive/FYP/JSON'

# Read each JSON file in the directory
for filename in os.listdir(directory):
    if filename.endswith('.json'):
        with open(os.path.join(directory, filename), 'r') as f:
            data = json.load(f)

        # Convert the data to a list of frames, each frame being a list of [x, y, z] coordinates
        sequence_data = [[[entry['x'], entry['y'], entry['z']] for entry in frame] for frame in data]
        X_data.append(sequence_data)


        # Extract the label from the filename and append it to y_data
        label = filename.split('_')[0]
        y_data.append(label)

# Define max_sequence_length as the length of the longest sequence in X_data
max_sequence_length = max(len(sequence) for sequence in X_data)

# Convert data to numpy arrays
X_data = pad_sequences(X_data, maxlen=max_sequence_length, padding='post', dtype='float32')
y_data = np.array(y_data)

# Initialize an encoder to convert labels to numerical format
label_encoder = LabelEncoder()
y_data_encoded = label_encoder.fit_transform(y_data)

print(label_encoder.classes_)
# Save the fitted LabelEncoder
joblib.dump(label_encoder, '/content/onedrive/FYP/label_encoder.joblib')

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X_data, y_data_encoded, test_size=0.2, random_state=42)

# Save the data
np.save('/content/onedrive/FYP/TRAINING/X_train_with_z.npy', X_train)
np.save('/content/onedrive/FYP/TRAINING/X_test_with_z.npy', X_test)
np.save('/content/onedrive/FYP/TRAINING/y_train_test_with_z.npy', y_train)
np.save('/content/onedrive/FYP/TRAINING/y_test_test_with_z.npy', y_test)

['BOTH' 'GOOD' 'NO FLICK' 'PIVOT']


In [None]:
#pre-processing 2 coordinate keypoint data

import numpy as np
from keras.preprocessing.sequence import pad_sequences
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import json
import os
import joblib

# Initialize lists to hold data and labels
X_data = []
y_data = []

# Specify the directory where the JSON files are located
directory = '/content/onedrive/FYP/JSON_NO_Z'

# Read each JSON file in the directory
for filename in os.listdir(directory):
    if filename.endswith('.json'):
        with open(os.path.join(directory, filename), 'r') as f:
            data = json.load(f)

        # Convert the data to a list of frames, each frame being a list of [x, y] coordinates
        sequence_data = [[[entry['x'], entry['y']] for entry in frame] for frame in data]
        X_data.append(sequence_data)

        # Extract the label from the filename and append it to y_data
        label = filename.split('_')[0]
        y_data.append(label)

# Define max_sequence_length as the length of the longest sequence in X_data
max_sequence_length = max(len(sequence) for sequence in X_data)

# Convert data to numpy arrays
X_data = pad_sequences(X_data, maxlen=max_sequence_length, padding='post', dtype='float32')
y_data = np.array(y_data)

# Initialize an encoder to convert labels to numerical format
label_encoder = LabelEncoder()
y_data_encoded = label_encoder.fit_transform(y_data)

print(label_encoder.classes_)
# Save the fitted LabelEncoder
joblib.dump(label_encoder, '/content/onedrive/FYP/label_encoder_no_z.joblib')

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X_data, y_data_encoded, test_size=0.2, random_state=42)

# Save the data
np.save('/content/onedrive/FYP/TRAINING/X_train_no_z.npy', X_train)
np.save('/content/onedrive/FYP/TRAINING/X_test_no_z.npy', X_test)
np.save('/content/onedrive/FYP/TRAINING/y_train_no_z.npy', y_train)
np.save('/content/onedrive/FYP/TRAINING/y_test_no_z.npy', y_test)

In [None]:
#training LSTM model with z coordinate
import numpy as np
from keras.models import Sequential
from keras.layers import LSTM, Dense, TimeDistributed, Flatten

# Load the data
X_train = np.load('/content/onedrive/FYP/TRAINING/X_train_with_z.npy')
X_test = np.load('/content/onedrive/FYP/TRAINING/X_test_with_z.npy')
y_train = np.load('/content/onedrive/FYP/TRAINING/y_train_with_z.npy')
y_test = np.load('/content/onedrive/FYP/TRAINING/y_test_with_z.npy')

# Create a Sequential model
model = Sequential()

# Add a TimeDistributed wrapper to apply the LSTM layer to each sequence
model.add(TimeDistributed(LSTM(64, return_sequences=True), input_shape=(60, 16, 3)))
model.add(TimeDistributed(LSTM(64)))

# Flatten the output of the LSTM layers
model.add(Flatten())

# Add a Dense layer
model.add(Dense(128, activation='relu'))

# Add a dense layer with 4 units (for the 4 classes) and a softmax activation function
model.add(Dense(4, activation='softmax'))

# Compile the model
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Train the model
history = model.fit(X_train, y_train, epochs=32, validation_data=(X_test, y_test), shuffle=True)
#EPOCH 32

# Plot training & validation accuracy values
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')

plt.tight_layout()
plt.show()

# Evaluate the model
loss, accuracy = model.evaluate(X_test, y_test)
print('Loss:', loss)
print('Accuracy:', accuracy)

# Save the model
model.save('/content/onedrive/FYP/MODEL/frisbee.keras')

In [None]:
#training transformer model

import tensorflow as tf
from keras.layers import Layer, MultiHeadAttention, Flatten, TimeDistributed
from keras.models import Sequential
from keras.layers import Dense
import numpy as np
import matplotlib.pyplot as plt

# Load the data
X_train = np.load('/content/onedrive/FYP/TRAINING/X_train_with_z.npy')
X_test = np.load('/content/onedrive/FYP/TRAINING/X_test_with_z.npy')
y_train = np.load('/content/onedrive/FYP/TRAINING/y_train_with_z.npy')
y_test = np.load('/content/onedrive/FYP/TRAINING/y_test_with_z.npy')

# Custom Transformer layer
class TransformerBlock(Layer):
    def __init__(self, num_heads, key_dim, **kwargs):
        super(TransformerBlock, self).__init__(**kwargs)
        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)

    def call(self, inputs):
        x = self.att(query=inputs, key=inputs, value=inputs)
        return x

# Create a Sequential model
model = Sequential()

# Add the custom Transformer layer
model.add(TimeDistributed(TransformerBlock(num_heads=8, key_dim=8), input_shape=(60, 16, 3)))

# Add a Flatten layer to reduce the dimensionality
model.add(Flatten())

# Add a Dense layer
model.add(Dense(128, activation='relu', kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.02)))

# Add a dense layer with 4 units (for the 4 classes) and a softmax activation function
model.add(Dense(4, activation='softmax', kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.02)))

# Compile the model
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Train the model
history = model.fit(X_train, y_train, epochs=33, validation_data=(X_test, y_test), shuffle=True)
#epoch 31

# Evaluate the model
loss, accuracy = model.evaluate(X_test, y_test)
print('Loss:', loss)
print('Accuracy:', accuracy)

# Plot training & validation accuracy values
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')

plt.tight_layout()
plt.show()

# Save the model
model.save('/content/onedrive/FYP/MODEL/transformer_frisbee.keras')

In [None]:
#training LSTM model without z coordinate

from keras.models import Sequential
from keras.layers import LSTM, Dense, Flatten, TimeDistributed
import numpy as np
import matplotlib.pyplot as plt

# Load the data
X_train = np.load('/content/onedrive/FYP/TRAINING/X_train_no_z.npy')
X_test = np.load('/content/onedrive/FYP/TRAINING/X_test_no_z.npy')
y_train = np.load('/content/onedrive/FYP/TRAINING/y_train_no_z.npy')
y_test = np.load('/content/onedrive/FYP/TRAINING/y_test_no_z.npy')

# Create a Sequential model
model = Sequential()

# Add a TimeDistributed wrapper to apply the LSTM layer to each sequence
model.add(TimeDistributed(LSTM(64, return_sequences=True), input_shape=(60, 16, 2)))
model.add(TimeDistributed(LSTM(64)))

# Flatten the output of the LSTM layers
model.add(Flatten())

# Add a Dense layer
model.add(Dense(128, activation='relu'))

# Add a dense layer with 5 units (for the 5 classes) and a softmax activation function
model.add(Dense(4, activation='softmax'))

# Compile the model
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Train the model
history = model.fit(X_train, y_train, epochs=32, validation_data=(X_test, y_test))

# Evaluate the model
loss, accuracy = model.evaluate(X_test, y_test)
print('Loss:', loss)
print('Accuracy:', accuracy)

# Plot training & validation accuracy values
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')

plt.tight_layout()
plt.show()

# Save the model
model.save('/content/onedrive/FYP/frisbee_no_z.keras')