In [1]:
%pwd

'e:\\LangChain project\\DroneScripts-RAG\\research'

In [2]:
import os
os.chdir("../")

In [3]:
%pwd

'e:\\LangChain project\\DroneScripts-RAG'

In [4]:
from langchain.document_loaders import PythonLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

In [5]:
# #Extract Data From the PDF File
# def load_pdf_file(data):
#     loader= DirectoryLoader(data,
#                             glob="*.pdf",
#                             loader_cls=PyPDFLoader)

#     documents=loader.load()

#     return documents

# extracted_data=load_pdf_file(data='Data/')

In [6]:
file_path = "Data/takeoff.py"

In [7]:
loader = PythonLoader(file_path)

documents = loader.load()
print(documents)

[Document(metadata={'source': 'Data/takeoff.py'}, page_content='import setup_path\nimport airsim\nimport sys\nimport time\n\n# For high speed ascent and descent on PX4 you may need to set these properties:\n# param set MPC_Z_VEL_MAX_UP 5\n# param set MPC_Z_VEL_MAX_DN 5\n\nz = 5\nif len(sys.argv) > 1:\n    z = float(sys.argv[1])\n\nclient = airsim.MultirotorClient()\nclient.confirmConnection()\nclient.enableApiControl(True)\n\nclient.armDisarm(True)\n\nlanded = client.getMultirotorState().landed_state\nif landed == airsim.LandedState.Landed:\n    print("taking off...")\n    client.takeoffAsync().join()\nelse:\n    print("already flying...")\n    client.hoverAsync().join()\n\nprint("make sure we are hovering at {} meters...".format(z))\n\nif z > 5:\n    # AirSim uses NED coordinates so negative axis is up.\n    # z of -50 is 50 meters above the original launch point.\n    client.moveToZAsync(-z, 5).join()\n    client.hoverAsync().join()\n    time.sleep(5)\n\nif z > 10:\n    print("come d

In [8]:
len(documents[0].page_content)

1148

## Init Gemini model

In [9]:
import os
from dotenv import load_dotenv

_ = load_dotenv()
print(_)

True


In [10]:
from langchain_google_genai import ChatGoogleGenerativeAI

GEMINI_API_KEY=os.environ.get('GEMINI_API_KEY')

llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-lite",
    google_api_key=GEMINI_API_KEY,
    temperature=0,
    max_output_tokens=2048
)

In [11]:
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant that can write python code for "
    "navigating a drone using some provided pattern in the below python script. {pattern}"
    "You must only write python code for the task provided. Do not write any explanations."
    "You can use the comment lines in the code to let the user know what you are doing."),
    ("user", "Using the provided pattern, write a python code to complete the task. {task}"),
])

prompt.input_variables

['pattern', 'task']

In [12]:
documents[0].page_content

'import setup_path\nimport airsim\nimport sys\nimport time\n\n# For high speed ascent and descent on PX4 you may need to set these properties:\n# param set MPC_Z_VEL_MAX_UP 5\n# param set MPC_Z_VEL_MAX_DN 5\n\nz = 5\nif len(sys.argv) > 1:\n    z = float(sys.argv[1])\n\nclient = airsim.MultirotorClient()\nclient.confirmConnection()\nclient.enableApiControl(True)\n\nclient.armDisarm(True)\n\nlanded = client.getMultirotorState().landed_state\nif landed == airsim.LandedState.Landed:\n    print("taking off...")\n    client.takeoffAsync().join()\nelse:\n    print("already flying...")\n    client.hoverAsync().join()\n\nprint("make sure we are hovering at {} meters...".format(z))\n\nif z > 5:\n    # AirSim uses NED coordinates so negative axis is up.\n    # z of -50 is 50 meters above the original launch point.\n    client.moveToZAsync(-z, 5).join()\n    client.hoverAsync().join()\n    time.sleep(5)\n\nif z > 10:\n    print("come down quickly to 10 meters...")\n    z = 10\n    client.moveToZAs

In [13]:
chain = prompt | llm

In [14]:
response = chain.invoke({
              "pattern": documents[0].page_content,
              "task": "Takeoff the drone at 20 meters. Hover for 10 seconds and land the drone."
          })


