<a href="https://colab.research.google.com/github/GauravShinde013/AngularTraining/blob/master/Action_Detection_using_LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive; the dataset paths used later in this
# notebook live under /content/drive/MyDrive.
# force_remount=True asks Colab to mount again even if a mount is already active.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
%%capture
!pip install mediapipe
!pip install tensorflow

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import mediapipe as mp
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# Initialize MediaPipe Holistic model
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Path to the dataset
DATA_PATH = '/content/drive/MyDrive/ISL_CSLRT_Corpus/Frames_Sentence_Level'
CSV_PATH = '/content/drive/MyDrive/ISL_CSLRT_Corpus/corpus_csv_files/ISL_CSLRT_Corpus_frame_details.csv'

# Define actions (one per sub-folder of DATA_PATH).
# os.listdir returns entries in arbitrary order, so the names are sorted to keep
# both the LIMIT subset and the label indices identical from run to run.
# Plain files are skipped because every action is later listed as a directory.
actions = np.array(sorted(
    entry for entry in os.listdir(DATA_PATH)
    if os.path.isdir(os.path.join(DATA_PATH, entry))
))
# Limit to a subset of sentences for quicker testing
LIMIT = 10
actions = actions[:LIMIT]
# Videos are going to be 30 frames in length
sequence_length = 30

# Function to extract keypoints using MediaPipe
def extract_keypoints(results):
    """Flatten one MediaPipe Holistic result into a single 1-D feature vector.

    The vector is the concatenation, in this order, of:
      * pose:       33 landmarks x (x, y, z, visibility)
      * face:       468 landmarks x (x, y, z)
      * left hand:  21 landmarks x (x, y, z)
      * right hand: 21 landmarks x (x, y, z)

    Any landmark group that is missing (falsy) on ``results`` is replaced by a
    zero block of the matching length, so the output length is constant (1662).
    """
    def _flatten(group, fields, count):
        # Missing detection -> zero placeholder of the expected size.
        if not group:
            return np.zeros(count * len(fields))
        rows = [[getattr(point, field) for field in fields] for point in group.landmark]
        return np.array(rows).flatten()

    xyz = ('x', 'y', 'z')
    parts = [
        _flatten(results.pose_landmarks, xyz + ('visibility',), 33),
        _flatten(results.face_landmarks, xyz, 468),
        _flatten(results.left_hand_landmarks, xyz, 21),
        _flatten(results.right_hand_landmarks, xyz, 21),
    ]
    return np.concatenate(parts)

# Function to process images and extract keypoints
def process_images(action, sequence_folders, sequence_length=30):
    """Convert the frame folders of one action into fixed-length keypoint sequences.

    Args:
        action: Name of the action folder under DATA_PATH; also used as the label
            of every sequence produced here.
        sequence_folders: Names of the sub-folders of that action folder; each is
            read as one ordered run of frame images.
        sequence_length: Number of frames in every emitted sequence.

    Returns:
        (sequences, labels): ``sequences`` is a list of lists holding
        ``sequence_length`` keypoint vectors each; ``labels`` holds ``action``
        once per sequence.

    A folder with at least ``sequence_length`` readable frames is cut into
    non-overlapping windows (left-over frames at the end are dropped). A shorter
    folder yields a single window padded at the end with zero vectors. A folder
    with no readable frame yields nothing.
    """
    import re

    def _natural_key(name):
        # re.split with a capturing group alternates text / digit runs, so odd
        # positions are always numbers: "frame_10" now sorts after "frame_2".
        return [int(tok) if idx % 2 else tok
                for idx, tok in enumerate(re.split(r'(\d+)', name))]

    sequences = []
    labels = []

    # Building the Holistic graph is expensive; with static_image_mode=True every
    # image is processed independently, so one instance can serve all frames.
    with mp_holistic.Holistic(static_image_mode=True) as holistic:
        for subfolder in sequence_folders:
            subfolder_path = os.path.join(DATA_PATH, action, subfolder)
            frames = sorted(os.listdir(subfolder_path), key=_natural_key)
            window = []

            for frame_name in frames:
                frame_path = os.path.join(subfolder_path, frame_name)
                frame_path = frame_path.replace("\\", "/")
                if os.path.exists(frame_path):
                    image = cv2.imread(frame_path)
                    if image is None:
                        print(f"Failed to load image {frame_path}")
                        continue
                    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

                    # Process the image to extract keypoints
                    results = holistic.process(image_rgb)
                    window.append(extract_keypoints(results))
                else:
                    print(f"Frame {frame_path} does not exist.")

            if len(window) >= sequence_length:
                for start in range(0, len(window) - sequence_length + 1, sequence_length):
                    sequences.append(window[start:start + sequence_length])
                    labels.append(action)
            elif len(window) > 0:
                # Pad short recordings with all-zero keypoint vectors.
                padding = [np.zeros_like(window[0])] * (sequence_length - len(window))
                sequences.append(window[:sequence_length] + padding)
                labels.append(action)

    return sequences, labels

