In [None]:
import json
import pandas as pd
import os
import re
import numpy as np
import shutil
from itertools import combinations

In [None]:
# Glosses for which one clip per annotated occurrence is extracted.
WORDS_TO_PROCESS = ['VIVIR', 'ALGUNO', 'DIABETES', 'GORDO', 'RESULTADO']
# Input files whose trailing "_<number>.json" id equals this value are skipped
# by extract_words_from_json.
SKIP_SIGNER = 4

# Extracted clips are written to NOVELTY_BASE/SUB_FOLDER.
NOVELTY_BASE = '/home/gerardo/NoveltyDetection'
SUB_FOLDER = '-1'
NOVELTY_DIR = os.path.join(NOVELTY_BASE, SUB_FOLDER)

# Create the clip folder up front; exist_ok makes re-running this cell harmless.
os.makedirs(NOVELTY_DIR, exist_ok=True)

# sheet_name=None loads every sheet of the workbook into a dict keyed by sheet name.
dfs = pd.read_excel(
    '/home/gerardo/LSE_HEALTH/LSE-Health-UVigo.xlsx',
    sheet_name=None
)
# Annotation table; this notebook reads its 'File', 'Gloss', 'Start(ms)' and
# 'End(ms)' columns.
GlossesContent = dfs['GlossesContent']

def extract_words_from_json(json_file_path, fps=25):
    """Write one JSON clip per annotated occurrence of each requested gloss.

    The file name must end in ``_<signer>.json``; the part before that suffix
    is the video name matched against the ``File`` column of ``GlossesContent``.
    For every row whose ``Gloss`` is in ``WORDS_TO_PROCESS``, the frames whose
    ``frame`` index lies inside the annotated interval (inclusive) are written
    to ``NOVELTY_DIR`` as ``<gloss>_<video>_<signer>_<start>_<end>.json``.

    Parameters
    ----------
    json_file_path : str
        Path to a JSON file holding a list of per-frame dicts, each with a
        ``frame`` key.
    fps : int or float, optional
        Frame rate used to convert the ``Start(ms)`` / ``End(ms)`` annotation
        values into frame indices. Defaults to 25, the value that was
        previously hard-coded.

    Returns
    -------
    None
        Files are written as a side effect. Nothing is written when the file
        name has no signer suffix, when the signer equals ``SKIP_SIGNER``, or
        when the video has no annotation for any requested gloss.
    """
    basename = os.path.basename(json_file_path)
    match = re.search(r'_(\d+)(?=\.json)', basename)
    if not match:
        return

    signer = int(match.group(1))
    if signer == SKIP_SIGNER:
        return

    video = re.sub(r'_[0-9]+\.json$', '', basename)

    # Filter the annotation table by video once instead of once per word, and
    # bail out before parsing the JSON when nothing has to be extracted.
    video_rows = GlossesContent.loc[GlossesContent['File'] == video]
    if not video_rows['Gloss'].isin(WORDS_TO_PROCESS).any():
        return

    # Explicit encoding so decoding does not depend on the platform default
    # (json_to_csv reads these files back as UTF-8).
    with open(json_file_path, 'r', encoding='utf-8') as jf:
        data = json.load(jf)

    for word in WORDS_TO_PROCESS:
        elan_filtered = video_rows.loc[video_rows['Gloss'] == word]
        if elan_filtered.empty:
            continue

        for _, row in elan_filtered.iterrows():
            start_ms = row['Start(ms)']
            end_ms = row['End(ms)']
            # int() truncates, so both bounds round toward zero.
            start_frame = int(start_ms / 1000 * fps)
            end_frame = int(end_ms / 1000 * fps)

            filtered_frames = [f for f in data if start_frame <= f['frame'] <= end_frame]

            word_data = {
                'signer': signer,
                'video': video,
                'gloss': word,
                'start': start_ms,
                'end': end_ms,
                'frames': filtered_frames
            }

            out_name = f"{word}_{video}_{signer}_{start_ms}_{end_ms}.json"
            out_path = os.path.join(NOVELTY_DIR, out_name)
            with open(out_path, 'w', encoding='utf-8') as out_f:
                json.dump(word_data, out_f)

# Run the clip extraction over every JSON file found directly in base_path.
base_path = '/home/gerardo/LSE_DATABASE/LSE_HEALTH'
for fname in os.listdir(base_path):
    # Extension check is case-insensitive; anything else is ignored.
    if not fname.lower().endswith('.json'):
        continue
    extract_words_from_json(os.path.join(base_path, fname))

