<H1> Functions for Building the User-to-Agent ZIPs (Passes 1–3) </H1>

In [1]:
import os
import zipfile
from pathlib import Path

def _create_subset_zip(
    root_path,
    indices,
    output_zip,
    subset_files,
    strict,
    default_zip_name,
):
    """
    Shared implementation behind the ``create_subset_zip_pass*`` functions.

    Every requested folder/file is checked *before* the archive is opened, so
    a ``strict`` failure never leaves a partial ZIP behind and never truncates
    an archive that already exists at ``output_zip``.

    Parameters
    ----------
    root_path : str
        Directory that contains the numbered sub-folders.
    indices : iterable or None
        Sub-folder names to include. ``None`` selects every sub-folder of
        ``root_path`` whose name is a decimal integer, in numeric order.
    output_zip : str, path-like or None
        Destination archive. ``None`` falls back to
        ``<root_path>/<default_zip_name>``.
    subset_files : iterable of str, or a single str
        File names to take from each sub-folder.
    strict : bool
        If True, raise FileNotFoundError on the first missing folder/file;
        otherwise print a warning and skip it.
    default_zip_name : str
        Archive file name used when ``output_zip`` is None.

    Raises
    ------
    FileNotFoundError
        In strict mode when a folder or file is missing, or when ``indices``
        is None and ``root_path`` is not an existing directory.
    """
    root = Path(root_path).expanduser().resolve()

    if indices is None:
        if not root.is_dir():
            raise FileNotFoundError(f"Root directory not found: {root}")
        # Keep the names as found on disk (so '007' still resolves) but order
        # them numerically rather than lexicographically.
        indices = sorted(
            (
                child.name
                for child in root.iterdir()
                if child.is_dir() and child.name.isdecimal()
            ),
            key=int,
        )

    if output_zip is None:
        output_zip = root / default_zip_name

    # A bare string would otherwise be iterated character by character.
    if isinstance(subset_files, str):
        subset_files = (subset_files,)

    def _missing(msg):
        """Raise in strict mode, otherwise warn that the item is skipped."""
        if strict:
            raise FileNotFoundError(msg)
        print("⚠️", msg, "(skipping)")

    # Pass 1: decide what goes into the archive without touching the output.
    members = []
    for idx in indices:
        folder = root / str(idx)
        if not folder.is_dir():
            _missing(f"Folder not found: {folder}")
            continue

        for filename in subset_files:
            fpath = folder / filename
            if not fpath.is_file():
                _missing(f"Missing file: {fpath}")
                continue

            # Preserve "<index>/<filename>" structure inside the ZIP
            arcname = (Path(str(idx)) / filename).as_posix()
            members.append((fpath, arcname))

    # Pass 2: everything requested has been checked, so write the archive.
    with zipfile.ZipFile(output_zip, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for fpath, arcname in members:
            zf.write(fpath, arcname)

    print(f"✅  Created {output_zip}")


def create_subset_zip_pass1(
    root_path: str,
    indices=None,
    output_zip=None,
    subset_files: tuple = (
        "camera_angle_0.png",
        "camera_angle_1.png",
    ),
    strict: bool = False,   # if True, raise on any missing folder/file
):
    """
    Pack the pass-1 files from numbered sub-folders of `root_path` into a ZIP.

    Parameters
    ----------
    root_path : str
        Directory that contains sub-folders named '2', '3', '4', ...
    indices : list[int], optional
        Which sub-folder names to include (e.g. [2, 5, 7]).
        None (default) includes every integer-named sub-folder.
    output_zip : str, optional
        Name (or full path) of the resulting ZIP archive.
        None (default) writes `<root_path>/agent_pass_1.zip`.
    subset_files : tuple[str], optional
        Files to copy from each sub-folder.
    strict : bool, optional
        False (default): skip missing items with a warning.
        True: raise FileNotFoundError if anything is missing; no archive is
        written in that case.
    """
    _create_subset_zip(
        root_path, indices, output_zip, subset_files, strict, "agent_pass_1.zip"
    )


def create_subset_zip_pass2(
    root_path: str,
    indices=None,
    output_zip=None,
    subset_files: tuple = (
        "points.txt",
        "camera_angle_0.png",
        "camera_angle_1.png",
        #"points_view_0.png",
        #"points_view_1.png",
    ),
    strict: bool = False,   # if True, raise on any missing folder/file
):
    """
    Pack the pass-2 files from numbered sub-folders of `root_path` into a ZIP.

    Parameters
    ----------
    root_path : str
        Directory that contains sub-folders named '2', '3', '4', ...
    indices : list[int], optional
        Which sub-folder names to include (e.g. [2, 5, 7]).
        None (default) includes every integer-named sub-folder.
    output_zip : str, optional
        Name (or full path) of the resulting ZIP archive.
        None (default) writes `<root_path>/agent_pass_2.zip`.
    subset_files : tuple[str], optional
        Files to copy from each sub-folder.
    strict : bool, optional
        False (default): skip missing items with a warning.
        True: raise FileNotFoundError if anything is missing; no archive is
        written in that case.
    """
    _create_subset_zip(
        root_path, indices, output_zip, subset_files, strict, "agent_pass_2.zip"
    )


def create_subset_zip_pass3(
    root_path: str,
    indices=None,
    output_zip=None,
    subset_files: tuple = (
        "points.txt",
        "camera_angle_0.png",
        "camera_angle_1.png",
        "trajectory_points.txt"
    ),
    strict: bool = False,   # if True, raise on any missing folder/file
):
    """
    Pack the pass-3 files from numbered sub-folders of `root_path` into a ZIP.

    Parameters
    ----------
    root_path : str
        Directory that contains sub-folders named '2', '3', '4', ...
    indices : list[int], optional
        Which sub-folder names to include (e.g. [2, 5, 7]).
        None (default) includes every integer-named sub-folder.
    output_zip : str, optional
        Name (or full path) of the resulting ZIP archive.
        None (default) writes `<root_path>/agent_pass_3.zip`.
    subset_files : tuple[str], optional
        Files to copy from each sub-folder.
    strict : bool, optional
        False (default): skip missing items with a warning.
        True: raise FileNotFoundError if anything is missing; no archive is
        written in that case.
    """
    _create_subset_zip(
        root_path, indices, output_zip, subset_files, strict, "agent_pass_3.zip"
    )
    

<H1>Function for Copying the Agent's Pass 2 Response into the Proper Directories</H1>

In [2]:
import os
import re
import zipfile
import shutil
import tempfile
from pathlib import Path


# Matches names of the form "camera_angle_<digits>_traj_<digits>.png".
# The trailing `$` pins the end of the name; callers that use `.match()`
# additionally pin the start.
_CAM_TRAJ_PAT = re.compile(r"camera_angle_\d+_traj_\d+\.png$")


def _find_seed_root(start_dir: Path) -> Path:
    """
    Return the first directory whose *immediate* children are all
    integer‑named folders (your seed dirs). Raises if not found.
    """
    for dirpath, dirnames, _ in os.walk(start_dir):
        if dirnames and all(name.isdigit() for name in dirnames):
            return Path(dirpath)
    raise ValueError("❌  Couldn’t locate seed folders with integer names.")


def copy_agent_data(
    copy_path,          # directory that holds the ZIP
    paste_path,         # destination where seed folders live / will be created
    zip_name="agent_pass_2_with_traj.zip",
    strict=False        # True → raise if a seed folder has none of the files
):
    """
    Unpack `zip_name` from `copy_path`, locate the integer-named seed folders
    inside it, and copy the agent's files into the matching seed folders
    under `paste_path` (created if missing).

    Files copied from each seed folder, when present:

      • camera_angle_*_traj_*.png
      • trajectory_points.txt
      • prompts.txt
      • euler_orientations.npy
      • quaternion_orientations.npy
      • gripper_actions.npy

    All seed folders are inspected before anything is written, so a `strict`
    failure leaves `paste_path` untouched instead of half-updated.

    Parameters
    ----------
    copy_path : str or path-like
        Directory that holds the ZIP archive.
    paste_path : str or path-like
        Destination root; one sub-folder per seed is created beneath it.
    zip_name : str, optional
        File name of the archive inside `copy_path`.
    strict : bool, optional
        If True, raise FileNotFoundError when any seed folder in the archive
        contains none of the files listed above.

    Raises
    ------
    FileNotFoundError
        If the archive does not exist, or (strict mode) a seed folder has no
        matching files.
    ValueError
        Propagated from `_find_seed_root` when the archive contains no
        directory of integer-named folders.
    """
    src_zip = Path(copy_path, zip_name).expanduser().resolve()
    if not src_zip.is_file():
        raise FileNotFoundError(f"ZIP not found: {src_zip}")

    dst_root = Path(paste_path).expanduser().resolve()

    # Fixed-name files copied verbatim when present in a seed folder.
    fixed_names = (
        "trajectory_points.txt",
        "prompts.txt",
        "euler_orientations.npy",
        "quaternion_orientations.npy",
        "gripper_actions.npy",
    )

    with tempfile.TemporaryDirectory() as tmpdir:
        # 1) extract everything (structure doesn't matter)
        # NOTE: extractall trusts the archive; only use ZIPs from a known source.
        with zipfile.ZipFile(src_zip, "r") as zf:
            zf.extractall(tmpdir)

        # 2) locate the directory that directly contains the integer seeds
        seed_root = _find_seed_root(Path(tmpdir))

        # 3) plan the copies for every seed before touching the destination
        plan = []
        for seed_dir in seed_root.iterdir():
            if not seed_dir.is_dir():
                continue

            wanted = [
                img
                for img in seed_dir.glob("camera_angle_*_traj_*.png")
                if _CAM_TRAJ_PAT.match(img.name)
            ]
            # is_file() rather than exists(): a directory with one of these
            # names would make copy2 fail.
            wanted.extend(
                seed_dir / name
                for name in fixed_names
                if (seed_dir / name).is_file()
            )

            if strict and not wanted:
                raise FileNotFoundError(
                    f"No matching files in seed folder {seed_dir}"
                )
            plan.append((seed_dir.name, wanted))

        # 4) carry out the copies
        dst_root.mkdir(parents=True, exist_ok=True)
        for seed_name, wanted in plan:
            dest_dir = dst_root / seed_name
            dest_dir.mkdir(parents=True, exist_ok=True)
            for src in wanted:
                shutil.copy2(src, dest_dir / src.name)

    print(f"✅  Copied agent data into {dst_root}")

<H1> Agent Pass 1</H1>

In [14]:
# Demo set to package for agent pass 1.
path_dir = "run_results/lift_numbered_block/demos"

# Seed folders 0..19.
indices = list(range(20))
# Alternative selection: indices = list(range(11, 21))

create_subset_zip_pass1(
    root_path=path_dir,
    indices=indices,
    output_zip=f"{path_dir}/agent_pass_1.zip",
)

✅  Created run_results/lift_numbered_block/demos/agent_pass_1.zip


<H1> Agent Pass 2 </H1>

In [9]:
# Demo set to package for agent pass 2.
path_dir = "run_results_ablations/insert_in_peg/demos"

# Alternative selections:
#   indices = [3, 5, 14, 15, 19]
#   indices = list(range(1, 20))
indices = [20]

create_subset_zip_pass2(
    root_path=path_dir,
    indices=indices,
    output_zip=f"{path_dir}/agent_pass_2.zip",
)


⚠️ Missing file: /home/wbaron407/Projects/diag-sketches/diag-sketches-modified/run_results_ablations/insert_in_peg/demos/20/points.txt (skipping)
✅  Created run_results_ablations/insert_in_peg/demos/agent_pass_2.zip


<H1> Agent Pass 3</H1>

In [19]:
# Demo set to package for agent pass 3.
path_dir = "run_results/play_jenga/demos"

indices = [2, 4, 5, 6, 7, 8, 9, 10]
# Alternative selection: indices = list(range(20))

create_subset_zip_pass3(
    root_path=path_dir,
    indices=indices,
    output_zip=f"{path_dir}/agent_pass_3.zip",
)

✅  Created run_results/play_jenga/demos/agent_pass_3.zip


<H1> Copy relevant files to the iteration working dir </H1>

In [5]:
# Seed folders live under paste_path; the agent's ZIP sits in its agent_data sub-folder.
paste_path = "run_results_ablations/rubish_in_bin/demos"
copy_path = f"{paste_path}/agent_data"

copy_agent_data(
    copy_path,
    paste_path,
    zip_name="agent_response_2_seed_20.zip",
)
# Alternative archive: copy_agent_data(copy_path, paste_path, zip_name="agent_response_2.zip")

# Shell command kept for reference (presumably how a response archive is built):
#   zip -r ../agent_response_1.zip .


FileNotFoundError: ZIP not found: /home/wbaron407/Projects/diag-sketches/diag-sketches-modified/run_results_ablations/rubish_in_bin/demos/agent_data/agent_response_2_seed_20.zip

<H1> SCRATCH </H1>

In [2]:
import pickle
import numpy as np

working_dir = "run_results/lift_numbered_block/demos/2/"
pkl_path = working_dir + "generated_trajectories_numpy_with_gripper.pkl"

# NOTE: pickle.load can execute arbitrary code; only load files you produced yourself.
with open(pkl_path, "rb") as f:
    trajs_generated = pickle.load(f)

close_index = 155  # originally 172, then 170
open_index = 513

# Rewrite column 3 of trajectory 0 as a three-segment step signal:
# 0.04 before close_index, 0.0 from close_index up to open_index, 0.04 after.
# (Presumably 0.04 = gripper open and 0.0 = closed; confirm against the consumer.)
for segment, value in (
    (slice(None, close_index), 0.04),
    (slice(close_index, open_index), 0.0),
    (slice(open_index, None), 0.04),
):
    trajs_generated[0, segment, 3] = value

vals = np.asarray(trajs_generated[0, :, 3])

# Indices where the value differs from its predecessor.
change_idx = np.nonzero(vals[:-1] != vals[1:])[0] + 1
print("Change points:", change_idx)

# Overwrites the file that was loaded above.
with open(pkl_path, "wb") as f:
    pickle.dump(trajs_generated, f)

Change points: [155 513]


In [14]:
import pickle
path = "run_results_original/lift_numbered_block/demos/0/full_ivk_trajectories.pkl"

# NOTE: pickle.load can execute arbitrary code; only load trusted files.
with open(path, mode="rb") as pkl_file:
    data = pickle.load(pkl_file)

# Show the value stored under the "successful" key.
print(data["successful"])

[True]
