In [24]:
import dlt
from rest_api import RESTAPIConfig, rest_api_source
from dlt.sources.helpers.rest_client.paginators import BasePaginator, JSONResponsePaginator
from dlt.sources.helpers.requests import Response, Request
from dlt.destinations.adapters import lancedb_adapter
from datetime import datetime, timezone
import os
import lancedb
import pandas as pd

In [16]:
# Pipeline settings, supplied through environment variables.
os.environ.update(
    {
        # Notion API key; blank in this notebook, so fill it in before running.
        "SOURCES__REST_API__NOTION__API_KEY": "",
        # Embedding provider and model configured for the LanceDB destination.
        "DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER": "sentence-transformers",
        "DESTINATION__LANCEDB__EMBEDDING_MODEL": "all-MiniLM-L6-v2",
        # URI of the LanceDB database (a relative local path).
        "DESTINATION__LANCEDB__CREDENTIALS__URI": ".lancedb",
    }
)

In [17]:
class PostBodyPaginator(BasePaginator):
    """Cursor paginator for endpoints that take the cursor in the JSON request body.

    After each response the ``next_cursor`` field of the JSON payload is stored,
    and it is written into the body of the following request as ``start_cursor``.
    Pagination stops when the payload is empty or carries no ``next_cursor``.
    """

    def __init__(self):
        super().__init__()
        # Cursor taken from the most recent response; None until one is seen.
        self.cursor = None

    def update_state(self, response: Response, data=None) -> None:
        """Store the next cursor from ``response`` and refresh the has-next flag.

        Args:
            response: HTTP response whose JSON body may contain ``next_cursor``.
            data: Unused. Accepted with a default so the method works both for
                callers that pass only the response and for callers that also
                pass the extracted page data as a second argument.
        """
        # Parse the body once instead of once per access.
        payload = response.json()
        # An empty payload clears the cursor so a stale value is never re-sent.
        self.cursor = payload.get("next_cursor") if payload else None
        # Set the flag in both directions so it always mirrors the last response.
        self._has_next_page = self.cursor is not None

    def update_request(self, request: Request) -> None:
        """Write the stored cursor into the JSON body of ``request``.

        Args:
            request: Outgoing request; an empty JSON body is created if it has none.
        """
        if request.json is None:
            request.json = {}
        request.json["start_cursor"] = self.cursor

In [18]:
@dlt.resource(name="employee_handbook")
def rest_api_notion_resource():
    """Yield everything produced by a REST API source configured for Notion.

    The configuration declares two resources:

    * ``search`` - POSTs to the ``search`` endpoint with a fixed query and
      reads records from the ``results`` field of each response.
    * ``page_content`` - requests ``blocks/{page_id}/children``, with
      ``page_id`` resolved from the ``id`` field of the ``search`` resource.

    The API key is read from ``dlt.secrets``.
    """
    client_settings = {
        "base_url": "https://api.notion.com/v1/",
        "auth": {"token": dlt.secrets["sources.rest_api.notion.api_key"]},
        "headers": {
            "Content-Type": "application/json",
            "Notion-Version": "2022-06-28",
        },
    }

    # Search request; the cursor travels in the JSON body, hence the custom paginator.
    search_resource = {
        "name": "search",
        "endpoint": {
            "path": "search",
            "method": "POST",
            "paginator": PostBodyPaginator(),
            "json": {
                "query": "Homework: Employee handbook",
                "sort": {"direction": "ascending", "timestamp": "last_edited_time"},
            },
            "data_selector": "results",
        },
    }

    # Child blocks of every record returned by the search resource.
    page_content_resource = {
        "name": "page_content",
        "endpoint": {
            "path": "blocks/{page_id}/children",
            "paginator": JSONResponsePaginator(),
            "params": {
                "page_id": {"type": "resolve", "resource": "search", "field": "id"},
            },
        },
    }

    notion_config: RESTAPIConfig = {
        "client": client_settings,
        "resources": [search_resource, page_content_resource],
    }

    yield from rest_api_source(notion_config, name="employee_handbook")

In [19]:
def extract_page_content(response):
    """Flatten one block record into the row stored by the pipeline.

    Args:
        response: Mapping describing a block. ``id`` and ``last_edited_time``
            are required; ``type`` and ``paragraph`` are read when present.

    Returns:
        dict with the keys ``block_id``, ``block_type``, ``content``,
        ``last_edited_time`` and ``inserted_at_time`` (the current UTC time).
        ``content`` is the full paragraph text, or ``""`` for non-paragraph
        blocks and for paragraphs without any text.

    Raises:
        KeyError: if ``id`` or ``last_edited_time`` is missing.
    """
    block_id = response["id"]
    last_edited_time = response["last_edited_time"]
    block_type = response.get("type", "Not paragraph")

    content = ""
    if block_type == "paragraph":
        # A paragraph's text is a list of rich_text segments. Join all of them:
        # keeping only the first one truncated any paragraph that has more
        # than one segment.
        segments = (response.get("paragraph") or {}).get("rich_text") or []
        content = "".join(segment.get("plain_text") or "" for segment in segments)

    return {
        "block_id": block_id,
        "block_type": block_type,
        "content": content,
        "last_edited_time": last_edited_time,
        "inserted_at_time": datetime.now(timezone.utc),
    }

