# Data Process Pipeline

First, add an entry for each host listed in `HOSTS` below to your `~/.ssh/config` file.

In [6]:
import os
import shlex
import time

import paramiko

# Host aliases managed by this notebook; each alias is looked up in ~/.ssh/config
# when a command is run on it.
HOSTS = ["h800-80", "h800-81", "h800-82", "h800-83", "h800-84", "h800-85", "h800-86", "h800-170", "h800-171"]

# load from ~/.ssh/config
# If the file does not exist nothing is parsed and ssh_config stays empty.
ssh_config = paramiko.SSHConfig()
user_config_file = os.path.expanduser("~/.ssh/config")
if os.path.exists(user_config_file):
    with open(user_config_file) as f:
        ssh_config.parse(f)


def get_ssh_config(hostname):
    """Build the keyword arguments for ``paramiko.SSHClient.connect``.

    The values come from the entry matching ``hostname`` in the parsed
    ``~/.ssh/config`` (module-level ``ssh_config``).

    Args:
        hostname: Host alias to look up.

    Returns:
        dict with the keys ``hostname``, ``username``, ``port`` and
        ``key_filename``. ``username`` and ``key_filename`` are ``None`` when
        the config entry does not define ``User`` / ``IdentityFile``.
    """
    # get the configuration for the host
    user_config = ssh_config.lookup(hostname)
    cfg = {
        "hostname": user_config["hostname"],
        # User, Port and IdentityFile are optional in an ssh config entry;
        # indexing them directly raised KeyError for hosts that omit them.
        "username": user_config.get("user"),
        # 22 is the standard SSH port used when no Port line is present.
        "port": int(user_config.get("port", 22)),
        "key_filename": user_config.get("identityfile"),
    }
    return cfg


def connect(hostname):
    """Open an SSH connection to ``hostname`` and return the connected client.

    Connection parameters are resolved through ``get_ssh_config``. The caller
    is responsible for closing the returned ``paramiko.SSHClient``.
    """
    connect_kwargs = get_ssh_config(hostname)
    ssh_client = paramiko.SSHClient()
    # Host keys that are not yet known are accepted and added automatically.
    ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh_client.connect(**connect_kwargs)
    return ssh_client


def run_command(command, hostname, nohup=False, log_file=None):
    """Run a shell command on a remote host over SSH and print its output.

    The command is passed as a single argument to ``bash -ic`` on the remote
    side (presumably so the user's interactive shell setup is loaded - verify).

    Args:
        command: Shell command line to execute remotely.
        hostname: Host alias, resolved by ``connect``.
        nohup: If true, wrap the command as ``nohup ... &`` so it is started
            in the background.
        log_file: Optional remote path; when given, stdout and stderr of the
            command are appended to it.

    Returns:
        None. The host, the final command line and any captured stdout/stderr
        are printed.
    """
    client = connect(hostname)
    # try/finally so the SSH connection is released even if exec or read fails.
    try:
        print("HOST:", hostname)
        # shlex.quote keeps the whole command one argument for ``bash -ic`` even
        # when it contains single quotes, which broke the hand-written '...'.
        command = f"bash -ic {shlex.quote(command)}"
        if log_file:
            command = f"{command} >> {log_file} 2>&1"
        if nohup:
            command = f"nohup {command} &"
        print("COMMAND:", command)
        _stdin, stdout, stderr = client.exec_command(command, get_pty=False)

        stdout_str = stdout.read().decode()
        stderr_str = stderr.read().decode()
        if stdout_str:
            print("==== STDOUT ====\n", stdout_str)
        if stderr_str:
            print("==== STDERR ====\n", stderr_str)
    finally:
        client.close()


def run_command_all_hosts(command, hosts=HOSTS):
    """Run ``command`` on every host in ``hosts``, one host after another.

    Each host is handled by ``run_command`` with its default options
    (no ``nohup``, no log file).
    """
    for target_host in hosts:
        run_command(command, target_host)

Here are tools to examine the status of the machines.

In [7]:
def nvidia_smi(host=None):
    """Print ``nvidia-smi`` output for one host, or for all hosts.

    Args:
        host: Host alias to query. When omitted (or falsy) the command is run
            on every default host via ``run_command_all_hosts``; the default
            matches the signatures of ``nvitop`` and ``ps``.
    """
    if host:
        run_command("nvidia-smi", host)
    else:
        run_command_all_hosts("nvidia-smi")


