In [None]:
# Boilerplate: This block goes into every notebook.
# It sets up the environment, installs the requirements, and checks for the required environment variables.

from IPython.display import clear_output
from dotenv import load_dotenv
import os

# Module-level state used by install_requirements() and setup_env() below.
requirements_installed = False  # flipped to True once the pip command exits with status 0
max_retries = 3  # upper bound that `retries` is compared against after a failed install
retries = 0  # number of retry attempts made so far
REQUIRED_ENV_VARS = []  # names of environment variables that setup_env() verifies are set


def install_requirements():
    """Install the packages listed in requirements.txt, retrying on failure.

    Runs ``pip install -r requirements.txt`` through ``os.system``. When the
    command exits with a non-zero status it is attempted again until the
    module-level ``retries`` counter reaches ``max_retries``; after that
    ``exit(1)`` is called.

    Side effects:
        Sets the module-level ``requirements_installed`` flag to True on
        success and increments the module-level ``retries`` counter for every
        retry, so both survive across calls.

    Returns:
        None
    """
    # Both names are rebound below, so both must be declared global; leaving
    # `retries` out makes it a local and raises UnboundLocalError on the first
    # failed install.
    global requirements_installed, retries
    if requirements_installed:
        print("Requirements already installed.")
        return

    while True:
        print("Installing requirements...")
        install_status = os.system("pip install -r requirements.txt")
        if install_status == 0:
            print("Requirements installed successfully.")
            requirements_installed = True
            return

        print("Failed to install requirements.")
        if retries >= max_retries:
            exit(1)
            # Keep an explicit return so the loop cannot spin if exit()
            # hands control back instead of raising.
            return
        print("Retrying...")
        retries += 1


def setup_env():
    """Load the .env file and verify every required environment variable.

    ``load_dotenv(override=True)`` is called first, then each name in
    ``REQUIRED_ENV_VARS`` is looked up with ``os.getenv``. A name that is set
    is reported as such; a missing one prints a hint and calls ``exit(1)``.
    """
    load_dotenv(override=True)

    for name in REQUIRED_ENV_VARS:
        if os.getenv(name) is not None:
            print(f"{name} is set.")
            continue
        print(f"Please set the {name} environment variable.")
        exit(1)


# Run the setup steps in order: install the dependencies, clear the cell output
# produced so far, then check the required environment variables.
install_requirements()
clear_output()
setup_env()
print("🚀 Setup complete. Continue to the next cell.")

In [2]:
from bs4 import BeautifulSoup
import requests
from typing import List
import traceback
from markdownify import markdownify as md
from youtube_transcript_api.formatters import TextFormatter
from youtube_transcript_api import YouTubeTranscriptApi

# Shared formatter used by get_page_content() to turn a YouTube transcript into text.
formatter = TextFormatter()

# In-memory cache keyed by page URL; each value is a dict with "urls", "content" and "url" keys.
cache = {}


def get_base_url(url):
    """Return the part of ``url`` that precedes its third "/".

    For an absolute URL such as ``https://host/path`` this is the scheme and
    host (``https://host``). Strings with fewer than three slashes come back
    unchanged.
    """
    # Only the first three "/"-separated pieces matter, so stop splitting there.
    leading_parts = url.split("/", 3)[:3]
    return "/".join(leading_parts)


def get_links_from_page(url, timeout=30):
    """Collect the absolute http(s) links found in the ``<a href>`` tags of a page.

    Results are memoised in the module-level ``cache`` under ``cache[url]["urls"]``;
    a non-empty cached list is returned without touching the network.

    Args:
        url: Address of the page to scan.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A ``(links, error)`` tuple. On success ``links`` is a list of absolute
        URLs and ``error`` is None. On any exception ``links`` is an empty list
        and ``error`` is the exception message.
    """
    # Imported locally so the function does not depend on the notebook's import cell.
    from urllib.parse import urldefrag, urljoin

    try:
        cached_item = cache.get(url)
        cached_urls = cached_item.get("urls") if cached_item else None
        if cached_urls:
            print(f"Returning cached links for {url}")
            return cached_urls, None

        response = requests.get(url, timeout=timeout)
        soup = BeautifulSoup(response.text, "html.parser")

        links = []
        for anchor in soup.find_all("a"):
            href = (anchor.get("href") or "").strip()
            if not href:
                continue
            # urljoin resolves root-relative ("/x"), page-relative ("x") and
            # protocol-relative ("//host/x") hrefs correctly, which plain string
            # concatenation with the base URL does not.
            absolute, _fragment = urldefrag(urljoin(url, href))
            # mailto:, tel:, javascript: and similar targets cannot be fetched.
            if not absolute.startswith(("http://", "https://")):
                continue
            links.append(absolute)

        if cached_item:
            cached_item["urls"] = links
        else:
            cache[url] = {
                "urls": links,
                "content": None,
                "url": url,
            }
        return links, None
    except Exception as e:
        print(f"Failed to get links from {url}. Error: {e}")
        traceback.print_exc()
        return [], str(e)


