In [7]:
from huggingface_hub import snapshot_download
import zipfile
import librosa
import numpy as np
import scipy.io as sio
from scipy import signal
from pathlib import Path
import matplotlib.pyplot as plt
import cv2
import os
import base64
import getpass
import subprocess
import json  # Added json import
from tqdm import tqdm
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

In [8]:
class GithubDownload:
    @staticmethod
    def download(output_path: str="Resource/data"):
        os.makedirs(output_path, exist_ok=True)
        command = [
            'curl',
            '-L',
            '-o',
            f'{output_path}/corpus.json',
            'https://raw.githubusercontent.com/MML-Group/code4AVE-Speech/master/corpus.json'
        ]
        return subprocess.run(command)

class HuggingFaceDownload:
    def __init__(self, password: str | None = None, enc_path: str = "Resource/oJtYpLhVfD.enc"):
        # If no encryption file, we can skip or ask for token directly
        if os.path.exists(enc_path):
            if password is None:
                password = getpass.getpass("password: ")
            self.token = self.decrypt_file(file_path=enc_path, password=password)
            print(self.token)
        else:
            # Fallback if you don't have the .enc file handy
            print(f"Warning: {enc_path} not found. Asking for token manually.")
            self.token = getpass.getpass("Enter Hugging Face Token (hf_...): ")
            print(self.token)

        self.repo = "MML-Group/AVE-Speech"

    def derive_key(self, password: str, salt: bytes) -> bytes:
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        return base64.urlsafe_b64encode(kdf.derive(password.encode()))

    def decrypt_file(self, file_path: str, password: str):
        with open(file_path, 'rb') as file:
            data = file.read()

        salt = data[:16]
        encrypted_data = data[16:]
        key = self.derive_key(password, salt)
        f = Fernet(key)
        decrypted_data = f.decrypt(encrypted_data)
        return decrypted_data.decode('utf-8')

    def download_specific_subjects(
        self,
        subjects: list[int],
        local_dir: str | Path = ".",
    ):
        """
        Downloads only the zip files for the specified list of subjects.
        Uses a wildcard pattern to find them regardless of folder structure.
        """
        allow_patterns = []
        for subject in subjects:
            # We use ** to find 'subject_X.zip' anywhere in the repo
            # We assume the naming convention is 'subject_1.zip', 'subject_2.zip', etc.
            allow_patterns.append(f"**/subject_{subject}.zip")

        print(f"Downloading {len(subjects)} subjects from {self.repo}...")

        return snapshot_download(
            repo_id=self.repo,
            repo_type="dataset",
            local_dir=str(local_dir),
            allow_patterns=allow_patterns,
            token=self.token,
        )

class EMGPreprocessing:
    def __init__(self):
        self.fs = 1000

    def filter(self, raw_data):
        b1, a1 = signal.iirnotch(50, 30, self.fs)
        b2, a2 = signal.iirnotch(150, 30, self.fs)
        b3, a3 = signal.iirnotch(250, 30, self.fs)
        b4, a4 = signal.iirnotch(350, 30, self.fs)
        b5, a5 = signal.butter(4, [10 / (self.fs / 2), 400 / (self.fs / 2)], "bandpass")

        x = signal.filtfilt(b1, a1, raw_data, axis=1)
        x = signal.filtfilt(b2, a2, x, axis=1)
        x = signal.filtfilt(b3, a3, x, axis=1)
        x = signal.filtfilt(b4, a4, x, axis=1)
        x = signal.filtfilt(b5, a5, x, axis=1)
        return x

    def EMG_MFSC(self, x):
        x = x[:, 250:, :]
        n_mels = 36
        sr = 1000
        channel_list = []
        for j in range(x.shape[-1]):
            mfsc_x = np.zeros((x.shape[0], 36, n_mels))
            for i in range(x.shape[0]):
                norm_x = np.asfortranarray(x[i, :, j])
                tmp = librosa.feature.melspectrogram(
                    y=norm_x, sr=sr, n_mels=n_mels, n_fft=200, hop_length=50
                )
                tmp = librosa.power_to_db(tmp).T
                mfsc_x[i, :, :] = tmp

            mfsc_x = np.expand_dims(mfsc_x, axis=-1)
            channel_list.append(mfsc_x)
        data_x = np.concatenate(channel_list, axis=-1)
        mu = np.mean(data_x)
        std = np.std(data_x)
        data_x = (data_x - mu) / std
        data_x = data_x.transpose(0, 3, 1, 2)
        return data_x

    def load_and_preprocess_emg(self, mat_path: str):
        # Reduced verbosity for bulk processing
        emg = sio.loadmat(mat_path)
        emg = np.expand_dims(emg["data"], axis=0)
        emg = self.filter(emg)
        emg = self.EMG_MFSC(emg)
        return emg

