In [None]:
from zenml.steps import step

## Medium - ETL



In [26]:
import requests
from bs4 import BeautifulSoup
from pymongo import MongoClient
import re
from clearml import Task
from clearml import Logger

# MongoDB connection parameters
MONGO_URI = "mongodb://localhost:27017/"
DATABASE_NAME = "req_data"
COLLECTION_NAME = "medium"

def extract(link: str) -> str:
    """Extract the HTML content from the Medium link."""
    try:
        response = requests.get(link)
        response.raise_for_status()
        return response.text
    except requests.RequestException as e:
        print(f"Error fetching {link}: {e}")
        return None

def clean_text(text):
    """Clean and normalize the text."""
    # Remove multiple spaces, tabs, and newlines
    text = re.sub(r'\s+', ' ', text)
    # Remove non-printable characters
    text = re.sub(r'[^\x20-\x7E]', '', text)
    # Trim leading and trailing spaces
    return text.strip()

def transform(html_content: str, link: str) -> dict:
    """Parse the HTML and extract metadata and cleaned content."""
    soup = BeautifulSoup(html_content, "html.parser")
    
    # Extract metadata
    title = soup.find("title").text if soup.find("title") else "No title"
    author = soup.find("meta", {"name": "author"})
    author_name = author["content"] if author else "Unknown author"
    publication_date = soup.find("time")["datetime"] if soup.find("time") else "Unknown date"
    
    # Extract and clean article content
    article_content = ""
    article_body = soup.find("article")
    if article_body:
        for script_or_style in article_body(["script", "style"]):
            script_or_style.extract()  # Remove script and style tags
        paragraphs = article_body.find_all("p")
        article_content = "\n".join(clean_text(p.text) for p in paragraphs if p.text)
    
    # Construct data
    data = {
        "metadata": {
            "type": "Medium",
            "url": link,
            "title": clean_text(title),
            "author": clean_text(author_name),
            "publication_date": clean_text(publication_date),
        },
        "content": article_content
    }
    return data

def load(data: dict, mongo_uri: str, db_name: str, collection_name: str):
    """Store the extracted data in MongoDB."""
    if data and data.get("content"):  # Ensure content is not empty
        client = MongoClient(mongo_uri)
        db = client[db_name]
        collection = db[collection_name]
        collection.insert_one(data)
        print(f"Inserted article: {data['metadata']['url']}")
    else:
        print("No content to insert.")

def medium_etl_pipeline(medium_links: list, mongo_uri: str, db_name: str, collection_name: str):
    """The full ETL pipeline."""
    for link in medium_links:
        # Start a new task for each link
        task = Task.init(project_name="ETL_Project", task_name=f"ETL_Task_{link}", task_type=Task.TaskTypes.optimizer)
        
        # Extract HTML content
        html_content = extract(link)
        
        # Transform HTML content
        data = transform(html_content, link)
        
        # Load data into MongoDB
        load(data, mongo_uri, db_name, collection_name)
        
        # Log any relevant output for later review
        task.get_logger().report_text(f"Inserted article: {data['metadata']['url']}")
        
        task.close()  # Finish task

if __name__ == "__main__":
    # Define the Medium links
    medium_links = [
        "https://medium.com/schmiedeone/getting-started-with-ros2-part-1-d4c3b7335c71",
        "https://medium.com/@nullbyte.in/ros2-from-the-ground-up-part-1-an-introduction-to-the-robot-operating-system-4c2065c5e032",
        "https://medium.com/@tetraengnrng/a-beginners-guide-to-ros2-29721dcf49c8"
    ]
    
    # Run the pipeline
    medium_etl_pipeline(medium_links, mongo_uri=MONGO_URI, db_name=DATABASE_NAME, collection_name=COLLECTION_NAME)

ClearML Task: created new task id=5ec58e0b4d7449bcb3fac7a12c60922d
2024-12-05 22:04:37,038 - clearml.Task - INFO - Storing jupyter notebook directly as code
ClearML results page: https://app.clear.ml/projects/362ab8cfa3e343c8bf274f5953c0bcbc/experiments/5ec58e0b4d7449bcb3fac7a12c60922d/output/log
Inserted article: https://medium.com/schmiedeone/getting-started-with-ros2-part-1-d4c3b7335c71
Inserted article: https://medium.com/schmiedeone/getting-started-with-ros2-part-1-d4c3b7335c71
2024-12-05 22:04:38,276 - clearml.Task - INFO - Waiting for repository detection and full package requirement analysis
2024-12-05 22:04:38,531 - clearml.Task - INFO - Finished repository detection and package analysis


Could not read Jupyter Notebook: No module named 'nbconvert'
Please install nbconvert using "pip install nbconvert"


ClearML Task: created new task id=7228d772b4004ee0a73d5c4587b73716
ClearML results page: https://app.clear.ml/projects/362ab8cfa3e343c8bf274f5953c0bcbc/experiments/7228d772b4004ee0a73d5c4587b73716/output/log
Inserted article: https://medium.com/@nullbyte.in/ros2-from-the-ground-up-part-1-an-introduction-to-the-robot-operating-system-4c2065c5e032
Inserted article: https://medium.com/@nullbyte.in/ros2-from-the-ground-up-part-1-an-introduction-to-the-robot-operating-system-4c2065c5e032
ClearML Monitor: GPU monitoring failed getting GPU reading, switching off GPU monitoring


Could not read Jupyter Notebook: No module named 'nbconvert'
Please install nbconvert using "pip install nbconvert"


ClearML Task: created new task id=1ff27df1338a4723b25deafbfd8d70d2
ClearML results page: https://app.clear.ml/projects/362ab8cfa3e343c8bf274f5953c0bcbc/experiments/1ff27df1338a4723b25deafbfd8d70d2/output/log
Inserted article: https://medium.com/@tetraengnrng/a-beginners-guide-to-ros2-29721dcf49c8
Inserted article: https://medium.com/@tetraengnrng/a-beginners-guide-to-ros2-29721dcf49c8
ClearML Monitor: GPU monitoring failed getting GPU reading, switching off GPU monitoring


# Documentation ETL

In [47]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from pymongo import MongoClient
from bs4 import BeautifulSoup
import time
from urllib.parse import urljoin
import re

# MongoDB Configuration
MONGO_URI = "mongodb://localhost:27017"
DATABASE_NAME = "RAG_data"

client = MongoClient(MONGO_URI)
db = client[DATABASE_NAME]

# DOCUMENTATION_SITES = [
#     {"base_url": "https://gazebosim.org/docs/latest/getstarted/", "collection_name": "gazebo_documentation", "domain": "gazebosim.org/docs"},
#     {"base_url": "https://moveit.picknik.ai/main/index.html", "collection_name": "moveit_documentation", "domain": "moveit.picknik.ai"},
# ]

DOCUMENTATION_SITES = [
    {"base_url": "https://gazebosim.org/docs/latest/getstarted/", "collection_name": "gazebo_documentation", "domain": "gazebosim.org/docs"},
    {"base_url": "https://moveit.picknik.ai/main/index.html", "collection_name": "moveit_documentation", "domain": "moveit.picknik.ai"},
    {"base_url": "https://docs.ros.org/en/foxy/index.html", "collection_name": "ros2_documentation", "domain": "docs.ros.org"},
    {"base_url": "https://docs.nav2.org/", "collection_name": "nav2_documentation", "domain": "docs.nav2.org"},
    
    ]

