# Data Process Pipeline

First, you should add hosts in your ~/.ssh/config file

In [12]:
import os
import paramiko
import time

HOSTS = ["h800-80", "h800-81", "h800-82", "h800-83", "h800-84", "h800-85", "h800-86", "h800-170", "h800-171"]

# load from ~/.ssh/config
ssh_config = paramiko.SSHConfig()
user_config_file = os.path.expanduser("~/.ssh/config")
if os.path.exists(user_config_file):
    with open(user_config_file) as f:
        ssh_config.parse(f)


def get_ssh_config(hostname):
    # get the configuration for the host
    user_config = ssh_config.lookup(hostname)
    cfg = {
        "hostname": user_config["hostname"],
        "username": user_config["user"],
        "port": int(user_config["port"]),
        "key_filename": user_config["identityfile"],
    }
    return cfg


def connect(hostname):
    cfg = get_ssh_config(hostname)
    # connect
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(**cfg)
    return client


def run_command(command, hostname, nohup=False, log_file=None, sleep=None):
    client = connect(hostname)
    print("HOST:", hostname)
    if sleep:
        command = f"sleep {sleep}; {command}"
    command = f"bash -ic '{command}'"
    if log_file:
        command = f"{command} >> {log_file} 2>&1"
    if nohup:
        command = f"nohup {command} &"
    print("COMMAND:", command)
    stdin, stdout, stderr = client.exec_command(command, get_pty=False)

    stdout_str = stdout.read().decode()
    stderr_str = stderr.read().decode()
    if stdout_str:
        print("==== STDOUT ====\n", stdout_str)
    if stderr_str:
        print("==== STDERR ====\n", stderr_str)

    client.close()


def run_command_all_hosts(command, hosts=HOSTS):
    for hostname in hosts:
        run_command(command, hostname)

Here are tools to examine machine's status.

In [2]:
def nvidia_smi(host):
    if host:
        run_command("nvidia-smi", host)
    else:
        run_command_all_hosts("nvidia-smi")


def nvitop(host=None):
    if host:
        run_command(f"/home/zhaowangbo/.local/bin/nvitop -1", host)
    else:
        run_command_all_hosts("/home/zhaowangbo/.local/bin/nvitop -1")


def ps(host=None, interest="python|sleep|torchrun|colossal"):
    if host:
        if interest is None:
            run_command("ps ux | cat", host)
        else:
            run_command(f'ps ux | cat | grep --color=never -E "{interest}"', host)
    else:
        if interest is None:
            run_command_all_hosts("ps ux | cat")
        else:
            run_command_all_hosts(f'ps ux | cat | grep --color=never -E "{interest}"')


def kill(pid, host):
    run_command(f"kill -KILL {pid}", host)

Here we define different tasks.

In [3]:
OPEN_SORA_HOME = "/home/zhaowangbo/zangwei/opensora/"


def convert_dataset_cmd(input_dir, output_file, datatype="video"):
    commands = []
    commands.append(f'echo "Converting {input_dir} to {output_file}"')
    output_dir = os.path.dirname(output_file)

    commands.append(f"mkdir -p {output_dir}")
    commands.append(f"cd {OPEN_SORA_HOME}")
    commands.append(f"python -m tools.datasets.convert {datatype} {input_dir} --output {output_file}")
    return " && ".join(commands), output_file


def get_video_info(input_file):
    commands = []
    base, ext = os.path.splitext(input_file)
    output_file = f"{base}_info{ext}"
    output_format = ext[1:]

    commands.append(f'echo "Getting info of {input_file} to {output_file}"')
    commands.append(f"cd {OPEN_SORA_HOME}")
    commands.append(
        f"python -m tools.datasets.datautil {input_file} --output {output_file} --format {output_format} --info --fmin 1"
    )
    return " && ".join(commands), output_file

def get_video_info_torchvision(input_file):
    commands = []
    base, ext = os.path.splitext(input_file)
    output_file = f"{base}_info{ext}"
    output_format = ext[1:]

    commands.append(f'echo "Getting info of {input_file} to {output_file}"')
    commands.append(f"cd {OPEN_SORA_HOME}")
    commands.append(
        f"python -m tools.datasets.datautil {input_file} --output {output_file} --format {output_format} --video-info --fmin 1"
    )
    return " && ".join(commands), output_file


def get_caption_llava7b_video(input_file):
    commands = []
    base, ext = os.path.splitext(input_file)
    output_file = f"{base}_info{ext}"
    output_format = ext[1:]

    commands.append(f'echo "Getting info of {input_file} to {output_file}"')
    commands.append(f"cd {OPEN_SORA_HOME}")
    commands.append(
        f"torchrun --nproc_per_node 8 --standalone -m tools.caption.caption_llava {input_file} --dp-size 8 --tp-size 1 --model-path liuhaotian/llava-v1.6-mistral-7b --prompt video"
    )
    commands.append(
        f"python -m tools.datasets.datautil {base}_part*{ext} --output {output_file} --format {output_format} --intersection {input_file} --clean-caption --refine-llm-caption --remove-empty-caption"
    )


