In [2]:
import os
import shutil
import subprocess
import tempfile
import requests
from bs4 import BeautifulSoup
from clearml import PipelineDecorator
from pymongo import MongoClient
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound

# Simulating logger for simplicity
class Logger:
    """Minimal console logger exposing only an ``info`` level."""

    @staticmethod
    def info(message):
        """Write *message* to stdout, prefixed with the ``[INFO]`` tag."""
        line = "[INFO] {}".format(message)
        print(line)


# Module-level logger instance shared by the code in this file.
logger = Logger()

# Stand-in for RepositoryDocument, backed by a live MongoDB collection (not an in-memory mock)
class Document:
    """Record of one crawled resource, persisted in a MongoDB collection.

    The connection, database and collection are created once, when the class
    body is executed, and are shared by every instance and static helper.
    """

    # Connect to the ClearML MongoDB server
    client = MongoClient("mongodb://localhost:27017/")  # Adjust connection string if necessary
    db = client['clearml']  # Replace 'clearml' with your database name if different
    collection = db['crawled_data']  # Specify the collection name

    @staticmethod
    def find(link):
        """Return the stored document whose ``link`` field equals *link*.

        The result is whatever ``find_one`` yields: the matching document, or
        ``None`` when nothing matches.
        """
        return Document.collection.find_one({"link": link})

    @staticmethod
    def find_all():
        """Return the ``link`` value of every stored document that has one.

        Only the ``link`` field is requested from the server, so potentially
        large ``content`` payloads are not transferred just to list URLs.
        Documents without a ``link`` field are skipped instead of raising
        ``KeyError``.
        """
        cursor = Document.collection.find(
            {"link": {"$exists": True}},
            {"link": 1, "_id": 0},
        )
        return [doc["link"] for doc in cursor]

    def __init__(self, content, name, link, platform):
        """Hold the fields of one crawled resource; nothing is written yet.

        Args:
            content: Extracted payload (text, or a mapping for repositories).
            name: Human-readable title of the resource.
            link: Source URL; also used as the lookup key by ``find``.
            platform: Short identifier of the originating platform.
        """
        self.content = content
        self.name = name
        self.link = link
        self.platform = platform

    def save(self):
        """Insert this record into the collection and print the new ID."""
        document = {
            "content": self.content,
            "name": self.name,
            "link": self.link,
            "platform": self.platform,
        }
        result = Document.collection.insert_one(document)
        print(f"Document saved with ID: {result.inserted_id}")


# BaseCrawler class
class BaseCrawler:
    """Common base for the crawlers; subclasses provide ``extract``."""

    def __init__(self):
        """Nothing to initialise at the base level."""

    def extract(self, link: str) -> None:
        """Crawl *link*; the base implementation always raises.

        Raises:
            NotImplementedError: Always, until a subclass overrides this.
        """
        message = "Extract method must be implemented by subclasses."
        raise NotImplementedError(message)

