In [1]:
##### Copyright 2021 The TensorFlow Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import csv
import cv2
import itertools
import numpy as np
import pandas as pd
import os
import sys
import tempfile
import tqdm

from matplotlib import pyplot as plt
from matplotlib.collections import LineCollection

import tensorflow as tf
import tensorflow_hub as hub
from tensorflow import keras
from ultralytics import YOLO

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# Make the MoveNet helper modules from the `raspberry_pi` pose-estimation
# sample importable. The sample folder is resolved relative to the current
# working directory, so it must already exist there.
pose_sample_rpi_path = os.path.join(os.getcwd(), 'examples/lite/examples/pose_estimation/raspberry_pi')
sys.path.append(pose_sample_rpi_path)

# These imports resolve through the path appended above, which is why they
# cannot sit with the other imports at the top of the file.
import utils
from data import BodyPart
from ml import Movenet
# Load the MoveNet Thunder model; `detect` below reuses this shared instance.
movenet = Movenet('movenet_thunder')

# Define function to run pose estimation using MoveNet Thunder.
# You'll apply MoveNet's cropping algorithm and run inference multiple times on
# the input image to improve pose estimation accuracy.
def detect(input_tensor, inference_count=3):
  """Runs MoveNet pose detection on a single input image.

  The first pass runs on the full image. Every additional pass reuses the
  crop region derived from the previous detection, which improves accuracy.

  Args:
    input_tensor: A [height, width, 3] image tensor exposing `.numpy()`.
      Height and width can be anything; presumably the model wrapper resizes
      the image as needed (not visible here - confirm in `ml.Movenet`).
    inference_count: Number of times the model should run repeatedly on the
      same input image to improve detection accuracy. Values below 1 are
      treated as 1, since the full-image pass always runs.

  Returns:
    The person entity returned by the last `movenet.detect` call.
  """
  # Convert once; every inference pass consumes the same array.
  image_np = input_tensor.numpy()

  # Detect pose using the full input image. Keeping this result guarantees a
  # return value even when no refinement pass runs (inference_count <= 1).
  person = movenet.detect(image_np, reset_crop_region=True)

  # Repeatedly use the previous detection result to identify the region of
  # interest and only crop that region to improve detection accuracy.
  for _ in range(inference_count - 1):
    person = movenet.detect(image_np, reset_crop_region=False)

  return person

#@title Functions to visualize the pose estimation results.
def draw_prediction_on_image(
    image, person, crop_region=None, close_figure=True,
    keep_input_size=False):
  """Draws the keypoint predictions on image.

  Args:
    image: A numpy array with shape [height, width, channel] representing the
      pixel values of the input image.
    person: A person entity returned from the MoveNet.SinglePose model.
    crop_region: Unused. Accepted only so existing callers keep working.
    close_figure: Whether to close the plt figure after the function returns.
      When False the figure stays open and the caller is responsible for
      showing/closing it.
    keep_input_size: Whether to keep the size of the input image. When False
      the overlay is passed through `utils.keep_aspect_ratio_resizer`.

  Returns:
    A numpy array with shape [out_height, out_width, channel] representing the
    image overlaid with keypoint predictions.
  """
  # Draw the detection result on top of the image.
  image_np = utils.visualize(image, [person])

  # Plot the image with detection results.
  height, width, _ = image.shape
  aspect_ratio = float(width) / height
  fig, ax = plt.subplots(figsize=(12 * aspect_ratio, 12))
  ax.imshow(image_np)

  if close_figure:
    plt.close(fig)

  if not keep_input_size:
    # NOTE(review): the previous call passed a (512, 512) tuple, which is
    # reported elsewhere in this file to raise an error. The helper is
    # believed to take a single integer target size and to return
    # `(image, (height, width))` - confirm against `utils.py`. The tuple
    # check keeps this working if it returns the bare image instead.
    resized = utils.keep_aspect_ratio_resizer(image_np, 512)
    image_np = resized[0] if isinstance(resized, tuple) else resized

  return image_np

