## Setup and Imports

In [1]:
!pip install -r requirements.txt



In [None]:

import google.generativeai as genai
import numpy as np
import networkx as nx
from scipy.optimize import milp
import os
import re

# --- Configuration ---
# IMPORTANT: Paste your new API key here

API_KEY = ".env"

genai.configure(api_key=API_KEY)

print("✅ Setup Complete. Libraries imported and API key configured.")

✅ Setup Complete. Libraries imported and API key configured.


## Skill List Generation

In [3]:

OBJECTS = ["red block", "blue block", "green block", "yellow block", "blue bowl"]
LOCATIONS = ["middle of the table", "corner of the table", "red block", "blue block", "green block", "yellow block", "blue bowl"]

def get_skill_set():
    """Generates the predefined list of possible robot skills."""
    skill_set = []
    for obj in OBJECTS:
        for loc in LOCATIONS:
            if obj != loc:
                skill_set.append(f"pick_and_place({obj}, {loc})")
    skill_set.append("transport(table)")
    skill_set.append("done")
    return skill_set

def build_gemini_prompt_for_skills(instruction, generated_skills, candidate_skills):
    """Builds the prompt for Gemini to choose the best next skill."""
    prompt_template = """
    You are a task planner for a multi-robot system. Your job is to decompose a natural language instruction into the most efficient sequence of executable skills.

    **Instruction:**
    "{instruction}"

    **Completed Skills:**
    {skill_history}

    **Analysis:**
    Based on the instruction, determine the final goal for each object. From the list of candidates below, choose the single most direct and logical next skill. Avoid any redundant or unnecessary intermediate steps.

    **Candidate Skills:**
    {candidate_list}

    **Your Choice:**
    Return only the full text of the single best skill from the candidate list.
    """
    skill_history = "\n".join([f"- {s}" for s in generated_skills]) if generated_skills else "No skills completed yet."
    candidate_list = "\n".join([f"- {s}" for s in candidate_skills])
    return prompt_template.format(instruction=instruction, skill_history=skill_history, candidate_list=candidate_list)

def choose_best_skill(instruction, generated_skills, skill_set):
    """Uses the Gemini model to choose the most likely next skill."""
    prompt = build_gemini_prompt_for_skills(instruction, generated_skills, skill_set)
    try:
        model = genai.GenerativeModel('gemini-1.5-flash-latest')
        response = model.generate_content(prompt)
        chosen_skill_text = response.text.strip()
        
        for candidate in skill_set:
            if candidate in chosen_skill_text:
                return candidate
        
        print(f"Warning: Model returned an un-parsable skill: '{chosen_skill_text}'")
        return None
    except Exception as e:
        print(f"Error calling Gemini API for skill choice: {e}")
        return None

def generate_skill_list(instruction, skill_set):
    """Generates a list of skills by iteratively asking Gemini to choose the best next skill."""
    generated_skills = []
    remaining_skills = skill_set.copy()
    
    while True:
        if len(generated_skills) >= 10: # Safety break to prevent infinite loops
            print("Warning: Reached skill limit of 10.")
            break
        
        best_skill = choose_best_skill(instruction, generated_skills, remaining_skills)
        if best_skill is None or best_skill == "done":
            break
            
        generated_skills.append(best_skill)
        if best_skill in remaining_skills:
            remaining_skills.remove(best_skill)
            
    return generated_skills

print("✅ Skill Generation Module Loaded.")

✅ Skill Generation Module Loaded.


##  Dependency Graph Generation

In [4]:

def build_dependency_prompt(skill_list):
    """Builds a prompt to ask Gemini for skill dependencies."""
    formatted_skills = "\n".join([f"{i+1}. {skill}" for i, skill in enumerate(skill_list)])
    prompt = f"""
    You are a task planner. Identify precedence dependencies between the skills below. A dependency exists if one skill must be completed before another can begin.

    **Skill List:**
    {formatted_skills}

    **Your Task:**
    1.  **Reasoning:** Explain your reasoning step-by-step.
    2.  **Dependencies:** Provide the dependencies in the format "N -> M". If none, state "None".

    **Example Output:**
    **Reasoning:**
    - 'pick_and_place(blue block, middle of the table)' has no dependencies.
    - 'pick_and_place(red block, blue block)' requires the 'blue block' to be in place. Thus, skill 2 depends on skill 1.
    **Dependencies:**
    1 -> 2
    """
    return prompt

