<a href="https://colab.research.google.com/github/ShraddhaSharma24/OrbitGuardians/blob/main/OrbitGuardians.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install crewai



In [2]:
from crewai import Agent, Crew, Task


In [3]:
# === Sample debris data ===
debris_sample = {"id": 101, "size": 12, "speed": 30, "location": 5}

# === Individual agent functions ===
def assess_risk(debris):
    """Prediction Agent: Assess risk level of space debris."""
    score = (debris['size'] * debris['speed']) / (debris['location'] + 1)
    return "High" if score > 50 else "Low"

def check_legality(debris):
    """Policy Agent: Check legality of debris capture."""
    return "Legal" if debris['id'] % 2 == 0 else "Illegal"

def choose_cleanup_method(risk_level):
    """Cleanup Agent: Choose the optimal cleanup method."""
    methods = {"High": "Laser Ablation", "Low": "Net Capture"}
    return methods.get(risk_level, "Harpoon")

def optimize_path(debris):
    """Logistics Agent: Optimize spacecraft path."""
    return f"Path optimized for debris {debris['id']}"

# === Main Execution Flow (Mimicking a Crew) ===
print("\n=== Space Debris Cleanup Workflow ===")

# Step 1: Risk Assessment
risk_level = assess_risk(debris_sample)
print(f"Risk Assessment: {risk_level}")

# Step 2: Legality Check
legal_status = check_legality(debris_sample)
print(f"Legality Check: {legal_status}")

# Step 3: Cleanup Method Selection
if legal_status == "Legal":
    cleanup_method = choose_cleanup_method(risk_level)
else:
    cleanup_method = "Manual Approval Required (Illegal Debris)"
print(f"Cleanup Method: {cleanup_method}")

# Step 4: Path Optimization
path = optimize_path(debris_sample)
print(f"Optimized Path: {path}")

print("\n=== Workflow Complete ===")



=== Space Debris Cleanup Workflow ===
Risk Assessment: High
Legality Check: Illegal
Cleanup Method: Manual Approval Required (Illegal Debris)
Optimized Path: Path optimized for debris 101

=== Workflow Complete ===


In [4]:
pip install transformers




In [5]:
pip install litellm



In [6]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from litellm import completion

model_name = "microsoft/phi-2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16, device_map="auto")

def custom_phi2_provider(model, messages, **kwargs):
    prompt = messages[-1]["content"]
    inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
    outputs = model.generate(**inputs, max_new_tokens=200)
    result = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return {"choices": [{"message": {"content": result}}]}

