In [1]:
import glob
import os
import warnings

import cv2
import joblib
import matplotlib.pyplot as plt
import mediapipe as mp
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical



In [2]:
def image_to_video(input_path, output_path, fps=18):
    """Assemble the PNG images of a folder into an MP4 video.

    Images are ordered by their sorted file paths. The sort is a plain string
    sort, so "10.png" comes before "2.png" unless names are zero-padded.
    The size of the first image fixes the resolution of the video.

    Args:
        input_path: Directory containing the ``*.png`` frames.
        output_path: Path of the video file to write.
        fps: Frame rate passed to the video writer.

    Raises:
        ValueError: If the folder contains no PNG file, or if one of the
            images cannot be decoded.
    """
    images = sorted(glob.glob(os.path.join(input_path, '*.png')))
    if not images:
        # Fail with a clear message instead of an IndexError on images[0].
        raise ValueError("No .png images found in {}".format(input_path))

    first_frame = cv2.imread(images[0])
    if first_frame is None:
        # cv2.imread signals an unreadable file by returning None.
        raise ValueError("Could not read image {}".format(images[0]))
    height, width = first_frame.shape[:2]

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    try:
        for image in images:
            frame = cv2.imread(image)
            if frame is None:
                raise ValueError("Could not read image {}".format(image))
            video.write(frame)
    finally:
        # Always finalize the file, even if a frame failed to load.
        video.release()

    print("""Video saved as {}""".format(output_path))

In [3]:
def concat_csv_files(input_folder):
    """Load every ``.csv`` file of a folder into one DataFrame.

    Files are read in sorted file-name order so the row order of the result
    is reproducible (``os.listdir`` returns entries in arbitrary order).
    All frames are concatenated in a single call and re-indexed from 0.

    Args:
        input_folder: Directory to scan (not recursive).

    Returns:
        The row-wise concatenation of all CSV files, or an empty DataFrame
        when the folder contains no ``.csv`` file.
    """
    csv_names = sorted(
        name for name in os.listdir(input_folder) if name.endswith('.csv')
    )
    if not csv_names:
        return pd.DataFrame()

    frames = [pd.read_csv(os.path.join(input_folder, name)) for name in csv_names]
    return pd.concat(frames, ignore_index=True)

In [4]:
# Create sequences with a fixed window size
def create_sequences(X, y, window_size):
    """Cut row-aligned features and labels into sliding windows.

    Window number ``k`` holds rows ``k .. k + window_size - 1`` of ``X`` and
    is paired with ``y[k + window_size]``, i.e. the label of the row that
    immediately follows the window.

    Args:
        X: Sliceable sequence of feature rows.
        y: Labels indexed like the rows of ``X``.
        window_size: Number of consecutive rows per window.

    Returns:
        A ``(windows, targets)`` pair of NumPy arrays with
        ``len(X) - window_size`` entries each (empty arrays when the input
        is not longer than the window).
    """
    starts = range(len(X) - window_size)
    windows = [X[start:start + window_size] for start in starts]
    targets = [y[start + window_size] for start in starts]
    return np.array(windows), np.array(targets)

In [5]:

class LSTMModel:
    """Thin wrapper around a stacked-LSTM Keras classifier.

    The network is: LSTM(64, returning the full sequence) -> Dropout(0.2) ->
    LSTM(32) -> Dropout(0.2) -> Dense(num_classes, softmax). The underlying
    Keras model is exposed as ``self.model``.
    """

    def __init__(self, input_shape, num_classes):
        """Build the (uncompiled) network.

        Args:
            input_shape: Shape of one input sample, without the batch axis.
            num_classes: Number of units of the softmax output layer.
        """
        # The input shape is declared with an explicit Input object rather
        # than the `input_shape=` layer argument, which Keras 3 warns about.
        self.model = Sequential([
            Input(shape=input_shape),
            LSTM(64, return_sequences=True),
            Dropout(0.2),
            LSTM(32),
            Dropout(0.2),
            Dense(num_classes, activation='softmax'),
        ])

    def compile(self):
        """Compile with Adam, categorical cross-entropy and accuracy."""
        self.model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

    def summary(self):
        """Print the layer summary and return whatever Keras returns."""
        return self.model.summary()

    def fit(self, X_seq, y_seq, epochs=10, batch_size=32, validation_split=0.2, shuffle=True):
        """Train the model; all arguments are forwarded to ``Model.fit``.

        Returns:
            The value returned by the underlying ``fit`` call.
        """
        return self.model.fit(
            X_seq,
            y_seq,
            epochs=epochs,
            batch_size=batch_size,
            validation_split=validation_split,
            shuffle=shuffle,
        )

    def evaluate(self, X_seq, y_seq):
        """Return the result of ``Model.evaluate`` on the given data."""
        return self.model.evaluate(X_seq, y_seq)

    def predict(self, X_seq):
        """Return the result of ``Model.predict`` on the given data."""
        return self.model.predict(X_seq)

