# 0. Install and Import Dependencies

In [2]:
import cv2 as cv
import mediapipe as mp
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# 1. Loading a Video and Saving Frames in Sequences

In [3]:
# Classes: 3 subfolders within 'Single_person_violent' => Kicking, Punching, Non-violent
CLASSES = ["Kicking", "Punching", "Standing"]

# We will define a fixed sequence length to handle variable-length videos
MAX_SEQ_LEN = 5 # example: 30 frames per video

# Set how many frames to skip when reading a video (to reduce computational load)
SKIP_RATE = 1  # example: capture 1 frame out of every 5


# 2. Extract Pose Sequence from a Single Video

This function opens a video file with OpenCV (cv.VideoCapture).

It skips frames by advancing the capture index so we only process 1 out of every SKIP_RATE frames.

For each processed frame, it uses MediaPipe Pose to detect 33 landmarks.

Each landmark has 4 values: (x, y, z, visibility), so for 33 landmarks, we get 132 values in one frame.

If no pose is detected, we append a zero vector (132 zeros).

Finally, it returns a NumPy array of shape (T, 132), where T is the number of frames we processed in the video.

In [4]:
mp_pose = mp.solutions.pose

def get_pose_sequence_from_video(video_path, skip_rate=5):
    """
    Opens a video, reads frames at a specified skip_rate,
    and returns a NumPy array of shape (num_frames, 132)
    containing (x, y, z, visibility) for 33 pose landmarks.
    """
    cap = cv.VideoCapture(video_path)
    
    # List to store the per-frame keypoints
    sequence = []
    
    with mp_pose.Pose(min_detection_confidence=0.5, 
                      min_tracking_confidence=0.5) as pose_model:
        
        frame_index = 0  # keep track of frame index
        while True:
            ret, frame = cap.read()
            if not ret:
                break  # no more frames
            
            # Process only 1 out of every 'skip_rate' frames
            if frame_index % skip_rate == 0:
                # Convert BGR to RGB
                rgb_frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
                results = pose_model.process(rgb_frame)
				
                if results.pose_landmarks:
                    # Flatten 33 landmarks × 4 = 132 values
                    keypoints = []
                    for lm in results.pose_landmarks.landmark:
                        keypoints.extend([lm.x, lm.y, lm.z, lm.visibility])
                    sequence.append(keypoints)
                else:
                    # If no pose was detected, append a zero vector of length 132
                    sequence.append([0]*132)
            
            frame_index += 1  # increment frame counter
    
    cap.release()
    return np.array(sequence)  # shape (num_frames, 132)

### Load Processed Data

In [5]:
data = np.load("extracted_data/pose_dataset_3.npz", allow_pickle=True)
X_sequences = list(data["X"])
y_labels = list(data["y"])

print(f"✅ Loaded {len(X_sequences)} sequences.")

✅ Loaded 75 sequences.


## 4. Pad or Truncate Sequences
LSTM/GRU networks expect uniform sequence lengths in a batch.

We define a function pad_or_truncate_sequence that ensures each sequence has exactly MAX_SEQ_LEN frames.

+ If a sequence is longer than MAX_SEQ_LEN, we take the first MAX_SEQ_LEN frames.

+ If it’s shorter, we pad with zeros at the end.

In [6]:
def pad_or_truncate_sequence(seq, max_len=30):
    """
    seq: (T, 132) array for T frames
    Returns an array of shape (max_len, 132).
    """
    
    length = seq.shape[0]
    num_features = seq.shape[1]
    
    # print(length, num_features)
    
    if length > max_len:
        # Truncate
        return seq[:max_len, :]
    else:
        # Pad with zeros
        padded = np.zeros((max_len, num_features))
        padded[:length, :] = seq
        return padded

In [7]:
# Calculate average sequence length from X_sequences
seq_lengths = [seq.shape[0] for seq in X_sequences]
# Convert sequence lengths to a DataFrame
df_seq_lengths = pd.DataFrame(seq_lengths, columns=['Sequence_Length'])

# Get statistical description
print("Sequence Length Statistics:")
df_seq_lengths.describe()

Sequence Length Statistics:


Unnamed: 0,Sequence_Length
count,75.0
mean,75.533333
std,79.72526
min,1.0
25%,33.0
50%,47.0
75%,79.5
max,338.0


