# Learning To use the Bible

## Using Holistic on an image

In [51]:
import os
from natsort import natsorted
from dataclasses import dataclass

@dataclass
class ImageSequence:
    """A group of frame image paths that share one numeric filename prefix."""
    id: int  # integer parsed from the filename prefix (text before the first "_")
    filepaths: list[str]  # paths of the frames in this sequence, in natural sort order

#Constants
# Directory of extracted frames, relative to the notebook's working directory.
# NOTE(review): presumably holds one sub-folder per label, as expected by
# get_image_sequences_from_dir — only referenced by the commented-out call further down.
FRAMES_PATH = "../backend/dynamic_signs/frames"

def __extract_prefix(filename:str, separator:str = "_"):
    """Return the part of ``filename`` that comes before the first ``separator``.

    When the separator does not occur, the whole filename is returned.
    """
    # Only the first piece is needed, so a single split is enough.
    head = filename.split(separator, 1)[0]
    return head

T_filepaths = list[str]
def get_image_sequences_from_dir(dir:str) -> dict[str, list[ImageSequence]]:
    """Group the frame images below ``dir`` into per-label image sequences.

    Every sub-folder of ``dir`` is treated as one label. The files inside a
    label folder are sorted naturally and consecutive files that share the
    same filename prefix (the text before the first ``_``) form one
    ``ImageSequence`` whose ``id`` is that prefix converted to ``int``.

    Args:
        dir: directory containing one sub-folder per label.

    Returns:
        Mapping from label (folder name) to its list of sequences. A label
        folder without files maps to an empty list.

    Raises:
        ValueError: if a filename prefix cannot be converted to ``int``.
    """
    res_dict: dict[str, list[ImageSequence]] = {}
    for label in os.listdir(dir):
        folder_path = os.path.join(dir, label)
        if not os.path.isdir(folder_path):
            continue

        files = natsorted([file for file in os.listdir(folder_path)
                           if os.path.isfile(os.path.join(folder_path, file))])

        label_sequences: list[ImageSequence] = []
        # None never equals a real prefix, so the first file always opens a
        # sequence and an empty folder simply yields no sequences.
        prev_prefix = None
        for image in files:
            prefix = __extract_prefix(image)
            if prefix != prev_prefix:
                label_sequences.append(ImageSequence(int(prefix), []))
                prev_prefix = prefix
            label_sequences[-1].filepaths.append(os.path.join(folder_path, image))
        res_dict[label] = label_sequences
    return res_dict

# label_files_dict = get_image_sequences_from_dir(FRAMES_PATH)


In [52]:
from typing import Any, NamedTuple
import mediapipe.python.solutions.holistic as mp_holistic
import cv2 as cv
import csv

# Module-level MediaPipe Holistic instance; write_to_csv calls mp.process on every frame.
# NOTE(review): static_image_mode=True presumably makes each image be processed
# independently of the previous one — confirm against the MediaPipe documentation.
mp = mp_holistic.Holistic(
    static_image_mode=True,
    model_complexity=1,
)

def get_attributes_as_dict(obj : NamedTuple) -> dict[str, Any]:
    """Map every field name of a named tuple to the value stored under it."""
    field_values: dict[str, Any] = {}
    for name in obj._fields:
        field_values[name] = getattr(obj, name)
    return field_values

