Perform per-class feature extraction.

In [4]:
import cv2
import mediapipe as mp
import numpy as np
import os
import re
import tensorflow as tf
from tensorflow.keras.layers import Conv1D, Input, Dense, GlobalAveragePooling1D, LSTM
from tensorflow.keras.models import Model
import pickle

# Paths to the video dataset and to the extraction outputs (relative to the working directory)
VIDEO_PATH = "../model_input/augmented_train_clips"  # Directory scanned for input clips; update for your environment
OUTPUT_DIR = "../model_input/output_features"  # Directory where the feature pickles are written
ERROR_LOG_FILE = "../model_input/output_features/error_log.txt"  # Per-file failures are appended here; lives inside OUTPUT_DIR

# Aliases for the Mediapipe solution modules; the FaceMesh/Hands models
# themselves are instantiated per video inside extract_features_from_video.
mp_face_mesh = mp.solutions.face_mesh
mp_hands = mp.solutions.hands

# Mediapipe Constants
# Face-mesh landmark indices kept per frame. Described by the original author
# as eyebrows, eyes, and mouth -- TODO confirm against the face-mesh index map.
FACE_FEATURES = [33, 263, 61, 291, 199, 159, 145, 386, 374, 152]
# Number of hand slots per frame; the hand feature rows are padded to 21 * MAX_HANDS.
MAX_HANDS = 2

# Ensure output directory exists (it is also the parent directory of ERROR_LOG_FILE)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Derive the class label from a clip filename
def extract_label(filename):
    """Return the leading ``<name>_<digits>`` part of *filename*.

    The label is everything up to and including the last ``_<digits>`` group
    that is itself followed by another underscore. Filenames that do not
    contain such a group yield ``"unknown"``.
    """
    found = re.match(r'^(.*_\d+)_.*$', filename)
    return found.group(1) if found is not None else "unknown"

# Function to process a single video and extract features
_HAND_LANDMARK_COUNT = 21  # Landmarks read per detected hand


def _landmarks_to_xyz(landmark_list, indices):
    """Return ``[x, y, z]`` triples for the requested landmark indices."""
    points = landmark_list.landmark
    return [[points[idx].x, points[idx].y, points[idx].z] for idx in indices]


def extract_features_from_video(video_path):
    """Extract per-frame face and hand landmark features from one video.

    Every frame is run through Mediapipe FaceMesh (one face) and Hands (up to
    ``MAX_HANDS`` hands). For each frame the function records the ``x, y, z``
    coordinates of the ``FACE_FEATURES`` landmarks and of 21 landmarks per
    hand. When a detector finds nothing in a frame, the previous frame's
    values are reused; before any detection has happened, zeros are used.
    Hand rows are zero-padded to ``21 * MAX_HANDS`` landmarks.

    The face and hand sequences are each passed through
    ``apply_temporal_smoothing`` and concatenated column-wise.

    Args:
        video_path: Path of the video file to read with OpenCV.

    Returns:
        A 2-D array with one row per frame. The column count is whatever the
        two smoothed arrays flatten to per frame.

    Raises:
        ValueError: If the video cannot be opened or yields no frames.
    """
    face_slots = len(FACE_FEATURES)
    hand_slots = _HAND_LANDMARK_COUNT * MAX_HANDS

    face_features = []
    hand_features = []
    prev_face_features = None  # Last frame's face rows, reused on a missed detection
    prev_hand_features = None  # Last frame's hand rows, reused on a missed detection

    cap = cv2.VideoCapture(video_path)
    try:
        # Fail early with a clear message instead of an obscure reshape error later.
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")

        with mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1) as face_mesh, \
             mp_hands.Hands(static_image_mode=False, max_num_hands=MAX_HANDS) as hands:

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Mediapipe expects RGB; OpenCV decodes frames as BGR.
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Face landmark detection (max_num_faces=1, so use the first face).
                face_result = face_mesh.process(frame_rgb)
                if face_result.multi_face_landmarks:
                    frame_face_features = _landmarks_to_xyz(
                        face_result.multi_face_landmarks[0], FACE_FEATURES
                    )
                elif prev_face_features is not None:
                    frame_face_features = prev_face_features
                else:
                    frame_face_features = [[0, 0, 0]] * face_slots

                face_features.append(frame_face_features)
                prev_face_features = frame_face_features

                # Hand landmark detection.
                hand_result = hands.process(frame_rgb)
                if hand_result.multi_hand_landmarks:
                    frame_hand_features = []
                    for hand_landmarks in hand_result.multi_hand_landmarks:
                        frame_hand_features.extend(
                            _landmarks_to_xyz(hand_landmarks, range(_HAND_LANDMARK_COUNT))
                        )
                    # Fewer than MAX_HANDS hands detected: zero-fill the empty slots.
                    missing = hand_slots - len(frame_hand_features)
                    frame_hand_features.extend([[0, 0, 0]] * missing)
                elif prev_hand_features is not None:
                    frame_hand_features = prev_hand_features
                else:
                    frame_hand_features = [[0, 0, 0]] * hand_slots

                hand_features.append(frame_hand_features)
                prev_hand_features = frame_hand_features
    finally:
        # Always release the capture, even if a frame fails to process.
        cap.release()

    # Both lists grow by exactly one entry per frame, so their lengths match.
    num_frames = len(face_features)
    if num_frames == 0:
        raise ValueError(f"No frames could be read from video: {video_path}")

    face_array = np.array(face_features, dtype=np.float32)
    hand_array = np.array(hand_features, dtype=np.float32)

    smoothed_face_features = apply_temporal_smoothing(face_array)
    smoothed_hand_features = apply_temporal_smoothing(hand_array)

    try:
        combined_features = np.hstack([
            smoothed_face_features.reshape(num_frames, -1),
            smoothed_hand_features.reshape(num_frames, -1)
        ])
    except ValueError as e:
        # Best-effort fallback kept from the original: emit zeros rather than
        # lose the clip if the smoothed arrays cannot be stacked.
        print(f"Error during feature stacking: {str(e)}")
        combined_features = np.zeros(
            (num_frames, face_slots * 3 + hand_slots * 3), dtype=np.float32
        )

    return combined_features

