# Phase 1: Setup

In [1]:
# Install necessary packages
!pip install tensorflow tensorflow-datasets
!pip install transformers pillow
!pip install unified-planning[pyperplan]

Collecting unified-planning[pyperplan]
  Downloading unified_planning-1.2.0-py3-none-any.whl.metadata (2.8 kB)
Collecting ConfigSpace (from unified-planning[pyperplan])
  Downloading configspace-1.2.1.tar.gz (130 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m131.0/131.0 kB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting up-pyperplan~=1.1.0 (from unified-planning[pyperplan])
  Downloading up_pyperplan-1.1.0-py3-none-any.whl.metadata (310 bytes)
Collecting pyperplan==2.1 (from up-pyperplan~=1.1.0->unified-planning[pyperplan])
  Downloading pyperplan-2.1-py2.py3-none-any.whl.metadata (4.3 kB)
Downloading up_pyperplan-1.1.0-py3-none-any.whl (12 kB)
Downloading pyperplan-2.1-py2.py3-none-any.whl (69 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.5/69.5 kB[0m [31m7.1 

In [2]:
import tensorflow_datasets as tfds

ds = tfds.load("droid_100",
               data_dir="gs://gresearch/robotics",
               split="train")

# Phase 2: VLM Selection & PDDL Extraction

In [3]:
from transformers import Blip2Processor, Blip2ForConditionalGeneration
import torch

# Loading BLIP-2 model (opt-2.7b variant for balance of speed and quality)
processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-opt-2.7b",
    torch_dtype=torch.float16
).to("cuda")

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


preprocessor_config.json:   0%|          | 0.00/432 [00:00<?, ?B/s]

processor_config.json:   0%|          | 0.00/68.0 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/882 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

added_tokens.json:   0%|          | 0.00/23.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/548 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


config.json: 0.00B [00:00, ?B/s]

model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/10.0G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/141 [00:00<?, ?B/s]

Phase 3: Automated PDDL Generation

In [7]:
from unified_planning.shortcuts import *
from unified_planning.io import PDDLWriter

# Types
Block = UserType('block')
Location = UserType('location')
Gripper = UserType('gripper')

# Predicates
on = Fluent('on', BoolType(), block1=Block, block2=Block)
on_table = Fluent('on_table', BoolType(), block=Block)
clear = Fluent('clear', BoolType(), block=Block)
holding = Fluent('holding', BoolType(), gripper=Gripper, block=Block)
empty = Fluent('empty', BoolType(), gripper=Gripper)

In [8]:
# Create a template problem (this will become our domain)
domain_problem = Problem('blocks_world')

# Add fluents to the problem
domain_problem.add_fluent(on, default_initial_value=False)
domain_problem.add_fluent(on_table, default_initial_value=False)
domain_problem.add_fluent(clear, default_initial_value=False)
domain_problem.add_fluent(holding, default_initial_value=False)
domain_problem.add_fluent(empty, default_initial_value=True)

# Action 1: Pick up block from table
pickup = InstantaneousAction('pickup', gripper=Gripper, block=Block)
g = pickup.parameter('gripper')
b = pickup.parameter('block')
pickup.add_precondition(clear(b))
pickup.add_precondition(on_table(b))
pickup.add_precondition(empty(g))
pickup.add_effect(holding(g, b), True)
pickup.add_effect(clear(b), False)
pickup.add_effect(on_table(b), False)
pickup.add_effect(empty(g), False)
domain_problem.add_action(pickup)

# Action 2: Put down block on table
putdown = InstantaneousAction('putdown', gripper=Gripper, block=Block)
g = putdown.parameter('gripper')
b = putdown.parameter('block')
putdown.add_precondition(holding(g, b))
putdown.add_effect(on_table(b), True)
putdown.add_effect(clear(b), True)
putdown.add_effect(holding(g, b), False)
putdown.add_effect(empty(g), True)
domain_problem.add_action(putdown)

# Action 3: Stack block on another block
stack = InstantaneousAction('stack', gripper=Gripper, block1=Block, block2=Block)
g = stack.parameter('gripper')
b1 = stack.parameter('block1')
b2 = stack.parameter('block2')
stack.add_precondition(holding(g, b1))
stack.add_precondition(clear(b2))
stack.add_effect(on(b1, b2), True)
stack.add_effect(clear(b1), True)
stack.add_effect(holding(g, b1), False)
stack.add_effect(clear(b2), False)
stack.add_effect(empty(g), True)
domain_problem.add_action(stack)

# Action 4: Unstack block from another block
unstack = InstantaneousAction('unstack', gripper=Gripper, block1=Block, block2=Block)
g = unstack.parameter('gripper')
b1 = unstack.parameter('block1')
b2 = unstack.parameter('block2')
unstack.add_precondition(on(b1, b2))
unstack.add_precondition(clear(b1))
unstack.add_precondition(empty(g))
unstack.add_effect(holding(g, b1), True)
unstack.add_effect(clear(b2), True)
unstack.add_effect(on(b1, b2), False)
unstack.add_effect(clear(b1), False)
unstack.add_effect(empty(g), False)
domain_problem.add_action(unstack)

print(f"Domain has {len(domain_problem.actions)} actions")


Domain has 4 actions


In [9]:
import numpy as np
from PIL import Image

def extract_frame_from_episode(episode, frame_idx=0):
    """Extract a frame from DROID episode"""
    for i, step in enumerate(episode["steps"]):
        if i == frame_idx:
            image = step["observation"]["exterior_image_1_left"].numpy()
            return Image.fromarray(image)
    return None

def analyze_scene_with_blip2(image):
    """Use BLIP-2 to analyze the scene"""
    prompt = "Question: What blocks are visible and where are they located? Answer:"
    inputs = processor(image, text=prompt, return_tensors="pt").to("cuda", torch.float16)

    generated_ids = model.generate(**inputs, max_new_tokens=50)
    description = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()

    return description

def generate_pddl_problem_from_description(problem_id, description):
    """Convert VLM description to PDDL problem"""
    problem = Problem(f"blocks_problem_{problem_id}")

    # Parse description and create objects
    block_a = Object("block_a", Block)
    block_b = Object("block_b", Block)
    block_c = Object("block_c", Block)
    robot_gripper = Object("robot_gripper", Gripper)

    problem.add_objects([block_a, block_b, block_c, robot_gripper])

    # Set initial state based on description
    problem.set_initial_value(on_table(block_a), True)
    problem.set_initial_value(on_table(block_b), True)
    problem.set_initial_value(clear(block_a), True)
    problem.set_initial_value(clear(block_b), True)
    problem.set_initial_value(empty(robot_gripper), True)

    # Set goal state (example: stack blocks)
    problem.add_goal(on(block_a, block_b))

    return problem

# Generate 4 problems from DROID episodes
problems = []
for i, episode in enumerate(ds.take(4)):
    frame = extract_frame_from_episode(episode)
    if frame:
        description = analyze_scene_with_blip2(frame)
        problem = generate_pddl_problem_from_description(i+1, description)
        problems.append(problem)


In [13]:
domain_writer = PDDLWriter(domain_problem)

with open("blocks_domain.pddl", "w") as f:
    f.write(domain_writer.get_domain())

print("Domain file written: blocks_domain.pddl")

# Write problem files - create a new writer for EACH problem
for i, problem in enumerate(problems, 1):
    problem_writer = PDDLWriter(problem)
    with open(f"blocks_problem_{i}.pddl", "w") as f:
        f.write(problem_writer.get_problem())
    print(f"Problem file {i} written: blocks_problem_{i}.pddl")

print(f"\nSuccessfully generated 1 domain and {len(problems)} problem files!")


Domain file written: blocks_domain.pddl
Problem file 1 written: blocks_problem_1.pddl
Problem file 2 written: blocks_problem_2.pddl
Problem file 3 written: blocks_problem_3.pddl
Problem file 4 written: blocks_problem_4.pddl

Successfully generated 1 domain and 4 problem files!


# Repo Integration

In [14]:
from google.colab import files
import os

# List all PDDL files we created
pddl_files = [f for f in os.listdir('.') if f.endswith('.pddl')]
print(f"Found {len(pddl_files)} PDDL files to download:")
for f in pddl_files:
    print(f"  - {f}")

# Download each file
print("\nDownloading files...")
for pddl_file in pddl_files:
    files.download(pddl_file)
    print(f"Downloaded: {pddl_file}")

print("\n All PDDL files downloaded to your Downloads folder!")


Found 5 PDDL files to download:
  - blocks_domain.pddl
  - blocks_problem_2.pddl
  - blocks_problem_1.pddl
  - blocks_problem_3.pddl
  - blocks_problem_4.pddl

Downloading files...


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Downloaded: blocks_domain.pddl


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Downloaded: blocks_problem_2.pddl


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Downloaded: blocks_problem_1.pddl


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Downloaded: blocks_problem_3.pddl


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Downloaded: blocks_problem_4.pddl

 All PDDL files downloaded to your Downloads folder!
