In [7]:
import shutil
import os
import uuid

def check_output_folder(output_folder):
    """Ensure that ``output_folder`` exists, creating it (and parents) if needed.

    Prints a warning followed by a confirmation when the folder has to be
    created; does nothing when the folder is already present.

    Args:
        output_folder: Path of the directory that must exist.

    Raises:
        FileExistsError: If ``output_folder`` exists but is not a directory.
    """
    if os.path.isdir(output_folder):
        return
    print('warning: the folder {} does not exist'.format(output_folder))
    # exist_ok=True closes the gap between the isdir() check above and the
    # creation: a folder created in between by someone else no longer raises.
    os.makedirs(output_folder, exist_ok=True)
    print('create folder', output_folder)

def load_file_by_extension(file_dir, extension):
    """List the files directly inside ``file_dir`` that have the given extension.

    Only the top level of ``file_dir`` is inspected; sub-directories are not
    descended into. Bare file names (without the directory part) are returned,
    in the order reported by ``os.walk``.

    Args:
        file_dir: Directory to scan.
        extension: Extension to match, without the leading dot (e.g. ``'mp4'``).
            The comparison is case-sensitive.

    Returns:
        A list of matching file names. The list is empty when ``file_dir``
        does not exist or contains no matching files.
    """
    wanted_suffix = f'.{extension}'
    # os.walk() yields nothing for a missing directory; fall back to an empty
    # listing so the function always returns a list instead of None.
    _, _, files = next(os.walk(file_dir), (file_dir, [], []))
    return [file for file in files if os.path.splitext(file)[1] == wanted_suffix]

def video_dataset_to_hf(video_source_path, output_dir):
    """Copy captioned videos into ``output_dir`` and record them in index files.

    Looks for ``.mp4`` files in ``{video_source_path}/videos`` and, for each
    one, a caption file with the same base name and a ``.txt`` extension in
    ``{video_source_path}/labels``. Every video that has a caption file is
    copied to ``{output_dir}/videos`` under a UUID-prefixed name. One line per
    copied video is then appended to ``{output_dir}/videos.txt`` (the path
    relative to ``output_dir``) and to ``{output_dir}/captions.txt`` (the
    caption), in matching order.

    Both index files are opened in append mode, so calling the function again
    with the same ``output_dir`` adds to the existing entries.

    Args:
        video_source_path: Folder containing the ``videos`` and ``labels``
            sub-folders.
        output_dir: Destination folder; it and its ``videos`` sub-folder are
            created when missing.
    """
    videos_output_dir = f'{video_source_path}/videos'
    labels_output_dir = f'{video_source_path}/labels'
    # `or []` guards against the helper returning None for a missing folder.
    videos_files = load_file_by_extension(videos_output_dir, 'mp4') or []

    print(f"load videos_files: {len(videos_files)}")

    prompts_file = f'{output_dir}/captions.txt'
    videos_file = f'{output_dir}/videos.txt'

    videos_dir = f'{output_dir}/videos'
    check_output_folder(videos_dir)

    instance_videos = []
    instance_prompts = []
    skipped = 0
    for filename in videos_files:
        video_path = f'{videos_output_dir}/{filename}'
        label_filename = os.path.splitext(filename)[0] + '.txt'
        label_path = f'{labels_output_dir}/{label_filename}'

        if not (os.path.exists(video_path) and os.path.exists(label_path)):
            skipped += 1
            continue

        # Read the caption before copying, so a failed read does not leave a
        # copied video behind without a matching caption.
        with open(label_path, "r", encoding="utf-8") as file:
            prompt_lines = [line.strip() for line in file if line.strip()]
        # Collapse a multi-line caption onto one line; the space keeps the
        # words at line boundaries from running together.
        prompt = " ".join(prompt_lines)

        # The UUID prefix keeps copies from different sources from colliding.
        unique_name = f'{uuid.uuid4()}_{filename}'
        shutil.copy(video_path, f'{videos_dir}/{unique_name}')

        # Append both entries together so the two lists stay aligned.
        instance_videos.append(f'videos/{unique_name}')
        instance_prompts.append(prompt)

    if skipped:
        print(f"warning: skipped {skipped} videos without a label file")

    # Write instance_videos to videos_file
    with open(videos_file, "a", encoding="utf-8") as vf:
        vf.writelines(video + '\n' for video in instance_videos)

    # Write instance_prompts to prompts_file
    with open(prompts_file, "a", encoding="utf-8") as pf:
        pf.writelines(prompt + '\n' for prompt in instance_prompts)

    print(f"Copied {len(instance_videos)} videos and saved corresponding prompts.")


In [8]:

# Destination folder for the converted dataset.
dir_put = '/mnt/ceph/develop/jiawei/lora_dataset/Dance-VideoGeneration-Dataset'

# Convert the source dataset, writing the copied videos and index files to dir_put.
video_dataset_to_hf(
    video_source_path='/mnt/ceph/develop/jiawei/lora_dataset/cogvideox_blibli_lora_dataset',
    output_dir=dir_put,
)

load videos_files: 212
create folder /mnt/ceph/develop/jiawei/lora_dataset/Dance-VideoGeneration-Dataset/videos
Copied 212 videos and saved corresponding prompts.


In [9]:
import os
from datasets import Dataset, DatasetDict
 
# Locations of the dataset folder and its two index files
dataset_dir = '/mnt/ceph/develop/jiawei/lora_dataset/Dance-VideoGeneration-Dataset'
captions_file = os.path.join(dataset_dir, 'captions.txt')
videos_file = os.path.join(dataset_dir, 'videos.txt')

# Load captions.txt, trimming surrounding whitespace (including the newline) from every line
with open(captions_file, 'r', encoding='utf-8') as captions_fh:
    captions = [line.strip() for line in captions_fh]

# Load videos.txt the same way
with open(videos_file, 'r', encoding='utf-8') as videos_fh:
    video_paths = [line.strip() for line in videos_fh]

# The two files must have the same number of lines to pair up row by row
assert len(captions) == len(video_paths), f"captions.txt { len(captions)} 和 {len(video_paths)}videos.txt 的行数不匹配"

# Column layout of the dataset (more features can be added here as needed)
data = dict(text=captions, video=video_paths)

# Build the Hugging Face Dataset
dataset = Dataset.from_dict(data)

# Wrap it in a DatasetDict, treating it as the training split
dataset_dict = DatasetDict({'train': dataset})
dataset_dict

DatasetDict({
    train: Dataset({
        features: ['text', 'video'],
        num_rows: 212
    })
})