def get_selenium_driver():
    options = webdriver.ChromeOptions()
    options.add_argument("--headless")  # Run in headless mode
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=options)
    return driver

def fetch_page(driver, url):
    driver.get(url)
    time.sleep(2)  # Wait for the page to load fully
    return driver.page_source

# Clean text utility
def clean_text(text):
    """Clean and normalize text content."""
    text = re.sub(r'\s+', ' ', text)  # Replace multiple spaces/newlines with a single space
    text = re.sub(r'[^\x20-\x7E]', '', text)  # Remove non-printable characters
    text = re.sub(r"http\S+", "", text)  # Remove URLs
    return text.strip()

# Extract links and clean content from a page
def extract_links_and_content_from_page(soup, base_url, domain_filter):
    links = []
    content = ""

    # Remove unwanted tags
    for script_or_style in soup(["script", "style", "noscript"]):
        script_or_style.extract()

    # Extract the main page content and clean it
    raw_content = soup.get_text(strip=True)
    content = clean_text(raw_content)  # Clean the extracted text

    # Find and normalize all links
    for a_tag in soup.find_all('a', href=True):
        link = a_tag['href']
        normalized_link = urljoin(base_url, link)  # Handle absolute and relative links
        if domain_filter in normalized_link:  # Only keep links from the target domain
            links.append(normalized_link)

    return links, content

def scrape_documentation(base_url, collection_name, domain_filter):
    collection = db[collection_name]  # Use a different collection for each site
    driver = get_selenium_driver()
    
    try:
        print(f"Starting to scrape: {base_url}")

        # Fetch and process the base page
        page_source = fetch_page(driver, base_url)
        soup = BeautifulSoup(page_source, 'html.parser')

        # Extract links and content
        links_to_scrape, base_page_content = extract_links_and_content_from_page(soup, base_url, domain_filter)

        # Save the base page content
        data = {
            "metadata": {
                "url": base_url,
                "type": collection_name,
            },
            "content": base_page_content,
        }
        collection.insert_one(data)
        print(f"Saved: {base_url}")

        # Iterate through links to scrape additional pages
        for link in links_to_scrape:
            try:
                page_source = fetch_page(driver, link)
                soup = BeautifulSoup(page_source, 'html.parser')

                # Extract content for each linked page
                _, link_content = extract_links_and_content_from_page(soup, link, domain_filter)

                # Save the linked page content
                data = {
                    "metadata": {
                        "url": link,
                        "type": collection_name,
                    },
                    "content": link_content,
                }
                collection.insert_one(data)
                print(f"Saved: {link}")
            except Exception as e:
                print(f"Error processing {link}: {e}")

    finally:
        driver.quit()

# Scrape each site in DOCUMENTATION_SITES
for site in DOCUMENTATION_SITES:
    scrape_documentation(site["base_url"], site["collection_name"], site["domain"])

print("Scraping, cleaning, and data saving completed for all documentation sites.")

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Av

Starting to scrape: https://gazebosim.org/docs/latest/getstarted/
Saved: https://gazebosim.org/docs/latest/getstarted/
Saved: https://gazebosim.org/docs/latest/getstarted/#main-content
Saved: https://gazebosim.org/docs/latest/
Saved: https://gazebosim.org/docs
Saved: https://gazebosim.org/docs/jetty/getstarted/
Saved: https://gazebosim.org/docs/ionic/getstarted/
Saved: https://gazebosim.org/docs/harmonic/getstarted/
Saved: https://gazebosim.org/docs/garden/getstarted/
Saved: https://gazebosim.org/docs/fortress/getstarted/
Saved: https://gazebosim.org/docs/edifice/getstarted/
Saved: https://gazebosim.org/docs/dome/getstarted/
Saved: https://gazebosim.org/docs/citadel/getstarted/
Saved: https://gazebosim.org/docs/blueprint/getstarted/
Saved: https://gazebosim.org/docs/acropolis/getstarted/
Saved: https://gazebosim.org/docs
Saved: https://gazebosim.org/docs/latest/getstarted/
Saved: https://gazebosim.org/docs/latest/install/
Saved: https://gazebosim.org/docs/latest/install_ubuntu/
Saved: 

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Av

Starting to scrape: https://moveit.picknik.ai/main/index.html
Saved: https://moveit.picknik.ai/main/index.html
Saved: https://moveit.picknik.ai/main/index.html
Saved: https://moveit.picknik.ai/main/doc/tutorials/tutorials.html
Saved: https://moveit.picknik.ai/main/doc/examples/examples.html
Saved: https://moveit.picknik.ai/main/doc/concepts/concepts.html
Saved: https://moveit.picknik.ai/main/doc/how_to_guides/how_to_guides.html
Saved: https://moveit.picknik.ai/main/doc/api/api.html
Saved: https://moveit.picknik.ai/main/doc/how_to_contribute/how_to_contribute.html
Saved: https://moveit.picknik.ai/main/index.html
Saved: https://moveit.picknik.ai/main/index.html
Saved: https://moveit.picknik.ai/main/index.html#moveit-2-documentation
Saved: https://moveit.picknik.ai/main/index.html#how-to-use-this-website
Saved: https://moveit.picknik.ai/main/doc/tutorials/tutorials.html
Saved: https://moveit.picknik.ai/main/doc/how_to_guides/how_to_guides.html
Saved: https://moveit.picknik.ai/main/doc/con

KeyboardInterrupt: 

# Reddit - ETL

In [None]:
import praw
from pymongo import MongoClient
import re

# MongoDB Configuration
MONGO_URI = "mongodb://localhost:27017"
DATABASE_NAME = "req_data"
COLLECTION_NAME = "reddit"

# Reddit API Configuration
REDDIT_CLIENT_ID = "9Hjzq9IuhgEzlfLwGnT9YA"
REDDIT_CLIENT_SECRET = "y63xI-407llXiRNVWwgpO97r1M7AuQ"
REDDIT_USER_AGENT = "ros2_rag"

# Clean text utility
def clean_text(text):
    """Clean and normalize Reddit post text."""
    text = re.sub(r"\s+", " ", text)  # Replace multiple spaces/newlines with a single space
    text = re.sub(r"[^\x20-\x7E]", "", text)  # Remove non-printable characters
    text = re.sub(r"http\S+", "", text)  # Remove URLs
    text = text.strip()  # Trim leading/trailing whitespace
    return text

# Get Reddit posts
def get_reddit_posts(subreddit, keyword, limit=10):
    reddit = praw.Reddit(
        client_id=REDDIT_CLIENT_ID,
        client_secret=REDDIT_CLIENT_SECRET,
        user_agent=REDDIT_USER_AGENT
    )
    posts = reddit.subreddit(subreddit).search(keyword, limit=limit)
    return posts

# Transform and clean post data
def transform_and_store_post_data(post, subreddit, keyword):
    metadata = {
        "type": "reddit",
        "subreddit": subreddit,
        "keyword": keyword,
        "url": f"https://reddit.com{post.permalink}",
    }
    # Clean the post content
    content = clean_text(post.selftext)
    
    return {"metadata": metadata, "content": content}

# Load cleaned data into MongoDB
def load_data_to_mongodb(data):
    client = MongoClient(MONGO_URI)
    db = client[DATABASE_NAME]
    collection = db[COLLECTION_NAME]
    collection.insert_one(data)
    print(f"Ingested Reddit post data: {data['metadata']['url']}")