def nvitop(host=None):
    """Print a one-shot ``nvitop -1`` report for one host, or for all hosts.

    Args:
        host: Host alias to query; when falsy every default host is queried.
    """
    nvitop_cmd = "/home/zhaowangbo/.local/bin/nvitop -1"
    if not host:
        run_command_all_hosts(nvitop_cmd)
        return
    run_command(nvitop_cmd, host)


def ps(host=None, interest="python|sleep|torchrun|colossal"):
    """List the user's processes on one host, or on all hosts.

    Args:
        host: Host alias to query; when falsy every default host is queried.
        interest: Extended regular expression passed to ``grep -E`` to filter
            the listing. ``None`` disables filtering.
    """
    # Pick the remote command first, then decide where to run it.
    if interest is None:
        listing_cmd = "ps ux | cat"
    else:
        listing_cmd = f'ps ux | cat | grep --color=never -E "{interest}"'

    if host:
        run_command(listing_cmd, host)
    else:
        run_command_all_hosts(listing_cmd)


def kill(pid, host):
    """Send ``KILL`` to process ``pid`` on ``host`` via ``kill -KILL``."""
    kill_cmd = f"kill -KILL {pid}"
    run_command(kill_cmd, host)

Here we define different tasks.

In [8]:
# Directory every task command `cd`s into before invoking `python -m tools...`
# or `torchrun ... -m tools...`; also used as the base of the log file paths.
# Presumably the Open-Sora checkout on the remote hosts - verify.
OPEN_SORA_HOME = "/home/zhaowangbo/zangwei/opensora/"


def convert_dataset_cmd(input_dir, output_file, datatype="video"):
    """Build the shell command that converts a data directory into a meta file.

    Args:
        input_dir: Directory passed to ``tools.datasets.convert``.
        output_file: Path given to ``--output``; its parent directory is
            created first when it has one.
        datatype: Dataset type argument passed to the converter.

    Returns:
        Tuple ``(command, output_file)`` where ``command`` is the individual
        steps joined with ``&&``.
    """
    commands = [f'echo "Converting {input_dir} to {output_file}"']
    output_dir = os.path.dirname(output_file)

    # A bare filename has no directory part; "mkdir -p" without an operand
    # fails and would abort the whole "&&" chain, so skip it in that case.
    if output_dir:
        commands.append(f"mkdir -p {output_dir}")
    commands.append(f"cd {OPEN_SORA_HOME}")
    commands.append(f"python -m tools.datasets.convert {datatype} {input_dir} --output {output_file}")
    return " && ".join(commands), output_file


def get_video_info(input_file):
    """Build the shell command that runs ``datautil --info`` on a meta file.

    Args:
        input_file: Path of the meta file to process.

    Returns:
        Tuple ``(command, output_file)``; ``output_file`` is ``input_file``
        with ``_info`` inserted before the extension, and the extension
        (without the dot) is passed as ``--format``.
    """
    base, ext = os.path.splitext(input_file)
    output_file = f"{base}_info{ext}"
    output_format = ext[1:]

    steps = [
        f'echo "Getting info of {input_file} to {output_file}"',
        f"cd {OPEN_SORA_HOME}",
        f"python -m tools.datasets.datautil {input_file} --output {output_file} --format {output_format} --info --fmin 1",
    ]
    return " && ".join(steps), output_file


def get_caption_llava7b_video(input_file):
    """Build the shell command that captions videos with LLaVA-1.6 Mistral-7B.

    The command first runs ``tools.caption.caption_llava`` under ``torchrun``
    with 8 processes, then merges the ``<base>_part*<ext>`` files with
    ``tools.datasets.datautil`` into the output file.

    Args:
        input_file: Path of the meta file to caption.

    Returns:
        Tuple ``(command, output_file)``; ``output_file`` is ``input_file``
        with ``_caption`` inserted before the extension (same suffix as
        ``get_caption_load``).
    """
    commands = []
    base, ext = os.path.splitext(input_file)
    # The suffix and echo text previously said "info", copied from
    # get_video_info; this task produces captions.
    output_file = f"{base}_caption{ext}"
    output_format = ext[1:]

    commands.append(f'echo "Getting caption of {input_file} to {output_file}"')
    commands.append(f"cd {OPEN_SORA_HOME}")
    commands.append(
        f"torchrun --nproc_per_node 8 --standalone -m tools.caption.caption_llava {input_file} --dp-size 8 --tp-size 1 --model-path liuhaotian/llava-v1.6-mistral-7b --prompt video"
    )
    commands.append(
        f"python -m tools.datasets.datautil {base}_part*{ext} --output {output_file} --format {output_format} --intersection {input_file} --clean-caption --refine-llm-caption --remove-empty-caption"
    )
    # The return was missing, so the function yielded None and get_commands
    # failed when unpacking ``command, output_file``.
    return " && ".join(commands), output_file


