In [None]:
# @title Download the Datasets
#
# This cell downloads the required TEST datasets from Google Drive.

import os
import gdown
import zipfile
import tarfile
import shutil

# --- Configuration ---
# Google Drive share links for the datasets, taken from the project plan.
# They are passed to gdown with fuzzy=True (see the download call below).
DATA_URLS = {
    "keypoints": "https://drive.google.com/file/d/1g8tzzW5BNPzHXlamuMQOvdwlHRa-29Vp/view?usp=sharing",
    "rgb_clips": "https://drive.google.com/file/d/1qTIXFsu8M55HrCiaGv7vZ7GkdB3ubjaG/view?usp=sharing"
}

# Directory every archive is extracted into
OUTPUT_DIR = "dev_test_data"


def _extract_archive(archive_path, dest_dir):
    """Extract a zip or tar archive into ``dest_dir``.

    Args:
        archive_path: Path of the downloaded archive file.
        dest_dir: Directory that receives the extracted contents.

    Returns:
        True when the archive was extracted, False otherwise. Problems are
        reported with print() rather than raised, so one bad archive does not
        abort the remaining downloads.
    """
    if zipfile.is_zipfile(archive_path):
        try:
            with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                zip_ref.extractall(dest_dir)
        except Exception as e:
            print(f"An error occurred during unzipping: {e}")
            return False
        print("✅ Unzipped successfully.")
        return True

    if tarfile.is_tarfile(archive_path):
        try:
            with tarfile.open(archive_path, 'r:*') as tar_ref:
                # The archive comes from the network: when this Python build
                # supports extraction filters, use the 'data' filter so that
                # members cannot escape dest_dir.
                if hasattr(tarfile, 'data_filter'):
                    tar_ref.extractall(path=dest_dir, filter='data')
                else:
                    tar_ref.extractall(path=dest_dir)
        except Exception as e:
            print(f"An error occurred during tar extraction: {e}")
            return False
        print("✅ Extracted tar archive successfully.")
        return True

    # Only reached when the file is neither a zip nor a tar archive.
    print(f"❌ Error: The file '{archive_path}' is not a recognized zip or tar archive. Manual inspection may be needed.")
    return False


# --- Download and Extract Files ---
failed_datasets = []  # names of datasets that could not be downloaded or unpacked

for name, url in DATA_URLS.items():
    print(f"--- Processing {name} ---")

    # Let gdown determine the filename and download to the current directory
    print(f"Downloading {name} data...")
    downloaded_file_path = gdown.download(url, quiet=False, fuzzy=True)

    if downloaded_file_path is None or not os.path.exists(downloaded_file_path):
        print(f"❌ Error: Download failed for {name}. Please check the URL and permissions.")
        failed_datasets.append(name)
        continue

    print(f"✅ Download complete: {downloaded_file_path}")

    # --- Unpack Files ---
    print(f"Attempting to unpack {downloaded_file_path}...")
    extracted = _extract_archive(downloaded_file_path, OUTPUT_DIR)

    # --- Clean up the downloaded archive file ---
    # Delete the archive only after a successful extraction; a failed archive
    # is kept so that it can actually be inspected manually.
    if extracted:
        os.remove(downloaded_file_path)
        print(f"Removed archive file: {downloaded_file_path}\n")
    else:
        failed_datasets.append(name)
        print(f"⚠️ Keeping '{downloaded_file_path}' for manual inspection.\n")


if failed_datasets:
    print(f"⚠️ Finished with problems for: {', '.join(failed_datasets)}")
else:
    print("All dataset operations are complete.")


--- Processing keypoints ---
Downloading keypoints data...


Downloading...
From (original): https://drive.google.com/uc?id=1g8tzzW5BNPzHXlamuMQOvdwlHRa-29Vp
From (redirected): https://drive.google.com/uc?id=1g8tzzW5BNPzHXlamuMQOvdwlHRa-29Vp&confirm=t&uuid=7ed29dea-277b-4d44-8fc7-c1a2f55c034f
To: /content/test_2D_keypoints.tar.gz
100%|██████████| 1.70G/1.70G [00:17<00:00, 96.5MB/s]


✅ Download complete: test_2D_keypoints.tar.gz
Attempting to unpack test_2D_keypoints.tar.gz...
✅ Extracted tar archive successfully.
Removed archive file: test_2D_keypoints.tar.gz

--- Processing rgb_clips ---
Downloading rgb_clips data...