In [6]:
def feature_posses_from_frame(results, mp_pose, frame_count):
    """Build the one-row feature frame for a single pose detection.

    Args:
        results: Pose result object; ``results.pose_landmarks.landmark`` is
            indexed by landmark id and each entry exposes ``.x`` and ``.y``.
        mp_pose: Object providing the ``PoseLandmark`` enumeration.
        frame_count: Value stored in both the ``""`` and ``'frame'`` columns.

    Returns:
        A one-row DataFrame. Columns are, in order: ``""``, ``'frame'``, the
        x/y coordinates of six joints (shoulder, hip -- named "belly" -- and
        knee, left then right), followed by four groups of
        ``<column>diff_<k>frame_before`` placeholders (k = 1..4), all NaN.
    """
    landmarks = results.pose_landmarks.landmark

    # (column prefix, PoseLandmark member), listed in output column order.
    joints = (
        ('lshoulder', 'LEFT_SHOULDER'),
        ('lbelly', 'LEFT_HIP'),
        ('lknee', 'LEFT_KNEE'),
        ('rshoulder', 'RIGHT_SHOULDER'),
        ('rbelly', 'RIGHT_HIP'),
        ('rknee', 'RIGHT_KNEE'),
    )

    row = {"": frame_count, 'frame': frame_count}
    for prefix, member in joints:
        point = landmarks[getattr(mp_pose.PoseLandmark, member).value]
        row[f'{prefix}_x'] = point.x
        row[f'{prefix}_y'] = point.y

    # Placeholders for the differences to the previous frames; the caller
    # fills them in once earlier frames are available.
    coordinate_columns = [name for name in row if name not in ("", 'frame')]
    for lag in range(1, 5):
        for name in coordinate_columns:
            row[f'{name}diff_{lag}frame_before'] = np.nan

    return pd.DataFrame([row])
        
    