class DatasetProcessor:
    def __init__(self, base_dir: str | Path):
        self.base_dir = Path(base_dir)

    def unzip_files(self):
        # Look for zips in the downloaded structure
        zip_files = list(self.base_dir.rglob("*.zip"))
        print(f"\nFound {len(zip_files)} zip files. Starting unzip process...")

        for zip_path in tqdm(zip_files, desc="Unzipping", unit="file"):
            # Extract to a folder named after the zip file (without .zip)
            # If zip is inside subfolders, keep that structure
            extract_to = zip_path.parent / zip_path.stem

            if extract_to.exists():
                continue

            try:
                with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                    zip_ref.extractall(extract_to)
            except zipfile.BadZipFile:
                tqdm.write(f"  Damaged: {zip_path.name}")

    def mat_static_directory(self):
        mat_files = list(self.base_dir.rglob("*.mat"))
        if not mat_files:
            return
        for path in mat_files:
            yield path

    def avi_static_directory(self):
        avi_files = list(self.base_dir.rglob("*.avi"))
        if not avi_files:
            return
        for path in avi_files:
            yield path


In [9]:
def list_subject_files(base_dir: Path, video_frame_count: int | None = 60) -> None:
    def save_stacked_channels_png(emg_tensor: np.ndarray, out_png: Path):
        specs = [emg_tensor[0, ch] for ch in range(emg_tensor.shape[1])]  # list of (T,F)
        stacked = np.concatenate(specs, axis=0)  # (C*T, F)

        plt.figure()
        plt.imshow(stacked, aspect="auto", origin="lower")
        plt.axis("off")
        out_png.parent.mkdir(parents=True, exist_ok=True)
        plt.savefig(out_png, bbox_inches="tight", pad_inches=0, dpi=200)
        plt.close()

    def center_crop(img: np.ndarray, crop_size: tuple[int, int]) -> np.ndarray:
        crop_w, crop_h = crop_size
        h, w = img.shape[:2]
        left = max((w - crop_w) // 2, 0)
        top = max((h - crop_h) // 2, 0)
        return img[top : top + crop_h, left : left + crop_w]

    def convert_avi_to_mp4(
        avi_path: Path,
        out_path: Path,
        crop_size=(320, 240),
        resize=(88, 88),
    ) -> None:
        cap = cv2.VideoCapture(str(avi_path))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            fps = 30

        writer = None
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                if crop_size:
                    frame = center_crop(frame, crop_size)
                if resize:
                    frame = cv2.resize(frame, resize)
                frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)

                if writer is None:
                    out_path.parent.mkdir(parents=True, exist_ok=True)
                    h, w = frame.shape[:2]
                    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
                    writer = cv2.VideoWriter(str(out_path), fourcc, fps, (w, h))

                writer.write(frame)
        finally:
            cap.release()
            if writer is not None:
                writer.release()

    out_root = base_dir.parent / f"{base_dir.name}"
    emgpreprocessing = EMGPreprocessing()

    processor = DatasetProcessor(base_dir=base_dir)
    processor.unzip_files()

    # Process EMG files
    mat_files = list(processor.mat_static_directory())
    for directory in tqdm(mat_files, desc="Converting EMG to Spectrogram", unit="file"):
        relative_path = directory.relative_to(base_dir)
        relative_path = [
            "EMG_IMG" if part == "EMG" else part
            for part in relative_path.parts
        ]
        relative_path = Path(*relative_path)
        target_png_path = out_root / relative_path.with_suffix(".png")

        # Skip if already converted
        if target_png_path.exists():
            continue

        save_stacked_channels_png(emg_tensor=emgpreprocessing.load_and_preprocess_emg(mat_path=directory), out_png=target_png_path)

    # Process AVI files
    avi_files = list(processor.avi_static_directory())
    for directory in tqdm(avi_files, desc="Converting AVI to MP4", unit="file"):
        relative_path = directory.relative_to(base_dir)
        relative_path = [
            "Visual_MP4" if part == "Visual" else part
            for part in relative_path.parts
        ]
        relative_path = Path(*relative_path)
        target_mp4 = out_root / relative_path.with_suffix(".mp4")

        # Skip if already converted
        if target_mp4.exists():
            continue

        convert_avi_to_mp4(directory, target_mp4)

In [10]:
def get_test_subjects(json_path: str) -> list[int]:
    """Parses the corpus.json to find subjects allocated to the test set."""
    with open(json_path, 'r') as f:
        data = json.load(f)

    # Check if 'test' key exists, otherwise try to infer or ask user
    # Commonly structure is { 'train': [...], 'test': [...] }
    # Or sometimes it's a list of dicts. We need to adapt based on actual JSON.
    # Assuming standard split format:
    if 'test' in data:
        # If the data is a list of filenames like "s1_u1.mat", we extract subject ID
        # If it's just subject IDs, we use them directly.
        # Let's assume generic parsing logic:
        test_entries = data['test']
        subjects = set()

        for entry in test_entries:
            # If entry is string like "1/1.mat" or "s1/..."
            if isinstance(entry, str):
                # Try to extract number from "subject_X" or "X/..."
                # Heuristic: split by / or _ and find the number
                parts = entry.replace('\\', '/').split('/')
                # Usually first part is subject info
                import re
                nums = re.findall(r'\d+', parts[0])
                if nums:
                    subjects.add(int(nums[0]))
            elif isinstance(entry, int):
                subjects.add(entry)

        return sorted(list(subjects))
    else:
        print("Warning: 'test' key not found in corpus.json. Downloading Subject 80-100 as fallback.")
        return list(range(80, 101))

In [11]:
# 1. Download Corpus JSON to know which files are "Test"
print("Step 1: Downloading corpus.json...")
GithubDownload.download(output_path="Resource/data")

Step 1: Downloading corpus.json...


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  2657  100  2657    0     0   9406      0 --:--:-- --:--:-- --:--:--  9421


CompletedProcess(args=['curl', '-L', '-o', 'Resource/data/corpus.json', 'https://raw.githubusercontent.com/MML-Group/code4AVE-Speech/master/corpus.json'], returncode=0)

In [12]:

# 2. Parse Test Subjects
# Note: If parsing fails, it defaults to 80-100. Check your corpus.json structure if this is wrong.
# test_subjects = get_test_subjects("Resource/data/corpus.json")
# print(f"Targeting Test Subjects: {test_subjects}")

In [13]:
# 3. Download ONLY Test Subjects

Train = list(range(1, 70))
Valid = list(range(71, 80))
Test = list(range(81, 100))

print("Step 2: Downloading Test Data from Hugging Face...")
huggingfacedownload = HuggingFaceDownload()

# REPLACED: download_full_dataset with specific subject download
huggingfacedownload.download_specific_subjects(
    subjects=Test,
    local_dir="Resource/data"
)

Step 2: Downloading Test Data from Hugging Face...
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
Downloading 19 subjects from MML-Group/AVE-Speech...


Fetching 57 files: 100%|██████████| 57/57 [00:00<00:00, 3289.29it/s]


'/Volumes/Dataset Storage/Test/Resource/data'

In [None]:
# 4. Process the downloaded data
print("Step 3: Processing Data...")
list_subject_files(Path("Resource/data"), video_frame_count=15)

In [1]:
from __future__ import annotations

import json
import argparse
from pathlib import Path

def build_train_json(
    base_dir: Path,
    output_json: Path,
    corpus_json: Path | None = None,
    prompt: str = "<image>\n<video>\nTranscribe the spoken sentence.",
    preprocess: bool = False,
    video_frame_count: int | None = 60,
) -> None:
    print(f"[json] base_dir={base_dir}")

    if corpus_json is None:
        corpus_json = base_dir / "corpus.json"
    if not corpus_json.exists():
        raise FileNotFoundError(f"corpus.json not found: {corpus_json}")
    print(f"[json] corpus_json={corpus_json}")

    with corpus_json.open("r", encoding="utf-8") as f:
        corpus = json.load(f)

    emg_root = base_dir / "EMG_IMG"
    if not emg_root.exists():
        raise FileNotFoundError(f"EMG_IMG not found under: {base_dir}")
    print(f"[json] emg_root={emg_root}")

    video_map: dict[tuple[str, str, str], Path] = {}
    for vid_path in base_dir.rglob("*.mp4"):
        rel = vid_path.relative_to(base_dir)
        subject = next((p for p in rel.parts if p.startswith("subject_")), None)
        session = next((p for p in rel.parts if p.startswith("session")), None)
        if subject and session:
            video_map[(subject, session, vid_path.stem)] = vid_path
    print(f"[json] mp4_found={len(video_map)}")

    items = []
    missing_sentence = 0
    missing_video = 0
    scanned = 0
    for spec_path in emg_root.rglob("*.png"):
        rel = spec_path.relative_to(emg_root)
        if len(rel.parts) < 3:
            continue
        scanned += 1
        subject, session, filename = rel.parts[0], rel.parts[1], rel.parts[2]
        label = Path(filename).stem

        sentence = corpus.get(label)
        if sentence is None and label.isdigit():
            sentence = corpus.get(str(int(label)))
        if sentence is None:
            missing_sentence += 1
            continue

        vid_path = video_map.get((subject, session, label))
        if vid_path is None:
            missing_video += 1
            continue

        items.append(
            {
                "id": f"{subject}_{session}_{label}",
                "image": [str(spec_path.resolve())],
                "video": [str(vid_path.resolve())],
                "conversations": [
                    {"from": "human", "value": prompt},
                    {"from": "gpt", "value": sentence},
                ],
            }
        )

    output_json.parent.mkdir(parents=True, exist_ok=True)
    with output_json.open("w", encoding="utf-8") as f:
        json.dump(items, f, ensure_ascii=False, indent=2)
    print(
        "[json] scanned_emg={} missing_sentence={} missing_video={} saved={}".format(
            scanned, missing_sentence, missing_video, len(items)
        )
    )

def create_train_json(
    base_dir: Path = Path("Resource/data"),
    output_json: Path | None = None,
    output_dir: Path | None = None,
    corpus_json: Path | None = None,
    prompt: str = "<image>\\n<video>\\n말한 문장을 출력해줘.",
    preprocess: bool = False,
    video_frame_count: int | None = 60,
) -> Path | list[Path]:
    split_names = ("Train", "Val", "Test")
    split_dirs = {name: base_dir / name for name in split_names}

    if any(path.is_dir() for path in split_dirs.values()):
        if preprocess:
            for split_dir in split_dirs.values():
                if split_dir.is_dir():
                    list_subject_files(split_dir, video_frame_count=video_frame_count)

        if output_dir is None:
            output_dir = base_dir
        output_dir.mkdir(parents=True, exist_ok=True)

        corpus_path = corpus_json or (base_dir / "corpus.json")
        if not corpus_path.exists():
            raise FileNotFoundError(f"corpus.json not found: {corpus_path}")
        outputs: list[Path] = []
        for name, split_dir in split_dirs.items():
            if not split_dir.is_dir():
                print(f"[json] skip {name}: not found -> {split_dir}")
                continue
            if not (split_dir / "EMG_IMG").exists():
                print(f"[json] skip {name}: EMG_IMG missing -> {split_dir}")
                continue
            out_path = output_dir / f"{name.lower()}.json"
            build_train_json(
                base_dir=split_dir,
                output_json=out_path,
                corpus_json=corpus_path,
                prompt=prompt,
                preprocess=False,
                video_frame_count=video_frame_count,
            )
            outputs.append(out_path)
        return outputs

    if output_json is None:
        output_json = base_dir / "train.json"
    build_train_json(
        base_dir=base_dir,
        output_json=output_json,
        corpus_json=corpus_json,
        prompt=prompt,
        preprocess=preprocess,
        video_frame_count=video_frame_count,
    )
    return output_json

def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Build train.json for EMG spectrogram + video dataset."
    )
    parser.add_argument("--base-dir", type=Path, default=Path("Resource/data"))
    parser.add_argument("--output-json", type=Path, default=None)
    parser.add_argument("--output-dir", type=Path, default=None)
    parser.add_argument("--corpus-json", type=Path, default=None)
    parser.add_argument(
        "--prompt",
        type=str,
        default="<image>\\n<video>\\nTranscribe the spoken sentence.",
    )
    parser.add_argument("--preprocess", action="store_true")
    parser.add_argument("--video-frame-count", type=int, default=60)
    args, unknown = parser.parse_known_args()
    return args


