<a href="https://colab.research.google.com/github/Bhardwaj-Saurabh/personal_knowledge_assistant/blob/master/rag_application_data_ingestion_Vector_Index.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# !pip install -r /content/drive/MyDrive/FInananceRagAgent/requirements.txt

In [2]:
# !playwright install

In [3]:
from typing_extensions import Annotated
import json
from pathlib import Path
from pydantic import BaseModel, Field
import os

from typing_extensions import Annotated

import random
import string

import tiktoken

In [4]:
import nest_asyncio
# Apply nest_asyncio's patch. Presumably needed so the asyncio-based code later
# in this notebook can run inside the notebook's own event loop — TODO confirm.
nest_asyncio.apply()

In [5]:
# Read credentials from Colab's `userdata` store instead of hard-coding them.
from google.colab import userdata
MONGODB_URI = userdata.get('MONGODB_URI')
HUGGINGFACE_ACCESS_TOKEN = userdata.get('HF_TOKEN')
OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')

# Export the OpenAI key as an environment variable; presumably so OpenAI-backed
# clients created later pick it up without an explicit argument — TODO confirm.
os.environ['OPENAI_API_KEY'] = OPENAI_API_KEY

In [6]:
class DocumentMetadata(BaseModel):
    """Identifying metadata for a document: id, url, title and a properties dict."""

    id: str
    url: str
    title: str
    properties: dict

    def obfuscate(self) -> "DocumentMetadata":
        """Replace the id with a random hex id, in place.

        The current id with its dashes stripped is the string that gets
        substituted inside ``url``; the replacement is a random hex string of
        the same length.

        Returns:
            This same instance, after mutation.
        """
        dashless_id = self.id.replace("-", "")
        replacement_id = generate_random_hex(len(dashless_id))

        self.url = self.url.replace(dashless_id, replacement_id)
        self.id = replacement_id

        return self

In [7]:
def generate_random_hex(length: int) -> str:
    """Return a random lowercase hexadecimal string of ``length`` characters.

    Every character is drawn uniformly from the 16 hex digits. Sampling from
    ``string.hexdigits.lower()`` would be biased: that string is
    ``"0123456789abcdefabcdef"``, so ``a``-``f`` would appear twice as often
    as the decimal digits.

    The ``random`` module is used, so the result is not suitable as a secret.

    Args:
        length: Number of characters to generate; ``0`` yields ``""``.

    Returns:
        A string made only of the characters ``0-9`` and ``a-f``.
    """
    hex_chars = "0123456789abcdef"
    return "".join(random.choice(hex_chars) for _ in range(length))

In [8]:
class Document(BaseModel):
    """A piece of content plus its metadata, optional score, summary and child URLs.

    Equality and hashing are both based solely on ``id``.
    """

    # Defaults to a random 32-character hex string when no id is supplied.
    id: str = Field(default_factory=lambda: generate_random_hex(length=32))
    metadata: DocumentMetadata
    parent_metadata: DocumentMetadata | None = None
    content: str
    content_quality_score: float | None = None
    summary: str | None = None
    child_urls: list[str] = Field(default_factory=list)

    @classmethod
    def from_file(cls, file_path: Path) -> "Document":
        """Build a document from a UTF-8 encoded JSON file."""
        return cls.model_validate_json(file_path.read_text(encoding="utf-8"))

    def add_summary(self, summary: str) -> "Document":
        """Store ``summary`` on this document and return the document."""
        self.summary = summary
        return self

    def add_quality_score(self, score: float) -> "Document":
        """Store ``score`` as the content quality score and return the document."""
        self.content_quality_score = score
        return self

    def write(
        self, output_dir: Path, obfuscate: bool = False, also_save_as_txt: bool = False
    ) -> None:
        """Serialize the document to ``<output_dir>/<id>.json``.

        Args:
            output_dir: Target directory; created (with parents) if missing.
            obfuscate: When true, the document is obfuscated in place before
                being written, so the file name uses the obfuscated id.
            also_save_as_txt: When true, the raw ``content`` is additionally
                written next to the JSON file with a ``.txt`` suffix.
        """
        output_dir.mkdir(parents=True, exist_ok=True)

        if obfuscate:
            self.obfuscate()

        # Dump first so a serialization failure happens before any file is opened.
        payload = self.model_dump()
        json_path = output_dir / f"{self.id}.json"
        with json_path.open("w", encoding="utf-8") as handle:
            json.dump(payload, handle, indent=4, ensure_ascii=False)

        if not also_save_as_txt:
            return

        json_path.with_suffix(".txt").write_text(self.content, encoding="utf-8")

    def obfuscate(self) -> "Document":
        """Obfuscate own and parent metadata in place, then adopt the new metadata id."""
        self.metadata = self.metadata.obfuscate()

        parent = self.parent_metadata
        self.parent_metadata = parent.obfuscate() if parent else None

        self.id = self.metadata.id
        return self

    def __eq__(self, other: object) -> bool:
        """Two documents are equal when their ids match."""
        if isinstance(other, Document):
            return self.id == other.id
        return False

    def __hash__(self) -> int:
        """Hash on the id, consistent with ``__eq__``."""
        return hash(self.id)

In [9]:
def __get_json_files(data_directory: Path, nesting_level: int = 0) -> list[Path]:
    """Collect ``*.json`` files located exactly ``nesting_level`` directories deep.

    Args:
        data_directory: Directory to start from.
        nesting_level: How many directory levels to descend before globbing;
            ``0`` globs ``data_directory`` itself.

    Returns:
        Paths of the JSON files found at the requested depth.
    """
    if nesting_level == 0:
        return list(data_directory.glob("*.json"))

    collected = []
    for entry in data_directory.iterdir():
        if not entry.is_dir():
            continue
        # Descend one level; files at shallower depths are deliberately ignored.
        collected += __get_json_files(entry, nesting_level - 1)

    return collected

In [10]:
# MongoDB collection and database names used as defaults by MongoDBService.
COLLECTION_NAME = 'knowledge_assistant'
MONGODB_DATABASE_NAME = 'knowledgeassistantdatabase'

In [11]:
from typing import Generic, Type, TypeVar

from bson import ObjectId
from loguru import logger
from pydantic import BaseModel
from pymongo import MongoClient, errors

T = TypeVar("T", bound=BaseModel)


class MongoDBService(Generic[T]):
    """Store and load Pydantic models in a single MongoDB collection.

    Instances can be used as context managers; the underlying client is closed
    when the ``with`` block exits.
    """

    def __init__(
        self,
        model: Type[T],
        collection_name: str = COLLECTION_NAME,
        database_name: str = MONGODB_DATABASE_NAME,
        mongodb_uri: str = MONGODB_URI,
    ) -> None:
        """Connect to MongoDB and select the target database and collection.

        Args:
            model: Pydantic model class that fetched documents are parsed into.
            collection_name: Name of the collection to operate on.
            database_name: Name of the database holding the collection.
            mongodb_uri: Connection string passed to ``MongoClient``.

        Raises:
            Exception: Whatever ``MongoClient`` or the ``ping`` command raises.
                If the ping fails, the client is closed before re-raising.
        """
        self.model = model
        self.collection_name = collection_name
        self.database_name = database_name
        self.mongodb_uri = mongodb_uri

        client = MongoClient(mongodb_uri)
        try:
            # Verify the server is reachable before going any further.
            client.admin.command("ping")
        except Exception:
            # Do not leak the client when the connection check fails.
            client.close()
            raise

        self.client = client
        self.database = self.client[database_name]
        self.collection = self.database[collection_name]

    def __enter__(self) -> "MongoDBService":
        """Return the service itself for use inside a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the client; exceptions from the ``with`` body are not suppressed."""
        self.close()

    def clear_collection(self) -> None:
        """Delete every document in the collection.

        Raises:
            errors.PyMongoError: If the delete operation fails.
        """
        self.collection.delete_many({})

    def ingest_documents(self, documents: list[T]) -> None:
        """Insert the given models into the collection.

        Args:
            documents: Non-empty list of Pydantic model instances.

        Raises:
            ValueError: If ``documents`` is empty or contains a non-model item.
            errors.PyMongoError: If the insert fails.
        """
        if not documents or not all(isinstance(doc, BaseModel) for doc in documents):
            raise ValueError("Documents must be a list of Pydantic models.")

        dict_documents = [doc.model_dump() for doc in documents]

        # Remove '_id' fields to avoid duplicate key errors
        for doc in dict_documents:
            doc.pop("_id", None)

        self.collection.insert_many(dict_documents)

    def fetch_documents(self, limit: int, query: dict) -> list[T]:
        """Fetch documents matching ``query`` and parse them into ``self.model``.

        Args:
            limit: Value passed to the cursor's ``limit``.
            query: MongoDB filter document.

        Returns:
            The parsed model instances.
        """
        documents = list(self.collection.find(query).limit(limit))
        return self.__parse_documents(documents)

    def __parse_documents(self, documents: list[dict]) -> list[T]:
        """Convert raw MongoDB dicts into ``self.model`` instances.

        The input dicts are modified in place: top-level ``ObjectId`` values
        are stringified and ``_id`` is moved to ``id``.
        """
        parsed_documents = []
        for doc in documents:
            # Only values of existing keys are replaced, so iterating while
            # assigning is safe here.
            for key, value in doc.items():
                if isinstance(value, ObjectId):
                    doc[key] = str(value)

            # NOTE(review): this overwrites any "id" stored in the document
            # with MongoDB's `_id` (or None when `_id` is absent) — confirm
            # that is the intended identity for parsed models.
            doc["id"] = doc.pop("_id", None)

            parsed_documents.append(self.model.model_validate(doc))

        return parsed_documents

    def get_collection_count(self) -> int:
        """Return the number of documents in the collection.

        Raises:
            errors.PyMongoError: If the count operation fails.
        """
        return self.collection.count_documents({})

    def close(self) -> None:
        """Close the underlying MongoDB client."""
        self.client.close()

