In [1]:
import cv2
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
from tqdm import tqdm

class MovenetProcessor:
    """Annotate a video with MoveNet multi-person pose overlays.

    Every frame read from ``video_path`` is run through the MoveNet model
    loaded from ``model_path``. For each detected person the keypoints,
    skeleton edges and a head bounding box are drawn onto the frame with
    OpenCV, and the annotated frame is appended to ``output_path``.
    """

    # Frame rate handed to the writer when the container reports none (0/NaN).
    DEFAULT_FPS = 30.0
    # Model input dimensions must be multiples of this value (see set_params).
    SIZE_MULTIPLE = 32
    # Keypoint indices (nose, eyes, ears) used to build the head bounding box.
    HEAD_KEYPOINTS = frozenset(range(5))

    def __init__(self, video_path, output_path, model_path="movenet-tensorflow2-multipose-lightning-v1"):
        """Configure the processor, load the model and warm it up.

        Args:
            video_path: Path of the video to read.
            output_path: Path of the annotated video to write.
            model_path: Location passed to ``hub.load`` for the MoveNet model.
        """
        self.video_path = video_path
        self.output_path = output_path
        self.model_path = model_path
        self.confidence_threshold = 0.1
        self.seconds_to_extract = None
        self.input_size = 384

        # Check if GPU is available
        self._setup_gpu()

        # Load the MoveNet model
        self.model = hub.load(model_path)
        self.movenet = self.model.signatures['serving_default']
        # Skeleton edges as (keypoint index, keypoint index) -> colour code.
        # The colour codes are currently not used by draw_connections.
        self.EDGES = {
            (0, 1): 'm',    # Nose - Left eye
            (0, 2): 'c',    # Nose - Right eye
            (1, 3): 'm',    # Left eye - Left ear
            (2, 4): 'c',    # Right eye - Right ear
            (0, 5): 'm',    # Nose - Left shoulder
            (0, 6): 'c',    # Nose - Right shoulder
            (5, 7): 'm',    # Left shoulder - Left elbow
            (7, 9): 'm',    # Left elbow - Left wrist
            (6, 8): 'c',    # Right shoulder - Right elbow
            (8, 10): 'c',   # Right elbow - Right wrist
            (5, 6): 'y',    # Left shoulder - Right shoulder
            (5, 11): 'm',   # Left shoulder - Left hip
            (6, 12): 'c',   # Right shoulder - Right hip
            (11, 12): 'y',  # Left hip - Right hip
            (11, 13): 'm',  # Left hip - Left knee
            (13, 15): 'm',  # Left knee - Left ankle
            (12, 14): 'c',  # Right hip - Right knee
            (14, 16): 'c'   # Right knee - Right ankle
        }

        # Warm up the model
        self._warm_up_model()

    def _setup_gpu(self):
        """Enable memory growth on the first GPU and make it the visible one.

        Prints which device will be used. If TensorFlow rejects the
        reconfiguration with ``RuntimeError`` (it does so once its runtime is
        initialised), the existing device configuration is kept instead of
        aborting construction.
        """
        physical_devices = tf.config.experimental.list_physical_devices('GPU')
        if not physical_devices:
            print("No GPU found, using CPU.")
            return

        try:
            tf.config.experimental.set_memory_growth(physical_devices[0], True)
            tf.config.experimental.set_visible_devices(physical_devices[0], 'GPU')
        except RuntimeError as err:
            print(f"GPU configuration left unchanged: {err}")
        print(f"Using GPU: {physical_devices[0]}")

    def _warm_up_model(self):
        """Run one inference on an all-zero square image of ``input_size``."""
        print("Warming up model...")
        # Create a dummy image to warm up the model
        warm_up_image = tf.zeros([1, self.input_size, self.input_size, 3], dtype=tf.int32)
        _ = self.movenet(warm_up_image)
        print("Model is warmed up.")

    def set_params(self, confidence_threshold=None, seconds_to_extract=None, input_size=None):
        """Update processing parameters; arguments left as ``None`` are ignored.

        Args:
            confidence_threshold: Minimum keypoint score required for drawing.
            seconds_to_extract: Process only this many seconds from the start
                of the video.
            input_size: Height of the model input in pixels; must be a
                positive multiple of 32. Changing it re-runs the warm-up.

        Raises:
            ValueError: If ``input_size`` is not a positive multiple of 32.
        """
        if confidence_threshold is not None:
            self.confidence_threshold = confidence_threshold
        if seconds_to_extract is not None:
            self.seconds_to_extract = seconds_to_extract
        if input_size is not None:
            # Zero passes a bare modulo test but yields an empty model input,
            # so positivity is checked explicitly.
            if input_size <= 0 or input_size % self.SIZE_MULTIPLE != 0:
                raise ValueError("input_size must be a positive multiple of 32")
            self.input_size = input_size
            self._warm_up_model()  # Warm up the model again if input size changes

    def _target_width(self, height, width):
        """Return the model input width for a frame of ``height`` x ``width``.

        The width follows the frame's aspect ratio at ``input_size`` height,
        rounded down to a multiple of 32 and never below 32, so that extremely
        narrow frames cannot request a zero-width resize.
        """
        scaled = int(self.input_size * (width / height))
        snapped = (scaled // self.SIZE_MULTIPLE) * self.SIZE_MULTIPLE
        return max(self.SIZE_MULTIPLE, snapped)

    def process_input(self, image):
        """Convert one RGB frame into a batched int32 tensor for the model.

        Args:
            image: Array-like of shape (height, width, channels).

        Returns:
            int32 tensor of shape (1, input_size, new_width, channels).
        """
        image = tf.convert_to_tensor(image, dtype=tf.int32)
        original_height, original_width, _ = image.shape
        new_width = self._target_width(original_height, original_width)

        image = tf.expand_dims(image, axis=0)
        # NOTE(review): rounding the width down to a multiple of 32 can make
        # resize_with_pad add a thin border, while the draw_* methods scale
        # keypoints by the unpadded frame size - a small offset is possible;
        # confirm whether it matters for the target footage.
        image = tf.image.resize_with_pad(image, self.input_size, new_width)
        # resize_with_pad produces floats; the model is fed int32 here.
        image = tf.cast(image, dtype=tf.int32)

        return image

    def draw_keypoints(self, frame, keypoints):
        """Draw confident keypoints on ``frame`` and collect the head points.

        Args:
            frame: Image array of shape (height, width, channels); drawn on in place.
            keypoints: One person's (y, x, score) triples with y and x
                normalised to the frame size.

        Returns:
            List of ``(x, y)`` pixel coordinates of the confident head
            keypoints (indices 0-4).
        """
        frame_height, frame_width, _ = frame.shape
        scaled = np.squeeze(np.multiply(keypoints, [frame_height, frame_width, 1]))
        head_points = []
        for index, (ky, kx, kp_conf) in enumerate(scaled):
            if kp_conf > self.confidence_threshold:
                if index in self.HEAD_KEYPOINTS:
                    head_points.append((kx, ky))
                cv2.circle(frame, (int(kx), int(ky)), 3, (0, 255, 0), -1)

        return head_points

    def draw_head_box(self, frame, head_points):
        """Draw the axis-aligned box enclosing ``head_points``; no-op if empty."""
        if head_points:
            x_coordinates = [int(pt[0]) for pt in head_points]
            y_coordinates = [int(pt[1]) for pt in head_points]
            min_x, max_x = min(x_coordinates), max(x_coordinates)
            min_y, max_y = min(y_coordinates), max(y_coordinates)
            cv2.rectangle(frame, (min_x, min_y), (max_x, max_y), (0, 255, 0), 2)

    def draw_connections(self, frame, keypoints):
        """Draw every edge in ``EDGES`` whose two endpoints are both confident.

        Args:
            frame: Image array of shape (height, width, channels); drawn on in place.
            keypoints: One person's (y, x, score) triples with y and x
                normalised to the frame size.
        """
        frame_height, frame_width, _ = frame.shape
        scaled = np.squeeze(np.multiply(keypoints, [frame_height, frame_width, 1]))

        for p1, p2 in self.EDGES:
            y1, x1, c1 = scaled[p1]
            y2, x2, c2 = scaled[p2]

            if c1 > self.confidence_threshold and c2 > self.confidence_threshold:
                cv2.line(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 0, 255), 4)

    def loop_through_people(self, frame, keypoints_with_scores):
        """Draw skeleton, keypoints and head box for every person entry."""
        for person in keypoints_with_scores:
            self.draw_connections(frame, person)
            head_points = self.draw_keypoints(frame, person)
            self.draw_head_box(frame, head_points)

    @staticmethod
    def _frame_budget(fps, total_frames, seconds_to_extract):
        """Return how many frames to process, or ``None`` for "until the end".

        Args:
            fps: Frame rate of the input video.
            total_frames: Frame count reported by the container; values <= 0
                are treated as unknown.
            seconds_to_extract: Requested duration in seconds, or ``None``.
        """
        known_total = total_frames if total_frames > 0 else None
        if seconds_to_extract is None:
            return known_total
        requested = max(0, int(round(fps * seconds_to_extract)))
        if known_total is None:
            return requested
        return min(requested, known_total)

    def process_video(self):
        """Read the input video, annotate each frame and write the output.

        The writer uses the input's frame size and its exact (fractional)
        frame rate. Processing stops after ``seconds_to_extract`` seconds when
        that is set, otherwise when the input is exhausted.

        Raises:
            IOError: If the input video cannot be opened or the output video
                cannot be created.
        """
        cap = cv2.VideoCapture(self.video_path)
        out = None
        pbar = None
        try:
            if not cap.isOpened():
                raise IOError(f"Could not open input video: {self.video_path}")

            # Keep the fractional rate (e.g. 29.97); truncating it to an int
            # changes playback speed and the seconds-to-frames conversion.
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:  # also catches NaN
                fps = self.DEFAULT_FPS
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(self.output_path, fourcc, fps, (frame_width, frame_height))
            if not out.isOpened():
                raise IOError(f"Could not open output video for writing: {self.output_path}")

            frames_to_process = self._frame_budget(fps, total_frames, self.seconds_to_extract)
            pbar = tqdm(total=frames_to_process, desc="Processing Video", leave=False)

            frame_count = 0
            while frames_to_process is None or frame_count < frames_to_process:
                ret, frame = cap.read()
                if not ret:
                    break

                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                input_image = self.process_input(rgb_frame)

                outputs = self.movenet(input_image)
                # The first 51 values of each row are read as 17 (y, x, score)
                # triples; the remaining values are ignored.
                keypoints_with_scores = outputs['output_0'].numpy()[:, :, :51].reshape((6, 17, 3))

                # Overlays go on the original BGR frame, which is what gets written.
                self.loop_through_people(frame, keypoints_with_scores)

                out.write(frame)

                frame_count += 1
                pbar.update(1)
        finally:
            # Release everything even if inference or drawing raised, so the
            # output container is finalised and file handles are not leaked.
            if pbar is not None:
                pbar.close()
            cap.release()
            if out is not None:
                out.release()
        print("Processing completed!")





In [2]:
# Annotate the sample clip end to end, drawing only keypoints scored above 0.25.
processor = MovenetProcessor(
    video_path='20240402_200836.mp4',
    output_path='result.mp4',
)
processor.set_params(confidence_threshold=0.25)
processor.process_video()

No GPU found, using CPU.


Warming up model...
Model is warmed up.


                                                                                                                       

Processing completed!