def main() -> None:
    args = parse_args()
    output_paths = create_train_json(
        base_dir=args.base_dir,
        output_json=args.output_json,
        output_dir=args.output_dir,
        corpus_json=args.corpus_json,
        prompt=args.prompt,
        preprocess=args.preprocess,
        video_frame_count=args.video_frame_count,
    )
    if isinstance(output_paths, list):
        for path in output_paths:
            print(f"Saved: {path}")
    else:
        print(f"Saved: {output_paths}")


if __name__ == "__main__":
    main()

[json] skip Train: not found -> Resource/data/Train
[json] skip Val: not found -> Resource/data/Val
[json] base_dir=Resource/data/Test
[json] corpus_json=Resource/data/corpus.json
[json] emg_root=Resource/data/Test/EMG_IMG
[json] mp4_found=19812
[json] scanned_emg=19806 missing_sentence=0 missing_video=0 saved=19806
Saved: Resource/data/test.json


In [None]:
import time
import json
import torch
from PIL import Image
from tqdm import tqdm
from pathlib import Path
from transformers import AutoProcessor, AutoModelForVision2Seq

In [7]:
model_id = "diddmstjr/ISEF"
test_json_path = "Resource/data/test.json"
max_new_tokens = 64
dtype = torch.float16


processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)