def parse_dependencies(response_text, skill_list):
    """Parses the LLM's text response to extract dependency edges."""
    edges = []
    dependency_pattern = re.compile(r'(\d+)\s*->\s*(\d+)')
    if "Dependencies:" in response_text:
        dependency_section = response_text.split("Dependencies:")[1]
        matches = dependency_pattern.findall(dependency_section)
        for pred_idx, succ_idx in matches:
            u = skill_list[int(pred_idx) - 1]
            v = skill_list[int(succ_idx) - 1]
            edges.append((u, v))
    return edges

def generate_dependency_graph(skill_list):
    """Generates a dependency graph from a skill list using an LLM."""
    if not skill_list: return nx.DiGraph()
    prompt = build_dependency_prompt(skill_list)
    
    for attempt in range(3):
        try:
            model = genai.GenerativeModel('gemini-1.5-flash-latest')
            response = model.generate_content(prompt)
            graph = nx.DiGraph()
            graph.add_nodes_from(skill_list)
            edges = parse_dependencies(response.text, skill_list)
            graph.add_edges_from(edges)
            
            if nx.is_directed_acyclic_graph(graph):
                print("Successfully generated a Directed Acyclic Graph (DAG).")
                return graph
            else:
                print(f"Warning: Cycle detected on attempt {attempt + 1}. Retrying...")
        except Exception as e:
            print(f"Error generating graph: {e}")
            return nx.DiGraph()
            
    print("Error: Could not generate an acyclic graph.")
    return nx.DiGraph()

print("✅ Dependency Graph Module Loaded.")

✅ Dependency Graph Module Loaded.


## Task Allocation

In [5]:

from scipy.optimize import milp, LinearConstraint 

# --- Mock Environment and Robot Definitions ---
MOCK_OBJECT_LOCATIONS = {"red block": (0.8, 0.2), "blue block": (0.8, 0.8), "green block": (0.2, 0.2), "yellow block": (0.2, 0.8), "blue bowl": (0.5, 0.5), "table": (0.5, 0.5)}
MOCK_ROBOTS = [{"id": "arm_robot_1", "type": "arm", "location": (0, 0), "capabilities": {"can_grasp": ["red", "blue"]}}, {"id": "arm_robot_2", "type": "arm", "location": (1, 0), "capabilities": {"can_grasp": ["yellow", "green"]}}, {"id": "mobile_robot_1", "type": "mobile", "location": (0, 1), "capabilities": {}}]

def find_executable_skills(graph):
    """Finds all nodes with an in-degree of 0."""
    return [node for node, in_degree in graph.in_degree() if in_degree == 0]

def calculate_weights(robots, skills, object_locations):
    """Calculates the weight matrix for assigning skills to robots."""
    weights = np.zeros((len(robots), len(skills)))
    for j, robot in enumerate(robots):
        for k, skill in enumerate(skills):
            weight = 0.0
            if robot["type"] == "arm":
                match = re.match(r"pick_and_place\((.*?),\s*(.*?)\)", skill)
                if match:
                    obj_to_grasp = match.group(1)
                    if any(color in obj_to_grasp for color in robot["capabilities"]["can_grasp"]) and obj_to_grasp in object_locations:
                        distance = np.linalg.norm(np.array(robot["location"]) - np.array(object_locations[obj_to_grasp]))
                        weight = max(0, 1 - (distance / 1.5))
            elif robot["type"] == "mobile" and "transport" in skill:
                weight = 1.0
            weights[j, k] = weight
    return weights

