In [1]:
# Importing libraries
from youtube_transcript_api import YouTubeTranscriptApi
from pymongo import MongoClient
import requests
from clearml import Task, PipelineController, PipelineDecorator
import re

In [2]:
# Connecting to MongoDB

# MongoDB Configuration
MONGO_URI = "mongodb://mongoadmin:secret@localhost:27017/"
DB_NAME = "media_data_final"
COLLECTION_NAME = "raw_data_final"

# Connect to MongoDB
mongo_client = MongoClient(MONGO_URI)
db = mongo_client[DB_NAME]
collection = db[COLLECTION_NAME]

Creating a ClearML task to monitor the progress, and using ClearML pipelines to automate the process of fetching data from YouTube, GitHub and Websites

In [3]:
# ClearML Task Setup
Task.init(project_name="ETL Pipeline Main", task_name="Media Ingestion Pipeline")


ClearML Task: created new task id=67d9ee333e6347d9bbcd189f1f03ad2c
2024-12-08 18:54:29,830 - clearml.Task - INFO - Storing jupyter notebook directly as code
ClearML results page: https://app.clear.ml/projects/56f82e0a1a2d4f18a9bd3d2f25c991b8/experiments/67d9ee333e6347d9bbcd189f1f03ad2c/output/log


<clearml.task.Task at 0x7f337d9264a0>

ClearML results page: https://app.clear.ml/projects/56f82e0a1a2d4f18a9bd3d2f25c991b8/experiments/67d9ee333e6347d9bbcd189f1f03ad2c/output/log
ClearML Monitor: GPU monitoring failed getting GPU reading, switching off GPU monitoring
ClearML Monitor: Could not detect iteration reporting, falling back to iterations as seconds-from-start


In [4]:
def clean_text(text):
    text = re.sub(r'<.*?>', '', text)  # Remove HTML tags
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)  # Remove special characters
    return text.strip().lower()

We use YoutubeTranscriptAPI to fetch the transcripts of the youtube vidoes and then we store it into mongodb. We are saving the URL of the video, the video id and the transcript returned by the API

In [5]:
def fetch_youtube_transcripts(url):
    video_id = url.split("v=")[1]
    transcript = YouTubeTranscriptApi.get_transcript(video_id)
    return transcript

In [6]:
def fetch_youtube_data():
    with open('../app/youtube_sources.txt', 'r') as file:
        # Read all lines, strip newline characters, and save to a list
        youtube_links = list(set(line.strip() for line in file))
    print(len(youtube_links))
    
    for url in youtube_links:
        transcript = fetch_youtube_transcripts(url)
        document = {
            "source": "youtube",
            "url": url,
            "video_id": url.split("v=")[1],
            "data": transcript
        }
        collection.insert_one(document)
        print("Data inserted into db for Youtube URL:", url)
    
    print("Saved complete YouTube Data into the DB")
        

The function cleans the content in github files

In [7]:
def clean_rst_content(text):
    # Remove directives (e.g., .. redirect-from::, .. code-block::)
    text = re.sub(r'\.\. .*::.*', '', text)
    
    # Remove list-table and code-block markers
    text = re.sub(r'\.\. (list-table|code-block)::.*', '', text)
    
    # Remove table headers and structure
    text = re.sub(r'\n\s+\* - .*', '', text)
    
    # Convert headings (e.g., ===, ---) to plain text
    text = re.sub(r'\n[=~-]{2,}\n', '\n', text)
    
    # Remove HTML-like tags (e.g., `link <url>`__)
    text = re.sub(r'`[^`]*?`__', '', text)
    
    # Remove backticks used for inline code
    text = re.sub(r'`([^`]*?)`', r'\1', text)
    
    # Remove extra newlines and trim spaces
    text = re.sub(r'\n{2,}', '\n', text).strip()
    
    # Remove leading/trailing spaces around lines
    text = "\n".join(line.strip() for line in text.splitlines())
    
    return text