def get_caption_load(input_file):
    commands = []
    base, ext = os.path.splitext(input_file)
    output_file = f"{base}_caption{ext}"
    output_format = ext[1:]

    commands.append(f'echo "Getting caption of {input_file} to {output_file}"')
    commands.append(f"cd {OPEN_SORA_HOME}")
    commands.append(
        f"python -m tools.datasets.datautil {input_file} --output {output_file} --format {output_format} --load-caption json --remove-empty-caption --clean-caption"
    )
    return " && ".join(commands), output_file


def get_aesthetic_score(input_file):
    commands = []
    base, ext = os.path.splitext(input_file)
    output_file = f"{base}_aes{ext}"
    output_format = ext[1:]

    commands.append(f'echo "Getting aesthetic score of {input_file} to {output_file}"')
    commands.append(f"cd {OPEN_SORA_HOME}")
    commands.append(f"torchrun --standalone --nproc_per_node 8 -m tools.scoring.aesthetic.inference {input_file}")
    commands.append(
        f"python -m tools.datasets.datautil {base}_part*{ext} --output {output_file} --format {output_format} --sort aes"
    )
    return " && ".join(commands), output_file


def get_flow_score(input_file):
    commands = []
    base, ext = os.path.splitext(input_file)
    output_file = f"{base}_flow{ext}"

    commands.append(f'echo "Getting flow score of {input_file} to {output_file}"')
    commands.append(f"cd {OPEN_SORA_HOME}")
    commands.append(f"torchrun --standalone --nproc_per_node 8 -m tools.scoring.optical_flow.inference {input_file}")
    return " && ".join(commands), output_file


def get_match_score(input_file):
    commands = []
    base, ext = os.path.splitext(input_file)
    output_file = f"{base}_match{ext}"

    commands.append(f'echo "Getting match score of {input_file} to {output_file}"')
    commands.append(f"cd {OPEN_SORA_HOME}")
    commands.append(f"torchrun --standalone --nproc_per_node 8 -m tools.scoring.matching.inference {input_file}")
    return " && ".join(commands), output_file


def get_cmotion_score(input_file):
    commands = []
    base, ext = os.path.splitext(input_file)
    output_file = f"{base}_cmotion{ext}"

    commands.append(f'echo "Getting cmotion score of {input_file} to {output_file}"')
    commands.append(f"cd {OPEN_SORA_HOME}")
    commands.append(f"python -m tools.caption.camera_motion_detect {input_file}")
    return " && ".join(commands), output_file


def get_commands(job_list):
    commands = []
    output_file = None
    for job in job_list:
        cmd = job.pop("cmd")
        if output_file is None:
            command, output_file = cmd(**job)
            commands.append(command)
        else:
            job["input_file"] = output_file
            command, output_file = cmd(**job)
            commands.append(command)
    commands.append(f'echo "All Done!"')
    return " && ".join(commands), output_file

The following is the pipeline for panda.

In [28]:
host = "h800-81"
split = 24
input_dir = f"/mnt/disk0/data-panda/{split}"
log_file = os.path.join(OPEN_SORA_HOME, f"logs/data-panda-{split}-split.log")
output_file = f"/mnt/hdd/data/panda70m_by/raw/meta/split-{split}/meta.csv"
cmd, output_file = get_commands(
    [
        {
            "cmd": convert_dataset_cmd,
            "input_dir": input_dir,
            "output_file": output_file,
        },
        {
            "cmd": get_caption_load,
        },
        {
            "cmd": get_video_info_torchvision,
        },
        # {
        #     "cmd": get_aesthetic_score,
        # },
        # {
        #     "cmd": get_flow_score,
        # },
        # {
        #     "cmd": get_match_score,
        # },
        # {
        #     "cmd": get_cmotion_score,
        # },
    ]
)
print(cmd)
print(output_file)

echo "Converting /mnt/disk0/data-panda/24 to /mnt/hdd/data/panda70m_by/raw/meta/split-24/meta.csv" && mkdir -p /mnt/hdd/data/panda70m_by/raw/meta/split-24 && cd /home/zhaowangbo/zangwei/opensora/ && python -m tools.datasets.convert video /mnt/disk0/data-panda/24 --output /mnt/hdd/data/panda70m_by/raw/meta/split-24/meta.csv && echo "Getting caption of /mnt/hdd/data/panda70m_by/raw/meta/split-24/meta.csv to /mnt/hdd/data/panda70m_by/raw/meta/split-24/meta_caption.csv" && cd /home/zhaowangbo/zangwei/opensora/ && python -m tools.datasets.datautil /mnt/hdd/data/panda70m_by/raw/meta/split-24/meta.csv --output /mnt/hdd/data/panda70m_by/raw/meta/split-24/meta_caption.csv --format csv --load-caption json --remove-empty-caption --clean-caption && echo "Getting info of /mnt/hdd/data/panda70m_by/raw/meta/split-24/meta_caption.csv to /mnt/hdd/data/panda70m_by/raw/meta/split-24/meta_caption_info.csv" && cd /home/zhaowangbo/zangwei/opensora/ && python -m tools.datasets.datautil /mnt/hdd/data/panda70m

