# Featurization Pipelines Milestone

### Import necessary libraries and initialize variables

In [2]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from qdrant_client.http.models import PointStruct
from transformers import AutoTokenizer, AutoModel
from qdrant_client import QdrantClient
from pymongo import MongoClient
from clearml import Task
from tqdm import tqdm
import numpy as np
import requests
import torch
import json

In [3]:
AutoTokenizer.parallelism = False

In [4]:
#Initialize mongodb

client = MongoClient("mongodb://mongo:27017/")
db = client["ros2_docs"]

In [5]:
CHATGPT_API_URL = "https://api.openai.com/v1/chat/completions"
CHATGPT_API_KEY = "sk-K5Fr76TbY4UbHtJak2RQisahvSShe79mgjO19Lh4O4T3BlbkFJ6CWb8mh7gOQvBYc5qWC83CP8xNyv0bq9vXWT8GHTEA"

In [6]:
%env CLEARML_WEB_HOST=http://webserver:8080/
%env CLEARML_API_HOST=http://apiserver:8008
%env CLEARML_FILES_HOST=http://fileserver:8081
%env CLEARML_API_ACCESS_KEY=WOLAKOQGPNE6UYI1O3MGC57ZLNX7IN
%env CLEARML_API_SECRET_KEY=JTD4z46ycq_VRIlG60rRPAdE2kkjd1O4jtHWTvhVmiRKvg82MKV-YfoeNIgX12bNuIQ

env: CLEARML_WEB_HOST=http://webserver:8080/
env: CLEARML_API_HOST=http://apiserver:8008
env: CLEARML_FILES_HOST=http://fileserver:8081
env: CLEARML_API_ACCESS_KEY=WOLAKOQGPNE6UYI1O3MGC57ZLNX7IN
env: CLEARML_API_SECRET_KEY=JTD4z46ycq_VRIlG60rRPAdE2kkjd1O4jtHWTvhVmiRKvg82MKV-YfoeNIgX12bNuIQ


### Define ChatGPT function to create a question-answer pair

This question-answer pair will be part of the dataset for finetuning at a later milestone

In [7]:
def call_chatgpt_question_answer_pair(chunk_text):
    
    #Define headers for api call
    headers = {
        "Authorization": f"Bearer {CHATGPT_API_KEY}",
        "Content-Type": "application/json"
    }
    
    #Define payload with a prompt template to get a question-answer pair from a ROS2 document chunk
    payload = {
        "model": "gpt-4o-mini",
        "messages": [
            {
                "role": "system",
                "content": (
                    f"You are a helpful assistant generating question-answer pairs for text data."
                    f"Generate a question-answer pair for the 3 sets of context related to ROS2 given by the user."
                    f"Format your answer in this way:\nQuestion:<question>\nAnswer:\n<answer>"
                    f"Ensure your question corresponds to the main elements of the context given and your answers are elaborate and informative for developers."
                    f"Include technical questions regarding code if present and format your answer to highlight the technical concepts in a detailed manner."
                )
            },
            {
                "role": "user",
                "content": (
                    f"The context is below: \n"
                    f"{chunk_text}"
                )
            }
        ],
        "max_tokens": 1000,
        "temperature": 0.7
    }
    
    #Get the response from chatgpt
    response = requests.post(CHATGPT_API_URL, headers=headers, json=payload)
    
    #Parse the question and answer from the chatgpt response
    if response.status_code == 200:
        data = response.json()
        # Extract the generated questions and answers
        message_content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
        qa_pairs = {"question": "", "answer": "", "context": chunk_text}
        
        question = True
        for line in message_content.split("\n"):
            if line.strip():  # Skip empty lines
                if str(line).startswith("Question:"):
                    question = True
                    line = line.split("Question:")[1]
                elif str(line).startswith("Answer:"):
                    question = False
                    line = line.split("Answer:")[1]
                
                if question:
                    qa_pairs["question"] += line
                    qa_pairs["question"] += "\n"
                else:
                    qa_pairs["answer"] += line
                    qa_pairs["answer"] += "\n"
          
        return qa_pairs
    else:
        print(f"Error with ChatGPT API: {response.status_code}, {response.text}")
        return {}