class MoveNetPreprocessor(object):
  """Helper class to preprocess pose sample images for classification.

  For every pose class folder it runs pose detection on each image, writes an
  overlay image for debugging, and collects the detected landmarks into a
  single CSV file usable for training a pose classifier.
  """

  def __init__(self,
               images_in_folder,
               images_out_folder,
               csvs_out_path):
    """Creates a preprocessor to detect poses from images and save as CSV.

    Args:
      images_in_folder: Path to the folder with the input images. It should
        follow this structure:
        yoga_poses
        |__ downdog
            |______ 00000128.jpg
            |______ 00000181.bmp
            |______ ...
        |__ goddess
            |______ 00000243.jpg
            |______ 00000306.jpg
            |______ ...
        ...
      images_out_folder: Path to write the images overlaid with detected
        landmarks. These images are useful when you need to debug accuracy
        issues.
      csvs_out_path: Path to write the CSV containing the detected landmark
        coordinates and label of each image that can be used to train a pose
        classification model.
    """
    self._images_in_folder = images_in_folder
    self._images_out_folder = images_out_folder
    self._csvs_out_path = csvs_out_path
    self._messages = []

    # Create a temp dir to store the pose CSVs per class
    self._csvs_out_folder_per_class = tempfile.mkdtemp()

    # Every non-hidden sub-directory is one pose class. Stray files in the
    # dataset root are ignored; treating them as classes would make
    # `process` fail when it tries to list their contents.
    self._pose_class_names = sorted(
        n for n in os.listdir(self._images_in_folder)
        if not n.startswith('.')
        and os.path.isdir(os.path.join(self._images_in_folder, n)))

  def process(self, per_pose_class_limit=None, detection_threshold=0.1):
    """Preprocesses images in the given folder.

    Args:
      per_pose_class_limit: Maximum number of images to load per pose class.
        As preprocessing usually takes time, this parameter can be specified
        to reduce the size of the dataset for testing.
      detection_threshold: Only keep images with all landmark confidence
        scores at or above this threshold.

    Raises:
      RuntimeError: If a pose class ends up with no valid images.
    """
    # Loop through the classes and preprocess its images
    for pose_class_name in self._pose_class_names:
      # Print out the processing class name and progress bar
      print('Preprocessing', pose_class_name, file=sys.stderr)

      # Paths for the pose class.
      images_in_folder = os.path.join(self._images_in_folder, pose_class_name)
      images_out_folder = os.path.join(self._images_out_folder, pose_class_name)
      csv_out_path = os.path.join(self._csvs_out_folder_per_class,
                                  pose_class_name + '.csv')
      os.makedirs(images_out_folder, exist_ok=True)

      # Detect landmarks in each image and write it to a CSV file.
      # `newline=''` is what the csv module requires (otherwise Windows gets
      # blank rows); UTF-8 matches the default that `pd.read_csv` uses when
      # the per-class file is read back.
      with open(csv_out_path, 'w', newline='', encoding='utf-8') as csv_out_file:
        csv_out_writer = csv.writer(csv_out_file,
                                    delimiter=',',
                                    quoting=csv.QUOTE_MINIMAL)
        # Get list of images
        image_names = sorted(
            n for n in os.listdir(images_in_folder) if not n.startswith('.'))
        if per_pose_class_limit is not None:
          image_names = image_names[:per_pose_class_limit]

        valid_image_count = 0

        # Detect pose landmarks from each image
        for image_name in tqdm.tqdm(image_names):
          image_path = os.path.join(images_in_folder, image_name)

          # Best-effort load: any unreadable/undecodable file is skipped.
          # `Exception` (not a bare except) so Ctrl-C still interrupts.
          try:
            image = tf.io.read_file(image_path)
            image = tf.io.decode_jpeg(image)
          except Exception:
            self._messages.append('Skipped ' + image_path + '. Invalid image.')
            continue

          _, _, channel = image.shape

          # Skip images that aren't RGB because Movenet requires RGB images
          if channel != 3:
            self._messages.append('Skipped ' + image_path +
                                  '. Image isn\'t in RGB format.')
            continue
          person = detect(image)

          # Save landmarks only if every landmark was detected confidently
          min_landmark_score = min(
              keypoint.score for keypoint in person.keypoints)
          if min_landmark_score < detection_threshold:
            self._messages.append('Skipped ' + image_path +
                                  '. No pose was confidentlly detected.')
            continue

          valid_image_count += 1

          # Draw the prediction result on top of the image for debugging later
          output_overlay = draw_prediction_on_image(
              image.numpy().astype(np.uint8), person,
              close_figure=True, keep_input_size=True)

          # Write detection result into an image file (OpenCV expects BGR)
          output_frame = cv2.cvtColor(output_overlay, cv2.COLOR_RGB2BGR)
          cv2.imwrite(os.path.join(images_out_folder, image_name), output_frame)

          # Collect (x, y, score) for every keypoint
          pose_landmarks = np.array(
              [[keypoint.coordinate.x, keypoint.coordinate.y, keypoint.score]
               for keypoint in person.keypoints],
              dtype=np.float32)

          # Write the landmark coordinates to its per-class CSV file.
          # Values are converted to plain `str` before being written.
          coordinates = pose_landmarks.flatten().astype(str).tolist()
          csv_out_writer.writerow([image_name] + coordinates)

        if not valid_image_count:
          raise RuntimeError(
              'No valid images found for the "{}" class.'
              .format(pose_class_name))

    # Print the error message collected during preprocessing.
    print('\n'.join(self._messages))

    # Combine all per-class CSVs into a single output file
    all_landmarks_df = self._all_landmarks_as_dataframe()
    all_landmarks_df.to_csv(self._csvs_out_path, index=False)

  def class_names(self):
    """List of classes found in the training dataset."""
    return self._pose_class_names

  def _all_landmarks_as_dataframe(self):
    """Merge all per-class CSVs into a single dataframe.

    Returns:
      A dataframe with one row per kept image: the file name (prefixed with
      its class folder), one x/y/score column triple per `BodyPart`, and the
      `class_no` / `class_name` label columns.

    Raises:
      RuntimeError: If there are no pose classes to merge.
    """
    per_class_frames = []
    for class_index, class_name in enumerate(self._pose_class_names):
      csv_out_path = os.path.join(self._csvs_out_folder_per_class,
                                  class_name + '.csv')
      per_class_df = pd.read_csv(csv_out_path, header=None)

      # Add the labels
      per_class_df['class_no'] = class_index
      per_class_df['class_name'] = class_name

      # Prepend the folder name to the filename column (first column)
      file_column = per_class_df.columns[0]
      per_class_df[file_column] = (os.path.join(class_name, '')
                                   + per_class_df[file_column].astype(str))

      per_class_frames.append(per_class_df)

    if not per_class_frames:
      raise RuntimeError(
          'No pose classes found in "{}".'.format(self._images_in_folder))

    # Concatenate once instead of growing the dataframe class by class
    total_df = pd.concat(per_class_frames, axis=0)

    # The leading columns are the file name followed by x/y/score per body
    # part, in `BodyPart` order; the label columns keep their names.
    header_name = ['file_name']
    for bodypart in BodyPart:
      header_name += [bodypart.name + '_x', bodypart.name + '_y',
                      bodypart.name + '_score']
    header_map = dict(zip(total_df.columns, header_name))

    return total_df.rename(columns=header_map)








