In [1]:
import os
from langchain_community.document_loaders import UnstructuredMarkdownLoader
from langchain_core.documents import Document
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser, PydanticOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain import hub
from langchain_core.prompts import PromptTemplate
import tomllib
import json

In [2]:
from dotenv import dotenv_values

config = dotenv_values("../.env")
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = config["LANGCHAIN_API_KEY"]
os.environ["OPENAI_API_KEY"] = config["OPENAI_API_KEY"]

In [3]:
# Function to receive requirements (markdown file) path and return the string

def load_md_file(file_path):
    try:
        with open(file_path, "r", encoding="utf-8") as file:
            return file.read()
    except FileNotFoundError:
        raise FileNotFoundError(f"The file at {file_path} was not found.")
    except Exception as e:
        raise RuntimeError(f"An error occurred while reading the file: {e}")
    

In [8]:
requirements = load_md_file("/workspaces/composition-blueprint-engine/sdi/requirements/turtlebot-emergency.md")
requirements

'# Project Overview\n- **Project Name**: Autonomous Turtlebot Navigation\n- **Objective**: Enable Turtlebot to navigate autonomously while recognizing and responding to environmental cues.\n- **Domain**: Mobility and Autonomous Vehicles\n\n# System Overview\n- **System Context**: \n  - Operating Environment: Indoor and Outdoor\n  - OS: Ubuntu 20.04 LTS Focal Foss\n  - Middleware: ROS2 Foxy Fitzroy\n  - Key Components:\n    - Camera: Captures images and publishes them to `/camera/image_raw`\n    - Motion Controller: Receives commands via `/cmd_vel`\n\n# Functional Requirements\n\n## FR-001: Emergency Vehicle Recognition\n- **Description**: The system shall detect and recognize emergency vehicles.\n- **Input**: Image stream from `/camera/image_raw`.\n- **Output**: Classification of the detected vehicle as an emergency vehicle with a confidence level above 95%.\n\n## FR-002: Path Alteration\n- **Description**: Upon detecting an emergency vehicle, the Turtlebot shall automatically alter it

In [4]:
def load_documents_from_markdown(directory):
    documents = []

    for filename in os.listdir(directory):
        if filename.endswith(".md"):
            md_path = os.path.join(directory, filename)
            loader = UnstructuredMarkdownLoader(md_path)
            data = loader.load()
            if data and isinstance(data[0], Document):
                documents.append(data[0])

    return documents

In [15]:
service_dir = "/workspaces/composition-blueprint-engine/sdi/services"
documents = load_documents_from_markdown(service_dir)
documents

