In [None]:
import numpy as np
import pandas as pd
import cv2
import glob
import os
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dropout, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import joblib

import mediapipe as mp
import warnings 





In [None]:
def image_to_video(input_path, output_path, fps=18):
    images = sorted(glob.glob(os.path.join(input_path, '*.png'))) # Sort the images in ascending order

    frame = cv2.imread(images[0])
    height, width, layers = frame.shape
    
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    
    video = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    for image in images:
        video.write(cv2.imread(image))
    
    video.release()
    
    print("""Video saved as {}""".format(output_path))

In [None]:
def concat_csv_files(input_folder):
    combined_df = pd.DataFrame()
    sorted_files = sorted(glob.glob(os.path.join(input_folder, '*.csv')))
    for file in sorted_files:
        df = pd.read_csv(file)
        combined_df = pd.concat([combined_df, df[4:]], ignore_index=True)
    
    return combined_df

In [None]:
# Create sequences with a fixed window size
def create_sequences(X, y, window_size):
    Xs, ys = [], []
    for i in range(len(X) - window_size):
        Xs.append(X[i:i+window_size])
        ys.append(y[i+window_size])
    return np.array(Xs), np.array(ys)

In [None]:

class LSTMModel:
    def __init__(self, input_shape, num_classes):
        self.model = Sequential()
        self.model.add(LSTM(64, input_shape=input_shape, return_sequences=True))
        self.model.add(Dropout(0.2))
        self.model.add(LSTM(32))
        self.model.add(Dropout(0.2))
        self.model.add(Dense(num_classes, activation='softmax'))

    def compile(self):
        self.model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
        
    def summary(self):
        return self.model.summary()
    
    def fit(self, X_seq, y_seq, epochs=10, batch_size=32, validation_split=0.2, shuffle=True):
        return self.model.fit(X_seq, y_seq, epochs=epochs, batch_size=batch_size, validation_split=validation_split, shuffle=shuffle)
    
    def evaluate(self, X_seq, y_seq):
        return self.model.evaluate(X_seq, y_seq)
    
    def predict(self, X_seq):
        return self.model.predict(X_seq)

In [None]:
def feature_posses_from_frame(results, mp_pose, frame_count):
    df = []
    
    main_column_names = [
        'head_x',
        'head_y',
        'l_shoulder_x',
        'l_shoulder_y',
        'l_belly_x',
        'l_belly_y',
        'l_knee_x',
        'l_knee_y',
        'r_shoulder_x',
        'r_shoulder_y',
        'r_belly_x',
        'r_belly_y',
        'r_knee_x',
        'r_knee_y']

    if results.pose_landmarks is not None:
        # Extract key points
        landmarks = results.pose_landmarks.landmark

        # Get coordinates of head, shoulder, belly, and knees
        head_x = landmarks[mp_pose.PoseLandmark.NOSE.value].x
        head_y = landmarks[mp_pose.PoseLandmark.NOSE.value].y

        l_shoulder_x = landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x
        l_shoulder_y = landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y
        l_belly_x = landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].x
        l_belly_y = landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].y
        l_knee_x = landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].x
        l_knee_y = landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].y

        r_shoulder_x = landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x
        r_shoulder_y = landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y
        r_belly_x = landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].x
        r_belly_y = landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].y
        r_knee_x = landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].x
        r_knee_y = landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].y

        df.append({'frame': frame_count,
                   'head_x': head_x,
                   'head_y': head_y,
                   'l_shoulder_x': l_shoulder_x,
                   'l_shoulder_y': l_shoulder_y,
                   'l_belly_x': l_belly_x,
                   'l_belly_y': l_belly_y,
                   'l_knee_x': l_knee_x,
                   'l_knee_y': l_knee_y,
                   'r_shoulder_x': r_shoulder_x,
                   'r_shoulder_y': r_shoulder_y,
                   'r_belly_x': r_belly_x,
                   'r_belly_y': r_belly_y,
                   'r_knee_x': r_knee_x,
                   'r_knee_y': r_knee_y})

        df = pd.DataFrame(df)

        # adding new columns for difference of previous frames
        for i in range(4):
            for col in main_column_names:
                new_column_name = f'{col}diff_{i + 1}frame_before'
                df[new_column_name] = np.nan


    return df

In [None]:
def test_video(input_path, output_path, model):
    cap = cv2.VideoCapture(input_path)
    
    frame_width = int(cap.get(3))
    frame_height = int(cap.get(4))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    
    
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose()
    
    frame_queue = []
    
    preds = []
    
    final_data_frame = pd.DataFrame()
    
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    
    output_video = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
    
    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        
        if not ret:
            break
        
        # Convert frame to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Detect pose
        results = pose.process(rgb_frame)
        
        if results.pose_landmarks is not None:
            # Extract key points
            df = feature_posses_from_frame(results, mp_pose, frame_count)
            
            main_column_names = [
                'head_x',
                'head_y',
                'l_shoulder_x',
                'l_shoulder_y',
                'l_belly_x',
                'l_belly_y',
                'l_knee_x',
                'l_knee_y',
                'r_shoulder_x',
                'r_shoulder_y',
                'r_belly_x',
                'r_belly_y',
                'r_knee_x',
                'r_knee_y']
            
            
            # adding previous frames information to the current frame feature matrix
            if len(frame_queue) <= 4:
                for i in range(min(len(frame_queue), 4)):
                    for col in main_column_names:
                        df[f"{col}diff_{i + 1}frame_before"] = df[col] - frame_queue[-(i + 1)][col]
                frame_queue.append(df)
            else:
                frame_queue.pop(0)
                for i in range(4):
                    for col in main_column_names:
                        df[f"{col}diff_{i + 1}frame_before"] = df[col] - frame_queue[-(i + 1)][col]
                frame_queue.append(df)

            final_data_frame = pd.concat([final_data_frame, df], ignore_index=True, sort=False)

         
            features = np.array(df)
    
            features = loaded_scaler.transform(features)
            
            seq = []
    
            seq.append(features)
            seq = np.array(seq)
            prediction = model.predict(seq)
            prediction = np.argmax(prediction)
            preds.append(prediction)
            
            if len(frame_queue) <= 4:
                label = "NO PREDICTION"
            elif prediction == 0:
                label = "FELL"
            elif prediction == 1:
                label = "IDLE"
            elif prediction == 2:
                label = "FALLING"
            
            
            cv2.putText(frame, label, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
            
            output_video.write(frame)
            
            frame_count += 1
    
    
    cap.release()
    output_video.release()
    
    final_data_frame.to_csv(f'{input_path}.csv', index=False)
    
    return preds
    
    

In [None]:
# getting train data
dataset = concat_csv_files("train")

X = dataset.drop("label", axis=1).values
y = dataset["label"].values

scaler = MinMaxScaler()

X = scaler.fit_transform(X)
y = to_categorical(y+1)


# save scaler
joblib.dump(scaler, 'scaler_params.pkl')

# load scaler
loaded_scaler = joblib.load('scaler_params.pkl')

X_seq, y_seq = create_sequences(X, y, 9)

In [None]:
warnings.filterwarnings('ignore') 
model = LSTMModel((X_seq.shape[1], X_seq.shape[2]), 3)
model.compile()
model.summary()

In [None]:
model.fit(X_seq, y_seq, epochs=100, batch_size=128, validation_split=0.2, shuffle=True)
model.model.save('model_saved.keras')

In [None]:
from tensorflow.keras.models import load_model
loaded_model=load_model("model_saved.keras")
prediction=test_video("camera2.mp4", "output2.mp4", loaded_model)
