In [None]:
import numpy as np
import os
import cv2
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.gridspec as gridspec
import IPython.display as display
import time
from PIL import Image

## Uploading file manually

In [None]:
# Fetch the MotionAGFormer repository and make it the working directory.
# NOTE(review): presumably re-running this cell after the %cd clones a second,
# nested copy and descends into it -- confirm whether a guard is wanted.
!git clone https://github.com/TaatiTeam/MotionAGFormer.git
%cd MotionAGFormer

In [None]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import gridspec
from mpl_toolkits.mplot3d import Axes3D

def show3Dpose(vals, ax, color='R'):
    """
    Draw a single skeleton on a 3D axes and centre the view on joint 0.

    Args:
        vals (np.ndarray): Joint coordinates indexed as vals[joint, axis]; the
            bone table below references joints 0-16 and axes 0-2.
        ax: Matplotlib 3D axes that receives the line segments.
        color (str): 'L' selects the yellow/green palette; any other value
            (default 'R') selects the blue/red palette.
    """
    ax.view_init(elev=15., azim=70)

    # Each palette is (colour for flagged bones, colour for the remaining bones).
    if color == 'L':
        flagged_rgb, plain_rgb = (1, 1, 0), (0, 1, 0)
    else:
        flagged_rgb, plain_rgb = (0, 0, 1), (1, 0, 0)  # blue, red

    # Bone table: segment k joins joint bone_from[k] to joint bone_to[k].
    bone_from = np.array([0, 0, 1, 4, 2, 5, 0, 7, 8, 8, 14, 15, 11, 12, 8, 9])
    bone_to = np.array([1, 4, 2, 5, 3, 6, 7, 8, 14, 11, 15, 16, 12, 13, 9, 10])
    bone_flag = np.array([0, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0], dtype=bool)

    for start, end, flagged in zip(bone_from, bone_to, bone_flag):
        xs, ys, zs = (np.array([vals[start, axis], vals[end, axis]]) for axis in range(3))
        ax.plot(xs, ys, zs, lw=2, color=flagged_rgb if flagged else plain_rgb)

    # Fixed-size viewing box around the root joint (joint 0).
    half_xy = 0.72
    half_z = 0.7
    root_x, root_y, root_z = vals[0, 0], vals[0, 1], vals[0, 2]
    for set_limits, centre, half in (
        (ax.set_xlim3d, root_x, half_xy),
        (ax.set_ylim3d, root_y, half_xy),
        (ax.set_zlim3d, root_z, half_z),
    ):
        set_limits([-half + centre, half + centre])
    ax.set_aspect('auto')

    # Fully transparent panes so only the skeleton is visible.
    transparent_white = (1.0, 1.0, 1.0, 0.0)
    for axis_obj in (ax.xaxis, ax.yaxis, ax.zaxis):
        axis_obj.set_pane_color(transparent_white)

    # Hide the tick labels on all three axes.
    ax.tick_params('x', labelbottom=False)
    ax.tick_params('y', labelleft=False)
    ax.tick_params('z', labelleft=False)


def _render_pose_frames(pose_layers, num_frames, frame_folder):
    """Save one PNG per frame index, drawing every (poses, color) layer on the same axes."""
    os.makedirs(frame_folder, exist_ok=True)

    frame_paths = []
    for i in range(num_frames):
        fig = plt.figure(figsize=(6, 6))
        try:
            gs = gridspec.GridSpec(1, 1)
            ax = plt.subplot(gs[0], projection='3d')
            # Layers are drawn in order; show3Dpose re-centres the axes on each
            # call, so the view ends up framed around the last layer's root joint.
            for poses, color in pose_layers:
                show3Dpose(poses[i], ax, color)
            frame_path = os.path.join(frame_folder, f"{i:04d}.png")
            plt.savefig(frame_path, dpi=100, bbox_inches='tight')
        finally:
            # Close even when drawing fails so figures do not pile up in memory.
            plt.close(fig)
        frame_paths.append(frame_path)
    return frame_paths


def _encode_frames_to_video(frame_paths, output_video_path, fps):
    """Encode the PNG frames into an mp4v video whose size is taken from the first frame."""
    first = cv2.imread(frame_paths[0])
    if first is None:
        raise RuntimeError(f"Could not read rendered frame {frame_paths[0]}")
    height, width = first.shape[:2]

    video = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
    try:
        if not video.isOpened():
            raise RuntimeError(f"Could not open video writer for {output_video_path}")
        for frame_path in frame_paths:
            img = cv2.imread(frame_path)
            if img is None:
                raise RuntimeError(f"Could not read rendered frame {frame_path}")
            # bbox_inches='tight' can yield slightly different PNG sizes, and the
            # writer only accepts frames of the size it was opened with.
            if img.shape[:2] != (height, width):
                img = cv2.resize(img, (width, height))
            video.write(img)
    finally:
        # Always finalise the container, even if a frame failed.
        video.release()


def create_pose_video(data, output_video_path="pose_video.mp4", frame_folder="pose_frames", fps=20):
    """
    Create a 3D pose animation video from a numpy array.

    Args:
        data (np.ndarray): Shape (num_frames, num_joints, 3)
        output_video_path (str): Output path for the .mp4 video
        frame_folder (str): Folder the intermediate PNG frames are written to
            (created if missing; the frames are left in place afterwards)
        fps (int): Frames per second for output video

    Raises:
        ValueError: If data contains no frames.
        RuntimeError: If a rendered frame cannot be read back or the video
            writer cannot be opened.
    """
    num_frames = len(data)
    if num_frames == 0:
        raise ValueError("data must contain at least one frame")

    frame_paths = _render_pose_frames([(data, 'R')], num_frames, frame_folder)
    _encode_frames_to_video(frame_paths, output_video_path, fps)
    print(f"Video saved to {output_video_path}")


def create_pose_overlay_video(data1, data2, output_video_path="pose_video.mp4", frame_folder="pose_frames", fps=20):
    """
    Create a 3D pose animation video with two pose sequences drawn on the same axes.

    data1 is drawn with the default palette and data2 with the 'L' palette. If
    the sequences differ in length, only the frames both of them have are used.

    Args:
        data1 (np.ndarray): Shape (num_frames, num_joints, 3)
        data2 (np.ndarray): Shape (num_frames, num_joints, 3)
        output_video_path (str): Output path for the .mp4 video
        frame_folder (str): Folder the intermediate PNG frames are written to
            (created if missing; the frames are left in place afterwards)
        fps (int): Frames per second for output video

    Raises:
        ValueError: If either sequence contains no frames.
        RuntimeError: If a rendered frame cannot be read back or the video
            writer cannot be opened.
    """
    # Crop to the shorter sequence so both have a pose for every frame.
    num_frames = min(len(data1), len(data2))
    if num_frames == 0:
        raise ValueError("data1 and data2 must each contain at least one frame")

    frame_paths = _render_pose_frames([(data1, 'R'), (data2, 'L')], num_frames, frame_folder)
    _encode_frames_to_video(frame_paths, output_video_path, fps)
    print(f"Video saved to {output_video_path}")




In [None]:
!pip install boto3

In [None]:
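# prompt (original): create a function get_info that gets the min, max, average values from a numpy array
# NOTE: the function below no longer matches that prompt; it analyses ankle heights
# and returns (is_valid, switch_point, max_left_index, ankle_points).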

def get_numpy_info(arr):
  """
  Checks whether a pose clip shows the left ankle above the right ankle first
  and the right ankle above the left ankle afterwards.

  The z coordinate (index 2) of both ankles is read from every frame, smoothed
  with a moving average, and compared. The clip is considered valid when a
  switch point exists, the left ankle is the higher one in at least 80% of the
  smoothed samples before it, and in at most 20% of the samples from it onwards.
  The smoothed ankle curves are also plotted.

  Args:
    arr (np.ndarray): Pose sequence; each frame is passed to get_frame_info
      (which documents a frame as 17 joints x 3 coordinates).

  Returns:
    tuple: (is_valid, switch_point, max_left_index, ankle_points) where
      is_valid (bool) is the outcome of the checks above,
      switch_point (int) is the first smoothed index at or after max_left_index
      where the left ankle is no longer higher (0 when there is none),
      max_left_index (int) is the frame with the largest left-ankle z, and
      ankle_points is [right x, right y, left x, left y] taken from
      arr[switch_point].

  Raises:
    ValueError: If arr contains no frames.
  """
  SMOOTHING_WINDOW = 10

  num_frames = len(arr)
  if num_frames == 0:
    raise ValueError("arr must contain at least one frame")

  left_ankle_heights = []
  right_ankle_heights = []
  for frame in arr:
    joints = get_frame_info(frame)
    left_ankle_heights.append(joints["Left Ankle"][2])
    right_ankle_heights.append(joints["Right Ankle"][2])
  left_ankle_arr = np.array(left_ankle_heights)
  right_ankle_arr = np.array(right_ankle_heights)

  # argmax instead of a running maximum seeded with 0, so clips whose heights
  # are all zero or negative still report their true peak frame.
  max_left_index = int(np.argmax(left_ankle_arr))

  #smooth the arrays
  # The window is capped at the clip length; with a kernel longer than the
  # signal, 'valid' convolution would swap the operands and return garbage.
  window = min(SMOOTHING_WINDOW, num_frames)
  kernel = np.ones(window) / window
  left_ankle_arr = np.convolve(left_ankle_arr, kernel, mode='valid')
  right_ankle_arr = np.convolve(right_ankle_arr, kernel, mode='valid')

  # 1 where the (smoothed) left ankle is higher than the right one, else 0.
  higher_ankle_arr = (left_ankle_arr > right_ankle_arr).astype(int)

  #switch point is the index where the right ankle becomes higher than the left ankle after the max_left_index
  # NOTE(review): 'valid' smoothing makes the smoothed arrays shorter than the
  # clip, so smoothed index i covers raw frames i .. i + window - 1. The raw
  # peak index is used as the start of the search as in the original code --
  # confirm this alignment is intended.
  switch_point = 0
  switch_found = False
  for i in range(max_left_index, len(higher_ankle_arr)):
    if higher_ankle_arr[i] == 0:
      switch_point = i
      switch_found = True
      break
  print(f"Switch point: {switch_point}")

  # Without a switch the clip cannot match the expected pattern; previously
  # this case was indistinguishable from a genuine switch at index 0.
  is_valid = switch_found
  samples_before = higher_ankle_arr[:switch_point]
  samples_after = higher_ankle_arr[switch_point:]
  #invalid if the left ankle is higher in less than 80% of the samples before switch_point
  if np.sum(samples_before) < len(samples_before)*0.8:
    is_valid = False
  #invalid if the left ankle is higher in more than 20% of the samples from switch_point onwards
  if np.sum(samples_after) > len(samples_after)*0.2:
    is_valid = False

  if is_valid:
    print("Valid")

  #plot the joints
  plt.plot(left_ankle_arr, label="Left Ankle")
  plt.plot(right_ankle_arr, label="Right Ankle")
  plt.legend()
  plt.show()

  # x/y positions of both ankles at the switch frame.
  switch_joints = get_frame_info(arr[switch_point])
  ankle_points = [
    switch_joints["Right Ankle"][0],
    switch_joints["Right Ankle"][1],
    switch_joints["Left Ankle"][0],
    switch_joints["Left Ankle"][1],
  ]
  return (is_valid, switch_point, max_left_index, ankle_points)

def get_frame_info(frame):
  """
  Maps each row of a pose frame to its joint name.

  Args:
    frame: Sequence of per-joint coordinates, one entry per joint in the
      order listed below (17 joints, each with x, y and z).

  Returns:
    dict: Joint name -> the corresponding entry of frame, in joint order.
  """
  joint_names = [
    "Hip",
    "Right Hip", "Right Knee", "Right Ankle",
    "Left Hip", "Left Knee", "Left Ankle",
    "Spine", "Thorax", "Neck", "Head",
    "Left Shoulder", "Left Elbow", "Left Wrist",
    "Right Shoulder", "Right Elbow", "Right Wrist",
  ]
  # Row k of the frame belongs to joint_names[k].
  return {joint_names[idx]: coords for idx, coords in enumerate(frame)}


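# Example usage:
# poses = np.load("clip.npy")  # pose sequence; each frame is 17 joints x 3 coordinates
# is_valid, switch_point, max_left_index, ankle_points = get_numpy_info(poses)
# print(f"Valid: {is_valid}, Switch point: {switch_point}, Peak frame: {max_left_index}")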


In [None]:
import numpy as np

def rotate_along_z(kpts: np.ndarray, degrees: float) -> np.ndarray:
    """
    Rotate a 17-joint pose about the Z-axis.

    Parameters:
        kpts (np.ndarray): Array of shape (17, 3) holding [x, y, z] per joint.
        degrees (float): Rotation angle around the Z-axis, in degrees.

    Returns:
        np.ndarray: The rotated keypoints, shape (17, 3). The input is not modified.
    """
    assert kpts.shape == (17, 3), "Input must be of shape (17, 3)"

    theta = np.deg2rad(degrees)
    c, s = np.cos(theta), np.sin(theta)

    # Start from the identity and fill in the x/y block; z is left untouched.
    rotation = np.eye(3)
    rotation[:2, :2] = [[c, -s], [s, c]]

    # Points are stored as rows, so multiply by the transpose.
    return kpts @ rotation.T


In [None]:
# prompt: Create a function that takes in 2 x,y coordinates and finds what angle they are pointing

def find_angle(coord1, coord2):
  """
  Computes the direction, in degrees, of the segment from coord1 to coord2
  measured from the positive x-axis.

  Args:
    coord1 (tuple): The starting (x, y) coordinate.
    coord2 (tuple): The ending (x, y) coordinate.

  Returns:
    float: The angle in degrees, with negative angles shifted up by 360.
      Returns None if the two points are the same.
  """
  (x1, y1), (x2, y2) = coord1, coord2

  print(f"x1: {x1}, y1: {y1}, x2: {x2}, y2: {y2}")

  delta_x, delta_y = x2 - x1, y2 - y1

  # Coincident points have no direction.
  if not (delta_x != 0 or delta_y != 0):
    return None

  heading = np.degrees(np.arctan2(delta_y, delta_x))
  # arctan2 reports clockwise directions as negative; map them to the positive equivalent.
  if heading < 0:
    heading += 360
  print(f"Angle: {heading}")
  return heading

In [None]:
import boto3
import tempfile
def list_and_play_mp4_from_s3(bucket_name, prefix, aws_access_key_id=None, aws_secret_access_key=None, region_name='us-east-2', max_files=100):
    """
    Download pose clips (.npy) from S3, validate them and render overlay videos.

    Every .npy object under the prefix is downloaded and analysed with
    get_numpy_info. Valid clips are rotated about the Z-axis by (135 - angle),
    where angle comes from find_angle on the ankle positions, and each valid
    clip after the first is rendered on top of the previous valid clip into
    "<object base name>_pose_video.mp4". Finally the list of validity flags and
    the fraction of valid clips are printed.

    Args:
        bucket_name (str): S3 bucket to list.
        prefix (str): Key prefix to list under.
        aws_access_key_id (str | None): Explicit access key; used only when the
            secret key is also given and both are non-empty.
        aws_secret_access_key (str | None): Explicit secret key.
        region_name (str): Region for the client built from explicit keys.
        max_files (int | None): Maximum number of .npy objects to process
            across all result pages; None removes the limit.

    Returns:
        None
    """
    if aws_access_key_id and aws_secret_access_key:
        s3 = boto3.client(
            's3',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name
        )
    else:
        # NOTE(review): region_name is not forwarded on this path, so the
        # client uses the ambient boto3 configuration -- confirm this is intended.
        s3 = boto3.client('s3')

    paginator = s3.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

    # Lazily walk every page so the file limit applies across pages and no
    # further listing requests are made once the limit is hit.
    npy_keys = (
        obj['Key']
        for page in pages
        for obj in page.get('Contents', [])
        if obj['Key'].endswith('.npy')
    )

    val_list = []
    prev_frames = None
    for count, key in enumerate(npy_keys):
        if max_files is not None and count >= max_files:
            break
        print(f"\n[INFO] Processing: {key}")

        # Reserve a path for the download; the file is closed here and removed below.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".npy") as temp_file:
            temp_path = temp_file.name
        try:
            s3.download_file(bucket_name, key, temp_path)
            data = np.load(temp_path)
            print(data.shape)
            valid, _switch_point, _max_left_index, ankle_points = get_numpy_info(data)
            angle = find_angle(ankle_points[0:2], ankle_points[2:4])
            val_list.append(valid)
            base_name = os.path.splitext(os.path.basename(key))[0]
            output_video_path = f"{base_name}_pose_video.mp4"
            if valid and angle is None:
                # find_angle returns None when both ankles share the same x/y.
                print(f"[WARN] Skipping {key}: ankle positions coincide, so no rotation angle is defined")
            elif valid:
                for i in range(len(data)):
                    data[i] = rotate_along_z(data[i], 135 - angle)
                if prev_frames is not None:
                    create_pose_overlay_video(prev_frames, data, output_video_path=output_video_path)
                prev_frames = data
        except Exception as exc:
            # Best effort: report the failing object and carry on with the next one.
            print(f"FAILED: {key}: {exc!r}")
        finally:
            try:
                os.remove(temp_path)
            except OSError:
                pass

    print(val_list)
    if val_list:
        print(np.sum(val_list)/len(val_list))
    else:
        print("No .npy files were processed")


# Empty credential strings are falsy, so the S3 client is created without explicit keys.
list_and_play_mp4_from_s3(
    bucket_name='shadow-trainer-dev',
    prefix='pose_numpy/GallenZacR',
    aws_access_key_id='',
    aws_secret_access_key='',
)


In [None]:
# Print the current working directory and list its contents.
!pwd
!ls