In [1]:
import hashlib
import os
from getpass import getpass
from urllib.parse import urljoin, urlparse

import gradio as gr
import requests
import torch
from bs4 import BeautifulSoup
from clearml import Task, PipelineController
from pymongo import MongoClient
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, VectorParams
from transformers import AutoTokenizer, AutoModel

ETL pipeline for collecting and vectorizing data. I gather data from websites such as the ROS 2 and Nav2 documentation sites using a crawler, vectorize the content using all-MiniLM-L6-v2, and hash the content to get a unique id.
MongoDB stores the raw content and its _id. Qdrant stores the _id and the vectorized content.
To retrieve context, we first vectorize the input, search for the nearest vectors in Qdrant, and then look up the content in MongoDB using the _id returned by Qdrant.

![clearMLpic](screenshot.png)

In [50]:
mongo_client = MongoClient("mongodb://localhost:27018/")
mongo_db = mongo_client["ros2_database"]
mongo_collection = mongo_db["ros2_documents"]

%env CLEARML_WEB_HOST=http://localhost:8080/
%env CLEARML_API_HOST=http://localhost:8008
%env CLEARML_FILES_HOST=http://localhost:8081
%env CLEARML_API_ACCESS_KEY=JFG6WC6KZ91IC5AG4DQ7NA1CVSM25F ##my clearML API key on local server
%env CLEARML_API_SECRET_KEY=FO7EPdskSmDQhaXIvLEzHhyQx0dVhQD1vFWM3tglhxejLlib_M9Bmy8zekVJk3op2Ys

env: CLEARML_WEB_HOST=http://localhost:8080/
env: CLEARML_API_HOST=http://localhost:8008
env: CLEARML_FILES_HOST=http://localhost:8081
env: CLEARML_API_ACCESS_KEY=JFG6WC6KZ91IC5AG4DQ7NA1CVSM25F ##my clearML API key on local server
env: CLEARML_API_SECRET_KEY=FO7EPdskSmDQhaXIvLEzHhyQx0dVhQD1vFWM3tglhxejLlib_M9Bmy8zekVJk3op2Ys


In [None]:

# Register this notebook run with ClearML.
task = Task.init(
    project_name="ROS2_ETL",
    task_name="Crawl",
    # The task crawls and vectorizes documents; it does not train a model.
    task_type=Task.TaskTypes.data_processing,
)


task.set_system_tags(["development"])


In [26]:
## We don't have a remote server, and having a Docker agent execute the code is awkward: I think it requires a standalone GitHub repo and a requirements.txt to fetch and install the needed libraries (see https://clear.ml/docs/latest/docs/references/sdk/task/#set_packages), so I did not spend time on this.

In [49]:
# Qdrant connection and the name of the collection used for the chunk vectors.
qdrant_collection_name = "ros2_vectors"
qdrant_client = QdrantClient(url="http://localhost:6333")

# Load the embedding model (Hugging Face checkpoint); tokenizer and model use the same name.
EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
tokenizer = AutoTokenizer.from_pretrained(EMBEDDING_MODEL_NAME)
model = AutoModel.from_pretrained(EMBEDDING_MODEL_NAME)

def extract_links(soup, base_url):
    """
    Collect the hyperlinks of a parsed page as absolute URLs.

    Every ``<a>`` tag that has an ``href`` is resolved against ``base_url``;
    URLs rejected by ``is_valid_url`` are dropped. Document order and
    duplicates are preserved.
    """
    absolute_urls = (
        urljoin(base_url, anchor['href'])  # resolve relative hrefs
        for anchor in soup.find_all('a', href=True)
    )
    return [candidate for candidate in absolute_urls if is_valid_url(candidate)]


def is_valid_url(url):
    """
    Tell whether ``url`` has both a scheme and a network location.
    """
    parts = urlparse(url)
    if not parts.netloc:
        return False
    return bool(parts.scheme)