def _youtube_video_id(url):
    """Return the video id carried by a YouTube URL, or None for any other URL."""
    # Imported locally so the helper does not depend on the notebook's import cell.
    from urllib.parse import parse_qs, urlparse

    parsed = urlparse(url)
    host = (parsed.hostname or "").lower()
    path_parts = [part for part in parsed.path.split("/") if part]

    if host == "youtu.be":
        # Short links keep the id as the first path segment: youtu.be/<id>
        return path_parts[0] if path_parts else None

    if host == "youtube.com" or host.endswith(".youtube.com"):
        # Regular watch links: youtube.com/watch?v=<id>&t=...
        ids = parse_qs(parsed.query).get("v")
        if ids:
            return ids[0]
        # Path-style links: /embed/<id>, /shorts/<id>, /live/<id>, /v/<id>
        if len(path_parts) >= 2 and path_parts[0] in ("embed", "shorts", "live", "v"):
            return path_parts[1]

    return None


def get_page_content(url, timeout=30):
    """Fetch a page and return its content converted to Markdown.

    YouTube video links are resolved to their transcript; every other URL is
    downloaded and its HTML is passed through ``markdownify``. Results are
    memoised in the module-level ``cache`` under ``cache[url]["content"]``, and
    non-empty cached content is returned without touching the network.

    Args:
        url: Address of the page or YouTube video.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A ``(content, error)`` tuple. On success ``error`` is None. On any
        exception ``content`` is ``""`` and ``error`` is the exception message.
    """
    try:
        cached_item = cache.get(url)
        cached_content = cached_item.get("content") if cached_item else None
        if cached_content:
            print(f"Returning cached content for {url}")
            return cached_content, None

        # Decide by host name, not by substring, so an ordinary article whose
        # URL merely contains "youtube" is still fetched as a web page.
        video_id = _youtube_video_id(url)
        if video_id:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            result = md(formatter.format_transcript(transcript))
        else:
            response = requests.get(url, timeout=timeout)
            soup = BeautifulSoup(response.text, "html.parser")
            result = md(str(soup))

        # Cache both kinds of result; transcripts are as costly to refetch as pages.
        if cached_item:
            cached_item["content"] = result
        else:
            cache[url] = {
                "urls": None,
                "content": result,
                "url": url,
            }
        return result, None
    except Exception as e:
        print(f"Failed to get content from {url}. Error: {e}")
        traceback.print_exc()
        return "", str(e)


def get_page_content_batch(urls: List[str]):
    """Fetch the content of several pages, one after another.

    For each URL the module-level ``cache`` is consulted first; on a miss the
    page is fetched with ``get_page_content`` and the outcome is stored back
    into the cache.

    Returns:
        A list with one ``{"url", "content", "error"}`` dict per input URL, in
        input order.
    """
    collected = []
    for page_url in urls:
        entry = cache.get(page_url)
        known_content = entry.get("content") if entry else None

        if not known_content:
            print(f"Getting content from {page_url}")
            content, error = get_page_content(page_url)
            collected.append({"url": page_url, "content": content, "error": error})
            if entry:
                entry["content"] = content
                cache[page_url] = entry
            else:
                cache[page_url] = {
                    "urls": None,
                    "content": content,
                    "url": page_url,
                }
            size_in_bytes = len(content.encode("utf-8"))
            print(f"Content from {page_url} fetched. Size: {size_in_bytes} bytes!")
        else:
            print(f"Returning cached content for {page_url}")
            collected.append({"url": page_url, "content": known_content, "error": None})
    return collected

In [3]:
from datetime import datetime

# NOTE(review): not referenced by any code visible in this notebook — confirm it is still needed.
knowledge_base_cache = {}


def get_ae_blog_links():
    """Return the links found on the blog index page.

    Returns:
        A ``(links, error)`` tuple: a fresh list of links and None on success,
        or an empty list and the error message when the index page could not
        be scanned.
    """
    found_links, error = get_links_from_page("https://arpitbhayani.me/blogs")
    if error:
        return [], error
    # Hand back a copy so callers never share the list object with the cache.
    return list(found_links), None


def clean_whitespace(text: str) -> str:
    """
    Cleans the whitespace as per the following rules:
    - Removes leading and trailing whitespace on every line.
    - Replaces runs of whitespace inside a line with a single space.
    - Replaces multiple newlines with a single newline (blank lines are dropped).
    - Removes any leading or trailing newlines.

    Args:
        text: The text to normalise; ``None`` or an empty string is accepted.

    Returns:
        The normalised text, or ``""`` when ``text`` is empty or ``None``.
    """
    if not text:
        return ""
    # Normalise line by line: collapsing whitespace over the whole text at once
    # would also swallow the newlines that the rules above promise to keep.
    normalised_lines = (" ".join(line.split()) for line in text.splitlines())
    return "\n".join(line for line in normalised_lines if line)