def get_caption_load(input_file):
    """Build the shell command that loads captions from JSON into a meta file.

    Args:
        input_file: Path of the meta file to process.

    Returns:
        Tuple ``(command, output_file)``; ``output_file`` is ``input_file``
        with ``_caption`` inserted before the extension.
    """
    base, ext = os.path.splitext(input_file)
    output_file = f"{base}_caption{ext}"
    output_format = ext[1:]

    steps = [
        f'echo "Getting caption of {input_file} to {output_file}"',
        f"cd {OPEN_SORA_HOME}",
        f"python -m tools.datasets.datautil {input_file} --output {output_file} --format {output_format} --load-caption json --remove-empty-caption --clean-caption",
    ]
    return " && ".join(steps), output_file


def get_aesthetic_score(input_file):
    """Build the shell command that computes aesthetic scores for a meta file.

    The command runs ``tools.scoring.aesthetic.inference`` under ``torchrun``
    with 8 processes, then merges the ``<base>_part*<ext>`` files with
    ``datautil`` and sorts by ``aes``.

    Args:
        input_file: Path of the meta file to score.

    Returns:
        Tuple ``(command, output_file)``; ``output_file`` is ``input_file``
        with ``_aes`` inserted before the extension.
    """
    base, ext = os.path.splitext(input_file)
    output_file = f"{base}_aes{ext}"
    output_format = ext[1:]

    steps = [
        f'echo "Getting aesthetic score of {input_file} to {output_file}"',
        f"cd {OPEN_SORA_HOME}",
        f"torchrun --standalone --nproc_per_node 8 -m tools.scoring.aesthetic.inference {input_file}",
        f"python -m tools.datasets.datautil {base}_part*{ext} --output {output_file} --format {output_format} --sort aes",
    ]
    return " && ".join(steps), output_file


def get_flow_score(input_file):
    """Build the shell command that computes optical-flow scores.

    Args:
        input_file: Path of the meta file to score.

    Returns:
        Tuple ``(command, output_file)``; ``output_file`` is ``input_file``
        with ``_flow`` inserted before the extension.
    """
    base, ext = os.path.splitext(input_file)
    # NOTE(review): the path is not passed to the tool; presumably the
    # inference script writes to this name itself - confirm.
    output_file = f"{base}_flow{ext}"

    steps = [
        f'echo "Getting flow score of {input_file} to {output_file}"',
        f"cd {OPEN_SORA_HOME}",
        f"torchrun --standalone --nproc_per_node 8 -m tools.scoring.optical_flow.inference {input_file}",
    ]
    return " && ".join(steps), output_file


def get_match_score(input_file):
    """Build the shell command that computes matching scores.

    Args:
        input_file: Path of the meta file to score.

    Returns:
        Tuple ``(command, output_file)``; ``output_file`` is ``input_file``
        with ``_match`` inserted before the extension.
    """
    base, ext = os.path.splitext(input_file)
    # NOTE(review): the path is not passed to the tool; presumably the
    # inference script writes to this name itself - confirm.
    output_file = f"{base}_match{ext}"

    steps = [
        f'echo "Getting match score of {input_file} to {output_file}"',
        f"cd {OPEN_SORA_HOME}",
        f"torchrun --standalone --nproc_per_node 8 -m tools.scoring.matching.inference {input_file}",
    ]
    return " && ".join(steps), output_file