# Read the CSV file
df = pd.read_csv(CSV_PATH)
# regex=False makes both replacements literal on every pandas version; a lone
# backslash is not a valid regular expression, so the result must not depend on
# the library's default for `regex`.
df['Frames path'] = df['Frames path'].str.replace('\\', '/', regex=False)
df['Sentence'] = df['Sentence'].str.lower().str.replace(' ', '_', regex=False)
# NOTE(review): df is not referenced by the cells visible below - confirm whether
# it is still needed.

# Process the dataset
all_sequences = []
all_labels = []
for action in actions:
    action_path = os.path.join(DATA_PATH, action)
    sequence_folders = [folder for folder in os.listdir(action_path) if os.path.isdir(os.path.join(action_path, folder))]

    # Process images for the current action
    sequences, labels = process_images(action, sequence_folders)
    if sequences:  # Only add non-empty sequences
        all_sequences.extend(sequences)  # Flattened directly here
        all_labels.extend(labels)

# Convert to NumPy arrays
X = np.array(all_sequences)
# Class index of a label is its position in `actions`.
label_map = {label: num for num, label in enumerate(actions)}
y = to_categorical([label_map[label] for label in all_labels], num_classes=len(actions))

# Print the shapes of the arrays to verify the data
print(X.shape)
print(y.shape)




(14, 30, 1662)
(14, 2)


In [None]:
# Split the data into training and testing sets
# test_size=0.05 holds out 5% of the sequences; random_state=42 fixes the shuffle
# so the same split is produced on every run.
# NOTE(review): the recorded output below shows a single test sample, so the test
# metrics from this split carry very little information.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05, random_state=42)

# Sanity-check the resulting array shapes.
print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
print(f"X_test shape: {X_test.shape}, y_test shape: {y_test.shape}")

X_train shape: (13, 30, 1662), y_train shape: (13, 2)
X_test shape: (1, 30, 1662), y_test shape: (1, 2)


# Build and Train LSTM Neural Network

In [None]:
from tensorflow import keras
from keras.models import Sequential
from keras.layers import LSTM, Dense
from keras.callbacks import TensorBoard

# Define the model
# The input shape (frames per sequence, features per frame) and the number of
# output classes are read from the training arrays instead of being hard-coded,
# so the network stays valid if the sequence length or the label set changes.
model = Sequential()
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=X_train.shape[1:]))
model.add(LSTM(128, return_sequences=True, activation='relu'))
# Last recurrent layer returns only its final state, feeding the dense head.
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
# One softmax unit per one-hot column of y.
model.add(Dense(y_train.shape[1], activation='softmax'))

# Compile the model
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

# Print the model summary
model.summary()




Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 30, 64)            442112    
                                                                 
 lstm_1 (LSTM)               (None, 30, 128)           98816     
                                                                 
 lstm_2 (LSTM)               (None, 64)                49408     
                                                                 
 dense (Dense)               (None, 64)                4160      
                                                                 
 dense_1 (Dense)             (None, 32)                2080      
                                                                 
 dense_2 (Dense)             (None, 2)                 66        
                                                                 