def write_processed_sequence_to_csv(label:str, id: int,
                                    mp_process_results: list[NamedTuple],
                                    verbose = False):
    """Append the landmarks of one processed image sequence to ``{label}_out.csv``.

    For every result (one per frame) one row is written per result field, in
    alphabetical field order, e.g. face_landmarks, left_hand_landmarks,
    pose_landmarks, pose_world_landmarks, right_hand_landmarks,
    segmentation_mask. A row looks like ``[part, id, x0, y0, z0, x1, ...]``
    where ``part`` is the field name without its ``_landmarks`` suffix.
    Fields that carry no landmark list produce a row with only ``[part, id]``.

    Args:
        label: sign label; selects the output file ``{label}_out.csv`` in the
            current working directory (opened in append mode).
        id: sequence id written into the second column of every row.
        mp_process_results: named-tuple results, one per frame.
        verbose: when true, print a note for every field without landmarks.
    """
    with open(f"{label}_out.csv", 'a', newline="") as f:
        writer = csv.writer(f)
        for res in mp_process_results:
            for body_part in sorted(res._fields):
                which = body_part.removesuffix("_landmarks")
                # The getattr default covers both a missing detection (None)
                # and fields that are set but are not landmark lists.
                points = getattr(getattr(res, body_part), "landmark", None)
                if points is None:
                    if verbose:
                        print(f"No landmarks for {which}")
                    coords = []
                else:
                    # Flatten in one pass instead of repeatedly concatenating tuples.
                    coords = [value for mrk in points
                              for value in (mrk.x, mrk.y, mrk.z)]
                writer.writerow([which, id, *coords])
            
            
def write_to_csv(label_files_dict: dict[str, list[ImageSequence]]):
    """Run MediaPipe Holistic on every image sequence and append the results to CSV.

    Each image is loaded with OpenCV, converted from BGR to RGB and passed to
    the module-level ``mp`` Holistic instance. The per-frame results of one
    sequence are then handed to ``write_processed_sequence_to_csv``.

    Args:
        label_files_dict: mapping from label to the image sequences of that label.

    Raises:
        ValueError: if an image file cannot be read by OpenCV.
    """
    for label, img_sequences in label_files_dict.items():
        for sequence in img_sequences:
            results = []
            for img_path in sequence.filepaths:
                image_bgr = cv.imread(img_path)
                # cv.imread signals failure by returning None instead of raising.
                if image_bgr is None:
                    raise ValueError(f"Could not read image: {img_path}")
                # OpenCV loads images as BGR while MediaPipe expects RGB input.
                results.append(mp.process(cv.cvtColor(image_bgr, cv.COLOR_BGR2RGB)))
            print(f"MediaPipe for {label}-{sequence.id} has {len(results)} many elements")
            write_processed_sequence_to_csv(label, sequence.id, results)


In [53]:
# Extract frames from the zipped .avi videos and write their Holistic landmarks to CSV
import os
from pathlib import Path
import re
import shutil
import cv2
from zipfile import ZipFile

from sign.training.landmark_extraction.MediaPiper import MediaPiper

# Disabled by default: change the condition to re-run the extraction from the zip archive.
if False:
    # Captures (sign, id) from entry names shaped like "<...>/<sign>/<id>.avi".
    # NOTE(review): the leading ".*" is greedy, so group 1 appears to keep only the
    # last character of the folder name — fine for single-letter signs, verify otherwise.
    regex = r".*\/*(.+)\/(.+)\.avi"
    mediapiper = MediaPiper()  # NOTE(review): not used anywhere else in this cell
    with ZipFile(str(Path.cwd().absolute().joinpath("data/zippo.zip")), 'r') as myzip:
        try:
            for file in myzip.filelist:
                match = re.match(regex, file.filename)
                if match is not None:
                    sign = match.group(1)
                    id = match.group(2)
                    # Copy the archived video to a temporary file on disk so that
                    # cv2.VideoCapture can open it by path.
                    video = myzip.open(file.filename).read()
                    with open("video.avi", "wb") as video_file:
                        video_file.write(video)
                    vc = cv2.VideoCapture('video.avi')
                    i = 0
                    if vc.isOpened():
                        # NOTE(review): this frame is never written — the loop below reads
                        # the next frame before its first imwrite, so the first frame of
                        # every video appears to be skipped.
                        rval , frame = vc.read()
                    else:
                        rval = False
                    path = f"./dynamic_signs/frames"
                    path_sign = f"{path}/{sign}"
                    if not os.path.exists(path):
                        os.makedirs(path)
                    if not os.path.exists(path_sign):
                        os.makedirs(path_sign)   
                    # Write every remaining frame as "<id>_<frame index>.png" in the sign's folder.
                    while rval:
                        rval, frame = vc.read()
                        if frame is None or frame.size == 0:
                            continue
                        cv2.imwrite(f"{path_sign}/{id}_{i}.png", frame)
                        i = i + 1
                    vc.release()
                    # Re-scan the whole frames directory and append its landmarks to the CSV files.
                    label_files_dict = get_image_sequences_from_dir(path)

                    print(label_files_dict)
                    write_to_csv(label_files_dict)
                    # Remove this sign's frames once their landmarks have been written.
                    shutil.rmtree(path_sign)

        finally:
            # NOTE(review): os.remove raises FileNotFoundError when no .avi entry was
            # ever written to disk (e.g. the archive had no matching file).
            os.remove("video.avi")  # Clean up after ourselves