In [3]:
import csv
import cv2
import itertools
import numpy as np
import pandas as pd
import os
import sys
import tempfile
import tqdm
import matplotlib
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_hub as hub
# from tensorflow import keras # module 'keras.api._v2.keras'
import sklearn



# Check and print version information
def check_versions():
    """Print the installed version of each third-party package used here.

    Standard-library modules (csv, itertools, os, sys, tempfile) have no
    separate version and are therefore not reported.
    """
    # (label, module) pairs, listed in the order they are printed.
    versioned_modules = (
        ("CV2 (OpenCV) Version:", cv2),
        ("NumPy Version:", np),
        ("Pandas Version:", pd),
        ("tqdm Version:", tqdm),
        ("Matplotlib Version:", matplotlib),
        ("TensorFlow Version:", tf),
        ("TensorFlow Hub Version:", hub),
    )
    for label, module in versioned_modules:
        print(label, module.__version__)

# Run the version check so the environment is recorded with the results.
check_versions()

'''
CV2 (OpenCV) Version: 4.9.0
NumPy Version: 1.26.4
Pandas Version: 2.2.1
tqdm Version: 4.66.2
Matplotlib Version: 3.8.3
TensorFlow Version: 2.15.0
TensorFlow Hub Version: 0.16.1
Scikit-learn Version: 1.4.1.post1
'''