Total params: 596642 (2.28 MB)
Trainable params: 596642 

TensorBoard

In [None]:
# TensorBoard callback: event files go to the relative "Logs" directory
log_dir = 'Logs'
tb_callback = TensorBoard(log_dir)

# Fit on the training split for 200 epochs, reporting to TensorBoard
model.fit(x=X_train, y=y_train, epochs=200, callbacks=[tb_callback])


Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200
Epoch 78

<keras.src.callbacks.History at 0x7ea3e5c14760>

In [None]:
# Evaluate the model
# evaluate() returns the loss followed by the metrics passed to compile(); the
# model above was compiled with a single metric, categorical_accuracy.
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Test loss: {loss}, Test accuracy: {accuracy}")


Test loss: 0.6601467132568359, Test accuracy: 1.0


In [None]:
# Run the model on the held-out sequences
res = model.predict(X_test)

# Show the most likely class next to the ground truth for each test sequence
for probs, one_hot in zip(res, y_test):
    print(f"Predicted: {actions[np.argmax(probs)]}, True: {actions[np.argmax(one_hot)]}")


Predicted: bring water for me, True: bring water for me


In [None]:
%%capture
pip install gtts


In [None]:
from gtts import gTTS
import os
import numpy as np


In [None]:
# Function to convert text to speech
def text_to_speech(text, lang='en'):
    """Synthesize ``text`` with gTTS, save it as output.mp3 and try to play it.

    Args:
        text: Text to speak.
        lang: Language code handed to gTTS (default 'en').

    Returns:
        None. The audio is written to "output.mp3" in the working directory; the
        same filename is used on every call.

    Playback is best-effort: it needs the ``mpg321`` command-line player. When
    the player is not installed a message is printed instead of failing silently.
    """
    import shutil
    import subprocess

    tts = gTTS(text=text, lang=lang)
    filename = "output.mp3"
    tts.save(filename)

    player = shutil.which("mpg321")
    if player is None:
        print(f"mpg321 is not installed; audio saved to {filename} but not played.")
        return
    # Argument list instead of a shell string: no shell parsing of the filename.
    completed = subprocess.run([player, filename], check=False)
    if completed.returncode != 0:
        print(f"mpg321 exited with status {completed.returncode} while playing {filename}.")

# Run the model on the held-out sequences
res = model.predict(X_test)

# Report each prediction next to its ground truth, then read the prediction aloud
for probs, one_hot in zip(res, y_test):
    predicted_text = actions[np.argmax(probs)]
    true_text = actions[np.argmax(one_hot)]
    print(f"Predicted: {predicted_text}, True: {true_text}")

    # Speak the predicted sentence
    text_to_speech(predicted_text)


Predicted: bring water for me, True: bring water for me


In [None]:
import cv2
import os

# Root folder that is listed for per-sentence video sub-folders.
VIDEO_PATH = '/content/drive/MyDrive/ISL_CSLRT_Corpus/Videos_Sentence_Level'
# Root folder that receives the extracted frames.
FRAME_PATH = '/content/drive/MyDrive/ISL_CSLRT_Corpus/Extracted_Frames'
# NOTE: this rebinds the LIMIT assigned earlier in the notebook (10 there).
LIMIT = 2  # Limit to a subset of sentences for quicker testing