In [54]:
import os
from pathlib import Path
HolisticSequence = dict[str, list[float]]


class csv_reader:
    """Parse ``<label>_out.csv`` landmark files into per-video sequences.

    Every CSV row is expected to look like ``[body_part, video_id, x, y, z, ...]``;
    rows without coordinates (only the first two columns) are allowed.
    """

    @staticmethod
    def spawn_sequence() -> HolisticSequence:
        """Create an empty sequence with one coordinate list per tracked body part."""
        return {"face": [],
                "left_hand": [],
                "pose" : [],
                "right_hand" : [],
                }

    def __init__(self, sequence_spawner = None):
        """
        Args:
            sequence_spawner: zero-argument callable returning a fresh, empty
                HolisticSequence. Defaults to ``spawn_sequence``.
        """
        # The default is resolved here rather than in the signature: inside the
        # class body ``spawn_sequence`` is still a staticmethod object, which is
        # only callable on Python 3.10 and newer.
        self.new_holistic_sequence = (
            self.spawn_sequence if sequence_spawner is None else sequence_spawner
        )

    def _avoid(self, row_val: str):
        """Tell whether rows with this body-part key are ignored while parsing."""
        return row_val == "segmentation_mask" or row_val == "pose_world"

    def _remove_file_suffix(self, file_name: str):
        """Strip the ``_out.csv`` suffix, leaving the label part of the file name."""
        return file_name.removesuffix("_out.csv")

    def extract_holistic_landmarks(self, path:Path) -> dict[int, HolisticSequence]:
        """Read one landmark CSV and collect its coordinates per video id.

        Consecutive rows with the same id belong to the same video; their
        coordinates are appended to the list of the row's body part.

        Args:
            path: CSV file to read.

        Returns:
            Mapping from video id to its HolisticSequence.

        Raises:
            ValueError: if a row with coordinates uses a body-part key the
                sequence does not contain, or a value is not a number.
        """
        res: dict[int, HolisticSequence] = {}
        # newline="" leaves line-ending handling to the csv module.
        with open(path, 'r', newline="") as f:
            reader = csv.reader(f)
            # None (instead of a magic -1) marks "no video seen yet".
            prev_id = None

            cur_entry: HolisticSequence = self.new_holistic_sequence()
            for row in reader:
                if not row:
                    # blank line in the file
                    continue
                row_key = row[0]
                if self._avoid(row_key):
                    continue

                new_id = int(row[1])
                if prev_id is not None and new_id != prev_id:
                    # A new video starts: store the finished one.
                    res[prev_id] = cur_entry
                    cur_entry = self.new_holistic_sequence()

                prev_id = new_id
                if len(row) > 2:
                    if row_key not in cur_entry:
                        raise ValueError(f"Holistic Sequence only allows keys: {[k for k in self.new_holistic_sequence().keys()]}.\n\tEither update \"spawn_sequence\" function or check if csv is broken")
                    cur_entry[row_key].extend(float(value) for value in row[2:])
            # Keep the last video when ANY body part has data; checking only the
            # first key would drop a video that has hand but no face landmarks.
            if any(len(values) > 0 for values in cur_entry.values()):
                res[prev_id] = cur_entry
        return res

    def extract_holistic_landmarks_from_folder(self, path: str) -> dict[str, dict[int, HolisticSequence]]:
        """
            Returns a dictionary from LABEL of the sign to a dictionary of sequence_ID to a HolisticSequence.

                HolisticSequence:
                    A dictionary of keys: ["face", "right_hand", "left_hand", "pose"].
                    Keys map to a list of the floats corresponding to xyz of landmarks of all frames in the sequence.

            Every file in ``path`` ending in ".csv" is parsed; its label is the
            file name without the "_out.csv" suffix.
        """
        res = {}
        for file in os.listdir(path):
            if not file.endswith(".csv"):
                continue
            path_to_file = Path(path, file)
            label = self._remove_file_suffix(path_to_file.name)
            res[label] = self.extract_holistic_landmarks(path_to_file.absolute())
        return res