In [15]:
import pprint
pprint.pprint(response.content)

('```python\n'
 'import setup_path\n'
 'import airsim\n'
 'import sys\n'
 'import time\n'
 '\n'
 '# For high speed ascent and descent on PX4 you may need to set these '
 'properties:\n'
 '# param set MPC_Z_VEL_MAX_UP 5\n'
 '# param set MPC_Z_VEL_MAX_DN 5\n'
 '\n'
 'z = 20\n'
 'if len(sys.argv) > 1:\n'
 '    z = float(sys.argv[1])\n'
 '\n'
 'client = airsim.MultirotorClient()\n'
 'client.confirmConnection()\n'
 'client.enableApiControl(True)\n'
 '\n'
 'client.armDisarm(True)\n'
 '\n'
 'landed = client.getMultirotorState().landed_state\n'
 'if landed == airsim.LandedState.Landed:\n'
 '    print("taking off...")\n'
 '    client.takeoffAsync().join()\n'
 'else:\n'
 '    print("already flying...")\n'
 '    client.hoverAsync().join()\n'
 '\n'
 'print("make sure we are hovering at {} meters...".format(z))\n'
 '\n'
 'if z > 5:\n'
 '    # AirSim uses NED coordinates so negative axis is up.\n'
 '    # z of -50 is 50 meters above the original launch point.\n'
 '    client.moveToZAsync(-z, 5).join

In [15]:
response.content

'```python\nimport setup_path\nimport airsim\nimport sys\nimport time\n\n# For high speed ascent and descent on PX4 you may need to set these properties:\n# param set MPC_Z_VEL_MAX_UP 5\n# param set MPC_Z_VEL_MAX_DN 5\n\nz = 20  # Target altitude\n\nclient = airsim.MultirotorClient()\nclient.confirmConnection()\nclient.enableApiControl(True)\nclient.armDisarm(True)\n\nlanded = client.getMultirotorState().landed_state\nif landed == airsim.LandedState.Landed:\n    print("taking off...")\n    client.takeoffAsync().join()\nelse:\n    print("already flying...")\n    client.hoverAsync().join()\n\n# Takeoff to 20 meters\nprint("Taking off to {} meters...".format(z))\nclient.moveToZAsync(-z, 5).join()\nclient.hoverAsync().join()\ntime.sleep(1)\n\n# Check altitude and hover/land\nwhile True:\n    current_z = -client.getMultirotorState().kinematics_estimated.position.z\n    print(f"Current altitude: {current_z:.2f} meters")\n\n    if abs(current_z - 10) < 1:  # Within 1 meter of 10 meters\n     

In [None]:
raw = response.content

# find first block of code
if "```" in raw:
    code = raw.split("```")[1]  # get the code block
    code = code.replace("python\n", "")  # remove python tag
else:
    code = raw  # fallback: if doesn't have ``` just use raw

with open("take_off_llm_generate2.py", "w", encoding="utf-8") as f:
    f.write(code.strip())


## RAG pipeline

In [8]:
#Split the Data into Text Chunks
def text_split(extracted_data):
    text_splitter=RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=20)
    text_chunks=text_splitter.split_documents(extracted_data)
    return text_chunks

In [9]:
text_chunks=text_split(extracted_data)
print("Length of Text Chunks", len(text_chunks))

Length of Text Chunks 11


In [10]:
from langchain.embeddings import HuggingFaceEmbeddings

In [17]:
#Download the Embeddings from Hugging Face
def download_hugging_face_embeddings():
    embeddings=HuggingFaceEmbeddings(model_name='sentence-transformers/all-MiniLM-L6-v2')
    return embeddings

In [18]:
embeddings = download_hugging_face_embeddings()

  embeddings=HuggingFaceEmbeddings(model_name='sentence-transformers/all-MiniLM-L6-v2')


In [19]:
#test the embeddings
query_result = embeddings.embed_query("Hello world")
print("Length", len(query_result))

Length 384


In [11]:
from dotenv import load_dotenv
load_dotenv()

True

In [12]:
PINECONE_API_KEY=os.environ.get('PINECONE_API_KEY')