CV2 (OpenCV) Version: 4.9.0
NumPy Version: 1.26.4
Pandas Version: 2.2.1
tqdm Version: 4.66.2
Matplotlib Version: 3.8.3
TensorFlow Version: 2.15.0
TensorFlow Hub Version: 0.16.1


'\nCV2 (OpenCV) Version: 4.9.0\nNumPy Version: 1.26.4\nPandas Version: 2.2.1\ntqdm Version: 4.66.2\nMatplotlib Version: 3.8.3\nTensorFlow Version: 2.15.0\nTensorFlow Hub Version: 0.16.1\nScikit-learn Version: 1.4.1.post1\n'

In [4]:
# Folders holding the test images of each neck-posture class, evaluated
# below. Absolute, machine-specific paths: adjust them before running
# elsewhere.
forward = r'D:\Project\Movenet\Neck_results\poses_images_out_test\Neck Forward'
neutral = r'D:\Project\Movenet\Neck_results\poses_images_out_test\Neck Neutral'

# Load the pretrained model
# model = YOLO('../YOLO/YOLOv8x.pt')
# result = model(sample_path)
# print(result.Boxes)

# result = model.predict(
#     source=sample_path,
#     mode="predict",
#     save=True,
#     device="cpu"
# )


# print(sys.path)
def predict(samplePath):
  """Builds the classifier input for one image file.

  Reads and JPEG-decodes the image at `samplePath`, runs `detect` on it, and
  flattens every keypoint into consecutive (x, y, score) values.

  Args:
    samplePath: Path of the image file to process.

  Returns:
    A float32 tensor of shape (1, 3 * number_of_keypoints).
  """
  raw_bytes = tf.io.read_file(samplePath)
  person = detect(tf.io.decode_jpeg(raw_bytes))

  # Flatten keypoints in order: x0, y0, score0, x1, y1, score1, ...
  flat_values = [
      value
      for keypoint in person.keypoints
      for value in (keypoint.coordinate.x, keypoint.coordinate.y,
                    keypoint.score)
  ]
  landmarks = tf.constant(np.array(flat_values), dtype=tf.float32)

  # Add a leading batch dimension of size one.
  return tf.reshape(landmarks, shape=(1, -1))


# Load the trained neck-posture classifier (absolute, machine-specific path).
model = tf.keras.models.load_model(r'D:\Project\Movenet\Neck_results\run2\Neck_weights.best.keras')


def _count_predictions(image_paths):
  """Classifies every image and tallies the predicted classes.

  Args:
    image_paths: Iterable of image file paths to classify.

  Returns:
    A `(forward_total, neutral_total)` tuple of prediction counts.
  """
  forward_total = 0
  neutral_total = 0
  for image_path in image_paths:
    # `model.predict` returns a plain array of class scores, one row per
    # input; it has no `.keypoints` attribute.
    prediction = model.predict(predict(image_path))

    # Column 0 is treated as "forward" and column 1 as "neutral".
    # NOTE(review): presumably this matches the class order used in
    # training - confirm.
    if prediction[0, 0] > prediction[0, 1]:
      forward_total += 1
    else:
      neutral_total += 1
  return forward_total, neutral_total


# Evaluate on the images labelled "Neck Forward".
files_forwards = os.listdir(forward)
forward_paths = [os.path.join(forward, p) for p in files_forwards]
forward_count, neutral_count = _count_predictions(forward_paths)
print(f'forward: {forward_count}, neutral: {neutral_count}')


# Evaluate on the images labelled "Neck Neutral".
files_neutrals = os.listdir(neutral)
neutral_paths = [os.path.join(neutral, p) for p in files_neutrals]
forward_count, neutral_count = _count_predictions(neutral_paths)
print(f'forward: {forward_count}, neutral: {neutral_count}')



KeyboardInterrupt: 