In [12]:
import pandas as pd

In [13]:
# Root GCS prefix of the sampled-trajectory run; the other cells build every
# input and output path from it.
PATH = "gs://induction-labs/jonathan/sampled_trajectories/osworld_uitars_testing_30x_tmp_03_5k_b"
# JSON-lines file with one sampled trajectory record per line (read with lines=True below).
DATA_FILE = f"{PATH}/samples.jsonl"

In [14]:
# Fix samples: load the raw JSON-lines records, normalise the task ids and
# discard rows whose reward is not exactly 0 or 1.
ORIGINAL_TRAJECTORIES = pd.read_json(DATA_FILE, lines=True)
FIXED_TRAJECTORIES = (
    ORIGINAL_TRAJECTORIES
    # strip a trailing "-2" suffix from the task id (assign works on a copy,
    # so ORIGINAL_TRAJECTORIES is left untouched)
    .assign(
        eval_task_id=lambda frame: frame["eval_task_id"].str.replace(r"-2$", "", regex=True)
    )
    # keep only rows with a reward of 0 or 1
    .loc[lambda frame: (frame["reward"] == 0) | (frame["reward"] == 1)]
)

In [15]:
from google.cloud import storage
import asyncio
import json
import re

# Splits "gs://<bucket>/<object path>" into (bucket, object path).
_GS_RE = re.compile(r"^gs://([^/]+)/(.+)$")
def load_turns_gcs(gs_uri: str, client=None):
    """
    Read a JSON array from Google Cloud Storage and return its turns.

    Parameters
    ----------
    gs_uri : str
        Object location in the form ``gs://<bucket>/<object path>``.
    client : optional
        An existing ``google.cloud.storage.Client`` (or any object exposing
        ``bucket(name).blob(name).download_as_text()``) to reuse.  When
        omitted a new ``storage.Client()`` is created for this call, which
        is the original behaviour.

    Returns
    -------
    list[dict]
        ``[{"image": ..., "text": ...}, ...]`` for every record in the array
        except the last one (equivalent to ``.iloc[0:-1]`` on a frame).

    Raises
    ------
    ValueError
        If *gs_uri* is not a ``gs://bucket/object`` URI.
    """
    m = _GS_RE.match(gs_uri)
    if not m:
        raise ValueError(f"Not a valid gs:// URI: {gs_uri}")
    bucket_name, blob_name = m.groups()

    # Building a client resolves credentials every time, so let callers that
    # fetch many objects pass one in instead of paying that cost per object.
    if client is None:
        client = storage.Client()
    blob = client.bucket(bucket_name).blob(blob_name)

    # download the whole object as one string
    data_str = blob.download_as_text()

    records = json.loads(data_str)  # list-of-dicts
    # Keep only the two fields of interest and drop the final record.
    return [{"image": r["image"], "text": r["text"]} for r in records[:-1]]

async def load_turns_gcs_async(semaphore: asyncio.Semaphore, gs_uri: str):
    """
    Awaitable wrapper around load_turns_gcs.

    The blocking download runs in a worker thread via asyncio.to_thread, and
    the call holds *semaphore* for its whole duration so the number of
    simultaneous downloads is bounded by the semaphore's capacity.
    Returns whatever load_turns_gcs returns for *gs_uri*.
    """
    async with semaphore:
        turns = await asyncio.to_thread(load_turns_gcs, gs_uri)
    return turns

In [16]:
legacy = False
if legacy:
    semaphore = asyncio.Semaphore(64)  # limit concurrent loads
    async def check_if_fail(semaphore, attempt_id):
        try:
            return (await load_turns_gcs_async(semaphore, f"{PATH}/metadata/{attempt_id}.json"))[-1]["text"] is None
        except:
            return True

    failed = await asyncio.gather(*[
        check_if_fail(semaphore, row["attempt_id"])
        for _i, row in FIXED_TRAJECTORIES.iterrows()
    ])

In [17]:
if legacy:
    # Attach the per-attempt failure flags computed by the legacy check and
    # keep only the rows that were not flagged (the "failed" column stays).
    FIXED_TRAJECTORIES = (
        FIXED_TRAJECTORIES
        .assign(failed=failed)
        .loc[lambda frame: ~frame["failed"]]
    )

In [18]:
# Persist the cleaned trajectories next to the raw samples, one JSON record per line.
FIXED_TRAJECTORIES.to_json(
    f"{PATH}/samples_fixed.jsonl",
    lines=True,
    orient="records",
)