In [None]:
from pinecone.grpc import PineconeGRPC as Pinecone
from pinecone import ServerlessSpec
import os

pc = Pinecone(api_key=PINECONE_API_KEY)
index_name = "drone-rag-index"

pc.create_index(
    name=index_name,
    dimension=384, 
    metric="cosine", 
    spec=ServerlessSpec(
        cloud="aws", 
        region="us-east-1"
    ) 
) 

{
    "name": "drone-rag-index",
    "metric": "cosine",
    "host": "drone-rag-index-1uz26v7.svc.aped-4627-b74a.pinecone.io",
    "spec": {
        "serverless": {
            "cloud": "aws",
            "region": "us-east-1"
        }
    },
    "status": {
        "ready": true,
        "state": "Ready"
    },
    "vector_type": "dense",
    "dimension": 384,
    "deletion_protection": "disabled",
    "tags": null
}

In [13]:
import os
os.environ["PINECONE_API_KEY"] = PINECONE_API_KEY

In [None]:
# Embed each chunk and upsert the embeddings into your Pinecone index.
from langchain_pinecone import PineconeVectorStore

docsearch = PineconeVectorStore.from_documents(
    documents=text_chunks,
    index_name=index_name,
    embedding=embeddings, 
)

In [20]:
# Load Existing index 

from langchain_pinecone import PineconeVectorStore
# Embed each chunk and upsert the embeddings into your Pinecone index.
docsearch = PineconeVectorStore.from_existing_index(
    index_name="drone-rag-index",
    embedding=embeddings
)

In [21]:
docsearch

<langchain_pinecone.vectorstores.PineconeVectorStore at 0x197ccbfef60>

In [22]:
retriever = docsearch.as_retriever(search_type="similarity", search_kwargs={"k":3})

In [23]:
retrieved_docs = retriever.invoke("What is navigator?")

In [24]:
retrieved_docs