In [7]:
def test_video(input_path, output_path, model, scaler=None):
    """Classify the pose in each frame of a video and write an annotated copy.

    For every frame in which a pose is detected, the joint coordinates and
    their differences to up to four previously detected frames are scaled,
    passed to ``model.predict`` and the predicted class is drawn on the frame.
    Frames without a detected pose are skipped: they are neither classified
    nor written to the output video.

    The raw (unscaled) feature rows are also saved to ``<input_path>.csv``.

    Args:
        input_path: Path of the video to analyse.
        output_path: Path of the annotated video to write.
        model: Object whose ``predict`` returns class scores for one sequence.
        scaler: Fitted scaler exposing ``transform``. When omitted, the
            notebook-level ``loaded_scaler`` is used, as before.

    Returns:
        List with one predicted class index per pose-bearing frame.
    """
    if scaler is None:
        # Backward-compatible fallback to the scaler loaded by the training cell.
        scaler = loaded_scaler

    main_columns = [
        'lshoulder_x',
        'lshoulder_y',
        'lbelly_x',
        'lbelly_y',
        'lknee_x',
        'lknee_y',
        'rshoulder_x',
        'rshoulder_y',
        'rbelly_x',
        'rbelly_y',
        'rknee_x',
        'rknee_y'
    ]
    max_lag = 4  # number of previous frames each frame is compared with
    label_names = {0: "FELL", 1: "IDLE"}  # every other class is shown as "FALLING"

    mp_pose = mp.solutions.pose

    preds = []
    feature_rows = []   # raw feature frames, concatenated once at the end
    frame_queue = []    # the last `max_lag` feature frames, oldest first

    cap = cv2.VideoCapture(input_path)
    output_video = None
    try:
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        output_video = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

        # The context manager releases the pose estimator's resources on exit.
        with mp_pose.Pose() as pose:
            frame_count = 0  # counts pose-bearing frames only
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # The pose estimator is fed RGB; OpenCV decodes frames as BGR.
                results = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                if results.pose_landmarks is None:
                    continue

                features = feature_posses_from_frame(results, mp_pose, frame_count)

                # Fill "<col>diff_<k>frame_before" for each available previous
                # frame: lag 1 is the most recent one, lag 4 the oldest kept.
                for lag, previous in enumerate(reversed(frame_queue), start=1):
                    for col in main_columns:
                        features[f"{col}diff_{lag}frame_before"] = features[col] - previous[col]
                frame_queue.append(features)
                del frame_queue[:-max_lag]

                feature_rows.append(features)

                # NOTE(review): for the first four pose frames some lag columns
                # are still NaN and reach the scaler/model unchanged -- confirm
                # this matches how the training CSVs treat their first rows.
                scaled = scaler.transform(np.array(features))

                # NOTE(review): the model receives a sequence of a single
                # timestep here, while the training cell builds windows of 9
                # rows -- confirm this is intended.
                prediction = np.argmax(model.predict(np.array([scaled])))
                preds.append(prediction)

                label = label_names.get(int(prediction), "FALLING")
                cv2.putText(frame, label, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
                output_video.write(frame)

                frame_count += 1
    finally:
        # Release the capture and writer even if processing failed midway.
        cap.release()
        if output_video is not None:
            output_video.release()

    if feature_rows:
        final_data_frame = pd.concat(feature_rows, ignore_index=True)
    else:
        final_data_frame = pd.DataFrame()
    final_data_frame.to_csv(f'{input_path}.csv', index=False)

    return preds
    
    

In [8]:
# Build the training set: load every labelled CSV, scale the features,
# one-hot encode the labels and cut the rows into fixed-length sequences.


dataset = concat_csv_files("data/labelled")

# Every column except "label" is used as a feature.
X = dataset.drop("label", axis=1).values
y = dataset["label"].values

scaler = MinMaxScaler()

# NOTE(review): the scaler is fitted on all rows, including those that the
# later fit(..., validation_split=0.2) call holds out for validation.
X = scaler.fit_transform(X)
# Labels are shifted by one before one-hot encoding -- presumably the raw
# labels start at -1; TODO confirm against the CSV files.
y = to_categorical(y+1)


# Persist the fitted scaler so inference can reuse the same scaling.
joblib.dump(scaler, 'scaler_params.pkl')

# Read it back as `loaded_scaler`, a notebook-level name used by test_video.
loaded_scaler = joblib.load('scaler_params.pkl')

# Windows of 9 consecutive rows, each paired with the label of the next row.
# NOTE(review): rows come from several concatenated CSV files, so some
# windows may span the boundary between two files -- confirm this is acceptable.
X_seq, y_seq = create_sequences(X, y, 9)

In [9]:
# Silence every Python warning from this point on.
# NOTE(review): this blanket filter also hides warnings raised by later cells;
# consider restricting it to a specific category or message.
warnings.filterwarnings('ignore') 
# Per-sample input shape is (timesteps, features), taken from the sequence
# array; the network has 3 output classes.
model = LSTMModel((X_seq.shape[1], X_seq.shape[2]), 3)
model.compile()
model.summary()

In [10]:
# Train for 100 epochs, holding out 20% of the sequences for validation.
# NOTE(review): in the recorded run below, val_loss rises from epoch 2 onward
# while the training loss keeps falling -- the model may be overfitting.
model.fit(X_seq, y_seq, epochs=100, batch_size=128, validation_split=0.2, shuffle=True)
# Save the underlying Keras model so it can be reloaded for inference.
model.model.save('model_saved.keras')

Epoch 1/100
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 19ms/step - accuracy: 0.4732 - loss: 1.0296 - val_accuracy: 0.4291 - val_loss: 1.1140
Epoch 2/100
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step - accuracy: 0.6729 - loss: 0.7784 - val_accuracy: 0.6356 - val_loss: 0.9410
Epoch 3/100
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step - accuracy: 0.7470 - loss: 0.5952 - val_accuracy: 0.6481 - val_loss: 1.0909
Epoch 4/100
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step - accuracy: 0.7822 - loss: 0.5181 - val_accuracy: 0.6493 - val_loss: 1.2962
Epoch 5/100
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step - accuracy: 0.7999 - loss: 0.4868 - val_accuracy: 0.6504 - val_loss: 1.4702
Epoch 6/100
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step - accuracy: 0.8367 - loss: 0.4356 - val_accuracy: 0.6629 - val_loss: 1.5514
Epoch 7/100
[1m28/28[0m [32m━━

In [15]:
# Reload the trained network from disk and annotate the input video.
# `load_model` is imported in the notebook's import cell.
# `prediction` receives the list of per-frame class indices returned by test_video.
loaded_model = load_model("model_saved.keras")
prediction = test_video("data/input.mp4", "data/output.mp4", loaded_model)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 500ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1