# Function to apply temporal smoothing with a moving average over frames
def apply_temporal_smoothing(features, kernel_size=3):
    """Smooth a per-frame feature sequence along the time axis.

    Each frame's features are flattened to one row, and every column is
    replaced by the mean of a ``kernel_size``-frame window centred on that
    frame. The sequence is edge-padded, so the first and last frames are
    averaged with copies of themselves rather than with zeros.

    The previous implementation built a new randomly initialised ``Conv1D``
    layer on every call and convolved across the feature axis. Its output was
    therefore non-deterministic and did not smooth over time.

    Args:
        features: Array-like whose first axis is the frame index.
        kernel_size: Positive window length in frames (default 3).

    Returns:
        ``float32`` array of shape ``(num_frames, features_per_frame)``.

    Raises:
        ValueError: If ``kernel_size`` is smaller than 1.
    """
    if kernel_size < 1:
        raise ValueError("kernel_size must be a positive integer")

    features = np.asarray(features, dtype=np.float32)
    num_frames = features.shape[0]
    # Explicit width (not -1) so an empty sequence still reshapes cleanly.
    width = int(np.prod(features.shape[1:], dtype=np.int64))
    flat = features.reshape(num_frames, width)

    if num_frames == 0 or kernel_size == 1:
        return flat.copy()

    # 'same'-style padding: output has one row per input frame.
    pad_before = (kernel_size - 1) // 2
    pad_after = kernel_size - 1 - pad_before
    padded = np.pad(flat, ((pad_before, pad_after), (0, 0)), mode="edge")

    smoothed = np.zeros_like(flat)
    for offset in range(kernel_size):
        smoothed += padded[offset:offset + num_frames]
    smoothed /= kernel_size

    return smoothed


# Persist one clip's features to a pickle named after the clip
def save_features_by_class(features, label, filename):
    """Store *features* in a pickle under ``OUTPUT_DIR`` named after *filename*.

    The target file name is *filename* with its extension stripped.
    NOTE(review): no ``.pkl`` suffix is added -- confirm that downstream
    loaders expect extension-less files.

    If the target already exists, it is loaded and *features* is appended to
    its ``'features'`` list; the stored ``'label'`` is left as it was.
    Otherwise a new record ``{'features': [features], 'label': label}`` is
    created. The record is then written back to the target.

    Args:
        features: Feature array for one clip.
        label: Class label; stored only when the file is first created.
        filename: Name of the source clip.
    """
    stem, _ext = os.path.splitext(filename)
    target_path = os.path.join(OUTPUT_DIR, stem)

    if not os.path.exists(target_path):
        # First features for this clip: start a fresh record.
        record = {'features': [features], 'label': label}
    else:
        # The pickle is one this script wrote earlier; never point this at
        # untrusted files.
        with open(target_path, 'rb') as src:
            record = pickle.load(src)
        record['features'].append(features)

    # Write the new or updated record back to disk.
    with open(target_path, 'wb') as dst:
        pickle.dump(record, dst)

# Main processing
# Substring identifying the first file (in sorted order) to process. Files
# sorted before it are skipped, so an interrupted run can be resumed.
# Set to None to process every file.
RESUME_FROM = 'hot_16'


def main(resume_from=RESUME_FROM):
    """Extract and save features for the videos in ``VIDEO_PATH``.

    Files ending in ``.MP4`` or ``.MOV`` are processed in sorted order. When
    *resume_from* is given, processing starts at the first filename that
    contains it. A failure on one file is appended to ``ERROR_LOG_FILE`` and
    does not stop the run.

    Args:
        resume_from: Substring of the filename to start from, or ``None`` to
            process all files.
    """
    files = sorted(
        name for name in os.listdir(VIDEO_PATH) if name.endswith(('.MP4', '.MOV'))
    )

    if resume_from is not None:
        start_index = next(
            (i for i, name in enumerate(files) if resume_from in name), None
        )
        if start_index is None:
            # Say so explicitly rather than report success after doing nothing.
            print(f"Resume marker {resume_from!r} not found in {VIDEO_PATH}; no files processed.")
            return
        files = files[start_index:]

    for file in files:
        print(f"Processing {file}...")
        video_file_path = os.path.join(VIDEO_PATH, file)
        label = extract_label(file)

        try:
            features = extract_features_from_video(video_file_path)
            save_features_by_class(features, label, file)

        except Exception as e:
            # Top-level boundary: log the failure and continue with the next clip.
            with open(ERROR_LOG_FILE, "a") as log_file:
                log_file.write(f"Error processing file {file}: {str(e)}\n")
            print(f"Error processing {file}, logged to {ERROR_LOG_FILE}.")

    print("Feature extraction complete. Features saved per input file.")


if __name__ == "__main__":
    main()

ImportError: DLL load failed while importing _framework_bindings: A dynamic link library (DLL) initialization routine failed.