class YouTubeCrawler(BaseCrawler):
    """Crawler that stores a YouTube video's transcript and page title."""

    model = Document

    # Seconds to wait for the video page. Without a timeout ``requests.get``
    # can block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    @staticmethod
    def _video_id(link: str) -> str:
        """Return the video ID carried by *link*.

        The ``v`` query parameter is used when present (regular watch URLs).
        Otherwise the last path segment is returned, which covers short
        ``youtu.be/<id>``, ``/embed/<id>`` and ``/shorts/<id>`` style links
        as well as a bare ID.
        """
        from urllib.parse import parse_qs, urlparse

        parsed = urlparse(link)
        ids = parse_qs(parsed.query).get("v")
        if ids:
            return ids[0]
        return parsed.path.rstrip("/").rsplit("/", 1)[-1]

    def extract(self, link: str) -> None:
        """Fetch transcript and title for *link* and save them as a document.

        Links already present in the database are skipped. Any failure during
        extraction is reported on stdout and nothing is saved for that link;
        the method itself does not raise for such failures.

        Args:
            link: URL of the YouTube video.
        """
        if self.model.find(link=link) is not None:
            logger.info(f"YouTube video already exists in the database: {link}")
            return

        logger.info(f"Starting scraping YouTube video: {link}")

        try:
            video_id = self._video_id(link)

            # A missing transcript is not an error: store a placeholder text.
            try:
                transcript = YouTubeTranscriptApi.get_transcript(video_id)
                transcript_text = " ".join(entry["text"] for entry in transcript)
            except (TranscriptsDisabled, NoTranscriptFound):
                transcript_text = "Transcript not available for this video."

            # Fetch the page only for its <title>; reject HTTP error pages so
            # their markup is never stored as if it were the video's title.
            response = requests.get(link, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            title_tag = soup.find('title')
            # Fall back to the video ID when the page has no <title> element.
            title = title_tag.text.strip() if title_tag is not None else video_id

            instance = self.model(
                content=transcript_text,
                name=title,
                link=link,
                platform="youtube",
            )
            instance.save()

        except Exception as e:
            print(f"Error during extraction: {e}")

        logger.info(f"Finished scraping YouTube video: {link}")

# GitHub Crawler
class GithubCrawler(BaseCrawler):
    """Crawler that clones a Git repository and stores its text files."""

    model = Document

    def __init__(self, ignore=(".git", ".toml", ".lock", ".png"), temp_dir=None) -> None:
        """Configure which paths to skip and where to clone.

        Args:
            ignore: Strings used both as directory-path prefixes and as
                file-name suffixes to exclude from the stored tree.
            temp_dir: Existing directory in which the temporary clone folder
                is created. ``None`` uses the platform's default temporary
                directory.
        """
        super().__init__()
        self._ignore = ignore
        self._temp_dir = temp_dir

    def _read_tree(self, repo_path: str) -> dict:
        """Map each kept file's repo-relative path to its space-stripped text."""
        skip = tuple(self._ignore)
        tree = {}
        for root, dirs, files in os.walk(repo_path):
            rel = os.path.relpath(root, repo_path)
            # relpath yields "." for the repository root itself; use "" so
            # top-level files are keyed by their bare name.
            rel_dir = "" if rel == os.curdir else rel
            if rel_dir.startswith(skip):
                # Prune in place so os.walk never descends into an ignored
                # directory (e.g. the whole .git object store).
                dirs[:] = []
                continue

            for file_name in files:
                if file_name.endswith(skip):
                    continue
                source = os.path.join(root, file_name)
                # Undecodable bytes are dropped rather than aborting the crawl.
                with open(source, "r", encoding="utf-8", errors="ignore") as f:
                    tree[os.path.join(rel_dir, file_name)] = f.read().replace(" ", "")
        return tree

    def extract(self, link: str) -> None:
        """Clone the repository at *link* and save its file tree as a document.

        Links already present in the database are skipped. Failures while
        cloning, reading or saving are reported on stdout; the temporary clone
        is removed in every case.

        Args:
            link: URL of the Git repository.
        """
        if self.model.find(link=link) is not None:
            logger.info(f"Repository already exists in the database: {link}")
            return

        logger.info(f"Starting scraping GitHub repository: {link}")

        repo_name = link.rstrip("/").split("/")[-1]
        local_temp = tempfile.mkdtemp(dir=self._temp_dir)

        try:
            # Run git inside the temp dir via ``cwd`` instead of os.chdir, so
            # the process is not left in a directory that is deleted below.
            subprocess.run(["git", "clone", link], cwd=local_temp, check=True)

            # The clone is the only entry in the freshly created temp dir.
            repo_path = os.path.join(local_temp, os.listdir(local_temp)[0])

            instance = self.model(
                content=self._read_tree(repo_path),
                name=repo_name,
                link=link,
                platform="github",
            )
            instance.save()

        except Exception as e:
            print(f"Error during extraction: {e}")
        finally:
            shutil.rmtree(local_temp)

        logger.info(f"Finished scraping GitHub repository: {link}")

# LinkedIn Crawler
class LinkedInCrawler(BaseCrawler):
    """Crawler that stores the prettified HTML of a LinkedIn page."""

    model = Document

    # Seconds to wait for the page. Without a timeout ``requests.get`` can
    # block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    def extract(self, link: str) -> None:
        """Download *link* and save its HTML and page title as a document.

        Links already present in the database are skipped. Any failure during
        extraction is reported on stdout and nothing is saved for that link.

        Args:
            link: URL of the LinkedIn page.
        """
        if self.model.find(link=link) is not None:
            logger.info(f"LinkedIn profile already exists in the database: {link}")
            return

        logger.info(f"Starting scraping LinkedIn profile: {link}")

        try:
            response = requests.get(link, timeout=self.REQUEST_TIMEOUT)
            # Reject HTTP error responses so error pages are not stored as content.
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            # Simplified example: the page <title> serves as the name and the
            # whole prettified markup as the content.
            title_tag = soup.find('title')
            # Fall back to the URL when the page has no <title> element.
            profile_name = title_tag.text.strip() if title_tag is not None else link
            content = soup.prettify()

            instance = self.model(
                content=content,
                name=profile_name,
                link=link,
                platform="linkedin",
            )
            instance.save()

        except Exception as e:
            print(f"Error during extraction: {e}")

        logger.info(f"Finished scraping LinkedIn profile: {link}")

# Medium Crawler
class MediumCrawler(BaseCrawler):
    """Crawler that stores the heading and paragraph text of a Medium article."""

    model = Document

    # Seconds to wait for the page. Without a timeout ``requests.get`` can
    # block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    def extract(self, link: str) -> None:
        """Download *link* and save its title and paragraph text as a document.

        Links already present in the database are skipped. Any failure during
        extraction is reported on stdout and nothing is saved for that link.

        Args:
            link: URL of the Medium article.
        """
        if self.model.find(link=link) is not None:
            logger.info(f"Medium article already exists in the database: {link}")
            return

        logger.info(f"Starting scraping Medium article: {link}")

        try:
            response = requests.get(link, timeout=self.REQUEST_TIMEOUT)
            # Reject HTTP error responses so error pages are not stored as content.
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            # Simplified example: the first <h1> is the title. When there is
            # none, use <title>, and finally the URL, rather than aborting.
            heading = soup.find('h1') or soup.find('title')
            article_title = heading.text.strip() if heading is not None else link
            article_body = " ".join(p.text for p in soup.find_all('p'))

            instance = self.model(
                content=article_body,
                name=article_title,
                link=link,
                platform="medium",
            )
            instance.save()

        except Exception as e:
            print(f"Error during extraction: {e}")

        logger.info(f"Finished scraping Medium article: {link}")

# Custom Article Crawler
class CustomCrawler(BaseCrawler):
    """Generic crawler that stores the title and paragraph text of a web page."""

    model = Document

    # Seconds to wait for the page. Without a timeout ``requests.get`` can
    # block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    def extract(self, link: str) -> None:
        """Download *link* and save its title and paragraph text as a document.

        Links already present in the database are skipped. Any failure during
        extraction is reported on stdout and nothing is saved for that link.

        Args:
            link: URL of the page to crawl.
        """
        if self.model.find(link=link) is not None:
            logger.info(f"Custom article already exists in the database: {link}")
            return

        logger.info(f"Starting scraping custom article: {link}")

        try:
            response = requests.get(link, timeout=self.REQUEST_TIMEOUT)
            # Reject HTTP error responses so error pages are not stored as content.
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            # Title comes from <title>; fall back to the URL when it is absent.
            title_tag = soup.find('title')
            title = title_tag.text.strip() if title_tag is not None else link
            body = " ".join(p.text for p in soup.find_all('p'))

            instance = self.model(
                content=body,
                name=title,
                link=link,
                platform="custom",
            )
            instance.save()

        except Exception as e:
            print(f"Error during extraction: {e}")

        logger.info(f"Finished scraping custom article: {link}")

# Function to print all URLs
def print_all_urls():
    """Print every link returned by ``Document.find_all``, or a notice if none."""
    stored_links = Document.find_all()

    # Guard clause: nothing stored yet.
    if not stored_links:
        print("[INFO] No URLs found in the database.")
        return

    print("[INFO] Ingested URLs:")
    for url in stored_links:
        print(url)

# Usage example
def test_crawlers():
    """Run each (platform, link) pair through its crawler, then list stored links.

    Pairs whose platform has no crawler in the registry are skipped.
    """
    registry = {
        "youtube": YouTubeCrawler(),
        #"github": GithubCrawler(),
        #"linkedin": LinkedInCrawler(),
        #"medium": MediumCrawler(),
        "custom": CustomCrawler(),
    }

    targets = [
        #("youtube", "https://www.youtube.com/watch?v=bYTawHgVoRQ&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=10&t=14s"),
        #("github", "https://github.com/carlos-argueta/rse_prob_robotics2"),
        #("linkedin", "https://www.linkedin.com/posts/sharad-maheshwari-b85626ba_ros-2-navigation-part-42-nav2-project-activity-7046562582203125761-H0to?utm_source=share&utm_medium=member_desktop"),
        #("medium", "https://medium.com/schmiedeone/getting-started-with-ros2-part-1-d4c3b7335c71"),
        ("youtube", "https://www.youtube.com/watch?v=rtrGoGsMVlI&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=1"),
        ("youtube", "https://www.youtube.com/watch?v=WzOopOkrowA&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=2"),
        ("youtube", "https://www.youtube.com/watch?v=b6p-26zqLNA&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=3"),
        ("youtube", "https://www.youtube.com/watch?v=8407qTyBRe0&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=4"),
        ("youtube", "https://www.youtube.com/watch?v=RJFoM-vnDJo&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=5"),
        ("youtube", "https://www.youtube.com/watch?v=dY9aZVMC-JM&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=6"),
        ("youtube", "https://www.youtube.com/watch?v=pYGOqbexzlg&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=7"),
        ("youtube", "https://www.youtube.com/watch?v=h-1IhC01T1c&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=8"),
        ("youtube", "https://www.youtube.com/watch?v=Lgzh4p1yP-c&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=9"),
        ("youtube", "https://www.youtube.com/watch?v=q4l_-n4BrKA&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=11"),
        ("youtube", "https://www.youtube.com/watch?v=QP-cxh8qUJQ&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=12"),
        ("youtube", "https://www.youtube.com/watch?v=EOBbxBBDLxU&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=13"),
        ("youtube", "https://www.youtube.com/watch?v=V0kmKkO7tVo&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=14"),
    ]

    for platform, url in targets:
        handler = registry.get(platform)
        if not handler:
            # No crawler registered for this platform: skip the link.
            continue
        handler.extract(url)

    # Show everything that is now stored in the database.
    print_all_urls()


# Run the crawl when the cell/script is executed.
test_crawlers()

[INFO] Starting scraping YouTube video: https://www.youtube.com/watch?v=rtrGoGsMVlI&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=1
Document saved with ID: 67550b322640436a9d79a8af
[INFO] Finished scraping YouTube video: https://www.youtube.com/watch?v=rtrGoGsMVlI&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=1
[INFO] Starting scraping YouTube video: https://www.youtube.com/watch?v=WzOopOkrowA&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=2
Document saved with ID: 67550b332640436a9d79a8b0
[INFO] Finished scraping YouTube video: https://www.youtube.com/watch?v=WzOopOkrowA&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=2
[INFO] Starting scraping YouTube video: https://www.youtube.com/watch?v=b6p-26zqLNA&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=3
Document saved with ID: 67550b342640436a9d79a8b1
[INFO] Finished scraping YouTube video: https://www.youtube.com/watch?v=b6p-26zqLNA&list=PLgG0XDQqJckkSJDPhXsFU_RIqEh08nG0V&index=3
[INFO] Starting scraping YouTube video: https://www.youtube.c