def extract_frames_from_videos(video_path, frame_path, limit):
    """Dump every frame of the videos of the first ``limit`` sentences as JPEGs.

    Args:
        video_path: Folder containing one sub-folder per sentence, each holding
            video files.
        frame_path: Output root. Frames are written to
            ``frame_path/<sentence>/<video name without extension>/frame_<n>.jpg``.
        limit: Maximum number of entries of ``video_path`` to process.

    Returns:
        None.

    Entries are taken in sorted order so the selected subset is reproducible
    (os.listdir order is arbitrary). Files that OpenCV cannot open are reported
    and skipped without creating an output folder for them.
    """
    os.makedirs(frame_path, exist_ok=True)

    sentences = sorted(os.listdir(video_path))[:limit]

    for sentence in sentences:
        sentence_path = os.path.join(video_path, sentence)
        if not os.path.isdir(sentence_path):
            continue

        for video_file in sorted(os.listdir(sentence_path)):
            video_file_path = os.path.join(sentence_path, video_file)
            cap = cv2.VideoCapture(video_file_path)
            try:
                if not cap.isOpened():
                    print(f"Could not open video {video_file_path}")
                    continue

                # splitext removes only the final extension, so "clip.1.mp4" and
                # "clip.2.mp4" no longer end up in the same output folder.
                video_name = os.path.splitext(video_file)[0]
                sentence_frame_path = os.path.join(frame_path, sentence, video_name)
                os.makedirs(sentence_frame_path, exist_ok=True)

                frame_count = 0
                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break
                    frame_file_path = os.path.join(sentence_frame_path, f"frame_{frame_count}.jpg")
                    cv2.imwrite(frame_file_path, frame)
                    frame_count += 1
            finally:
                # Release the capture even if reading or writing raised.
                cap.release()

# Extract frames from videos
# Reads from VIDEO_PATH, writes JPEG frames below FRAME_PATH, and handles at most
# LIMIT entries of VIDEO_PATH.
extract_frames_from_videos(VIDEO_PATH, FRAME_PATH, LIMIT)


In [None]:
def process_frames(frame_path, sequence_length=30):
    """Build labelled keypoint sequences from a tree of extracted frames.

    Expected layout: ``frame_path/<sentence>/<video folder>/<frame images>``.

    Args:
        frame_path: Root folder of the extracted frames.
        sequence_length: Number of frames in every emitted sequence.

    Returns:
        (sequences, labels): ``sequences`` is a list of lists holding
        ``sequence_length`` keypoint vectors each; ``labels`` holds the sentence
        folder name once per sequence.

    A video folder with at least ``sequence_length`` readable frames is cut into
    non-overlapping windows (left-over frames at the end are dropped). A shorter
    folder yields a single window padded at the end with zero vectors.
    """
    import re

    def _natural_key(name):
        # re.split with a capturing group alternates text / digit runs, so odd
        # positions are always numbers: "frame_10.jpg" sorts after "frame_2.jpg".
        # A plain string sort would scramble the temporal order of the frames.
        return [int(tok) if idx % 2 else tok
                for idx, tok in enumerate(re.split(r'(\d+)', name))]

    sequences = []
    labels = []

    # Building the Holistic graph is expensive; with static_image_mode=True every
    # image is processed independently, so one instance can serve all frames.
    with mp_holistic.Holistic(static_image_mode=True) as holistic:
        # Sorted listings keep the order of the produced sequences reproducible.
        for sentence in sorted(os.listdir(frame_path)):
            sentence_path = os.path.join(frame_path, sentence)
            if not os.path.isdir(sentence_path):
                continue
            for video_folder in sorted(os.listdir(sentence_path)):
                video_folder_path = os.path.join(sentence_path, video_folder)
                if not os.path.isdir(video_folder_path):
                    continue
                frames = sorted(os.listdir(video_folder_path), key=_natural_key)
                window = []

                for frame_name in frames:
                    # Must not reuse the name `frame_path`: overwriting the
                    # parameter made every sentence after the first resolve to a
                    # non-existent directory, so it was silently skipped.
                    frame_file_path = os.path.join(video_folder_path, frame_name)
                    frame_file_path = frame_file_path.replace("\\", "/")  # Ensure correct path format
                    if os.path.exists(frame_file_path):
                        image = cv2.imread(frame_file_path)
                        if image is None:
                            print(f"Failed to load image {frame_file_path}")
                            continue
                        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

                        # Process the image to extract keypoints
                        results = holistic.process(image_rgb)
                        window.append(extract_keypoints(results))
                    else:
                        print(f"Frame {frame_file_path} does not exist.")

                if len(window) >= sequence_length:
                    for start in range(0, len(window) - sequence_length + 1, sequence_length):
                        sequences.append(window[start:start + sequence_length])
                        labels.append(sentence)
                elif len(window) > 0:
                    # Pad short recordings with all-zero keypoint vectors.
                    padding = [np.zeros_like(window[0])] * (sequence_length - len(window))
                    sequences.append(window[:sequence_length] + padding)
                    labels.append(sentence)

    return sequences, labels

