In [None]:
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import os

# Initialize MediaPipe Pose
class PoseDetector():
    def __init__(self, detectionCon=0.5, trackCon=0.5):
        self.mpPose = mp.solutions.pose
        self.pose = self.mpPose.Pose(
            min_detection_confidence=detectionCon, min_tracking_confidence=trackCon)

    def findPose(self, img):
        imgRGB = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        self.results = self.pose.process(imgRGB)
        return img

    def findPosition(self, img):
        lmList = []
        if self.results.pose_landmarks:
            h, w, _ = img.shape

            left_hip = self.results.pose_landmarks.landmark[23]
            right_hip = self.results.pose_landmarks.landmark[24]
            mid_hip_x = (left_hip.x + right_hip.x) / 2
            mid_hip_y = (left_hip.y + right_hip.y) / 2

            left_shoulder = self.results.pose_landmarks.landmark[11]
            right_shoulder = self.results.pose_landmarks.landmark[12]
            shoulder_width = abs(left_shoulder.x - right_shoulder.x) + 1e-6

            for lm in self.results.pose_landmarks.landmark:
                cx = (lm.x - mid_hip_x) / shoulder_width
                cy = (lm.y - mid_hip_y) / shoulder_width
                cz = lm.z / shoulder_width
                lmList.append([cx, cy, cz])

        return np.array(lmList).flatten() if lmList else np.zeros(99)

def extract_landmarks(video_path, label, max_frames=1000):
    if not os.path.exists(video_path):
        print(f"⚠️ ERROR: Video file '{video_path}' not found.")
        return []

    cap = cv2.VideoCapture(video_path)
    detector = PoseDetector()
    all_landmarks = []

    frame_count = 0

    while cap.isOpened():
        success, img = cap.read()
        if not success or frame_count >= max_frames:
            break

        img = detector.findPose(img)
        lm_data = detector.findPosition(img)

        if lm_data is not None and np.any(lm_data):
            all_landmarks.append(np.append(lm_data, label))
            frame_count += 1

    cap.release()

    print(f"✅ Extracted {frame_count} frames from {video_path}")
    return all_landmarks

# Define Activities (Danger = 0, Non-Danger = 1)
activities = {
    "fighting.mp4": 0,
    "Kick1.mp4": 0,
    "Punch1.mp4": 0,
    "Shove1.mp4": 0,
    "slap1.mp4": 0,
    "Nun1.mp4": 0,
    "Shoot1.mp4": 0,
    "sitting.mp4": 1,
    "walking3.mp4": 1,
    "jumping.mp4": 1,
    "bowling1.mp4": 1
}

# Extract Data
all_data = []
for video, label in activities.items():
    all_data.extend(extract_landmarks(video, label))

# Save Dataset
if all_data:
    df = pd.DataFrame(all_data)
    df.to_csv("pose_dataset.csv", index=False)
    print("✅ Dataset saved as 'pose_dataset.csv' 🚀")
else:
    print("⚠️ No data extracted. Check video files.")

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional

# Load Dataset
csv_file = "pose_dataset.csv"
df = pd.read_csv(csv_file).values
X = df[:, :-1]
y = df[:, -1]

# Prepare Sequences for LSTM
TIME_STEPS = 30
FEATURES = 99

X_seq, y_seq = [], []
for i in range(len(X) - TIME_STEPS):
    X_seq.append(X[i:i+TIME_STEPS])
    y_seq.append(y[i+TIME_STEPS])

X_seq, y_seq = np.array(X_seq), np.array(y_seq)

# Split Train & Test
X_train, X_test, y_train, y_test = train_test_split(X_seq, y_seq, test_size=0.2, random_state=42)

# Define LSTM Model
model = Sequential([
    Bidirectional(LSTM(128, return_sequences=True, input_shape=(TIME_STEPS, FEATURES))),
    Dropout(0.3),
    Bidirectional(LSTM(128)),
    Dense(64, activation="relu"),
    Dropout(0.3),
    Dense(2, activation="softmax")  # 2 Classes: Danger (0) & Non-Danger (1)
])

# Compile Model
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["sparse_categorical_accuracy"])

# Train Model
model.fit(X_train, y_train, epochs=50, batch_size=64, validation_data=(X_test, y_test))

# Save Model
model.save("lstm_activity_model.h5")
print("✅ Model trained & saved as 'lstm_activity_model.h5'")

In [None]:
import cv2
import numpy as np
import tensorflow as tf

# Load trained model
model = tf.keras.models.load_model("lstm_activity_model.h5")

# Labels
actions = ["DANGER", "SAFE"]
colors = [(0, 0, 255), (0, 255, 0)]  # RED (Danger) & GREEN (Non-Danger)

# Load Test Video
cap = cv2.VideoCapture("slap1 (1).mp4")
detector = PoseDetector()
sequence = []

cv2.namedWindow("Activity Recognition", cv2.WINDOW_NORMAL)

while cap.isOpened():
    success, img = cap.read()
    if not success:
        break

    img = detector.findPose(img)
    lm_data = detector.findPosition(img)

    if lm_data is not None and np.any(lm_data):
        sequence.append(lm_data)
        if len(sequence) > TIME_STEPS:
            sequence.pop(0)

        if len(sequence) == TIME_STEPS:
            input_seq = np.expand_dims(np.array(sequence), axis=0)
            prediction = np.argmax(model.predict(input_seq), axis=1)[0]
            label = actions[prediction]
            color = colors[prediction]

            cv2.rectangle(img, (20, 30), (250, 100), color, -1)  # Display box
            cv2.putText(img, label, (50, 80),
                        cv2.FONT_HERSHEY_COMPLEX, 1.2, (255, 255, 255), 3)

    img_resized = cv2.resize(img, (800, 600))
    cv2.imshow("Activity Recognition", img_resized)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

In [None]:
import cv2
import numpy as np
import tensorflow as tf

# Load trained model
model = tf.keras.models.load_model("lstm_activity_model.h5")

# Labels
actions = ["DANGER", "SAFE"]
colors = [(0, 0, 255), (0, 255, 0)]  # RED (Danger) & GREEN (Non-Danger)

# Load Test Video
cap = cv2.VideoCapture("test_video.mp4")
detector = PoseDetector()
sequence = []

cv2.namedWindow("Activity Recognition", cv2.WINDOW_NORMAL)

while cap.isOpened():
    success, img = cap.read()
    if not success:
        break

    img = detector.findPose(img)
    lm_data = detector.findPosition(img)

    if lm_data is not None and np.any(lm_data):
        sequence.append(lm_data)
        if len(sequence) > TIME_STEPS:
            sequence.pop(0)

        if len(sequence) == TIME_STEPS:
            input_seq = np.expand_dims(np.array(sequence), axis=0)
            prediction = np.argmax(model.predict(input_seq), axis=1)[0]
            label = actions[prediction]
            color = colors[prediction]

            # Adjusted smaller rectangle & font
            cv2.rectangle(img, (20, 20), (180, 60), color, -1)  # Smaller box
            cv2.putText(img, label, (30, 50),
                        cv2.FONT_HERSHEY_COMPLEX, 0.8, (255, 255, 255), 2)  # Smaller font

    img_resized = cv2.resize(img, (800, 600))
    cv2.imshow("Activity Recognition", img_resized)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
