# From REST to reasoning: ingest, index, and query with dlt and Cognee

- Video: https://www.youtube.com/watch?v=MNt_KK32gys
- Homework solution: TBA
- [Slides](https://docs.google.com/presentation/d/1oHQilxEVqGGW4S2ctNEE0wHY2LgcjYLaRUziAoinsis/edit?usp=sharing)
- [dltHub](https://dlthub.com/)

## Basics: What is dlt?
- dlt stands for "data load tool".
- It is an open-source Python library that lets you build modern ELT pipelines using just Python code.
- It helps you:
    - Extract data from APIs, databases, files, or custom sources
    - Transform and normalize data
    - Load data into destinations like BigQuery, DuckDB, Redshift, etc.
    - Manage schemas, state, and incremental loading automatically

## Basics: What is Cognee?
- [Cognee](https://www.cognee.ai/) is an open-source Python library that connects data points and establishes ground truths to improve the accuracy of your AI agents and LLMs.
- It lets you:
    - Add structured or unstructured data (DataFrames, documents, tables)
    - Automatically build a knowledge graph from it
    - Ask natural language questions and get grounded, context-rich answers

## Basics: What is Kuzu?
- [Kuzu](https://kuzudb.com/) is an open-source, embedded graph database designed to be scalable and fast.

Read more about Cognee and Kuzu here: https://blog.kuzudb.com/post/cognee-kuzu-relational-data-to-knowledge-graph/

In [1]:
from datetime import datetime
import os

import cognee
from cognee.shared.logging_utils import get_logger, ERROR
from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.api.v1.search import SearchType
from cognee.modules.engine.models import NodeSet
import dlt
from dotenv import load_dotenv
import pandas as pd
import requests

os.environ["GRAPH_DATABASE_PROVIDER"] = "kuzu"


2025-07-09T18:13:24.251999 [info     ] Logging initialized [cognee.shared.logging_utils] cognee_version=0.2.0 os_info='Darwin 24.4.0 (Darwin Kernel Version 24.4.0: Fri Apr 11 18:33:39 PDT 2025; root:xnu-11417.101.15~117/RELEASE_ARM64_T6020)' python_version=3.11.12 structlog_version=25.4.0

2025-07-09T18:13:24.252564 [info     ] Want to learn more? Visit the Cognee documentation: https://docs.cognee.ai [cognee.shared.logging_utils]

HTTP Request: GET https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json "HTTP/1.1 200 OK"


## **Data we'll be using**

In this example, we’ll request data from an API that serves the **NYC taxi dataset**. We created this API for the course so that it serves data you are already familiar with.

### **API documentation**:
- **Data**: Comes in pages of 1,000 records.
- **Pagination**: When there’s no more data, the API returns an empty page.
- **Details**:
  - **Method**: GET
  - **URL**: `https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api`
  - **Parameters**:
    - `page`: Integer (page number), defaults to 1.

Here’s how we design our requester:
1. **Request page by page** until we hit an empty page. Since we don’t know how much data is behind the API, we must assume it could be as little as 1,000 records or as much as 10 GB.
2. **Use a generator** to handle this efficiently and avoid loading all data into memory.
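
The two points above can be sketched as a small generator. This is a minimal sketch, not the notebook's own code: the names `fetch_page` and `paginated_getter` are hypothetical, and it uses the standard library's `urllib` rather than `requests` so that it has no extra dependencies. The fetch function is passed in as a parameter so the pagination logic can be tried out without calling the API.

```python
import json
from typing import Callable, Iterator
from urllib.parse import urlencode
from urllib.request import urlopen

API_URL = "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"


def fetch_page(page: int) -> list:
    """Request one page of records from the API (makes a network call)."""
    query = urlencode({"page": page})
    with urlopen(f"{API_URL}?{query}", timeout=30) as response:
        return json.load(response)


def paginated_getter(fetch: Callable[[int], list] = fetch_page) -> Iterator[list]:
    """Yield one page at a time until the API returns an empty page."""
    page = 1
    while True:
        records = fetch(page)
        if not records:
            # An empty page signals that there is no more data
            break
        yield records
        page += 1
```

Because pages are yielded one at a time, only a single page is held in memory at any moment. Inside a `dlt.resource`, the same loop could `yield` each page directly.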

## **We'll be partitioning our data in our own way**

1. first_10_days
2. second_10_days
3. last_10_days

We'll be doing this manually for clarity, but dlt also supports partitioning, as you can find [here](https://dlthub.com/docs/plus/ecosystem/iceberg#partitioning).
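
To make the bucket boundaries concrete, here is a plain-Python sketch of the half-open intervals that `pd.cut(..., right=False)` produces with the bin edges used in the next cell. The helper `tag_for` is hypothetical and exists only for illustration; the notebook itself relies on pandas.

```python
from bisect import bisect_right
from datetime import datetime
from typing import Optional

# Same bin edges and labels as the pd.cut call in the notebook
EDGES = [
    datetime(2009, 6, 1),
    datetime(2009, 6, 10),
    datetime(2009, 6, 20),
    datetime(2009, 6, 30),
]
LABELS = ["first_10_days", "second_10_days", "last_10_days"]


def tag_for(pickup: datetime) -> Optional[str]:
    """Return the bucket label for a pickup time, or None if it is out of range."""
    # Each bucket includes its left edge and excludes its right edge
    idx = bisect_right(EDGES, pickup) - 1
    if 0 <= idx < len(LABELS):
        return LABELS[idx]
    return None
```

With these edges, a pickup at midnight on June 10 falls into `second_10_days`, while pickups before June 1 or on or after June 30 get no label and are dropped by the `notnull()` filter.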

In [None]:

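# Step 1: Create a dlt resource
@dlt.resource(write_disposition="replace", name="zoomcamp_data")
def zoomcamp_data():
    url = "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"
    # No `page` parameter is passed, so only the default first page is fetched
    response = requests.get(url)
    response.raise_for_status()  # fail early on HTTP errors
    data = response.json()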

    # Convert to DataFrame
    df = pd.DataFrame(data)
    df['Trip_Pickup_DateTime'] = pd.to_datetime(df['Trip_Pickup_DateTime'])

    # Define buckets
    df['tag'] = pd.cut(
        df['Trip_Pickup_DateTime'],
        bins=[
            pd.Timestamp("2009-06-01"),
            pd.Timestamp("2009-06-10"),
            pd.Timestamp("2009-06-20"),
            pd.Timestamp("2009-06-30")
        ],
        labels=["first_10_days", "second_10_days", "last_10_days"],
        right=False
    )

    # Drop rows not in the specified range
    df = df[df['tag'].notnull()]
    yield df


# Step 2: Create and run the pipeline
pipeline = dlt.pipeline(
    pipeline_name="zoomcamp_pipeline",
    destination="duckdb",
    dataset_name="zoomcamp_tagged_data"
)
load_info = pipeline.run(zoomcamp_data())

In [3]:
dataset = pipeline.dataset().zoomcamp_data.df()

dataset[:5]

Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,payment_type,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,trip_dropoff_date_time,trip_pickup_date_time,store_and_forward,surcharge,vendor_name,tag
0,40.742963,-73.980072,45.0,1,Credit,40.641525,-73.787442,9.0,4.15,58.15,17.52,2009-06-14 23:48:00,2009-06-14 23:23:00,,0.0,VTS,second_10_days
1,40.740187,-74.005698,6.5,1,Credit,40.722065,-74.009767,1.0,0.0,8.5,1.56,2009-06-18 17:43:00,2009-06-18 17:35:00,,1.0,VTS,second_10_days
2,40.718043,-74.004745,12.5,5,Credit,40.761945,-73.983038,2.0,0.0,15.5,3.37,2009-06-10 18:27:00,2009-06-10 18:08:00,,1.0,VTS,second_10_days
3,40.739637,-73.985233,4.9,1,CASH,40.749802,-73.992247,0.0,0.0,5.4,1.11,2009-06-14 23:58:00,2009-06-14 23:54:00,,0.5,VTS,second_10_days
4,40.730032,-73.852693,25.7,1,CASH,40.776825,-73.949233,0.0,4.15,29.85,11.09,2009-06-13 13:23:00,2009-06-13 13:01:00,,0.0,VTS,second_10_days


In [4]:
dataset["tag"].value_counts()

tag
first_10_days     481
second_10_days    295
last_10_days      222
Name: count, dtype: int64

## **Let's load data into Cognee!**

Here, I call `cognee.add()` for each partition, tagging it with a node set, and then `cognee.cognify()` directly to build the knowledge graph.

If you'd like to learn about how to use relational datasets in cognee, please visit the [docs](https://docs.cognee.ai/tutorials/load-your-relational-database) :)

In [8]:
async def main():
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    # Add the first 10 days
    # drop() returns a new frame, which avoids modifying a slice of `dataset` in place
    df_set1 = dataset.loc[dataset["tag"] == "first_10_days"].drop(columns=["tag"])
    json_set1 = df_set1.to_json(orient="records", lines=False)
    await cognee.add(json_set1, node_set=["first_10_days"])

    # Add the second 10 days
    df_set2 = dataset.loc[dataset["tag"] == "second_10_days"].drop(columns=["tag"])
    json_set2 = df_set2.to_json(orient="records", lines=False)
    await cognee.add(json_set2, node_set=["second_10_days"])

    # Add the last 10 days
    df_set3 = dataset.loc[dataset["tag"] == "last_10_days"].drop(columns=["tag"])
    json_set3 = df_set3.to_json(orient="records", lines=False)
    await cognee.add(json_set3, node_set=["last_10_days"])

    await cognee.cognify()

    # Absolute path on the author's machine; change it to a location that exists on yours
    visualization_path = "/Users/vasiliy/projects/llm-zoomcamp/dlt/content/.artifacts/graph_visualization.html"
    await visualize_graph(visualization_path)

In [None]:
await main()

## Search

In [10]:
async def search_cognee(query, node_set, query_type=SearchType.GRAPH_COMPLETION):
    answer = await cognee.search(
        query_text=query,
        query_type=query_type,
        node_type=NodeSet,
        node_name=node_set,
        top_k=5,  # limit the number of items retrieved for the answer
    )
    return answer

In [None]:
results = await search_cognee(
    "What's in this knowledge graph?",
    node_set=["first_10_days"]
)

In [12]:
print(results[0])

The knowledge graph contains information about taxi trips including details such as drop-off and pick-up times, fare amounts, payment types, distances, and vendor names for each trip. It includes nodes representing specific trips on June 9, 2009, and their respective connections.
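
To compare partitions, the same question can be put to each node set in turn. The sketch below is only an illustration under stated assumptions: `search_all` is a hypothetical helper, and it takes the search coroutine as an argument (in this notebook that would be `search_cognee` from above) so that the loop itself does not depend on Cognee.

```python
NODE_SETS = ["first_10_days", "second_10_days", "last_10_days"]


async def search_all(search_fn, query, node_sets=NODE_SETS):
    """Run the same query against each node set and collect the answers by name."""
    answers = {}
    for name in node_sets:
        # One node set per call keeps each answer scoped to a single partition
        answers[name] = await search_fn(query, node_set=[name])
    return answers
```

In a notebook cell this could be called as `answers = await search_all(search_cognee, "What's in this knowledge graph?")`, which returns a dictionary keyed by node set name.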
