In [21]:
import random
import math
import time
import json
import warnings
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, field
warnings.filterwarnings("ignore")

In [22]:
class SimpleLLM:
    """LLM wrapper with both mock and real model options"""
    
    def __init__(self, use_real_model=False, model_name="gpt2"):
        self.use_real_model = use_real_model
        
        if use_real_model:
            try:
                from transformers import pipeline
                print(f"Loading {model_name}... (this may take a moment)")
                self.generator = pipeline(
                    'text-generation', 
                    model=model_name,
                    device=-1,  # Force CPU
                    pad_token_id=50256  # For GPT2
                )
                print(f"{model_name} loaded successfully!")
            except Exception as e:
                print(f"Failed to load model: {e}")
                print("Falling back to mock LLM")
                self.use_real_model = False
    
    def get_object_prior(self, object_name: str, locations: List[str]) -> Dict[str, float]:
        """Get probability distribution for where object might be"""
        
        # Hand-coded rules
        priors = {
            "apple": {"kitchen": 0.5, "fridge": 0.3, "pantry": 0.2},
            "book": {"bedroom": 0.4, "living_room": 0.4, "office": 0.2},
            "keys": {"entrance": 0.4, "bedroom": 0.3, "kitchen": 0.3},
            "phone": {"bedroom": 0.3, "living_room": 0.3, "office": 0.4},
            "wallet": {"bedroom": 0.5, "entrance": 0.3, "office": 0.2},
            "plate": {"kitchen": 0.6, "dishwasher": 0.3, "cabinet": 0.1}
        }
        
        if object_name in priors:
            result = {}
            for loc in locations:
                for key in priors[object_name]:
                    if key in loc.lower() or loc.lower() in key:
                        result[loc] = priors[object_name].get(key, 0)
            
            if result:
                total = sum(result.values())
                return {k: v/total for k, v in result.items()}
        
        # Uniform distribution for unknown objects
        uniform_prob = 1.0 / len(locations)
        return {loc: uniform_prob for loc in locations}
    
    def suggest_action(self, state: Dict, goal: str, history: List[str]) -> str:
        """Suggest next action based on state and goal"""
        
        # If we can see objects and not holding anything, pick them up
        if state.get('visible_objects') and not state.get('holding'):
            return f"pick_{state['visible_objects'][0]}"
    
        # If holding something, place it
        if state.get('holding'):
            return f"place_{state['holding']}"
        
        # Move to a new location
        rooms = ["kitchen", "bedroom", "living_room", "bathroom", "office"]
        current = state.get('robot_location', '')
        available_rooms = [r for r in rooms if r != current]
        
        if available_rooms:
            return f"move_to_{random.choice(available_rooms)}"
        
        return "search"

In [23]:
@dataclass
class Node:
    state: Dict
    action: Optional[str]
    parent: Optional['Node']
    children: List['Node'] = field(default_factory=list)
    visits: int = 0
    value: float = 0.0

class SimpleMCTS:
    def __init__(self, llm, c_param=1.4, max_simulations=50):
        self.llm = llm
        self.c_param = c_param
        self.max_simulations = max_simulations
    
    def search(self, initial_state: Dict, goal: Dict, available_actions: List[str]) -> str:
        #Run MCTS and return best action
        root = Node(state=initial_state, action=None, parent=None)
        
        for sim in range(self.max_simulations):
            node = self._select(root)
            
            if node.visits > 0 and len(available_actions) > 0:
                node = self._expand(node, available_actions)
            
            reward = self._simulate(node.state, goal)
            self._backpropagate(node, reward)
        
        return self._best_action(root)
    
    def _select(self, node: Node) -> Node:
        #Select node using UCT
        while node.children:
            if all(child.visits > 0 for child in node.children):
                node = self._uct_select(node)
            else:
                return next(c for c in node.children if c.visits == 0)
        return node
    
    def _uct_select(self, node: Node) -> Node:
        #Select child with highest UCT value
        best_value = -float('inf')
        best_node = None
        
        for child in node.children:
            if child.visits == 0:
                return child
            uct_value = (child.value / child.visits + 
                        self.c_param * math.sqrt(math.log(node.visits) / child.visits))
            if uct_value > best_value:
                best_value = uct_value
                best_node = child
        
        return best_node
    
    def _expand(self, node: Node, actions: List[str]) -> Node:
        #Expand node with one child
        suggested_action = self.llm.suggest_action(node.state, "goal", [])
        
        if suggested_action not in actions:
            suggested_action = random.choice(actions) if actions else "search"
        
        new_state = self._apply_action(node.state.copy(), suggested_action)
        child = Node(state=new_state, action=suggested_action, parent=node)
        node.children.append(child)
        return child
    
    def _simulate(self, state: Dict, goal: Dict) -> float:
        """Random rollout from state"""
        current_state = state.copy()
        steps = 0
        max_steps = 20
    
        # Immediate rewards based on current state
        if current_state.get('holding'):
            # Holding an object is good progress
            return 0.4 - (steps / max_steps) * 0.1
    
        # Simulate random actions
        while steps < max_steps:
            if current_state.get('visible_objects'):
                # Seeing objects is partial success
                return 0.2 - (steps / max_steps) * 0.1
        
            # Simple state evolution
            if random.random() < 0.3:  # 30% chance of progress
                return 0.1 - (steps / max_steps) * 0.05
        
            steps += 1
    
        return 0.0
    
    def _backpropagate(self, node: Node, reward: float):
        #Update values up the tree
        while node:
            node.visits += 1
            node.value += reward
            node = node.parent
    
    def _best_action(self, root: Node) -> str:
        """Select most visited child"""
        if not root.children:
            return "search"  # Default action
        
        best_child = max(root.children, key=lambda c: c.visits)
        return best_child.action if best_child.action else "search"
    
    def _apply_action(self, state: Dict, action: str) -> Dict:
        """Apply action to state (simplified simulation)"""
        new_state = state.copy()
        new_state["last_action"] = action
        
        # Simple state transitions for simulation
        if "move" in action:
            rooms = ["kitchen", "bedroom", "living_room", "bathroom", "office"]
            new_state["robot_location"] = random.choice(rooms)
        elif "pick" in action and state.get("visible_objects"):
            new_state["holding"] = state["visible_objects"][0] if state.get("visible_objects") else None
        elif "place" in action:
            new_state["holding"] = None
            
        return new_state