# Process the extracted frames
all_sequences, all_labels = process_frames(FRAME_PATH)

# Convert to NumPy arrays
X = np.array(all_sequences)
# Sort the distinct labels before numbering them: iterating a set of strings has
# no stable order between interpreter runs, which would reshuffle class indices.
label_map = {label: num for num, label in enumerate(sorted(set(all_labels)))}
y = to_categorical([label_map[label] for label in all_labels], num_classes=len(label_map))

# Print the shapes of the arrays to verify the data
print(X.shape)
print(y.shape)


(18, 30, 1662)
(18, 1)


# Split the Data and Train the Model

In [None]:
# Split the data into training and testing sets
# test_size=0.05 holds out 5% of the sequences; random_state=42 fixes the shuffle.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05, random_state=42)

# Define the model (as done previously)
# The input shape (frames per sequence, features per frame) is read from the
# training array instead of being hard-coded, so the network stays valid if the
# sequence length used during frame processing changes.
model = Sequential()
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=X_train.shape[1:]))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
# One softmax unit per label in label_map.
model.add(Dense(len(label_map), activation='softmax'))

# Compile the model
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

# Train the model (reuses the TensorBoard callback created earlier in the notebook)
model.fit(X_train, y_train, epochs=200, callbacks=[tb_callback])

# Evaluate the model
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Test loss: {loss}, Test accuracy: {accuracy}")




Epoch 1/200


  return dispatch_target(*args, **kwargs)


Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200
Epoch 78/200
Epoch 7

In [None]:
from pytube import YouTube

def download_video(youtube_url, save_path):
    """Download the highest-resolution stream pytube selects for a YouTube URL.

    Args:
        youtube_url: URL of the video.
        save_path: Directory the file is downloaded into.

    Returns:
        Whatever ``Stream.download`` returns - presumably the path of the saved
        file; verify against the installed pytube version.

    Raises:
        ValueError: If pytube reports no stream for the video.
    """
    yt = YouTube(youtube_url)
    ys = yt.streams.get_highest_resolution()
    if ys is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise ValueError(f"No downloadable stream found for {youtube_url}")
    return ys.download(save_path)

# Example usage
# NOTE: both values are placeholders; replace them with a real video URL and an
# existing target directory before running this cell.
youtube_url = 'https://www.youtube.com/watch?v=your_video_id'
save_path = '/path/to/save/video'
download_video(youtube_url, save_path)


In [None]:
def extract_frames_from_video(video_file_path, frame_output_path):
    """Write every frame of one video as ``frame_<n>.jpg`` into a folder.

    Args:
        video_file_path: Path of the video to read.
        frame_output_path: Folder receiving the JPEG frames; created if missing.

    Returns:
        Number of frames written (0 when the video could not be opened).
    """
    # Create the folder first so later listings of it work even with no frames.
    os.makedirs(frame_output_path, exist_ok=True)

    cap = cv2.VideoCapture(video_file_path)
    frame_count = 0
    try:
        if not cap.isOpened():
            print(f"Could not open video {video_file_path}")
            return frame_count

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_file_path = os.path.join(frame_output_path, f"frame_{frame_count}.jpg")
            cv2.imwrite(frame_file_path, frame)
            frame_count += 1
    finally:
        # Release the capture even if reading or writing raised.
        cap.release()

    return frame_count

# Example usage
# NOTE: both paths are placeholders; point them at the downloaded video file and
# at the folder that should receive its frames.
video_file_path = '/path/to/save/video/your_video_file.mp4'
frame_output_path = '/path/to/save/frames'
extract_frames_from_video(video_file_path, frame_output_path)


