# Workshop 1 - dlt

## Q1. Rows in LanceDB

In [8]:
# !pip install dlt[lancedb]==0.5.1a0
# !pip install sentence-transformers

### Part 1: Create a Notion -> LanceDB pipeline using dlt

In [10]:
!yes | dlt init rest_api lancedb

Looking up the init scripts in [1mhttps://github.com/dlt-hub/verified-sources.git[0m...
No files to update, exiting
yes: standard output: Broken pipe


In [9]:
import dlt
from rest_api import RESTAPIConfig, rest_api_source

from dlt.sources.helpers.rest_client.paginators import (
    BasePaginator,
    JSONResponsePaginator,
)
from dlt.sources.helpers.requests import Response, Request

from dlt.destinations.adapters import lancedb_adapter

In [11]:
from datetime import datetime, timezone


class PostBodyPaginator(BasePaginator):
    def __init__(self):
        super().__init__()
        self.cursor = None

    def update_state(self, response: Response) -> None:
        # Assuming the API returns an empty list when no more data is available
        if not response.json():
            self._has_next_page = False
        else:
            self.cursor = response.json().get("next_cursor")
            if self.cursor is None:
                self._has_next_page = False

    def update_request(self, request: Request) -> None:
        if request.json is None:
            request.json = {}

        # Add the cursor to the request body
        request.json["start_cursor"] = self.cursor


@dlt.resource(name="homework")
def rest_api_notion_resource():
    notion_config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api.notion.com/v1/",
            "auth": {"token": dlt.secrets["sources.rest_api.notion.api_key"]},
            "headers": {
                "Content-Type": "application/json",
                "Notion-Version": "2022-06-28",
            },
        },
        "resources": [
            {
                "name": "search",
                "endpoint": {
                    "path": "search",
                    "method": "POST",
                    "paginator": PostBodyPaginator(),
                    "json": {
                        "query": "homework",
                        "sort": {
                            "direction": "ascending",
                            "timestamp": "last_edited_time",
                        },
                    },
                    "data_selector": "results",
                },
            },
            {
                "name": "page_content",
                "endpoint": {
                    "path": "blocks/{page_id}/children",
                    "paginator": JSONResponsePaginator(),
                    "params": {
                        "page_id": {
                            "type": "resolve",
                            "resource": "search",
                            "field": "id",
                        }
                    },
                },
            },
        ],
    }

    yield from rest_api_source(notion_config, name="homework")


def extract_page_content(response):
    block_id = response["id"]
    last_edited_time = response["last_edited_time"]
    block_type = response.get("type", "Not paragraph")
    if block_type != "paragraph":
        content = ""
    else:
        try:
            content = response["paragraph"]["rich_text"][0]["plain_text"]
        except IndexError:
            content = ""
    return {
        "block_id": block_id,
        "block_type": block_type,
        "content": content,
        "last_edited_time": last_edited_time,
        "inserted_at_time": datetime.now(timezone.utc),
    }


@dlt.resource(
    name="homework",
    write_disposition="merge",
    primary_key="block_id",
    columns={"last_edited_time": {"dedup_sort": "desc"}},
)
def rest_api_notion_incremental(
    last_edited_time=dlt.sources.incremental(
        "last_edited_time",
        initial_value="2024-06-26T08:16:00.000Z",
        primary_key=("block_id"),
    ),
):
    # last_value = last_edited_time.last_value
    # print(last_value)

    for block in rest_api_notion_resource.add_map(extract_page_content):
        if not (len(block["content"])):
            continue
        yield block


def load_notion() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="for_homework",
        destination="lancedb",
        dataset_name="notion_pages",
        # full_refresh=True
    )

    load_info = pipeline.run(
        lancedb_adapter(rest_api_notion_incremental, embed="content"),
        table_name="homework",
        write_disposition="merge",
    )
    print(load_info)


load_notion()

_dlt_loads
[{'name': 'load_id', 'data_type': 'text', 'nullable': False}, {'name': 'schema_name', 'data_type': 'text', 'nullable': True}, {'name': 'status', 'data_type': 'bigint', 'nullable': False}, {'name': 'inserted_at', 'data_type': 'timestamp', 'nullable': False}, {'name': 'schema_version_hash', 'data_type': 'text', 'nullable': True}]
homework
[{'name': 'block_id', 'nullable': False, 'primary_key': True, 'data_type': 'text'}, {'name': 'block_type', 'data_type': 'text', 'nullable': True}, {'name': 'content', 'x-lancedb-embed': True, 'data_type': 'text', 'nullable': True}, {'dedup_sort': 'desc', 'name': 'last_edited_time', 'data_type': 'timestamp', 'nullable': True}, {'name': 'inserted_at_time', 'data_type': 'timestamp', 'nullable': True}, {'name': '_dlt_load_id', 'data_type': 'text', 'nullable': False}, {'name': '_dlt_id', 'data_type': 'text', 'nullable': False, 'unique': True}]
_dlt_version
[{'name': 'version', 'data_type': 'bigint', 'nullable': False}, {'name': 'engine_version', '

In [12]:
import lancedb

db = lancedb.connect(".lancedb")
print(db.table_names())

['notion_pages____dlt_loads', 'notion_pages____dlt_pipeline_state', 'notion_pages____dlt_version', 'notion_pages___dltSentinelTable', 'notion_pages___homework']


In [19]:
dbtable = db.open_table("notion_pages___homework")
db_homework = dbtable.to_pandas()
db_homework.shape

(17, 9)

## Q2. Running the Pipeline: Last edited time

In [22]:
db_homework.last_edited_time.max()

Timestamp('2024-07-05 23:33:00+0000', tz='UTC')

## Q3. Ask the Assistant

In [26]:
import ollama

In [27]:
def retrieve_context_from_lancedb(dbtable, question, top_k=2):
    query_results = dbtable.search(query=question).to_list()
    context = "\n".join([result["content"] for result in query_results[:top_k]])

    return context

In [31]:
def main():
    # Connect to the lancedb table
    db = lancedb.connect(".lancedb")
    dbtable = db.open_table("notion_pages___homework")

    # A system prompt telling ollama to accept input in the form of "Question: ... ; Context: ..."
    messages = [
        {
            "role": "system",
            "content": "You are a helpful assistant that helps users understand policies inside a company's employee handbook. The user will first ask you a question and then provide you relevant paragraphs from the handbook as context. Please answer the question based on the provided context. For any details missing in the paragraph, encourage the employee to contact the HR for that information. Please keep the responses conversational.",
        }
    ]

    while True:
        # Accept user question
        question = input("You: ")

        # Retrieve the relevant paragraphs on the question
        context = retrieve_context_from_lancedb(dbtable, question, top_k=2)

        # Create a user prompt using the question and retrieved context
        messages.append(
            {"role": "user", "content": f"Question: '{question}'; Context:'{context}'"}
        )

        # Get the response from the LLM
        response = ollama.chat(model="phi3", messages=messages)
        response_content = response["message"]["content"]
        print(f"Assistant: {response_content}")

        # Add the response into the context window
        messages.append({"role": "assistant", "content": response_content})

In [32]:
main()

Assistant:  Based on the information provided in the handbook, employees are entitled to 30 days of Paid Time Off (PTO) per year. You start accruing PTO from the day you join our company and receive 2.5 days per month. After your first year with us, you earn an additional day per year, up to a cap at 25 days overall. Remember that you don't need to specify a reason for requesting PTO, but if you work on holidays as a non-exempt employee, it might affect overtime pay depending on the hours worked. If there are any details missing or specific questions about your situation, please reach out to HR for assistance.