In [8]:
X_seq_padded = []

for seq in X_sequences:
    seq_padded = pad_or_truncate_sequence(seq, max_len=MAX_SEQ_LEN)
    X_seq_padded.append(seq_padded)

X_seq_padded = np.array(X_seq_padded)  # shape => (num_videos, MAX_SEQ_LEN, 132)

print("Final shape of data:", X_seq_padded.shape)
# Should be (N, 20, 132) if MAX_SEQ_LEN=20

Final shape of data: (75, 5, 132)


Write to CSV file for visualization and analysis

## 5. Encode Labels and Split Data

We use LabelEncoder to convert "Kicking", "Punching", "Non-violent" into numeric IDs: e.g. 0, 1, 2.

Then we split into train/test sets (e.g., 80/20) for fair evaluation.

We store them as X_train, X_test, y_train, y_test.

In [9]:
label_encoder = LabelEncoder()
y_int = label_encoder.fit_transform(y_labels)  
# e.g. "Kick"->0, "Punching"->2, "Standing"->1 (the mapping depends on alphabetical order)

# Convert to NumPy
y_int = np.array(y_int)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X_seq_padded, 
    y_int, 
    test_size=0.2, 
    stratify=y_int,  # keep classes balanced
    random_state=42
)

print("Train shape:", X_train.shape, "Test shape:", X_test.shape)
print("Train labels shape:", y_train.shape, "Test labels shape:", y_test.shape)


Train shape: (60, 5, 132) Test shape: (15, 5, 132)
Train labels shape: (60,) Test labels shape: (15,)


In [10]:
N, MAX_SEQ_LEN, NUM_FEATURES = X_seq_padded.shape
NUM_CLASSES = len(np.unique(y_int))

print("N:", N, "\nMAX_SEQ_LEN:", MAX_SEQ_LEN, "\nNUM_FEATURES:", NUM_FEATURES, "\nnum_classes:", NUM_CLASSES)

N: 75 
MAX_SEQ_LEN: 5 
NUM_FEATURES: 132 
num_classes: 3


In [11]:
import os
import glob

BASE_DIR = "Single_person_violent"
KICKING_DIR = os.path.join(BASE_DIR, "Punching")

# Use glob to list all .mp4 or .avi files in the Kicking directory
video_paths = glob.glob(os.path.join(KICKING_DIR, "*.mp4")) + glob.glob(os.path.join(KICKING_DIR, "*.avi"))

num_videos = len(video_paths)
print(f"Total number of videos in the 'Single_person_violent/Kicking' folder: {num_videos}")

Total number of videos in the 'Single_person_violent/Kicking' folder: 23


# 5. Make Detection

In [12]:
import cv2 as cv
import numpy as np
import mediapipe as mp
import collections
import tensorflow as tf
import os

In [13]:
path = "utils/weight/"
lstm_path = path + "lstm_model.h5"
gru_path = path + "gru_model.h5"
dnn_pth = path + "dnn_model.h5"

#Load model
MAX_SEQ_LEN = 30
lstm_infer_model = tf.keras.models.load_model(lstm_path)
gru_infer_model = tf.keras.models.load_model(gru_path)
dnn_infer_model = tf.keras.models.load_model(dnn_pth)
infer_model = dnn_infer_model



In [14]:
# Set up MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)

# Buffer to store latest MAX_SEQ_LEN frames of keypoints
buffer = collections.deque(maxlen=MAX_SEQ_LEN)