reader = csv_reader()
result = reader.extract_holistic_landmarks_from_folder(".")
# Report the number of parsed label classes (not the whole parsed dictionary).
print(f"Parsed {len(result)} classes")
# Number of parsed "J" sequences; shown as the cell's result.
len(result["J"])

10

In [55]:
## BORROW FROM DYNAMIC GESTURE
import numpy as np
from typing import Tuple
from sign.trajectory import TrajectoryBuilder
from sign.landmarks import NormalizedLandmark, pre_process_landmark, calc_landmark_list
from dynamic_signs.csv_reader import csv_reader
# Module-level builder whose make_trajectory is used by
# extract_training_data_and_labels_from_dynamic_gesture_csv.
# Note that prune_training_data_and_labels_from_dynamic_gesture_csv creates its own
# local `bob` with a different target_len.
bob = TrajectoryBuilder(target_len=24)


def extract_training_data_and_labels_from_dynamic_gesture_map(gesture_map: dict[str, list[np.ndarray]]) -> Tuple[list[np.ndarray], list[str]]:
    """Flatten a label -> samples mapping into two parallel lists.

    Returns:
        ``(samples, labels)`` where ``labels[i]`` is the label under which
        ``samples[i]`` was stored; order follows the mapping's iteration order.
    """
    labelled_samples = [
        (sample, label)
        for label, samples in gesture_map.items()
        for sample in samples
    ]
    flat_samples: list[np.ndarray] = [sample for sample, _ in labelled_samples]
    flat_labels: list[str] = [label for _, label in labelled_samples]
    return flat_samples, flat_labels

def prune_training_data_and_labels_from_dynamic_gesture_csv(input: dict[str, dict[int, list[float]]]) -> dict[str, list[np.ndarray]]:
    """Pass every non-empty per-video value list through a TrajectoryBuilder.

    Lists shorter than ``24*3*21`` values go through
    ``pad_sequences_of_landmarks``; all other lists go through
    ``extract_keyframes_sample``. Empty lists are reported and skipped.
    The result of either call is stored as a numpy array under the video's label.

    NOTE(review): 24*3*21 is presumably 24 frames x 21 landmarks x 3 coordinates — confirm.
    """
    target_length = 24*3*21
    builder = TrajectoryBuilder(target_len=target_length)
    pruned: dict[str, list[np.ndarray]] = {}
    for label, videos in input.items():
        for video_id, values in videos.items():
            if len(values) == 0:
                print(f"{label}-{video_id} is empty - SKIPPING")
                continue
            resize = (builder.pad_sequences_of_landmarks
                      if len(values) < target_length
                      else builder.extract_keyframes_sample)
            resized = np.array(resize(values))
            # Labels only appear in the result once they have at least one sample.
            pruned.setdefault(label, []).append(resized)

    return pruned