In [8]:
def extract_and_clean(data):
    # Extract the 'content' field
    raw_content = data.get('content', '')
    
    # Clean the raw content
    cleaned_content = clean_rst_content(raw_content)
    
    return {'path': data.get('path', ''), 'content': cleaned_content}

Below, we scrape github repositories for any ReadME files, text files and RST files. Given the base link of the repo, we iteratively check every file in the repo and if its one of the types we need, we scrape the content and save it into MongoDB collection.

In [9]:
def scrape_github_repo_to_text(repo_url, access_token=None, allowed_extensions=None):
    # Extract owner and repository name from URL
    repo_name = repo_url.rstrip('/').split('/')[-1]
    owner_name = repo_url.rstrip('/').split('/')[-2]
    
    if allowed_extensions is None:
        allowed_extensions = [".txt", ".rst", ".md"]
        
    headers = {'Authorization': f'token {access_token}'} if access_token else {}
    
    # GitHub API URLs
    repo_api_url = f"https://api.github.com/repos/{owner_name}/{repo_name}"
    contents_api_url = f"{repo_api_url}/contents"
    
    # Get repository metadata
    repo_metadata = requests.get(repo_api_url, headers=headers).json()
    if 'message' in repo_metadata and 'Not Found' in repo_metadata['message']:
        print("Repository not found. Check the URL or your access token.")
        return
    
    print("Repository Metadata:")
    print("Repo metadata:", repo_metadata)
    
    # Recursive function to get all files and their content
    all_files = []
    file_names = []
    def fetch_contents(api_url, path=""):
        contents = requests.get(api_url, headers=headers).json()
        for item in contents:
                if isinstance(item, dict) and 'type' in item:
                    if item['type'] == 'file':
                        if any(item['path'].endswith(ext) for ext in allowed_extensions):
                            file_names.append(item['path'])
                            file_content = requests.get(item['download_url'], headers=headers).text
                            all_files.append({
                                "path": f"{path}/{item['name']}",
                                "content": file_content
                            })
                    elif item['type'] == 'dir':
                        fetch_contents(item['_links']['self'], path=f"{path}/{item['name']}")
                else:
                    print("Item not dict:", item)

    fetch_contents(contents_api_url)

    print(f"Fetched {len(all_files)} files from the repository.")
    return all_files

In [10]:
def fetch_github_data():
    access_token = "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"
    with open('../app/github_sources.txt', 'r') as file:
        # Read all lines, strip newline characters, and save to a list
        github_links = list(set(line.strip() for line in file))
    print(len(github_links))
    print(github_links)
    for link in github_links:
        repo_data = scrape_github_repo_to_text(link, access_token)
        for file in repo_data:
            cleaned_data = extract_and_clean(file)
            document = {
            "source": "github",
            "url": link,
            "path": cleaned_data.get('path', ''),
            "data": cleaned_data.get('content','')
            }
            collection.insert_one(document)
        print("Saved data into mongodb for repository:", link)
        
        
    print("Saved complete GitHub Data into the DB, number of repositories persisted:", len(github_links))

Helper function to query all the unique URLs that have been scraped

In [11]:
def query_ingested_urls():
    unique_urls = list(collection.distinct("url"))
    print("Number of ingested URLS:", len(unique_urls))
    print("Ingested URLs:")
    for entry in unique_urls:
        print(entry)

In [12]:
fetch_youtube_data()


11
Data inserted into db for Youtube URL: https://www.youtube.com/watch?v=96XsJ7xfsS8
Data inserted into db for Youtube URL: https://www.youtube.com/watch?v=ERhXoIn7kr4
Data inserted into db for Youtube URL: https://www.youtube.com/watch?v=ntJkRO_Z41I
Data inserted into db for Youtube URL: https://www.youtube.com/watch?v=3HuV1M1NMB8
Data inserted into db for Youtube URL: https://www.youtube.com/watch?v=Lm1ediRG5JA
Data inserted into db for Youtube URL: https://www.youtube.com/watch?v=laWn7_cj434
Data inserted into db for Youtube URL: https://www.youtube.com/watch?v=dxcU-_PGZdw
Data inserted into db for Youtube URL: https://www.youtube.com/watch?v=_qQAfTmB5wc
Data inserted into db for Youtube URL: https://www.youtube.com/watch?v=bp7MAZh4lJA
Data inserted into db for Youtube URL: https://www.youtube.com/watch?v=8QfI5a7lTKU
Data inserted into db for Youtube URL: https://www.youtube.com/watch?v=jkoGkAd0GYk
Saved complete YouTube Data into the DB


