In [1]:
import logging
import sys

# basicConfig attaches a stdout handler; adding a second StreamHandler would print every record twice
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

In [2]:
import os

os.environ["NUMEXPR_MAX_THREADS"] = "20"

In [3]:
from llama_index import StorageContext, load_index_from_storage

## RAG

RAG (retrieval-augmented generation) augments LLMs with custom data. It has two stages:

1. Indexing stage: preparing a knowledge base.
2. Querying stage: retrieving relevant context from the knowledge base to help the LLM answer a query.
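To make the two stages concrete, here is a minimal, dependency-free sketch. It is not LlamaIndex code: `embed`, `cosine`, `build_index`, and `build_prompt` are hypothetical helpers, and a bag-of-words count stands in for a real embedding model.

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    # Toy "embedding": lower-cased word counts (a stand-in for a real embedding model).
    return Counter(re.findall(r"[a-z0-9_]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


# Stage 1, indexing: embed every chunk once and keep (vector, text) pairs.
def build_index(chunks: list[str]) -> list[tuple[Counter, str]]:
    return [(embed(c), c) for c in chunks]


# Stage 2, querying: rank chunks against the question and pack the top-k into a prompt.
def build_prompt(index: list[tuple[Counter, str]], question: str, k: int = 2) -> str:
    q = embed(question)
    ranked = sorted(index, key=lambda pair: cosine(q, pair[0]), reverse=True)
    context = "\n".join(text for _, text in ranked[:k])
    return f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"


index = build_index([
    "Assets are materialized by Dagster.",
    "Resources provide external services.",
    "Sensors watch for events.",
])
prompt = build_prompt(index, "How are assets materialized?", k=1)
print(prompt)
```

Swapping the toy `embed` for a real embedding model and sending the prompt to an LLM gives the basic loop that the rest of this notebook builds with LlamaIndex.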

### Indexing

- Readers ingest data from different sources into a `Document` representation.
- Documents and Nodes are containers around data. A Node is the atomic unit of data in LlamaIndex and represents a chunk of a source Document; it includes metadata and relationships to other nodes.
- Data indexes parse Documents into intermediate representations that are efficient to query, e.g. `VectorStoreIndex`, which stores an embedding for each Node.
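A rough sketch of the Document/Node relationship, using hypothetical dataclasses that are much simpler than LlamaIndex's own classes: each Node keeps a copy of its parent's metadata plus links to its source Document and to neighbouring chunks. Splitting by a fixed character count is an assumption made only to keep the example short.

```python
from dataclasses import dataclass, field


@dataclass
class Document:
    doc_id: str
    text: str
    metadata: dict = field(default_factory=dict)


@dataclass
class Node:
    node_id: str
    text: str
    metadata: dict
    # Relationship name -> id of the related Document/Node ("source", "previous", "next").
    relationships: dict = field(default_factory=dict)


def split_into_nodes(doc: Document, chunk_chars: int) -> list[Node]:
    pieces = [doc.text[i:i + chunk_chars] for i in range(0, len(doc.text), chunk_chars)]
    nodes = [
        Node(f"{doc.doc_id}-{n}", piece, dict(doc.metadata), {"source": doc.doc_id})
        for n, piece in enumerate(pieces)
    ]
    # Link neighbouring chunks so that surrounding context can be recovered later.
    for prev, nxt in zip(nodes, nodes[1:]):
        prev.relationships["next"] = nxt.node_id
        nxt.relationships["previous"] = prev.node_id
    return nodes


doc = Document("issue-1", "abcdefghij", {"title": "t"})
nodes = split_into_nodes(doc, 4)
```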

### Querying

In the querying stage, a RAG pipeline finds relevant context and passes it to the LLM. 

### Building Blocks
- Retrievers: define how to retrieve relevant context from an index for a given query. The most common approach is dense retrieval against a vector index.
- Node postprocessors: take a set of retrieved nodes and apply transformations, filtering, or re-ranking logic.
- Synthesizers: generate a response from an LLM using the user query and a given set of retrieved text chunks.
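The building blocks compose as retrieve, then postprocess, then synthesize. The sketch below fakes the retriever's output and shows two hypothetical postprocessors (a similarity cutoff and a one-chunk-per-source filter) feeding a synthesizer that simply packs the surviving chunks into one prompt. LlamaIndex ships its own postprocessor and synthesizer classes; these functions only illustrate the data flow.

```python
from dataclasses import dataclass


@dataclass
class ScoredNode:
    text: str
    score: float
    source_id: str


def similarity_cutoff(nodes: list[ScoredNode], cutoff: float) -> list[ScoredNode]:
    # Postprocessor 1 (filter): drop weak matches.
    return [n for n in nodes if n.score >= cutoff]


def dedupe_by_source(nodes: list[ScoredNode]) -> list[ScoredNode]:
    # Postprocessor 2 (re-rank): keep the best chunk per source document, best first.
    best: dict[str, ScoredNode] = {}
    for n in nodes:
        if n.source_id not in best or n.score > best[n.source_id].score:
            best[n.source_id] = n
    return sorted(best.values(), key=lambda n: n.score, reverse=True)


def synthesize_prompt(query: str, nodes: list[ScoredNode]) -> str:
    # Synthesizer: stuff the surviving chunks into a single prompt for the LLM.
    context = "\n---\n".join(n.text for n in nodes)
    return f"Context:\n{context}\n\nAnswer the question: {query}"


retrieved = [  # pretend output of a retriever
    ScoredNode("chunk A1", 0.91, "doc-a"),
    ScoredNode("chunk A2", 0.85, "doc-a"),
    ScoredNode("chunk B1", 0.72, "doc-b"),
    ScoredNode("chunk C1", 0.30, "doc-c"),
]
kept = dedupe_by_source(similarity_cutoff(retrieved, cutoff=0.5))
prompt = synthesize_prompt("How do I use resources?", kept)
```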

### Pipelines 

- Query Engines: end-to-end pipelines for asking questions over your data. A query engine takes a natural-language query and returns a response, along with the source context it retrieved.
- Chat Engines: support a multi-turn, back-and-forth conversation instead of a single question and answer.
- Agents: automated decision makers that choose among a set of tools.
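The difference between a query engine and a chat engine is mostly state. This hypothetical `ToyChatEngine` wraps any single-turn query function and prepends the running transcript to each new message; LlamaIndex's chat engines offer other strategies too (for example, condensing the history into a standalone question).

```python
from typing import Callable


class ToyChatEngine:
    """Hypothetical chat engine: a single-turn query function plus conversation history."""

    def __init__(self, query_fn: Callable[[str], str]):
        self.query_fn = query_fn
        self.history: list[tuple[str, str]] = []

    def chat(self, message: str) -> str:
        # Fold earlier turns into the query so follow-ups keep their context.
        transcript = "".join(f"User: {u}\nAssistant: {a}\n" for u, a in self.history)
        answer = self.query_fn(transcript + f"User: {message}")
        self.history.append((message, answer))
        return answer


# A fake single-turn "query engine" that reports how many user turns it received.
engine = ToyChatEngine(lambda q: f"{q.count('User:')} turn(s) seen")
first = engine.chat("How do I define an asset?")
second = engine.chat("And how do I materialize it?")
```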

## Indexing

First, we load the documents: GitHub issues and discussions from the Dagster repository, exported as JSON.

In [4]:
import json

with open("data/issues-data.json", "r") as f:
    issues = json.load(f)
with open("data/discussions-data.json", "r") as f:
    discussions = json.load(f)

In [5]:
issues[0]

{'id': 'MDU6SXNzdWUzMjc3NzQ5MDA=',
 'number': 1,
 'title': 'Separate Pandas Dataframe Solid into Two Sources',
 'bodyText': 'Right now because of the first architecture the pandas dataframe_solid has a single source which is called "CSVORPARQUET" and takes a format argument. This should be separated into two distinct sources, one Csv, one Parquet',
 'state': 'CLOSED',
 'createdAt': '2018-05-30T15:08:27Z',
 'closedAt': '2018-06-08T13:49:28Z',
 'url': 'https://github.com/dagster-io/dagster/issues/1',
 'labels': {'nodes': []},
 'reactions': {'totalCount': 0}}

In [6]:
discussions[100]

{'id': 'D_kwDOB9hbPs4AUy8D',
 'number': 15567,
 'title': 'How to auto-materialize historical partitions of a time-partitioned asset?',
 'bodyText': 'How to make sure that if a number of latest time partitioned asset partitions are missing, all of it are triggered for materialization? Currently if I understand correctly only the latest is triggered, if auto materialization is used. Any other approaches that would guarantee no missing non-materialized partitions?',
 'answer': {'bodyText': 'The max_materializations_per_minute of AutoMaterializePolicy is what governs this.  Its default value is 1. This means that, if a bunch of historical partitions would be auto-materialized at once, all partitions except for the latest one will be discarded.\nThe goal of this parameter is to avoid "surprise backfills", where an accidental change to upstream data or similar causes a large number of partitions to be auto-materialized.\nYou can change this behavior by changing the max_materializations_per_m

In [7]:
# Create documents
from llama_index import Document

# Metadata keys hidden from the LLM
excluded_keys = ["createdAt"]
# Metadata keys hidden from the embedding model
excluded_meta_keys = ["createdAt", "url"]


def issue_to_document(issue):
    return Document(
        id_=issue["id"],
        text="Issue: " + issue["bodyText"],
        metadata={
            "documentType": "issue",
            "title": issue["title"],
            "state": issue["state"],
            "createdAt": issue["createdAt"],
            "url": issue["url"],
            "labels": ",".join([x["name"] for x in issue["labels"]["nodes"]]),
            "votes": issue["reactions"]["totalCount"],
        },
        excluded_llm_metadata_keys=excluded_keys,
        excluded_embed_metadata_keys=excluded_meta_keys,
    )


issue_to_document(issues[0])

Document(id_='MDU6SXNzdWUzMjc3NzQ5MDA=', embedding=None, metadata={'documentType': 'issue', 'title': 'Separate Pandas Dataframe Solid into Two Sources', 'state': 'CLOSED', 'createdAt': '2018-05-30T15:08:27Z', 'url': 'https://github.com/dagster-io/dagster/issues/1', 'labels': '', 'votes': 0}, excluded_embed_metadata_keys=['createdAt', 'url'], excluded_llm_metadata_keys=['createdAt'], relationships={}, hash='50cb03231b02030d47a45ced2ae30188239d69817ffe855253ac9ed51debd938', text='Issue: Right now because of the first architecture the pandas dataframe_solid has a single source which is called "CSVORPARQUET" and takes a format argument. This should be separated into two distinct sources, one Csv, one Parquet', start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n')
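The `Document` repr above exposes the templates that build what each model sees: `text_template='{metadata_str}\n\n{content}'`, `metadata_template='{key}: {value}'`, and `metadata_seperator='\n'`. Here is a pure-Python approximation of how those templates combine with the two excluded-key lists; it is a sketch, not LlamaIndex's implementation, and the helper `render` is hypothetical.

```python
TEXT_TEMPLATE = "{metadata_str}\n\n{content}"
METADATA_TEMPLATE = "{key}: {value}"
METADATA_SEPARATOR = "\n"


def render(content: str, metadata: dict, excluded_keys: list[str]) -> str:
    # Drop excluded keys, format the rest one per line, then prepend them to the text.
    metadata_str = METADATA_SEPARATOR.join(
        METADATA_TEMPLATE.format(key=k, value=v)
        for k, v in metadata.items()
        if k not in excluded_keys
    )
    return TEXT_TEMPLATE.format(metadata_str=metadata_str, content=content)


meta = {
    "documentType": "issue",
    "createdAt": "2018-05-30T15:08:27Z",
    "url": "https://github.com/dagster-io/dagster/issues/1",
}
llm_view = render("Issue: example body", meta, ["createdAt"])           # like excluded_llm_metadata_keys
embed_view = render("Issue: example body", meta, ["createdAt", "url"])  # like excluded_embed_metadata_keys
print(llm_view)
```

The LLM view keeps the URL (so answers can cite it) while the embedding view drops it, which matches the split between `excluded_keys` and `excluded_meta_keys` above.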

In [8]:
def discussion_to_document(discussion):
    # Keep the question text for unanswered discussions and use a placeholder answer
    answer = discussion["answer"]["bodyText"] if discussion["answer"] else "Unanswered"
    text = "Question: " + discussion["bodyText"] + ". Answer: " + answer

    return Document(
        id_=discussion["id"],
        text=text,
        metadata={
            "documentType": "discussion",
            "title": discussion["title"],
            "category": discussion["category"].get("name"),
            "createdAt": discussion["createdAt"],
            "url": discussion["url"],
            "labels": ",".join([i["name"] for i in discussion["labels"]["nodes"]]),
            "votes": discussion["reactions"]["totalCount"],
        },
        excluded_llm_metadata_keys=excluded_keys,
        excluded_embed_metadata_keys=excluded_meta_keys,
    )


discussion_to_document(discussions[100])

Document(id_='D_kwDOB9hbPs4AUy8D', embedding=None, metadata={'documentType': 'discussion', 'title': 'How to auto-materialize historical partitions of a time-partitioned asset?', 'category': 'Assets', 'createdAt': '2023-07-28T17:31:00Z', 'url': 'https://github.com/dagster-io/dagster/discussions/15567', 'labels': 'area: auto-materialize,area: asset', 'votes': 0}, excluded_embed_metadata_keys=['createdAt', 'url'], excluded_llm_metadata_keys=['createdAt'], relationships={}, hash='bbc91e6c6ff089ab09dc21edde978ffa8b36b3331674d4f1cf8cb486efaa82a5', text='Question: How to make sure that if a number of latest time partitioned asset partitions are missing, all of it are triggered for materialization? Currently if I understand correctly only the latest is triggered, if auto materialization is used. Any other approaches that would guarantee no missing non-materialized partitions?. Answer: The max_materializations_per_minute of AutoMaterializePolicy is what governs this.  Its default value is 1. Th

In [9]:
from llama_index.schema import MetadataMode

document = discussion_to_document(discussions[100])
print("The LLM sees this: \n", document.get_content(metadata_mode=MetadataMode.LLM))
print(
    "The Embedding model sees this: \n",
    document.get_content(metadata_mode=MetadataMode.EMBED),
)

The LLM sees this: 
 documentType: discussion
title: How to auto-materialize historical partitions of a time-partitioned asset?
category: Assets
url: https://github.com/dagster-io/dagster/discussions/15567
labels: area: auto-materialize,area: asset
votes: 0

Question: How to make sure that if a number of latest time partitioned asset partitions are missing, all of it are triggered for materialization? Currently if I understand correctly only the latest is triggered, if auto materialization is used. Any other approaches that would guarantee no missing non-materialized partitions?. Answer: The max_materializations_per_minute of AutoMaterializePolicy is what governs this.  Its default value is 1. This means that, if a bunch of historical partitions would be auto-materialized at once, all partitions except for the latest one will be discarded.
The goal of this parameter is to avoid "surprise backfills", where an accidental change to upstream data or similar causes a large number of partiti

In [10]:
issues_docs = [issue_to_document(i) for i in issues]
discussions_docs = [discussion_to_document(i) for i in discussions]

## Generate Nodes

Nodes are chunks of Documents, split using a token or sentence splitter.
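To see what `chunk_size` and `chunk_overlap` mean, here is a simplified splitter. It treats whitespace-separated words as tokens (an assumption; LlamaIndex's splitters count tokens with a real tokenizer, and the sentence splitter also tries to keep sentences intact). Each window starts `chunk_size - chunk_overlap` tokens after the previous one, so consecutive chunks share `chunk_overlap` tokens. `split_tokens` is a hypothetical helper.

```python
def split_tokens(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    # Whitespace "tokens" stand in for a real tokenizer here.
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    tokens = text.split()
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(" ".join(tokens[start:start + chunk_size]))
        if start + chunk_size >= len(tokens):
            break  # this window already reached the end of the text
    return chunks


print(split_tokens("a b c d e f g", chunk_size=3, chunk_overlap=1))
```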

In [11]:
from llama_index.node_parser import SimpleNodeParser
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from llama_index.embeddings import LangchainEmbedding
from llama_index import VectorStoreIndex, ServiceContext
from llama_index.embeddings import OpenAIEmbedding
from langchain.embeddings.huggingface import HuggingFaceBgeEmbeddings

node_parser = SimpleNodeParser.from_defaults(chunk_size=1000, chunk_overlap=20)

issue_nodes = node_parser.get_nodes_from_documents(issues_docs)
discussion_nodes = node_parser.get_nodes_from_documents(discussions_docs)

In [12]:
import chromadb
from llama_index.vector_stores import ChromaVectorStore

db = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = db.get_or_create_collection("dagster-support")

INFO:chromadb.telemetry.posthog:Anonymized telemetry enabled. See https://docs.trychroma.com/telemetry for more information.

In [13]:
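from llama_index import ServiceContext
from llama_index.embeddings import LangchainEmbedding
from langchain.embeddings.huggingface import HuggingFaceBgeEmbeddings

# Assumption: the embedding model is BAAI/bge-base-en, matching the SentenceTransformer log below
embed_model = LangchainEmbedding(HuggingFaceBgeEmbeddings(model_name="BAAI/bge-base-en"))
service_context = ServiceContext.from_defaults(embed_model=embed_model)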
BUILD_INDEX = False

if BUILD_INDEX:
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    # Build from the nodes parsed above so the chunk_size/chunk_overlap settings take effect
    index = VectorStoreIndex(
        issue_nodes + discussion_nodes,
        storage_context=storage_context,
        service_context=service_context,
        show_progress=False,
    )

INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: BAAI/bge-base-en
INFO:sentence_transformers.SentenceTransformer:Use pytorch device: cuda

In [14]:
# load from disk
db2 = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = db2.get_or_create_collection("dagster-support")
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
# Retrieval settings such as similarity_top_k go to as_retriever()/as_query_engine(), not to the index
index = VectorStoreIndex.from_vector_store(vector_store, service_context=service_context)

INFO:chromadb.telemetry.posthog:Anonymized telemetry enabled. See https://docs.trychroma.com/telemetry for more information.

In [15]:
chroma_collection.count()

7223

In [16]:
retriever = index.as_retriever(similarity_top_k=5)

In [17]:
from llama_index.response.notebook_utils import display_response, display_source_node

In [18]:
nodes = retriever.retrieve("Asset Materialization")
# display_source_node renders Markdown and returns None, so call it in a loop instead of printing a list
for node in nodes:
    display_source_node(node, source_length=100, show_source_metadata=True)

**Node ID:** 55e7366d-17b6-4f0a-a1bb-8aeaddbc36cd<br>**Similarity:** 1.0<br>**Text:** Issue: Right now, you need to know to shift-click the Materialize button, which is not very disco...<br>**Metadata:** {'documentType': 'issue', 'title': 'making materializing an asset with config less hidden', 'state': 'CLOSED', 'createdAt': '2023-03-07T23:21:02Z', 'url': 'https://github.com/dagster-io/dagster/issues/12786', 'labels': 'area: asset', 'votes': 0}<br>

**Node ID:** bb4bb5b6-89e6-45d8-b20d-637ba8c9e251<br>**Similarity:** 1.0<br>**Text:** Issue: What's the use case?
Asset Materialization is an extremely powerful concept in that it min...<br>**Metadata:** {'documentType': 'issue', 'title': 'Connected the data layer to inform the status of Asset Materialization', 'state': 'OPEN', 'createdAt': '2022-12-23T15:26:43Z', 'url': 'https://github.com/dagster-io/dagster/issues/11354', 'labels': 'type: feature-request', 'votes': 0}<br>

**Node ID:** 4e19a228-be16-43a6-9e11-a436e86fb1fa<br>**Similarity:** 1.0<br>**Text:** Issue: What's the use case?
The  current implementation/API for conditional materialization doesn...<br>**Metadata:** {'documentType': 'issue', 'title': "Conditional materialization and non-IO managed assets don't conceptually line up", 'state': 'OPEN', 'createdAt': '2023-07-20T16:31:57Z', 'url': 'https://github.com/dagster-io/dagster/issues/15418', 'labels': 'type: feature-request', 'votes': 0}<br>

**Node ID:** 819e8e99-244f-4d45-a46e-7e52dc9bbbc2<br>**Similarity:** 1.0<br>**Text:** Issue: Re-materialize API for materializing assets in-process via python
Repo-level resources
De-...<br>**Metadata:** {'documentType': 'issue', 'title': 'AssetGroup remaining work post 0.14.0', 'state': 'CLOSED', 'createdAt': '2022-02-16T18:48:51Z', 'url': 'https://github.com/dagster-io/dagster/issues/6647', 'labels': '', 'votes': 0}<br>

**Node ID:** 6537e109-20cf-4c84-80d4-7668090077b7<br>**Similarity:** 1.0<br>**Text:** Issue: Use Case
During fitting of ML models we want to track certrain key figures like the models...<br>**Metadata:** {'documentType': 'issue', 'title': 'Asset Materializations during development / testing / debugging', 'state': 'CLOSED', 'createdAt': '2021-02-23T11:33:01Z', 'url': 'https://github.com/dagster-io/dagster/issues/3725', 'labels': 'type: feature-request', 'votes': 0}<br>


## Defining Auto-Retrievers

These don't work well yet: the LLM has to infer metadata filters and express them as queries that Chroma understands, and it doesn't do that reliably. TBD!
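Part of what makes auto-retrieval brittle is the gap between the filters an LLM proposes and what the vector store accepts. As a hypothetical guard (not part of `VectorIndexAutoRetriever`), one could drop filter keys that are not in the metadata schema and translate the rest into Chroma's `where` syntax, which combines several conditions with `$and`. Values must also match the stored strings exactly: issue states are stored as `'OPEN'`/`'CLOSED'`, so a filter on `'open'` would match nothing. `ALLOWED_KEYS` and `to_chroma_where` are illustrative names.

```python
from typing import Optional

# Mirrors the MetadataInfo names declared for the auto-retriever in the next cells.
ALLOWED_KEYS = {"title", "state", "labels", "category"}


def to_chroma_where(filters: dict) -> Optional[dict]:
    # Keep only keys that exist in the metadata schema; the LLM may invent others.
    clauses = [{key: {"$eq": value}} for key, value in filters.items() if key in ALLOWED_KEYS]
    if not clauses:
        return None  # nothing usable: fall back to plain similarity search
    if len(clauses) == 1:
        return clauses[0]
    # Several conditions have to be combined explicitly.
    return {"$and": clauses}


print(to_chroma_where({"state": "OPEN", "category": "Assets", "author": "someone"}))
```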

In [19]:
discussions_docs[0].metadata

{'documentType': 'discussion',
 'title': 'RFC: Asset checks',
 'category': 'General',
 'createdAt': '2023-08-16T16:30:40Z',
 'url': 'https://github.com/dagster-io/dagster/discussions/15880',
 'labels': '',
 'votes': 2}

In [20]:
from llama_index.indices.vector_store.retrievers import VectorIndexAutoRetriever
from llama_index.vector_stores.types import MetadataInfo, VectorStoreInfo


vector_store_info = VectorStoreInfo(
    content_info="Github Issues and Discussions related to the open-source orchestrator Dagster",
    metadata_info=[
        MetadataInfo(
            name="title",
            type="str",
            description="The title of the Github Issue or Discussion",
        ),
        MetadataInfo(
            name="state",
            type="str",
            description="Whether the issue is 'OPEN' or 'CLOSED' (issues only)",
        ),
        MetadataInfo(
            name="labels",
            type="str",
            description="Comma-separated labels associated with the issue or discussion",
        ),
        MetadataInfo(
            name="category",
            type="str",
            description="The category of the discussion, e.g. 'General' or 'Assets' (discussions only)",
        ),
    ],
)
retriever = VectorIndexAutoRetriever(
    index, vector_store_info=vector_store_info, similarity_top_k=5
)

In [21]:
from llama_index.llms import LlamaCPP
from llama_index.llms.llama_utils import messages_to_prompt, completion_to_prompt

llm = LlamaCPP(
    model_path="/home/pedram/projects/llm-cache/models/openbuddy-llama2-13b-v11.1.Q4_0.gguf",
    temperature=0.1,
    max_new_tokens=-1,
    # llama2 has a context window of 4096 tokens, but we set it lower to allow for some wiggle room
    context_window=2048,
    # kwargs to pass to __call__()
    generate_kwargs={},
    # kwargs to pass to __init__()
    # set to at least 1 to use GPU
    model_kwargs={"n_gpu_layers": 50, "n_batch": 512},
    # transform inputs into Llama2 format
    messages_to_prompt=messages_to_prompt,
    completion_to_prompt=completion_to_prompt,
    verbose=False,
)

ggml_init_cublas: found 1 CUDA devices:
  Device 0: NVIDIA GeForce RTX 4080, compute capability 8.9
llama_model_loader: loaded meta data with 19 key-value pairs and 363 tensors from /home/pedram/projects/llm-cache/models/openbuddy-llama2-13b-v11.1.Q4_0.gguf (version GGUF V2 (latest))

In [22]:
response = llm.complete("Hello! Can you tell me a poem about cats and dogs?")

In [23]:
print(response)





In [24]:
service_context = ServiceContext.from_defaults(llm=llm)

In [25]:
query_engine = index.as_query_engine(service_context=service_context)

In [26]:
response = query_engine.query(
    "Find a question related to dbt source assets, include the URL to the Github issue"
)
display_response(response)

**`Final Response:`** <<SYS>>

In [27]:
response = query_engine.query(
    "What are some issues or discussions related to Resources? Include their ID, URL, Title, and Summarize the question or discussion for each."
)
display_response(response)

**`Final Response:`** <<SYS>>
There is one issue related to resources that I can find in the given context information. 
Issue ID: 3902
URL: https://github.com/dagster-io/dagster/issues/3902
Title: Problem on /concepts/modes-resources/resources#resources page
Summary: The issue is about a problem with the resources page, specifically that it leads to a 404 error when searching for "resource". The user suggests that there should be a "Modes and Resources" page at https://docs.dagster.io/concepts/modes-resources.

In [28]:
response = query_engine.query("Summarize the RFC on Asset Checks")
display_response(response)

**`Final Response:`** The RFC proposes renaming "severity" to "max_severity", which would allow for more flexibility in specifying the maximum severity level that a check can return. Additionally, it suggests allowing both a severity and max_severity argument in @asset_check and AssetCheckSpec, or not having any severity-related arguments at all and instead providing severities at runtime. The RFC also mentions the idea of prefixing asset keys on that asset's specs when key-prefixing an asset.

In [30]:
response = query_engine.query(
    "Find an issue related to dynamically generating assets and summarize the answer, include a link to the discussion"
)
display_response(response)

**`Final Response:`** 

In [1]:
from llama_index.llms import OpenAI
from llama_index import VectorStoreIndex, ServiceContext, set_global_service_context

llm = OpenAI(model="gpt-3.5-turbo-16k", temperature=0, max_tokens=10000)
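Both LLM configurations split a fixed context window between the prompt and the answer. A back-of-the-envelope helper (hypothetical, with an assumed 100-token prompt template) shows how many 1000-token chunks fit. For the local LlamaCPP model with `context_window=2048`, assuming 256 tokens are reserved for the answer, there is room for one chunk; `gpt-3.5-turbo-16k` (roughly a 16k-token window) with `max_tokens=10000` leaves room for about six.

```python
def chunks_that_fit(context_window: int, num_output: int, prompt_overhead: int, chunk_tokens: int) -> int:
    # Tokens left for retrieved context after reserving space for the answer and the prompt template.
    available = context_window - num_output - prompt_overhead
    return max(available, 0) // chunk_tokens


print(chunks_that_fit(2048, 256, 100, 1000))     # → 1 (local LlamaCPP settings, assumed 256-token answer)
print(chunks_that_fit(16384, 10000, 100, 1000))  # → 6 (gpt-3.5-turbo-16k with max_tokens=10000)
```

Lowering `max_tokens` frees room for more retrieved context, at the cost of shorter answers.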

In [40]:
service_context = ServiceContext.from_defaults(llm=llm)
set_global_service_context(service_context)

In [41]:
query_engine = index.as_query_engine(service_context=service_context)

In [42]:
response = query_engine.query(
    "Find a question related to dbt source assets, include the URL to the Github issue"
)
display_response(response)

**`Final Response:`** A related question about dbt source assets can be found in the conversation excerpt from the Dagster Slack. The user asks how to create dependencies between two assets, specifically between airbyte and dbt. The conversation suggests that non-arg dependencies may not work in this case. The user also mentions that they have set up dependencies between airbyte and dbt using ops, but they are curious about how to do it with assets. Another user mentions that this feature is not currently supported and suggests creating a GitHub issue for it. The URL to the related GitHub issue is: https://github.com/dagster-io/dagster/issues/9575

In [43]:
response = query_engine.query(
    "What are some issues or discussions related to Resources? Include their ID, URL, Title, and Summarize the question or discussion for each."
)
display_response(response)

**`Final Response:`** ID: 3902
URL: https://github.com/dagster-io/dagster/issues/3902
Title: [Documentation Feedback] Problem on /concepts/modes-resources/resources#resources page
Summary: The user reported that when searching for "resource," they were directed to a concept page that led to a 404 error. They mentioned that there is a "Modes and Resources" page available, but the link provided was not working.

ID: 12247
URL: https://github.com/dagster-io/dagster/issues/12247
Title: Add documentation on how to use `dbt_cli_resource`
Summary: The user requested examples on how to configure `dbt_cli_resource` in the API documentation. They mentioned that although using this feature requires knowledge of resources, they would like a copy-pastable example to be provided.

In [44]:
response = query_engine.query("Summarize the RFC on Asset Checks")
display_response(response)

**`Final Response:`** The RFC on Asset Checks proposes several changes and ideas related to asset checks in the Dagster framework. One of the proposed changes is to prefix the asset keys on an asset's specs when the asset itself is key-prefixed. Another idea is to rename the severity parameter to max_severity in order to make it clearer and more intuitive. The RFC also suggests accepting both a severity and a max_severity argument, allowing for more flexibility in specifying the severity levels for checks. Additionally, there is a suggestion to provide severities at runtime instead of as arguments, which would require waiting for a check to complete before materializing a downstream asset. These ideas and changes aim to improve the functionality and usability of asset checks in Dagster.

In [45]:
response = query_engine.query(
    "Find an issue related to dynamically generating assets and summarize the answer, include a link to the discussion"
)
display_response(response)

**`Final Response:`** There is an issue related to dynamically generating assets in the context information. The issue is titled "add key_prefix to multi_asset" and it is an open issue on the Dagster GitHub repository. The issue discusses the recommended pattern for building graph-backed multi_assets and how to pass a key_prefix to @multi_asset. The discussion in the issue thread provides insights into the current limitations and possible solutions for dynamically generating assets and selectively materializing subsets of them. You can find the discussion and more details about the issue at the following link: [add key_prefix to multi_asset](https://github.com/dagster-io/dagster/issues/9344)

In [47]:
response = query_engine.query("Summarize a sampling of issues")
display_response(response)

**`Final Response:`** A sampling of issues includes:
- Issue #10408: Provide a way to mock system-provided fields like `job_name` when using `build_op_context`. This issue was generated from a conversation on the Dagster Slack. The user is asking for a solution to mock attributes of the `OpExecutionContext` that are not assigned when using the `build_op_context` helper. They specifically mention `job_name` and `pipeline_run.previous_run_id` as attributes they are having issues with.
- Issue #7314: should be dagsir creating the issue. This issue was also generated from a conversation on the Dagster Slack. The user is suggesting that the issue should have been created by someone named "dagsir". The issue has been closed.

In [50]:
response = query_engine.query("How do I setup a dbt project with Dagster?")
display_response(response)

**`Final Response:`** One approach to setting up a dbt project with Dagster is to place the dbt project inside the Dagster project. This can be done by following the steps mentioned in the GitHub discussion #12248. However, if your dbt project is located in a different folder or even a different Git repo, you can still deploy it to Dagster.

In such scenarios, one approach is to set up the Dagster repository's CI/CD (Continuous Integration/Continuous Deployment) process to clone the dbt repository into a subdirectory before building. This ensures that the dbt project is included within the Dagster project. Additionally, you can set up a dispatch action on the dbt repository to trigger a deploy of Dagster whenever changes are made to the dbt project.

For a practical example, you can refer to the provided GitHub links: 
- [Dagster project example](https://github.com/slopp/dagsterproj)
- [dbt project example](https://github.com/slopp/dbtproj)

## Approach

Take the user's question, condense it into keywords, then use those keywords for the embedding search.
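A minimal sketch of this approach, assuming a hand-written stopword list in place of the LLM summarization step; `extract_keywords` and `keyword_query` are hypothetical helpers. The resulting string could then be passed to a retriever such as the one created with `index.as_retriever(similarity_top_k=5)`, i.e. `retriever.retrieve(keyword_query(question))`.

```python
import re

# Hypothetical, deliberately small stopword list; a real pipeline might ask the LLM for keywords instead.
STOPWORDS = {
    "a", "an", "and", "are", "can", "do", "does", "for", "how", "i",
    "in", "is", "my", "of", "the", "to", "what", "with",
}


def extract_keywords(question: str) -> list[str]:
    # Lower-cased word tokens minus stopwords, de-duplicated in first-seen order.
    keywords: list[str] = []
    for word in re.findall(r"[a-z0-9_]+", question.lower()):
        if word not in STOPWORDS and word not in keywords:
            keywords.append(word)
    return keywords


def keyword_query(question: str) -> str:
    # The string that would be embedded and sent to the vector index.
    return " ".join(extract_keywords(question))


print(keyword_query("How do I setup a dbt project with Dagster?"))
```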


In [1]:
from llama_index.llms import OpenAI

llm = OpenAI(model="gpt-3.5-turbo-16k", temperature=0, max_tokens=10000)