In [45]:
host = "h800-82"
split = 7
log_file = os.path.join(OPEN_SORA_HOME, f"logs/data-panda-{split}-split.log")
cmd, output_file = get_commands(
    [
        {
            "cmd": get_video_info_torchvision,
            "input_file": f"/mnt/hdd/data/panda70m_by/raw/meta/split-7/meta_loadjson_noempty_clean.csv",
        },
    ]
)
print(cmd)
print(output_file)

echo "Getting info of /mnt/hdd/data/panda70m_by/raw/meta/split-7/meta_loadjson_noempty_clean.csv to /mnt/hdd/data/panda70m_by/raw/meta/split-7/meta_loadjson_noempty_clean_info.csv" && cd /home/zhaowangbo/zangwei/opensora/ && python -m tools.datasets.datautil /mnt/hdd/data/panda70m_by/raw/meta/split-7/meta_loadjson_noempty_clean.csv --output /mnt/hdd/data/panda70m_by/raw/meta/split-7/meta_loadjson_noempty_clean_info.csv --format csv --video-info --fmin 1 && echo "All Done!"
/mnt/hdd/data/panda70m_by/raw/meta/split-7/meta_loadjson_noempty_clean_info.csv


In [46]:
sleep = "9h"
run_command(cmd, host, log_file=log_file, nohup=True, sleep=sleep)
ps(host)

HOST: h800-82
COMMAND: nohup bash -ic 'sleep 9h; echo "Getting info of /mnt/hdd/data/panda70m_by/raw/meta/split-7/meta_loadjson_noempty_clean.csv to /mnt/hdd/data/panda70m_by/raw/meta/split-7/meta_loadjson_noempty_clean_info.csv" && cd /home/zhaowangbo/zangwei/opensora/ && python -m tools.datasets.datautil /mnt/hdd/data/panda70m_by/raw/meta/split-7/meta_loadjson_noempty_clean.csv --output /mnt/hdd/data/panda70m_by/raw/meta/split-7/meta_loadjson_noempty_clean_info.csv --format csv --video-info --fmin 1 && echo "All Done!"' >> /home/zhaowangbo/zangwei/opensora/logs/data-panda-7-split.log 2>&1 &
HOST: h800-82
COMMAND: bash -ic 'ps ux | cat | grep --color=never -E "python|sleep|torchrun|colossal"'
==== STDOUT ====
 zhaowan+  174693  0.0  0.0   8496  5148 ?        S    01:59   0:00 bash -ic sleep 3h; echo "Getting info of /mnt/hdd/data/panda70m_by/raw/meta/split-3/meta_remove_corrupted_loadjson_noempty_clean_info.csv to /mnt/hdd/data/panda70m_by/raw/meta/split-3/meta_remove_corrupted_loadjs

In [38]:
kill(1922317, host)
ps(host)

HOST: h800-84
COMMAND: bash -ic 'kill -KILL 1922317'
==== STDERR ====
 bash: cannot set terminal process group (-1): Inappropriate ioctl for device
bash: no job control in this shell

HOST: h800-84
COMMAND: bash -ic 'ps ux | cat | grep --color=never -E "python|sleep|torchrun|colossal"'
==== STDOUT ====
 zhaowan+ 1262361  0.0  0.0  14180  5296 ?        S    01:54   0:00 bash -ic sleep 3h; echo "Getting info of /mnt/hdd/data/panda70m_by/raw/meta/split-2/meta_remove_corrupted_loadjson_noempty_clean.csv to /mnt/hdd/data/panda70m_by/raw/meta/split-2/meta_remove_corrupted_loadjson_noempty_clean_info.csv" && cd /home/zhaowangbo/zangwei/opensora/ && python -m tools.datasets.datautil /mnt/hdd/data/panda70m_by/raw/meta/split-2/meta_remove_corrupted_loadjson_noempty_clean.csv --output /mnt/hdd/data/panda70m_by/raw/meta/split-2/meta_remove_corrupted_loadjson_noempty_clean_info.csv --format csv --video-info --fmin 1 && echo "All Done!"
zhaowan+ 1288071  0.0  0.0  11168   524 ?        S    01:54   0

Using following commands to monitor the status of the jobs.

In [18]:
ps(host)