In [24]:
class ObjectRearrangementTask:
    """Task: Rearrange objects to target locations"""
    
    def __init__(self, n_objects=3, n_rooms=5):
        self.rooms = ["kitchen", "bedroom", "living_room", "bathroom", "office"][:n_rooms]
        self.containers = {
            "kitchen": ["fridge", "cabinet", "counter"],
            "bedroom": ["dresser", "nightstand"],
            "living_room": ["coffee_table", "shelf"],
            "bathroom": ["medicine_cabinet"],
            "office": ["desk", "bookshelf"]
        }
        
        self.objects = ["apple", "book", "keys", "mug", "phone"][:n_objects]
        self.reset()
    
    def reset(self, seed=None):
        """Reset environment to random state"""
        if seed:
            random.seed(seed)
            
        self.robot_location = "kitchen"
        self.holding = None
        self.steps_taken = 0
        self.completed_goals = set()  # ADD THIS LINE - tracks completed objects
        
        # Random initial placement
        self.object_locations = {}
        for obj in self.objects:
            room = random.choice(self.rooms)
            container = random.choice(self.containers.get(room, ["floor"]))
            self.object_locations[obj] = (room, container)
        
        # Set goals
        self.goal_locations = {
            "apple": ("kitchen", "fridge"),
            "book": ("office", "bookshelf"),
            "keys": ("bedroom", "nightstand"),
        }
        
        self.observed_locations = {}
        return self.get_state()
    
    def get_state(self) -> Dict:
        """Get current partially observable state"""
        visible_objects = self._get_visible_objects()
        
        state = {
            "robot_location": self.robot_location,
            "holding": self.holding,
            "visible_objects": visible_objects,
            "observed": self.observed_locations.copy(),
            "steps": self.steps_taken
        }
        
        for obj in visible_objects:
            self.observed_locations[obj] = self.object_locations[obj]
        
        return state
    
    def _get_visible_objects(self) -> List[str]:
        """Objects visible in current room"""
        visible = []
        for obj, (room, container) in self.object_locations.items():
            if room == self.robot_location and obj != self.holding:
                # CHANGE: Don't show completed objects
                if obj not in self.completed_goals:
                    visible.append(obj)
        return visible
    
    def execute_action(self, action: str) -> Tuple[Dict, float, bool, Dict]:
        """Execute action and return (state, reward, done, info)"""
        self.steps_taken += 1
        reward = -0.01  # Step penalty
        info = {"success": False, "action": action}
        
        if action.startswith("move_to_"):
            room = action.replace("move_to_", "")
            if room in self.rooms:
                self.robot_location = room
        
        elif action.startswith("pick_"):
            obj = action.replace("pick_", "")
            if obj in self._get_visible_objects() and not self.holding:
                self.holding = obj
                #del self.object_locations[obj]
                reward = 0.1
        
        elif action.startswith("place_"):
            if self.holding:
                obj_to_place = self.holding  # CHANGE: Store the object
                self.object_locations[obj_to_place] = (self.robot_location, "floor")
                
                # CHANGE: Check if goal achieved and not already completed
                if obj_to_place in self.goal_locations and obj_to_place not in self.completed_goals:
                    goal_room, goal_container = self.goal_locations[obj_to_place]
                    if self.robot_location == goal_room:
                        reward = 1.0
                        info["success"] = True
                        self.completed_goals.add(obj_to_place)  # Mark as completed
                
                self.holding = None
        
        done = self._check_all_goals() or self.steps_taken >= 100
        return self.get_state(), reward, done, info
    
    def _check_all_goals(self) -> bool:
        """Check if all goals achieved"""
        # CHANGE: Use completed_goals to check
        for obj in self.objects:
            if obj in self.goal_locations and obj not in self.completed_goals:
                return False
        return True
    
    def get_available_actions(self) -> List[str]:
        """Get valid actions"""
        actions = []
        
        for room in self.rooms:
            if room != self.robot_location:
                actions.append(f"move_to_{room}")
        
        if not self.holding:
            for obj in self._get_visible_objects():
                actions.append(f"pick_{obj}")
        elif self.holding:
            actions.append(f"place_{self.holding}")
        
        actions.append("search")
        return actions