In [5]:
def euclidean_distance(p1, p2):
    """Return the Euclidean distance between two points, or -1 if one is missing.

    A point counts as missing when its first two coordinates are both -1.
    Coordinates are paired with ``zip``, so any extra trailing coordinates of
    the longer point are ignored.
    """
    for point in (p1, p2):
        if point[0] == -1 and point[1] == -1:
            return -1

    squared_total = 0
    for a, b in zip(p1, p2):
        squared_total += (a - b) ** 2
    return np.sqrt(squared_total)

def angle_between_joints(a, b, c):
    """Return the angle in degrees at vertex ``b`` between the rays b->a and b->c.

    A small epsilon (1e-6) is added to the product of the two norms so that a
    zero-length ray does not cause a division by zero; the cosine is clipped
    to [-1, 1] before ``arccos``.
    """
    vertex = np.array(b)
    ray_to_a = np.array(a) - vertex
    ray_to_c = np.array(c) - vertex

    norm_product = np.linalg.norm(ray_to_a) * np.linalg.norm(ray_to_c) + 1e-6
    cosine = np.dot(ray_to_a, ray_to_c) / norm_product
    cosine = np.clip(cosine, -1.0, 1.0)
    return np.degrees(np.arccos(cosine))

In [6]:
def is_hand_missing(landmarks):
    """Return True when every landmark has x, y and z all equal to -1.

    An empty landmark sequence also yields True.
    """
    return all(
        lm['x'] == -1 and lm['y'] == -1 and lm['z'] == -1
        for lm in landmarks
    )

def extract_pose_landmarks(pose_landmarks, row, prefix='pose'):
    """Copy the x/y/z of the retained pose landmarks into ``row`` and return it.

    Landmarks whose label is in the ignore list below are dropped. Every other
    landmark is stored under the keys ``<prefix>.<label>_x``, ``_y`` and
    ``_z``. ``row`` is modified in place and also returned.
    """
    ignored_labels = (
        # left limb / hand labels
        'LEFT_ANKLE', 'LEFT_FOOT_INDEX', 'LEFT_HEEL', 'LEFT_INDEX',
        'LEFT_KNEE', 'LEFT_PINKY', 'LEFT_THUMB', 'LEFT_WRIST',
        # right limb / hand labels
        'RIGHT_ANKLE', 'RIGHT_FOOT_INDEX', 'RIGHT_HEEL', 'RIGHT_INDEX',
        'RIGHT_KNEE', 'RIGHT_PINKY', 'RIGHT_THUMB', 'RIGHT_WRIST',
        # face labels
        'LEFT_EYE_INNER', 'RIGHT_EYE_INNER', 'LEFT_EYE_OUTER', 'RIGHT_EYE_OUTER',
        'RIGHT_EYE', 'LEFT_EYE', 'LEFT_EAR', 'RIGHT_EAR',
        'MOUTH_LEFT', 'MOUTH_RIGHT',
    )

    for landmark in pose_landmarks:
        name = landmark['label']
        if name in ignored_labels:
            continue
        key_base = f'{prefix}.{name}'
        row[f'{key_base}_x'] = landmark['x']
        row[f'{key_base}_y'] = landmark['y']
        row[f'{key_base}_z'] = landmark['z']
    return row

def extract_hand_landmarks(hand_data, row, hand_name='Hand_0'):
    """Copy the x/y/z of the retained hand landmarks into ``row`` and return it.

    Parameters
    ----------
    hand_data : dict
        Must contain a ``'landmarks'`` list of dicts with ``label``, ``x``,
        ``y`` and ``z`` keys.
    row : dict
        Output mapping; modified in place.
    hand_name : str, optional
        Prefix for the generated keys (``<hand_name>.<label>_x`` etc.).

    Returns
    -------
    dict
        The same ``row`` object. Landmarks listed in ``excluded_labels`` are
        skipped. When the whole hand is missing (see ``is_hand_missing``)
        every retained coordinate is written as -1.
    """
    excluded_labels = {
        "INDEX_FINGER_DIP", "INDEX_FINGER_PIP", "MIDDLE_FINGER_DIP", "MIDDLE_FINGER_PIP",
        "PINKY_DIP", "PINKY_PIP", "RING_FINGER_DIP", "RING_FINGER_PIP",
        "THUMB_CMC", "THUMB_IP"
    }

    landmarks = hand_data['landmarks']
    # The missing-hand check scans every landmark and does not depend on the
    # loop variable, so evaluate it once instead of once per landmark.
    hand_missing = is_hand_missing(landmarks)

    for lm in landmarks:
        label = lm['label']
        if label in excluded_labels:
            continue
        for axis in ('x', 'y', 'z'):
            row[f'{hand_name}.{label}_{axis}'] = -1 if hand_missing else lm[axis]
    return row