def get_cmotion_score(input_file):
    """Build the shell command that runs camera-motion detection.

    Args:
        input_file: Path of the meta file to process.

    Returns:
        Tuple ``(command, output_file)``; ``output_file`` is ``input_file``
        with ``_cmotion`` inserted before the extension.
    """
    base, ext = os.path.splitext(input_file)
    # NOTE(review): the path is not passed to the tool; presumably the
    # detection script writes to this name itself - confirm.
    output_file = f"{base}_cmotion{ext}"

    steps = [
        f'echo "Getting cmotion score of {input_file} to {output_file}"',
        f"cd {OPEN_SORA_HOME}",
        f"python -m tools.caption.camera_motion_detect {input_file}",
    ]
    return " && ".join(steps), output_file


def get_commands(job_list):
    """Chain several task builders into a single shell command line.

    Args:
        job_list: List of dicts. Each dict holds the task builder under
            ``"cmd"`` plus that builder's keyword arguments. Once an earlier
            job has produced an output file, it is passed to the next builder
            as ``input_file``, replacing any ``input_file`` in the dict.

    Returns:
        Tuple ``(command, output_file)``: all job commands joined with ``&&``
        and followed by ``echo "All Done!"``, and the output file of the last
        job (``None`` when ``job_list`` is empty).
    """
    commands = []
    output_file = None
    for job in job_list:
        # Work on a copy: popping "cmd" from the caller's dict made the same
        # job list raise KeyError on a second call (e.g. re-running a cell).
        kwargs = dict(job)
        cmd = kwargs.pop("cmd")
        if output_file is not None:
            kwargs["input_file"] = output_file
        command, output_file = cmd(**kwargs)
        commands.append(command)
    commands.append('echo "All Done!"')
    return " && ".join(commands), output_file

The following is the pipeline for panda.

In [12]:
# Target host and log path for panda split 16.
host = "h800-83"
log_file = os.path.join(OPEN_SORA_HOME, "logs/data-panda-16-split.log")
# input_dir and this initial output_file are only referenced by the
# commented-out convert_dataset_cmd stage below.
input_dir = "/mnt/disk1/data-panda/16"
output_file = "/mnt/hdd/data/panda70m_by/raw/meta/split-16/meta.csv"
# Uncomment the stages to run. Every stage after the first receives the previous
# stage's output file as its input_file. With all stages commented out,
# get_commands returns only the final echo and output_file becomes None.
cmd, output_file = get_commands(
    [
        # {
        #     "cmd": convert_dataset_cmd,
        #     "input_dir": input_dir,
        #     "output_file": output_file,
        # },
        # {
        #     "cmd": get_video_info,
        # },
        # {
        #     "cmd": get_caption_load,
        # },
        # {
        #     "cmd": get_aesthetic_score,
        # },
        # {
        #     "cmd": get_flow_score,
        # },
        # {
        #     "cmd": get_match_score,
        # },
        # {
        #     "cmd": get_cmotion_score,
        # },
    ]
)
print(cmd)
print(output_file)

echo "Getting flow score of /mnt/hdd/data/panda70m_by/raw/meta/split-16/meta_info_caption.csv to /mnt/hdd/data/panda70m_by/raw/meta/split-16/meta_info_caption_flow.csv" && cd /home/zhaowangbo/zangwei/opensora/ && torchrun --standalone --nproc_per_node 8 -m tools.scoring.optical_flow.inference /mnt/hdd/data/panda70m_by/raw/meta/split-16/meta_info_caption.csv && echo "All Done!"
/mnt/hdd/data/panda70m_by/raw/meta/split-16/meta_info_caption_flow.csv


In [10]:
# Target host, split number and log path for this run.
# NOTE: this cell reassigns host, log_file, cmd and output_file, which the
# launch/monitor cells below read.
host = "h800-81"
split = 8
log_file = os.path.join(OPEN_SORA_HOME, f"logs/data-panda-{split}-split.log")
# Single-stage pipeline: run get_video_info on the given split's meta file.
cmd, output_file = get_commands(
    [
        {
            "cmd": get_video_info,
            "input_file": f"/mnt/hdd/data/panda70m_by/raw/meta/split-{split}/meta_loadjson_noempty_clean.csv",
        },
    ]
)
print(cmd)
print(output_file)

