# Generate Valid Blocks World Plans
An approach to generate valid Blocks World plans. *Valid for only 2 blocks.*

In [1]:
import random
from typing import List, Dict, Set, Tuple
from dataclasses import dataclass

## Helper Classes

In [2]:
@dataclass
class BlockState:
    """Represents the state of blocks in the world following PDDL predicates"""
    clear: Set[str]      # blocks with nothing on top (clear ?x)
    on_table: Set[str]   # blocks on table (on-table ?x)
    on: Dict[str, str]   # block stacked on another block (on ?x ?y)
    holding: str = None  # block being held, if any (holding ?x)
    
    @property
    def arm_empty(self) -> bool:
        return self.holding is None
    
    def copy(self):
        return BlockState(
            clear=self.clear.copy(),
            on_table=self.on_table.copy(),
            holding=self.holding,
            on=self.on.copy()
        )

In [3]:
class BlocksWorldGenerator:
    def __init__(self, num_blocks: int):
        self.num_blocks = num_blocks
        self.blocks = [chr(65 + i) for i in range(num_blocks)]  # A, B, C, ...
        
    def generate_random_state(self) -> BlockState:
        """Generate a random valid state with arm empty"""
        clear = set(self.blocks)  # Initially all blocks are clear
        on_table = set()
        on = {}  
        available = self.blocks.copy()
        
        while available:
            block = available.pop()
            if random.random() < 0.5 or not available:  # 50% chance or last block
                on_table.add(block)
            else:
                possible_bases = list(clear - {block})
                if possible_bases:  # Pick a random available block as base
                    base = random.choice(possible_bases)
                    on[block] = base
                    clear.remove(base)
                else:
                    on_table.add(block)

        return BlockState(clear=clear, on_table=on_table, holding=None, on=on)
    
    def get_valid_actions(self, state: BlockState) -> List[Tuple[str, str, str]]:
        """Get all valid actions in current state based on PDDL preconditions"""
        actions = []
        
        # If holding a block, can only put it down or stack it
        if state.holding:
            # putdown action
            actions.append(('putdown', state.holding, None))
            
            # stack action - can stack on any clear block
            for underob in state.clear:
                if underob != state.holding:  # Can't stack block on itself
                    actions.append(('stack', state.holding, underob))
                    
        # If arm is empty, can pickup from table or unstack
        else:  # arm_empty is True
            # pickup action - any clear block on table
            for ob in state.clear & state.on_table:
                actions.append(('pickup', ob, None))
                
            # unstack action - any clear block that's on another block
            for ob, underob in state.on.items():
                if ob in state.clear:
                    actions.append(('unstack', ob, underob))
                    
        return actions

    def apply_action(self, state: BlockState, action: Tuple[str, str, str]) -> BlockState:
        """Apply action to state following PDDL effects"""
        new_state = state.copy()
        action_type, ob, underob = action
        
        if action_type == 'pickup':  # pickup ?ob
            assert ob in state.clear and ob in state.on_table and state.arm_empty
            new_state.holding = ob
            new_state.clear.remove(ob)
            new_state.on_table.remove(ob)
            
        elif action_type == 'putdown':  # putdown ?ob
            assert state.holding == ob
            new_state.clear.add(ob)
            new_state.on_table.add(ob)
            new_state.holding = None
            
        elif action_type == 'stack':  # stack ?ob ?underob
            assert state.holding == ob and underob in state.clear
            new_state.on[ob] = underob
            new_state.clear.add(ob)
            new_state.clear.remove(underob)
            new_state.holding = None
            
        elif action_type == 'unstack':  # unstack ?ob ?underob
            assert ob in state.clear and state.on[ob] == underob and state.arm_empty
            new_state.holding = ob
            new_state.clear.remove(ob)
            new_state.clear.add(underob)
            new_state.on.pop(ob)
            
        return new_state

    def encode_state(self, state: BlockState) -> Dict[str, int]:
        """Encode state into the format from the document"""
        encoded = {
            'V1': 0,  # what's on table
            'V21': 0, # what's below A
            'V22': 0, # what's on top of A
            'V31': 0, # what's below B
            'V32': 0  # what's on top of B
        }
        
        # Encode V1 (what's on table)
        table_blocks = list(state.on_table)
        if 'A' in table_blocks and 'B' in table_blocks:
            encoded['V1'] = 3
        elif 'A' in table_blocks:
            encoded['V1'] = 1
        elif 'B' in table_blocks:
            encoded['V1'] = 2
            
        # Encode what's below A
        if state.holding != 'A':  # A is not being held
            if 'A' in state.on_table:
                encoded['V21'] = 2
            elif 'A' in state.on and state.on['A'] == 'B':
                encoded['V21'] = 1
                
        # Encode what's on top of A
        for block, under in state.on.items():
            if under == 'A' and block == 'B':
                encoded['V22'] = 1
                
        # Similarly for B
        if state.holding != 'B':  # B is not being held
            if 'B' in state.on_table:
                encoded['V31'] = 2
            elif 'B' in state.on and state.on['B'] == 'A':
                encoded['V31'] = 1
                
        for block, under in state.on.items():
            if under == 'B' and block == 'A':
                encoded['V32'] = 1
                    
        return encoded

    def generate_random_plan(self) -> Dict:
        """Generate a random valid plan"""
        initial_state: BlockState = self.generate_random_state()
        goal_state: BlockState = self.generate_random_state()

        # Use simple breadth-first search to find a plan
        visited = set()
        queue = [(initial_state, [])]
        
        while queue:
            current_state, plan = queue.pop(0)

            # Create a hash of the current state for visited check
            state_hash = (
                frozenset(current_state.clear),
                frozenset(current_state.on_table),
                current_state.holding,
                frozenset(current_state.on.items())
            )
            
            # Check if we've reached the goal
            if (current_state.on == goal_state.on and 
                current_state.on_table == goal_state.on_table and
                current_state.arm_empty):
                
                # Convert plan to encoded states
                encoded_plan = []
                state = initial_state
                encoded_plan.append(self.encode_state(state))
                
                for action in plan:
                    state = self.apply_action(state, action)
                    encoded_plan.append(self.encode_state(state))
                
                return {
                    'initial_state': self.encode_state(initial_state),
                    'goal_state': self.encode_state(goal_state),
                    'plan': encoded_plan,
                    'actions': plan
                }
            
            if state_hash in visited:
                continue
                
            visited.add(state_hash)
            
            # Try all valid actions
            for action in self.get_valid_actions(current_state):
                new_state = self.apply_action(current_state, action)
                queue.append((new_state, plan + [action]))
                
        return None  # No plan found

    def generate_dataset(self, num_plans: int) -> List[Dict]:
        """Generate multiple random plans"""
        plans = []
        while len(plans) < num_plans:
            plan = self.generate_random_plan()
            if plan:
                plans.append(plan)
        return plans