### Define function to clean the data obtained from the mongo db after web scraping

In [8]:
def clean_data(documents, collection_name):
    cleaned_docs = []
    
    #Standardize data key names
    for doc in documents:
        if collection_name in ["LinkedIn", "Medium"]:
            cleaned_docs.append({
                "name": doc["name"],
                "url": doc["url"],
                "data": doc["data"]
            })
        elif collection_name == "YouTube":
            cleaned_docs.append({
                "name": doc["video_title"],
                "url": doc["url"],
                "data": doc["transcript"]
            })
        elif collection_name == "GitHub":
            cleaned_docs.append({
                "name": doc["repo"],
                "url": doc["url"],
                "data": doc["data"]
            })
    return cleaned_docs

### Define a function to get MiniLM embedding model for featurization

In [9]:
def initialize_embedding_model(model_name="sentence-transformers/all-MiniLM-L6-v2"):
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)
    model.eval()
    return tokenizer, model

Using the embedding model, the below function creates vector embeddings for a batch of texts using pytorch

In [10]:
def generate_embeddings(texts, tokenizer, model, device="cuda:0"):
    
    #Initialize MiniLM tokenizer
    tokens = tokenizer(
        texts, 
        padding=True, 
        truncation=True, 
        return_tensors="pt", 
        max_length=512
    ).to(device)
    
    #Generate vector embeddings
    with torch.no_grad():
        outputs = model(**tokens)
        embeddings = outputs.last_hidden_state.mean(dim=1)
        
    return embeddings.cpu().numpy()

### Define the function to chunk the documents into smaller parts

Using `RecursiveCharacterTextSplitter` allows chunking based on logical separators

In [11]:
def chunk_data(doc, collection_name, chunk_size=1000):
    data = doc["data"]
    url = doc.get("url", None)

    # Initialize LangChain's RecursiveCharacterTextSplitter
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, # Max tokens/characters per chunk
        chunk_overlap=50, # Overlap between chunks to preserve context
        length_function=len # Use character length to measure
    )

    # Split the text into chunks
    split_chunks = text_splitter.split_text(data)

    # Create chunk metadata and store in chunks list
    chunks = [
        {"content": chunk, "metadata": {"url": url, "collection": collection_name, "title": doc["name"]}}
        for chunk in split_chunks
    ]

    return chunks

### Define a function to upsert embedded vectors in qdrant DB

The function below initializes the qdrant db for storing the vectors

In [12]:
def initialize_qdrant(db_path="http://qdrant:6333", collection_name="ros2_docs", vector_size=512):
    client = QdrantClient(db_path, timeout=15)

    #Ensure collection exists with the correct configuration
    client.recreate_collection(
        collection_name=collection_name,
        vectors_config={"size": vector_size, "distance": "Cosine"}
    )
    return client

In [13]:
def embed_and_upsert(chunks, qdrant_client: QdrantClient, tokenizer, model, collection_name, start_id, device="cuda:0"):  
    
    #Process 64 chunks in a batch  
    batch_size=64
    
    for b in tqdm(range(0, len(chunks), batch_size), "Upserting", leave=False):
        
        # Generate embeddings for all chunks in a batch
        batch_embeddings = generate_embeddings([chunk["content"] for chunk in chunks[b:b+batch_size]], tokenizer, model, device=device)
        
        # Convert the embeddings and metadata into the points structure
        points = []
        for i in range(len(batch_embeddings)):
            chunk = chunks[b+i]
            payload = {
                "content": chunk["content"],
                "metadata": chunk["metadata"]
            }
            
            points.append(
                PointStruct(
                    id=start_id + b + i,
                    vector=batch_embeddings[i].tolist(),
                    payload=payload
                )
            )


        # Upsert into Qdrant
        qdrant_client.upsert(collection_name=collection_name, points=points)

### Setup the databases