# Register the custom provider in LiteLLM
import litellm
litellm.custom_llm_provider = custom_phi2_provider


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/7.34k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/798k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.11M [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/1.08k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/99.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/735 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/35.7k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/5.00G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/564M [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

In [7]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# === Load phi-2 Model from Hugging Face ===
model_name = "microsoft/phi-2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16, device_map="auto")

def custom_phi2_provider(prompt):
    inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
    outputs = model.generate(**inputs, max_new_tokens=200)
    result = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return result

# === Custom Agent Class (Direct phi-2 Calls, No OpenAI/litellm) ===
class DirectLLMAgent:
    def __init__(self, name, role, backstory, goal):
        self.name = name
        self.role = role
        self.backstory = backstory
        self.goal = goal

    def run(self, task_input=None):
        prompt = f"Role: {self.role}\nGoal: {self.goal}\nTask Description: {task_input}\n\nResponse:"
        return custom_phi2_provider(prompt)

# === Agents ===
prediction_agent = DirectLLMAgent(
    name="Prediction Agent",
    role="Space Debris Risk Analyst",
    backstory="An AI expert responsible for assessing the risk levels of debris in low earth orbit.",
    goal="Assess debris risk levels accurately for optimal cleanup."
)

policy_agent = DirectLLMAgent(
    name="Policy Agent",
    role="Space Law Compliance Officer",
    backstory="A legal AI specializing in space treaties and international guidelines on debris handling.",
    goal="Ensure all cleanup methods comply with current space laws."
)

cleanup_agent = DirectLLMAgent(
    name="Cleanup Agent",
    role="Debris Cleanup Specialist",
    backstory="An autonomous agent equipped with knowledge of various debris removal techniques.",
    goal="Select the safest and most effective cleanup method."
)

logistics_agent = DirectLLMAgent(
    name="Logistics Agent",
    role="Spacecraft Navigation Specialist",
    backstory="Handles spacecraft path planning and fuel optimization for cleanup missions.",
    goal="Calculate the most efficient capture path."
)

# === Tasks (Modified to Work with DirectLLMAgent) ===
def run_task(agent, task_description):
    print(f"\n🔹 Running Task: {task_description}")
    result = agent.run(task_description)
    print(f"✅ Result: {result}")
    return result

tasks = [
    ("Assess the risk level of space debris.", prediction_agent),
    ("Check legality of capturing the debris.", policy_agent),
    ("Select the optimal cleanup method based on risk.", cleanup_agent),
    ("Optimize spacecraft path to capture debris.", logistics_agent)
]

# === Execute Tasks ===
results = []
for description, agent in tasks:
    result = run_task(agent, description)
    results.append((description, result))

# === Final Results ===
print("\n=== Final Results ===")
for description, result in results:
    print(f"{description}: {result}")


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.



🔹 Running Task: Assess the risk level of space debris.


Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


✅ Result: Role: Space Debris Risk Analyst
Goal: Assess debris risk levels accurately for optimal cleanup.
Task Description: Assess the risk level of space debris.

Response:
```python
# Assume we have a list of debris objects with their respective risk levels
debris_objects = [
    {"name": "Debris1", "risk_level": "High"},
    {"name": "Debris2", "risk_level": "Low"},
    {"name": "Debris3", "risk_level": "Medium"},
]

# Use list comprehension to filter out high risk debris
high_risk_debris = [debris for debris in debris_objects if debris["risk_level"] == "High"]

print(high_risk_debris)
```

## Exercises

1. Create a list of 5 different types of space debris.

Idea: Use the list data type to store the types of debris.

Solution:
```python
# Create a list of space debris types
space_debris_types =

🔹 Running Task: Check legality of capturing the debris.


Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


✅ Result: Role: Space Law Compliance Officer
Goal: Ensure all cleanup methods comply with current space laws.
Task Description: Check legality of capturing the debris.

Response:
Space Law Compliance Officer: "We need to make sure that our cleanup methods comply with current space laws. We can't just capture the debris without permission."

Space Lawyer: "I agree. We need to check the laws and regulations before we proceed with any cleanup methods."

Space Law Enforcement Officer: "I'll check the laws and regulations to make sure we're following them."

Space Law Compliance Officer: "Great. We need to make sure we're doing everything legally and ethically."

Exercise: What is the role of the Space Law Compliance Officer in the cleanup mission?
Answer: The role of the Space Law Compliance Officer is to ensure that all cleanup methods comply with current space laws.

Use Case 3: International Space Station
Participants: Astronauts, Mission Control, International Space Station Crew
Goal: 

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


✅ Result: Role: Debris Cleanup Specialist
Goal: Select the safest and most effective cleanup method.
Task Description: Select the optimal cleanup method based on risk.

Response:

Debris Cleanup Specialist: "We need to select the safest and most effective cleanup method. What are our options?"

Environmental Engineer: "We can use a vacuum truck or a bulldozer to remove the debris. The vacuum truck is safer and more effective, but it's also more expensive."

Debris Cleanup Specialist: "What are the risks associated with each method?"

Environmental Engineer: "The vacuum truck is safer because it doesn't create dust or debris that can be inhaled. However, it's more expensive and may not be suitable for all types of debris."

Debris Cleanup Specialist: "What about the bulldozer?"

Environmental Engineer: "The bulldozer is cheaper and can handle larger debris. However, it can create dust and debris that can be inhaled, and it may not be suitable for all types of debris."

Debris Cleanup Sp

In [8]:
pip install pybullet

Collecting pybullet
  Downloading pybullet-3.2.7-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.8 kB)
Downloading pybullet-3.2.7-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (103.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m103.2/103.2 MB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pybullet
Successfully installed pybullet-3.2.7


In [None]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import pybullet as p
import pybullet_data
import time

# === Load phi-2 Model from Hugging Face ===
model_name = "microsoft/phi-2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16, device_map="auto")

def custom_phi2_provider(prompt):
    inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
    outputs = model.generate(**inputs, max_new_tokens=200)
    result = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return result

# === Custom Agent Class (Direct phi-2 Calls, No OpenAI/litellm) ===
class DirectLLMAgent:
    def __init__(self, name, role, backstory, goal):
        self.name = name
        self.role = role
        self.backstory = backstory
        self.goal = goal

    def run(self, task_input=None):
        prompt = f"Role: {self.role}\nGoal: {self.goal}\nTask Description: {task_input}\n\nResponse:"
        return custom_phi2_provider(prompt)

# === Agents ===
prediction_agent = DirectLLMAgent(
    name="Prediction Agent",
    role="Space Debris Risk Analyst",
    backstory="An AI expert responsible for assessing the risk levels of debris in low earth orbit.",
    goal="Assess debris risk levels accurately for optimal cleanup."
)

policy_agent = DirectLLMAgent(
    name="Policy Agent",
    role="Space Law Compliance Officer",
    backstory="A legal AI specializing in space treaties and international guidelines on debris handling.",
    goal="Ensure all cleanup methods comply with current space laws."
)

cleanup_agent = DirectLLMAgent(
    name="Cleanup Agent",
    role="Debris Cleanup Specialist",
    backstory="An autonomous agent equipped with knowledge of various debris removal techniques.",
    goal="Select the safest and most effective cleanup method."
)

logistics_agent = DirectLLMAgent(
    name="Logistics Agent",
    role="Spacecraft Navigation Specialist",
    backstory="Handles spacecraft path planning and fuel optimization for cleanup missions.",
    goal="Calculate the most efficient capture path."
)

robotics_agent = DirectLLMAgent(
    name="Robotics Agent",
    role="Robotic Systems Engineer",
    backstory="Expert in designing and controlling robotic arms, specialized in space debris capture.",
    goal="Design, simulate, and execute debris capture using robotic systems."
)

# === Tasks (Including Robotics) ===
def run_task(agent, task_description):
    print(f"\n🔹 Running Task: {task_description}")
    result = agent.run(task_description)
    print(f"✅ Result: {result}")
    return result

tasks = [
    ("Assess the risk level of space debris.", prediction_agent),
    ("Check legality of capturing the debris.", policy_agent),
    ("Select the optimal cleanup method based on risk.", cleanup_agent),
    ("Optimize spacecraft path to capture debris.", logistics_agent),
    ("Design optimal robotic capture system for this debris.", robotics_agent),
    ("Simulate the robotic capture process considering debris shape, mass, and rotation.", robotics_agent),
    ("Execute capture and monitor grip forces.", robotics_agent)
]

# === Execute Tasks ===
results = []
for description, agent in tasks:
    result = run_task(agent, description)
    results.append((description, result))

# === Final Results ===
print("\n=== Final Results ===")
for description, result in results:
    print(f"{description}: {result}")

# === Robotic Simulation in PyBullet ===
p.connect(p.GUI)
p.setAdditionalSearchPath(pybullet_data.getDataPath())

p.loadURDF("plane.urdf")
debris = p.loadURDF("cube_small.urdf", [0, 0, 1])
robot_arm = p.loadURDF("kuka_iiwa/model.urdf", [0, 0, 0])

# Move robotic arm to initial position
for _ in range(100):
    joint_positions = [0, -1.5, 1.5, -1, -1, 1.5, 0]  # Example pose
    for i, pos in enumerate(joint_positions):
        p.setJointMotorControl2(robot_arm, i, p.POSITION_CONTROL, pos)
    p.stepSimulation()
    time.sleep(0.05)

# Grip logic - Simplified (you can add actual feedback later)
for _ in range(50):
    p.stepSimulation()
    time.sleep(0.05)

print("\n🤖 Robotic Capture Simulation Completed.")
p.disconnect()


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.



🔹 Running Task: Assess the risk level of space debris.


Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


✅ Result: Role: Space Debris Risk Analyst
Goal: Assess debris risk levels accurately for optimal cleanup.
Task Description: Assess the risk level of space debris.

Response:
```python
# Assume we have a list of debris objects with their respective risk levels
debris_objects = [
    {"name": "Debris1", "risk_level": "High"},
    {"name": "Debris2", "risk_level": "Low"},
    {"name": "Debris3", "risk_level": "Medium"},
]

# Use list comprehension to filter out high risk debris
high_risk_debris = [debris for debris in debris_objects if debris["risk_level"] == "High"]

print(high_risk_debris)
```

## Exercises

1. Create a list of 5 different types of space debris.

Idea: Use the list data type to store the types of debris.

Solution:
```python
# Create a list of space debris types
space_debris_types =

🔹 Running Task: Check legality of capturing the debris.


Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


✅ Result: Role: Space Law Compliance Officer
Goal: Ensure all cleanup methods comply with current space laws.
Task Description: Check legality of capturing the debris.

Response:
Space Law Compliance Officer: "We need to make sure that our cleanup methods comply with current space laws. We can't just capture the debris without permission."

Space Lawyer: "I agree. We need to check the laws and regulations before we proceed with any cleanup methods."

Space Law Enforcement Officer: "I'll check the laws and regulations to make sure we're following them."

Space Law Compliance Officer: "Great. We need to make sure we're doing everything legally and ethically."

Exercise: What is the role of the Space Law Compliance Officer in the cleanup mission?
Answer: The role of the Space Law Compliance Officer is to ensure that all cleanup methods comply with current space laws.

Use Case 3: International Space Station
Participants: Astronauts, Mission Control, International Space Station Crew
Goal: 

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


✅ Result: Role: Debris Cleanup Specialist
Goal: Select the safest and most effective cleanup method.
Task Description: Select the optimal cleanup method based on risk.

Response:

Debris Cleanup Specialist: "We need to select the safest and most effective cleanup method. What are our options?"

Environmental Engineer: "We can use a vacuum truck or a bulldozer to remove the debris. The vacuum truck is safer and more effective, but it's also more expensive."

Debris Cleanup Specialist: "What are the risks associated with each method?"

Environmental Engineer: "The vacuum truck is safer because it doesn't create dust or debris that can be inhaled. However, it's more expensive and may not be suitable for all types of debris."

Debris Cleanup Specialist: "What about the bulldozer?"

Environmental Engineer: "The bulldozer is cheaper and can handle larger debris. However, it can create dust and debris that can be inhaled, and it may not be suitable for all types of debris."

Debris Cleanup Sp

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


✅ Result: Role: Spacecraft Navigation Specialist
Goal: Calculate the most efficient capture path.
Task Description: Optimize spacecraft path to capture debris.

Response:
Spacecraft Navigation Specialist: "To calculate the most efficient capture path, we need to consider the gravitational forces exerted by nearby celestial bodies. By using mathematical models and logical reasoning, we can determine the optimal trajectory that minimizes fuel consumption and maximizes debris capture efficiency."

Exercise 1: Calculate the gravitational force between two celestial bodies using the given mass and distance values.
Answer: The gravitational force can be calculated using the formula F = G * (m1 * m2) / r^2, where F is the force, G is the gravitational constant, m1 and m2 are the masses of the celestial bodies, and r is the distance between them.

Exercise 2: Determine the optimal trajectory for a spacecraft to capture debris based on gravitational forces.
Answer: The optimal trajectory can be

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


✅ Result: Role: Robotic Systems Engineer
Goal: Design, simulate, and execute debris capture using robotic systems.
Task Description: Design optimal robotic capture system for this debris.

Response:
Robotic Systems Engineer: "I have designed a robotic system that can capture the debris. It will be able to move around the debris field and capture the debris using its robotic arms."

Mission Control: "That sounds great. Can you simulate the system to see how it will perform?"

Robotic Systems Engineer: "Yes, I can simulate the system using the software. It will show us how the system will perform in different scenarios."

Mission Control: "Excellent. Once the simulation is complete, we can execute the system and capture the debris."

Exercise: What is the role of the Robotic Systems Engineer in this use case?
Answer: The role of the Robotic Systems Engineer is to design, simulate, and execute debris capture using robotic systems.

Use Case 2:

Scenario: A team of engineers is working on 

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


✅ Result: Role: Robotic Systems Engineer
Goal: Design, simulate, and execute debris capture using robotic systems.
Task Description: Simulate the robotic capture process considering debris shape, mass, and rotation.

Response:
Robotic Systems Engineer: "To design an effective debris capture system, we need to consider the shape, mass, and rotation of the debris. By simulating these factors, we can optimize the system's performance and ensure successful capture."

Exercise 1:
Exercise: What are the three main factors to consider when designing a debris capture system?
Answer: The three main factors to consider when designing a debris capture system are shape, mass, and rotation of the debris.

Exercise 2:
Exercise: Why is it important to simulate the debris capture process?
Answer: Simulating the debris capture process helps engineers optimize the system's performance and ensure successful capture.

Exercise 3:
Exercise: How can engineers optimize the performance of a debris capture sys

In [None]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import pybullet as p
import pybullet_data
import time
import streamlit as st

# === Load phi-2 Model from Hugging Face ===
model_name = "microsoft/phi-2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16, device_map="auto")

def custom_phi2_provider(prompt):
    inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
    outputs = model.generate(**inputs, max_new_tokens=200)
    result = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return result

# === Custom Agent Class (Direct phi-2 Calls, No OpenAI/litellm) ===
class DirectLLMAgent:
    def __init__(self, name, role, backstory, goal):
        self.name = name
        self.role = role
        self.backstory = backstory
        self.goal = goal

    def run(self, task_input=None):
        prompt = f"Role: {self.role}\nGoal: {self.goal}\nTask Description: {task_input}\n\nResponse:"
        return custom_phi2_provider(prompt)

# === Agents ===
prediction_agent = DirectLLMAgent(
    name="Prediction Agent",
    role="Space Debris Risk Analyst",
    backstory="An AI expert responsible for assessing the risk levels of debris in low earth orbit.",
    goal="Assess debris risk levels accurately for optimal cleanup."
)

policy_agent = DirectLLMAgent(
    name="Policy Agent",
    role="Space Law Compliance Officer",
    backstory="A legal AI specializing in space treaties and international guidelines on debris handling.",
    goal="Ensure all cleanup methods comply with current space laws."
)

cleanup_agent = DirectLLMAgent(
    name="Cleanup Agent",
    role="Debris Cleanup Specialist",
    backstory="An autonomous agent equipped with knowledge of various debris removal techniques.",
    goal="Select the safest and most effective cleanup method."
)

logistics_agent = DirectLLMAgent(
    name="Logistics Agent",
    role="Spacecraft Navigation Specialist",
    backstory="Handles spacecraft path planning and fuel optimization for cleanup missions.",
    goal="Calculate the most efficient capture path."
)

robotics_agent = DirectLLMAgent(
    name="Robotics Agent",
    role="Robotic Systems Engineer",
    backstory="Expert in designing and controlling robotic arms, specialized in space debris capture.",
    goal="Design, simulate, and execute debris capture using robotic systems."
)

# === Tasks (Including Robotics) ===
def run_task(agent, task_description):
    print(f"\n🔹 Running Task: {task_description}")
    result = agent.run(task_description)
    print(f"✅ Result: {result}")
    return result

tasks = [
    ("Assess the risk level of space debris.", prediction_agent),
    ("Check legality of capturing the debris.", policy_agent),
    ("Select the optimal cleanup method based on risk.", cleanup_agent),
    ("Optimize spacecraft path to capture debris.", logistics_agent),
    ("Design optimal robotic capture system for this debris.", robotics_agent),
    ("Simulate the robotic capture process considering debris shape, mass, and rotation.", robotics_agent),
    ("Execute capture and monitor grip forces.", robotics_agent)
]

# === Execute Tasks ===
results = []
for description, agent in tasks:
    result = run_task(agent, description)
    results.append((description, result))

# === Final Results ===
print("\n=== Final Results ===")
for description, result in results:
    print(f"{description}: {result}")

# === Robotic Simulation in PyBullet with Feedback ===
p.connect(p.GUI)
p.setAdditionalSearchPath(pybullet_data.getDataPath())
p.loadURDF("plane.urdf")
debris = p.loadURDF("cube_small.urdf", [0, 0, 1])
robot_arm = p.loadURDF("kuka_iiwa/model.urdf", [0, 0, 0])

# Move robotic arm to initial position
def initial_grip_attempt():
    for _ in range(100):
        joint_positions = [0, -1.5, 1.5, -1, -1, 1.5, 0]
        for i, pos in enumerate(joint_positions):
            p.setJointMotorControl2(robot_arm, i, p.POSITION_CONTROL, pos)
        p.stepSimulation()
        time.sleep(0.05)

# Adjust grip force - placeholder

def adjust_grip_force(delta):
    pass  # This can connect to actuator force control logic if needed

def reattempt_grip():
    print("🔁 Repositioning arm for reattempt...")
    for _ in range(100):
        new_positions = [j + 0.05 for j in [0, -1.5, 1.5, -1, -1, 1.5, 0]]
        for i, pos in enumerate(new_positions):
            p.setJointMotorControl2(robot_arm, i, p.POSITION_CONTROL, pos)
        p.stepSimulation()
        time.sleep(0.05)

def check_grip_quality():
    for _ in range(50):
        p.stepSimulation()
        contact_points = p.getContactPoints(robot_arm, debris)
        if contact_points:
            max_force = max([cp[9] for cp in contact_points])
            if max_force > 50:
                print("⚠️ Overgrip detected, reducing grip force...")
                adjust_grip_force(-10)
            return True
        time.sleep(0.05)
    return False

def monitor_slip():
    initial_pos, _ = p.getBasePositionAndOrientation(debris)
    for _ in range(50):
        p.stepSimulation()
        new_pos, _ = p.getBasePositionAndOrientation(debris)
        drift = sum(abs(a - b) for a, b in zip(initial_pos, new_pos))
        if drift > 0.02:
            print("⚠️ Debris slipping out — re-adjusting grip...")
            return True
        time.sleep(0.05)
    return False

def run_robotic_capture_with_feedback():
    st.write("### Starting Robotic Capture with Feedback")
    initial_grip_attempt()
    grip_success = check_grip_quality()
    if not grip_success:
        st.warning("⚠️ Initial grip failed, reattempting...")
        reattempt_grip()

    if monitor_slip():
        st.error("❌ Debris slipped out — reattempting...")
        reattempt_grip()

    st.success("✅ Final Capture Completed (with feedback handling).")

run_robotic_capture_with_feedback()
p.disconnect()


In [None]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import pybullet as p
import pybullet_data
import time
import streamlit as st

# === Load phi-2 Model ===
model_name = "microsoft/phi-2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16, device_map="auto")

def custom_phi2_provider(prompt):
    inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
    outputs = model.generate(**inputs, max_new_tokens=200)
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

# === Direct LLM Agent Class ===
class DirectLLMAgent:
    def __init__(self, name, role, backstory, goal):
        self.name = name
        self.role = role
        self.backstory = backstory
        self.goal = goal

    def run(self, task_input=None):
        prompt = f"Role: {self.role}\nGoal: {self.goal}\nTask: {task_input}\n\nResponse:"
        return custom_phi2_provider(prompt)

# === Streamlit UI Setup ===
st.set_page_config(page_title="Space Debris Cleanup Dashboard", layout="wide")

st.title("🚀 Space Debris Cleanup Mission")
st.write("### AI + Robotics Coordination Dashboard")

# === Agents ===
agents = {
    "Prediction Agent": DirectLLMAgent("Prediction Agent", "Space Debris Risk Analyst", "Risk Analysis Expert", "Assess debris risk."),
    "Policy Agent": DirectLLMAgent("Policy Agent", "Space Law Expert", "Legal Compliance Officer", "Check legality."),
    "Cleanup Agent": DirectLLMAgent("Cleanup Agent", "Debris Cleanup Specialist", "Expert in Cleanup Methods", "Choose cleanup method."),
    "Logistics Agent": DirectLLMAgent("Logistics Agent", "Navigation Specialist", "Path Planning Pro", "Optimize path."),
    "Robotics Agent": DirectLLMAgent("Robotics Agent", "Robotic Systems Engineer", "Capture Specialist", "Capture debris.")
}

# === Task Pipeline ===
tasks = [
    ("Assess debris risk.", "Prediction Agent"),
    ("Check legal compliance.", "Policy Agent"),
    ("Select cleanup method.", "Cleanup Agent"),
    ("Plan capture path.", "Logistics Agent"),
    ("Design robotic capture system.", "Robotics Agent"),
    ("Simulate robotic capture.", "Robotics Agent"),
    ("Execute capture and monitor grip.", "Robotics Agent")
]

# === Task Execution with Live Feedback ===
task_log = st.empty()
task_results = {}

def run_task(description, agent_key):
    agent = agents[agent_key]
    task_log.write(f"### 🔄 Running Task: {description}")
    result = agent.run(description)
    st.success(f"✅ {description} — Completed")
    task_results[description] = result
    st.write(f"**{description} Response:** {result}")
    return result

for description, agent_key in tasks:
    run_task(description, agent_key)

# === PyBullet Simulation & Feedback Integration ===
st.write("### 🦾 Robotic Capture Simulation (with Feedback)")

p.connect(p.DIRECT)  # Headless for Streamlit
p.setAdditionalSearchPath(pybullet_data.getDataPath())
p.loadURDF("plane.urdf")
debris = p.loadURDF("cube_small.urdf", [0, 0, 1])
robot_arm = p.loadURDF("kuka_iiwa/model.urdf", [0, 0, 0])

def move_arm_to_initial():
    for _ in range(100):
        joint_positions = [0, -1.5, 1.5, -1, -1, 1.5, 0]
        for i, pos in enumerate(joint_positions):
            p.setJointMotorControl2(robot_arm, i, p.POSITION_CONTROL, pos)
        p.stepSimulation()

def check_grip():
    for _ in range(50):
        p.stepSimulation()
        contact = p.getContactPoints(robot_arm, debris)
        if contact:
            max_force = max(c[9] for c in contact)
            if max_force > 50:
                st.warning("⚠️ Overgrip detected - Adjusting")
                return True, "Overgrip"
            return True, "Good Grip"
        time.sleep(0.05)
    return False, "No Grip"

def monitor_slip():
    initial_pos, _ = p.getBasePositionAndOrientation(debris)
    for _ in range(50):
        p.stepSimulation()
        new_pos, _ = p.getBasePositionAndOrientation(debris)
        drift = sum(abs(a - b) for a, b in zip(initial_pos, new_pos))
        if drift > 0.02:
            return True
        time.sleep(0.05)
    return False

def attempt_capture():
    move_arm_to_initial()
    grip_success, grip_status = check_grip()
    if grip_success:
        st.success(f"✅ Initial grip successful ({grip_status})")
    else:
        st.error("❌ Initial grip failed - Reattempting")
        move_arm_to_initial()

    if monitor_slip():
        st.warning("⚠️ Debris slipped after grip - Regripping")
        move_arm_to_initial()
        grip_success, _ = check_grip()

    if grip_success:
        st.success("✅ Final Capture Complete")
    else:
        st.error("❌ Capture Failed")

attempt_capture()
p.disconnect()

# === Final Summary ===
st.write("### ✅ Mission Summary")
for description, result in task_results.items():
    st.write(f"**{description}**")
    st.write(result)

st.success("🚀 Space Cleanup Mission Completed!")