In [19]:
# Load the cleaned trajectories from the persisted samples_fixed.jsonl file,
# replacing the in-memory frame.
FIXED_TRAJECTORIES = pd.read_json(
    f"{PATH}/samples_fixed.jsonl",
    lines=True,
)

In [20]:
import pandas as pd
import numpy as np

def expand_with_sliding_windows(df: pd.DataFrame, width: int,
                                length_col: str = "trajectory_length") -> pd.DataFrame:
    """
    For every row in *df*, create (trajectory_length - width + 1) copies,
    one for every sliding window of size *width*.

    Four columns are added to each copy, where ``start`` runs over
    ``0 .. trajectory_length - width``:

    * ``image_turns_start`` = start
    * ``image_turns_end``   = start + width   (exclusive upper bound)
    * ``text_turns_start``  = 0
    * ``text_turns_end``    = start + width   (exclusive upper bound)

    Rows whose length is smaller than *width* are silently dropped.  If no
    row is long enough (or *df* is empty) an empty frame with the original
    columns plus the four new ones is returned.  The result always has a
    fresh 0..n-1 index; *df* itself is not modified.

    Parameters
    ----------
    df : pd.DataFrame                 Original data.
    width : int                       Size of the sliding window.
    length_col : str, default 'trajectory_length'
                                      Column holding the total length.

    Returns
    -------
    pd.DataFrame                      Expanded frame.

    Raises
    ------
    ValueError                        If *width* is not positive.
    """
    if width <= 0:
        raise ValueError("width must be a positive integer")

    # how many windows each row will produce
    n_windows = df[length_col] - width + 1

    # keep only rows where at least one window fits
    valid_mask = n_windows > 0
    df_valid = df[valid_mask]
    counts = n_windows[valid_mask].to_numpy(dtype=np.int64)

    # Repeat rows by *position*, not by label: label-based .loc would return
    # extra rows whenever the index contains duplicate labels.
    row_positions = np.repeat(np.arange(len(df_valid)), counts)
    expanded = df_valid.iloc[row_positions].reset_index(drop=True)

    # Window starts 0..k-1 for each source row, computed without concatenating
    # per-row aranges (np.concatenate fails on an empty list of arrays):
    # global position minus the position of that row's first copy.
    first_copy = np.cumsum(counts) - counts
    starts = np.arange(counts.sum()) - np.repeat(first_copy, counts)

    expanded["image_turns_start"] = starts
    expanded["image_turns_end"]   = starts + width        # exclusive upper bound
    expanded["text_turns_start"] = 0
    expanded["text_turns_end"]   = starts + width         # exclusive upper bound

    return expanded

In [21]:
# Upper limit applied to text_turns_end below; also part of the output file names.
num = 20

# Successful trajectories only (reward == 1), expanded into sliding windows
# of width 5, keeping the windows whose text_turns_end does not exceed `num`.
CORRECT_TRAJECTORIES_EXPANDED = expand_with_sliding_windows(
    FIXED_TRAJECTORIES[FIXED_TRAJECTORIES["reward"] == 1],
    width=5,
    length_col="trajectory_length",
).loc[lambda windows: windows["text_turns_end"] <= num]

CORRECT_TRAJECTORIES_EXPANDED.to_json(
    f"{PATH}/samples_correct_trajectories_expanded_under_{num}.jsonl",
    lines=True,
    orient="records",
)

In [22]:
# Deterministic shuffle (fixed seed), then hold out the last `test_size`
# shuffled rows as the test set; everything before them is the training set.
# NOTE(review): the split is over individual windows, so windows cut from the
# same trajectory can presumably land in both sets - confirm this is intended.
test_size = 64
shuffled = CORRECT_TRAJECTORIES_EXPANDED.sample(frac=1, random_state=248239)
train_size = len(shuffled) - test_size
TRAIN_SET, TEST_SET = shuffled.iloc[:train_size], shuffled.iloc[train_size:]

TRAIN_SET.to_json(
    f"{PATH}/samples_correct_trajectories_expanded_under_{num}_train.jsonl",
    lines=True,
    orient="records",
)
TEST_SET.to_json(
    f"{PATH}/samples_correct_trajectories_expanded_under_{num}_test.jsonl",
    lines=True,
    orient="records",
)