In [None]:
# Install the packages the cells below import, using %pip so they land in the
# environment of the running kernel.
# opencv-python-headless is the OpenCV build without GUI (HighGUI window) support.
%pip install -q opencv-python-headless
%pip install -q mediapipe
%pip install -q numpy

In [None]:
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import numpy as np


def draw_landmarks_on_image(rgb_image, detection_result):
  """Return a copy of ``rgb_image`` with every detected pose drawn on it.

  Args:
    rgb_image: Image array. It is copied with ``np.copy`` and never modified.
    detection_result: Object exposing ``pose_landmarks``, a sequence holding
      one entry per detected pose; each entry is an iterable of landmarks
      that carry ``x``, ``y`` and ``z`` attributes.

  Returns:
    The annotated copy. When ``pose_landmarks`` is empty this is an unmodified
    copy of the input.
  """
  detected_poses = detection_result.pose_landmarks
  canvas = np.copy(rgb_image)

  for single_pose in detected_poses:
    # Repack the landmarks into the NormalizedLandmarkList that is handed to
    # drawing_utils.draw_landmarks below.
    proto_points = [
      landmark_pb2.NormalizedLandmark(x=point.x, y=point.y, z=point.z)
      for point in single_pose
    ]
    pose_proto = landmark_pb2.NormalizedLandmarkList()
    pose_proto.landmark.extend(proto_points)

    solutions.drawing_utils.draw_landmarks(
      canvas,
      pose_proto,
      solutions.pose.POSE_CONNECTIONS,
      solutions.drawing_styles.get_default_pose_landmarks_style())

  return canvas

In [None]:
import cv2
import mediapipe as mp
import csv
import numpy as np
import os

def write_landmarks_to_csv(landmarks, frame_number, csv_data):
    """Print one frame's landmarks and append them as rows to ``csv_data``.

    Despite the name, nothing is written to disk here: rows are only
    accumulated in ``csv_data``, which is mutated in place.

    Args:
        landmarks: Iterable of landmarks with ``x``, ``y`` and ``z`` attributes.
            The position of each landmark in the iterable is mapped to a name
            through the module-level ``mp_pose.PoseLandmark`` enum.
        frame_number: Frame index stored in the first column of every row.
        csv_data: List that receives one
            ``[frame_number, name, x, y, z]`` row per landmark.
    """
    print(f"Landmark coordinates for frame {frame_number}:")
    for position, point in enumerate(landmarks):
        joint_name = mp_pose.PoseLandmark(position).name
        print(f"{joint_name}: (x: {point.x}, y: {point.y}, z: {point.z})")
        row = [frame_number, joint_name, point.x, point.y, point.z]
        csv_data.append(row)
    print("\n")

# --- Configuration -----------------------------------------------------------
video_path = './src/magnetic.mp4'    # clip to analyse
output_path = './test'               # directory that receives one JPEG per frame
output_csv = './outputOfClip.csv'    # landmark table, written only if write_out_csv
write_out_csv = False

# Create the frame directory if needed (no error when it already exists).
os.makedirs(output_path, exist_ok=True)

# MediaPipe Pose and drawing helpers. mp_pose is also read by
# write_landmarks_to_csv to turn landmark indices into names.
mp_pose = mp.solutions.pose
mp_drawing_styles = mp.solutions.drawing_styles
mp_drawing = mp.solutions.drawing_utils

# Open the video file and fail loudly instead of silently producing no frames.
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    cap.release()
    raise FileNotFoundError(f"Could not open video: {video_path}")

frame_number = 0
csv_data = []

try:
    # The context manager releases the pose model's resources when the loop
    # finishes or raises.
    with mp_pose.Pose() as pose:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # OpenCV decodes to BGR; MediaPipe is fed RGB.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Black canvas of the same shape, used to draw the skeleton alone.
            black_background = np.zeros_like(frame_rgb)

            result = pose.process(frame_rgb)

            if result.pose_landmarks:
                mp_drawing.draw_landmarks(
                    black_background,
                    result.pose_landmarks,
                    mp_pose.POSE_CONNECTIONS,
                    landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style())

                # Record (and print) the landmark coordinates of this frame.
                write_landmarks_to_csv(result.pose_landmarks.landmark, frame_number, csv_data)

            # Skeleton on the left, original frame on the right.
            combined_frame = np.concatenate((black_background, frame_rgb), axis=1)
            # cv2.imwrite expects BGR. The channel swap is the same operation as
            # COLOR_BGR2RGB, but this constant states the actual direction.
            combined_frame = cv2.cvtColor(combined_frame, cv2.COLOR_RGB2BGR)

            frame_path = f'{output_path}/{frame_number}.jpg'
            # imwrite reports failure through its return value, not an exception.
            if not cv2.imwrite(frame_path, combined_frame):
                raise IOError(f"Could not write frame image: {frame_path}")

            # No cv2.waitKey()/'q' check here: no window is ever shown, and the
            # headless OpenCV build installed by this notebook has no GUI backend.
            frame_number += 1
finally:
    cap.release()