## Usage

In [4]:
generator = BlocksWorldGenerator(num_blocks=3)

### Generate Single Plan

In [5]:
# Generate single plan
plan = generator.generate_random_plan()
print("\nSample Plan:")
print(f"Initial State: {plan['initial_state']}")
print(f"Goal State: {plan['goal_state']}")
print("Plan sequence:")
for i, state in enumerate(plan['plan']):
    print(f"T{i+1}: {state}")
print("\nActions:", plan['actions'])


Sample Plan:
Initial State: {'V1': 1, 'V21': 2, 'V22': 1, 'V31': 1, 'V32': 0}
Goal State: {'V1': 1, 'V21': 2, 'V22': 1, 'V31': 1, 'V32': 0}
Plan sequence:
T1: {'V1': 1, 'V21': 2, 'V22': 1, 'V31': 1, 'V32': 0}

Actions: []


### Generate dataset

In [None]:
num = 5000
dataset = generator.generate_dataset(num_plans=num)
print(f"\nGenerated {len(dataset)} valid plans")

In [None]:
print("Avg. length of each plan")
total = 0
for plan in dataset:
    total += len(plan['plan'])
print(total/len(dataset))

In [None]:
import json

with open(f'../data/dataset_{num}.json', 'w') as f:
    json.dump(dataset, f)