## Fetch Data from MongoDB

In [12]:
def fetch_from_mongodb(
    collection_name: str,
    limit: int,
) -> Annotated[list[Document], "documents"]:
    """Load documents from a MongoDB collection as ``Document`` models.

    Opens a ``MongoDBService`` for the collection, fetches with an empty
    query (no filtering) and closes the connection before returning.

    Args:
        collection_name: Name of the collection to read from.
        limit: Limit forwarded to ``MongoDBService.fetch_documents``.

    Returns:
        The fetched documents parsed into ``Document`` instances.
    """
    with MongoDBService(model=Document, collection_name=collection_name) as service:
        documents = service.fetch_documents(limit, query={})

    return documents

In [13]:
# Fetch documents from the default collection, with a limit of 1000.
documents = fetch_from_mongodb(
    collection_name = COLLECTION_NAME,
    limit = 1000,
)

In [14]:
from IPython.display import Markdown

# Render the content of the first fetched document as Markdown.
# Raises IndexError if `documents` is empty.
Markdown(documents[0].content)

[Lightning AI Studios: Never set up a local environment again →](https://lightning.ai)
**
Log in or create a free Lightning.ai account to track your progress and access additional course materials[Get Started →](https://lightning.ai/sign-in?redirectTo=https%3A%2F%2Flightning.ai%2Fcourses%2Fdeep-learning-fundamentals%2F)
**
Deep Learning Fundamentals 
* Pages
  * [Deep Learning Fundamentals](https://lightning.ai/courses/deep-learning-fundamentals/)
    * [Unit 1Intro to ML and DL](https://lightning.ai/courses/deep-learning-fundamentals/unit-1/)
      * [Unit 1.1What Is ML?](https://lightning.ai/courses/deep-learning-fundamentals/unit-1/unit-1-1/)
      * [Unit 1.2How Can We Use ML?](https://lightning.ai/courses/deep-learning-fundamentals/unit-1/unit-1-2/)
      * [Unit 1.3A Typical ML Workflow](https://lightning.ai/courses/deep-learning-fundamentals/unit-1/unit-1-3/)
      * [Unit 1.4The First ML Classifier](https://lightning.ai/courses/deep-learning-fundamentals/unit-1/ml-classifier/)
      * [Unit 1.5Setting Up Our Computing Environment](https://lightning.ai/courses/deep-learning-fundamentals/unit-1/1-5-computing-environment/)
      * [Unit 1.6Implementing a Perceptron in Python](https://lightning.ai/courses/deep-learning-fundamentals/unit-1/1-6-implementing-a-perceptron-in-python-parts-1-3/)
      * [Unit 1.7 Evaluating Machine Learning Models](https://lightning.ai/courses/deep-learning-fundamentals/unit-1/1-7-evaluating-machine-learning-models-parts-1-and-2/)
      * [Unit 1 ExercisesUnit 1 Exercises](https://lightning.ai/courses/deep-learning-fundamentals/unit-1/exercises/)
    * [Unit 2Using Tensors w/ PyTorch](https://lightning.ai/courses/deep-learning-fundamentals/2-0-unit-2-overview/)
      * [Unit 2.1Pytorch Intro](https://lightning.ai/courses/deep-learning-fundamentals/2-0-unit-2-overview/2-1-introducing-pytorch/)
      * [Unit 2.2Tensors](https://lightning.ai/courses/deep-learning-fundamentals/2-0-unit-2-overview/2-2-what-are-tensors-part-1-and-part-2/)
      * [Unit 2.3Using Tensors](https://lightning.ai/courses/deep-learning-fundamentals/2-0-unit-2-overview/2-3-how-do-we-use-tensors-in-pytorch/)
      * [Unit 2.4Linear Algebra](https://lightning.ai/courses/deep-learning-fundamentals/2-0-unit-2-overview/2-4-improving-code-efficiency-with-linear-algebra-parts-1-4/)
      * [Unit 2.5Debugging Code](https://lightning.ai/courses/deep-learning-fundamentals/2-0-unit-2-overview/2-5-debugging-code/)
      * [Unit 2.6Revisiting Perceptron w/ Tensors](https://lightning.ai/courses/deep-learning-fundamentals/2-0-unit-2-overview/2-6-revisiting-the-perceptron-algorithm/)
      * [Unit 2.7Computation Graphs](https://lightning.ai/courses/deep-learning-fundamentals/2-0-unit-2-overview/2-7-seeing-predictive-models-as-computation-graphs/)
      * [Unit 2 ExercisesExercises](https://lightning.ai/courses/deep-learning-fundamentals/2-0-unit-2-overview/unit-2-exercises/)
    * [Unit 3Model Training in PyTorch](https://lightning.ai/courses/deep-learning-fundamentals/3-0-overview-model-training-in-pytorch/)
      * [Unit 3.1Using Logistic Regression for Classification](https://lightning.ai/courses/deep-learning-fundamentals/3-0-overview-model-training-in-pytorch/3-1-using-logistic-regression-for-classification-parts-1-3/)
      * [Unit 3.2The Logistic Regression Computation Graph](https://lightning.ai/courses/deep-learning-fundamentals/3-0-overview-model-training-in-pytorch/3-2-the-logistic-regression-computation-graph/)
      * [Unit 3.3Model Training with Stochastic Gradient Descent ](https://lightning.ai/courses/deep-learning-fundamentals/3-0-overview-model-training-in-pytorch/3-3-model-training-with-stochastic-gradient-descent-part-1-4/)
      * [Unit 3.4Automatic Differentiation in PyTorch](https://lightning.ai/courses/deep-learning-fundamentals/3-0-overview-model-training-in-pytorch/3-4-automatic-differentiation-in-pytorch/)
      * [Unit 3.5The PyTorch API](https://lightning.ai/courses/deep-learning-fundamentals/3-0-overview-model-training-in-pytorch/3-5-the-pytorch-api-parts-1-2/)
      * [Unit 3.6Training a Logistic Regression Model in PyTorch](https://lightning.ai/courses/deep-learning-fundamentals/3-0-overview-model-training-in-pytorch/3-6-training-a-logistic-regression-model-in-pytorch-parts-1-3/)
      * [Unit 3.7 Feature Normalization](https://lightning.ai/courses/deep-learning-fundamentals/3-0-overview-model-training-in-pytorch/3-7-feature-normalization-parts-1-2/)
      * [Unit 3 ExercisesUnit 3 Exercies](https://lightning.ai/courses/deep-learning-fundamentals/3-0-overview-model-training-in-pytorch/unit-3-exercises/)
    * [Unit 4Training Multilayer Neural Networks Overview](https://lightning.ai/courses/deep-learning-fundamentals/training-multilayer-neural-networks-overview/)
      * [Unit 4.1Logistic Regression for Multiple Classes](https://lightning.ai/courses/deep-learning-fundamentals/training-multilayer-neural-networks-overview/logistic-regression-for-multiple-classes-part-1-5/)
      * [Unit 4.2Multilayer Neural Networks](https://lightning.ai/courses/deep-learning-fundamentals/training-multilayer-neural-networks-overview/4-2-multilayer-neural-networks-part-1-3/)
      * [Unit 4.3Training a Multilayer Neural Network in PyTorch](https://lightning.ai/courses/deep-learning-fundamentals/training-multilayer-neural-networks-overview/4-3-training-a-multilayer-neural-network-in-pytorch-part-1-5/)
      * [Unit 4.4Defining Efficient Data Loaders](https://lightning.ai/courses/deep-learning-fundamentals/training-multilayer-neural-networks-overview/4-4-defining-efficient-data-loaders-part-1-4/)
      * [Unit 4.5Multilayer Neural Networks for Regression](https://lightning.ai/courses/deep-learning-fundamentals/training-multilayer-neural-networks-overview/4-5-multilayer-neural-networks-for-regression-parts-1-2/)
      * [Unit 4.6Speeding Up Model Training Using GPUs](https://lightning.ai/courses/deep-learning-fundamentals/training-multilayer-neural-networks-overview/4-6-speeding-up-model-training-using-gpus/)
      * [Unit 4 ExercisesUnit 4 Exercises](https://lightning.ai/courses/deep-learning-fundamentals/training-multilayer-neural-networks-overview/unit-4-exercises/)
    * [Unit 5Organizing Your Code with Lightning](https://lightning.ai/courses/deep-learning-fundamentals/overview-organizing-your-code-with-pytorch-lightning/)
      * [Unit 5.1 Organizing Your Code with Lightning](https://lightning.ai/courses/deep-learning-fundamentals/overview-organizing-your-code-with-pytorch-lightning/organizing-your-code-with-lightning/)
      * [Unit 5.2Training a Multilayer Perceptron using the Lightning Trainer](https://lightning.ai/courses/deep-learning-fundamentals/overview-organizing-your-code-with-pytorch-lightning/5-2-training-a-multilayer-perceptron-using-the-lightning-trainer/)
      * [Unit 5.3Computing Metrics Efficiently with TorchMetrics](https://lightning.ai/courses/deep-learning-fundamentals/overview-organizing-your-code-with-pytorch-lightning/5-3-computing-metrics-efficiently-with-torchmetrics/)
      * [Unit 5.4Making Code Reproducible](https://lightning.ai/courses/deep-learning-fundamentals/overview-organizing-your-code-with-pytorch-lightning/5-4-making-code-reproducible/)
      * [Unit 5.5Organizing Your Data Loaders with Data Modules](https://lightning.ai/courses/deep-learning-fundamentals/overview-organizing-your-code-with-pytorch-lightning/5-5-organizing-your-data-loaders-with-data-modules/)
      * [Unit 5.6The Benefits of Logging Your Model Training](https://lightning.ai/courses/deep-learning-fundamentals/overview-organizing-your-code-with-pytorch-lightning/5-6-the-benefits-of-logging-your-model-training/)
      * [Unit 5.7Evaluating and Using Models on New Data](https://lightning.ai/courses/deep-learning-fundamentals/overview-organizing-your-code-with-pytorch-lightning/5-7-evaluating-and-using-models-on-new-data/)
      * [Unit 5.8Add Functionality with Callbacks](https://lightning.ai/courses/deep-learning-fundamentals/overview-organizing-your-code-with-pytorch-lightning/5-8-add-functionality-with-callbacks/)
      * [Unit 5 ExercisesUnit 5 Exercises](https://lightning.ai/courses/deep-learning-fundamentals/overview-organizing-your-code-with-pytorch-lightning/unit-5-exercises/)
    * [Unit 6Essential Deep Learning Tips & Tricks](https://lightning.ai/courses/deep-learning-fundamentals/unit-6-overview-essential-deep-learning-tips-tricks/)
      * [Unit 6.1 Model Checkpointing and Early Stopping](https://lightning.ai/courses/deep-learning-fundamentals/unit-6-overview-essential-deep-learning-tips-tricks/unit-6.1-model-checkpointing-and-early-stopping/)
      * [Unit 6.2Learning Rates and Learning Rate Schedulers](https://lightning.ai/courses/deep-learning-fundamentals/unit-6-overview-essential-deep-learning-tips-tricks/unit-6.2-learning-rates-and-learning-rate-schedulers/)
      * [Unit 6.3Using More Advanced Optimization Algorithms](https://lightning.ai/courses/deep-learning-fundamentals/unit-6-overview-essential-deep-learning-tips-tricks/unit-6.3-using-more-advanced-optimization-algorithms/)
      * [Unit 6.4Choosing Activation Functions](https://lightning.ai/courses/deep-learning-fundamentals/unit-6-overview-essential-deep-learning-tips-tricks/unit-6.4-choosing-activation-functions/)
      * [Unit 6.5Automating The Hyperparameter Tuning Process](https://lightning.ai/courses/deep-learning-fundamentals/unit-6-overview-essential-deep-learning-tips-tricks/unit-6.5-automating-the-hyperparameter-tuning-process/)
      * [Unit 6.6Improving Convergence with Batch Normalization](https://lightning.ai/courses/deep-learning-fundamentals/unit-6-overview-essential-deep-learning-tips-tricks/unit-6.6-improving-convergence-with-batch-normalization/)
      * [Unit 6.7Reducing Overfitting With Dropout](https://lightning.ai/courses/deep-learning-fundamentals/unit-6-overview-essential-deep-learning-tips-tricks/6.7-reducing-overfitting-with-dropout/)
      * [Unit 6.8Debugging Deep Neural Networks](https://lightning.ai/courses/deep-learning-fundamentals/unit-6-overview-essential-deep-learning-tips-tricks/6.8-debugging-deep-neural-networks/)
      * [Unit 6 ExercisesUnit 6 Exercises](https://lightning.ai/courses/deep-learning-fundamentals/unit-6-overview-essential-deep-learning-tips-tricks/unit-6-exercises/)
    * [Unit 7Getting Started with Computer Vision](https://lightning.ai/courses/deep-learning-fundamentals/unit-7-overview-getting-started-with-computer-vision/)
      * [Unit 7.1Working With Images](https://lightning.ai/courses/deep-learning-fundamentals/unit-7-overview-getting-started-with-computer-vision/unit-7.1-working-with-images/)
      * [Unit 7.2How Convolutional Neural Networks Work](https://lightning.ai/courses/deep-learning-fundamentals/unit-7-overview-getting-started-with-computer-vision/unit-7.2-how-convolutional-neural-networks-work/)
      * [Unit 7.3Convolutional Neural Network Architectures](https://lightning.ai/courses/deep-learning-fundamentals/unit-7-overview-getting-started-with-computer-vision/unit-7.3-convolutional-neural-network-architectures/)
      * [Unit 7.4Training Convolutional Neural Networks](https://lightning.ai/courses/deep-learning-fundamentals/unit-7-overview-getting-started-with-computer-vision/unit-7.4-training-convolutional-neural-networks/)
      * [Unit 7.5Improving Predictions with Data Augmentation](https://lightning.ai/courses/deep-learning-fundamentals/unit-7-overview-getting-started-with-computer-vision/unit-7.5-improving-predictions-with-data-augmentation/)
      * [Unit 7.6Leveraging Pretrained Models with Transfer Learning](https://lightning.ai/courses/deep-learning-fundamentals/unit-7-overview-getting-started-with-computer-vision/unit-7.6-leveraging-pretrained-models-with-transfer-learning/)
      * [Unit 7.7Using Unlabeled Data with Self-Supervised](https://lightning.ai/courses/deep-learning-fundamentals/unit-7-overview-getting-started-with-computer-vision/unit-7.7-using-unlabeled-data-with-self-supervised/)
      * [Unit 7 ExercisesUnit 7 Exercises](https://lightning.ai/courses/deep-learning-fundamentals/unit-7-overview-getting-started-with-computer-vision/unit-7-exercises/)
    * [Unit 8Natural Language Processing and Large Language Models](https://lightning.ai/courses/deep-learning-fundamentals/unit-8.0-natural-language-processing-and-large-language-models/)
      * [Unit 8.1Working with Text Data](https://lightning.ai/courses/deep-learning-fundamentals/unit-8.0-natural-language-processing-and-large-language-models/8.1-working-with-text-data/)
      * [Unit 8.2Training A Text Classifier Baseline](https://lightning.ai/courses/deep-learning-fundamentals/unit-8.0-natural-language-processing-and-large-language-models/8.2-training-a-text-classifier-baseline/)
      * [Unit 8.3Introduction to Recurrent Neural Networks](https://lightning.ai/courses/deep-learning-fundamentals/unit-8.0-natural-language-processing-and-large-language-models/8.3-introduction-to-recurrent-neural-networks/)
      * [Unit 8.4From RNNs to the Transformer Architecture](https://lightning.ai/courses/deep-learning-fundamentals/unit-8.0-natural-language-processing-and-large-language-models/8.4-from-rnns-to-the-transformer-architecture/)
      * [Unit 8.5Understanding Self-Attention](https://lightning.ai/courses/deep-learning-fundamentals/unit-8.0-natural-language-processing-and-large-language-models/8.5-understanding-self-attention/)
      * [Unit 8.6Large Language Models](https://lightning.ai/courses/deep-learning-fundamentals/unit-8.0-natural-language-processing-and-large-language-models/8.6-large-language-models/)
      * [Unit 8.7A Large Language Model for Classification](https://lightning.ai/courses/deep-learning-fundamentals/unit-8.0-natural-language-processing-and-large-language-models/8.7-a-large-language-model-for-classification/)
      * [Unit 8 ExercisesUnit 8 Exercises](https://lightning.ai/courses/deep-learning-fundamentals/unit-8.0-natural-language-processing-and-large-language-models/unit-8-exercises/)
    * [Unit 9Techniques for Speeding Up Model Training](https://lightning.ai/courses/deep-learning-fundamentals/9.0-overview-techniques-for-speeding-up-model-training/)
      * [Unit 9.1Accelerated Model Training via Mixed-Precision Training](https://lightning.ai/courses/deep-learning-fundamentals/9.0-overview-techniques-for-speeding-up-model-training/unit-9.1-accelerated-model-training-via-mixed-precision-training/)
      * [Unit 9.2Multi-GPU Training Strategies](https://lightning.ai/courses/deep-learning-fundamentals/9.0-overview-techniques-for-speeding-up-model-training/unit-9.2-multi-gpu-training-strategies/)
      * [Unit 9.3Deep Dive Into Data Parallelism](https://lightning.ai/courses/deep-learning-fundamentals/9.0-overview-techniques-for-speeding-up-model-training/unit-9.3-deep-dive-into-data-parallelism/)
      * [Unit 9.4Compiling PyTorch Models](https://lightning.ai/courses/deep-learning-fundamentals/9.0-overview-techniques-for-speeding-up-model-training/unit-9.4-compiling-pytorch-models/)
      * [Unit 9.5Increasing Batch Sizes to Increase Throughput ](https://lightning.ai/courses/deep-learning-fundamentals/9.0-overview-techniques-for-speeding-up-model-training/unit-9.5-increasing-batch-sizes-to-increase-throughput/)
      * [Unit 9 ExercisesUnit 9 Exercises](https://lightning.ai/courses/deep-learning-fundamentals/9.0-overview-techniques-for-speeding-up-model-training/unit-9-exercises/)
    * [Unit 10 The Finale: Our Next Steps After AI Model Training](https://lightning.ai/courses/deep-learning-fundamentals/10.0-overview-the-finale-our-next-steps-after-ai-model-training/)
      * [Unit 10.1Trustworthy and Reliable Machine Learning](https://lightning.ai/courses/deep-learning-fundamentals/10.0-overview-the-finale-our-next-steps-after-ai-model-training/10.1-trustworthy-and-reliable-machine-learning/)
      * [Unit 10.2Scaling PyTorch Models without Boilerplate Code](https://lightning.ai/courses/deep-learning-fundamentals/10.0-overview-the-finale-our-next-steps-after-ai-model-training/10.2-fabric-scaling-pytorch-models-without-boilerplate-code/)
      * [Unit 10.3Designing Machine Learning Systems](https://lightning.ai/courses/deep-learning-fundamentals/10.0-overview-the-finale-our-next-steps-after-ai-model-training/10.3-designing-machine-learning-systems/)
      * [Unit 10.4Conclusion](https://lightning.ai/courses/deep-learning-fundamentals/10.0-overview-the-finale-our-next-steps-after-ai-model-training/10.4-conclusion/)
      * [Unit 10 ExercisesUnit 10 Exercises](https://lightning.ai/courses/deep-learning-fundamentals/10.0-overview-the-finale-our-next-steps-after-ai-model-training/unit-10-exercises/)


[Final certification exam](https://lightning.ai/ai-education/deep-learning-fundamentals/certification/)
[Deep Learning Fundamentals](https://lightning.ai/pages/courses/deep-learning-fundamentals/) > Deep Learning Fundamentals
  * Share:
  * [![Tweet](https://lightningaidev.wpengine.com/wp-content/themes/lightning-wp/assets/images/icons/twitter.svg)](https://twitter.com/intent/tweet?source=http%3A%2F%2Flightning.ai&text=:%20http%3A%2F%2Flightning.ai&via=LightningAI "Tweet")
  * [![Submit to Reddit](https://lightningaidev.wpengine.com/wp-content/themes/lightning-wp/assets/images/icons/reddit.svg)](http://www.reddit.com/submit?url=http%3A%2F%2Flightning.ai&title= "Submit to Reddit")
  * [![Share on LinkedIn](https://lightningaidev.wpengine.com/wp-content/themes/lightning-wp/assets/images/icons/linkedin.svg)](http://www.linkedin.com/shareArticle?mini=true&url=http%3A%2F%2Flightning.ai&title=&summary=&source=http%3A%2F%2Flightning.ai "Share on LinkedIn")


Course Progress:
#  Deep Learning Fundamentals
[Start Course](https://lightning.ai/pages/courses/deep-learning-fundamentals/unit-1/unit-1-1/)
## Welcome to Deep Learning Fundamentals
Deep Learning Fundamentals is a free course on learning deep learning using a modern open-source stack.
If you found this page, you probably heard that artificial intelligence and deep learning are taking the world by storm. This is correct. In this course, [Sebastian Raschka](https://sebastianraschka.com/), a best-selling author and professor, will teach you deep learning (machine learning with deep learning) from the ground up via a course of 10 units with bite-sized videos, quizzes, and exercises. The entire course is free and uses the most popular open-source tools for deep learning.
**What will you learn in this course?**
  * What machine learning is and when to use it
  * The main concepts of deep learning
  * How to design deep learning experiments with PyTorch
  * How to write efficient deep learning code with PyTorch Lightning


**What will you be able to do after this course?**
  * Build classifiers for various kinds of data like tables, images, and text
  * Tune models effectively to optimize predictive and computational performance


**How is this course structured?**
  * The course consists of 10 units, each containing several subsections
  * It is centered around informative, succinct videos that are respectful of your time
  * In each unit, you will find optional exercises to practice your knowledge
  * We also provide additional resources for those who want a deep dive on specific topics


**What are the prerequisites?**
  * Ideally, you should already be familiar with programming in Python
  * (Some lectures will involve a tiny bit of math, but a strong math background is not required!)


**Are there interactive quizzes or exercises?**
  * Each section is accompanied by optional multiple-choice quizzes to test your understanding of the material
  * Optionally, each unit also features one or more code exercises to practice implementing concepts covered in this class


**Is there a course completion badge or certificate?**
  * At the end of this course, you can take an optional exam featuring 25 multiple-choice questions
  * Upon answering 80% of the questions in the exam correctly (there are 5 attempts), you obtain an optional course completion badge that can be shared on LinkedIn

[Start Course](https://lightning.ai/pages/courses/deep-learning-fundamentals/unit-1/unit-1-1/)
**Log in or create a free Lightning.ai account to access:**
  * Quizzes
  * Completion badges
  * Progress tracking
  * Additional downloadable content
  * Additional AI education resources
  * Notifications when new units are released
  * Free cloud computing credits

[Sign Up or Log In](https://lightning.ai/sign-in?redirectTo=https%3A%2F%2Flightning.ai%2Fcourses%2Fdeep-learning-fundamentals%2F)
#####  Watch Video 1
##### Videos
![](https://t.co/1/i/adsct?=&bci=4&dv=UTC%26en-US%26Google%20Inc.%26Linux%20x86_64%26255%261080%26600%262%2624%261080%26600%260%26na&eci=3&event=%7B%22%22%3A%22%22%7D&event_id=ced468f8-e912-4e7d-88f9-ca7345132140&integration=gtm&p_id=Twitter&p_user_id=0&pl_id=8c30a3f6-c71f-4138-93b4-02957c8b251a&tw_document_href=https%3A%2F%2Flightning.ai%2Fcourses%2Fdeep-learning-fundamentals%2F&tw_iframe_status=0&txn_id=p06ii&type=javascript&version=2.3.33)![](https://analytics.twitter.com/1/i/adsct?=&bci=4&dv=UTC%26en-US%26Google%20Inc.%26Linux%20x86_64%26255%261080%26600%262%2624%261080%26600%260%26na&eci=3&event=%7B%22%22%3A%22%22%7D&event_id=ced468f8-e912-4e7d-88f9-ca7345132140&integration=gtm&p_id=Twitter&p_user_id=0&pl_id=8c30a3f6-c71f-4138-93b4-02957c8b251a&tw_document_href=https%3A%2F%2Flightning.ai%2Fcourses%2Fdeep-learning-fundamentals%2F&tw_iframe_status=0&txn_id=p06ii&type=javascript&version=2.3.33)


## Filter the documents by a predefined Quality Score

In [15]:

def filter_by_quality(
    documents: list[Document],
    content_quality_score_threshold: float,
) -> Annotated[list[Document], "filtered_documents"]:
    """Keep documents that are unscored or score strictly above the threshold.

    Args:
        documents: Documents to filter; only ``content_quality_score`` is read.
        content_quality_score_threshold: Cut-off in the range ``[0, 1]``.
            A document whose score equals the threshold is dropped.

    Returns:
        A new list, in the original order, with the documents that have no
        score (``None``) or a score greater than the threshold.

    Raises:
        AssertionError: If the threshold is outside ``[0, 1]``.
    """
    assert 0 <= content_quality_score_threshold <= 1, (
        "Content quality score threshold must be between 0 and 1"
    )

    # Test for `None` explicitly: a score of exactly 0.0 is a real (worst)
    # score and must be filtered out, not mistaken for "no score yet".
    return [
        doc
        for doc in documents
        if doc.content_quality_score is None
        or doc.content_quality_score > content_quality_score_threshold
    ]

In [16]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Generator, Literal, Union
from tqdm import tqdm

from langchain_core.documents import Document as LangChainDocument
from langchain_mongodb.retrievers import (
    MongoDBAtlasParentDocumentRetriever,
)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_openai import OpenAIEmbeddings

## Define embedding models: either OpenAI or Hugging Face

In [17]:
# Accepted provider names for `get_embedding_model`'s `model_type` argument.
EmbeddingModelType = Literal["openai", "huggingface"]
# Either of the two embedding classes the factory functions below can return.
EmbeddingsModel = Union[OpenAIEmbeddings, HuggingFaceEmbeddings]


def get_embedding_model(
    model_id: str,
    model_type: EmbeddingModelType = "huggingface",
    device: str = "cpu",
) -> EmbeddingsModel:
    """Build an embedding model for the requested provider.

    Args:
        model_id: Model identifier forwarded to the provider-specific factory.
        model_type: ``"openai"`` or ``"huggingface"``.
        device: Device forwarded to the Hugging Face factory; not used for
            OpenAI.

    Returns:
        The embedding model created by the matching factory.

    Raises:
        ValueError: If ``model_type`` is neither supported value.
    """
    if model_type == "openai":
        return get_openai_embedding_model(model_id)

    if model_type == "huggingface":
        return get_huggingface_embedding_model(model_id, device)

    raise ValueError(f"Invalid embedding model type: {model_type}")


def get_openai_embedding_model(model_id: str) -> OpenAIEmbeddings:
    """Create an ``OpenAIEmbeddings`` instance for ``model_id``.

    The ``<|endoftext|>`` token is passed as an allowed special token.
    """
    special_tokens = {"<|endoftext|>"}
    return OpenAIEmbeddings(model=model_id, allowed_special=special_tokens)


def get_huggingface_embedding_model(
    model_id: str, device: str
) -> HuggingFaceEmbeddings:
    """Create a ``HuggingFaceEmbeddings`` instance for ``model_id`` on ``device``.

    Remote code is trusted when loading the model and embeddings are returned
    without normalization.
    """
    model_options = {"device": device, "trust_remote_code": True}
    encode_options = {"normalize_embeddings": False}

    return HuggingFaceEmbeddings(
        model_name=model_id,
        model_kwargs=model_options,
        encode_kwargs=encode_options,
    )

## Contextual summary using OpenAI

In [24]:
import asyncio
import os

import psutil
from litellm import acompletion
from loguru import logger
from openai import AsyncOpenAI
from pydantic import BaseModel
from tqdm.asyncio import tqdm

# Hugging Face model identifier and access token. NOTE(review): neither is
# referenced in the visible part of this cell — confirm where they are used.
HUGGINGFACE_DEDICATED_ENDPOINT = "meta-llama/Llama-4-Maverick-17B-128E-Instruct"
# Re-reads the same 'HF_TOKEN' secret that was already loaded earlier in the notebook.
HUGGINGFACE_ACCESS_TOKEN = userdata.get('HF_TOKEN')

class ContextualDocument(BaseModel):
    """Full document text paired with one chunk and that chunk's contextual summary."""

    content: str
    chunk: str | None = None
    contextual_summarization: str | None = None

    def add_contextual_summarization(self, summary: str) -> "ContextualDocument":
        """Record ``summary`` as the contextual summarization and return this instance."""
        self.contextual_summarization = summary

        return self


class ContextualSummarizationAgent:
    """Prefix every chunk of a document with an LLM-written context line.

    The agent is a callable ``(content, chunks) -> list[str]``. For each chunk
    it asks a chat model (through litellm's ``acompletion``) for a short text
    that situates the chunk within the document, and returns the context, a
    blank line, then the chunk. Chunks whose summary could not be generated
    (even after one retry) are returned unchanged. The output has one entry
    per input chunk, in the same order as the input.

    Args:
        model_id: Model name passed to ``acompletion``.
        max_characters: Character budget mentioned in the prompt.
        mock: When True no request is sent and a fixed mock summary is used.
        max_concurrent_requests: Size of the semaphore bounding in-flight
            requests within one batch.
    """

    SYSTEM_PROMPT_TEMPLATE = """You are a helpful assistant specialized in summarizing documents relative to a given chunk.
    <document>
    {content}
    </document>
    Here is the chunk we want to situate within the whole document
    <chunk>
    {chunk}
    </chunk>
    Please give a short succinct context of maximum {characters} characters to situate this chunk within the overall document for the purposes of improving search retrieval of the chunk. Answer only with the succinct context and nothing else.
    """

    def __init__(
        self,
        model_id: str = "gpt-4o-mini",
        max_characters: int = 128,
        mock: bool = False,
        max_concurrent_requests: int = 4,
    ) -> None:
        self.model_id = model_id
        self.max_characters = max_characters
        self.mock = mock
        self.max_concurrent_requests = max_concurrent_requests

    def __call__(self, content: str, chunks: list[str]) -> list[str]:
        """Synchronously contextualize `chunks` of the document `content`.

        Returns:
            One string per input chunk, in input order.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: start a fresh one.
            results = asyncio.run(self.__summarize_context_batch(content, chunks))
        else:
            # A loop is already running in this thread. Stock asyncio refuses
            # to re-enter a running loop, so this branch relies on the loop
            # being patched (this notebook calls `nest_asyncio.apply()`).
            results = loop.run_until_complete(
                self.__summarize_context_batch(content, chunks)
            )

        return results

    async def __summarize_context_batch(
        self, content: str, chunks: list[str]
    ) -> list[str]:
        """Summarize all chunks, retry the failures once, and build the output.

        Returns:
            One string per chunk, in the order of `chunks`.
        """
        process = psutil.Process(os.getpid())
        start_mem = process.memory_info().rss
        total_chunks = len(chunks)
        logger.debug(
            f"Starting contextual summarization for {total_chunks} chunks with {self.max_concurrent_requests} concurrent requests. "
            f"Initial memory usage: {start_mem // (1024 * 1024)} MB"
        )

        documents = [
            ContextualDocument(content=content, chunk=chunk) for chunk in chunks
        ]

        # The batch helper mutates the documents in place, so `documents` keeps
        # the input order even though requests complete in arbitrary order.
        await self.__process_batch(documents, await_time_seconds=7)

        documents_without_summaries = [
            doc for doc in documents if doc.contextual_summarization is None
        ]

        # Retry failed documents with increased await time
        if documents_without_summaries:
            logger.info(
                f"Retrying {len(documents_without_summaries)} failed documents with increased await time..."
            )
            await self.__process_batch(
                documents_without_summaries, await_time_seconds=20
            )

        end_mem = process.memory_info().rss
        memory_diff = end_mem - start_mem
        logger.debug(
            f"Contextual summarization completed. "
            f"Final memory usage: {end_mem // (1024 * 1024)} MB, "
            f"Memory difference: {memory_diff // (1024 * 1024)} MB"
        )

        # Count only documents that really carry a summary; documents that
        # failed again during the retry must be reported as failures.
        success_count = sum(
            doc.contextual_summarization is not None for doc in documents
        )
        failed_count = total_chunks - success_count
        logger.info(
            f"Contextual summarization results: "
            f"{success_count}/{total_chunks} chunks summarized successfully ✓ | "
            f"{failed_count}/{total_chunks} chunks failed ✗"
        )

        return [
            f"{doc.contextual_summarization}\n\n{doc.chunk}"
            if doc.contextual_summarization is not None
            else f"{doc.chunk}"
            for doc in documents
        ]

    async def __process_batch(
        self, documents: list[ContextualDocument], await_time_seconds: int
    ) -> list[ContextualDocument]:
        """Summarize `documents` concurrently while showing a progress bar.

        Returns:
            The same document objects (updated in place), listed in completion
            order rather than input order.
        """
        semaphore = asyncio.Semaphore(self.max_concurrent_requests)
        tasks = [
            self.__summarize_context(
                document, semaphore, await_time_seconds=await_time_seconds
            )
            for document in documents
        ]
        results = []
        for coro in tqdm(
            asyncio.as_completed(tasks),
            total=len(documents),
            desc="Processing documents",
            unit="doc",
        ):
            result = await coro
            results.append(result)

        return results

    async def __summarize_context(
        self,
        document: ContextualDocument,
        semaphore: asyncio.Semaphore | None = None,
        await_time_seconds: int = 2,
    ) -> ContextualDocument:
        """Request the context summary for one document/chunk pair.

        Args:
            document: Pair to summarize; updated in place on success.
            semaphore: Optional limiter held for the whole request, including
                the sleep that follows it.
            await_time_seconds: Pause after each request, used as rate limiting.

        Returns:
            `document`, with its summary set on success and unchanged when the
            request failed or returned no choices.
        """
        if self.mock:
            return document.add_contextual_summarization("This is a mock summary")

        async def process_document() -> ContextualDocument:
            try:
                response = await acompletion(
                    model=self.model_id,
                    messages=[
                        {
                            "role": "system",
                            "content": self.SYSTEM_PROMPT_TEMPLATE.format(
                                characters=self.max_characters,
                                content=document.content[
                                    :6000
                                ],  # Keep it short to lower latency and costs.
                                chunk=document.chunk,
                            ),
                        },
                    ],
                    stream=False,
                    temperature=0,
                )
                await asyncio.sleep(await_time_seconds)  # Rate limiting

                if not response.choices:
                    logger.warning("No contextual summary generated for chunk")
                    return document

                context_summary: str = response.choices[0].message.content
                return document.add_contextual_summarization(context_summary)
            except Exception as e:
                # Best effort: a failed request leaves the summary unset so the
                # caller can retry it or fall back to the raw chunk.
                logger.warning(f"Failed to generate contextual summary: {str(e)}")
                return document

        if semaphore:
            async with semaphore:
                return await process_document()

        return await process_document()




## Contextual summary using Hugging Face Llama 4

In [25]:
class SimpleSummarizationAgent:
    """Prefix every chunk with one summary of the whole document.

    The agent is a callable ``(content, chunks) -> list[str]``. It requests a
    single summary of `content` through an OpenAI-compatible chat client and
    prepends it (followed by a blank line) to each chunk. If no summary could
    be generated the chunks are returned without a prefix.

    Args:
        model_id: Model name sent with the request. The special value "tgi"
            makes the client use `base_url` and `api_key`.
        base_url: Endpoint used when `model_id` is "tgi".
        api_key: Credential used when `model_id` is "tgi".
        max_characters: Character budget mentioned in the prompt.
        mock: When True no request is sent and a fixed mock summary is used.
        max_concurrent_requests: Stored on the instance; not used by this class.
    """

    SYSTEM_PROMPT_TEMPLATE = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

    ### Instruction:
    You are a helpful assistant specialized in summarizing documents for the purposes of improving semantic and keyword search retrieval.
    Generate a concise TL;DR summary in plain text format having a maximum of {characters} characters of the key findings from the provided documents,
    highlighting the most significant insights. Answer only with the succinct context and nothing else.

    ### Input:
    {content}

    ### Response:
    """

    def __init__(
        self,
        model_id: str = "gpt-4o-mini",
        base_url: str | None = HUGGINGFACE_DEDICATED_ENDPOINT,
        api_key: str | None = HUGGINGFACE_ACCESS_TOKEN,
        max_characters: int = 128,
        mock: bool = False,
        max_concurrent_requests: int = 4,
    ) -> None:
        self.model_id = model_id
        self.base_url = base_url
        self.api_key = api_key
        self.max_characters = max_characters
        self.mock = mock
        self.max_concurrent_requests = max_concurrent_requests

        if self.model_id == "tgi":
            # NOTE: an `assert` is skipped when Python runs with -O; it is kept
            # so callers still see the same AssertionError as before.
            assert self.base_url and self.api_key, (
                "Base URL and API key are required for TGI Hugging Face Dedicated Endpoint"
            )

            self.client = AsyncOpenAI(
                base_url=self.base_url,
                api_key=self.api_key,
            )
        else:
            # Default client configuration (no explicit endpoint or key).
            self.client = AsyncOpenAI()

    def __call__(self, content: str, chunks: list[str]) -> list[str]:
        """Synchronously summarize `content` and prefix each chunk with it.

        Returns:
            One string per input chunk, in input order.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: start a fresh one.
            results = asyncio.run(self.__summarize_context_batch(content, chunks))
        else:
            # A loop is already running in this thread. Stock asyncio refuses
            # to re-enter a running loop, so this branch relies on the loop
            # being patched (this notebook calls `nest_asyncio.apply()`).
            results = loop.run_until_complete(
                self.__summarize_context_batch(content, chunks)
            )

        return results

    async def __summarize_context_batch(
        self, content: str, chunks: list[str]
    ) -> list[str]:
        """Summarize the document once and build the prefixed chunks."""
        process = psutil.Process(os.getpid())
        start_mem = process.memory_info().rss
        # The two sentences used to be glued together without a separator.
        logger.debug(
            "Starting summarizing document. "
            f"Initial memory usage: {start_mem // (1024 * 1024)} MB"
        )

        document = await self.__summarize(
            document=ContextualDocument(content=content), await_time_seconds=20
        )

        end_mem = process.memory_info().rss
        memory_diff = end_mem - start_mem
        logger.debug(
            f"Summarization completed. "
            f"Final memory usage: {end_mem // (1024 * 1024)} MB, "
            f"Memory difference: {memory_diff // (1024 * 1024)} MB"
        )

        summary = document.contextual_summarization
        if summary is None:
            # No summary available: hand the chunks back without a prefix.
            return [f"{chunk}" for chunk in chunks]

        return [f"{summary}\n\n{chunk}" for chunk in chunks]

    async def __summarize(
        self,
        document: ContextualDocument,
        await_time_seconds: int = 2,
    ) -> ContextualDocument:
        """Request the summary of `document.content`.

        Args:
            document: Document to summarize; updated in place on success.
            await_time_seconds: Pause after the request, used as rate limiting.

        Returns:
            `document`, with its summary set on success and unchanged when the
            request failed or returned no choices.
        """
        if self.mock:
            return document.add_contextual_summarization("This is a mock summary")

        try:
            response = await self.client.chat.completions.create(
                model=self.model_id,
                messages=[
                    {
                        "role": "system",
                        "content": self.SYSTEM_PROMPT_TEMPLATE.format(
                            characters=self.max_characters, content=document.content
                        ),
                    },
                ],
                stream=False,
                temperature=0,
            )
            await asyncio.sleep(await_time_seconds)  # Rate limiting

            if not response.choices:
                logger.warning("No contextual summary generated for chunk")
                return document

            context_summary: str = response.choices[0].message.content
            return document.add_contextual_summarization(context_summary)
        except Exception as e:
            # Best effort: on failure the document is returned without summary.
            logger.warning(f"Failed to generate contextual summary: {str(e)}")
            return document

# Chunk dataset

In [21]:
from typing import Callable, Literal, Union

from langchain_text_splitters import RecursiveCharacterTextSplitter
from loguru import logger

# Accepted selector values for `get_splitter`: "none" disables summarization,
# the other two choose which agent post-processes the chunks.
SummarizationType = Literal["contextual", "simple", "none"]
# Union of the agent classes that can be used as a chunk handler.
SummarizationAgent = Union[ContextualSummarizationAgent, SimpleSummarizationAgent]


def get_splitter(
    chunk_size: int, summarization_type: SummarizationType = "none", **kwargs
) -> RecursiveCharacterTextSplitter:
    """Create a token-based text splitter, optionally with a summarization step.

    Args:
        chunk_size: Chunk size in "cl100k_base" tokens; the overlap is 15% of
            it (truncated to an integer).
        summarization_type: "none" returns a plain splitter; "contextual" and
            "simple" return a splitter that passes its chunks through the
            corresponding summarization agent.
        **kwargs: Constructor arguments for the summarization agent. Ignored
            when `summarization_type` is "none".

    Returns:
        The configured splitter.

    Raises:
        ValueError: If `summarization_type` is not a supported value.
    """
    chunk_overlap = int(0.15 * chunk_size)

    if summarization_type == "none":
        return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
            encoding_name="cl100k_base",
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )

    if summarization_type == "contextual":
        handler = ContextualSummarizationAgent(**kwargs)
    elif summarization_type == "simple":
        handler = SimpleSummarizationAgent(**kwargs)
    else:
        # Previously an unknown value fell through with `handler` unbound.
        raise ValueError(f"Invalid summarization type: {summarization_type}")

    return HandlerRecursiveCharacterTextSplitter.from_tiktoken_encoder(
        encoding_name="cl100k_base",
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        handler=handler,
    )




In [26]:
class HandlerRecursiveCharacterTextSplitter(RecursiveCharacterTextSplitter):
    """Recursive character splitter that post-processes its chunks.

    After the regular split, the chunks are handed to `handler` together with
    the full input text; whatever the handler returns becomes the result.
    """

    @staticmethod
    def _passthrough_handler(_: str, chunks: list[str]) -> list[str]:
        """Default handler: return the chunks untouched."""
        return chunks

    def __init__(
        self,
        handler: Callable[[str, list[str]], list[str]] | None = None,
        *args,
        **kwargs,
    ) -> None:
        """Forward `*args`/`**kwargs` to the base splitter and store `handler`.

        Args:
            handler: Callable taking ``(text, chunks)`` and returning the final
                chunks. When None, chunks are returned unchanged.
        """
        super().__init__(*args, **kwargs)

        if handler is None:
            handler = self._passthrough_handler
        self.handler = handler

    def split_text(self, text: str) -> list[str]:
        """Split `text` with the base splitter, then apply the handler."""
        raw_chunks = super().split_text(text)
        return self.handler(text, raw_chunks)

## Define Retrievers

In [39]:
from typing import Literal, Union

from langchain_mongodb import MongoDBAtlasVectorSearch
from langchain_mongodb.retrievers import (
    MongoDBAtlasHybridSearchRetriever,
    MongoDBAtlasParentDocumentRetriever,
)

# Accepted selector values for `get_retriever`: "contextual" maps to the hybrid
# search retriever, "parent" to the parent-document retriever.
RetrieverType = Literal["contextual", "parent"]
# Union of the retriever classes `get_retriever` can return.
RetrieverModel = Union[
    MongoDBAtlasHybridSearchRetriever, MongoDBAtlasParentDocumentRetriever
]

def get_hybrid_search_retriever(
    embedding_model: EmbeddingsModel, k: int
) -> MongoDBAtlasHybridSearchRetriever:
    """Build a hybrid (vector + full-text) retriever over the "rag" collection.

    Args:
        embedding_model: Embeddings used by the underlying vector store.
        k: Passed to the retriever as `top_k`.

    Returns:
        A retriever that uses the "chunk_text_search" search index, with the
        vector and full-text penalties both set to 50.
    """
    # The store keeps the text under "chunk" and the vector under "embedding".
    store = MongoDBAtlasVectorSearch.from_connection_string(
        connection_string=MONGODB_URI,
        embedding=embedding_model,
        namespace=f"{MONGODB_DATABASE_NAME}.rag",
        text_key="chunk",
        embedding_key="embedding",
        relevance_score_fn="dotProduct",
    )

    return MongoDBAtlasHybridSearchRetriever(
        vectorstore=store,
        search_index_name="chunk_text_search",
        top_k=k,
        vector_penalty=50,
        fulltext_penalty=50,
    )


def get_parent_document_retriever(
    embedding_model: EmbeddingsModel, k: int = 3
) -> MongoDBAtlasParentDocumentRetriever:
    """Build a parent-document retriever over the "rag" collection.

    Child chunks are produced by a 200-token splitter and parent chunks by an
    800-token splitter, both without summarization.

    Args:
        embedding_model: Embeddings used to index the child chunks.
        k: Passed to the retriever as `search_kwargs["k"]`.

    Returns:
        The configured retriever.
    """
    small_splitter = get_splitter(200)
    large_splitter = get_splitter(800)

    return MongoDBAtlasParentDocumentRetriever.from_connection_string(
        connection_string=MONGODB_URI,
        embedding_model=embedding_model,
        child_splitter=small_splitter,
        parent_splitter=large_splitter,
        database_name=MONGODB_DATABASE_NAME,
        collection_name="rag",
        text_key="page_content",
        search_kwargs={"k": k},
    )

In [40]:
def process_docs(
    retriever: Any,
    docs: list[LangChainDocument],
    splitter: RecursiveCharacterTextSplitter,
    batch_size: int = 4,
    max_workers: int = 2,
) -> list[None]:
    """Ingest `docs` in batches using a pool of worker threads.

    Args:
        retriever: Retriever handed to `process_batch` for every batch.
        docs: Documents to ingest.
        splitter: Splitter handed to `process_batch` for every batch.
        batch_size: Maximum number of documents per batch.
        max_workers: Number of worker threads.

    Returns:
        One entry per batch (the return value of `process_batch`), in
        completion order.
    """
    batches = list(get_batches(docs, batch_size))
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember each batch's real size: the last batch can be shorter than
        # `batch_size`, and the progress bar must not run past `len(docs)`.
        docs_per_future = {
            executor.submit(process_batch, retriever, batch, splitter): len(batch)
            for batch in batches
        }

        with tqdm(total=len(docs), desc="Processing documents") as pbar:
            for future in as_completed(docs_per_future):
                results.append(future.result())
                pbar.update(docs_per_future[future])

    return results





In [41]:
def get_batches(
    docs: list[LangChainDocument], batch_size: int
) -> Generator[list[LangChainDocument], None, None]:
    """Yield consecutive slices of `docs` holding at most `batch_size` items.

    The final slice is shorter when `len(docs)` is not a multiple of
    `batch_size`; an empty `docs` yields nothing.
    """
    total = len(docs)
    for start in range(0, total, batch_size):
        stop = start + batch_size
        yield docs[start:stop]


In [42]:
def process_batch(
    retriever: Any,
    batch: list[LangChainDocument],
    splitter: RecursiveCharacterTextSplitter,
) -> None:
    """Add one batch of documents to the store behind `retriever`.

    A parent-document retriever receives the batch as is. For any other
    retriever the batch is first split with `splitter` and the pieces are
    added to `retriever.vectorstore`.

    Errors are logged as warnings and not re-raised, so one failing batch does
    not abort the others.
    """
    try:
        if not isinstance(retriever, MongoDBAtlasParentDocumentRetriever):
            pieces = splitter.split_documents(batch)
            retriever.vectorstore.add_documents(pieces)
        else:
            retriever.add_documents(batch)

        logger.info(f"Successfully processed {len(batch)} documents.")
    except Exception as e:
        logger.warning(f"Error processing batch of {len(batch)} documents: {str(e)}")

In [44]:
def get_retriever(
    embedding_model_id: str,
    embedding_model_type: EmbeddingModelType = "huggingface",
    retriever_type: RetrieverType = "contextual",
    k: int = 3,
    device: str = "cpu",
) -> RetrieverModel:
    """Create the embedding model and wrap it in the requested retriever.

    Args:
        embedding_model_id: Identifier of the embedding model.
        embedding_model_type: Embedding backend selector.
        retriever_type: "contextual" for the hybrid search retriever or
            "parent" for the parent-document retriever.
        k: Number of results, forwarded to the retriever factory.
        device: Device forwarded to the embedding model factory.

    Returns:
        The configured retriever.

    Raises:
        ValueError: If `retriever_type` is not supported. The embedding model
            is built before this check runs.
    """
    embeddings = get_embedding_model(embedding_model_id, embedding_model_type, device)

    if retriever_type == "parent":
        return get_parent_document_retriever(embeddings, k)

    if retriever_type == "contextual":
        return get_hybrid_search_retriever(embeddings, k)

    raise ValueError(f"Invalid retriever type: {retriever_type}")

In [49]:
from langchain_mongodb.index import create_fulltext_search_index

class MongoDBIndex:
    """Creates the search indexes needed by a retriever."""

    def __init__(
        self,
        retriever,
        mongodb_client: MongoDBService,
    ) -> None:
        """Keep the retriever and the MongoDB client used by `create`."""
        self.mongodb_client = mongodb_client
        self.retriever = retriever

    def create(
        self,
        embedding_dim: int,
        is_hybrid: bool = False,
    ) -> None:
        """Create the vector search index and, optionally, a full-text index.

        Args:
            embedding_dim: Dimension of the stored embedding vectors.
            is_hybrid: When True, also create a full-text index on the vector
                store's text field, named after the retriever's
                `search_index_name`.
        """
        store = self.retriever.vectorstore
        store.create_vector_search_index(dimensions=embedding_dim)

        if not is_hybrid:
            return

        create_fulltext_search_index(
            collection=self.mongodb_client.collection,
            field=store._text_key,
            index_name=self.retriever.search_index_name,
        )

In [50]:
def chunk_embed_load(
    documents: list[Document],
    collection_name: str,
    processing_batch_size: int,
    processing_max_workers: int,
    retriever_type: RetrieverType,
    embedding_model_id: str,
    embedding_model_type: EmbeddingModelType,
    embedding_model_dim: int,
    chunk_size: int,
    contextual_summarization_type: SummarizationType = "none",
    contextual_agent_model_id: str | None = None,
    contextual_agent_max_characters: int | None = None,
    mock: bool = False,
    device: str = "cpu",
) -> None:
    """Chunk, embed and load `documents`, then create the search indexes.

    Args:
        documents: Documents to ingest; their content and metadata are copied
            into LangChain documents.
        collection_name: Collection opened through `MongoDBService` and cleared
            before loading.
        processing_batch_size: Number of documents per ingestion batch.
        processing_max_workers: Number of ingestion threads; also used as the
            summarization agent's concurrency limit.
        retriever_type: Which retriever to build ("contextual" or "parent").
        embedding_model_id: Identifier of the embedding model.
        embedding_model_type: Embedding backend selector.
        embedding_model_dim: Vector dimension used for the vector index.
        chunk_size: Chunk size, in tokens, for the splitter.
        contextual_summarization_type: Summarization mode of the splitter.
        contextual_agent_model_id: Model for the summarization agent; None
            keeps the agent's own default.
        contextual_agent_max_characters: Summary length budget; None keeps the
            agent's own default.
        mock: When True the summarization agent returns mock summaries.
        device: Device used for the embedding model.
    """
    retriever = get_retriever(
        embedding_model_id=embedding_model_id,
        embedding_model_type=embedding_model_type,
        retriever_type=retriever_type,
        device=device,
    )

    # Forward only the agent options that were actually provided. Passing an
    # explicit None would override the agents' defaults and end up as
    # `model=None` / "maximum None characters" in the request.
    agent_options: dict = {
        "mock": mock,
        "max_concurrent_requests": processing_max_workers,
    }
    if contextual_agent_model_id is not None:
        agent_options["model_id"] = contextual_agent_model_id
    if contextual_agent_max_characters is not None:
        agent_options["max_characters"] = contextual_agent_max_characters

    splitter = get_splitter(
        chunk_size=chunk_size,
        summarization_type=contextual_summarization_type,
        **agent_options,
    )

    # NOTE(review): the retriever factories write to the hard-coded "rag"
    # collection, while the collection cleared here is `collection_name` —
    # confirm both are meant to be the same collection.
    with MongoDBService(
        model=Document, collection_name=collection_name
    ) as mongodb_client:
        mongodb_client.clear_collection()

        docs = [
            LangChainDocument(
                page_content=doc.content, metadata=doc.metadata.model_dump()
            )
            for doc in documents
            if doc
        ]
        process_docs(
            retriever,
            docs,
            splitter=splitter,
            batch_size=processing_batch_size,
            max_workers=processing_max_workers,
        )

        index = MongoDBIndex(
            retriever=retriever,
            mongodb_client=mongodb_client,
        )
        # Only the hybrid ("contextual") retriever needs the full-text index.
        index.create(
            embedding_dim=embedding_model_dim,
            is_hybrid=retriever_type == "contextual",
        )

In [51]:
# Settings for the RAG indexing pipeline.
# NOTE(review): the `compute_rag_vector_index(...)` call further down passes
# literal values instead of these variables (for example it uses
# content_quality_score_threshold = 0.1, not the 0.6 set here) — confirm which
# values are intended.
extract_collection_name = COLLECTION_NAME
fetch_limit = 30
load_collection_name = 'rag'
content_quality_score_threshold = 0.6
retriever_type = 'contextual'
embedding_model_id  = 'text-embedding-3-small'
embedding_model_type = 'openai'
embedding_model_dim = 1536
chunk_size = 3072
contextual_summarization_type = 'contextual'
contextual_agent_model_id = 'gpt-4o-mini'
contextual_agent_max_characters = 128
mock = False
processing_batch_size = 2
processing_max_workers = 2
device = 'cpu' # or cuda (for Nvidia GPUs) or mps (for Apple M1/M2/M3 chips)

In [54]:
def compute_rag_vector_index(
    extract_collection_name: str,
    fetch_limit: int,
    load_collection_name: str,
    content_quality_score_threshold: float,
    retriever_type: RetrieverType,
    embedding_model_id: str,
    embedding_model_type: EmbeddingModelType,
    embedding_model_dim: int,
    chunk_size: int,
    contextual_summarization_type: SummarizationType = "none",
    contextual_agent_model_id: str | None = None,
    contextual_agent_max_characters: int | None = None,
    mock: bool = False,
    processing_batch_size: int = 256,
    processing_max_workers: int = 10,
    device: str = "cpu",
    max_documents: int | None = 8,
) -> None:
    """Run the indexing pipeline: fetch, truncate, filter, then chunk/embed/load.

    Args:
        extract_collection_name: Collection the documents are fetched from.
        fetch_limit: Limit passed to `fetch_from_mongodb`.
        load_collection_name: Collection name passed to `chunk_embed_load`.
        content_quality_score_threshold: Threshold passed to
            `filter_by_quality`.
        retriever_type: Which retriever to build ("contextual" or "parent").
        embedding_model_id: Identifier of the embedding model.
        embedding_model_type: Embedding backend selector.
        embedding_model_dim: Vector dimension used for the vector index.
        chunk_size: Chunk size, in tokens, for the splitter.
        contextual_summarization_type: Summarization mode of the splitter.
        contextual_agent_model_id: Model for the summarization agent.
        contextual_agent_max_characters: Summary length budget.
        mock: When True the summarization agent returns mock summaries.
        processing_batch_size: Number of documents per ingestion batch.
        processing_max_workers: Number of ingestion threads.
        device: Device used for the embedding model.
        max_documents: Keep only the first `max_documents` fetched documents
            before the quality filter runs. The default of 8 matches the cap
            that used to be hard-coded; pass None to keep every fetched
            document. The value is used as a slice bound, so it should be
            non-negative.
    """
    documents = fetch_from_mongodb(
        collection_name=extract_collection_name, limit=fetch_limit
    )

    # Slicing with None keeps the whole list.
    documents = documents[:max_documents]

    documents = filter_by_quality(
        documents=documents,
        content_quality_score_threshold=content_quality_score_threshold,
    )
    chunk_embed_load(
        documents=documents,
        collection_name=load_collection_name,
        processing_batch_size=processing_batch_size,
        processing_max_workers=processing_max_workers,
        retriever_type=retriever_type,
        embedding_model_id=embedding_model_id,
        embedding_model_type=embedding_model_type,
        embedding_model_dim=embedding_model_dim,
        chunk_size=chunk_size,
        contextual_summarization_type=contextual_summarization_type,
        contextual_agent_model_id=contextual_agent_model_id,
        contextual_agent_max_characters=contextual_agent_max_characters,
        mock=mock,
        device=device,
    )

In [56]:
# Run the indexing pipeline: OpenAI embeddings (1536 dimensions), the hybrid
# ("contextual") retriever, and contextual chunk summaries from gpt-4o-mini.
compute_rag_vector_index(extract_collection_name = COLLECTION_NAME,
                        fetch_limit = 30,
                        load_collection_name = 'rag',
                        content_quality_score_threshold = 0.1,
                        retriever_type = 'contextual',
                        embedding_model_id  = 'text-embedding-3-small',
                        embedding_model_type = 'openai',
                        embedding_model_dim = 1536,
                        chunk_size = 3072,
                        contextual_summarization_type = 'contextual',
                        contextual_agent_model_id = 'gpt-4o-mini',
                        contextual_agent_max_characters = 128,
                        mock = False,
                        processing_batch_size = 2,
                        processing_max_workers = 2,
                        device = 'cpu')

[32m2025-06-06 08:38:49.788[0m | [1mINFO    [0m | [36m__main__[0m:[36mget_splitter[0m:[36m16[0m - [1mGetting splitter with chunk size: 3072 and overlap: 460[0m
Processing documents:   0%|          | 0/8 [00:00<?, ?it/s][32m2025-06-06 08:38:50.820[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36m__summarize_context_batch[0m:[36m67[0m - [34m[1mStarting contextual summarization for 1 chunks with 2 concurrent requests. Initial memory usage: 428 MB[0m

Processing documents:   0%|          | 0/1 [00:00<?, ?doc/s][A[32m2025-06-06 08:38:50.865[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36m__summarize_context_batch[0m:[36m67[0m - [34m[1mStarting contextual summarization for 3 chunks with 2 concurrent requests. Initial memory usage: 428 MB[0m


Processing documents:   0%|          | 0/3 [00:00<?, ?doc/s][A[A
Processing documents: 100%|██████████| 1/1 [00:07<00:00,  7.83s/doc]
[32m2025-06-06 08:38:58.661[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36