In [None]:
# @title Download Code {form-width: "25%"}
!git clone https://github.com/deepmind/tapnet.git

Cloning into 'tapnet'...
remote: Enumerating objects: 541, done.[K
remote: Counting objects: 100% (214/214), done.[K
remote: Compressing objects: 100% (92/92), done.[K
remote: Total 541 (delta 126), reused 138 (delta 122), pack-reused 327[K
Receiving objects: 100% (541/541), 1.23 MiB | 5.19 MiB/s, done.
Resolving deltas: 100% (316/316), done.


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# @title Install Dependencies {form-width: "25%"}
!pip install -r tapnet/requirements_inference.txt

Collecting jaxline (from -r tapnet/requirements_inference.txt (line 3))
  Downloading jaxline-0.0.5.tar.gz (32 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting dm-haiku (from -r tapnet/requirements_inference.txt (line 5))
  Downloading dm_haiku-0.0.10-py3-none-any.whl (360 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m360.3/360.3 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
Collecting mediapy (from -r tapnet/requirements_inference.txt (line 9))
  Downloading mediapy-1.1.9-py3-none-any.whl (25 kB)
Collecting einshape (from -r tapnet/requirements_inference.txt (line 11))
  Downloading einshape-1.0-py3-none-any.whl (21 kB)
Collecting ipympl (from -r tapnet/requirements_inference.txt (line 12))
  Downloading ipympl-0.9.3-py2.py3-none-any.whl (511 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m511.6/511.6 kB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m
Collecting ml_collections>=0.1 (from jaxline->-r tapnet/requirements_inference.tx

In [None]:
# @title Download Model {form-width: "25%"}

%mkdir tapnet/checkpoints

!wget -P tapnet/checkpoints https://storage.googleapis.com/dm-tapnet/causal_tapir_checkpoint.npy

%ls tapnet/checkpoints

--2023-08-19 08:50:03--  https://storage.googleapis.com/dm-tapnet/causal_tapir_checkpoint.npy
Resolving storage.googleapis.com (storage.googleapis.com)... 108.177.12.128, 108.177.13.128, 74.125.26.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|108.177.12.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 124408122 (119M) [application/octet-stream]
Saving to: ‘tapnet/checkpoints/causal_tapir_checkpoint.npy’


2023-08-19 08:50:04 (129 MB/s) - ‘tapnet/checkpoints/causal_tapir_checkpoint.npy’ saved [124408122/124408122]

causal_tapir_checkpoint.npy


In [None]:
# @title Imports {form-width: "25%"}
%matplotlib widget
import functools

import haiku as hk
import jax
import jax.numpy as jnp
import matplotlib.pyplot as plt
import mediapy as media
import numpy as np
from tqdm import tqdm
import tree

from tapnet import tapir_model
from tapnet.utils import transforms
from tapnet.utils import viz_utils

from google.colab import output
output.enable_custom_widget_manager()

In [None]:
# @title Load Checkpoint {form-width: "25%"}

checkpoint_path = 'tapnet/checkpoints/causal_tapir_checkpoint.npy'
ckpt_state = np.load(checkpoint_path, allow_pickle=True).item()
params, state = ckpt_state['params'], ckpt_state['state']

In [None]:

# @title Build Model {form-width: "25%"}

# Internally, the tapir model has three stages of processing: computing
# image features (get_feature_grids), extracting features for each query point
# (get_query_features), and estimating trajectories given query features and
# the feature grids where we want to track (estimate_trajectories).  For
# tracking online, we need extract query features on the first frame only, and
# then call estimate_trajectories on one frame at a time.

def build_online_model_init(frames, query_points):
  """Initialize query features for the query points."""
  model = tapir_model.TAPIR(use_causal_conv=True, bilinear_interp_with_depthwise_conv=False)

  feature_grids = model.get_feature_grids(frames, is_training=False)
  query_features = model.get_query_features(
      frames,
      is_training=False,
      query_points=query_points,
      feature_grids=feature_grids,
  )
  return query_features


def build_online_model_predict(frames, query_features, causal_context):
  """Compute point tracks and occlusions given frames and query points."""
  model = tapir_model.TAPIR(use_causal_conv=True, bilinear_interp_with_depthwise_conv=False)
  feature_grids = model.get_feature_grids(frames, is_training=False)
  trajectories = model.estimate_trajectories(
      frames.shape[-3:-1],
      is_training=False,
      feature_grids=feature_grids,
      query_features=query_features,
      query_points_in_video=None,
      query_chunk_size=64,
      causal_context=causal_context,
      get_causal_context=True,
  )
  causal_context = trajectories['causal_context']
  del trajectories['causal_context']
  return {k: v[-1] for k, v in trajectories.items()}, causal_context


online_init = hk.transform_with_state(build_online_model_init)
online_init_apply = jax.jit(online_init.apply)

online_predict = hk.transform_with_state(build_online_model_predict)
online_predict_apply = jax.jit(online_predict.apply)

rng = jax.random.PRNGKey(42)
online_init_apply = functools.partial(
    online_init_apply, params=params, state=state, rng=rng
)
online_predict_apply = functools.partial(
    online_predict_apply, params=params, state=state, rng=rng
)

In [None]:

# @title Utility Functions {form-width: "25%"}

def preprocess_frames(frames):
  """Preprocess frames to model inputs.

  Args:
    frames: [num_frames, height, width, 3], [0, 255], np.uint8

  Returns:
    frames: [num_frames, height, width, 3], [-1, 1], np.float32
  """
  frames = frames.astype(np.float32)
  frames = frames / 255 * 2 - 1
  return frames


def postprocess_occlusions(occlusions, expected_dist):
  """Postprocess occlusions to boolean visible flag.

  Args:
    occlusions: [num_points, num_frames], [-inf, inf], np.float32

  Returns:
    visibles: [num_points, num_frames], bool
  """
  pred_occ = jax.nn.sigmoid(occlusions)
  pred_occ = 1 - (1 - pred_occ) * (1 - jax.nn.sigmoid(expected_dist))
  visibles = pred_occ < 0.5  # threshold
  return visibles


def sample_random_points(frame_max_idx, height, width, num_points):
  """Sample random points with (time, height, width) order."""
  y = np.random.randint(0, height, (num_points, 1))
  x = np.random.randint(0, width, (num_points, 1))
  t = np.random.randint(0, frame_max_idx + 1, (num_points, 1))
  points = np.concatenate((t, y, x), axis=-1).astype(np.int32)  # [num_points, 3]
  return points


def construct_initial_causal_state(num_points, num_resolutions):
  value_shapes = {
      "tapir/~/pips_mlp_mixer/block_1_causal_1": (1, num_points, 2, 512),
      "tapir/~/pips_mlp_mixer/block_1_causal_2": (1, num_points, 2, 2048),
      "tapir/~/pips_mlp_mixer/block_2_causal_1": (1, num_points, 2, 512),
      "tapir/~/pips_mlp_mixer/block_2_causal_2": (1, num_points, 2, 2048),
      "tapir/~/pips_mlp_mixer/block_3_causal_1": (1, num_points, 2, 512),
      "tapir/~/pips_mlp_mixer/block_3_causal_2": (1, num_points, 2, 2048),
      "tapir/~/pips_mlp_mixer/block_4_causal_1": (1, num_points, 2, 512),
      "tapir/~/pips_mlp_mixer/block_4_causal_2": (1, num_points, 2, 2048),
      "tapir/~/pips_mlp_mixer/block_5_causal_1": (1, num_points, 2, 512),
      "tapir/~/pips_mlp_mixer/block_5_causal_2": (1, num_points, 2, 2048),
      "tapir/~/pips_mlp_mixer/block_6_causal_1": (1, num_points, 2, 512),
      "tapir/~/pips_mlp_mixer/block_6_causal_2": (1, num_points, 2, 2048),
      "tapir/~/pips_mlp_mixer/block_7_causal_1": (1, num_points, 2, 512),
      "tapir/~/pips_mlp_mixer/block_7_causal_2": (1, num_points, 2, 2048),
      "tapir/~/pips_mlp_mixer/block_8_causal_1": (1, num_points, 2, 512),
      "tapir/~/pips_mlp_mixer/block_8_causal_2": (1, num_points, 2, 2048),
      "tapir/~/pips_mlp_mixer/block_9_causal_1": (1, num_points, 2, 512),
      "tapir/~/pips_mlp_mixer/block_9_causal_2": (1, num_points, 2, 2048),
      "tapir/~/pips_mlp_mixer/block_10_causal_1": (1, num_points, 2, 512),
      "tapir/~/pips_mlp_mixer/block_10_causal_2": (1, num_points, 2, 2048),
      "tapir/~/pips_mlp_mixer/block_11_causal_1": (1, num_points, 2, 512),
      "tapir/~/pips_mlp_mixer/block_11_causal_2": (1, num_points, 2, 2048),
      "tapir/~/pips_mlp_mixer/block_causal_1": (1, num_points, 2, 512),
      "tapir/~/pips_mlp_mixer/block_causal_2": (1, num_points, 2, 2048),
  }
  fake_ret = {
      k: jnp.zeros(v, dtype=jnp.float32) for k, v in value_shapes.items()
  }
  return [fake_ret] * num_resolutions * 4

In [None]:
import pandas as pd

frame_labels = pd.read_csv('/content/drive/MyDrive/Dissertation_2/filtered_pain_labels.csv')
frame_labels.head()




Unnamed: 0,Person,video,frame,Label
0,042-ll042,ll042t1aaaff,ll042t1aaaff001_facs.txt,0
1,042-ll042,ll042t1aaaff,ll042t1aaaff002_facs.txt,0
2,042-ll042,ll042t1aaaff,ll042t1aaaff003_facs.txt,0
3,042-ll042,ll042t1aaaff,ll042t1aaaff004_facs.txt,0
4,042-ll042,ll042t1aaaff,ll042t1aaaff005_facs.txt,0


In [None]:
#  Load image from folder

import os
from PIL import Image
import numpy as np

label_dict=dict()


def load_video(folder_path):


  # Get a list of all PNG files in the folder
  png_files = [file for file in os.listdir(folder_path) if file.endswith(".png")]
  png_files.sort()
  # Create an empty list to store the loaded images
  loaded_images = []
  label_list=list()

  # Iterate over each PNG file and load it
  for file_name in png_files:


      # Construct the full file path
      file_path = os.path.join(folder_path, file_name)
      splits = file_path.split('/')
      #print(splits)

      condition1 = frame_labels['Person']==str(splits[-3])
      condition2 = frame_labels['video']==str(splits[-2])
      condition3 = frame_labels['frame']==str(splits[-1]).split('.')[0]+'_facs.txt'

      label = frame_labels.loc[condition1 & condition2 & condition3,'Label'].values[0]
      label_list.append(label)

      # Open the image using PIL
      image = Image.open(file_path)
      new_size = (320, 240)
      image = image.resize(new_size, Image.ANTIALIAS)

      # Convert the image to RGB mode (in case it has an alpha channel)
      image = image.convert("RGB")

      # Append the image to the list
      loaded_images.append(image)

  label_dict[splits[-2]]=label_list
  # Determine the dimensions of the images
  num_images = len(loaded_images)
  #height, width = loaded_images[0].size
  height, width = 240, 320

  # Create a NumPy array of size (number of images, height, width, 3)
  image_array = np.zeros((num_images, height, width, 3), dtype=np.uint8)

  # Fill the array with the image data
  for i, image in enumerate(loaded_images):
      image_array[i] = np.array(image)

  # Print the shape of the image array
  #print("Shape of the image array:", image_array.shape)

  video = image_array
  return(video)


In [None]:
def detect_landmarks(image_path):
  # STEP 3: Load the input image.
  image = mp.Image.create_from_file(image_path)

  # STEP 4: Detect face landmarks from the input image.
  detection_result = detector.detect(image)

  landmark_targets = list([130,25,243,244,245,6,196,197,193,168,
                     417, 419, 413,414,398,465,464,463,362,255,359,
                     8,9,55,107,336,285,65,52,53,46,124,156,226,35,143,31,295,282,283,276,265,446,261,353,372,340,368,264,
                      57,185,40,39,37,0,267,269,270,409,287,375,321,405,314,17,84,181,91,146,61,76,62,78,308,293,306,291,
                      122,188,114,47,100,101,50,123,137,351,412,343,277,329,330,280,352,366,
                      129,203,206,216,212,210,169,358,423,426,436,432,430,394,
                         246,161,160,159,158,157,173,133,154,155,153,145,144,163,7,
                         398,384,385,386,387,388,466,263,249,390,373,374,380,381,382,362])

  landmark_list=list()
  projection_image = cv2.imread(image_path)


  face_landmarks_list = detection_result.face_landmarks
  for i,landmark in enumerate(face_landmarks_list[0]):
    if i in landmark_targets:
      x = landmark.x
      y = landmark.y

      shape = projection_image.shape
      relative_x = int(x * shape[1])
      relative_y = int(y * shape[0])

      landmark_list.append([0,relative_y,relative_x])

  lardmark_array = np.array(landmark_list)

  return (lardmark_array)

for sequence in video sequence folders:
  Video = Load video(sequences)
  First_image = Identify first image in the sequence
  First_image_Landmark = Mediapipe(First image)
  Tracked_landmarks = TAPIR (Video, First_image_Landmark)
Rolled landmarks = Rolling window logic (Merged landmarks)
Rolled labels = Rolling window logic (Labels)
Face extracts = Mediapipe (Frames)
Frame pain estimate model (Rolled landmarks,Face extracts,Rolled labels)
Video pain estimate model (Tracked_landmarks,Face extracts,labels)



In [None]:
!pip install -q mediapipe==0.10.0

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.9/33.9 MB[0m [31m18.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
!wget -O face_landmarker_v2_with_blendshapes.task -q https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task

In [None]:
# STEP 1: Import the necessary modules.
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import cv2



# STEP 2: Create an FaceLandmarker object.
base_options = python.BaseOptions(model_asset_path='face_landmarker_v2_with_blendshapes.task')
options = vision.FaceLandmarkerOptions(base_options=base_options,
                                       output_face_blendshapes=True,
                                       output_facial_transformation_matrixes=True,
                                       num_faces=1)
detector = vision.FaceLandmarker.create_from_options(options)

def track_points(video,landmark_array):
    # @title Progressively Predict Sparse Point Tracks {form-width: "25%"}

  resize_height = 240  # @param {type: "integer"}
  resize_width = 320  # @param {type: "integer"}

  #height, width = video.shape[1:3]
  height, width = 240, 320
  frames = media.resize_video(video, (resize_height, resize_width))
  query_points = landmark_array

  query_features, _ = online_init_apply(frames=preprocess_frames(frames[None, None, 0]), query_points=query_points[None])
  causal_state = construct_initial_causal_state(query_points.shape[0], len(query_features.resolutions) - 1)

  # Predict point tracks frame by frame
  predictions = []
  print('Frame size',frames.shape[0])
  for i in range(frames.shape[0]):
    (prediction, causal_state), _ = online_predict_apply(
        frames=preprocess_frames(frames[None, None, i]),
        query_features=query_features,
        causal_context=causal_state,
    )
    predictions.append(prediction)

  tracks = np.concatenate([x['tracks'][0] for x in predictions], axis=1)

  # Visualize sparse point tracks
  tracks = transforms.convert_grid_coordinates(tracks, (resize_width, resize_height), (width, height))

  occlusions = np.concatenate([x['occlusion'][0] for x in predictions], axis=1)
  expected_dist = np.concatenate([x['expected_dist'][0] for x in predictions], axis=1)

  visibles = postprocess_occlusions(occlusions, expected_dist)

  # # Visualize sparse point tracks
  # tracks = transforms.convert_grid_coordinates(tracks, (resize_width, resize_height), (width, height))
  video_viz = viz_utils.paint_point_track(video, tracks, visibles)
  media.show_video(video_viz, fps=10)
  return(tracks)

In [None]:
from pathlib import Path
import os

def main(track_folder):
  video_loaded = load_video(track_folder)

  folder_path = Path(track_folder)
  files = list(folder_path.iterdir())
  dir_files=list()
  for i in files:
    if str(i). split('.')[1]=='png':
      dir_files.append(str(i))
  dir_files.sort()
  first_file_path = dir_files[0]

  landmark_points = detect_landmarks(first_file_path)
  print('First_file_path',first_file_path)

  trajectory = track_points(video_loaded, landmark_points)

  return(trajectory)

trajectory_main= dict()
folder_track = '/content/drive/MyDrive/Dissertation_2/Images/Images/'
for files in os.listdir(folder_track):
  print(files)
  for folders in os.listdir(folder_track+files):
    if (folders in list(set(frame_labels['video']))) and (folders not in list(trajectory_main.keys())):
      print(folders)
      tracked_points = main(folder_track+files + '/' + folders)
      trajectory_main[folders] = np.array(tracked_points)
      #break
      #trajectory_main.append(np.array(tracked_points))


107-hs107
hs107t2aaaff


  image = image.resize(new_size, Image.ANTIALIAS)


First_file_path /content/drive/MyDrive/Dissertation_2/Images/Images/107-hs107/hs107t2aaaff/hs107t2aaaff001.png
Frame size 412


KeyboardInterrupt: ignored

In [None]:
import pickle


# Specify the file path to save the dictionary
file_path = '/content/drive/MyDrive/Dissertation_2/trajectory.pickle'

# Save the dictionary using pickle
with open(file_path, 'wb') as file:
    pickle.dump(trajectory_main, file)

In [None]:
# Specify the file path to save the dictionary
file_path = '/content/drive/MyDrive/Dissertation_2/labels.pickle'

# Save the dictionary using pickle
with open(file_path, 'wb') as file:
    pickle.dump(label_dict, file)

In [None]:
 # trajectory_array=np.array(trajectory_main)
# trajectory_array

In [None]:
# from PIL import Image
# import os

# input_folder = '/content/drive/MyDrive/Dissertation_2/Images/Images/123-jh123/jh123t1aeunaff'  # Specify the path to the folder containing the images
# output_folder = '/content/drive/MyDrive/Dissertation_2/Images/Images/123-jh123/jh123t1aeunaff'  # Specify the path to the folder where resized images will be saved

# # Ensure the output folder exists
# if not os.path.exists(output_folder):
#     os.makedirs(output_folder)

# main_folder = '/content/drive/MyDrive/Dissertation_2/Images/Images'

# for folders in os.listdir(main_folder):



# # Loop through all files in the input folder
# for folders in
# for filename in os.listdir(input_folder):
#     if filename.endswith(".jpg") or filename.endswith(".png"):
#         image_path = os.path.join(input_folder, filename)
#         output_path = os.path.join(output_folder, filename)

#         # Open the image
#         img = Image.open(image_path)

#         # Resize the image
#         # You can adjust the new size as needed
#         new_size = (320, 240)
#         resized_img = img.resize(new_size, Image.ANTIALIAS)

#         # Save the resized image
#         resized_img.save(output_path)

In [None]:
# main_folder = '/content/drive/MyDrive/Dissertation_2/Images/Images/'

# for folders in os.listdir(main_folder):
#   for folder_2 in os.listdir(main_folder+folders):
#     for filename in os.listdir(main_folder+folders+'/'+folder_2):
#       if filename.endswith(".jpg") or filename.endswith(".png"):
#         image_path = os.path.join(main_folder,folders,folder_2, filename)
#         output_path = os.path.join(main_folder,folders,folder_2, filename)
#         # Open the image
#         img = Image.open(image_path)

#         # Resize the image
#         # You can adjust the new size as needed
#         new_size = (320, 240)
#         resized_img = img.resize(new_size, Image.ANTIALIAS)

#         # Save the resized image
#         resized_img.save(output_path)


In [None]:
# from PIL import Image
# import os

# import numpy as np

# img = Image.open('/content/drive/MyDrive/Dissertation_2/Images/Images/123-jh123/jh123t1aeunaff/jh123t1aeunaff001.png')
# print(np.array(img).shape)
# new_size = (320, 240)
# resized_img = img.resize(new_size, Image.ANTIALIAS)
# np.array(resized_img).shape