def solve_task_assignment(robots, skills, weights):
    """Solves the assignment problem using Mixed-Integer Linear Programming."""
    num_robots, num_skills = len(robots), len(skills)
    if num_skills == 0: return {}
    
    # Objective function: maximize the sum of weights (we minimize the negative)
    c = -weights.flatten()

    # Constraint 1: Each skill is assigned to at most one robot
    A1 = np.zeros((num_skills, num_robots * num_skills))
    for k in range(num_skills):
        for j in range(num_robots):
            A1[k, j * num_skills + k] = 1
    constraint1 = LinearConstraint(A1, ub=np.ones(num_skills))

    # Constraint 2: Each robot is assigned at most one skill
    A2 = np.zeros((num_robots, num_robots * num_skills))
    for j in range(num_robots):
        for k in range(num_skills):
            A2[j, j * num_skills + k] = 1
    constraint2 = LinearConstraint(A2, ub=np.ones(num_robots))

    # Solve the MILP with the new constraints format
    res = milp(c=c, constraints=[constraint1, constraint2], integrality=1, bounds=(0, 1))
    
    assignments = {}
    if res.success:
        x = np.round(res.x).reshape((num_robots, num_skills))
        for j, robot in enumerate(robots):
            if np.sum(x[j, :]) == 1:
                k = np.argmax(x[j, :])
                assignments[robot["id"]] = skills[k]
    return assignments

def run_task_execution_loop(graph, robots, object_locations):
    """The main loop that finds, assigns, and 'executes' tasks."""
    execution_steps = []
    print("\n" + "="*20 + " EXECUTION LOG " + "="*20)
    
    step_num = 1
    while graph.number_of_nodes() > 0:
        executable_skills = find_executable_skills(graph)
        if not executable_skills:
            print("Error: Deadlock detected. No executable skills remaining.")
            break
        
        weights = calculate_weights(robots, executable_skills, object_locations)
        assignments = solve_task_assignment(robots, executable_skills, weights)
        
        print(f"\n--- Step {step_num} ---")
        print(f"🤖 Executable Skills: {executable_skills}")
        print(f"💡 Assignments: {assignments or 'No tasks assigned, waiting...'}")
        
        execution_steps.append(assignments)
        step_num += 1
        
        if not assignments:
            print("Stopping: No valid assignments could be made.")
            break
            
        graph.remove_nodes_from(assignments.values())
        
    print("\n" + "="*18 + " 🎉 ALL TASKS COMPLETED 🎉 " + "="*18)
    return execution_steps

print("✅ Task Allocation Module Loaded (with corrected solver).")

✅ Task Allocation Module Loaded (with corrected solver).


##  LiP-LLM Planner

In [6]:

# --- 1. Define the User Instruction ---
user_instruction = "Stack the red block on the blue block, and put the green block in the corner."

print(f"Processing Instruction: '{user_instruction}'")
print("-" * 50)

# --- 2. Generate Skill List ---
all_skills = get_skill_set()
skill_list = generate_skill_list(user_instruction, all_skills)
print("\n[Phase 1] Generated Skill List:")
for i, skill in enumerate(skill_list, 1):
    print(f"  {i}. {skill}")
print("-" * 50)

# --- 3. Generate Dependency Graph ---
dependency_graph = generate_dependency_graph(skill_list)
print("\n[Phase 2] Generated Dependency Graph:")
print(f"  Nodes: {list(dependency_graph.nodes())}")
print(f"  Edges: {list(dependency_graph.edges())}")
print("-" * 50)

# --- 4. Allocate and Execute Tasks ---
if dependency_graph.number_of_nodes() > 0:
    execution_plan = run_task_execution_loop(dependency_graph, MOCK_ROBOTS, MOCK_OBJECT_LOCATIONS)
else:
    print("Could not generate a plan. Aborting execution.")

Processing Instruction: 'Stack the red block on the blue block, and put the green block in the corner.'
--------------------------------------------------

[Phase 1] Generated Skill List:
  1. pick_and_place(blue block, middle of the table)
  2. pick_and_place(red block, blue block)
  3. pick_and_place(green block, corner of the table)
--------------------------------------------------
Successfully generated a Directed Acyclic Graph (DAG).

[Phase 2] Generated Dependency Graph:
  Nodes: ['pick_and_place(blue block, middle of the table)', 'pick_and_place(red block, blue block)', 'pick_and_place(green block, corner of the table)']
  Edges: [('pick_and_place(blue block, middle of the table)', 'pick_and_place(red block, blue block)')]
--------------------------------------------------