model = AutoModelForVision2Seq.from_pretrained(
    model_id,
    torch_dtype=torch.float16,   # mac이면 float16/float32
    device_map="auto",
    trust_remote_code=True
)

Some parameters are on the meta device because they were offloaded to the disk.


In [8]:
model.eval()

tok = processor.tokenizer
print("all_special_tokens:", tok.all_special_tokens)
print("special_tokens_map:", tok.special_tokens_map)
print("chat_template exists:", hasattr(tok, "chat_template"))

all_special_tokens: ['<|im_end|>', '<|endoftext|>', '<|im_start|>', '<|object_ref_start|>', '<|object_ref_end|>', '<|box_start|>', '<|box_end|>', '<|quad_start|>', '<|quad_end|>', '<|vision_start|>', '<|vision_end|>', '<|vision_pad|>', '<|image_pad|>', '<|video_pad|>']
special_tokens_map: {'eos_token': '<|im_end|>', 'pad_token': '<|endoftext|>', 'additional_special_tokens': ['<|im_start|>', '<|im_end|>', '<|object_ref_start|>', '<|object_ref_end|>', '<|box_start|>', '<|box_end|>', '<|quad_start|>', '<|quad_end|>', '<|vision_start|>', '<|vision_end|>', '<|vision_pad|>', '<|image_pad|>', '<|video_pad|>']}
chat_template exists: True