# Start webcam
cap = cv.VideoCapture(0)
cap.set(cv.CAP_PROP_FRAME_WIDTH, 960)
cap.set(cv.CAP_PROP_FRAME_HEIGHT, 720)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Resize frame to consistent shape (optional for clarity)
    frame = cv.resize(frame, (960, 720))

    # Convert to RGB and get pose
    rgb = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
    results = pose.process(rgb)

    # Extract pose keypoints
    if results.pose_landmarks:
        keypoints = []
        for lm in results.pose_landmarks.landmark:
            keypoints.extend([lm.x, lm.y, lm.z, lm.visibility])
    else:
        keypoints = [0] * NUM_FEATURES

    # Add keypoints to buffer
    buffer.append(keypoints)

    if len(buffer) == MAX_SEQ_LEN:
        seq = np.array(buffer)

        if infer_model == dnn_infer_model:
            seq_input = seq.reshape(1, -1)  # DNN input shape: (1, 660)
        else:
            seq_input = np.expand_dims(seq, axis=0)  # LSTM/GRU input shape: (1, 5, 132)

        pred = infer_model.predict(seq_input, verbose=0)
        class_id = np.argmax(pred)
        confidence = float(pred[0][class_id]) * 100
        class_name = CLASSES[class_id]
        
        if class_name == "Standing":
            display_text = f"{class_name} ({confidence:.1f}%), Non-Violent"
        else:
            display_text = f"{class_name} ({confidence:.1f}%), Violent"
            
        # Show prediction on frame
        cv.putText(frame, f'Action: {display_text}', (30, 50),
                   cv.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0), 3)
    else:
        display_text = "Non-Violent"
        cv.putText(frame, f'Action: {display_text}', (30, 50),
                   cv.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0), 3)

    # Draw pose landmarks
    if results.pose_landmarks:
        mp.solutions.drawing_utils.draw_landmarks(
            frame,
            results.pose_landmarks,
            mp_pose.POSE_CONNECTIONS,
            landmark_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2),
            connection_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2)
        )

    # Display the frame
    cv.imshow("Live Action Detection", frame)

    # Press 'q' to exit
    if cv.waitKey(1) & 0xFF == ord('q'):
        break

# Cleanup
cap.release()
cv.destroyAllWindows()
pose.close()

### Test on Video

In [15]:
# Use the test_video_paths that's already defined in the context
# test_video_path = "Single_person_violent\Kicking\kicking23.avi"
test_video_path = "Single_person_violent\Punching\punching5.mp4"

# test_video_path = "A-Dataset-for-Automatic-Violence-Detection-in-Videos\\violence-detection-dataset\\violent\cam1\\1.mp4" 
print(f"Processing: {os.path.basename(test_video_path)}")

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
    model_complexity=1
)

# Open video file
cap = cv.VideoCapture(test_video_path)

# Buffer to store latest MAX_SEQ_LEN frames of keypoints
buffer = collections.deque(maxlen=MAX_SEQ_LEN)

while True:
	ret, frame = cap.read()
	if not ret:
		break

	# Convert to RGB and get pose
	rgb = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
	results = pose.process(rgb)

	# Extract pose keypoints
	if results.pose_landmarks:
		keypoints = []
		for lm in results.pose_landmarks.landmark:
			keypoints.extend([lm.x, lm.y, lm.z, lm.visibility])
	else:
		keypoints = [0] * NUM_FEATURES

	# Add keypoints to buffer
	buffer.append(keypoints)

	if len(buffer) == MAX_SEQ_LEN:
		seq = np.array(buffer)

		if infer_model == dnn_infer_model:
			seq_input = seq.reshape(1, -1)  # DNN input shape: (1, 660)
		else:
			seq_input = np.expand_dims(seq, axis=0)  # LSTM/GRU input shape: (1, 5, 132)

		pred = infer_model.predict(seq_input, verbose=0)
		class_id = np.argmax(pred)
		confidence = float(pred[0][class_id]) * 100
		class_name = CLASSES[class_id]

		if class_name == "Standing":
			display_text = f"{class_name} ({confidence:.1f}%), Non-Violent"
		else:
			display_text = f"{class_name} ({confidence:.1f}%), Violent"

		# Show prediction on frame
		cv.putText(frame, f'Action: {display_text}', (30, 50),
				   cv.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0), 3)

	# Draw pose landmarks
	if results.pose_landmarks:
		mp.solutions.drawing_utils.draw_landmarks(
			frame,
			results.pose_landmarks,
			mp_pose.POSE_CONNECTIONS,
			landmark_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2),
			connection_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2)
		)

	# Display the frame
	cv.imshow(f"Action Detection - {os.path.basename(test_video_path)}", frame)

	# Press 'q' to exit
	if cv.waitKey(1) & 0xFF == ord('q'):
		break

# Cleanup
cap.release()
cv.destroyAllWindows()
pose.close()

Processing: punching5.mp4
