In [1]:
from langchain.document_loaders import PyPDFLoader, DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

In [2]:
def load_pdf_files(data):
    """
    Extract text from the PDF files in a directory.

    Args:
        data: Path of the directory to scan; files are selected with the
            "*.pdf" glob and each one is read with PyPDFLoader.

    Returns:
        The documents produced by DirectoryLoader.load().
    """
    pdf_loader = DirectoryLoader(data, glob="*.pdf", loader_cls=PyPDFLoader)
    return pdf_loader.load()

In [3]:
# Load the PDFs from ../data (a relative path, so it depends on the kernel's working directory).
extracted_data = load_pdf_files("../data")

In [4]:
# Preview only the first two documents; displaying the whole list floods the notebook output.
extracted_data[:2]

[Document(metadata={'producer': 'pdfTeX-1.40.20', 'creator': 'TeX', 'creationdate': '2022-08-23T14:25:57+05:30', 'author': 'Francisco Martín Rico', 'keywords': 'Behavior Trees; C++; Python; Reactive Behaviors; Robot Programming; ROS', 'moddate': '2022-09-07T12:03:53+05:30', 'ptex.fullbanner': 'This is pdfTeX, Version 3.14159265-2.6-1.40.20 (TeX Live 2019) kpathsea version 6.3.1', 'subject': 'A Concise Introduction to Robot Programming with ROS2 provides the reader with the concepts and tools necessary to bring a robot to life through programming. It will equip the reader with the skills necessary to undertake projects in ROS2, the new version of ROS.', 'title': 'A Concise Introduction to Robot Programming with ROS2', 'trapped': '/Unknown', 'source': '..\\data\\A Concise Introduction to Robot Programming with ROS2.pdf', 'total_pages': 264, 'page': 0, 'page_label': 'Cover'}, page_content=''),
 Document(metadata={'producer': 'pdfTeX-1.40.20', 'creator': 'TeX', 'creationdate': '2022-08-23T

In [5]:
# Number of Document objects returned by the loader.
len(extracted_data)

264

In [6]:
from typing import List
from langchain.schema import Document

def filter_to_minimal_docs(docs: List[Document]) -> List[Document]:
    """
    Build slimmed-down copies of the given documents.

    Each returned Document keeps the original page_content, while its
    metadata is reduced to a single 'source' entry (None when the original
    metadata has no 'source' key). The input documents are left untouched.
    """
    return [
        Document(
            page_content=original.page_content,
            metadata={"source": original.metadata.get("source")},
        )
        for original in docs
    ]

In [7]:
minimal_docs = filter_to_minimal_docs(extracted_data)
# Preview only the first two documents instead of dumping the whole list.
minimal_docs[:2]

[Document(metadata={'source': '..\\data\\A Concise Introduction to Robot Programming with ROS2.pdf'}, page_content=''),
 Document(metadata={'source': '..\\data\\A Concise Introduction to Robot Programming with ROS2.pdf'}, page_content='A Concise Introduction to \nRobot Programming \nwith ROS2\nA Concise Introduction to Robot Programming with ROS2 provides the reader with the con-\ncepts and tools necessary to bring a robot to life through programming. It will equip the reader \nwith the skills necessary to undertake projects with ROS2, the new version of ROS. It is not \nnecessary to have previous experience with ROS2 as it will describe its concepts, tools, and \nmethodologies from the beginning. \nKey Features\n•\t Uses the two programming languages officially supported in ROS2 (C++, mainly, and Py -\nthon)\n•\t Approaches ROS2 from three different but complementary dimensions: the Community, \nComputation Graph, and the Workspace\n•\t Includes a complete simulated robot, development

In [8]:
def text_split(minimal_docs, chunk_size=500, chunk_overlap=20):
    """
    Split the documents into smaller chunks.

    Args:
        minimal_docs: Documents to split.
        chunk_size: `chunk_size` passed to RecursiveCharacterTextSplitter.
            Defaults to 500, the value previously hard-coded here.
        chunk_overlap: `chunk_overlap` passed to the splitter. Defaults to
            20, the value previously hard-coded here.

    Returns:
        The chunked documents returned by `split_documents`.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    return text_splitter.split_documents(minimal_docs)

In [9]:
# Chunk the slimmed-down documents and report how many chunks were produced.
texts_chunk = text_split(minimal_docs)
print(f"Number of chunks: {len(texts_chunk)}")

Number of chunks: 1074


In [10]:
# Inspect the first chunk to check what the splitter produced.
texts_chunk[0]

Document(metadata={'source': '..\\data\\A Concise Introduction to Robot Programming with ROS2.pdf'}, page_content='A Concise Introduction to \nRobot Programming \nwith ROS2\nA Concise Introduction to Robot Programming with ROS2 provides the reader with the con-\ncepts and tools necessary to bring a robot to life through programming. It will equip the reader \nwith the skills necessary to undertake projects with ROS2, the new version of ROS. It is not \nnecessary to have previous experience with ROS2 as it will describe its concepts, tools, and \nmethodologies from the beginning. \nKey Features')

In [11]:
from langchain.embeddings import HuggingFaceEmbeddings

def download_embeddings(model_name="sentence-transformers/all-MiniLM-L6-v2"):
    """
    Create and return the HuggingFace embeddings model.

    Args:
        model_name: Model identifier passed to HuggingFaceEmbeddings.
            Defaults to "sentence-transformers/all-MiniLM-L6-v2", the model
            previously hard-coded here. This notebook creates its Pinecone
            index with dimension=384, so a model with a different output
            size needs an index of matching dimension.

    Returns:
        The configured HuggingFaceEmbeddings instance.
    """
    return HuggingFaceEmbeddings(model_name=model_name)


embedding = download_embeddings()

  embeddings = HuggingFaceEmbeddings(
  from .autonotebook import tqdm as notebook_tqdm


In [12]:
embedding

HuggingFaceEmbeddings(client=SentenceTransformer(
  (0): Transformer({'max_seq_length': 256, 'do_lower_case': False}) with Transformer model: BertModel 
  (1): Pooling({'word_embedding_dimension': 384, 'pooling_mode_cls_token': False, 'pooling_mode_mean_tokens': True, 'pooling_mode_max_tokens': False, 'pooling_mode_mean_sqrt_len_tokens': False, 'pooling_mode_weightedmean_tokens': False, 'pooling_mode_lasttoken': False, 'include_prompt': True})
  (2): Normalize()
), model_name='sentence-transformers/all-MiniLM-L6-v2', cache_folder=None, model_kwargs={}, encode_kwargs={}, multi_process=False, show_progress=False)

In [13]:
from dotenv import find_dotenv, load_dotenv
import os

# Locate .env by searching the working directory and its parents instead of
# hard-coding a machine-specific absolute path, so the notebook runs on any
# checkout. override=True lets values from .env replace already-set variables.
load_dotenv(dotenv_path=find_dotenv(usecwd=True), override=True)

True

In [14]:
def _require_env(name: str) -> str:
    """Return the value of environment variable `name`; raise if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        # Fail here with a clear message rather than later with an opaque
        # TypeError (assigning None into os.environ) or an authentication error.
        raise RuntimeError(
            f"{name} is not set; add it to your .env file or environment."
        )
    return value


# The values are read from os.environ, so they do not need to be written back
# to it. Never print these keys: notebook outputs are saved with the file.
PINECONE_API_KEY = _require_env("PINECONE_API_KEY")
GEMINI_API_KEY = _require_env("GEMINI_API_KEY")

In [15]:
# %pip installs into the running kernel's environment; a bare !pip may target a different Python.
%pip install --upgrade pinecone




[notice] A new release of pip is available: 25.0.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [16]:
import os
from pinecone import Pinecone, ServerlessSpec

index_name = "ros"

# Initialize the Pinecone client with the key loaded earlier.
pc = Pinecone(api_key=PINECONE_API_KEY)

# Create the index only if it is missing, so re-running this cell is harmless.
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        # Must equal the embedding model's output size (384 for the model used here).
        dimension=384,
        # Sparse values are supported only for dotproduct.
        metric="dotproduct",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )

In [17]:
# Display the client object to confirm it was constructed.
pc

<pinecone.control.pinecone.Pinecone at 0x263306fdf40>

NOTE: You can also add documents to an existing vector DB / index.

In [18]:
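# Example (disabled): add one extra document to an index that already exists.
# It needs a `docsearch` vector store, so uncomment and run it only after
# `docsearch` has been created.
# dswith = Document(
#     page_content="dswithbappy is a youtube channel that provides tutorials on various topics.",
#     metadata={"source": "Youtube"}
# )
# docsearch.add_documents(documents=[dswith])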

# this is launching an instance of a pre existing index

In [19]:
# Embed every chunk and upsert the vectors into the Pinecone index.
import hashlib

from langchain_pinecone import PineconeVectorStore

# Give each chunk a stable ID derived from its position and text. Without
# explicit IDs every run stores the chunks under new IDs, so re-running this
# cell adds duplicate vectors and retrieval returns the same passage twice.
# With stable IDs a re-run overwrites the same records instead.
# NOTE: duplicates already stored by earlier runs are not removed by this.
chunk_ids = [
    hashlib.sha256(f"{position}:{chunk.page_content}".encode("utf-8")).hexdigest()
    for position, chunk in enumerate(texts_chunk)
]

docsearch = PineconeVectorStore.from_documents(
    documents=texts_chunk,
    embedding=embedding,
    index_name=index_name,
    ids=chunk_ids,
)

In [20]:
# Load the existing index. No documents are passed here, so this only attaches
# the embedding model to the index that was populated earlier.
from langchain_pinecone import PineconeVectorStore

docsearch = PineconeVectorStore.from_existing_index(index_name=index_name, embedding=embedding)

In [21]:
# Display the vector store object to confirm it was created.
docsearch

<langchain_pinecone.vectorstores.PineconeVectorStore at 0x26330f02a50>

In [22]:
# Wrap the vector store as a retriever using similarity search with k=5.
retriever = docsearch.as_retriever(search_type="similarity", search_kwargs={"k":5})

In [23]:
# Sanity-check retrieval with a sample question, then display the returned documents.
retrieved_docs = retriever.invoke("What is lifecycle node?")
retrieved_docs

[Document(id='a115e7f9-9ced-4822-9ee6-b2588acd489c', metadata={'source': '..\\data\\A Concise Introduction to Robot Programming with ROS2.pdf'}, page_content='5.2.3 Lifecycle Nodes\nSo far, we have seen that the nodes in ROS2 are objects of class Node that inherit\nmethods that allow us to communicate with other nodes or obtain information. In\nROS2, there is a type of node, the LifeCycleNode, whose lifetime is deﬁned using\nstates and the transitions between them:\n•When a LifeCycleNode is created, it is in Unconﬁgured state, and it must\ntrigger the conﬁgure transition to enter the Inactive state.'),
 Document(id='1601e170-3ca5-4747-9a8c-2412c8e9d7f6', metadata={'source': 'data\\A Concise Introduction to Robot Programming with ROS2.pdf'}, page_content='5.2.3 Lifecycle Nodes\nSo far, we have seen that the nodes in ROS2 are objects of class Node that inherit\nmethods that allow us to communicate with other nodes or obtain information. In\nROS2, there is a type of node, the LifeCycleNod

In [24]:
from langchain_google_genai import ChatGoogleGenerativeAI
# Read the Gemini API key from the environment (None if the variable is unset).
api_key = os.getenv("GEMINI_API_KEY")

# Chat model used to generate answers; configured with the key above and temperature 0.6.
llm = ChatGoogleGenerativeAI(
    model="gemini-1.5-flash",
    google_api_key=api_key,
    temperature=0.6
)
# Display the configured model object.
llm

ChatGoogleGenerativeAI(model='models/gemini-1.5-flash', google_api_key=SecretStr('**********'), temperature=0.6, client=<google.ai.generativelanguage_v1beta.services.generative_service.client.GenerativeServiceClient object at 0x0000026332E03D40>, default_metadata=(), model_kwargs={})

In [25]:
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
# ChatPromptTemplate was imported twice under the same name; one import is enough.
from langchain_core.prompts import ChatPromptTemplate

# System instructions for the assistant. The {context} placeholder at the end
# is the template variable meant to receive the retrieved document text.
system_prompt = """
You are a ROS AI Assistant specialized in robotics development. Use the following retrieved context to answer the user's question. Your responsibilities:
- Produce complete, correct, and runnable ROS code in either Python (ROS 2 / rclpy) or C++ (ROS 2 / rclcpp) when asked, including node(s), message types, and example launch or composition/launch files.
- Explain concepts in simple, easy-to-understand terms, suitable for an engineering student learning robotics.
- Provide a clear, structured architecture for projects the user wants to build (components, topics/actions/services, TF frames, sensors/actuators, data flow, and CI/test suggestions).
- Write correct hardware interfacing code (GPIO, serial, I2C, SPI, PWM, motor drivers, sensor drivers) with wiring notes and safety checks.
- Diagnose and correct errors in user-provided code; show minimal reproducible fixes and an explanation of the root cause.
- Implement algorithms and workflows in the ROS framework (navigation, SLAM, mapping, perception pipeline, control loops, state machines), with pseudo-code and real code examples.
- Always include dependencies and exact commands to build/run (colcon build, ros2 run, ros2 launch), and minimal test steps to verify functionality.
- Prefer ROS 2 idioms (composition, lifecycle nodes, parameters, ros2 daemon) unless user asks for ROS 1.
- When showing code: include concise comments, usage examples, expected inputs/outputs, and example messages or sample data.
- If a choice is needed (algorithms, sensors, packages), present 2–3 practical options with pros/cons and a recommended option.
- Keep answers concise when requested; otherwise be thorough and structured with numbered or short bullet sections.
- If you do not know the answer or lack required information, explicitly say so and list the exact additional data you need.
- Respect safety and do not produce harmful instructions.
- make sure to give complete code or information.

Context:
{context}
"""

# Two-message chat prompt: the system instructions above, then the user's
# question supplied through the {input} variable.
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", "{input}"),
    ]
)



# Combine the LLM and prompt into a document-answering chain, then put the
# retriever in front of it to form the full retrieval-augmented chain.
question_answer_chain = create_stuff_documents_chain(llm, prompt)
rag_chain = create_retrieval_chain(retriever, question_answer_chain)

In [26]:
# Run the retrieval chain on a sample request and print the "answer" field of the result.
response = rag_chain.invoke({"input": "write a nav2 api waypoints follower code in python?"})
print(response["answer"])

This code implements a Nav2 waypoint follower using the `nav2_client` Python library.  It assumes you have a running Nav2 system.  Error handling and more sophisticated features (like speed control and recovery behaviors) could be added for a production-ready system.

```python
import rclpy
from rclpy.node import Node
from nav2_msgs.action import NavigateToPose
from geometry_msgs.msg import PoseStamped
from tf2_ros import TransformException
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener

class WaypointFollower(Node):

    def __init__(self):
        super().__init__('waypoint_follower')
        self.tf_buffer = Buffer()
        self.tf_listener = TransformListener(self.tf_buffer, self)
        self.waypoints = []  # List to hold waypoints
        self.goal_handle = None
        self.action_client = self.create_client(NavigateToPose, '/navigate_to_pose')
        while not self.action_client.wait_for_service(timeout_sec=1.0):
            self.g