HOST: h800-80
COMMAND: bash -ic 'ps ux | cat | grep --color=never -E "python|sleep|torchrun|colossal"'
==== STDOUT ====
 zhaowan+  599938  0.0  0.0   5492   592 ?        S    01:44   0:00 sleep 3h
zhaowan+  820507  9.0  0.0   8488  5256 ?        Ss   01:45   0:00 bash -ic ps ux | cat | grep --color=never -E "python|sleep|torchrun|colossal"
zhaowan+  823896  0.0  0.0   6412   724 ?        S    01:45   0:00 grep --color=auto --color=never -E python|sleep|torchrun|colossal
zhaowan+ 2900571  0.0  0.0   8496  5244 ?        S    01:31   0:00 bash -ic echo "Converting /mnt/disk0/data-panda/17 to /mnt/hdd/data/panda70m_by/raw/meta/split-17/meta.csv" && mkdir -p /mnt/hdd/data/panda70m_by/raw/meta/split-17 && cd /home/zhaowangbo/zangwei/opensora/ && python -m tools.datasets.convert video /mnt/disk0/data-panda/17 --output /mnt/hdd/data/panda70m_by/raw/meta/split-17/meta.csv && echo "Getting caption of /mnt/hdd/data/panda70m_by/raw/meta/split-17/meta.csv to /mnt/hdd/data/panda70m_by/raw/meta/split

In [11]:
ps()

HOST: h800-80
COMMAND: bash -ic 'ps ux | cat | grep --color=never -E "python|sleep|torchrun|colossal"'
==== STDOUT ====
 zhaowan+ 2508488  0.9  0.0 62146500 335516 pts/62 Sl  00:23   0:23 /home/zhaowangbo/.conda/envs/opensora/bin/python /home/zhaowangbo/.conda/envs/opensora/bin/colossalai run --nproc_per_node 8 --hostfile hostfile scripts/train.py configs/opensora-v1-1/train/video.py --data-path /home/zhaowangbo/data/csv/video_image_test_2.csv --wandb True --load outputs/764-STDiT2-XL-2/epoch1-global_step6000
zhaowan+ 2509916  0.4  0.0 62277572 223088 pts/62 Sl  00:23   0:11 /home/zhaowangbo/.conda/envs/opensora/bin/python /home/zhaowangbo/.conda/envs/opensora/bin/colossalai run --nproc_per_node 8 --hostfile hostfile scripts/train.py configs/opensora-v1-1/train/video.py --data-path /home/zhaowangbo/data/csv/video_image_test_2.csv --wandb True --load outputs/764-STDiT2-XL-2/epoch1-global_step6000
zhaowan+ 2509917  0.4  0.0 62343108 225364 pts/62 Sl  00:23   0:10 /home/zhaowangbo/.conda/

In [None]:
nvitop(host)

In [None]:
kill(, host)

# Training

In [23]:
def colossal_run(data_path, load_path=None):
    commands = []
    commands.append(f"cd {OPEN_SORA_HOME}")
    command = f"colossalai run --nproc_per_node 8 --hostfile hostfile scripts/train.py configs/opensora-v1-1/train/video.py --wandb True --data-path {data_path}"
    if load_path:
        command = f"{command} --load-path {load_path}"
    commands.append(command)
    cmd = " && ".join(commands)
    return cmd


def kill_all():
    commands = []
    commands.append(f"cd {OPEN_SORA_HOME}")
    commands.append('cat hostfile  | xargs -I "{}" ssh "{}" pkill -9 python')
    cmd = " && ".join(commands)
    return cmd

In [24]:
host = "h800-80"
log_file = os.path.join(OPEN_SORA_HOME, "logs/train_02.log")
data_path = "/home/zhaowangbo/data/csv/video_image_test_2.csv"
ckpt_path = "outputs/764-STDiT2-XL-2/epoch1-global_step6000"
cmd = colossal_run(data_path, ckpt_path)
print(cmd)

cd /home/zhaowangbo/zangwei/opensora/ && colossalai run --nproc_per_node 8 --hostfile hostfile scripts/train.py configs/opensora-v1-1/train/video.py --wandb True --data-path /home/zhaowangbo/data/csv/video_image_test_2.csv --load-path outputs/764-STDiT2-XL-2/epoch1-global_step6000


In [None]:
run_command(cmd, host, log_file=log_file, nohup=True)

In [27]:
cmd = kill_all()
run_command(cmd, host)

HOST: h800-80
COMMAND: bash -ic 'cd /home/zhaowangbo/zangwei/opensora/ && cat hostfile  | xargs -I "{}" ssh "{}" pkill -9 python'
==== STDERR ====
 bash: cannot set terminal process group (-1): Inappropriate ioctl for device
bash: no job control in this shell
pkill: killing pid 382879 failed: Operation not permitted