def extract_left_hand_landmarks_from_holistic(dict: dict[str, dict[int, HolisticSequence]]) -> dict[str, dict[int, list[float]]]:
    """Reduce every holistic sequence to the coordinates of a single hand.

    For each video the "left_hand" list is used when it is non-empty,
    otherwise the "right_hand" list when that is non-empty. Videos with
    neither are left out; every label still gets an entry (possibly empty).
    """
    hand_preference = ("left_hand", "right_hand")
    res = {}
    for label, video_id_to_holistic_seq in dict.items():
        per_video = res[label] = {}
        for video_id, holy_seq in video_id_to_holistic_seq.items():
            # Take the first hand, in preference order, that has any data.
            for hand in hand_preference:
                if len(holy_seq[hand]) > 0:
                    per_video[video_id] = holy_seq[hand]
                    break

    return res

def extract_training_data_and_labels_from_dynamic_gesture_csv() -> Tuple[list[np.ndarray], list[str]]:
    """Build feature vectors and labels from the landmark CSV files in ".".

    Uses the module-level ``reader`` to parse the CSV files and the
    module-level ``bob`` to build a trajectory per sequence. Each feature
    vector is the trajectory's float list followed by the output of
    ``pre_process_landmark(calc_landmark_list(...))`` for the same sequence.

    Returns:
        ``(feature_vectors, labels)`` as parallel lists.
    """
    holistic_by_label = reader.extract_holistic_landmarks_from_folder(".")
    hand_by_label = extract_left_hand_landmarks_from_holistic(holistic_by_label)
    pruned = prune_training_data_and_labels_from_dynamic_gesture_csv(hand_by_label)

    features_by_label = {}
    for label, sequences in pruned.items():
        for seq in sequences:
            trajectory = bob.make_trajectory(seq.reshape(-1, 21, 3))

            # Rebuild landmark objects from consecutive (x, y, z) triples of the flat array.
            points = []
            for offset in range(0, len(seq), 3):
                point = NormalizedLandmark()
                point.x = seq[offset]
                point.y = seq[offset+1]
                point.z = seq[offset+2]
                points.append(point)
            processed_landmarks = pre_process_landmark(calc_landmark_list(points))

            feature_values = trajectory.to_float_list()
            feature_values.extend(processed_landmarks)
            feature_vector = np.array(feature_values)
            features_by_label.setdefault(label, []).append(feature_vector)
    return extract_training_data_and_labels_from_dynamic_gesture_map(features_by_label)

# Build the full training set from the CSV files in the working directory.
extracted_data,extracted_labels = extract_training_data_and_labels_from_dynamic_gesture_csv()

# Sanity check: every feature vector should have the same shape as the first one.
shapes = [arr.shape for arr in extracted_data]
first = shapes[0]
f"Shape of first {first}. Do all data have this shape? '{'Yes' if all(first == s for s in shapes) else 'No'}'"

🔥🔥 TrajectoryBuilder is now running in BERTRAM_MODE 🔥🔥
🔥🔥 TrajectoryBuilder is now running in BERTRAM_MODE 🔥🔥


"Shape of first (1077,). Do all data have this shape? 'Yes'"

In [56]:
# Binary SVM on the single-hand Holistic features: is the sign "J" or not?
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline

# Boolean target vector: True where the sample's label is "J".
labels_J = (np.array(extracted_labels, dtype=np.str_) == "J")
# Standardise the features, then fit a degree-6 polynomial-kernel SVM.
model_J_or_not = make_pipeline(StandardScaler(),
                          SVC(kernel="poly", degree=6, coef0=1))
model_J_or_not.fit(extracted_data, labels_J)

# Disabled by default: enable to persist the fitted pipeline to disk.
if False:
    from joblib import dump
    # Dump the pipeline fitted above (the former name `model_svm` is not defined here).
    dump(model_J_or_not, 'dynamic_model.joblib')
model_J_or_not

In [57]:
# ABSOLUTE WORST TEST, please correct me :3
# The sample predicted here is extracted_data[0], which the model was also fitted on,
# so this only shows that predict() runs — it says nothing about generalisation.
# TODO: evaluate on data that was held out from fitting.
toPredict = [extracted_data[0]]
true_label = labels_J[0]
print(f"Is J?\n  Predicted: {model_J_or_not.predict(toPredict)[0]}\n  Should be: {true_label}")

Is J?
  Predicted: False
  Should be: False