In [None]:
def process_video_frames(frame_path, sequence_length=30):
    """Turn the frame images of one folder into fixed-length keypoint sequences.

    Args:
        frame_path: Folder containing the frame images of a single video.
        sequence_length: Number of frames in every emitted sequence.

    Returns:
        List of sequences, each a list of ``sequence_length`` keypoint vectors.
        With at least ``sequence_length`` readable frames the folder is cut into
        non-overlapping windows (left-over frames at the end are dropped); with
        fewer, one window padded at the end with zero vectors is returned; with
        none, the list is empty.
    """
    import re

    def _natural_key(name):
        # re.split with a capturing group alternates text / digit runs, so odd
        # positions are always numbers: "frame_10.jpg" sorts after "frame_2.jpg".
        # A plain string sort would scramble the temporal order of the frames.
        return [int(tok) if idx % 2 else tok
                for idx, tok in enumerate(re.split(r'(\d+)', name))]

    frames = sorted(os.listdir(frame_path), key=_natural_key)
    sequences = []
    window = []

    # Building the Holistic graph is expensive; with static_image_mode=True every
    # image is processed independently, so one instance can serve all frames.
    with mp_holistic.Holistic(static_image_mode=True) as holistic:
        for frame_name in frames:
            frame_file_path = os.path.join(frame_path, frame_name)
            frame_file_path = frame_file_path.replace("\\", "/")  # Ensure correct path format
            if os.path.exists(frame_file_path):
                image = cv2.imread(frame_file_path)
                if image is None:
                    print(f"Failed to load image {frame_file_path}")
                    continue
                image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

                # Process the image to extract keypoints
                results = holistic.process(image_rgb)
                window.append(extract_keypoints(results))
            else:
                print(f"Frame {frame_file_path} does not exist.")

    if len(window) >= sequence_length:
        for start in range(0, len(window) - sequence_length + 1, sequence_length):
            sequences.append(window[start:start + sequence_length])
    elif len(window) > 0:
        # Pad a short video with all-zero keypoint vectors.
        padding = [np.zeros_like(window[0])] * (sequence_length - len(window))
        sequences.append(window[:sequence_length] + padding)

    return sequences

# Example usage
# Extract keypoint sequences from the frames written to frame_output_path and
# stack them into one array for model.predict.
# NOTE(review): the array is empty when no frame could be read - presumably
# predict() will fail on it; confirm and guard if needed.
processed_sequences = process_video_frames(frame_output_path)
X_new_video = np.array(processed_sequences)


In [None]:
# Make predictions on the new video frames
predictions = model.predict(X_new_video)

# Map predictions to actions
# label_map goes label -> class index, so it has to be inverted before a
# predicted index can be turned back into its label; indexing label_map with
# the integer directly raises KeyError.
index_to_label = {index: label for label, index in label_map.items()}
predicted_actions = [index_to_label[int(np.argmax(pred))] for pred in predictions]

print(predicted_actions)


In [None]:
# Function to convert text to speech
def text_to_speech(text, lang='en'):
    """Synthesize ``text`` with gTTS, save it as output.mp3 and try to play it.

    Args:
        text: Text to speak.
        lang: Language code handed to gTTS (default 'en').

    Returns:
        None. The audio is written to "output.mp3" in the working directory; the
        same filename is used on every call.

    Playback is best-effort: it needs the ``mpg321`` command-line player. When
    the player is not installed a message is printed instead of failing silently.
    """
    import shutil
    import subprocess

    tts = gTTS(text=text, lang=lang)
    filename = "output.mp3"
    tts.save(filename)

    player = shutil.which("mpg321")
    if player is None:
        print(f"mpg321 is not installed; audio saved to {filename} but not played.")
        return
    # Argument list instead of a shell string: no shell parsing of the filename.
    completed = subprocess.run([player, filename], check=False)
    if completed.returncode != 0:
        print(f"mpg321 exited with status {completed.returncode} while playing {filename}.")

# Convert predicted actions to speech
# Each predicted label is printed and then passed to text_to_speech, one call per
# entry of predicted_actions.
for action in predicted_actions:
    print(f"Predicted: {action}")
    text_to_speech(action)