Downloading...
From (original): https://drive.google.com/uc?id=1qTIXFsu8M55HrCiaGv7vZ7GkdB3ubjaG
From (redirected): https://drive.google.com/uc?id=1qTIXFsu8M55HrCiaGv7vZ7GkdB3ubjaG&confirm=t&uuid=b5ba748b-8a2c-4c50-93b0-ebec6518c27f
To: /content/test_rgb_front_clips.zip
100%|██████████| 2.41G/2.41G [00:38<00:00, 63.1MB/s]


✅ Download complete: test_rgb_front_clips.zip
Attempting to unpack test_rgb_front_clips.zip...
✅ Unzipped successfully.
Removed archive file: test_rgb_front_clips.zip

All dataset operations are complete.


In [None]:
# @title Verify Data Setup
#
# This cell checks if the data folders have been created successfully,
# meeting the success criterion.

# --- Verification ---
# List the extraction directory and report what is inside it.
print(f"Verifying contents of '{OUTPUT_DIR}':")

try:
    contents = os.listdir(OUTPUT_DIR)
except FileNotFoundError:
    # The directory was never created, i.e. nothing has been extracted.
    print(f"❌ Error: The directory '{OUTPUT_DIR}' was not found.")
else:
    if not contents:
        print("⚠️ Warning: The development directory is empty.")
    else:
        print("🎯 Success! The following files/folders are in the development directory:")
        for item in contents:
            print(f"- {item}")



Verifying contents of 'dev_test_data':
🎯 Success! The following files/folders are in the development directory:
- openpose_output
- .ipynb_checkpoints
- raw_videos


In [None]:
# @title Now it is time to save some data to Drive
#
# This cell connects your Google Drive to this Colab notebook.
# You will be prompted to authorize this connection.

from google.colab import drive

# Attach Google Drive to the Colab filesystem; Colab asks for authorization
# the first time this runs.
drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

print("\n✅ Google Drive mounted successfully!")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).

✅ Google Drive mounted successfully!


In [None]:
# @title 最终解决方案：精确创建数据子集
#
# 📝 **目标:** 根据我们已确认的正确数据结构，创建一个包含所有相关文件（JSON关键点、原始视频等）的、小而完整的开发子集。
#
# **操作:** 此脚本将自动从 `openpose_output/json` 中选取前5个视频序列，然后找到并复制它们对应的所有相关文件。

import os
import shutil

# --- 1. Configuration (based on the directory layout confirmed earlier) ---
JSON_SOURCE_DIR = 'dev_test_data/openpose_output/json'
VIDEO_SOURCE_DIR = 'dev_test_data/openpose_output/video'
RAW_VIDEO_SOURCE_DIR = 'dev_test_data/raw_videos'

SUBSET_DIR = 'dev_test_data_subset'
NUM_SEQUENCES_TO_KEEP = 5  # five sequences are plenty for a development set


def _copy_first_existing(source_dir, candidate_names, dest_dir):
    """Copy the first candidate file that exists in ``source_dir`` into ``dest_dir``.

    Args:
        source_dir: Directory to look in.
        candidate_names: File names to try, in priority order.
        dest_dir: Existing directory that receives the copy (same file name).

    Returns:
        The file name that was copied, or None when no candidate exists.
    """
    for candidate in candidate_names:
        candidate_path = os.path.join(source_dir, candidate)
        if os.path.exists(candidate_path):
            shutil.copy(candidate_path, os.path.join(dest_dir, candidate))
            return candidate
    return None


print("--- 开始创建精确的数据子集 ---")

# Destination layout: mirrors the source data, with clearer folder names
subset_json_dir = os.path.join(SUBSET_DIR, 'json_keypoints')
subset_video_dir = os.path.join(SUBSET_DIR, 'rendered_videos')
subset_raw_video_dir = os.path.join(SUBSET_DIR, 'raw_videos')

# --- 2. Validate the source BEFORE touching any existing subset ---
# Checking first means a missing source no longer wipes a previously built
# subset and leaves an empty directory tree behind.
if not os.path.isdir(JSON_SOURCE_DIR):
    print(f"❌ 错误: 关键点数据源目录 '{JSON_SOURCE_DIR}' 不存在！无法继续。")