In [12]:
with open(test_json_path, "r", encoding="utf-8") as f:
    data = json.load(f)

# -----------------------------
# Helper: build prompt correctly for Qwen-style VLM
# -----------------------------
def build_prompt(user_text: str) -> str:
    """
    Qwen 계열은 '<image>' 같은 토큰을 쓰지 않고,
    apply_chat_template()가 <|vision_start|><|image_pad|><|vision_end|>를 자동 삽입한다.
    """
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image"},
                {"type": "text", "text": user_text[9:]},
            ],
        }
    ]
    # generation prompt를 붙여야 model.generate에서 자연스럽게 이어짐
    return processor.apply_chat_template(messages, add_generation_prompt=True)

# ---------- runtime inputs ----------
def ask_int(prompt, default):
    s = input(f"{prompt} [default={default}]: ").strip()
    return default if s == "" else int(s)

def ask_str(prompt, default):
    s = input(f"{prompt} [default={default}]: ").strip()
    return default if s == "" else s

log_path = Path(ask_str("Log file path (jsonl)", "pred_log.jsonl"))
max_samples = ask_int("How many samples to run? (0 = all)", 2000)
max_new_tokens = ask_int("max_new_tokens", 32)

log_path.parent.mkdir(parents=True, exist_ok=True)

# ---------- choose subset ----------
N = len(data) if max_samples == 0 else min(max_samples, len(data))
subset = data[:N]