In [14]:
device = "cuda" if torch.cuda.is_available() else "cpu"
tokenizer, model = initialize_embedding_model()
model.to(device)
embedding_dim = model.config.hidden_size
qdrant_client = initialize_qdrant(vector_size=embedding_dim)

  client.recreate_collection(


### Execute the featurization pipeline

The below function parses the chunk to be used as context for question-answer pair generation process

In [15]:
def get_parsed_chunk(chunk, i):
    title = chunk["metadata"]["title"]
    content = chunk["content"]
    return f"Context {i} Title: {title}\nContext content: \n{content}"

In [16]:
# Collections to Process
collections = ["LinkedIn", "GitHub", "YouTube", "Medium"]
chunk_log = {}

#Vector DB id
start_id = 0

task = Task.init(project_name="ROS2 RAG System", task_name=f"Featurization", task_type=Task.TaskTypes.data_processing)
logger = task.get_logger()

for collection_name in tqdm(collections, "Collection"):
    logger.report_text(f"Processing collection -  {collection_name}")
    
    #Collection for obtaining scraped content
    collection = db[collection_name]
    
    #Collection to store Question-Answer pairs
    fine_tuning_col = db["QuestionAnswer"]
    
    #Obtain all documents in collection
    documents = list(collection.find({}))
    logger.report_text(f"Obtained {len(documents)} documents from MongoDB")

    # Clean Data
    cleaned_docs = clean_data(documents, collection_name)
    logger.report_text("Completed cleaning all documents")

    # Chunk Data
    all_chunks = []
    for doc in tqdm(cleaned_docs, "Documents", leave=False):
        chunks = chunk_data(doc, collection_name)
        
        # If enough chunks present, generate a question answer pair using chatgpt
        # if len(chunks) >= 3:
        #     random_elements = np.random.choice(chunks, size=3, replace=False)
        #     context = "\n\n\n".join([get_parsed_chunk(ele, i+1) for i, ele in enumerate(random_elements)])
        #     qa_pair = call_chatgpt_question_answer_pair(context)
        #     qa_pair["collection"] = collection_name
        #     fine_tuning_col.insert_one(qa_pair)
            
        all_chunks.extend(chunks)
    
    logger.report_text(f"Created {len(all_chunks)} chunks from all the documents")
    chunk_log[collection_name] = len(all_chunks)

    # Embed and Store in Vector DB
    embed_and_upsert(all_chunks, qdrant_client, tokenizer, model, "ros2_docs", start_id, device)
    logger.report_text(f"Completed data insertion of {collection_name} chunks into vectorDB")
    
    start_id += len(all_chunks)

logger.report_text("Creating Question-Answer Dataset")

#Fetch all question answer pairs
# collection = db["QuestionAnswer"]
# data = list(collection.find({}))

# formatted_data = []

# for record in data:
#     if 'question' not in record:
#         print("Incomplete Record. Skipping")
#         continue
    
#     #Get question, answer and context from each record
#     question = record["question"]
#     context = record["context"]
#     answer = record["answer"]

#     #Create the prompt and gpt response expected
#     prompt = f"This is the document context:\n{context}\n\n\nThis is the user prompt: {question}\n\nUsing the given document context pertaining to ROS2, answer the question prompted by the user."
#     completion = f"{answer.strip()}"

#     #Store the data in a format required for finetuning
#     formatted_data.append({"conversations": [{"from": "human", "value": prompt}, {"from": "gpt", "value": completion}]})

# # Save as JSON
# with open("formatted_dataset.json", "w") as f:
#     json.dump(formatted_data, f, indent=4)

task.upload_artifact("Finetuning_QA", artifact_object='formatted_dataset.json')
task.close()

ClearML Task: created new task id=61b4ee3dc14c4ee7bec5b1d0499780a2
ClearML results page: http://webserver:8080/projects/e54dc86522f8446889c50edbdf493bef/experiments/61b4ee3dc14c4ee7bec5b1d0499780a2/output/log
CLEARML-SERVER new package available: UPGRADE to v1.17.0 is recommended!
Release Notes:
### New Features 
- New ClearML Model dashboard: View all live model endpoints in a single location, complete with real time metrics reporting.
- New UI pipeline run table comparative view: compare plots and scalars of selected pipeline runs
- Improve services agent behavior: If no credentials are specified, agent uses default credentials ([ClearML Server GitHub issue #140](https://github.com/allegroai/clearml-server/issues/140))
- Add UI re-enqueue of failed tasks
- Add UI experiment scalar results table view
- Add "Block running user's scripts in the browser" UI setting option for added security
- Add UI "Reset" to set task installed packages to originally recorded values 
- Add UI edit of de

Collection:   0%|          | 0/4 [00:00<?, ?it/s]

Processing collection -  LinkedIn
Obtained 100 documents from MongoDB
Completed cleaning all documents




Created 628 chunks from all the documents




ClearML Monitor: GPU monitoring failed getting GPU reading, switching off GPU monitoring


Collection:  25%|██▌       | 1/4 [00:32<01:38, 32.72s/it]

Completed data insertion of LinkedIn chunks into vectorDB
Processing collection -  GitHub
Obtained 806 documents from MongoDB
Completed cleaning all documents




Created 7780 chunks from all the documents




ClearML Monitor: Could not detect iteration reporting, falling back to iterations as seconds-from-start


Collection:  50%|█████     | 2/4 [10:18<11:56, 358.07s/it]

Completed data insertion of GitHub chunks into vectorDB
Processing collection -  YouTube
Obtained 51 documents from MongoDB
Completed cleaning all documents




Created 1177 chunks from all the documents


Collection:  75%|███████▌  | 3/4 [11:17<03:41, 221.69s/it]

Completed data insertion of YouTube chunks into vectorDB
Processing collection -  Medium
Obtained 60 documents from MongoDB
Completed cleaning all documents




Created 667 chunks from all the documents


Collection: 100%|██████████| 4/4 [12:00<00:00, 180.16s/it]


Completed data insertion of Medium chunks into vectorDB
Creating Question-Answer Dataset


## Total Chunks Created:

In [18]:
total_chunks = sum([chunk_log[source] for source in chunk_log])
print(f"Inserted {total_chunks} chunks in vector db!")

Inserted 10252 chunks in vector db!


## Question-Answer Dataset statistics

In [19]:
with open(r"formatted_dataset.json", "r") as f:
    data = json.load(f)

### Total Question Answer Pairs:

In [21]:
pairs_count = len(data)
print(f"Created {pairs_count} pairs of question and answers")

Created 951 pairs of question and answers


### Sample Dataset Question:

In [25]:
sam = np.random.choice(data, 1)[0]
print(sam["conversations"][0]["value"])

This is the document context:
Context 1 Title: moveit/moveit2_tutorials
Context content: 
In Shell 3:

    
    
    ros2 launch moveit2_tutorials perception_pipeline_demo.launch.py
    

In Shell 4:

    
    
    ros2 bag play -r 5 <your_bag_file> --loop
    

:codedir:`perception_pipeline_demo.launch.py
<examples/perception_pipeline/launch/perception_pipeline_demo.launch.py>` is
similar to :codedir:`demo.launch.py
</doc/tutorials/quickstart_in_rviz/launch/demo.launch.py>` inside MoveIt
Quickstart in RViz except a couple of details. For
`perception_pipeline_demo.launch.py`, following lines is added to
`moveit_config`.

You can find these additional lines in line 51, 52 and 53 inside
`perception_pipeline_demo.launch.py`:

    
    
    .sensors_3d(file_path = os.path.join(
                get_package_share_directory("moveit2_tutorials"),
                "config/sensors_3d.yaml"))
    

Finally, all demo codes can be found in :codedir:`perception_pipeline's
directory <examples/percepti

### Sample Dataset Answer

In [26]:
print(sam["conversations"][1]["value"])

The setup for a perception pipeline using MoveIt2 involves several key components and configurations that are crucial for successfully integrating and visualizing sensor data. 
1. **Launching the Perception Pipeline**: The first step involves launching the perception pipeline demo using the command:
   ```
   ros2 launch moveit2_tutorials perception_pipeline_demo.launch.py
   ```
   This command initializes the necessary nodes and parameters defined in the `perception_pipeline_demo.launch.py` file, which is a specialized launch file for the perception pipeline.
2. **Playing Recorded Bag Files**: In conjunction with launching the perception pipeline, you can play back recorded sensor data using:
   ```
   ros2 bag play -r 5 <your_bag_file> --loop
   ```
   This command allows you to replay data at a rate of 5x, which is useful for testing and visualizing the sensor inputs in real time.
3. **Sensor Configuration**: The configuration for sensors is specified in the `sensors_3d.yaml` file.