else:
    # --- 3. Clean up and recreate the subset directory structure ---
    if os.path.exists(SUBSET_DIR):
        shutil.rmtree(SUBSET_DIR)
        print(f"清除了旧的子集目录: {SUBSET_DIR}")

    for subset_subdir in (subset_json_dir, subset_video_dir, subset_raw_video_dir):
        os.makedirs(subset_subdir)
    print(f"创建了新的子集目录结构于: {SUBSET_DIR}")

    # --- 4. Choose the sequences to copy ---
    # Every sub-folder of the json directory is one sequence; sort the names
    # so that the selection is deterministic.
    all_sequences = sorted(
        d for d in os.listdir(JSON_SOURCE_DIR)
        if os.path.isdir(os.path.join(JSON_SOURCE_DIR, d))
    )

    sequences_to_copy = all_sequences[:NUM_SEQUENCES_TO_KEEP]

    print(f"\n在 '{JSON_SOURCE_DIR}' 中找到 {len(all_sequences)} 个视频序列。")
    print(f"将选取前 {len(sequences_to_copy)} 个作为子集:")
    for seq_name in sequences_to_copy:
        print(f"  - {seq_name}")

    # --- 5. Copy every file that belongs to each selected sequence ---
    copied_count = 0
    for seq_name in sequences_to_copy:
        print(f"\n--- 正在处理序列: {seq_name} ---")

        # 5a. JSON keypoints folder
        source_json_path = os.path.join(JSON_SOURCE_DIR, seq_name)
        dest_json_path = os.path.join(subset_json_dir, seq_name)
        if os.path.isdir(source_json_path):
            shutil.copytree(source_json_path, dest_json_path)
            print("    ✅ 已复制关键点数据 (JSONs)")
        else:
            print(f"    ⚠️ 警告: 未找到关键点文件夹 {source_json_path}")

        # 5b. Rendered video, looked up as "<sequence name>.mp4"
        video_filename = f"{seq_name}.mp4"
        source_video_path = os.path.join(VIDEO_SOURCE_DIR, video_filename)
        if _copy_first_existing(VIDEO_SOURCE_DIR, [video_filename], subset_video_dir):
            print("    ✅ 已复制渲染视频")
        else:
            print(f"    ⚠️ 警告: 未找到渲染视频 {source_video_path}")

        # 5c. Raw video: try "<sequence name>.mp4" first, then the same name
        # with '-rgb_front' removed, in case the raw file lacks it.
        raw_video_filename = f"{seq_name}.mp4"
        base_name = seq_name.replace('-rgb_front', '')
        raw_video_filename_alt = f"{base_name}.mp4"
        source_raw_video_path = os.path.join(RAW_VIDEO_SOURCE_DIR, raw_video_filename)
        source_raw_video_path_alt = os.path.join(RAW_VIDEO_SOURCE_DIR, raw_video_filename_alt)

        copied_raw_name = _copy_first_existing(
            RAW_VIDEO_SOURCE_DIR,
            [raw_video_filename, raw_video_filename_alt],
            subset_raw_video_dir,
        )
        if copied_raw_name == raw_video_filename:
            print("    ✅ 已复制原始视频")
        elif copied_raw_name is not None:
            print(f"    ✅ 已复制原始视频 (备用名称: {raw_video_filename_alt})")
        else:
            print(f"    ⚠️ 警告: 未找到原始视频 {source_raw_video_path} 或 {source_raw_video_path_alt}")

        copied_count += 1

    if copied_count == 0:
        # Nothing was selected, so do not claim that a dataset is ready.
        print(f"\n⚠️ 警告: 在 '{JSON_SOURCE_DIR}' 中没有找到任何视频序列，子集为空。")
    else:
        print("\n--- 🎯 操作完成！---")
        print(f"成功处理了 {copied_count} 个视频序列。")
        print(f"一个完整、小巧的开发数据集已在 '{SUBSET_DIR}' 中准备就绪。")
        print("现在，您可以运行“打包并上传到谷歌硬盘”的单元格来永久保存它了。")



--- 开始创建精确的数据子集 ---
清除了旧的子集目录: dev_test_data_subset
创建了新的子集目录结构于: dev_test_data_subset

在 'dev_test_data/openpose_output/json' 中找到 2343 个视频序列。
将选取前 5 个作为子集:
  - -fZc293MpJk_0-1-rgb_front
  - -fZc293MpJk_2-1-rgb_front
  - -fZc293MpJk_3-1-rgb_front
  - -fZc293MpJk_4-1-rgb_front
  - -fZc293MpJk_5-1-rgb_front

--- 正在处理序列: -fZc293MpJk_0-1-rgb_front ---
    ✅ 已复制关键点数据 (JSONs)
    ✅ 已复制渲染视频
    ✅ 已复制原始视频