In [20]:
@dlt.resource(
    name="employee_handbook",
    write_disposition="merge",
    primary_key="block_id",
    columns={"last_edited_time":{"dedup_sort":"desc"}}
    )
def rest_api_notion_incremental(
    last_edited_time = dlt.sources.incremental("last_edited_time", initial_value="2024-06-26T08:16:00.000Z", primary_key=("block_id"))
):
    """Yield flattened blocks that have text, for a merge load keyed on ``block_id``.

    Args:
        last_edited_time: dlt incremental declared on the ``last_edited_time``
            field with initial value ``2024-06-26T08:16:00.000Z``. It is not
            read in this body; presumably dlt applies it to the yielded items
            (confirm against the dlt incremental-loading documentation).

    Yields:
        dict: output of ``extract_page_content`` for every item whose
        ``content`` is non-empty.
    """
    # Map each item here rather than through rest_api_notion_resource.add_map():
    # add_map() appends a step to the shared module-level resource and returns
    # that same object, so every further run in the same kernel would stack one
    # more mapping step and feed extract_page_content its own output.
    for raw_block in rest_api_notion_resource:
        block = extract_page_content(raw_block)
        if not block["content"]:
            continue
        yield block

In [21]:
def load_notion() -> None:
    """Run the ``company_policies`` pipeline into LanceDB and print the load info.

    The incremental resource is wrapped with ``lancedb_adapter`` using
    ``embed="content"`` and loaded into table ``homework`` of dataset
    ``notion_pages`` with the ``merge`` write disposition.
    """
    notion_pipeline = dlt.pipeline(
        pipeline_name="company_policies",
        destination="lancedb",
        dataset_name="notion_pages",
    )

    # Wrap the resource so its "content" column is passed to the adapter's embed option.
    embedded_resource = lancedb_adapter(rest_api_notion_incremental, embed="content")

    run_summary = notion_pipeline.run(
        embedded_resource,
        table_name="homework",
        write_disposition="merge",
    )
    print(run_summary)


# Run the pipeline
load_notion()



homework
[{'name': 'block_id', 'nullable': False, 'primary_key': True, 'data_type': 'text'}, {'name': 'block_type', 'data_type': 'text', 'nullable': True}, {'name': 'content', 'x-lancedb-embed': True, 'data_type': 'text', 'nullable': True}, {'dedup_sort': 'desc', 'name': 'last_edited_time', 'data_type': 'timestamp', 'nullable': True}, {'name': 'inserted_at_time', 'data_type': 'timestamp', 'nullable': True}, {'name': '_dlt_load_id', 'data_type': 'text', 'nullable': False}, {'name': '_dlt_id', 'data_type': 'text', 'nullable': False, 'unique': True}]
_dlt_loads
[{'name': 'load_id', 'data_type': 'text', 'nullable': False}, {'name': 'schema_name', 'data_type': 'text', 'nullable': True}, {'name': 'status', 'data_type': 'bigint', 'nullable': False}, {'name': 'inserted_at', 'data_type': 'timestamp', 'nullable': False}, {'name': 'schema_version_hash', 'data_type': 'text', 'nullable': True}]
_dlt_pipeline_state
[{'name': 'version', 'data_type': 'bigint', 'nullable': False}, {'name': 'engine_vers

In [25]:
# Connect to the local LanceDB database and open the table holding the loaded rows.
db = lancedb.connect(".lancedb")
dbtable = db.open_table("notion_pages___homework")

# Ask LanceDB for the row count rather than materialising the whole table as
# a DataFrame only to take its length.
row_count = dbtable.count_rows()
print(f"The 'notion_pages___homework' table has {row_count} rows.")

The 'notion_pages___homework' table has 17 rows.


In [26]:
# Load the whole table into a DataFrame for inspection.
table = db.open_table("notion_pages___homework")
df = table.to_pandas()

# Largest last_edited_time value present among the stored rows.
max_last_edited_time = df["last_edited_time"].max()
print(
    "The stored value for last_edited_time after running the pipeline is: "
    f"{max_last_edited_time}"
)

The stored value for last_edited_time after running the pipeline is: 2024-07-05 23:33:00+00:00


In [28]:
# Case-insensitive match on the PTO-related phrases; rows with missing content never match.
pto_mask = df["content"].str.contains("PTO|paid time off|vacation days", case=False, na=False)
pto_content = df[pto_mask]

# Print every matching text, each followed by a separator line.
for text in pto_content["content"]:
    print(text)
    print("---")

In this section, we are going to be covering information about paid time off: Employees receive 30 days of Paid Time Off (PTO) per year. Your PTO accrual begins the day you join our company and you receive 2.5 days per month. You can take your PTO at any time after your first week with us and you can use time off you haven’t accrued yet. You will earn one additional day per year after your first year with our company, with a cap at 25 days overall. If you want to use PTO, send a request through our HRIS. If your manager or HR approves, you are permitted to take your leave. You do not have to specify a reason for requesting PTO. You cannot transfer any remaining PTO to the next year. We encourage you to use your time off throughout the year. If you leave our company, we may compensate accrued PTO with your final paycheck according to local law. When the law doesn’t have provisions, we will compensate accrued leave to employees who were not terminated for cause.
---
Our company observes 