In [31]:
import pandas as pd
import numpy as np
import mediapipe as mp
import cv2
import os
from tqdm.notebook import tqdm
import pickle

from scipy.spatial.distance import euclidean
from collections import Counter

mp_drawing = mp.solutions.drawing_utils  # Used for drawing keypoints
import matplotlib.pyplot as plt

In [2]:
# детекция и преобразование кейпоинтов

def mediapipe_detection(image, model):
    """Run ``model.process`` on a frame and return the frame together with the results.

    The input frame is converted BGR -> RGB before the model call and the
    processed frame is converted RGB -> BGR again before being returned.

    Args:
        image: Frame in BGR channel order (as implied by the BGR2RGB conversion).
        model: Object exposing ``process(image)``.

    Returns:
        Tuple ``(bgr_frame, results)`` where ``results`` is whatever
        ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # The frame is flagged read-only only for the duration of the model call.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, results

def landmark_to_array(mp_landmark_list):
    """Turn a landmark container into an array of ``[x, y, z]`` rows.

    Args:
        mp_landmark_list: Object whose ``.landmark`` attribute is an iterable of
            items exposing ``x``, ``y`` and ``z``.

    Returns:
        The array produced by ``np.nan_to_num`` over the collected rows, with
        every NaN replaced by 0.
    """
    coords = [[point.x, point.y, point.z] for point in mp_landmark_list.landmark]
    return np.nan_to_num(coords, nan=0)

def extract_landmarks(results):
    """Flatten the pose and both hand landmark sets of one frame into plain lists.

    Args:
        results: Object exposing ``pose_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks``; a falsy attribute means "nothing detected".

    Returns:
        Tuple ``(pose, left_hand, right_hand)`` of Python lists with 99, 63 and
        63 values respectively. A missing landmark set is returned as all zeros.
    """
    def _flatten(landmarks, size):
        # Zero-filled placeholder keeps the per-frame vector length constant.
        if landmarks:
            return landmark_to_array(landmarks).reshape(size).tolist()
        return np.zeros(size).tolist()

    pose = _flatten(results.pose_landmarks, 99)
    left_hand = _flatten(results.left_hand_landmarks, 63)
    right_hand = _flatten(results.right_hand_landmarks, 63)
    return pose, left_hand, right_hand

def save_landmarks_from_video(video_path, start_frame, end_frame):
    """Extract per-frame landmarks from a frame range of a video file.

    Frames are counted from 0 as they are read; landmarks are collected for
    every frame whose index lies in ``[start_frame, end_frame]`` (both ends
    inclusive). Reading stops as soon as the range has been passed or the
    video runs out of frames.

    Relies on the module-level ``mp_holistic`` being defined before the call.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        start_frame: Index of the first frame to process.
        end_frame: Index of the last frame to process.

    Returns:
        Dict with keys ``"pose"``, ``"left_hand"`` and ``"right_hand"``; each
        value is a list with one flat landmark list per processed frame. The
        lists are empty if the video could not be opened or has no frames in
        the requested range.
    """
    landmark_list = {"pose": [], "left_hand": [], "right_hand": []}
    cap = cv2.VideoCapture(video_path)
    try:
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            current_frame = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if start_frame <= current_frame <= end_frame:
                    _, results = mediapipe_detection(frame, holistic)
                    pose, left_hand, right_hand = extract_landmarks(results)
                    landmark_list["pose"].append(pose)
                    landmark_list["left_hand"].append(left_hand)
                    landmark_list["right_hand"].append(right_hand)
                current_frame += 1
                # Nothing after end_frame is needed, so stop decoding early.
                if current_frame > end_frame:
                    break
    finally:
        # Release the capture even if model construction or processing raises.
        cap.release()
    return landmark_list

In [47]:
# Collect all keypoints and labels for the annotated clips
def merge_data(dataframe, video_folder_path):
    """Extract landmarks and labels for every annotated row.

    Args:
        dataframe: Table whose rows provide ``attachment_id``, ``begin``,
            ``end`` and ``text``.
        video_folder_path: Folder containing ``<attachment_id>.mp4`` files.

    Returns:
        Tuple ``(all_keypoints, all_labels)``: the landmark dict returned by
        ``save_landmarks_from_video`` for each row, and the matching ``text``
        values, in row order.
    """
    all_keypoints = []  # one landmark dict per video
    all_labels = []  # gesture label of each video

    for _, row in tqdm(dataframe.iterrows(), total=dataframe.shape[0]):
        clip_path = os.path.join(video_folder_path, row['attachment_id'] + '.mp4')

        # Landmarks for the annotated frame range of this clip
        clip_landmarks = save_landmarks_from_video(clip_path, row['begin'], row['end'])

        all_keypoints.append(clip_landmarks)
        all_labels.append(row['text'])  # or whichever column holds the labels

    return all_keypoints, all_labels

In [5]:
# Prepare data for DTW
def prepare_data_for_dtw(all_keypoints):
    """Merge pose and hand landmarks into one feature vector per frame.

    Args:
        all_keypoints: Iterable of dicts with ``'pose'``, ``'left_hand'`` and
            ``'right_hand'`` entries, each a per-frame list of lists.

    Returns:
        One sequence per input dict; every element of a sequence is the
        concatenation ``pose + left_hand + right_hand`` for that frame. The
        frame count is taken from ``'pose'``; the hand lists are indexed with
        the same positions and are expected to be at least as long.
    """
    return [
        [
            clip['pose'][idx] + clip['left_hand'][idx] + clip['right_hand'][idx]
            for idx in range(len(clip['pose']))
        ]
        for clip in all_keypoints
    ]

In [6]:
# Save data to a file using pickle
def save_data_with_pickle(data, labels, filepath):
    """Write ``(data, labels)`` to ``filepath`` as a single pickled tuple.

    Args:
        data: Object to store as the first tuple element.
        labels: Object to store as the second tuple element.
        filepath: Destination path; opened in binary write mode, so an
            existing file is overwritten.
    """
    payload = (data, labels)
    with open(filepath, 'wb') as out_file:
        pickle.dump(payload, out_file)

# Load data from a file
def load_data_with_pickle(filepath):
    """Read back a ``(data, labels)`` pair stored as a pickled two-item object.

    NOTE: unpickling can execute arbitrary code; only load files you created
    yourself or otherwise trust.

    Args:
        filepath: Path of the pickle file; opened in binary read mode.

    Returns:
        Tuple ``(data, labels)``.
    """
    with open(filepath, 'rb') as in_file:
        stored = pickle.load(in_file)
    data, labels = stored
    return data, labels


In [26]:
def dtw_distance(sequence1, sequence2):
    """Compute the dynamic time warping (DTW) distance between two sequences.

    The per-step cost is the Euclidean distance between one element of each
    sequence; the result is the accumulated cost of the cheapest alignment.

    Args:
        sequence1: Sequence of 1-D numeric vectors.
        sequence2: Sequence of 1-D numeric vectors of the same dimensionality.

    Returns:
        The accumulated cost as a float. Two empty sequences give 0; if exactly
        one of the sequences is empty the result is ``inf``.
    """
    n, m = len(sequence1), len(sequence2)

    # Accumulated-cost table with one extra border row/column. Everything is
    # unreachable (inf) except the origin.
    dtw_matrix = np.full((n + 1, m + 1), np.inf)
    dtw_matrix[0, 0] = 0

    for i in range(1, n + 1):
        for j in range(1, m + 1):
            cost = euclidean(sequence1[i - 1], sequence2[j - 1])
            # Cheapest way to arrive here: from above, from the left, or diagonally.
            best_previous = min(
                dtw_matrix[i - 1, j],
                dtw_matrix[i, j - 1],
                dtw_matrix[i - 1, j - 1],
            )
            dtw_matrix[i, j] = cost + best_previous

    return dtw_matrix[n, m]

In [27]:
class KNN_DTW_Classifier:
    """k-nearest-neighbours classifier that compares sequences with ``dtw_distance``."""

    def __init__(self, k=3):
        """Create an unfitted classifier that votes among ``k`` neighbours."""
        self.k = k
        self.train_data = []
        self.train_labels = []

    def fit(self, data, labels):
        """Store the training sequences and their labels (no computation happens here)."""
        self.train_data = data
        self.train_labels = labels

    def predict(self, test_sequence):
        """Return the majority label among the ``k`` closest training sequences.

        Raises:
            IndexError: If there is no training data to vote with.
        """
        # DTW distance from the query to every stored training sequence
        distances = []
        for train_sequence in self.train_data:
            distances.append(dtw_distance(test_sequence, train_sequence))

        # Positions of the k smallest distances, nearest first
        nearest = np.argsort(distances)[:self.k]

        # Tally the labels of those neighbours and take the most frequent one
        votes = Counter(self.train_labels[idx] for idx in nearest)
        return votes.most_common(1)[0][0]

In [7]:
# Initialize the MediaPipe helpers (mp_holistic is used by save_landmarks_from_video)
mp_drawing = mp.solutions.drawing_utils
mp_holistic = mp.solutions.holistic

In [15]:
# Load the annotation table (tab-separated)
annotations = pd.read_csv('../dataset/annotations.csv', sep='\t')

# animals
animals_list = ["собака", "лошадь", "курица", "медведь", "козел", "волк", "бык", "коза", "свинья", "овца"]

# Keep only the training rows whose label is one of the animal signs
animals_train_annotations = annotations[(annotations['train']) & (annotations['text'].isin(animals_list))]

train_video_folder_path = '../dataset/slovo/train/'

In [16]:
# Extract landmarks for the training clips. merge_data requires the video
# folder as its second argument, so pass the training folder explicitly.
train_keypoints, train_labels = merge_data(animals_train_annotations, train_video_folder_path)

  0%|          | 0/150 [00:00<?, ?it/s]

In [19]:
# Build one per-frame feature sequence per training clip
dtw_data_animals = prepare_data_for_dtw(train_keypoints)

In [21]:
# Save the training sequences together with their labels
file_to_save = 'animals_train_dtw_data.pkl'

save_data_with_pickle(dtw_data_animals, train_labels, file_to_save)

In [22]:
# Filter the annotations down to the non-training rows with an animal label
animals_test_annotations = annotations[(~annotations['train']) & (annotations['text'].isin(animals_list))]

# Path to the folder with the test videos
test_video_folder_path = '../dataset/slovo/test/'

In [48]:
# Extract landmarks and labels for the test clips
test_keypoints, test_labels = merge_data(animals_test_annotations, test_video_folder_path)

  0%|          | 0/50 [00:00<?, ?it/s]

In [49]:
# Build one per-frame feature sequence per test clip
animals_test_data_dtw = prepare_data_for_dtw(test_keypoints)

In [50]:
# Path of the file the test sequences and labels are saved to
# (reuses the name file_to_save from the training save cell)
file_to_save = 'animals_test_dtw_data.pkl'

save_data_with_pickle(animals_test_data_dtw, test_labels, file_to_save)

In [29]:
# Create the classifier instance (votes among the 5 nearest neighbours)
knn_dtw_classifier = KNN_DTW_Classifier(k=5)

# "Fit" the classifier: it stores the training sequences and labels
knn_dtw_classifier.fit(dtw_data_animals, train_labels)

In [51]:
# Predict a label for every test sequence, with a progress bar
predictions = []
for test_sequence in tqdm(animals_test_data_dtw, desc='Predicting'):
    predictions.append(knn_dtw_classifier.predict(test_sequence))


Predicting:   0%|          | 0/50 [00:00<?, ?it/s]

In [52]:
# Accuracy: share of test clips whose predicted label equals the true label
accuracy = sum(pred == true for pred, true in zip(predictions, test_labels)) / len(test_labels)
print(f'Accuracy: {accuracy:.2f}')

Accuracy: 0.30


# ClearML

In [56]:
import os
from typing import List, Dict
from clearml import Logger
from clearml import Task
import sklearn
import random as rd 
from sklearn.metrics import classification_report

In [58]:
# Shape of the dict-style classification report: outer key -> {metric name -> value}
SCKLEARN_CLASSIFICATION_REPORT_TYPE = Dict[str, Dict[str, float]]

def log_classififcation_report_to_clearml(
    clearml_logger: Logger,
    classification_report: SCKLEARN_CLASSIFICATION_REPORT_TYPE,
    class_names: List[str],
    iteration: int,
) -> None:
    """Send every value of a dict-style classification report to ClearML as scalars.

    Three groups are reported through ``clearml_logger.report_scalar``:

    * per-class metrics (``f1-score``, ``precision``, ``recall``, ``support``),
      one plot per metric titled ``"Per class <metric>"`` with one series per class;
    * every remaining report key other than ``"accuracy"``, one plot per key
      with one series per entry of its dict;
    * the ``"accuracy"`` value as its own plot.

    Args:
        clearml_logger: Logger used for reporting.
        classification_report: Report dict; must contain an entry for every
            name in ``class_names`` and an ``"accuracy"`` entry.
        class_names: Report keys to treat as classes.
        iteration: Iteration number attached to every reported scalar.

    Raises:
        KeyError: If a class name, a per-class metric or ``"accuracy"`` is
            missing from the report.
    """
    # Per-class metrics
    for metric_name in ["f1-score", "precision", "recall", "support"]:
        plot_title = "Per class " + metric_name
        for class_name in class_names:
            clearml_logger.report_scalar(
                title=plot_title,
                series=class_name,
                iteration=iteration,
                value=classification_report[class_name][metric_name],
            )

    # Aggregated metrics: whatever is neither a listed class nor the accuracy entry
    aggregate_keys: List[str] = list(
        set(classification_report) - set(class_names) - {"accuracy"}
    )
    for aggregate_key in aggregate_keys:
        for series_name, series_value in classification_report[aggregate_key].items():
            clearml_logger.report_scalar(
                title=aggregate_key,
                series=series_name,
                value=series_value,
                iteration=iteration,
            )

    # Overall accuracy
    clearml_logger.report_scalar(
        title="accuracy",
        series="accuracy",
        iteration=iteration,
        value=classification_report["accuracy"],
    )

In [None]:
# Set the ClearML endpoints and API credentials as environment variables.
# The two key values are blank in this cell: fill them in locally and do not
# commit real credentials with the notebook.
%env CLEARML_WEB_HOST=https://app.clear.ml
%env CLEARML_API_HOST=https://api.clear.ml
%env CLEARML_FILES_HOST=https://files.clear.ml
%env CLEARML_API_ACCESS_KEY=
%env CLEARML_API_SECRET_KEY=

In [59]:
# Start a ClearML task for this experiment and grab its logger for scalar reporting
task: Task = Task.init(project_name="All Experiments", task_name="mediapipe_animals_package")
logger = task.get_logger()

ClearML Task: created new task id=85bb794110c34e7095b14c503a97d1f2
ClearML results page: https://app.clear.ml/projects/9fb206e41199414a9a9144002e36c6b7/experiments/85bb794110c34e7095b14c503a97d1f2/output/log
ClearML Monitor: Could not detect iteration reporting, falling back to iterations as seconds-from-start


In [61]:
# Class names used for per-class reporting: the animal labels selected earlier
classes = animals_list

In [62]:
 # `report` is not defined anywhere earlier in the notebook, so build it here
 # from the test predictions. output_dict=True yields the nested dict that the
 # logging helper expects; zero_division=0 keeps undefined metrics at 0 without warnings.
 report = classification_report(test_labels, predictions, output_dict=True, zero_division=0)
 log_classififcation_report_to_clearml(
     clearml_logger=logger,
     classification_report=report,
     # Only labels that occur in the test labels or predictions get a per-class
     # entry in the report; asking for any other class would raise KeyError.
     class_names=[name for name in classes if name in report],
     iteration=0,
 )

In [63]:
# Close the ClearML task once all metrics have been reported
task.close()

2023-11-11 02:47:19,359 - clearml.Task - INFO - Waiting for repository detection and full package requirement analysis
2023-11-11 02:52:19,362 - clearml.Task - INFO - Repository and package analysis timed out (300.0 sec), giving up