--- Step 1 ---
🤖 Executable Skills: ['pick_and_place(blue block, middle of the table)', 'pick_and_place(green block, corner of the table)']
💡 Assignments: {'arm_robot_1': 'pick_and_place(blue block, middle 

## Visualization

In [None]:
!pip install pybullet --no-binary :all:

In [None]:

import pybullet as p
import time
import pybullet_data
import os

def visualize_plan_in_pybullet(execution_plan, initial_locations):
    """
    Launches a PyBullet simulation using your specific asset files.
    """
    # --- 1. Setup the Simulation ---
    try:
        p.disconnect()
    except p.error:
        pass
    
    physicsClient = p.connect(p.GUI)
    p.setAdditionalSearchPath(pybullet_data.getDataPath())
    p.setGravity(0, 0, -9.8)
    p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0) # Disable the default GUI panels

    # --- 2. Load Environment and Your Robot ---
    p.loadURDF("plane.urdf")
    # Load a default table since one isn't in the assets folder
    table_path = os.path.join(pybullet_data.getDataPath(), "table/table.urdf")
    tableId = p.loadURDF(table_path, basePosition=[0.5, 0, 0])
    
    # Load your UR5e robot arm from the specified path
    robot_arm_id = p.loadURDF("assets/ur5e/ur5e/ur5e.urdf", basePosition=[0, 0, 0.65], useFixedBase=True)
    print("✅ Successfully loaded environment and your UR5e robot model.")

    # --- 3. Load Objects from Your Files and Create Others Procedurally ---
    z_offset = 0.68 # Height to place objects on the table
    object_ids = {}
    current_locations = initial_locations.copy()

    # Create colored blocks procedurally since no files are provided
    block_size = [0.05, 0.05, 0.05]
    colors = {
        "red block": [1, 0, 0, 1],
        "blue block": [0, 0, 1, 1],
        "green block": [0, 1, 0, 1]
    }
    for name, color in colors.items():
        pos = [current_locations[name][0], current_locations[name][1], z_offset]
        visual_shape = p.createVisualShape(p.GEOM_BOX, halfExtents=[s/2 for s in block_size], rgbaColor=color)
        collision_shape = p.createCollisionShape(p.GEOM_BOX, halfExtents=[s/2 for s in block_size])
        obj_id = p.createMultiBody(baseMass=0.1, baseCollisionShapeIndex=collision_shape, baseVisualShapeIndex=visual_shape, basePosition=pos)
        object_ids[name] = obj_id

    # Load your bowl from the .obj file
    bowl_path = "assets/bowl/bowl/cup.obj"
    bowl_pos = [initial_locations['blue bowl'][0], initial_locations['blue bowl'][1], z_offset]
    bowl_visual_shape = p.createVisualShape(shapeType=p.GEOM_MESH, fileName=bowl_path, meshScale=[0.15, 0.15, 0.15])
    bowl_collision_shape = p.createCollisionShape(shapeType=p.GEOM_MESH, fileName=bowl_path, meshScale=[0.15, 0.15, 0.15])
    bowl_id = p.createMultiBody(baseMass=0.2, baseCollisionShapeIndex=bowl_collision_shape, baseVisualShapeIndex=bowl_visual_shape, basePosition=bowl_pos)
    object_ids['blue bowl'] = bowl_id
    
    print("✅ Successfully loaded your bowl and created blocks.")
    
    # --- 4. Step Through the Plan ---
    print("\n🚀 Starting visualization...")
    
    for step_idx, assignments in enumerate(execution_plan):
        print(f"\n--- Executing Step {step_idx + 1} ---")
        print(f"Assignments: {assignments}")
        time.sleep(1) # Pause to read the assignments
        
        for robot_id, skill in assignments.items():
            match = re.match(r"pick_and_place\((.*?),\s*(.*?)\)", skill)
            if match:
                obj_to_move = match.group(1)
                destination = match.group(2)
                
                # Get the 3D position of the destination object
                dest_pos, _ = p.getBasePositionAndOrientation(object_ids[destination])
                
                # Teleport the object to the destination
                new_pos = [dest_pos[0], dest_pos[1], dest_pos[2] + 0.05] # Stack on top
                p.resetBasePositionAndOrientation(object_ids[obj_to_move], new_pos, [0,0,0,1])
                print(f"  -> Moved '{obj_to_move}' to '{destination}'")
                time.sleep(1.5) # Pause to see the result of this move

    print("\n✅ Visualization complete!")
    # Keep the simulation window open for inspection. You can close it manually.
    # To close automatically, you would call p.disconnect() here.

# --- Run the 3D Visualization ---
visualize_plan_in_pybullet(execution_plan, MOCK_OBJECT_LOCATIONS)