[Document(metadata={'source': '/workspaces/composition-blueprint-engine/sdi/services/yolov10n.md'}, page_content='license: agpl-3.0 tags: - object-detection - computer-vision - yolov10 - pytorch_model_hub_mixin datasets: - detection-datasets/coco library_name: yolov10 inference: false\n\nModel Description\n\nYOLOv10: Real-Time End-to-End Object Detection\n\narXiv: https://arxiv.org/abs/2405.14458v1\n\ngithub: https://github.com/THU-MIG/yolov10\n\nInstallation\n\npip install git+https://github.com/THU-MIG/yolov10.git\n\nTraining and validation\n\n```python from ultralytics import YOLOv10\n\nmodel = YOLOv10.from_pretrained(\'jameslahm/yolov10n\')\n\nTraining\n\nmodel.train(...)\n\nafter training, one can push to the hub\n\nmodel.push_to_hub("your-hf-username/yolov10-finetuned")\n\nValidation\n\nmodel.val(...) ```\n\nInference\n\nHere\'s an end-to-end example showcasing inference on a cats image:\n\n```python from ultralytics import YOLOv10\n\nmodel = YOLOv10.from_pretrained(\'jameslahm/y

In [5]:
def create_chroma_vectorstore(documents, persist_directory):
    vectorstore = Chroma.from_documents(
        documents=documents,
        embedding=OpenAIEmbeddings(),
        persist_directory=persist_directory,
    )

    return vectorstore

In [16]:
persist_dir = "./chroma_local_db"
vectorstore = create_chroma_vectorstore(documents, persist_directory=persist_dir)
print(vectorstore)

<langchain_community.vectorstores.chroma.Chroma object at 0x7f8beff10fd0>


In [6]:
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

def create_rag_chain(retriever, prompt, llm_model):
    llm = ChatOpenAI(model_name=llm_model, temperature=0)
    return {
        "context": retriever | format_docs,
        "question": RunnablePassthrough()
    } | prompt | llm | StrOutputParser()

In [8]:
toml_filepath = "prompts.toml"
with open(toml_filepath, "rb") as file:
    prompts = tomllib.load(file)



In [29]:
markdown_dir = "./services"
toml_file = "prompts.toml"
persist_dir = "./chroma_local_db"
requirements_dir = "./requirements/turtlebot-emergency.md"
question = load_md_file(requirements_dir)

In [31]:
template = (
    "You are an AI assistant specializing in decomposing user requirements into tasks that leverage available services. "
    "Using the provided context, identify the relevant services and decompose the request into tasks. "
    "When a relevant service is available, use its exact name in the `task` field; if multiple services match, prioritize the most specific or contextually appropriate service. "
    "Only use a descriptive task name if no service matches. "
    "For each task, ensure it has a unique ID, dependencies (`dep`), and arguments (`args`). "
    "Base the response strictly on available services and contextual requirements. If a service is partially relevant, adapt its use case where possible. "
    "Do not include additional explanations or return an empty list unless no mapping is possible for all requirements. "
    "Given the user requirements, you must provide multiple valid breakdowns for the system in JSON format. Each breakdown should explore different combinations of services or workflows, where possible. Ensure that each JSON is a standalone solution."
    "Return them in the format below: "
    "```json "
    "[JSON 1]"
    "[JSON 2]"
    "and so on"
    "Example 1: "
    "Question: Can you tell me how many objects are in e1.jpg? "
    "Answer: ["
    "{{\"task\": \"object-detection\", \"id\": 0, \"dep\": [-1], \"args\": {{\"image\": \"e1.jpg\"}}}}"
    "] "
    "Example 2: "
    "Question: In e2.jpg, what’s the animal and what’s it doing? "
    "Answer: ["
    "{{\"task\": \"image-to-text\", \"id\": 0, \"dep\": [-1], \"args\": {{\"image\": \"e2.jpg\"}}}}, "
    "{{\"task\": \"image-cls\", \"id\": 1, \"dep\": [-1], \"args\": {{\"image\": \"e2.jpg\"}}}}, "
    "{{\"task\": \"object-detection\", \"id\": 2, \"dep\": [-1], \"args\": {{\"image\": \"e2.jpg\"}}}}, "
    "{{\"task\": \"visual-question-answering\", \"id\": 3, \"dep\": [-1], \"args\": {{\"text\": \"what’s the animal doing?\", \"image\": \"e2.jpg\"}}}}"
    "] "
    "Example 3: "
    "Question: First generate a HED image of e3.jpg, then based on the HED image and a text “a girl reading a book,” create a new image as a response. "
    "Answer: ["
    "{{\"task\": \"pose-detection\", \"id\": 0, \"dep\": [-1], \"args\": {{\"image\": \"e3.jpg\"}}}}, "
    "{{\"task\": \"pose-text-to-image\", \"id\": 1, \"dep\": [0], \"args\": {{\"text\": \"a girl reading a book\", \"image\": \"<resource>-0\"}}}}"
    "] "
    "Now, based on the above examples, decompose the following user request into structured tasks that follow the json structure provided in the examples: "
    "Question: {question} "
    "{context} "
    "Helpful Answer: "
)


In [17]:
def save_json_breakdowns(json_data, directory="output_jsons"):
    # Ensure the directory exists
    os.makedirs(directory, exist_ok=True)

    # Iterate over the JSON breakdowns
    for idx, breakdown in enumerate(json_data):
        file_path = os.path.join(directory, f"breakdown_{idx + 1}.json")
        with open(file_path, 'w') as json_file:
            json.dump(breakdown, json_file, indent=4)
        print(f"Saved breakdown {idx + 1} to {file_path}")

In [33]:
retriever = vectorstore.as_retriever()
# Prompt
# prompt = hub.pull("rlm/rag-prompt")
# template = prompts["description"]

# template = ("You are an AI assistant, expert at requirement decomposition and service composition. " 
# "You are provided with various services that might be fit for fulfilling the user's request. "
# "Your job is to break down the user's request and select the appropriate services that will fulfill the decomposed tasks. "
# "If there are not suitable services available to fit the user's requirements, say that it is not possible to do so. "
# "You should give your answer in a structure json output with clear indication of tasks, selected services, and any dependencies (file, values) between these tasks. "
# "Question: {question}"
# "{context}"
# "Helpful Answer: "
# )

prompt = PromptTemplate.from_template(template)

# LLM
llm = ChatOpenAI(model_name="gpt-4o", temperature=0)

# Chain
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | JsonOutputParser()
)

# Question
result = rag_chain.invoke(
    question
)
save_json_breakdowns(result)
result

Saved breakdown 1 to output_jsons/breakdown_1.json
Saved breakdown 2 to output_jsons/breakdown_2.json


[{'task': 'object-detection',
  'id': 0,
  'dep': [-1],
  'args': {'image': '/camera/image_raw'}},
 {'task': 'path-planning',
  'id': 1,
  'dep': [0],
  'args': {'detection_results': '<resource>-0', 'output_topic': '/cmd_vel'}}]