echo "Getting info of /mnt/hdd/data/panda70m_by/raw/meta/split-8/meta_loadjson_noempty_clean.csv to /mnt/hdd/data/panda70m_by/raw/meta/split-8/meta_loadjson_noempty_clean_info.csv" && cd /home/zhaowangbo/zangwei/opensora/ && python -m tools.datasets.datautil /mnt/hdd/data/panda70m_by/raw/meta/split-8/meta_loadjson_noempty_clean.csv --output /mnt/hdd/data/panda70m_by/raw/meta/split-8/meta_loadjson_noempty_clean_info.csv --format csv --info --fmin 1 && echo "All Done!"
/mnt/hdd/data/panda70m_by/raw/meta/split-8/meta_loadjson_noempty_clean_info.csv


In [13]:
# Start `cmd` on `host` in the background (nohup), appending its output to
# `log_file`, then list the matching processes on that host.
# NOTE(review): cmd, host and log_file come from whichever configuration cell
# was executed last - check them before launching.
run_command(cmd, host, log_file=log_file, nohup=True)
ps(host)

HOST: h800-83
COMMAND: nohup bash -ic 'echo "Getting flow score of /mnt/hdd/data/panda70m_by/raw/meta/split-16/meta_info_caption.csv to /mnt/hdd/data/panda70m_by/raw/meta/split-16/meta_info_caption_flow.csv" && cd /home/zhaowangbo/zangwei/opensora/ && torchrun --standalone --nproc_per_node 8 -m tools.scoring.optical_flow.inference /mnt/hdd/data/panda70m_by/raw/meta/split-16/meta_info_caption.csv && echo "All Done!"' >> /home/zhaowangbo/zangwei/opensora/logs/data-panda-16-split.log 2>&1 &
HOST: h800-83
COMMAND: bash -ic 'ps ux | cat | grep --color=never -E "python|sleep|torchrun|colossal"'
==== STDOUT ====
 zhaowan+  891142  0.8  0.0 20886768 197296 pts/10 Sl+ 22:36   0:22 /home/zhaowangbo/.conda/envs/opensora/bin/python /home/zhaowangbo/.conda/envs/opensora/bin/torchrun --standalone --nproc_per_node 1 -m tools.scoring.aesthetic.inference /mnt/hdd/data/panda70m_by/raw/meta/split-16/meta_info_caption.csv --num_frames 2 --num_workers 0
zhaowan+  891294  3.0  0.0 74364928 1938304 ?    Ssl 

Use the following commands to monitor the status of the jobs.

In [9]:
# List python/sleep/torchrun/colossal processes on the currently selected host.
ps(host)

HOST: h800-80
COMMAND: bash -ic 'ps ux | cat | grep --color=never -E "python|sleep|torchrun|colossal"'
==== STDOUT ====
 zhaowan+ 3707972  0.1  0.0   8492  5240 ?        S    21:07   0:00 bash -ic echo "Getting info of /mnt/hdd/data/panda70m_by/raw/meta/split-6/meta_loadjson_noempty_clean.csv to /mnt/hdd/data/panda70m_by/raw/meta/split-6/meta_loadjson_noempty_clean_info.csv" && cd /home/zhaowangbo/zangwei/opensora/ && python -m tools.datasets.datautil /mnt/hdd/data/panda70m_by/raw/meta/split-6/meta_loadjson_noempty_clean.csv --output /mnt/hdd/data/panda70m_by/raw/meta/split-6/meta_loadjson_noempty_clean_info.csv --format csv --info --fmin 1 && echo "All Done!"
zhaowan+ 3708645 33.6  0.0 17792816 399760 ?     Sl   21:07   0:21 python -m tools.datasets.datautil /mnt/hdd/data/panda70m_by/raw/meta/split-6/meta_loadjson_noempty_clean.csv --output /mnt/hdd/data/panda70m_by/raw/meta/split-6/meta_loadjson_noempty_clean_info.csv --format csv --info --fmin 1
zhaowan+ 3710337  0.4  0.0 18128668 3

In [None]:
# Print a one-shot nvitop report for the currently selected host.
nvitop(host)

In [None]:
# Set pid_to_kill to a PID reported by ps(host) to terminate that process.
# While it is None the cell does nothing, so "Run All" never kills a process
# by accident; the previous `kill(, host)` placeholder was a SyntaxError.
pid_to_kill = None
if pid_to_kill is not None:
    kill(pid_to_kill, host)