In [7]:
def json_to_csv(json_file):
    """Flatten a landmark JSON file into a per-frame CSV written next to it.

    The JSON may be either a list of frame dicts or a dict with a ``'frames'``
    list. Each frame contributes one row built by ``extract_pose_landmarks``
    and, for every entry of ``hand_landmarks``, ``extract_hand_landmarks``
    (keyed by that entry's ``'hand'`` value). Rows are sorted by ``frame``,
    NaN gaps are linearly interpolated in both directions, and anything still
    missing is filled with -1. Columns are ``frame`` followed by the remaining
    columns in sorted order.

    Parameters
    ----------
    json_file : str
        Path of the JSON file to convert.

    Returns
    -------
    None
        The CSV is written to the same path with the final extension replaced
        by ``.csv``. A message is printed and nothing is written when the JSON
        layout is not recognised or holds no frames.
    """
    with open(json_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    if isinstance(data, list):
        frames = data
    elif isinstance(data, dict) and 'frames' in data:
        frames = data['frames']
    else:
        print("Formato JSON no reconocido.")
        return

    if not frames:
        print("No se encontraron fotogramas en el JSON.")
        return

    rows = []
    for frame_info in frames:
        row = {'frame': frame_info.get('frame', -1)}
        row = extract_pose_landmarks(frame_info.get('pose_landmarks', []), row)
        for hand in frame_info.get('hand_landmarks', []):
            row = extract_hand_landmarks(hand, row, hand_name=hand['hand'])
        rows.append(row)

    # Chained, non-mutating form of sort -> interpolate -> fill.
    # Only NaN cells are interpolated; -1 values already present in the rows
    # are ordinary numbers to pandas and are left untouched.
    df = (
        pd.DataFrame(rows)
        .sort_values(by='frame')
        .interpolate(method='linear', limit_direction='both')
        .fillna(-1)
    )

    fieldnames = ['frame'] + sorted(col for col in df.columns if col != 'frame')
    df = df[fieldnames]

    # Swap only the final extension. str.replace('.json', '.csv') would also
    # rewrite '.json' inside directory names and would leave the path equal to
    # the input (overwriting it) when the suffix is absent.
    output_csv = os.path.splitext(json_file)[0] + '.csv'
    df.to_csv(output_csv, index=False)
    print(f"CSV generado con interpolación en: {output_csv}")

In [8]:
def pose_distances(dict_coords):
    """Build the dictionary of distance features for one frame.

    ``dict_coords`` maps landmark names (``pose.NOSE``, ``Hand_0.WRIST``,
    ``Hand_1.THUMB_TIP`` ...) to coordinate tuples. Every value is computed
    with ``euclidean_distance``. The returned dict keeps a fixed insertion
    order: body/wrist pairs first, then tip-to-MCP per finger (Hand_0 before
    Hand_1), then tip-to-tip for every finger pair (Hand_0 before Hand_1).
    """
    def between(key_a, key_b):
        return euclidean_distance(dict_coords[key_a], dict_coords[key_b])

    body_pairs = (
        ('Nose-Left_Shoulder', 'pose.NOSE', 'pose.LEFT_SHOULDER'),
        ('Nose-Right_Shoulder', 'pose.NOSE', 'pose.RIGHT_SHOULDER'),
        ('Nose-Left_WRIST', 'pose.NOSE', 'Hand_0.WRIST'),
        ('Nose-Right_WRIST', 'pose.NOSE', 'Hand_1.WRIST'),
        ('Left_Shoulder-Left_WRIST', 'pose.LEFT_SHOULDER', 'Hand_0.WRIST'),
        ('Right_Shoulder-Right_WRIST', 'pose.RIGHT_SHOULDER', 'Hand_1.WRIST'),
        ('Left_WRIST-Right_WRIST', 'Hand_0.WRIST', 'Hand_1.WRIST'),
    )
    features = {}
    for feature_name, key_a, key_b in body_pairs:
        features[feature_name] = between(key_a, key_b)

    hands = ('Hand_0', 'Hand_1')
    finger_names = ['THUMB', 'INDEX_FINGER', 'MIDDLE_FINGER', 'RING_FINGER', 'PINKY']

    # Finger extension: tip to MCP joint of the same finger.
    for finger in finger_names:
        for hand in hands:
            features[f'{hand}.{finger}_Tip-Mcp'] = between(
                f'{hand}.{finger}_TIP', f'{hand}.{finger}_MCP'
            )

    # Finger spread: tip to tip for every unordered pair of fingers.
    for first, second in combinations(finger_names, 2):
        for hand in hands:
            features[f'{hand}.{first}_Tip-{hand}.{second}_Tip'] = between(
                f'{hand}.{first}_TIP', f'{hand}.{second}_TIP'
            )

    return features


In [9]:
def process_directory(input_dir, output_dir, clean_output_dir=True):
    """Compute scale-normalised distance features for every landmark CSV.

    For each immediate subfolder of ``input_dir`` and each ``*.csv`` inside it
    (files already ending in ``_features.csv`` are skipped), every row is
    turned into a name -> (x, y, z) mapping, passed to ``pose_distances``,
    and each resulting distance is divided by a per-frame body scale: the mean
    of the left and right hip-to-shoulder distances multiplied by 0.265.
    The result is written to ``output_dir/<subfolder>/<name>_features.csv``.

    Missing data: ``euclidean_distance`` returns -1 for a missing landmark.
    That sentinel is written through unchanged instead of being divided by
    the scale. If the scale itself cannot be computed (a hip or shoulder is
    missing, or the scale is not positive), every feature of that frame is
    set to -1.

    Parameters
    ----------
    input_dir : str
        Directory whose subfolders hold the landmark CSVs. The first column
        of each CSV is ``frame``; the remaining columns are expected in
        consecutive ``<name>_x``, ``<name>_y``, ``<name>_z`` triplets.
    output_dir : str
        Directory receiving the feature CSVs; created if needed.
    clean_output_dir : bool, optional
        When True (default) an existing ``output_dir`` is deleted first.

    Returns
    -------
    None
    """
    # NOTE(review): the rationale for the 0.265 factor is not documented in
    # this notebook; it is kept unchanged.
    torso_fraction = 0.265

    if clean_output_dir and os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir, exist_ok=True)

    print(f"Directory {output_dir} created")

    for subfolder in os.listdir(input_dir):
        subfolder_path = os.path.join(input_dir, subfolder)
        if not os.path.isdir(subfolder_path):
            continue

        output_subfolder_path = os.path.join(output_dir, subfolder)
        os.makedirs(output_subfolder_path, exist_ok=True)

        for file in os.listdir(subfolder_path):
            if not file.endswith('.csv') or file.endswith('_features.csv'):
                continue

            file_path = os.path.join(subfolder_path, file)
            df = pd.read_csv(file_path)
            output_rows = []
            coord_columns = df.columns[1:]  # everything after 'frame'

            for _, row in df.iterrows():
                dict_coords = {'frame': row['frame']}
                for i in range(0, len(coord_columns), 3):
                    # Strip only the trailing '_x' of the triplet's first column.
                    name = coord_columns[i].rsplit('_x', 1)[0]
                    dict_coords[name] = (
                        row[coord_columns[i]],
                        row[coord_columns[i + 1]],
                        row[coord_columns[i + 2]],
                    )

                d1 = euclidean_distance(dict_coords['pose.LEFT_HIP'], dict_coords['pose.LEFT_SHOULDER'])
                d2 = euclidean_distance(dict_coords['pose.RIGHT_HIP'], dict_coords['pose.RIGHT_SHOULDER'])

                # A -1 torso distance means a hip or shoulder is missing;
                # averaging it in would give a meaningless scale.
                height_percentage = None
                if d1 != -1 and d2 != -1:
                    candidate = ((d1 + d2) / 2) * torso_fraction
                    # `candidate > 0` is False for zero and NaN, so both are
                    # rejected and a division by zero is avoided.
                    if candidate > 0:
                        height_percentage = candidate

                distances = pose_distances(dict_coords)

                for k, value in distances.items():
                    if height_percentage is None or value == -1:
                        # Keep the sentinel intact rather than rescaling it.
                        distances[k] = -1
                    else:
                        distances[k] = value / height_percentage

                frame_data = {'frame': row['frame'], **distances}
                output_rows.append(frame_data)

            df_out = pd.DataFrame(output_rows)
            # `file` is known to end with '.csv'; replace just that suffix.
            output_name = file[:-len('.csv')] + '_features.csv'
            output_file = os.path.join(output_subfolder_path, output_name)
            df_out.to_csv(output_file, index=False)
            print(f"{output_file}")

In [10]:
def process_json_directory(path):
    """Run ``json_to_csv`` on every ``.json`` file one level below ``path``.

    Only immediate subdirectories of ``path`` are visited; files directly in
    ``path`` and anything nested deeper are ignored.
    """
    for entry in os.listdir(path):
        folder = os.path.join(path, entry)
        if os.path.isdir(folder):
            for candidate in os.listdir(folder):
                if candidate.endswith(".json"):
                    json_to_csv(os.path.join(folder, candidate))

In [None]:
# Convert every clip JSON found in the subfolders of the extraction directory
# into a landmark CSV written next to it.
process_json_directory('/home/gerardo/NoveltyDetection')

In [None]:
# Compute per-frame distance features for each landmark CSV. With the default
# clean_output_dir=True an existing output directory is deleted and recreated.
process_directory('/home/gerardo/NoveltyDetection', '/home/gerardo/NoveltyDetectionProcessed')