def fetch_knowledge_base():
    """Fetch the content of every page linked from the knowledge-base sub-pages.

    The knowledge-base index is scanned for links containing "knowledge-base"
    (Google Drive links are skipped). Each such sub-page is scanned for links in
    turn, and the content of all links found is fetched.

    Returns:
        A list of ``{"url", "content", "error"}`` dicts, one per unique link.
        The list is empty when the index page itself cannot be scanned, so the
        return value is always a list.
    """
    knowledge_base_link = "https://arpitbhayani.me/knowledge-base"
    sub_page_links, error = get_links_from_page(knowledge_base_link)
    if error:
        print(f"Failed to get links from {knowledge_base_link}. Error: {error}")
        return []

    links_to_fetch = []
    for link in sub_page_links:
        if "knowledge-base" not in link or "drive.google.com" in link:
            continue
        # get_links_from_page reports failures through its error value, so no
        # extra try/except is needed around the call.
        page_links, error = get_links_from_page(link)
        if error:
            print(f"Failed to get links from {link}. Error: {error}")
            continue
        links_to_fetch.extend(page_links)

    # Sub-pages repeat many of the same links; keep the first occurrence of each
    # so no page is fetched or reported more than once.
    unique_links = list(dict.fromkeys(links_to_fetch))
    return get_page_content_batch(unique_links)


def build_knowledge_base(output_file="ae_knowledge_base.md"):
    """Fetch the blog and knowledge-base pages and write them to one Markdown file.

    A ``_DD_MM_YYYY_HH_MM_SS`` timestamp is inserted before the extension of
    ``output_file``, so every run writes a new file. The file starts with an
    "# AE Knowledge Base" header followed by one "# <url>" section per page.

    Args:
        output_file: Base name of the file to write; the timestamp is added to it.

    Returns:
        A ``(sections, error)`` tuple. On success ``sections`` is the list of
        ``{"url", "content", "error"}`` dicts that were written and ``error`` is
        None. On failure ``sections`` is an empty list and ``error`` is the
        error message.
    """
    try:
        timestamp = datetime.now().strftime("%d_%m_%Y_%H_%M_%S")
        output_file_name, extension = os.path.splitext(output_file)
        output_file = f"{output_file_name}_{timestamp}{extension}"

        blog_links, error = get_ae_blog_links()
        if error:
            return [], error
        blog_content = get_page_content_batch(blog_links)

        knowledge_base_content = fetch_knowledge_base()
        # Tolerate a ([], error) tuple as well as a plain list of results.
        if isinstance(knowledge_base_content, tuple):
            knowledge_base_content, kb_error = knowledge_base_content
            if kb_error:
                return [], kb_error

        # Blog pages and knowledge-base pages both belong in the output; keep
        # the first occurrence of each URL so no section is written twice.
        sections = []
        seen_urls = set()
        for item in list(blog_content) + list(knowledge_base_content):
            if item["url"] in seen_urls:
                continue
            seen_urls.add(item["url"])
            sections.append(item)

        # Write header and sections through one handle: reopening the file in
        # "w" mode would truncate the header. UTF-8 is explicit because page
        # content is not limited to the platform's default encoding.
        with open(output_file, "w", encoding="utf-8") as f:
            f.write("# AE Knowledge Base\n\n")
            for section in sections:
                f.write(f"# {section['url']}\n")
                f.write(section["content"])
                f.write("\n\n")
        return sections, None
    except Exception as e:
        print(f"Failed to build knowledge base. Error: {e}")
        traceback.print_exc()
        return [], str(e)

In [None]:
# Smoke check: scan the blog index page and report how many links were found.
ae_links, error = get_ae_blog_links()

print(f"Found {len(ae_links)} links")

In [None]:
# Smoke check: fetch every knowledge-base page and report how many results came back.
knowledge_base_content = fetch_knowledge_base()

print(f"Fetched {len(knowledge_base_content)} links.")

In [None]:
# Build the knowledge base file and report the outcome.
# NOTE(review): build_knowledge_base() inserts a timestamp before the extension, so the
# file actually written is not named exactly `output_file`; the success message below
# prints the requested name, not the final one.
output_file = "ae_knowledge_base_v0_0_2.md"
contents, error = build_knowledge_base(output_file=output_file)


if error:
    print(f"Failed to build knowledge base. Error: {error}")
else:
    print(f"Knowledge base built successfully. Output file: {output_file}")
    print(f"Content: {len(contents)} sections. ")

In [None]:
#
# TODO: Things to do till this is complete.
# - Load and clean the knowledge base content.
# - Load the knowledge base content into a vector store.
# - Implement a simple RAG using Phi4 and the vector store.
# - Implement any optimizations on the RAG to improve response quality.
#