--- 正在处理序列: -fZc293MpJk_2-1-rgb_front ---
    ✅ 已复制关键点数据 (JSONs)
    ✅ 已复制渲染视频
    ✅ 已复制原始视频

--- 正在处理序列: -fZc293MpJk_3-1-rgb_front ---
    ✅ 已复制关键点数据 (JSONs)
    ✅ 已复制渲染视频
    ✅ 已复制原始视频

--- 正在处理序列: -fZc293MpJk_4-1-rgb_front ---
    ✅ 已复制关键点数据 (JSONs)
    ✅ 已复制渲染视频
    ✅ 已复制原始视频

--- 正在处理序列: -fZc293MpJk_5-1-rgb_front ---
    ✅ 已复制关键点数据 (JSONs)
    ✅ 已复制渲染视频
    ✅ 已复制原始视频

--- 🎯 操作完成！---
成功处理了 5 个视频序列。
一个完整、小巧的开发数据集已在 'dev_test_data_subset' 中准备就绪。
现在，您可以运行“打包并上传到谷歌硬盘”的单元格来永久保存它了。


In [None]:
# @title 打包开发子集并保存到谷歌硬盘
#
# 📝 **目标:** 将我们最终创建的、正确的 'dev_test_data_subset' 文件夹打包成zip文件，并永久保存在您的谷歌硬盘中。
#
# **操作:** 此单元将自动完成挂载硬盘、打包和复制的全过程。

from google.colab import drive
import shutil
import os

# --- 1. Configuration ---
SOURCE_DIR_TO_PACKAGE = 'dev_test_data_subset'
ARCHIVE_NAME = 'dev_test_data_subset_archive'
# Folder inside Google Drive that receives the archive; rename it as you like
DRIVE_FOLDER_PATH = '/content/drive/MyDrive/Sign_Language_Project_Dev_Data'

# --- 2. Package and upload only when the source folder is present ---
if os.path.isdir(SOURCE_DIR_TO_PACKAGE):
    # --- 3. Pack the folder into a .zip file in the working directory ---
    print(f"正在将 '{SOURCE_DIR_TO_PACKAGE}' 打包成 '{ARCHIVE_NAME}.zip'...")
    shutil.make_archive(ARCHIVE_NAME, 'zip', SOURCE_DIR_TO_PACKAGE)
    print("✅ 打包成功！")

    # --- 4. Mount Google Drive ---
    print("\n正在连接到您的谷歌硬盘...")
    drive.mount('/content/drive')

    # --- 5. Create the target folder in Google Drive if it is missing ---
    if not os.path.exists(DRIVE_FOLDER_PATH):
        print(f"在您的谷歌硬盘中创建新文件夹: {DRIVE_FOLDER_PATH}")
        os.makedirs(DRIVE_FOLDER_PATH)

    # --- 6. Copy the archive into Google Drive ---
    source_file_path = ARCHIVE_NAME + ".zip"
    destination_path = os.path.join(DRIVE_FOLDER_PATH, source_file_path)

    print(f"\n正在复制文件到: {destination_path}...")
    if not os.path.exists(source_file_path):
        print(f"❌ 错误: 未找到打包好的文件 '{source_file_path}'。")
    else:
        shutil.copy(source_file_path, destination_path)
        print("\n--- 🎯 操作成功！---")
        print(f"开发数据集 '{source_file_path}' 已成功保存到您的谷歌硬盘中！")
        print("后续您可以直接从谷歌硬盘中下载并解压这个文件，无需再重复下载原始数据。")
else:
    print(f"❌ 错误: 源文件夹 '{SOURCE_DIR_TO_PACKAGE}' 不存在。请确保上一步已成功运行。")



正在将 'dev_test_data_subset' 打包成 'dev_test_data_subset_archive.zip'...
✅ 打包成功！

正在连接到您的谷歌硬盘...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
在您的谷歌硬盘中创建新文件夹: /content/drive/MyDrive/Sign_Language_Project_Dev_Data

正在复制文件到: /content/drive/MyDrive/Sign_Language_Project_Dev_Data/dev_test_data_subset_archive.zip...

--- 🎯 操作成功！---
开发数据集 'dev_test_data_subset_archive.zip' 已成功保存到您的谷歌硬盘中！
后续您可以直接从谷歌硬盘中下载并解压这个文件，无需再重复下载原始数据。