# ETL Process for Reddit posts
def etl_reddit_posts(configurations, limit=10):
    try:
        for config in configurations:
            subreddit = config["subreddit"]
            keyword = config["keyword"]
            posts = get_reddit_posts(subreddit, keyword, limit)
            for post in posts:
                transformed_data = transform_and_store_post_data(post, subreddit, keyword)
                if transformed_data["content"]:  # Ensure there's valid content
                    load_data_to_mongodb(transformed_data)
                else:
                    print(f"Skipped empty post: {post.permalink}")
    except Exception as e:
        print(f"Error fetching Reddit posts: {e}")

# Configuration for multiple keywords and subreddits
configurations = [
    {"subreddit": "ROS", "keyword": "ROS2"},
    {"subreddit": "ROS", "keyword": "nav2"},
    {"subreddit": "ROS", "keyword": "gazebo"},
    {"subreddit": "ROS", "keyword": "moveit"},
]

# Execute the ETL pipeline
etl_reddit_posts(configurations, limit=20)

[1;35mInitiating a new run for the pipeline: [0m[1;36mreddit_etl_pipeline[1;35m.[0m
[1;35mRegistered new pipeline: [0m[1;36mreddit_etl_pipeline[1;35m.[0m


# Stack overflow - ETL

In [69]:
from pymongo import MongoClient
import requests
import re

# MongoDB Configuration
MONGO_URI = "mongodb://localhost:27017"
DATABASE_NAME = "req_data"
COLLECTION_NAME = "stackoverflow"

# Stack Overflow API Configuration
STACKOVERFLOW_API_BASE = "https://api.stackexchange.com/2.3/questions"
STACKOVERFLOW_API_PARAMS = {
    "site": "stackoverflow",
    "filter": "withbody"  # Includes question body in response
}

# Function to fetch Stack Overflow question data
def fetch_stackoverflow_data(question_url):
    """
    Fetch question data from Stack Overflow API based on the question URL.
    """
    # Extract the question ID from the URL
    match = re.search(r"/questions/(\d+)", question_url)
    if not match:
        raise ValueError(f"Invalid Stack Overflow URL format: {question_url}")
    
    question_id = match.group(1)
    url = f"{STACKOVERFLOW_API_BASE}/{question_id}"
    
    # API request
    response = requests.get(url, params=STACKOVERFLOW_API_PARAMS)
    if response.status_code == 200:
        items = response.json().get("items", [])
        if items:
            return items[0]  # Return the first item (question data)
        else:
            raise ValueError(f"No data found for question ID {question_id}")
    else:
        raise RuntimeError(f"Failed to fetch data for question ID {question_id}: {response.status_code} {response.text}")

# Function to clean and transform data
def clean_content(content):
    """
    Clean and normalize the content by:
    - Removing HTML tags.
    - Collapsing multiple spaces/newlines.
    """
    content = re.sub(r"<[^>]*>", "", content)  # Remove HTML tags
    content = re.sub(r"\s+", " ", content)  # Collapse multiple spaces/newlines
    return content.strip()

def transform_data(question_data, url):
    """
    Transform the fetched question data into a MongoDB-friendly format.
    """
    metadata = {
        "type": "Stackoverflow",
        "url": url,
        "title": question_data.get("title", "Untitled")
    }
    content = clean_content(question_data.get("body", "No content available"))
    return {"metadata": metadata, "content": content}

# Function to load data into MongoDB
def load_data_to_mongodb(data):
    """
    Load the transformed data into MongoDB.
    """
    client = MongoClient(MONGO_URI)
    db = client[DATABASE_NAME]
    collection = db[COLLECTION_NAME]
    collection.insert_one(data)
    print(f"Ingested data for question: {data['metadata']['title']}")

# ETL Process for Stack Overflow URLs
def etl_stackoverflow_data(urls):
    """
    Perform the ETL process for a list of Stack Overflow question URLs.
    """
    for url in urls:
        try:
            print(f"Processing URL: {url}")
            # Fetch, transform, and load data
            question_data = fetch_stackoverflow_data(url)
            transformed_data = transform_data(question_data, url)
            load_data_to_mongodb(transformed_data)
        except Exception as e:
            print(f"Error processing {url}: {e}")

# List of Stack Overflow question URLs
urls = [
    "https://stackoverflow.com/questions/57426715/import-modules-in-package-in-ros2",
    "https://stackoverflow.com/questions/51187676/whats-the-difference-between-ros2-and-dds",
    "https://stackoverflow.com/questions/68771051/ros2-pub-sub-custom-message-through-ros2-web-bridge-to-client-app",
]

# Run the ETL process
etl_stackoverflow_data(urls)


Processing URL: https://stackoverflow.com/questions/57426715/import-modules-in-package-in-ros2
Ingested data for question: Import modules in package in ROS2
Processing URL: https://stackoverflow.com/questions/51187676/whats-the-difference-between-ros2-and-dds
Ingested data for question: What&#39;s the difference between ROS2 and DDS?
Processing URL: https://stackoverflow.com/questions/68771051/ros2-pub-sub-custom-message-through-ros2-web-bridge-to-client-app
Ingested data for question: ros2 pub/sub custom message through ros2-web-bridge to client app


# Youtube ETL

In [None]:
pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib

## Get youtube service

In [2]:
import os
import pickle
import re
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from google.auth.transport.requests import Request
from pymongo import MongoClient

# MongoDB Configuration
MONGO_URI = "mongodb://localhost:27017"
DATABASE_NAME = "req_data"
COLLECTION_NAME = "youtube_captions"

# OAuth2 YouTube API Configuration
CLIENT_SECRETS_FILE = "client_secret.json"  # Path to your OAuth2 credentials JSON file
SCOPES = ["https://www.googleapis.com/auth/youtube.force-ssl"]

# Function to get authenticated YouTube service
# def get_authenticated_service():
#     creds = None
#     if os.path.exists("token.pickle"):
#         with open("token.pickle", "rb") as token:
#             creds = pickle.load(token)

#     if not creds or not creds.valid:
#         if creds and creds.expired and creds.refresh_token:
#             creds.refresh(Request())
#         else:
#             flow = InstalledAppFlow.from_client_secrets_file(
#                 CLIENT_SECRETS_FILE, SCOPES
#             )
#             creds = flow.run_local_server(port=54510)

#         with open("token.pickle", "wb") as token:
#             pickle.dump(creds, token)

#     youtube = build("youtube", "v3", credentials=creds)
#     return youtube


from googleapiclient.discovery import build

# API Key for YouTube Data API
API_KEY = "AIzaSyCcJEWk7ULyidLRC-BpkIaCW3Mh-Uz7oLk"  # Replace with your actual API key

# Function to get YouTube service using API Key
def get_youtube_service_with_api_key():
    youtube = build("youtube", "v3", developerKey=API_KEY)
    return youtube

# Set up the YouTube API client using OAuth2
youtube = get_youtube_service_with_api_key()

## Get list of youtube ids from playlist urls

In [3]:
import re

def get_video_ids_from_playlists(youtube, playlist_urls):
    """
    Fetches a list of video IDs for all videos in a list of YouTube playlists.

    Args:
        youtube: Authenticated YouTube API client.
        playlist_urls (list): List of YouTube playlist URLs.

    Returns:
        list: A combined list of video IDs from all provided playlists.
    """
    def extract_playlist_id_from_url(url):
        """Extract playlist ID from a YouTube playlist URL."""
        match = re.search(r"list=([a-zA-Z0-9_-]+)", url)
        if match:
            return match.group(1)
        else:
            raise ValueError(f"Invalid YouTube playlist URL format: {url}")

    def get_video_ids_from_single_playlist(youtube, playlist_id):
        """Fetch video IDs from a single playlist."""
        video_ids = []
        playlist_items = youtube.playlistItems().list(
            part="snippet",
            playlistId=playlist_id,
            maxResults=50
        ).execute()

        while playlist_items:
            for item in playlist_items["items"]:
                video_id = item["snippet"]["resourceId"]["videoId"]
                video_ids.append(video_id)

            if "nextPageToken" in playlist_items:
                playlist_items = youtube.playlistItems().list(
                    part="snippet",
                    playlistId=playlist_id,
                    maxResults=50,
                    pageToken=playlist_items["nextPageToken"]
                ).execute()
            else:
                break

        return video_ids

    all_video_ids = []
    for url in playlist_urls:
        try:
            playlist_id = extract_playlist_id_from_url(url)
            print(f"Fetching videos from playlist: {playlist_id}")
            video_ids = get_video_ids_from_single_playlist(youtube, playlist_id)
            all_video_ids.extend(video_ids)
        except Exception as e:
            print(f"Error processing playlist {url}: {e}")

    return all_video_ids

In [4]:
playlist_urls = ["https://www.youtube.com/watch?v=C6eQ6VwTpxk&list=PLSK7NtBWwmpTS_YVfjeN3ZzIxItI1P_Sr&ab_channel=KevinWood%7CRobotics%26AI"]
video_urls = get_video_ids_from_playlists(youtube, playlist_urls)

Fetching videos from playlist: PLSK7NtBWwmpTS_YVfjeN3ZzIxItI1P_Sr


In [5]:
print(video_urls)

['C6eQ6VwTpxk', '7FKi-waQuMM', 'dJLBLb0IXdw', '72a-wJ2k25A', 'sWw69pIiMz0', 'oVOR74D8A3U', 'nsbgIys0_oc', 'PcO-sTuP8zg', 'QQLOk8l2lEo', 'zNxCqBKKbGM', 'JNM2qIhseiU', '9Myw-9UQxPw', 'mFCundd5s-Q', 'KLvUMtYI_Ag', '4zGUDisw4UI', 'rGsyQHwWObA', 'lDSrqQM85zA', 'EOZNdm00-Cw', 'NX_lgA3cDB4', 'pT7OtvMdpo8', 'BXW3DDD70KA', 'rb_jxfI2rwc', 'vMgiBxPNIoo', '_A_LRRIxpLg', '8GM6R3zo7iQ', 'WWuov6dKXuU', 'YQes7T5g-JU', 'Ve64pXGkfnA', 'wG-0S1D8DiA', '5wObwC9yjUw', 'RIOJJDK3iho', 'T8KJH47aZ8w', 'EZf-1GozbSk', 'r8rylSHlsl0', 'y5z0PxKr9No', 'aL_oM8QhTVI', 'CraslJJkrcU', 'Y7YINVs5PFM', 'gaEEOettwKg', 'zfSJ3LRrCqs', 'F1BAm6Nf5Ec', '8fzl-45NIb0', 'cLBqTCKfzsw', 'NHrsftC61i4', 'LsKL8N5Iwkw', 'DoT3iAgY9Vc', 'qWoGkPDg4N8', 'PM_1Nb9u-N0', 'MnMGjvYxlUk', 'K1OB2Ky9gpc', 'e4l5W7ajl5w', 'tzN0QT1id0M', '8N9elizZ1x4', '0G6LDuslqmA', 'Xbij9Tst-WA', 'f9VQWAxXhvE']


## Youtube ETL

In [6]:
# Function to fetch YouTube video details (Optional, if you still want the title and description)
def get_video_details(video_id):
    try:
        # You can fetch video details like title/description from a different source, 
        # or you can hardcode this data if it's not critical for your task
        return {"title": "Unknown", "description": "No description available"}
    except Exception as e:
        print(f"Error fetching details for video {video_id}: {e}")
        return {"title": "Error", "description": str(e)}

# Function to fetch captions from YouTube (using third-party API)
from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound, VideoUnavailable

def fetch_video_captions_fallback(video_id):
    try:
        # Use youtube-transcript-api to fetch the transcript
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        return " ".join([item["text"] for item in transcript])
    except NoTranscriptFound:
        return "No captions or transcript available"
    except VideoUnavailable:
        return "Video is unavailable"
    except Exception as e:
        print(f"Error fetching transcript for video {video_id}: {e}")
        return "Error fetching captions"

# Function to clean and transform data
def clean_content(content):
    content = re.sub(r"<[^>]*>", "", content)  # Remove HTML tags
    content = re.sub(r"\s+", " ", content)  # Collapse multiple spaces/newlines
    return content.strip()

# Function to transform fetched data into MongoDB format
def transform_data(video_details, captions, url):
    metadata = {
        "type": "YouTube",
        "url": url,
        "title": video_details.get("title", "Untitled Video"),
        "description": video_details.get("description", "No description available")
    }
    content = clean_content(captions)
    return {"metadata": metadata, "content": content}

# Function to load data into MongoDB
def load_data_to_mongodb(data):
    client = MongoClient(MONGO_URI)
    db = client[DATABASE_NAME]
    collection = db[COLLECTION_NAME]
    collection.insert_one(data)
    print(f"Ingested data for video: {data['metadata']['title']}")

# ETL Process for a list of YouTube video links
def etl_youtube_videos(video_ids):
    """
    ETL process for a list of YouTube video IDs using the third-party API only.
    
    Args:
        video_ids (list): List of YouTube video IDs.
    """
    for video_id in video_ids:
        try:
            print(f"Processing video: {video_id}")
            
            # Fetch video details (optional, can use hardcoded data)
            video_details = get_video_details(video_id)
            
            # Fetch captions or transcripts using the third-party API
            captions = fetch_video_captions_fallback(video_id)
            if captions in ["Captions are restricted or unavailable", "No captions or transcript available"]:
                print(f"Skipping video {video_id}: Captions not accessible")
                continue
            
            # Transform and load data
            url = f"https://www.youtube.com/watch?v={video_id}"
            transformed_data = transform_data(video_details, captions, url)
            load_data_to_mongodb(transformed_data)
        
        except Exception as e:
            print(f"Error processing video {video_id}: {e}")

# List of YouTube video URLs
# Example: video_urls = ["https://www.youtube.com/watch?v=PcO-sTuP8zg", ...]

# Run the ETL process
etl_youtube_videos(video_urls)

Processing video: C6eQ6VwTpxk
Ingested data for video: Unknown
Processing video: 7FKi-waQuMM
Ingested data for video: Unknown
Processing video: dJLBLb0IXdw
Ingested data for video: Unknown
Processing video: 72a-wJ2k25A
Ingested data for video: Unknown
Processing video: sWw69pIiMz0
Ingested data for video: Unknown
Processing video: oVOR74D8A3U
Ingested data for video: Unknown
Processing video: nsbgIys0_oc
Ingested data for video: Unknown
Processing video: PcO-sTuP8zg
Ingested data for video: Unknown
Processing video: QQLOk8l2lEo
Ingested data for video: Unknown
Processing video: zNxCqBKKbGM
Ingested data for video: Unknown
Processing video: JNM2qIhseiU
Ingested data for video: Unknown
Processing video: 9Myw-9UQxPw
Ingested data for video: Unknown
Processing video: mFCundd5s-Q
Ingested data for video: Unknown
Processing video: KLvUMtYI_Ag
Ingested data for video: Unknown
Processing video: 4zGUDisw4UI
Ingested data for video: Unknown
Processing video: rGsyQHwWObA
Ingested data for video: 

In [None]:
# Function to fetch video ID from YouTube URL
def extract_video_id_from_url(url):
    match = re.search(r"v=([a-zA-Z0-9_-]+)", url)
    if match:
        return match.group(1)
    else:
        raise ValueError(f"Invalid YouTube video URL format: {url}")

# Function to fetch YouTube video details
def get_video_details(youtube, video_id):
    try:
        video_details = youtube.videos().list(
            part="snippet,contentDetails,statistics",
            id=video_id
        ).execute()

        if video_details["items"]:
            video = video_details["items"][0]
            title = video["snippet"].get("title", "Untitled Video")
            description = video["snippet"].get("description", "No description available")
            return {"title": title, "description": description}
        else:
            return {"title": "Unknown", "description": "No description available"}
    except Exception as e:
        print(f"Error fetching details for video {video_id}: {e}")
        return {"title": "Error", "description": str(e)}

# Function to fetch captions from YouTube
from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound, VideoUnavailable

def fetch_video_captions_fallback(video_id):
    try:
        # Use youtube-transcript-api as a fallback
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        return " ".join([item["text"] for item in transcript])
    except NoTranscriptFound:
        return "No captions or transcript available"
    except VideoUnavailable:
        return "Video is unavailable"
    except Exception as e:
        print(f"Error fetching transcript for video {video_id}: {e}")
        return "Error fetching captions"

def fetch_video_captions(youtube, video_id):
    try:
        # First attempt with YouTube API
        captions = youtube.captions().list(part="snippet", videoId=video_id).execute()
        if "items" in captions and len(captions["items"]) > 0:
            caption_id = captions["items"][0]["id"]
            try:
                caption_data = youtube.captions().download(id=caption_id).execute()
                return caption_data["body"]
            except Exception as e:
                print(f"falling back: Error downloading captions for video {video_id}: {e}")
                return fetch_video_captions_fallback(video_id)
        else:
            # Fallback if no captions via API
            return fetch_video_captions_fallback(video_id)
    except Exception as e:
        print(f"Error fetching captions for video {video_id}: {e}")
        return fetch_video_captions_fallback(video_id)

# Function to clean and transform data
def clean_content(content):
    content = re.sub(r"<[^>]*>", "", content)  # Remove HTML tags
    content = re.sub(r"\s+", " ", content)  # Collapse multiple spaces/newlines
    return content.strip()

# Function to transform fetched data into MongoDB format
def transform_data(video_details, captions, url):
    metadata = {
        "type": "YouTube",
        "url": url,
        "title": video_details.get("title", "Untitled Video"),
        "description": video_details.get("description", "No description available")
    }
    content = clean_content(captions)
    return {"metadata": metadata, "content": content}

# Function to load data into MongoDB
def load_data_to_mongodb(data):
    client = MongoClient(MONGO_URI)
    db = client[DATABASE_NAME]
    collection = db[COLLECTION_NAME]
    collection.insert_one(data)
    print(f"Ingested data for video: {data['metadata']['title']}")

# ETL Process for a list of YouTube video links
def etl_youtube_videos(video_ids):
    """
    ETL process for a list of YouTube video IDs.
    
    Args:
        video_ids (list): List of YouTube video IDs.
    """
    for video_id in video_ids:
        try:
            print(f"Processing video: {video_id}")
            
            # Fetch video details
            video_details = get_video_details(youtube, video_id)
            
            # Fetch captions or transcripts
            captions = fetch_video_captions(youtube, video_id)
            if captions in ["Captions are restricted or unavailable", "No captions or transcript available"]:
                print(f"Skipping video {video_id}: Captions not accessible")
                continue
            
            # Transform and load data
            url = f"https://www.youtube.com/watch?v={video_id}"
            transformed_data = transform_data(video_details, captions, url)
            load_data_to_mongodb(transformed_data)
        
        except Exception as e:
            print(f"Error processing video {video_id}: {e}")# List of YouTube video URLs
# Run the ETL process
print(video_urls)
etl_youtube_videos(video_urls)

['C6eQ6VwTpxk', '7FKi-waQuMM', 'dJLBLb0IXdw', '72a-wJ2k25A', 'sWw69pIiMz0', 'oVOR74D8A3U', 'nsbgIys0_oc', 'PcO-sTuP8zg', 'QQLOk8l2lEo', 'zNxCqBKKbGM', 'JNM2qIhseiU', '9Myw-9UQxPw', 'mFCundd5s-Q', 'KLvUMtYI_Ag', '4zGUDisw4UI', 'rGsyQHwWObA', 'lDSrqQM85zA', 'EOZNdm00-Cw', 'NX_lgA3cDB4', 'pT7OtvMdpo8', 'BXW3DDD70KA', 'rb_jxfI2rwc', 'vMgiBxPNIoo', '_A_LRRIxpLg', '8GM6R3zo7iQ', 'WWuov6dKXuU', 'YQes7T5g-JU', 'Ve64pXGkfnA', 'wG-0S1D8DiA', '5wObwC9yjUw', 'RIOJJDK3iho', 'T8KJH47aZ8w', 'EZf-1GozbSk', 'r8rylSHlsl0', 'y5z0PxKr9No', 'aL_oM8QhTVI', 'CraslJJkrcU', 'Y7YINVs5PFM', 'gaEEOettwKg', 'zfSJ3LRrCqs', 'F1BAm6Nf5Ec', '8fzl-45NIb0', 'cLBqTCKfzsw', 'NHrsftC61i4', 'LsKL8N5Iwkw', 'DoT3iAgY9Vc', 'qWoGkPDg4N8', 'PM_1Nb9u-N0', 'MnMGjvYxlUk', 'K1OB2Ky9gpc', 'e4l5W7ajl5w', 'tzN0QT1id0M', '8N9elizZ1x4', '0G6LDuslqmA', 'Xbij9Tst-WA', 'f9VQWAxXhvE']
Processing video: C6eQ6VwTpxk
Error fetching details for video C6eQ6VwTpxk: <HttpError 403 when requesting https://youtube.googleapis.com/youtube/v3/videos?par

### Try this ---> without the youtube API

In [None]:
# Function to fetch video ID from YouTube URL
def extract_video_id_from_url(url):
    match = re.search(r"v=([a-zA-Z0-9_-]+)", url)
    if match:
        return match.group(1)
    else:
        raise ValueError(f"Invalid YouTube video URL format: {url}")

# Function to fetch captions from YouTube using youtube-transcript-api
from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound, VideoUnavailable

def fetch_video_captions_fallback(video_id):
    try:
        # Use youtube-transcript-api as a fallback
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        return " ".join([item["text"] for item in transcript])
    except NoTranscriptFound:
        return "No captions or transcript available"
    except VideoUnavailable:
        return "Video is unavailable"
    except Exception as e:
        print(f"Error fetching transcript for video {video_id}: {e}")
        return "Error fetching captions"

# Function to clean and transform data
def clean_content(content):
    content = re.sub(r"<[^>]*>", "", content)  # Remove HTML tags
    content = re.sub(r"\s+", " ", content)  # Collapse multiple spaces/newlines
    return content.strip()

# Function to transform fetched data into MongoDB format
def transform_data(captions, url):
    metadata = {
        "type": "YouTube",
        "url": url,
    }
    content = clean_content(captions)
    return {"metadata": metadata, "content": content}

# Function to load data into MongoDB
def load_data_to_mongodb(data):
    client = MongoClient(MONGO_URI)
    db = client[DATABASE_NAME]
    collection = db[COLLECTION_NAME]
    collection.insert_one(data)
    print(f"Ingested data for video: {data['metadata']['url']}")

# ETL Process for a list of YouTube video links
def etl_youtube_videos(video_urls):
    """
    ETL process for a list of YouTube video URLs.
    
    Args:
        video_urls (list): List of YouTube video URLs.
    """
    for url in video_urls:
        try:
            print(f"Processing video: {url}")
            
            # Extract video ID from URL
            videao_id = extract_video_id_from_url(url)
            
            # Fetch captions or transcripts using the third-party API
            captions = fetch_video_captions_fallback(video_id)
            if captions in ["Captions are restricted or unavailable", "No captions or transcript available"]:
                print(f"Skipping video {video_id}: Captions not accessible")
                continue
            
            # Transform and load data
            transformed_data = transform_data(captions, url)
            load_data_to_mongodb(transformed_data)
        
        except Exception as e:
            print(f"Error processing video {url}: {e}")

# List of YouTube video URLs
# Run the ETL process
etl_youtube_videos(video_urls)

Processing video: C6eQ6VwTpxk
Error processing video C6eQ6VwTpxk: Invalid YouTube video URL format: C6eQ6VwTpxk
Processing video: 7FKi-waQuMM
Error processing video 7FKi-waQuMM: Invalid YouTube video URL format: 7FKi-waQuMM
Processing video: dJLBLb0IXdw
Error processing video dJLBLb0IXdw: Invalid YouTube video URL format: dJLBLb0IXdw
Processing video: 72a-wJ2k25A
Error processing video 72a-wJ2k25A: Invalid YouTube video URL format: 72a-wJ2k25A
Processing video: sWw69pIiMz0
Error processing video sWw69pIiMz0: Invalid YouTube video URL format: sWw69pIiMz0
Processing video: oVOR74D8A3U
Error processing video oVOR74D8A3U: Invalid YouTube video URL format: oVOR74D8A3U
Processing video: nsbgIys0_oc
Error processing video nsbgIys0_oc: Invalid YouTube video URL format: nsbgIys0_oc
Processing video: PcO-sTuP8zg
Error processing video PcO-sTuP8zg: Invalid YouTube video URL format: PcO-sTuP8zg
Processing video: QQLOk8l2lEo
Error processing video QQLOk8l2lEo: Invalid YouTube video URL format: QQLO

# Chunking and creating embeddings and storing Qdrant

In [36]:
from pymongo import MongoClient

# MongoDB Configuration
MONGO_URI = "mongodb://localhost:27017"
DATABASE_NAME = "req_data"
COLLECTIONS = ["medium", "ros2_documentation", "reddit","stackoverflow", "youtube_captions"]

client = MongoClient(MONGO_URI)
db = client[DATABASE_NAME]

def fetch_documents():
    """
    Fetch all documents from MongoDB collections.
    Ensures metadata includes only `type` and `url`.
    """
    documents = []
    for collection_name in COLLECTIONS:
        collection = db[collection_name]
        for doc in collection.find({}, {"_id": 0, "content": 1, "metadata": 1}):
            # Restrict metadata to url and type
            # print("doc is:",doc)
            metadata = {
                "type": doc["metadata"].get("type", collection_name),  # Default to collection name if type is missing
                "url": doc["metadata"].get("url", ""),  # Default to empty string if URL is missing
            }
            documents.append({"content": doc["content"], "metadata": metadata})
    return documents


# Chunking function


In [37]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

def chunk_documents(documents, chunk_size=500, chunk_overlap=50):
    """
    Chunk documents into smaller pieces using RecursiveCharacterTextSplitter.
    Retains only `type` and `url` in metadata.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", " ", ""]
    )
    
    chunked_data = []
    for doc in documents:
        content = doc["content"]
        metadata = doc["metadata"]  # Only includes `type` and `url`
        
        # Create chunks for each document
        chunks = text_splitter.split_text(content)
        for chunk in chunks:
            chunked_data.append({
                "content": chunk,
                "metadata": metadata  # Pass only `type` and `url`
            })
    
    return chunked_data


# embedding function

In [38]:
from sentence_transformers import SentenceTransformer
from zenml.steps import step

def generate_embeddings(text_chunks, batch_size=32):
    model = SentenceTransformer('all-MiniLM-L6-v2')
    embeddings = []
    
    for i in range(0, len(text_chunks), batch_size):
        batch = text_chunks[i:i + batch_size]
        batch_embeddings = model.encode(batch)
        embeddings.extend(batch_embeddings.tolist())
    
    return embeddings

# storing in single qdrant collection

In [40]:
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, PointStruct

# Initialize Qdrant Client
qdrant_client = QdrantClient("http://localhost:6333")


def store_in_qdrant(chunked_data, collection_name="unified_collection", batch_size=100):
    """
    Store chunked and embedded data in Qdrant collection in batches.
    Ensures metadata includes only `type` and `url`.
    """
    # Create or recreate the Qdrant collection
    vector_size = len(chunked_data[0]["embedding"])  # Dimension of embeddings
    qdrant_client.recreate_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(
            size=vector_size,
            distance="Cosine"  # Use cosine similarity for matching
        )
    )
    
    # Insert data into Qdrant in batches
    for i in range(0, len(chunked_data), batch_size):
        batch = chunked_data[i:i + batch_size]
        points = [
            PointStruct(
                id=j + i,  # Ensure unique IDs across batches
                vector=chunk["embedding"],
                payload={
                    "content": chunk["content"],  # Chunked content
                    "type": chunk["metadata"]["type"],  # Metadata: type
                    "url": chunk["metadata"]["url"],  # Metadata: url
                }
            )
            for j, chunk in enumerate(batch)
        ]

        # Upsert batch into Qdrant
        qdrant_client.upsert(collection_name=collection_name, points=points)
        print(f"Inserted batch {i // batch_size + 1} with {len(points)} points.")
    
    print(f"Stored {len(chunked_data)} chunks in Qdrant collection: {collection_name}")


In [49]:
from clearml import Task
from clearml.automation import PipelineController
from pymongo import MongoClient
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, PointStruct

# MongoDB Configuration
MONGO_URI = "mongodb://localhost:27017"
DATABASE_NAME = "req_data"
COLLECTIONS = ["medium", "ros2_documentation", "reddit", "stackoverflow", "youtube_captions"]

client = MongoClient(MONGO_URI)
db = client[DATABASE_NAME]

# MongoDB Fetch Step
def fetch_documents():
    documents = []
    for collection_name in COLLECTIONS:
        collection = db[collection_name]
        for doc in collection.find({}, {"_id": 0, "content": 1, "metadata": 1}):
            metadata = {
                "type": doc["metadata"].get("type", collection_name),
                "url": doc["metadata"].get("url", ""),
            }
            documents.append({"content": doc["content"], "metadata": metadata})
    
    return documents


# Text Chunking Step
def chunk_documents(documents, chunk_size=500, chunk_overlap=50):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", " ", ""]
    )
    
    chunked_data = []
    for doc in documents:
        content = doc["content"]
        metadata = doc["metadata"]
        
        # Create chunks for each document
        chunks = text_splitter.split_text(content)
        for chunk in chunks:
            chunked_data.append({
                "content": chunk,
                "metadata": metadata
            })
    
    return chunked_data


# Embedding Generation Step
def generate_embeddings(text_chunks, batch_size=32):
    model = SentenceTransformer('all-MiniLM-L6-v2')
    embeddings = []
    
    for i in range(0, len(text_chunks), batch_size):
        batch = text_chunks[i:i + batch_size]
        batch_embeddings = model.encode(batch)
        embeddings.extend(batch_embeddings.tolist())
    
    # Add embeddings back to the chunks
    for i, chunk in enumerate(text_chunks):
        chunk["embedding"] = embeddings[i]
    
    return text_chunks


# Qdrant Store Step
qdrant_client = QdrantClient("http://localhost:6333")

def store_in_qdrant(chunked_data, collection_name="unified_collection", batch_size=100):
    # Create or recreate the Qdrant collection
    vector_size = len(chunked_data[0]["embedding"])  # Dimension of embeddings
    qdrant_client.recreate_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(
            size=vector_size,
            distance="Cosine"  # Use cosine similarity for matching
        )
    )
    
    # Insert data into Qdrant in batches
    for i in range(0, len(chunked_data), batch_size):
        batch = chunked_data[i:i + batch_size]
        points = [
            PointStruct(
                id=j + i,  # Ensure unique IDs across batches
                vector=chunk["embedding"],
                payload={
                    "content": chunk["content"],
                    "type": chunk["metadata"]["type"],
                    "url": chunk["metadata"]["url"],
                }
            )
            for j, chunk in enumerate(batch)
        ]

        # Upsert batch into Qdrant
        qdrant_client.upsert(collection_name=collection_name, points=points)

    return len(chunked_data)


# Create and run the pipeline
def create_and_run_pipeline():
    # Initialize PipelineController
    pipe = PipelineController(
        name='ETL Pipeline',
        project='ETL Project',
        version='0.0.1',
        add_pipeline_tags=False,
    )

    # Set default execution queue for the pipeline
    pipe.set_default_execution_queue('default')  # Set default queue for the entire pipeline
    
    # Add steps to the pipeline
    pipe.add_function_step(
        name="Fetch Documents",
        function=fetch_documents,
        function_return=['documents'],
    )

    pipe.add_function_step(
        name="Chunk Documents",
        function=chunk_documents,
        function_kwargs={'documents': '${Fetch Documents.documents}'},  # Passing data from Fetch Documents step
        function_return=['chunked_data'],
    )

    pipe.add_function_step(
        name="Generate Embeddings",
        function=generate_embeddings,
        function_kwargs={'text_chunks': '${Chunk Documents.chunked_data}'},  # Passing data from Chunk Documents step
        function_return=['chunked_data'],
    )

    pipe.add_function_step(
        name="Store in Qdrant",
        function=store_in_qdrant,
        function_kwargs={'chunked_data': '${Generate Embeddings.chunked_data}'},  # Passing data from Generate Embeddings step
        function_return=['num_chunks_stored'],
    )

    # Start the pipeline
    pipe.start()  # Start the pipeline after defining all steps

create_and_run_pipeline()

ClearML Task: created new task id=09dc18cdf0094f208509440c9991bb21
ClearML results page: https://app.clear.ml/projects/ef243519c6424f6ca878b8f8eae72902/experiments/09dc18cdf0094f208509440c9991bb21/output/log
ClearML pipeline page: https://app.clear.ml/pipelines/ef243519c6424f6ca878b8f8eae72902/experiments/09dc18cdf0094f208509440c9991bb21
[33mCould not fetch function declared in __main__: <module '__main__'> is a built-in module[0m
[33mCould not fetch function imports: <module '__main__'> is a built-in module[0m
[1;35mNo repository found, storing script code instead[0m
[33mCould not fetch function declared in __main__: <module '__main__'> is a built-in module[0m
[33mCould not fetch function imports: <module '__main__'> is a built-in module[0m
[1;35mNo repository found, storing script code instead[0m
[33mCould not fetch function declared in __main__: <module '__main__'> is a built-in module[0m
[33mCould not fetch function imports: <module '__main__'> is a built-in module[

Switching to remote execution, output log page https://app.clear.ml/projects/ef243519c6424f6ca878b8f8eae72902/experiments/09dc18cdf0094f208509440c9991bb21/output/log
ClearML Terminating local execution process - continuing execution remotely


: 

In [79]:

# Step 1: Fetch documents from MongoDB
print("Fetching documents from MongoDB...")
documents = fetch_documents()
print("doc length is:",len(documents))
# Step 2: Chunk documents
print("Chunking documents...")
chunked_data = chunk_documents(documents)
print(len(chunked_data))
print(chunked_data[0])
# Step 3: Generate embeddings for each chunk
print("Generating embeddings...")
texts = [chunk['content'] for chunk in chunked_data]
batch_size = 64  # Adjust based on available resources
all_embeddings = generate_embeddings(texts, batch_size=batch_size)
# Assign embeddings back to chunks
for i, chunk in enumerate(chunked_data):
    chunk["embedding"] = all_embeddings[i]
# Step 4: Store in Qdrant
print("Storing data in Qdrant...")
store_in_qdrant(chunked_data)

print("Process completed successfully!")



Fetching documents from MongoDB...
doc length is: 333
Chunking documents...
8620
{'content': 'Sharad Maheshwari\nFollow\nschmiedeone\n--\n1\nListen\nShare\nNote: This series assumes that you have working knowledge of ROS1. We focus on getting to know ROS2, which is what the robotics community is moving towards. Here are Part 2 and Part 3 of this series.If youre new to Robot Operating System as a concept, check out this post to read what ROS actually is.\nHello there! Since you already have working knowledge of ROS1, for brevity, were gonna jump straight to the differences between ROS1 and ROS2.', 'metadata': {'type': 'Medium', 'url': 'https://medium.com/schmiedeone/getting-started-with-ros2-part-1-d4c3b7335c71'}}
Generating embeddings...
0
64
128
192
256
320
384
448
512
576
640
704
768
832
896
960
1024
1088
1152
1216
1280
1344
1408
1472
1536
1600
1664
1728
1792
1856
1920
1984
2048
2112
2176
2240
2304
2368
2432
2496
2560
2624
2688
2752
2816
2880
2944
3008
3072
3136
3200
3264
3328
3392
3

  qdrant_client.recreate_collection(


Inserted batch 1 with 100 points.
Inserted batch 2 with 100 points.
Inserted batch 3 with 100 points.
Inserted batch 4 with 100 points.
Inserted batch 5 with 100 points.
Inserted batch 6 with 100 points.
Inserted batch 7 with 100 points.
Inserted batch 8 with 100 points.
Inserted batch 9 with 100 points.
Inserted batch 10 with 100 points.
Inserted batch 11 with 100 points.
Inserted batch 12 with 100 points.
Inserted batch 13 with 100 points.
Inserted batch 14 with 100 points.
Inserted batch 15 with 100 points.
Inserted batch 16 with 100 points.
Inserted batch 17 with 100 points.
Inserted batch 18 with 100 points.
Inserted batch 19 with 100 points.
Inserted batch 20 with 100 points.
Inserted batch 21 with 100 points.
Inserted batch 22 with 100 points.
Inserted batch 23 with 100 points.
Inserted batch 24 with 100 points.
Inserted batch 25 with 100 points.
Inserted batch 26 with 100 points.
Inserted batch 27 with 100 points.
Inserted batch 28 with 100 points.
Inserted batch 29 with 100 po

In [9]:
from qdrant_client import QdrantClient

# Initialize Qdrant Client
qdrant_client = QdrantClient("http://localhost:6333")

def retrieve_similar_documents(query_embedding, collection_name="unified_collection", top_k=5):
    """
    Retrieve the top-k most similar documents from Qdrant for the given query embedding.
    """
    results = qdrant_client.search(
        collection_name=collection_name,
        query_vector=query_embedding,
        limit=top_k  # Retrieve the top-k matches
    )
    
    # Extract relevant information from results
    similar_docs = [
        {
            "content": result.payload.get("content"),  # The actual text content
            "metadata": {
                "type": result.payload.get("type"),
                "url": result.payload.get("url")
            },
            "score": result.score  # Similarity score
        }
        for result in results
    ]
    context_chunks = [result.payload for result in results]
    context = "\n".join([result.payload["content"] for result in results])

    return similar_docs,context_chunks,context


In [81]:
!pip install SentenceTransformer

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


[31mERROR: Could not find a version that satisfies the requirement SentenceTransformer (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for SentenceTransformer[0m[31m
[0m

In [10]:

from sentence_transformers import SentenceTransformer

def generate_embedding_query(query, model_name='all-MiniLM-L6-v2'):
    """
    Generate an embedding for the query using a pre-trained embedding model.
    """
    # Load the model (you can cache it to avoid reloading multiple times)
    model = SentenceTransformer(model_name)
    
    # Generate the embedding
    embedding = model.encode(query).tolist()  # Convert numpy array to list for compatibility
    return embedding


In [11]:
query = "Ros2 full form"
query_embedding = generate_embedding_query(query)

In [84]:
similar_docs,context_chunks,context = retrieve_similar_documents(query_embedding, collection_name="unified_collection", top_k=5)

# Print results
for doc in similar_docs:
    print(f"Content: {doc['content']}")
    print(f"Type: {doc['metadata']['type']}")
    print(f"URL: {doc['metadata']['url']}")
    print(f"Score: {doc['score']}")
    print("-" * 50)
print(context_chunks)

Content: goal of the ROS 2 project is to adapt to these changes, leveraging what is great about ROS 1 and improving what isnt.Here you will find the official documentation onROS 2, the newest version of ROS.If youre looking for documentation on ROS 1 (i.e., ROS as it has existed for several years, and what you might be using right now), check theROS wiki.Where to startNewcomers and experienced ROS users should consult this overview of our user-centric content to find what theyre looking
Type: ros2_documentation_documentation
URL: https://docs.ros.org/en/dashing/index.html
Score: 0.6760732
--------------------------------------------------
Content: ROS community. The goal of the ROS 2 project is to adapt to these changes, leveraging what is great about ROS 1 and improving what isnt.Here you will find the official documentation onROS 2, the newest version of ROS.If youre looking for documentation on ROS 1 (i.e., ROS as it has existed for several years, and what you might be using right n

In [13]:
import subprocess

def generate_response_with_llama(query, context):
    """
    Generate a response using Llama 3 via Ollama CLI.
    """
    prompt = f"""
    You are a knowledgeable assistant. Use the following context to answer the query. If the context does not contain enough information, say, "I cannot answer this based on the provided context."

    Context:
    {context}

    Query:
    {query}

    Answer:
    """
    # Use subprocess to call Ollama CLI
    result = subprocess.run(
        ["ollama", "run", "llama2:7b"],
        input=prompt,
        text=True,
        capture_output=True
    )
    return result.stdout.strip()


In [14]:
def full_pipeline(query):
    # Step 1: Generate query embedding
    # query_embedding = generate_query_embedding(query)
    query_embedding = generate_embedding_query(query)
    _,context_chunks,context = retrieve_similar_documents(query_embedding, collection_name="unified_collection", top_k=5)
    # Step 2: Retrieve relevant context from Qdrant
    # context = retrieve_context(query_embedding, collection_name="unified_collection", top_k=5)
    # Step 3: Handle case with no relevant context
    print("context is:",context)
    if not context.strip():
        return "I'm sorry, but I couldn't find any relevant data to answer your question."

    # Step 4: Generate response using Llama 3
    response = generate_response_with_llama(query, context)
    return response


In [15]:
import gradio as gr
def gradio_qa(query):
    return full_pipeline(query)
interface = gr.Interface(
    fn=gradio_qa,  # Connect Gradio function to the full pipeline
    inputs=gr.Textbox(lines=2, label="Enter your query"),  # Input: Query
    outputs=gr.Textbox(label="Generated Answer"),  # Output: Model response
    title="Q&A System with Llama 2",
    description="Ask a question, and the system will retrieve relevant context and generate an answer using Llama 2."
)

In [16]:
query = "give ros2 full form"
response = full_pipeline(query)
print("Response:\n", response)


context is: goal of the ROS 2 project is to adapt to these changes, leveraging what is great about ROS 1 and improving what isnt.Here you will find the official documentation onROS 2, the newest version of ROS.If youre looking for documentation on ROS 1 (i.e., ROS as it has existed for several years, and what you might be using right now), check theROS wiki.Where to startNewcomers and experienced ROS users should consult this overview of our user-centric content to find what theyre looking
in all ROS 2 core packages since the previous release.Table of
ROS community. The goal of the ROS 2 project is to adapt to these changes, leveraging what is great about ROS 1 and improving what isnt.Here you will find the official documentation onROS 2, the newest version of ROS.If youre looking for documentation on ROS 1 (i.e., ROS as it has existed for several years, and what you might be using right now), check theROS wiki.Where to startNewcomers and experienced ROS users should consult this overv

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Response:
 ROS 2 stands for Robot Operating System 2.


## gradio - installation

# gradio - deploy

In [17]:
if __name__ == "__main__":
    interface.launch()

* Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.


context is: ROS2 is the successor to ROS (Robot Operating System), which was created by Willow Garage, a robotics research institute, in 2007. ROS was created to provide a common framework for developing robotics applications, and it has since grown in popularity as a platform for building
From turtles to drones, and everything in between, ROS2 helps robots work, learn, and dream, Its a powerful tool, a powerful ally, Helping robots reach their highest potential, and never say goodbye.
ROS2 (Robot Operating System 2) is a collection of open-source software libraries and tools that provide a flexible framework for developing and running Robotics applications. It is intended to be the primary development platform for robots that operate in complex, dynamic environments, and it is designed to be an open, scalable, and Interoperable framework for building robot applications.
goal of the ROS 2 project is to adapt to these changes, leveraging what is great about ROS 1 and improving what isnt

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
