In [4]:
%pip install opencv-python

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
Note: you may need to restart the kernel to use updated packages.


In [5]:
import cv2
import numpy as np
from sklearn import svm
from sklearn.model_selection import KFold, cross_val_score

class MotionRecognition:
    def __init__(self):
        self.sift = cv2.SIFT_create()
        self.mhi_duration = 10
        self.clf = svm.SVC()

    def extract_sift_features(self, image):
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        keypoints, descriptors = self.sift.detectAndCompute(gray, None)
        return keypoints, descriptors

    def update_mhi(self, prev_frame, current_frame, mhi):
        diff = cv2.absdiff(prev_frame, current_frame)
        _, binary_diff = cv2.threshold(diff, 30, 1, cv2.THRESH_BINARY)

        timestamp = cv2.getTickCount() / cv2.getTickFrequency()
        mhi *= 0.9
        mhi[binary_diff > 0] = timestamp

        return mhi

    def process_video(self):
        cap = cv2.VideoCapture(0)
        ret, prev_frame = cap.read()

        if not ret:
            raise ValueError("Cannot read the video file.")

        prev_frame = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

        mhi = np.zeros_like(prev_frame, dtype=np.float32)

        while True:
            ret, current_frame = cap.read()

            if not ret:
                break

            current_frame_gray = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY)
            mhi = self.update_mhi(prev_frame, current_frame_gray, mhi)

            keypoints, descriptors = self.extract_sift_features(current_frame)
            current_frame_with_keypoints = cv2.drawKeypoints(current_frame, keypoints, None)

            cv2.imshow("Current Frame with SIFT keypoints", current_frame_with_keypoints)
            cv2.imshow("MHI", mhi)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            prev_frame = current_frame_gray

        cap.release()
        cv2.destroyAllWindows()

    def train_svm(self, X_train, y_train):
        self.clf.fit(X_train, y_train)

    def predict(self, X_test):
        return self.clf.predict(X_test)

    def k_fold_cross_validation(self, X, y, n_splits=5):
        kf = KFold(n_splits=n_splits)
        scores = cross_val_score(self.clf, X, y, cv=kf)
        return scores.mean(), scores.std()

In [6]:
import cv2
import numpy as np
import os
from concurrent.futures import ThreadPoolExecutor

motion_recognition = MotionRecognition()

def process_video(video_path):
    cap = cv2.VideoCapture(video_path)
    features = []
    ret, prev_frame = cap.read()
    prev_frame = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    mhi = np.zeros_like(prev_frame, dtype=np.float32)

    while True:
        ret, current_frame = cap.read()

        if not ret:
            break

        current_frame_gray = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY)
        mhi = motion_recognition.update_mhi(prev_frame, current_frame_gray, mhi)
        keypoints, descriptors = motion_recognition.extract_sift_features(current_frame)
        features.append(descriptors)
        prev_frame = current_frame_gray

    cap.release()

    return features, mhi

def load_your_data(progress_interval=10):
    sift = cv2.SIFT_create()
    data_path = 'motions'
    train_data = []
    test_data = []
    train_labels = []
    test_labels = []
    futures = []

    action_map = {
        'boxing': 0,
        'handclapping': 1,
        'handwaving': 2,
        'jogging': 3,
        'running': 4,
        'walking': 5
    }

    total_videos = 0
    for action in os.listdir(data_path):
        if action not in action_map:
            continue
        action_path = os.path.join(data_path, action)
        for video in os.listdir(action_path):
            total_videos += 1

    processed_videos = 0
    with ThreadPoolExecutor() as executor:
        for action in os.listdir(data_path):
            if action not in action_map:
                continue
            action_path = os.path.join(data_path, action)
            for video in os.listdir(action_path):
                video_path = os.path.join(action_path, video)
                future = executor.submit(process_video, video_path)
                futures.append((future, action_map[action]))

        for future, action_label in futures:
            features, _ = future.result()

            if len(features) < 10:
                # Discard videos that are too short
                continue

            # Split the video features into training and testing sets
            split_index = int(len(features) * 0.8)
            train_data.extend(features[:split_index])
            test_data.extend(features[split_index:])
            train_labels.extend([action_label] * split_index)
            test_labels.extend([action_label] * (len(features) - split_index))

            processed_videos += 1
            if processed_videos % progress_interval == 0:
                print(f"Processed {processed_videos}/{total_videos} videos.")

    X_train = np.vstack(train_data)
    y_train = np.array(train_labels)
    X_test = np.vstack(test_data)
    y_test = np.array(test_labels)

    return X_train, y_train, X_test, y_test


NotADirectoryError: [WinError 267] The directory name is invalid: 'data\\boxing\\person01_boxing_d1_uncomp.avi'