In [25]:
def run_experiment(agent_type, task, llm=None, mcts=None, max_steps=100):
    #Run a single experiment
    task.reset(seed=random.randint(0, 1000))
    
    total_reward = 0
    steps = 0
    
    for step in range(max_steps):
        state = task.get_state()
        actions = task.get_available_actions()
        
        if agent_type == "random":
            action = random.choice(actions)
        elif agent_type == "llm_policy":
            action = llm.suggest_action(state, "goal", [])
            if action not in actions:
                action = random.choice(actions)
        elif agent_type == "llm_mcts":
            action = mcts.search(state, {"complete": True}, actions)
        
        state, reward, done, info = task.execute_action(action)
        total_reward += reward
        steps += 1
        
        if done:
            break
    
    return {
        "total_reward": total_reward,
        "steps": steps,
        "completed": task._check_all_goals()
    }

# Run main experiment
print("=" * 60)
print("LLM-MCTS EXPERIMENT - Object Rearrangement Task")
print("=" * 60)

# Initialize
use_real_llm = False  # Set to True to use GPT-2
n_trials = 3
n_objects = 2

llm = SimpleLLM(use_real_model=use_real_llm)
mcts = SimpleMCTS(llm, max_simulations=20)
task = ObjectRearrangementTask(n_objects=n_objects)

# Run experiments
agents = ["random", "llm_policy", "llm_mcts"]
results = {agent: [] for agent in agents}

for agent_type in agents:
    print(f"\nTesting {agent_type.upper()} Agent...")
    
    for trial in range(n_trials):
        result = run_experiment(agent_type, task, llm, mcts)
        results[agent_type].append(result)
        print(f"  Trial {trial + 1}: Reward={result['total_reward']:.2f}, Steps={result['steps']}")

# Print summary
print("\n" + "=" * 60)
print("RESULTS SUMMARY")
print("=" * 60)

for agent_type in agents:
    agent_results = results[agent_type]
    avg_reward = sum(r['total_reward'] for r in agent_results) / n_trials
    avg_steps = sum(r['steps'] for r in agent_results) / n_trials
    success_rate = sum(r['completed'] for r in agent_results) / n_trials * 100
    
    print(f"\n{agent_type.upper()}:")
    print(f"  Avg Reward:    {avg_reward:.3f}")
    print(f"  Avg Steps:     {avg_steps:.1f}")
    print(f"  Success Rate:  {success_rate:.0f}%")

LLM-MCTS EXPERIMENT - Object Rearrangement Task

Testing RANDOM Agent...
  Trial 1: Reward=0.78, Steps=100
  Trial 2: Reward=0.23, Steps=100
  Trial 3: Reward=0.34, Steps=100

Testing LLM_POLICY Agent...
  Trial 1: Reward=4.50, Steps=100
  Trial 2: Reward=4.50, Steps=100
  Trial 3: Reward=4.50, Steps=100

Testing LLM_MCTS Agent...
  Trial 1: Reward=4.50, Steps=100
  Trial 2: Reward=5.40, Steps=100
  Trial 3: Reward=5.51, Steps=100

RESULTS SUMMARY

RANDOM:
  Avg Reward:    0.450
  Avg Steps:     100.0
  Success Rate:  0%

LLM_POLICY:
  Avg Reward:    4.500
  Avg Steps:     100.0
  Success Rate:  0%

LLM_MCTS:
  Avg Reward:    5.137
  Avg Steps:     100.0
  Success Rate:  0%