[Document(id='60385ac7-a7f9-4247-ba0a-813724003aed', metadata={'author': 'Minh Tiến Trần', 'creationdate': '2025-08-16T22:48:39+07:00', 'creator': 'Microsoft® Word for Microsoft 365', 'moddate': '2025-08-16T22:48:39+07:00', 'page': 0.0, 'page_label': '1', 'producer': 'Microsoft® Word for Microsoft 365', 'source': 'Data\\drone.pdf', 'total_pages': 4.0}, page_content='"detect_weather": "Collect weather condition data (wind, temperature, etc.). " , \n \n      # Perception & Vision \n      "activate_camera": "Turn on onboard camera system. " , \n      "capture_image": "Capture a high-resolution image of the current view. " , \n      "stream_video": "Stream real-time video feed to the control center. " , \n      "detect_animal": "Detect animals in camera feed using onboard ML model. " ,'),
 Document(id='4d0845b0-e505-4c9b-a8e0-d51994d26af5', metadata={'author': 'Minh Tiến Trần', 'creationdate': '2025-08-16T22:48:39+07:00', 'creator': 'Microsoft® Word for Microsoft 365', 'moddate': '2025-08-

In [25]:
GEMINI_API_KEY=os.environ.get('GEMINI_API_KEY')
os.environ["GEMINI_API_KEY"] = GEMINI_API_KEY

In [26]:
from langchain_google_genai import ChatGoogleGenerativeAI

llm = ChatGoogleGenerativeAI(
    model="gemini-2.5-pro",
    google_api_key=GEMINI_API_KEY,
    temperature=0.4,
    max_output_tokens=2048
)

In [27]:
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate

system_prompt = (
    "You are a mission planner for an autonomous drone. "
    "Your task is to generate executable mission scripts in Python "
    "using only the provided drone actions. "
    "Always follow this template:\n\n"
    "from drone_actions import ...\n\n"
    "objects_list = [....]\n\n"
    "def mission():\n"
    "    # Step 0: ...\n"
    "    ...\n"
    "    # Step 1: ...\n"
    "   ...\n"
    "    # Final step: ...\n"
    "    ...\n"
    "    ...\n\n"
    "Constraints:\n"
    "- Use ONLY the available primitives.\n"
    "- Always structure code with numbered comments (# Step 1, # Step 2,...).\n"
    "- If mission is unclear, still generate a skeleton with TODO comments.\n\n"
    "{context}"
)


prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", "{input}"),
    ]
)

In [28]:
# Intent-to-Action Mapper để bridge semantic gap
from typing import List, Dict, Any
import re

class IntentToActionMapper:
    """
    Maps natural language intents to drone actions
    Solves the core semantic gap problem
    """
    
    def __init__(self):
        # Intent patterns and their mappings
        self.intent_patterns = {
            # Navigation intents
            "movement": {
                "patterns": ["fly", "go", "move", "navigate", "travel", "fly to"],
                "function": "fly_to",
                "category": "navigation"
            },
            "takeoff": {
                "patterns": ["take off", "launch", "start", "begin", "takeoff"],
                "function": "takeoff", 
                "category": "navigation"
            },
            "landing": {
                "patterns": ["land", "return", "come back", "go home", "return to base"],
                "function": "land",
                "category": "navigation"
            },
            
            # Vision/Detection intents
            "photography": {
                "patterns": ["take picture", "photograph", "capture", "snap", "capture image"],
                "function": "capture_image",
                "category": "camera"
            },
            "recording": {
                "patterns": ["record", "film", "video", "stream"],
                "function": "stream_video", 
                "category": "camera"
            },
            "detection": {
                "patterns": ["find", "detect", "look for", "search", "locate"],
                "function": "detect_animal",
                "category": "camera"
            },
            
            # Specialized actions
            "hovering": {
                "patterns": ["hover", "stay", "wait", "pause"],
                "function": "hover",
                "category": "navigation"
            }
        }
        
        # Target objects
        self.target_objects = [
            "zebra", "lion", "elephant", "giraffe", "wildebeest", 
            "swamp", "watering hole", "savanna", "hill", "tree cluster"
        ]
    
    def extract_intents(self, user_input: str) -> List[Dict[str, Any]]:
        """Extract structured intents from natural language"""
        intents = []
        text = user_input.lower()
        
        # Extract action intents
        for intent_name, intent_data in self.intent_patterns.items():
            for pattern in intent_data["patterns"]:
                if pattern in text:
                    intents.append({
                        "type": "action",
                        "intent": intent_name,
                        "action": pattern,
                        "mapped_function": intent_data["function"],
                        "category": intent_data["category"]
                    })
        
        # Extract target intents
        for target in self.target_objects:
            if target in text:
                intents.append({
                    "type": "target", 
                    "value": target,
                    "category": "object"
                })
        
        return self.dedupe_intents(intents)
    
    def dedupe_intents(self, intents: List[Dict]) -> List[Dict]:
        """Remove duplicate intents"""
        seen = set()
        unique_intents = []
        
        for intent in intents:
            key = f"{intent['type']}_{intent.get('mapped_function', intent.get('value', ''))}"
            if key not in seen:
                seen.add(key)
                unique_intents.append(intent)
        
        return unique_intents

# Test intent mapper
intent_mapper = IntentToActionMapper()
test_command = "fly to the swamp and take a picture of elephants, then return to base"
extracted = intent_mapper.extract_intents(test_command)
print("Extracted intents:")
for intent in extracted:
    print(f"- {intent}")


Extracted intents:
- {'type': 'action', 'intent': 'movement', 'action': 'fly', 'mapped_function': 'fly_to', 'category': 'navigation'}
- {'type': 'action', 'intent': 'landing', 'action': 'return', 'mapped_function': 'land', 'category': 'navigation'}
- {'type': 'target', 'value': 'elephant', 'category': 'object'}
- {'type': 'target', 'value': 'swamp', 'category': 'object'}


In [29]:
class SemanticBridgeRAG:
    """
    Addresses the core problem: semantic gap between natural language commands 
    and technical documentation in vector database
    """
    
    def __init__(self, retriever, llm, intent_mapper):
        self.retriever = retriever
        self.llm = llm
        self.intent_mapper = intent_mapper
        
    def generate_query_strategies(self, user_input: str, intents: List[Dict]) -> List[Dict]:
        """Generate multiple retrieval strategies to bridge semantic gap"""
        strategies = []
        
        # Strategy 1: Direct query (baseline)
        strategies.append({
            "query": user_input,
            "weight": 0.3,
            "type": "direct"
        })
        
        # Strategy 2: Function name queries (highest weight)
        for intent in intents:
            if intent["type"] == "action":
                function_name = intent["mapped_function"]
                strategies.append({
                    "query": function_name,
                    "weight": 0.9,
                    "type": "function_name"
                })
        
        # Strategy 3: Action-focused queries
        for intent in intents:
            if intent["type"] == "action":
                strategies.append({
                    "query": f"{intent['action']} {intent.get('target', '')}",
                    "weight": 0.8,
                    "type": "action_focused"
                })
        
        # Strategy 4: Context-expanded queries for targets
        for intent in intents:
            if intent["type"] == "target":
                strategies.append({
                    "query": f"detect {intent['value']} camera vision",
                    "weight": 0.7,
                    "type": "context_expanded"
                })
        
        return strategies
    
    def rank_and_dedupe(self, weighted_docs: List[tuple]) -> List[Any]:
        """Rank documents by weighted relevance and remove duplicates"""
        doc_scores = {}
        
        for doc, weight in weighted_docs:
            doc_key = doc.page_content[:100]  # Use first 100 chars as key
            
            if doc_key in doc_scores:
                doc_scores[doc_key]["score"] += weight
            else:
                doc_scores[doc_key] = {
                    "doc": doc,
                    "score": weight
                }
        
        # Sort by score and return top documents
        ranked = sorted(doc_scores.values(), key=lambda x: x["score"], reverse=True)
        return [item["doc"] for item in ranked[:5]]
    
    def process_query(self, user_input: str) -> Dict[str, Any]:
        """Main pipeline with semantic bridging"""
        
        # Step 1: Extract intents from natural language
        extracted_intents = self.intent_mapper.extract_intents(user_input)
        print(f"📋 Extracted {len(extracted_intents)} intents")
        
        # Step 2: Generate multiple query strategies
        query_strategies = self.generate_query_strategies(user_input, extracted_intents)
        print(f"🔍 Generated {len(query_strategies)} retrieval strategies")
        
        # Step 3: Multi-strategy retrieval
        all_docs = []
        for strategy in query_strategies:
            docs = self.retriever.invoke(strategy["query"])
            all_docs.extend([(doc, strategy["weight"]) for doc in docs])
            print(f"  - {strategy['type']}: '{strategy['query']}' → {len(docs)} docs")
        
        # Step 4: Weighted deduplication and ranking  
        ranked_docs = self.rank_and_dedupe(all_docs)
        print(f"📊 Final ranking: {len(ranked_docs)} unique documents")
        
        return {
            "intents": extracted_intents,
            "strategies": query_strategies,
            "docs": ranked_docs,
            "raw_docs_count": len(all_docs)
        }

print("SemanticBridgeRAG class defined ✅")


SemanticBridgeRAG class defined ✅


In [None]:
# Initialize the semantic bridge RAG system
semantic_rag = SemanticBridgeRAG(retriever, llm, intent_mapper)

🚀 Semantic Bridge RAG system initialized!
Components:
- Intent Mapper: ✅
- Retriever: ✅
- LLM: ✅


In [None]:
# Test command
test_command = "fly to the swamp and take a picture of elephants, then return to base"

print("=" * 60)
print(f"TESTING COMMAND: '{test_command}'")
print("=" * 60)

# Test new semantic RAG
result = semantic_rag.process_query(test_command)

print("\nEXTRACTED INTENTS:")
for i, intent in enumerate(result["intents"], 1):
    print(f"{i}. {intent['type'].upper()}: {intent.get('action', intent.get('value'))} → {intent.get('mapped_function', 'N/A')}")

print(f"\nRETRIEVAL STRATEGIES:")
for i, strategy in enumerate(result["strategies"], 1):
    print(f"{i}. {strategy['type']}: '{strategy['query']}' (weight: {strategy['weight']})")

print(f"\nRETRIEVAL RESULTS:")
print(f"- Total docs retrieved: {result['raw_docs_count']}")
print(f"- Unique docs after ranking: {len(result['docs'])}")

print(f"\nTOP RETRIEVED DOCUMENTS:")
for i, doc in enumerate(result["docs"][:3], 1):
    print(f"{i}. {doc.page_content[:100]}...")


🧪 TESTING COMMAND: 'fly to the swamp and take a picture of elephants, then return to base'
📋 Extracted 4 intents
🔍 Generated 7 retrieval strategies
  - direct: 'fly to the swamp and take a picture of elephants, then return to base' → 3 docs
  - function_name: 'fly_to' → 3 docs
  - function_name: 'land' → 3 docs
  - action_focused: 'fly ' → 3 docs
  - action_focused: 'return ' → 3 docs
  - context_expanded: 'detect elephant camera vision' → 3 docs
  - context_expanded: 'detect swamp camera vision' → 3 docs
📊 Final ranking: 5 unique documents

📋 EXTRACTED INTENTS:
1. ACTION: fly → fly_to
2. ACTION: return → land
3. TARGET: elephant → N/A
4. TARGET: swamp → N/A

🔍 RETRIEVAL STRATEGIES:
1. direct: 'fly to the swamp and take a picture of elephants, then return to base' (weight: 0.3)
2. function_name: 'fly_to' (weight: 0.9)
3. function_name: 'land' (weight: 0.9)
4. action_focused: 'fly ' (weight: 0.8)
5. action_focused: 'return ' (weight: 0.8)
6. context_expanded: 'detect elephant camera vis

In [32]:
# Enhanced script generation
from langchain_core.prompts import ChatPromptTemplate

def generate_enhanced_script(user_input: str, intents: List[Dict], docs: List[Any]) -> str:
    """Generate drone script with enriched context"""
    
    # Enhanced prompt template
    enhanced_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a drone mission planner. Generate executable Python scripts using the extracted intents and retrieved documentation.

Template:
from drone_actions import {required_imports}

objects_list = [...]

def mission():
    # Step 0: Initial setup
    # Step 1: [Intent-based steps]
    # Step N: Mission completion

Available Context: {context}

Extracted Intents Summary:
{intent_summary}

Use the intents to structure your mission steps logically."""),
        ("human", "{input}")
    ])
    
    # Format intents for prompt
    intent_summary = []
    required_imports = set()
    
    for intent in intents:
        if intent["type"] == "action":
            intent_summary.append(f"- {intent['action']} → {intent['mapped_function']}")
            required_imports.add(intent['mapped_function'])
        elif intent["type"] == "target":
            intent_summary.append(f"- Target: {intent['value']}")
    
    chain = enhanced_prompt | llm
    
    response = chain.invoke({
        "input": user_input,
        "context": "\n".join([doc.page_content for doc in docs]),
        "intent_summary": "\n".join(intent_summary),
        "required_imports": ", ".join(required_imports)
    })
    
    return response.content

# Generate script using semantic RAG results
enhanced_script = generate_enhanced_script(test_command, result["intents"], result["docs"])

print("🤖 GENERATED SCRIPT:")
print("=" * 40)
print(enhanced_script)


🤖 GENERATED SCRIPT:
```python
from drone_actions import land, fly_to, takeoff, detect_animal, capture_image, return_to_base

objects_list = [
    "swamp",
    "elephant"
]

def mission():
    """
    Mission to fly to a swamp, photograph elephants, and return to base.
    """
    # Step 0: Initial setup
    print("Initiating mission...")
    takeoff()

    # Step 1: Fly to the designated area
    print("Flying to the swamp...")
    fly_to("swamp")

    # Step 2: Detect and photograph the target animal
    print("Searching for elephants...")
    detected_object = detect_animal("elephant")
    if detected_object == "elephant":
        print("Elephant detected. Capturing image...")
        capture_image("elephant")
    else:
        print("Could not find an elephant in the area.")

    # Step 3: Return to base and land
    print("Returning to base...")
    return_to_base()
    print("Landing...")
    land()

    # Step 4: Mission completion
    print("Mission complete.")

if __name__ == "