For asset discovery, the chunking strategy used here splits the structured (HTML) data on headings, removes code sections, and summarizes the remaining non-code text to build the knowledge base for RAG.
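The heading-based split can be illustrated with the standard library's `html.parser`. This is a minimal sketch under stated assumptions, not the chunking service's implementation. It assumes that `<h1>` through `<h6>` tags open a new section and that anything inside `<pre>` or `<code>` is code to drop, mirroring the `"code_behavior": "remove_code_sections"` setting sent to the service later in this notebook. `HeadingSplitter` and `split_by_headings` are hypothetical names, and the summarization step is not shown.

```python
from html.parser import HTMLParser

# Assumption: these tags delimit sections and code, respectively
HEADINGS = {"h1", "h2", "h3", "h4", "h5", "h6"}
CODE_TAGS = {"pre", "code"}


class HeadingSplitter(HTMLParser):
    """Collects [heading, [text parts]] sections from HTML, skipping code tags."""

    def __init__(self):
        super().__init__()
        self.sections = [["", []]]  # text before the first heading gets an empty title
        self.in_heading = False
        self.code_depth = 0  # > 0 while inside <pre>/<code>

    def handle_starttag(self, tag, attrs):
        if tag in HEADINGS:
            self.in_heading = True
            self.sections.append(["", []])  # every heading starts a new section
        elif tag in CODE_TAGS:
            self.code_depth += 1

    def handle_endtag(self, tag):
        if tag in HEADINGS:
            self.in_heading = False
        elif tag in CODE_TAGS and self.code_depth > 0:
            self.code_depth -= 1

    def handle_data(self, data):
        if self.code_depth:  # drop code content entirely
            return
        text = data.strip()
        if not text:
            return
        if self.in_heading:
            self.sections[-1][0] += text
        else:
            self.sections[-1][1].append(text)


def split_by_headings(html: str) -> list[tuple[str, str]]:
    parser = HeadingSplitter()
    parser.feed(html)
    parser.close()
    # keep only sections that still contain prose after code removal
    return [(title, " ".join(parts)) for title, parts in parser.sections if parts]


html = (
    "<p>Intro text.</p>"
    "<h2>Setup</h2><p>Install the package.</p><pre>pip install foo</pre>"
    "<h2>Usage</h2><p>Call the API.</p>"
)
for title, body in split_by_headings(html):
    print(f"{title or '(no heading)'}: {body}")
```

On the sample HTML this yields three sections (an untitled intro, `Setup`, and `Usage`), and the `pip install foo` line inside `<pre>` never reaches the output.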

Model used for chunk summarization: Mixtral 8x7B

In [None]:
from llms import llms

llm = llms.nim_mixtral_llm

from langchain.schema.messages import (
    AIMessage,
    HumanMessage,
)



In [None]:
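from langchain_core.prompts import ChatPromptTemplate

# `clean_text_no_code` must already hold an article's text with code sections removed;
# it is built from the chunking service output in `upload_techblogs_summaries` below.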
template = ChatPromptTemplate.from_messages(
    [("user", "Summarize the following article in 200 words or less:\n{user_input}")]
)

messages = template.format_messages(
    user_input=clean_text_no_code
)

generation: AIMessage = llm.invoke(messages)

print(generation.content)

Creating the vector database by converting chunks into embeddings.

We used the SentenceTransformers framework with the e5-large-unsupervised embedding model. To further increase inference speed, we converted the PyTorch model to a TensorRT engine file, which is served through Triton Inference Server.



In [None]:
from typing import List
import time
import numpy as np
import tritonclient.http

triton_host = "triton"
triton_port = "8000"
triton_model_name = "transformer_tensorrt_inference"
triton_model_version = "1"

triton_url = f"{triton_host}:{triton_port}"


def embed_with_triton(query: List[str]) -> List[List[float]]:
    triton_client = tritonclient.http.InferenceServerClient(
        url=triton_url, verbose=False
    )

    triton_batch_size = len(query)
    triton_inputs = []
    triton_outputs = []
    triton_text_input = tritonclient.http.InferInput(
        name="TEXT", shape=(triton_batch_size,), datatype="BYTES"
    )
    triton_text_input.set_data_from_numpy(np.asarray(query, dtype=object))
    triton_inputs.append(triton_text_input)
    triton_outputs.append(
        tritonclient.http.InferRequestedOutput("output", binary_data=False)
    )

    inference_results = triton_client.infer(
        model_name=triton_model_name,
        model_version=triton_model_version,
        inputs=triton_inputs,
        outputs=triton_outputs,
    )

    embedded_query = inference_results.as_numpy("output").tolist()
    return embedded_query


embedded_query = embed_with_triton(["query: deep learning"])
print(embedded_query)

print(len(embedded_query))
print(len(embedded_query[0]))
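Per the e5 model cards, inputs are expected to carry a `query: ` prefix for search queries and a `passage: ` prefix for indexed text, which is why the cell above embeds `"query: deep learning"`. Retrieval then ranks stored passages by how close their vectors are to the query vector. The sketch below shows that ranking step with cosine similarity in NumPy. `top_k_cosine` is a hypothetical helper and the 3-dimensional vectors are toy stand-ins for real embeddings (the cell above prints the real dimension). In this pipeline the search itself is delegated to Redis through the router, so this only illustrates the idea.

```python
import numpy as np


def top_k_cosine(query_vec, doc_vecs, k=3):
    """Return (index, score) pairs for the k documents most similar to the query."""
    q = np.asarray(query_vec, dtype=np.float32)
    d = np.asarray(doc_vecs, dtype=np.float32)
    # normalize so that a dot product equals cosine similarity
    q = q / np.linalg.norm(q)
    d = d / np.linalg.norm(d, axis=1, keepdims=True)
    scores = d @ q
    order = np.argsort(-scores)[:k]  # highest similarity first
    return [(int(i), float(scores[i])) for i in order]


# Toy vectors standing in for real e5 embeddings (hypothetical data)
docs = [[1.0, 0.0, 0.0], [0.7, 0.7, 0.0], [0.0, 0.0, 1.0]]
query = [1.0, 0.1, 0.0]
print(top_k_cosine(query, docs, k=2))
```

With real data, `query_vec` would come from `embed_with_triton(["query: ..."])[0]` and `doc_vecs` from embedding `"passage: ..."` strings.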

Redis: open-source database


We chose Redis as our database for four reasons:
1. Redis is extremely fast, and we need to minimize latency for the operations it will be performing.
2. Redis is well-supported and easy to deploy through a ready-to-go container.
3. Redis supports both vector and keyword search: vector search through the relatively recent [RedisVL](https://github.com/RedisVentures/redisvl) project, and a fairly robust suite of [search and query features](https://redis.io/docs/interact/search-and-query/) for more traditional keyword search. Notably, Redis supports BM25, the default scoring algorithm in Elasticsearch, which makes it easier to move between the two systems.
4. Redis unifies our vector database with our document (and metadata) database, so we don't have to worry about maintaining keys in a separate index like [FAISS](https://faiss.ai/).
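To make point 3 concrete, the BM25 ranking function can be written in a few lines of Python. This is a toy sketch, not the Redis or Elasticsearch implementation. It lowercases and splits on whitespace, uses the Lucene-style IDF `ln(1 + (N - n + 0.5) / (n + 0.5))`, and uses the common defaults `k1 = 1.2` and `b = 0.75`. Real engines add analyzers, stemming, and field weighting. `bm25_scores` is a hypothetical name.

```python
import math
from collections import Counter


def bm25_scores(query: str, docs: list[str], k1: float = 1.2, b: float = 0.75) -> list[float]:
    """Score every document against the query with Okapi BM25 (lowercased whitespace tokens)."""
    tokenized = [doc.lower().split() for doc in docs]
    n_docs = len(tokenized)
    avgdl = sum(len(toks) for toks in tokenized) / n_docs
    # document frequency: number of documents containing each term
    df = Counter(term for toks in tokenized for term in set(toks))

    scores = []
    for toks in tokenized:
        tf = Counter(toks)
        score = 0.0
        for term in query.lower().split():
            if term not in tf:
                continue
            idf = math.log(1 + (n_docs - df[term] + 0.5) / (df[term] + 0.5))
            # length normalization: long documents need more occurrences for the same score
            norm = tf[term] + k1 * (1 - b + b * len(toks) / avgdl)
            score += idf * tf[term] * (k1 + 1) / norm
        scores.append(score)
    return scores


docs = [
    "redis supports vector search",
    "elasticsearch uses bm25 for keyword search",
    "gpu kernels and cuda streams",
]
print(bm25_scores("keyword search", docs))
```

The second document scores highest because it matches both query terms, and the third scores zero because it matches neither. The `b` term penalizes documents longer than the average length `avgdl`, and `k1` limits how much repeated occurrences of a term can add.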


In [None]:
%%js
var host = window.location.host;
var url = 'http://'+host+':5006';
element.innerHTML = '<a style="color:green;" target="_blank" href='+url+'>Click to open router service API docs.</a>';


In [None]:
# Get the asset types the router expects
import httpx
import json

response = httpx.get("http://router:5006/asset-types")
asset_types_json = response.json()

print(json.dumps(asset_types_json, indent=2))

In [None]:
import os

data_dir = os.path.join(os.getcwd(), 'data', 'techblogs')
file_list = [x for x in sorted(os.listdir(data_dir)) if ".json" in x]

payloads = []

for i, filename in enumerate(file_list):
    with open(os.path.join(data_dir, filename), "r") as in_file:
        data = json.load(in_file)
    for item in data:
        # keep only items that link to developer.nvidia.com/blog
        # (the blogs.nvidia.com check below is commented out)
        if not item["link"].startswith(
            "https://developer.nvidia.com/blog"
        ):  # and not item['link'].startswith("https://blogs.nvidia.com"):
            # print(f"Skipping URL {item['link']}")
            continue
        document_title = item["title"]["rendered"]
        document_url = item["link"]
        document_html = item["content"]["rendered"]
        document_date = item["date_gmt"]
        document_date_modified = item["modified_gmt"]
        payloads.append(
            {
                "strategy": "heading_section_sentence",
                "code_behavior": "remove_code_sections",
                "chunk_min_words": 250,
                "chunk_overlap_words": 50,
                "input_type": "html",
                "input_str": document_html,
                "additional_metadata": {
                    "document_title": document_title,
                    "document_url": document_url,
                    "document_date": document_date,
                    "document_date_modified": document_date_modified,
                },
            }
        )

print(f"Total num payloads: {len(payloads)}")
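How the chunking service applies `chunk_min_words` and `chunk_overlap_words` is not shown in this notebook. As a hedged illustration of what such parameters commonly mean, the sketch below packs whole sentences into a chunk until it reaches `min_words` words, then seeds the next chunk with the last `overlap_words` words of the previous one, so that context spanning a boundary appears in both. `pack_sentences` is a hypothetical helper, and its regex sentence splitter is a simplifying assumption.

```python
import re


def pack_sentences(text: str, min_words: int = 250, overlap_words: int = 50) -> list[str]:
    """Group whole sentences into chunks of >= min_words words, overlapping by overlap_words."""
    # naive sentence split: ., ! or ? followed by whitespace (assumption)
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    chunks, current = [], []
    for sentence in sentences:
        current.extend(sentence.split())
        if len(current) >= min_words:
            chunks.append(" ".join(current))
            # carry the tail of this chunk into the next one
            current = current[-overlap_words:] if overlap_words else []
    # flush leftovers only if they add words beyond the carried-over overlap
    if len(current) > (overlap_words if chunks else 0):
        chunks.append(" ".join(current))
    return chunks


sample = "One two three. Four five six. Seven eight. Nine ten eleven twelve."
for chunk in pack_sentences(sample, min_words=5, overlap_words=2):
    print(chunk)
```

With `min_words=5` and `overlap_words=2`, the sample produces two chunks, and the second begins with `five six.`, the last two words of the first. Leftover words that add nothing beyond the overlap are dropped rather than emitted as a near-duplicate chunk.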

In [None]:
import asyncio
import httpx

chunking_url = "http://chunking:5005/api/chunking"
existing_items_url = "http://router:5006/search/keyword"
delete_url = "http://router:5006/data/delete"
insert_url = "http://router:5006/data/insert"

# Initialize a semaphore object with a limit of 3.
limit = asyncio.Semaphore(3)

# chunk up an article
async def chunking_request(client: httpx.AsyncClient, payload: dict):
    chunking_resp = await client.post(chunking_url, json=payload, timeout=15)
    return chunking_resp.json()

# see if any chunks already exist in the db that match this document url
async def get_existing_items_request(client: httpx.AsyncClient, payload: dict, asset_type: str):
    existing_items_resp = await client.post(
        existing_items_url,
        json={
            "field": "document_url",
            "value": payload["additional_metadata"]["document_url"],
            "asset_types": [asset_type],
            "search_type": "exact",
            "k": 1000,  # some large number to ensure we don't hit default limit of 10
        },
        timeout=15,
    )
    return existing_items_resp.json()

# delete items with certain ids
async def delete_request(client: httpx.AsyncClient, results: list, asset_type: str):
    delete_resp = await client.post(
        delete_url,
        json={
            "asset_type": asset_type,
            "ids": [x["id"] for x in results],
        },
        timeout=15,
    )
    print(delete_resp.status_code)
    return delete_resp.json()

In [None]:
async def upload_techblogs_chunks(client: httpx.AsyncClient, payload: dict):
    async with limit:
        try:
            chunks = await chunking_request(client, payload)
        except (httpx.HTTPError, ValueError):  # request failed or non-JSON reply: retry once
            chunks = await chunking_request(client, payload)
        print(
            f"{payload['additional_metadata']['document_url']} | num chunks: {len(chunks)}"
        )

        # gets ids of existing items with this url
        try:
            existing_items = await get_existing_items_request(client, payload, "techblogs")
        except (httpx.HTTPError, ValueError):  # request failed or non-JSON reply: retry once
            existing_items = await get_existing_items_request(client, payload, "techblogs")

        if len(existing_items) > 0:
            results = existing_items[0]["results"]
            if len(results) > 0:
                # delete items that are associated with this url
                try:
                    deleted_items = await delete_request(client, results, "techblogs")
                except (httpx.HTTPError, ValueError):  # request failed or non-JSON reply: retry once
                    deleted_items = await delete_request(client, results, "techblogs")
                print(f"Deleted ids response: {deleted_items}")

        # insert: send chunks to redis
        resp = await client.post(
            insert_url,
            json={
                "asset_type": "techblogs",
                "chunks": chunks,
            },
            timeout=15,
        )
        print(f"Inserted {len(resp.json())} chunks")

In [None]:
async def main():
    async with httpx.AsyncClient() as client:
        tasks = []
        for payload in payloads:
            tasks.append(upload_techblogs_chunks(client, payload))

        await asyncio.gather(*tasks)
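Because `main()` hands every upload to `asyncio.gather` at once, the `asyncio.Semaphore(3)` defined earlier is what keeps at most three articles in flight against the chunking and router services. The self-contained sketch below demonstrates that mechanism with `asyncio.sleep` standing in for the HTTP calls. `run_bounded` and `fake_upload` are hypothetical names; the function reports the peak number of coroutines inside the `async with` block.

```python
import asyncio


async def run_bounded(n_tasks: int, limit: int) -> int:
    """Run n_tasks fake uploads under a semaphore and return the peak concurrency."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_upload(i: int) -> None:
        nonlocal in_flight, peak
        async with semaphore:  # at most `limit` coroutines get past this point at once
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for the chunking/insert HTTP requests
            in_flight -= 1

    # like main(), schedule every task at once and let the semaphore throttle them
    await asyncio.gather(*(fake_upload(i) for i in range(n_tasks)))
    return peak
```

In a notebook, `await run_bounded(10, 3)` returns `3`; outside Jupyter, use `asyncio.run(run_bounded(10, 3))`. Raising the limit typically speeds up ingestion only as far as the downstream services can absorb concurrent requests.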

In [None]:
start = time.perf_counter()

# If this were not in Jupyter we would run this
# asyncio.run(main())

# Since we are in a notebook, Jupyter is already running its own event loop,
# so we can simply await main() at the top level
await main()

end = time.perf_counter()

print(f"Took {end - start} seconds")


In [None]:
techblogs_assettype = None

for assettype in asset_types_json:
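    if assettype["name"] == "techblogs":
        techblogs_assettype = assettype
if techblogs_assettype is None:
    raise ValueError("The router did not return a 'techblogs' asset type")
print(json.dumps(techblogs_assettype, indent=2))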

techblogs_assettype["chunking_params"] = json.dumps(
    {
        "strategy": "heading_section_sentence",
        "code_behavior": "remove_code_sections",
        "chunk_min_words": 250,
        "chunk_overlap_words": 50,
    }
)

print(json.dumps(techblogs_assettype, indent=2))

update_asset_types_url = "http://router:5006/asset-types/update"
response = httpx.post(update_asset_types_url, json={"data": techblogs_assettype})
print(json.dumps(response.json(), indent=2))

dump_response = httpx.post("http://router:5006/data/dump")

print(json.dumps(dump_response.json(), indent=2))

In [None]:
import redis

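r = redis.Redis(host='redis', port=6379)
# LASTSAVE returns the time of Redis's last successful save to disk; a recent
# timestamp indicates the data was persisted (e.g., by the /data/dump call above)
r.lastsave()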

Programmatic Search


In [None]:
search_endpoint = "http://router:5006/search/semantic"

response = httpx.post(
    search_endpoint, json={"query": "cgroups", "k": 3, "asset_types": ["techblogs"]}
)
response.json()

# inspect the second-ranked result (index 1)
result1 = response.json()[0]['results'][1]
result1

heading_section_index = json.loads(result1["heading_section_index"])
heading_section_title = json.loads(result1["heading_section_title"])
paragraph_index = json.loads(result1["paragraph_index"])
contains_code = json.loads(result1["contains_code"])
only_code = json.loads(result1["only_code"])
text_components = json.loads(result1["text_components"])


assert len(heading_section_index) == len(heading_section_title) == len(paragraph_index) == len(contains_code) == len(only_code) == len(text_components)

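# Rebuild readable text: emit each section title once, end code components with a
# newline and prose components with a space, and add a line break between paragraphs
text = ""
last_hsi = None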

for i in range(len(text_components)):
    if last_hsi is None or last_hsi != heading_section_index[i]:
        text += heading_section_title[i] + "\n"
    text += text_components[i]
    if only_code[i]:
        text += "\n"
    else:
        text += " "
    # look ahead
    if i < len(text_components) - 1:
        if paragraph_index[i] != paragraph_index[i+1]:
            text += "\n"

    last_hsi = heading_section_index[i]

print(text.strip())

# Collect only the code-only components of the chunk
text = ""

for i in range(len(text_components)):
    if only_code[i]:
        text += text_components[i]
        text += "\n"

print(text.strip())

In [None]:
## Storing TechBlog Summaries in Redis

from llms import llms
llm = llms.nim_mixtral_llm

payloads = []

file_list = [x for x in sorted(os.listdir(data_dir)) if '.json' in x]

for i, filename in enumerate(file_list):
    with open(os.path.join(data_dir, filename), 'r') as in_file:
        data = json.load(in_file)

    for item in data:

        # keep only items that link to developer.nvidia.com/blog
        # (the blogs.nvidia.com check below is commented out)
        if not item['link'].startswith("https://developer.nvidia.com/blog"): # and not item['link'].startswith("https://blogs.nvidia.com"):
            # print(f"Skipping URL {item['link']}")
            continue

        document_title = item['title']['rendered']
        document_url = item['link']
        document_html = item['content']['rendered']
        document_date = item['date_gmt']
        document_date_modified = item['modified_gmt']

        payload = {
            "strategy": "heading_section",
            "code_behavior": "remove_code_sections",
            "input_type": "html",
            "input_str": document_html,
            "additional_metadata": {
                "document_title": document_title,
                "document_url": document_url,
                "document_date": document_date,
                "document_date_modified": document_date_modified,
            }
        }

        payloads.append(payload)

print(f"Total num payloads: {len(payloads)}")

In [None]:
import asyncio
import httpx
import json
from langchain_core.prompts import ChatPromptTemplate

summaries = [None] * len(payloads)

# load the summaries from the json file
with open("data/techblogs_summaries/saved.json", "r") as f:
    saved_summaries = json.load(f)

# Initialize a semaphore object with a limit of 3.
limit = asyncio.Semaphore(3)

async def async_generate(llm, msg):
    resp = await llm.agenerate([msg])
    return resp.generations[0][0].text


In [None]:
# Uncomment the following line if we want to save new summaries.
# saved_summaries = {}

async def upload_techblogs_summaries(llm, client: httpx.AsyncClient, payload: dict):
    async with limit:

        try:
            chunks = await chunking_request(client, payload)
        except (httpx.HTTPError, ValueError):  # request failed or non-JSON reply: retry once
            chunks = await chunking_request(client, payload)
        print(
            f"{payload['additional_metadata']['document_url']} | num chunks: {len(chunks)}"
        )

        clean_text_no_code = "\n".join([x["text"] for x in chunks])
        clean_text_with_code = "\n".join([ x["heading_section_title"][0] + "\n" + "\n".join(x["text_components"]) for x in chunks])

        # Ask LLM for summaries

        # uncomment if we want to save new summaries
        # template = ChatPromptTemplate.from_messages(
        #     [("user", "Summarize the following article in 200 words or less:\n{user_input}")]
        # )

        # msg = template.format_messages(
        #     user_input=clean_text_no_code
        # )

        # summary = await async_generate(llm, msg)
        # summary_with_metadata = [
        #     {
        #         "text": payload["additional_metadata"]["document_title"] + "\n" + summary,
        #         "text_components": [ x["heading_section_title"][0] + "\n" + "\n".join(x["text_components"]) for x in chunks],
        #         "document_title": payload["additional_metadata"]["document_title"],
        #         "document_url": payload["additional_metadata"]["document_url"],
        #         "document_date": payload["additional_metadata"]["document_date"],
        #         "document_date_modified": payload["additional_metadata"]["document_date_modified"],
        #         "document_full_text": clean_text_with_code
        #     }
        # ]
        # saved_summaries[payload["additional_metadata"]["document_url"]] = summary_with_metadata

        # load summary we've already generated
        # comment the following line if we want to save new summaries
        summary_with_metadata = saved_summaries[payload["additional_metadata"]["document_url"]]

        # gets ids of existing items with this url
        try:
            existing_items = await get_existing_items_request(client, payload, "summarize_techblogs")
        except (httpx.HTTPError, ValueError):  # request failed or non-JSON reply: retry once
            existing_items = await get_existing_items_request(client, payload, "summarize_techblogs")

        if len(existing_items) > 0:
            results = existing_items[0]["results"]
            if len(results) > 0:
                # delete items that are associated with this url
                try:
                    deleted_items = await delete_request(client, results, "summarize_techblogs")
                except (httpx.HTTPError, ValueError):  # request failed or non-JSON reply: retry once
                    deleted_items = await delete_request(client, results, "summarize_techblogs")
                print(f"Deleted ids response: {deleted_items}")

        # insert: send chunks to redis
        resp = await client.post(
            insert_url,
            json={
                "asset_type": "summarize_techblogs",
                "chunks": summary_with_metadata,
            },
            timeout=15,
        )
        print(f"Inserted {len(resp.json())} chunks")

In [None]:
async def main():
    async with httpx.AsyncClient() as client:
        tasks = []
        # for i in range(0, 7):
        for i in range(0, len(payloads)):
            tasks.append(upload_techblogs_summaries(llm, client, payloads[i]))

        await asyncio.gather(*tasks)

In [None]:
start = time.perf_counter()

# If this were not in Jupyter we would run this
# asyncio.run(main())

# Since we are in a notebook, Jupyter is already running its own event loop,
# so we can simply await main() at the top level
await main()

end = time.perf_counter()

print(f"Took {end - start} seconds")

# This should take around 2-3 minutes

In [None]:
# save the summaries as a json file
with open("data/techblogs_summaries/saved.json", "w") as f:
    json.dump(saved_summaries, f)
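The notebook switches between generating new summaries and reusing `saved.json` by commenting lines in and out. A hedged alternative is to treat the JSON file as a cache keyed by document URL and call the LLM only for URLs that are missing. The sketch below keeps the LLM call abstract: `summarize` is any async function that takes the article text (for example, a wrapper around `async_generate`). `load_cache`, `save_cache`, and `get_or_create_summary` are hypothetical helpers, not part of the original notebook.

```python
import json
import os
from typing import Awaitable, Callable


def load_cache(path: str) -> dict:
    """Load the URL -> summary cache, or start empty if the file does not exist yet."""
    if not os.path.exists(path):
        return {}
    with open(path, "r") as f:
        return json.load(f)


def save_cache(cache: dict, path: str) -> None:
    """Write the cache back to disk, creating the parent directory if needed."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "w") as f:
        json.dump(cache, f)


async def get_or_create_summary(
    cache: dict, url: str, text: str, summarize: Callable[[str], Awaitable[str]]
) -> str:
    """Return the cached summary for `url`, calling `summarize` only on a cache miss."""
    if url not in cache:
        cache[url] = await summarize(text)
    return cache[url]
```

This sketch caches the plain summary string for simplicity; the notebook's `saved.json` maps each URL to a list holding the summary plus document metadata, so adapting it would mean caching that structure instead.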

In [None]:
techblogs_summaries_assettype = None

for assettype in asset_types_json:
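    if assettype["name"] == "summarize_techblogs":
        techblogs_summaries_assettype = assettype
if techblogs_summaries_assettype is None:
    raise ValueError("The router did not return a 'summarize_techblogs' asset type")

print(json.dumps(techblogs_summaries_assettype, indent=2))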

techblogs_summaries_assettype["chunking_params"] = json.dumps(
    {
        "strategy": "summarization",
        "code_behavior": "remove_code_sections",
    }
)

print(json.dumps(techblogs_summaries_assettype, indent=2))


update_asset_types_url = "http://router:5006/asset-types/update"
response = httpx.post(update_asset_types_url, json={"data": techblogs_summaries_assettype})
print(json.dumps(response.json(), indent=2))

dump_response = httpx.post("http://router:5006/data/dump")
print(json.dumps(dump_response.json(), indent=2))