# ---------- inference loop + timing ----------
exact = 0
t_start = time.perf_counter()

with open(log_path, "w", encoding="utf-8") as log_f:
    pbar = tqdm(enumerate(subset), total=N, desc="Infer", unit="sample")

    for i, sample in pbar:
        gt = sample["conversations"][-1]["value"].strip()
        img_path = sample["image"][0]
        user_text = sample["conversations"][0]["value"].strip()

        image = Image.open(img_path).convert("RGB")
        prompt = build_prompt(user_text)

        inputs = processor(text=prompt, images=[image], return_tensors="pt")
        inputs = {k: v.to(model.device) if torch.is_tensor(v) else v for k, v in inputs.items()}

        with torch.no_grad():
            out_ids = model.generate(**inputs, max_new_tokens=max_new_tokens)

        prompt_len = inputs["input_ids"].shape[-1]
        gen_ids = out_ids[:, prompt_len:]
        pred = processor.batch_decode(gen_ids, skip_special_tokens=True)[0].strip()

        if pred == gt:
            exact += 1

        record = {
            "i": i,
            "img_path": img_path,
            "user_text": user_text,
            "gt": gt,
            "pred": pred,
        }
        json.dump(record, log_f, ensure_ascii=False)
        log_f.write("\n")

        # update progress + timing
        elapsed = time.perf_counter() - t_start
        avg = elapsed / (i + 1)
        remaining = avg * (N - (i + 1))
        pbar.set_postfix(
            acc=f"{exact/(i+1):.3f}",
            avg_s=f"{avg:.2f}",
            eta_min=f"{remaining/60:.1f}",
        )

t_total = time.perf_counter() - t_start
print(f"\nSaved log to: {log_path.resolve()}")
print(f"Ran {N} samples in {t_total:.1f}s")
print(f"Avg: {t_total/N:.3f} s/sample  |  throughput: {N/t_total:.2f} samples/s")
print(f"Exact match: {exact}/{N} = {exact/N:.4f}")

Infer:   0%|          | 7/19806 [02:00<94:47:43, 17.24s/sample, acc=0.000]


KeyboardInterrupt: 