def get_text_from_url(url, timeout=30):
    """Download ``url`` and return the text of its ``<p>`` and ``<span>`` elements.

    Args:
        url: Address of the page to download.
        timeout: Value passed to ``requests.get(timeout=...)``. Without a
            timeout a server that stops responding would block the crawl
            indefinitely.

    Returns:
        The text of every matched element, joined with newlines in the order
        ``find_all`` returns them.

    Raises:
        requests.HTTPError: If the response status is an error
            (``raise_for_status``).
        requests.RequestException: For connection problems, including a
            timeout.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()  # fail loudly on HTTP error statuses
    soup = BeautifulSoup(response.text, "html.parser")

    # Extract the text of the page body.
    # NOTE(review): a <span> nested inside a <p> is matched on its own as well,
    # so its text shows up twice in the result - confirm this is intended.
    elements = soup.find_all(['p', 'span'])
    return "\n".join(element.get_text() for element in elements)


def split_text(text, max_length=2000):
    """Group the lines of ``text`` into chunks of at most ``max_length`` characters.

    The text is split on newlines and consecutive lines are packed greedily
    into a chunk, joined by single spaces, until the next line would push the
    chunk past ``max_length``.

    Args:
        text: Text to split; lines are separated by ``"\\n"``.
        max_length: Maximum length of a joined chunk. A single line that is
            longer than this is kept whole as its own chunk.

    Returns:
        The chunks in input order. Chunks that are empty or contain only
        whitespace are omitted, so empty input yields ``[]``.
    """
    chunks = []
    current_chunk = []
    current_length = 0  # len(" ".join(current_chunk)), tracked incrementally

    for sentence in text.split("\n"):
        # Joining adds one separating space unless the chunk has no lines yet.
        candidate_length = len(sentence) + (current_length + 1 if current_chunk else 0)
        if candidate_length <= max_length:
            current_chunk.append(sentence)
            current_length = candidate_length
            continue

        # Flush only when something was collected: an oversized first line
        # must not produce a leading empty chunk.
        if current_chunk:
            chunks.append(" ".join(current_chunk))
        current_chunk = [sentence]
        current_length = len(sentence)

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    # Chunks built only from blank lines carry nothing worth storing.
    return [chunk for chunk in chunks if chunk.strip()]


def vectorize_text(text):
    """Embed ``text`` with the module-level ``tokenizer`` and ``model``.

    The text is tokenized with truncation enabled, passed through the model,
    and ``last_hidden_state`` is averaged over dimension 1.

    Args:
        text: Text to embed.

    Returns:
        The pooled embedding converted with ``.tolist()`` after ``squeeze()``;
        presumably a flat list of floats for a single string - confirm for
        batched input.
    """
    tokens = tokenizer(text, padding=True, truncation=True, return_tensors="pt")
    # Inference only: without no_grad() each call builds an autograd graph that
    # is never used, which costs memory and time on every chunk.
    with torch.no_grad():
        hidden_states = model(**tokens).last_hidden_state
    embeddings = hidden_states.mean(dim=1).squeeze().tolist()
    return embeddings


def save_to_mongodb_and_qdrant(chunks, source_url):
    """Store text chunks in MongoDB and their embeddings in Qdrant.

    Each chunk gets an id derived from the MD5 hash of its content, so storing
    the same text again overwrites the earlier entry instead of duplicating it.

    Args:
        chunks: Iterable of text chunks. Empty or whitespace-only chunks are
            skipped; a chunk repeated within ``chunks`` is embedded once.
        source_url: URL recorded with every chunk, in the MongoDB document and
            in the Qdrant payload.
    """
    points_by_id = {}
    for chunk in chunks:
        # A blank chunk has no content to retrieve, so do not store or embed it.
        if not chunk or not chunk.strip():
            continue

        # Generate the id from a hash of the content.
        # NOTE(review): Qdrant point ids must be unsigned integers or UUIDs; the
        # 32-character hex digest is presumably accepted as a UUID and may come
        # back hyphenated from searches - confirm before matching it to "_id".
        doc_id = hashlib.md5(chunk.encode("utf-8")).hexdigest()
        if doc_id in points_by_id:
            continue

        # Embed first so a failing embedding never leaves a MongoDB document
        # behind that has no vector.
        vector = vectorize_text(chunk)

        # Store the raw chunk in MongoDB.
        mongo_document = {
            "_id": doc_id,
            "content": chunk,
            "source_url": source_url,
        }
        mongo_collection.replace_one({"_id": doc_id}, mongo_document, upsert=True)

        points_by_id[doc_id] = PointStruct(
            id=doc_id, vector=vector, payload={"source_url": source_url}
        )

    # Send all vectors of this call in one request instead of one per chunk.
    if points_by_id:
        qdrant_client.upsert(
            collection_name=qdrant_collection_name,
            points=list(points_by_id.values()),
        )


def _fetch_and_store_page(url, timeout=30):
    """Download ``url`` once, store its text chunks, and return the parsed page."""
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    # Same extraction as get_text_from_url (text of <p> and <span> elements),
    # but on the HTML that was already downloaded, so each page is fetched once.
    text = "\n".join(element.get_text() for element in soup.find_all(['p', 'span']))
    save_to_mongodb_and_qdrant(split_text(text), url)
    return soup


def scrape_ros2_documentation(base_url):
    """Crawl a documentation site starting at ``base_url`` and store its text.

    The crawl runs in two phases. First, pages whose URL contains
    ``DOCS_DOMAIN`` are visited (most recently discovered link first) until no
    links remain or ``MAX_MAINSITE_LINKS`` pages have been visited; the links
    found on them are followed. Then up to ``MAX_OTHER_LINKS`` of the links
    that point outside ``DOCS_DOMAIN`` are visited, without following their
    links. Every visited page is chunked and saved through
    ``save_to_mongodb_and_qdrant``.

    Pages that fail to download or process are reported and skipped.

    Args:
        base_url: URL of the first page to visit.

    Reads the module-level names ``MAX_MAINSITE_LINKS``, ``MAX_OTHER_LINKS``
    and ``DOCS_DOMAIN``, which must be defined before the function is called.
    """
    visited_docs_links = set()
    visited_other_links = set()

    docs_links_to_visit = [base_url]
    other_links_to_visit = []

    while docs_links_to_visit and len(visited_docs_links) < MAX_MAINSITE_LINKS:
        current_url = docs_links_to_visit.pop()
        if current_url in visited_docs_links:
            continue

        print(f"Processing docs link: {current_url}")
        visited_docs_links.add(current_url)

        # Download the page, store its content and extract its links.
        try:
            soup = _fetch_and_store_page(current_url)
            links = extract_links(soup, current_url)
        except Exception as e:
            print(f"Failed to process {current_url}: {e}")
            continue

        # Classify the links.
        for link in links:
            # Skips anchors, one docs.ros.org subtree, URLs containing "_" and
            # URLs without "html". This should really be a regex.
            if "#" in link or "docs.ros.org/en/rolling/p" in link or "_" in link or ("html" not in link):
                continue
            elif DOCS_DOMAIN in link and link not in visited_docs_links and link not in docs_links_to_visit:
                docs_links_to_visit.append(link)
            elif DOCS_DOMAIN not in link and link not in visited_other_links and link not in other_links_to_visit:
                other_links_to_visit.append(link)

    print(f"Finished processing docs links. Processed {len(visited_docs_links)} links.")

    # Crawl the remaining (off-site) links.
    while other_links_to_visit and len(visited_other_links) < MAX_OTHER_LINKS:
        current_url = other_links_to_visit.pop()
        if current_url in visited_other_links or "docs.ros.org/en/" in current_url:
            continue

        print(f"Processing other link: {current_url}")
        visited_other_links.add(current_url)

        # Download the page and store its content; its links are not followed.
        try:
            _fetch_and_store_page(current_url)
        except Exception as e:
            print(f"Failed to process {current_url}: {e}")
            continue

    print(f"Finished processing other links. Processed {len(visited_other_links)} links.")

In [None]:

# Crawl limits, and the domain used to tell main-site links from other links.
MAX_MAINSITE_LINKS = 200
MAX_OTHER_LINKS = 50
DOCS_DOMAIN = "docs.nav2.org/"
BASE_URL = "https://docs.nav2.org/"

# Sample crawl of the Nav2 docs. The same crawl has also been done on other sites
# such as the ROS 2 docs; those calls are omitted here and their links are shown below.
nav2_base_url = BASE_URL
scrape_ros2_documentation(nav2_base_url)

Processing docs link: https://docs.nav2.org/
Processing docs link: https://docs.nav2.org/about/index.html
Processing docs link: https://docs.nav2.org/index.html
Processing docs link: https://docs.nav2.org/roadmap/roadmap.html
Processing docs link: https://docs.nav2.org/migration/Jazzy.html
Processing docs link: https://docs.nav2.org/migration/Iron.html
Processing docs link: https://docs.nav2.org/migration/Humble.html
Processing docs link: https://docs.nav2.org/migration/Galactic.html
Processing docs link: https://docs.nav2.org/migration/Foxy.html
Processing docs link: https://docs.nav2.org/migration/Eloquent.html
Processing docs link: https://docs.nav2.org/migration/Dashing.html
Processing docs link: https://docs.nav2.org/migration/index.html
Processing docs link: https://docs.nav2.org/plugins/index.html
Processing docs link: https://docs.nav2.org/tuning/index.html
Processing docs link: https://docs.nav2.org/configuration/packages/configuring-docking-server.html
Processing docs link: h

ClearML Monitor: Could not detect iteration reporting, falling back to iterations as seconds-from-start


In [25]:
##sample of documents and links
# Load every stored document and print the URL each one came from.
documents = list(mongo_collection.find())
for doc in documents:
    print(doc['source_url'])

https://docs.ros.org/en/rolling/index.html
https://docs.ros.org/en/rolling/index.html
https://docs.ros.org/en/rolling/index.html
https://docs.ros.org/en/rolling/Citations.html
https://docs.ros.org/en/rolling/Glossary.html
https://docs.ros.org/en/rolling/Glossary.html
https://docs.ros.org/en/rolling/Related-Projects/Visualizing-ROS-2-Data-With-Foxglove.html
https://docs.ros.org/en/rolling/Related-Projects/Visualizing-ROS-2-Data-With-Foxglove.html
https://docs.ros.org/en/rolling/Related-Projects/Visualizing-ROS-2-Data-With-Foxglove.html
https://docs.ros.org/en/rolling/Related-Projects/Visualizing-ROS-2-Data-With-Foxglove.html
https://docs.ros.org/en/rolling/Related-Projects/Nvidia-ROS2-Projects.html
https://docs.ros.org/en/rolling/Related-Projects/Nvidia-ROS2-Projects.html
https://docs.ros.org/en/rolling/Related-Projects/Intel-ROS2-Projects.html
https://docs.ros.org/en/rolling/Related-Projects/Intel-ROS2-Projects.html
https://docs.ros.org/en/rolling/Related-Projects.html
https://docs.ros

In [11]:
# Number of documents loaded from the MongoDB collection.
len(documents)

2273

In [12]:
import json

## Data export: one JSON object per line; values json cannot encode are written via str().
with open("mongodb_data.json", "w") as file:
    file.writelines(
        json.dumps(document, default=str) + "\n" for document in documents
    )


In [None]:
## Re-import the exported JSON lines into MongoDB and rebuild the Qdrant vectors
client = MongoClient("mongodb://localhost:27018/")
db = client["ros2_database"]
collection = db["ros2_documents"]
qdrant_client = QdrantClient(url="http://localhost:6333")
qdrant_collection_name = "ros2_vectors"

# Start from a fresh vector collection (384-dimensional vectors, cosine distance).
qdrant_client.recreate_collection(
    collection_name=qdrant_collection_name,
    vectors_config=VectorParams(size=384, distance="Cosine"),
)

with open("mongodb_data.json", "r") as file:
    # The existing documents are removed only after the export file was opened.
    collection.delete_many({})
    for line in file:
        document = json.loads(line)
        collection.insert_one(document)
        vector = vectorize_text(document['content'])
        source_url = document['source_url']
        doc_id = document['_id']
        print(doc_id)
        point = PointStruct(id=doc_id, vector=vector, payload={"source_url": source_url})
        qdrant_client.upsert(collection_name=qdrant_collection_name, points=[point])


Below is a proof of concept of the YouTube content ETL. I download the audio from YouTube links and transcribe it into text using the AssemblyAI API. I currently have no good way of collecting many links to ROS2 videos, so this remains a proof of concept.

In [42]:
# Videos used for the transcription proof of concept.
youtube_links = [
    "https://www.youtube.com/watch?v=0aPbWsyENA8",
    "https://www.youtube.com/watch?v=c5DRTN2b2kY",
    "https://www.youtube.com/watch?v=3GbrKQ7G2P0",
    "https://www.youtube.com/watch?v=iBGZ8LEvkCY",
    "https://www.youtube.com/watch?v=wfCuPQ_6VbI",
    "https://www.youtube.com/watch?v=MwEXX6a-TWw",
    "https://www.youtube.com/watch?v=Yy4OgGwEAj8",
    "https://www.youtube.com/watch?v=od3JwOeyEXc",
    "https://www.youtube.com/watch?v=DXEnVEQjImo",
    "https://www.youtube.com/watch?v=FSqm0fDfxrk",
    "https://www.youtube.com/watch?v=vCTbUgw6k8U",
]

In [44]:
# Show the list of video links.
youtube_links

['https://www.youtube.com/watch?v=0aPbWsyENA8',
 'https://www.youtube.com/watch?v=c5DRTN2b2kY',
 'https://www.youtube.com/watch?v=3GbrKQ7G2P0',
 'https://www.youtube.com/watch?v=iBGZ8LEvkCY',
 'https://www.youtube.com/watch?v=wfCuPQ_6VbI',
 'https://www.youtube.com/watch?v=MwEXX6a-TWw',
 'https://www.youtube.com/watch?v=Yy4OgGwEAj8',
 'https://www.youtube.com/watch?v=od3JwOeyEXc',
 'https://www.youtube.com/watch?v=DXEnVEQjImo',
 'https://www.youtube.com/watch?v=FSqm0fDfxrk',
 'https://www.youtube.com/watch?v=vCTbUgw6k8U']

In [None]:
import assemblyai as aai
from yt_dlp  import YoutubeDL

# Proof of concept: YouTube video processing using the AssemblyAI API.
# The key is read from the ASSEMBLYAI_API_KEY environment variable so it never has
# to be pasted into the notebook; when unset it falls back to the empty string.
aai.settings.api_key = os.environ.get("ASSEMBLYAI_API_KEY", "")

def download_audio(youtube_url, output_path):
    """Download the audio of ``youtube_url`` with yt-dlp and convert it to WAV.

    ``output_path`` is used as yt-dlp's output template; the conversion is
    done by the ``FFmpegExtractAudio`` post-processor.
    """
    extract_wav = {
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'wav',
        'preferredquality': '192',
    }
    options = {
        'format': 'bestaudio/best',
        'postprocessors': [extract_wav],
        'outtmpl': output_path,
    }
    with YoutubeDL(options) as downloader:
        downloader.download([youtube_url])

# With the output template "audio.wav", the extracted audio ends up in
# "audio.wav.wav" (see the [ExtractAudio] line of the download log).
FILE_URL = "audio.wav.wav"

download_audio(youtube_links[4], "audio.wav")

transcriber = aai.Transcriber()
transcript = transcriber.transcribe(FILE_URL)

# Print the error for a failed transcription, the transcript text otherwise.
print(transcript.error if transcript.status == aai.TranscriptStatus.error else transcript.text)

[youtube] Extracting URL: https://www.youtube.com/watch?v=wfCuPQ_6VbI
[youtube] wfCuPQ_6VbI: Downloading webpage
[youtube] wfCuPQ_6VbI: Downloading ios player API JSON
[youtube] wfCuPQ_6VbI: Downloading mweb player API JSON
[youtube] wfCuPQ_6VbI: Downloading m3u8 information
[info] wfCuPQ_6VbI: Downloading 1 format(s): 251
[download] Destination: audio.wav
[download] 100% of   22.75MiB in 00:00:11 at 1.96MiB/s   
[ExtractAudio] Destination: audio.wav.wav
Deleting original file audio.wav (pass -k to keep)


In [45]:
# Transcripts of the videos that succeeded, and the errors of those that failed.
result, errors = [], []

for link in youtube_links:
    # Each download writes to the same file that is then transcribed.
    download_audio(link, "audio.wav")
    transcriber = aai.Transcriber()
    transcript = transcriber.transcribe(FILE_URL)
    if transcript.status != aai.TranscriptStatus.error:
        result.append(transcript.text)
        continue
    print(transcript.error)
    errors.append(transcript.error)

[youtube] Extracting URL: https://www.youtube.com/watch?v=0aPbWsyENA8
[youtube] 0aPbWsyENA8: Downloading webpage
[youtube] 0aPbWsyENA8: Downloading ios player API JSON
[youtube] 0aPbWsyENA8: Downloading mweb player API JSON
[youtube] 0aPbWsyENA8: Downloading m3u8 information
[info] 0aPbWsyENA8: Downloading 1 format(s): 251
[download] Destination: audio.wav
[download] 100% of   11.21MiB in 00:00:04 at 2.35MiB/s   
[ExtractAudio] Destination: audio.wav.wav
Deleting original file audio.wav (pass -k to keep)
[youtube] Extracting URL: https://www.youtube.com/watch?v=c5DRTN2b2kY
[youtube] c5DRTN2b2kY: Downloading webpage
[youtube] c5DRTN2b2kY: Downloading ios player API JSON
[youtube] c5DRTN2b2kY: Downloading mweb player API JSON
[youtube] c5DRTN2b2kY: Downloading m3u8 information
[info] c5DRTN2b2kY: Downloading 1 format(s): 251
[download] Destination: audio.wav
[download] 100% of    9.88MiB in 00:00:04 at 2.08MiB/s   
[ExtractAudio] Destination: audio.wav.wav
Deleting original file audio.wa

In [51]:
def split_text_youtube_audio(text, max_length=2000):
    """Group the sentences of a transcript into chunks of at most ``max_length`` characters.

    The text is split on ``"."``; each sentence is stripped of surrounding
    whitespace and gets its period back, so chunks read as normal prose.
    Sentences are then packed greedily, joined by single spaces, until the next
    one would push the chunk past ``max_length``.

    Args:
        text: Transcript text.
        max_length: Maximum length of a joined chunk. A single sentence longer
            than this is kept whole as its own chunk.

    Returns:
        The chunks in input order; empty input yields ``[]`` and no chunk is
        empty.
    """
    pieces = text.split(".")
    # Every piece except the last was followed by a period in the input; restore
    # it instead of silently dropping the sentence terminators.
    # NOTE: splitting on "." also breaks inside decimals and abbreviations.
    sentences = [piece.strip() + "." for piece in pieces[:-1] if piece.strip()]
    tail = pieces[-1].strip()
    if tail:
        sentences.append(tail)

    chunks = []
    current_chunk = []
    current_length = 0  # len(" ".join(current_chunk)), tracked incrementally

    for sentence in sentences:
        # Joining adds one separating space unless the chunk has no sentences yet.
        candidate_length = len(sentence) + (current_length + 1 if current_chunk else 0)
        if candidate_length <= max_length:
            current_chunk.append(sentence)
            current_length = candidate_length
            continue

        # Flush only when something was collected: an oversized first sentence
        # must not produce a leading empty chunk.
        if current_chunk:
            chunks.append(" ".join(current_chunk))
        current_chunk = [sentence]
        current_length = len(sentence)

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks


# Every transcript is stored with the playlist URL as its source.
playlist_url = "https://www.youtube.com/playlist?list=PLLSegLrePWgJudpPUof4-nVFHGkB62Izy"
for text in result:
    chunks = split_text_youtube_audio(text)
    save_to_mongodb_and_qdrant(chunks, playlist_url)


In [53]:
# Show the collected transcript texts.
result

 "Hey and welcome back. This is episode number two of this tutorial series, Crash Course on ROS2. You can find the series playlist in the description. And let's get started. In the previous tutorial you have installed and set up ROS2. Now we are going to directly start a ROS2 node and understand what it is. And very Basically put a ROS2 node is simply a ROS2 program that's going to interact with Rust 2 communications and tools. Okay, so I have four terminals here and let's start a node. So where can we find a ROS2 node? Well, actually when you install the ROS2, there are some packages that are actually example packages that you can use directly to start a node without having to create one. So we're going to run Rust 2, okay? So the Rust 2 command line and then space run. And after this you will need to provide the name of a package. So the ROS2 nodes are going to be organized in packages. Okay? For example, you have a package for a camera driver, a package for a navigation of a robot, 