In [1]:
import os

In [2]:
import cv2 

In [3]:
import mediapipe as mp

In [4]:
import numpy as np

In [5]:
from sklearn.model_selection import train_test_split

In [6]:
from tensorflow.keras.models import Sequential, load_model

In [7]:
from tensorflow.keras.layers import LSTM, Dense

In [8]:
from tensorflow.keras.utils import to_categorical

In [9]:
from tensorflow.keras.callbacks import EarlyStopping

In [10]:
#Directory for storing data
DATA_DIR="data"
if not os.path.exists(DATA_DIR):
    os.makedirs(DATA_DIR)

In [11]:
#Defining activities and parameters 
ACTIVITIES=['standing','jogging','jumping','squatting']
NUM_CLASSES=len(ACTIVITIES)
SEQUENCE_LENGTH=30
FRAME_RATE=3

In [12]:
#initializing mediapipe and pose
mp_pose=mp.solutions.pose
pose=mp_pose.Pose()
mp_drawing=mp.solutions.drawing_utils

In [15]:
#function to collect the data 
def collect_data(activity_name):
    print(f"collecting data for:{activity_name}")
    cap=cv2.VideoCapture(0)
    data=[]
    frame_count=0
    while True:
        ret,frame=cap.read()
        if not ret:
            print("error accessing the camera!")
            break

        frame =cv2.flip(frame,1)
        results=pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        if results.pose_landmarks and frame_count % FRAME_RATE==0:
            #extract landmarks
            landmarks=[[lm.x,lm.y,lm.z] for lm in results.pose_landmarks.landmark]
            data.append(np.array(landmarks).flatten())

            #draw landmarks on the franme
            mp_drawing.draw_landmarks(frame,results.pose_landmarks,mp_pose.POSE_CONNECTIONS)

        frame_count+=1
        cv2.putText(frame,f"collecting: {activity_name}",(10,50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)
        cv2.imshow("Data collection", frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

    #save collceted data 
    data= np.array(data)
    np.save(os.path.join(DATA_DIR, f"{activity_name}.npy"), data)
    print(f"Data for {activity_name} saved with shape {data.shape}")

#collect data for each activity
for activity in ACTIVITIES:
    collect_data(activity)

collecting data for:standing
Data for standing saved with shape (322, 99)
collecting data for:jogging
Data for jogging saved with shape (224, 99)
collecting data for:jumping
Data for jumping saved with shape (115, 99)
collecting data for:squatting
Data for squatting saved with shape (402, 99)


In [16]:
#load and proces data 
def load_data():
    X,y = [], []
    for label, activity in  enumerate(ACTIVITIES):
        data = np. load(os.path.join(DATA_DIR, f"{activity}.npy"))
        sequences= [data[i:i+SEQUENCE_LENGTH] for i in range (len(data)-SEQUENCE_LENGTH)]
        X.extend(sequences)
        y.extend([label]*len(sequences))

    X=np.array(X)
    y=np.array(y)
    return X,y

X,y = load_data()
print(f"loaded data: X.shape={X.shape},y.shape={y.shape}")
#split data
X_train,X_test,y_train,y_test=train_test_split(X,y,test_size=0.2, random_state=42)
y_train=to_categorical(y_train,NUM_CLASSES)
y_test=to_categorical(y_test,NUM_CLASSES)
    
        
    

loaded data: X.shape=(943, 30, 99),y.shape=(943,)


In [17]:
#build the lstm model
model=Sequential([
    LSTM(64, return_sequences=True, input_shape=(SEQUENCE_LENGTH, X_train.shape[2])),
    LSTM(32),
    Dense(32,activation="relu"),
    Dense(NUM_CLASSES, activation="softmax")
])

model.compile(optimizer="adam", loss ="categorical_crossentropy", metrics=["accuracy"])
print(model.summary())

#train the model
callbacks = [EarlyStopping(monitor="val_loss", patience=5)]
history= model.fit(
    X_train, y_train,
    validation_data=(X_test,y_test),
    epochs=50,
    batch_size=32,
    callbacks=callbacks
)
    

  super().__init__(**kwargs)


None
Epoch 1/50
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - accuracy: 0.5609 - loss: 1.1856 - val_accuracy: 0.9048 - val_loss: 0.4972
Epoch 2/50
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step - accuracy: 0.9136 - loss: 0.3707 - val_accuracy: 0.9683 - val_loss: 0.1553
Epoch 3/50
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step - accuracy: 0.9586 - loss: 0.1389 - val_accuracy: 1.0000 - val_loss: 0.0461
Epoch 4/50
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 13ms/step - accuracy: 0.9933 - loss: 0.0525 - val_accuracy: 1.0000 - val_loss: 0.0211
Epoch 5/50
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 13ms/step - accuracy: 0.9983 - loss: 0.0214 - val_accuracy: 0.9841 - val_loss: 0.0760
Epoch 6/50
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step - accuracy: 0.9850 - loss: 0.0441 - val_accuracy: 1.0000 - val_loss: 0.0098
Epoch 7/50
[1m24/24[0m [32

In [18]:
#save the model
MODEL_PATH="human_activity_rec_model_first.h5"
model.save(MODEL_PATH)
print(f"Model saved to{MODEL_PATH}")




Model saved tohuman_activity_rec_model_first.h5


In [None]:
#for running my model
import time 

#set the desired width and height for the camera window 
ACTIVITIES=['standing','jogging','jumping','squatting']
DESIRED_WIDTH=1020
DESIRED_HEIGHT=720
MODEL_PATH="human_activity_rec_model_first.h5"

#real time prediction
def real_time_prediction():
    print("Starting real time prediction...")
    model=load_model(MODEL_PATH)
    sequence = []
    cap= cv2.VideoCapture(0)

    #mediapipe pose and face detection
    mp_pose=mp.solutions.pose
    pose=mp_pose.Pose()
    mp_drawing=mp.solutions.drawing_utils

    mp_face_detection=mp.solutions.drawing_utils

    mp_face_detection=mp.solutions.face_detection
    face_detection= mp_face_detection.FaceDetection(min_detection_confidence=0.5)
    
    while True:
        ret, frame=cap.read()
        if not ret:
            break

        frame_rgb=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        #Resize the frame to the desired size 
        frame =cv2.resize (frame,(DESIRED_WIDTH,DESIRED_HEIGHT))

        #process pose landmarks and face detection
        pose_results=pose.process(frame_rgb)
        face_results=face_detection.process(frame_rgb)

        if pose_results.pose_landmarks:
            #extract pose landmarks 
            landmarks=[[lm.x,lm.y,lm.z] for lm in pose_results.pose_landmarks.landmark]
            keypoints = np.array(landmarks).flatten()
            sequence.append(keypoints)
            #maintain sequence length
            if len(sequence) > SEQUENCE_LENGTH:
                sequence.pop(0)

            if len(sequence)==SEQUENCE_LENGTH:
                prediction=model.predict(np.expand_dims(sequence, axis=0))
                confidence= np.max(prediction)
                activity=ACTIVITIES[np.argmax(prediction)]

                overlay=frame.copy()
                cv2.rectangle(overlay,(0,0),(frame.shape[1],100),(0,0,0),-1)
                frame= cv2.addWeighted(overlay,0.6,frame,0.4,0)

                #Display activity label
                label=f"Activity: {activity} ({confidence:.2f})"
                cv2.putText(frame, label, (20,60), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (255,255,255),3)

            #Draw body bounding box
            body_landmarks=pose_results.pose_landmarks.landmark
            body_x= [lm.x for lm in body_landmarks]
            body_y= [lm.y for lm in body_landmarks]
            h,w, _=frame.shape
            x_min=int(min(body_x)*w)
            x_max=int(max(body_x)*w)
            y_min=int(min(body_y)*h)
            y_max=int(max(body_y)*h)
            cv2.rectangle(frame, (x_min, y_min), (x_max,y_max),(0,255,0),2)

        if face_results.detections:
            for detection in face_results.detections:
                bboxC = detection.location_data.relative_bounding_box
                h, w, _ = frame.shape
                x_min = int(bboxC.xmin * w)
                y_min = int(bboxC.ymin * h)
                box_width = int(bboxC.width * w)
                box_height = int(bboxC.height * h)
                x_max = x_min + box_width
                y_max = y_min + box_height
                cv2.rectangle(frame, (x_min, y_min), (x_max,y_max),(255,0,0),2)


        border_color=(0,255,0)
        border_thickness=5
        cv2.rectangle(frame, (0, 0), (frame.shape[1],frame.shape[0]), border_color, border_thickness)

        footer_overlay=frame.copy()
        cv2.rectangle(footer_overlay, (0, frame.shape[0]-50), (frame.shape[1],frame.shape[0]), (0,0,0), -1)
        frame=cv2.addWeighted(footer_overlay,0.6,frame,0.4,0)
        cv2.putText(frame, "press q to quit", (20,frame.shape[0]-15), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255),2)

        cv2.imshow("professional real time prediction", frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        

    cap.release()
    cv2.destroyAllWindows()


real_time_prediction()          
        

Starting real time prediction...




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 692ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 35ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 39ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 35ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 47ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 40ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 34ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3