if write_out_csv:
    # Save the accumulated landmark rows to a CSV file.
    with open(output_csv, 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(['frame_number', 'landmark', 'x', 'y', 'z'])
        csv_writer.writerows(csv_data)


# Write the pose_dance images to a video

In [None]:
# Write a video out of the frame images in the frame folder.

# The pose-extraction cell saves its frames to './test', so read them from
# there (this cell previously pointed at './output', which nothing writes to).
images_path = './test'
output_path = './output_video'

# Create the output directory if needed (no error when it already exists).
os.makedirs(output_path, exist_ok=True)

# Keep only "<integer>.jpg" files so stray entries (checkpoint folders, hidden
# files, ...) neither break the numeric sort nor reach cv2.imread.
image_array = sorted(
    (name for name in os.listdir(images_path)
     if name.lower().endswith('.jpg') and name.split('.')[0].isdigit()),
    key=lambda name: int(name.split('.')[0]),
)
if not image_array:
    raise FileNotFoundError(f"No frame images found in {images_path}")

out = None
size = None
frames_written = 0
try:
    # Stream frames straight into the writer instead of holding the whole
    # clip in memory first.
    for filename in image_array:
        img = cv2.imread(os.path.join(images_path, filename))
        if img is None:
            # cv2.imread returns None rather than raising on unreadable files.
            raise IOError(f"Could not read frame image: {filename}")

        if out is None:
            # The first frame fixes the frame size of the video.
            height, width = img.shape[:2]
            size = (width, height)
            # write out as mp4
            out = cv2.VideoWriter(f'{output_path}/output.mp4', cv2.VideoWriter_fourcc(*'mp4v'), fps=30, frameSize=size)

        out.write(img)
        frames_written += 1
finally:
    if out is not None:
        out.release()

print(f"Number of frames: {frames_written}")
print(f"Frame size: {size}")

# Combine video and audio

In [None]:
%pip install -q moviepy --upgrade

In [None]:
import os

from moviepy.editor import VideoFileClip, AudioFileClip

output_name = 'magnetic.mp4'
output_path = "./output_video_audio"
# The frames-to-video cell writes './output_video/output.mp4'; read that file
# (this cell previously looked for 'magnetic.mp4', which is never produced there).
video_path = "./output_video/output.mp4"
audio_path = "./src/magnetic.mp4"   # the audio track is taken from the source clip

# write_videofile does not create missing directories.
os.makedirs(output_path, exist_ok=True)

# Create the video and audio clips
video = VideoFileClip(f"{video_path}")
audio = AudioFileClip(f"{audio_path}")

try:
    # Report both durations so a length mismatch is visible in the output.
    print(f"Video duration: {video.duration}")
    print(f"Audio duration: {audio.duration}")

    # Add the audio clip to the video clip
    video = video.set_audio(audio)

    # Write the result to a file
    video.write_videofile(f"{output_path}/{output_name}", codec="libx264", audio_codec="aac")
finally:
    # Release the readers the clips hold open.
    audio.close()
    video.close()

# Combine Source Video and Generated Video Together

In [None]:
import os

import cv2
import numpy as np
from moviepy.editor import VideoFileClip, AudioFileClip

source_video = "./src/magnetic.mp4"
generated_video = "./output_video_audio/magnetic.mp4"
# The next cell reads 'combine/combined.mp4', so write the result there
# (it used to be written to 'combined.mp4' in the working directory).
combined_video = "combine/combined.mp4"

# Open the video files
cap1 = cv2.VideoCapture(source_video)
cap2 = cv2.VideoCapture(generated_video)

# Check if the videos are opened successfully. Raise instead of calling exit(),
# which would shut down the notebook kernel.
if not cap1.isOpened() or not cap2.isOpened():
    cap1.release()
    cap2.release()
    raise RuntimeError('Could not open videos')

os.makedirs(os.path.dirname(combined_video), exist_ok=True)

# Create a VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
# NOTE(review): assumes both clips play at 30 fps; if the source differs, the
# audio added in the next cell may drift - confirm.
fps = 30
frame_size = (640*2, 480)  # double width to accommodate two videos
out = cv2.VideoWriter(combined_video, fourcc, fps, frame_size)

try:
    while True:
        # Read one frame from each video
        ret1, frame1 = cap1.read()
        ret2, frame2 = cap2.read()

        # If either video ends, stop
        if not ret1 or not ret2:
            break

        # Bring both frames to the same size so they can be concatenated
        frame1 = cv2.resize(frame1, (640, 480))
        frame2 = cv2.resize(frame2, (640, 480))

        # Concatenate the frames horizontally and write them out
        combined = np.concatenate((frame1, frame2), axis=1)
        out.write(combined)

        # No cv2.waitKey()/'q' check: nothing is displayed, and the headless
        # OpenCV build installed by this notebook has no GUI backend.
finally:
    # Release the readers and the writer even if a frame fails. There are no
    # windows to destroy, so cv2.destroyAllWindows() is not called.
    cap1.release()
    cap2.release()
    out.release()

In [None]:
# Add the source audio to the combined video by using moviepy.
# AudioFileClip is imported here as well, so this cell does not depend on an
# earlier cell having imported it.
from moviepy.editor import VideoFileClip, AudioFileClip

video_path = 'combine/combined.mp4'
audio_path = 'src/magnetic.mp4'
output_path = 'combine/result.mp4'

# Create the video and audio clips
video = VideoFileClip(f"{video_path}")
audio = AudioFileClip(f"{audio_path}")

try:
    # Add the audio clip to the video clip
    video = video.set_audio(audio)

    video.write_videofile(output_path, codec="libx264", audio_codec="aac")
finally:
    # Release the readers the clips hold open.
    audio.close()
    video.close()