ClearML results page: https://app.clear.ml/projects/56f82e0a1a2d4f18a9bd3d2f25c991b8/experiments/3d1002c7f85a49db9897528f3444dbf0/output/log
ClearML Monitor: GPU monitoring failed getting GPU reading, switching off GPU monitoring
ClearML Monitor: Could not detect iteration reporting, falling back to iterations as seconds-from-start


In [13]:
fetch_github_data()


7
['https://github.com/gazebosim/ros_gz', 'https://github.com/moveit/moveit_tutorials', 'https://github.com/ros-navigation/navigation2', 'https://github.com/ros2/ros2_documentation', 'https://github.com/gazebosim/docs', 'https://github.com/ros-navigation/docs.nav2.org', 'https://github.com/moveit/moveit2']
Repository Metadata:
Repo metadata: {'id': 143215610, 'node_id': 'MDEwOlJlcG9zaXRvcnkxNDMyMTU2MTA=', 'name': 'ros_gz', 'full_name': 'gazebosim/ros_gz', 'private': False, 'owner': {'login': 'gazebosim', 'id': 1743799, 'node_id': 'MDEyOk9yZ2FuaXphdGlvbjE3NDM3OTk=', 'avatar_url': 'https://avatars.githubusercontent.com/u/1743799?v=4', 'gravatar_id': '', 'url': 'https://api.github.com/users/gazebosim', 'html_url': 'https://github.com/gazebosim', 'followers_url': 'https://api.github.com/users/gazebosim/followers', 'following_url': 'https://api.github.com/users/gazebosim/following{/other_user}', 'gists_url': 'https://api.github.com/users/gazebosim/gists{/gist_id}', 'starred_url': 'https://a

In [14]:

query_ingested_urls()

Number of ingested URLS: 18
Ingested URLs:
https://github.com/gazebosim/docs
https://github.com/gazebosim/ros_gz
https://github.com/moveit/moveit2
https://github.com/moveit/moveit_tutorials
https://github.com/ros-navigation/docs.nav2.org
https://github.com/ros-navigation/navigation2
https://github.com/ros2/ros2_documentation
https://www.youtube.com/watch?v=3HuV1M1NMB8
https://www.youtube.com/watch?v=8QfI5a7lTKU
https://www.youtube.com/watch?v=96XsJ7xfsS8
https://www.youtube.com/watch?v=ERhXoIn7kr4
https://www.youtube.com/watch?v=Lm1ediRG5JA
https://www.youtube.com/watch?v=_qQAfTmB5wc
https://www.youtube.com/watch?v=bp7MAZh4lJA
https://www.youtube.com/watch?v=dxcU-_PGZdw
https://www.youtube.com/watch?v=jkoGkAd0GYk
https://www.youtube.com/watch?v=laWn7_cj434
https://www.youtube.com/watch?v=ntJkRO_Z41I


Below, we use BeautifulSoup to scrape and parse the ROS2 and Gazebo documentation from the web pages

In [None]:
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin


def scrape_gazebo_docs(base_url, max_pages=10):
    visited_urls = set()
    to_visit = []
    scraped_data = []

    def fetch_navigation_links(url):
        """Fetch links from the navigation menu."""
        print(f"Fetching navigation links from: {url}")
        response = requests.get(url)
        if response.status_code != 200:
            print(f"Failed to fetch {url}")
            return []

        soup = BeautifulSoup(response.text, 'html.parser')

        # Find the navigation menu links
        navigation_div = soup.find('div', class_='bd-toc-item')
        if not navigation_div:
            print("Navigation menu not found.")
            return []

        # Extract all anchor links within the navigation menu
        links = [
            urljoin(base_url, a['href'])
            for a in navigation_div.find_all('a', href=True)
            if a['href'] and not a['href'].startswith('#')  # Exclude fragment identifiers
        ]

        return links

    def fetch_page_content(url):
        """Fetch and parse the content of a single page."""
        print(f"Fetching page content from: {url}")
        response = requests.get(url)
        if response.status_code != 200:
            print(f"Failed to fetch {url}")
            return None

        soup = BeautifulSoup(response.text, 'html.parser')
        title = soup.find('title').text.strip() if soup.find('title') else "No title"

        # Extract main content from <div role="main">
        content_div = soup.find('article', class_='bd-article')
        if content_div:
            # Extract all text while stripping HTML tags
            content_text = content_div.get_text(separator="\n", strip=True)
        else:
            content_text = "No content available."

        return {"url": url, "title": title, "content": content_text}

    # Start by fetching navigation links from the base URL
    to_visit.extend(fetch_navigation_links(base_url))

    for url in to_visit[:max_pages]:
        if url in visited_urls:
            continue

        page_data = fetch_page_content(url)
        if page_data:
            scraped_data.append(page_data)
            visited_urls.add(url)

    return scraped_data

def scrape_ros_docs(base_url, max_pages=10):
    visited_urls = set()
    to_visit = []
    scraped_data = []

    def fetch_navigation_links(url):
        """Fetch links from the navigation menu."""
        print(f"Fetching navigation links from: {url}")
        response = requests.get(url)
        if response.status_code != 200:
            print(f"Failed to fetch {url}")
            return []

        soup = BeautifulSoup(response.text, 'html.parser')

        # Find the navigation menu links
        navigation_div = soup.find('div', class_='wy-menu wy-menu-vertical')
        if not navigation_div:
            print("Navigation menu not found.")
            return []

        # Extract all anchor links within the navigation menu
        links = [
            urljoin(base_url, a['href'])
            for a in navigation_div.find_all('a', href=True)
            if a['href'] and not a['href'].startswith('#')  # Exclude fragment identifiers
        ]
        return links

    def fetch_page_content(url):
        """Fetch and parse the content of a single page."""
        print(f"Fetching page content from: {url}")
        response = requests.get(url)
        if response.status_code != 200:
            print(f"Failed to fetch {url}")
            return None

        soup = BeautifulSoup(response.text, 'html.parser')
        title = soup.find('title').text.strip() if soup.find('title') else "No title"

        # Extract main content from <div role="main">
        content_div = soup.find('div', {'role': 'main'})
        if content_div:
            # Extract all text while stripping HTML tags
            content_text = content_div.get_text(separator="\n", strip=True)
        else:
            content_text = "No content available."

        return {"url": url, "title": title, "content": content_text}

    # Start by fetching navigation links from the base URL
    to_visit.extend(fetch_navigation_links(base_url))

    for url in to_visit[:max_pages]:
        if url in visited_urls:
            continue

        page_data = fetch_page_content(url)
        if page_data:
            scraped_data.append(page_data)
            visited_urls.add(url)

    return scraped_data


# Usage
base_urls = ["https://docs.ros.org/en/galactic/index.html", "https://docs.nav2.org/getting_started/index.html", "https://docs.ros.org/en/kinetic/api/moveit_tutorials/html/index.html"]
scraped_docs = []
for base_url in base_urls:
    # scraped_docs = scrape_ros_docs(base_url, max_pages=-1)
    scraped_docs.extend(scrape_ros_docs(base_url, max_pages=-1))

gazebo_base_url = "https://gazebosim.org/docs/latest/getstarted/"
gazebo_docs = scrape_gazebo_docs(gazebo_base_url, max_pages=-1)
scraped_docs.extend(gazebo_docs)

# Print scraped data
# for doc in scraped_docs:
#     print(f"Title: {doc['title']}")
#     print(f"URL: {doc['url']}")
#     print(f"Content Snippet: {doc['content'][:1000]}...")  # Print first 1000 characters of content text
#     print()


Fetching navigation links from: https://docs.ros.org/en/galactic/index.html
Fetching page content from: https://docs.ros.org/en/galactic/Installation.html
Fetching page content from: https://docs.ros.org/en/galactic/Installation/Ubuntu-Install-Debians.html
Fetching page content from: https://docs.ros.org/en/galactic/Installation/Windows-Install-Binary.html
Fetching page content from: https://docs.ros.org/en/galactic/Installation/RHEL-Install-RPMs.html
Fetching page content from: https://docs.ros.org/en/galactic/Installation/Alternatives.html
Fetching page content from: https://docs.ros.org/en/galactic/Installation/Alternatives/Ubuntu-Development-Setup.html
Fetching page content from: https://docs.ros.org/en/galactic/Installation/Alternatives/Ubuntu-Install-Binary.html
Fetching page content from: https://docs.ros.org/en/galactic/Installation/Alternatives/Windows-Development-Setup.html
Fetching page content from: https://docs.ros.org/en/galactic/Installation/Alternatives/RHEL-Development

In [None]:
len(scraped_docs)

1343

In [None]:
for doc in scraped_docs:
    document = {
        "source": "web",
        "url": doc.get('url', ''),
        "title": doc.get('title', ''),
        "content": doc.get('content', '')
    }
    collection.insert_one(document)
    print(f"Saved data into MongoDB for URL: {doc.get('url', '')}")

Saved data into MongoDB for URL: https://docs.ros.org/en/galactic/Installation.html
Saved data into MongoDB for URL: https://docs.ros.org/en/galactic/Installation/Ubuntu-Install-Debians.html
Saved data into MongoDB for URL: https://docs.ros.org/en/galactic/Installation/Windows-Install-Binary.html
Saved data into MongoDB for URL: https://docs.ros.org/en/galactic/Installation/RHEL-Install-RPMs.html
Saved data into MongoDB for URL: https://docs.ros.org/en/galactic/Installation/Alternatives.html
Saved data into MongoDB for URL: https://docs.ros.org/en/galactic/Installation/Alternatives/Ubuntu-Development-Setup.html
Saved data into MongoDB for URL: https://docs.ros.org/en/galactic/Installation/Alternatives/Ubuntu-Install-Binary.html
Saved data into MongoDB for URL: https://docs.ros.org/en/galactic/Installation/Alternatives/Windows-Development-Setup.html
Saved data into MongoDB for URL: https://docs.ros.org/en/galactic/Installation/Alternatives/RHEL-Development-Setup.html
Saved data into Mon

We Create the Pipeline Controller, adding the functions as steps. Adding the parent attribute ensures that the function is called only after the parent task is completed.

In [None]:
# ClearML Pipeline Controller
pipeline = PipelineController(
    project="RAG System ETL Pipeline",
    name="ROS2 Media ETL",
    version="1.0",
)

# Add pipeline steps

pipeline.add_function_step(
    name="Fetch YouTube Data",
    function=fetch_youtube_data,
    execution_queue="default"
)

pipeline.add_function_step(
    name="Fetch GitHub Data",
    function=fetch_github_data,
    parents=["Fetch YouTube Data"],
    execution_queue="default"
)

pipeline.add_function_step(
    name="Fetch ROS Web Documentation",
    function=scrape_ros_docs,
    parents=["Fetch GitHub Data"],
    execution_queue="default"
)

pipeline.add_function_step(
    name="Fetch Gazebo Web Documentation",
    function=scrape_gazebo_docs,
    parents=["Fetch ROS Web Documentation"],
    execution_queue="default"
)

pipeline.add_function_step(
    name="Query Ingested URLs",
    function=query_ingested_urls,
    parents=["Fetch Gazebo Web Documentation"],
    execution_queue="default"
)
    
if __name__ == "__main__":
    pipeline.start_locally()
