# Open source data ingestion for RAGs with dlt

Video: https://www.youtube.com/watch?v=qUNyfR_X2Mo

In this hands-on workshop, we’ll learn how to build a data ingestion pipeline with dlt that loads data from a REST API into LanceDB, so that your RAG always works with up-to-date data.

We’ll cover the following steps:

* Extracting data from REST APIs
* Loading and vectorizing it into LanceDB, which, unlike other vector DBs, stores the data _and_ the embeddings
* Incremental loading

By the end of this workshop, you’ll be able to write a portable, OSS data pipeline for your RAG that you can deploy anywhere, such as Python notebooks, virtual machines, or orchestrators like Airflow, Dagster, or Mage.

# Resources

* Slides: [dlt-LLM-Zoomcamp.pdf](https://github.com/user-attachments/files/16131729/dlt.LLM.Zoomcamp.pdf)
* [Google Colab notebook](https://colab.research.google.com/drive/1nNOybHdWQiwUUuJFZu__xvJxL_ADU3xl?usp=sharing) - make a copy to follow along!


# Homework

In the workshop, we extracted the contents of two Notion pages titled "Workshop: Benefits and Perks" and "Workshop: Working hours, PTO, and Vacation".

Repeat the same process for a third page titled "Homework: Employee handbook" (hidden from public view, but accessible via API key):

1. Modify the REST API source to extract only this page.
2. Write the output into a separate table called "homework".
3. Remember to update the table name in all cells where you connect to a LanceDB table.

To do this you can use the [workshop Colab](https://colab.research.google.com/drive/1nNOybHdWQiwUUuJFZu__xvJxL_ADU3xl?usp=sharing) as a basis.

Now, answer the following questions:  

In [1]:
!yes | dlt init rest_api lancedb

Looking up the init scripts in https://github.com/dlt-hub/verified-sources.git...
Cloning and configuring a verified source rest_api (Generic API Source)
Do you want to proceed? [Y/n]: 
Verified source rest_api was added to your project!
* See the usage examples and code snippets to copy from rest_api_pipeline.py
* Add credentials for lancedb and other secrets in ./.dlt/secrets.toml
* requirements.txt was created. Install it with:
pip3 install -r requirements.txt
* Read https://dlthub.com/docs/walkthroughs/create-a-pipeline for more information
yes: standard output: Broken pipe


In [2]:
import os
from datetime import datetime, timezone
import dlt
import lancedb
from openai import OpenAI
from dlt.sources.helpers.rest_client.paginators import BasePaginator, JSONResponsePaginator
from dlt.sources.helpers.requests import Response, Request
from dlt.destinations.adapters import lancedb_adapter
from rest_api import RESTAPIConfig, rest_api_source

In [3]:
os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER"] = "sentence-transformers"
os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL"] = "all-MiniLM-L6-v2"
os.environ["DESTINATION__LANCEDB__CREDENTIALS__URI"] = ".lancedb"
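
The three variables above follow dlt's convention for environment-based configuration: the config path is upper-cased and its sections are joined with double underscores. The sketch below only illustrates that naming convention. `env_key_to_config_path` is a hypothetical helper written for this example and is not part of dlt.

```python
def env_key_to_config_path(env_key: str) -> str:
    """Turn a dlt-style environment variable name into a dotted config path."""
    # Sections are separated by a double underscore; single underscores
    # inside a section name (e.g. EMBEDDING_MODEL) are preserved.
    return ".".join(part.lower() for part in env_key.split("__"))


for key in (
    "DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER",
    "DESTINATION__LANCEDB__EMBEDDING_MODEL",
    "DESTINATION__LANCEDB__CREDENTIALS__URI",
):
    print(f"{key} -> {env_key_to_config_path(key)}")
```

Read this way, the cell configures the `lancedb` destination with an embedding provider, an embedding model, and a local database URI.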

In [4]:
class PostBodyPaginator(BasePaginator):
    def __init__(self):
        super().__init__()
        self.cursor = None

    def update_state(self, response: Response) -> None:
        # Assuming the API returns an empty list when no more data is available
        if not response.json():
            self._has_next_page = False
        else:
            self.cursor = response.json().get("next_cursor")
            if self.cursor is None:
                self._has_next_page = False

    def update_request(self, request: Request) -> None:
        if request.json is None:
            request.json = {}

        # Add the cursor to the request body
        request.json["start_cursor"] = self.cursor

@dlt.resource(name="homework")
def rest_api_notion_resource():
    notion_config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api.notion.com/v1/",
            "auth": {
                "token": dlt.secrets["sources.rest_api.notion.api_key"]
            },
            "headers":{
                "Content-Type": "application/json",
                "Notion-Version": "2022-06-28"
            }
        },
        "resources": [
            {
                "name": "search",
                "endpoint": {
                    "path": "search",
                    "method": "POST",
                    "paginator": PostBodyPaginator(),
                    "json": {
                        "query": "homework",
                        "sort": {
                            "direction": "ascending",
                            "timestamp": "last_edited_time"
                        }
                    },
                    "data_selector": "results"
                }
            },
            {
                "name": "page_content",
                "endpoint": {
                    "path": "blocks/{page_id}/children",
                    "paginator": JSONResponsePaginator(),
                    "params": {
                        "page_id": {
                            "type": "resolve",
                            "resource": "search",
                            "field": "id"
                        }
                    },
                }
            }
        ]
    }

    yield from rest_api_source(notion_config,name="homework")

def extract_page_content(response):
    block_id = response["id"]
    last_edited_time = response["last_edited_time"]
    block_type = response.get("type", "Not paragraph")
    if block_type != "paragraph":
        content = ""
    else:
        try:
            content = response["paragraph"]["rich_text"][0]["plain_text"]
        except IndexError:
            content = ""
    return {
        "block_id": block_id,
        "block_type": block_type,
        "content": content,
        "last_edited_time": last_edited_time,
        "inserted_at_time": datetime.now(timezone.utc)
    }

@dlt.resource(
    name="homework",
    write_disposition="merge",
    primary_key="block_id",
    columns={"last_edited_time":{"dedup_sort":"desc"}}
)
def rest_api_notion_incremental(
    last_edited_time=dlt.sources.incremental(
        "last_edited_time",
        initial_value="2024-06-26T08:16:00.000Z",
        primary_key="block_id",
    )
):
    # last_value = last_edited_time.last_value
    # print(last_value)

    for block in rest_api_notion_resource.add_map(extract_page_content):
        # Skip blocks without text, such as non-paragraph or empty blocks
        if not block["content"]:
            continue
        yield block

def load_notion() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="company_policies",
        destination="lancedb",
        dataset_name="notion_pages",
        # full_refresh=True
    )

    load_info = pipeline.run(
        lancedb_adapter(
            rest_api_notion_incremental,
            embed="content"
        ),
        table_name="homework",
        write_disposition="merge"
    )
    print(load_info)

load_notion()



Pipeline company_policies load step completed in 10.13 seconds
1 load package(s) were loaded to destination LanceDB and into dataset notion_pages
The LanceDB destination used <dlt.destinations.impl.lancedb.configuration.LanceDBCredentials object at 0x7f32c0e1d4d0> location to store data
Load package 1720710936.9023495 is LOADED and contains no failed jobs
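
To make the role of `PostBodyPaginator` more concrete, here is a minimal, self-contained sketch of cursor pagination where the cursor travels in the POST body. It does not call Notion or dlt. `FAKE_PAGES`, `fake_search`, and `fetch_all` are hypothetical names, and the two fake pages are invented purely for illustration.

```python
from typing import Optional

# Hypothetical stand-in for a cursor-paginated POST endpoint such as Notion's
# search: each call returns a page of results plus the cursor for the next page.
FAKE_PAGES = {
    None: {"results": [{"id": "a"}, {"id": "b"}], "next_cursor": "c1"},
    "c1": {"results": [{"id": "c"}], "next_cursor": None},
}


def fake_search(body: dict) -> dict:
    """Return the page selected by the `start_cursor` field of the request body."""
    return FAKE_PAGES[body.get("start_cursor")]


def fetch_all(query: str) -> list:
    """Follow `next_cursor` until the API stops returning one."""
    items = []
    cursor: Optional[str] = None
    while True:
        body = {"query": query}
        if cursor is not None:
            # The cursor is sent in the JSON body, not in the URL
            body["start_cursor"] = cursor
        page = fake_search(body)
        items.extend(page["results"])
        cursor = page.get("next_cursor")
        if cursor is None:
            break
    return items


print([item["id"] for item in fetch_all("homework")])
```

The loop mirrors the two hooks of the paginator class: reading `next_cursor` from the response corresponds to `update_state`, and writing `start_cursor` into the body corresponds to `update_request`.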


## Q1. Rows in LanceDB

How many rows does the LanceDB table "notion_pages___homework" have?

In [5]:
db = lancedb.connect(".lancedb")
dbtable = db.open_table("notion_pages___homework")
df = dbtable.to_pandas()
print(f"The LanceDB table 'notion_pages___homework' has {df.shape[0]} rows")

The LanceDB table 'notion_pages___homework' has 17 rows
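
The resource is declared with `write_disposition="merge"` and `primary_key="block_id"`, so re-running the pipeline should update existing blocks rather than append duplicates. The following toy model shows that idea with a plain dictionary. It is not how dlt or LanceDB implement merging, and `merge_rows` as well as the sample rows are made up for this sketch.

```python
def merge_rows(table: dict, incoming: list, primary_key: str = "block_id",
               sort_key: str = "last_edited_time") -> dict:
    """Upsert rows into `table`, keeping the most recently edited version per key."""
    for row in incoming:
        key = row[primary_key]
        existing = table.get(key)
        # ISO 8601 UTC timestamps in the same format compare correctly as strings
        if existing is None or row[sort_key] >= existing[sort_key]:
            table[key] = row
    return table


# Invented sample data: two blocks on the first run, one edited block on the second
first_run = [
    {"block_id": "b1", "content": "first paragraph", "last_edited_time": "2024-07-01T10:00:00.000Z"},
    {"block_id": "b2", "content": "second paragraph", "last_edited_time": "2024-07-02T10:00:00.000Z"},
]
second_run = [
    {"block_id": "b2", "content": "second paragraph (edited)", "last_edited_time": "2024-07-05T10:00:00.000Z"},
]

table = merge_rows({}, first_run)
table = merge_rows(table, second_run)
print(len(table))
print(table["b2"]["content"])
```

In this model the row count stays at two after the second run, because the edited block replaces its earlier version instead of being added as a new row.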


## Q2. Running the Pipeline: Last edited time

In the demo, we created an incremental dlt resource `rest_api_notion_incremental` that keeps track of `last_edited_time`. What value does it store after you've run your pipeline once? (Hint: you will be able to get this value by performing some aggregation function on the column `last_edited_time` of the table)

In [6]:
print(f"The last edited time after I've run my pipeline once is {df['last_edited_time'].max()}")

The last edited time after I've run my pipeline once is 2024-07-05 23:33:00+00:00
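
The reason the maximum of `last_edited_time` answers this question can be seen in a simplified model of an incremental cursor: after each run, the stored value advances to the largest cursor value seen so far. The sketch below is an assumption-laden illustration, not dlt's implementation. `run_incremental`, the `state` dictionary, and the sample rows are hypothetical, and dlt's real logic is more involved (for example in how it treats rows exactly at the boundary).

```python
def run_incremental(rows: list, state: dict,
                    cursor_field: str = "last_edited_time") -> list:
    """Return rows newer than the stored cursor and advance the cursor to the max seen."""
    last_value = state.get("last_value", state["initial_value"])
    # Simplification: strictly newer rows only
    new_rows = [row for row in rows if row[cursor_field] > last_value]
    if new_rows:
        state["last_value"] = max(row[cursor_field] for row in new_rows)
    return new_rows


# Invented sample rows; only the initial value is taken from the resource above
state = {"initial_value": "2024-06-26T08:16:00.000Z"}
rows = [
    {"block_id": "b1", "last_edited_time": "2024-06-20T09:00:00.000Z"},
    {"block_id": "b2", "last_edited_time": "2024-07-01T12:00:00.000Z"},
    {"block_id": "b3", "last_edited_time": "2024-07-05T23:33:00.000Z"},
]

first = run_incremental(rows, state)
second = run_incremental(rows, state)
print([row["block_id"] for row in first])
print(state["last_value"])
print(second)
```

In this model the first run keeps only the rows edited after the initial value, and the second run over unchanged data yields nothing because the cursor has already moved to the latest edit time.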


## Q3. Ask the Assistant 

Find out with the help of the AI assistant: how many PTO days are the employees entitled to in a year?  

In [7]:
def retrieve_context_from_lancedb(dbtable, question, top_k=2):
    # Fetch only the top_k closest rows instead of slicing the default result set
    query_results = dbtable.search(query=question).limit(top_k).to_list()
    context = "\n".join(result["content"] for result in query_results)
    return context
        
def rag_from_lancedb(query, db_name, table_name):
    # Connect to the lancedb table
    db = lancedb.connect(db_name)
    dbtable = db.open_table(table_name)

    # A system prompt telling the model to accept input in the form of "Question: ... ; Context: ..."
    messages = [
        {"role": "system", "content": "You are a helpful assistant that helps users understand policies inside a company's employee handbook. The user will first ask you a question and then provide you relevant paragraphs from the handbook as context. Please answer the question based on the provided context. For any details missing in the paragraph, encourage the employee to contact the HR for that information. Please keep the responses conversational."}
    ]

    # Retrieve the relevant paragraphs on the question
    context = retrieve_context_from_lancedb(dbtable, query, top_k=2)
    messages.append(
        {"role": "user", "content": f"Question: '{query}'; Context: '{context}'"}
    )

    client = OpenAI(
        base_url='http://localhost:11434/v1/',
        api_key='ollama',
    )

    response = client.chat.completions.create(
        model='gemma:2b',
        messages=messages
    )
    
    response_content = response.choices[0].message.content
    print(f"Question: {query}")
    print(f"Assistant: {response_content}")
    return response_content
    

db_name = ".lancedb"
table_name = "notion_pages___homework"
query = "how many PTO days are the employees entitled to in a year?"
res = rag_from_lancedb(query, db_name, table_name)



Question: how many PTO days are the employees entitled to in a year?
Assistant: Sure, here is the answer:

Hi! It seems you're curious about the number of PTO days employees are entitled to in a year. According to the handbook, employees receive 30 days of paid time off (PTO) per year.

However, it's important to note that these 30 days can be spread out throughout the year, with each employee receiving 2.5 days per month. You can take your PTO at any time after your first week with us, and you can use any time off you haven't accrued yet.

If you'd like to use your PTO, you can submit a request through our HRIS. Your manager or HR must approve your leave before you can take it.

Please don't hesitate to contact the HR department if you have any further questions or concerns about your PTO rights.
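
Conceptually, the retrieval step ranks stored paragraphs by how close their embeddings are to the embedding of the question and passes the best matches to the model as context. The toy sketch below shows that ranking with hand-written three-dimensional vectors and cosine similarity. The vectors, the placeholder contents, and the helper names are all invented for illustration, and the distance metric LanceDB actually applies depends on its configuration.

```python
import math


def cosine_similarity(a: list, b: list) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k_contents(query_vec: list, rows: list, k: int = 2) -> list:
    """Return the `content` of the k rows most similar to the query vector."""
    ranked = sorted(
        rows,
        key=lambda row: cosine_similarity(query_vec, row["vector"]),
        reverse=True,
    )
    return [row["content"] for row in ranked[:k]]


# Invented toy data: real embeddings have hundreds of dimensions
rows = [
    {"content": "paragraph about PTO days", "vector": [0.9, 0.1, 0.0]},
    {"content": "paragraph about working hours", "vector": [0.1, 0.9, 0.0]},
    {"content": "paragraph about PTO requests", "vector": [0.8, 0.2, 0.1]},
]
query_vec = [1.0, 0.0, 0.0]  # stands in for the embedded question

context = "\n".join(top_k_contents(query_vec, rows, k=2))
print(context)
```

With `k=2`, only the two PTO-related placeholders end up in the context string, which is the same role `top_k` plays in `retrieve_context_from_lancedb`.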


## Submit the results

* Submit your results here: https://courses.datatalks.club/llm-zoomcamp-2024/homework/workshop1
* It's possible that your answers won't match exactly. If that's the case, select the closest one.