fix hyde rag pipeline streaming (#339)
emrgnt-cmplxty committed Apr 25, 2024
1 parent f8a971d commit fdebb96
Showing 12 changed files with 102 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -26,4 +26,4 @@ COPY . /app
EXPOSE 8000

# Set the command to run the application with Gunicorn
CMD ["gunicorn", "r2r.examples.servers.config_pipeline:create_app", "--bind", "0.0.0.0:8000", "--workers", "2", "--threads", "8", "--timeout", "0", "--worker-class", "uvicorn.workers.UvicornWorker"]
CMD ["gunicorn", "r2r.examples.servers.configurable_pipeline:create_app", "--bind", "0.0.0.0:8000", "--workers", "2", "--threads", "8", "--timeout", "0", "--worker-class", "uvicorn.workers.UvicornWorker"]
18 changes: 9 additions & 9 deletions README.md
@@ -86,23 +86,23 @@ docker run -d --name r2r_container -p 8000:8000 -e CONFIG_OPTION=local_ollama e

## Q&A Example

[`Configurable Pipeline`](r2r/examples/servers/config_pipeline.py): Execute this script to select and serve a **Q&A RAG**, **Web RAG**, or **Agent RAG** pipeline. This starter pipeline supports ingestion, embedding, and the specified RAG pipeline, all accessible via a REST API.
[`Configurable Pipeline`](r2r/examples/servers/configurable_pipeline.py): Execute this script to select and serve a **Q&A RAG**, **Web RAG**, or **Agent RAG** pipeline. This starter pipeline supports ingestion, embedding, and the specified RAG pipeline, all accessible via a REST API.
```bash
# launch the server
# For ex., do `export CONFIG_OPTION=local_ollama` or `--config=local_ollama` to run fully locally
# For ex., do `export PIPELINE_OPTION=web` or `--pipeline=web` to run WebRAG pipeline
python -m r2r.examples.servers.config_pipeline --config=default --pipeline=qna
python -m r2r.examples.servers.configurable_pipeline --config=default --pipeline=qna
```

[`Question & Answer Client`](r2r/examples/clients/run_qna_client.py): This **client script** should be executed subsequent to the server startup above with `pipeline=qna` specified. It facilitates the upload of text entries and PDFs to the server using the Python client and demonstrates the management of document and user-level vectors through its built-in features.
[`Question & Answer Client`](r2r/examples/clients/qna_rag_client.py): This **client script** should be executed subsequent to the server startup above with `pipeline=qna` specified. It facilitates the upload of text entries and PDFs to the server using the Python client and demonstrates the management of document and user-level vectors through its built-in features.

```bash
# run the client

# ingest the default documents (Lyft 10k)
python -m r2r.examples.clients.run_qna_client ingest
python -m r2r.examples.clients.qna_rag_client ingest

python -m r2r.examples.clients.run_qna_client search --query="What was lyfts profit in 2020?"
python -m r2r.examples.clients.qna_rag_client search --query="What was lyfts profit in 2020?"

# Result 1: Title: Lyft 10k 2021
# Net loss was $1.0 billion, a decreas e of 42% and 61% compared to 2020 and 2019, respectively.
@@ -121,7 +121,7 @@ docker run -d --name r2r_container -p 8000:8000 -e CONFIG_OPTION=local_ollama e

# ...

python -m r2r.examples.clients.run_qna_client rag_completion_streaming --query="What was lyfts profit in 2020?"
python -m r2r.examples.clients.qna_rag_client rag_completion_streaming --query="What was lyfts profit in 2020?"

# <search>[{"id": "a0f6b427-9083-5ef2-aaa1-024b6cebbaee", "score": 0.6862949051074227, "metadata": {"user_id": "df7021ed-6e66-5581-bd69-d4e9ac1e5ada", "pipeline_run_id": "0c2c9a81-0720-4e34-8736-b66189956013", "text": "Title: Lyft 10k 2021\nNet loss was $ ... </search>
#
@@ -136,15 +136,15 @@ docker run -d --name r2r_container -p 8000:8000 -e CONFIG_OPTION=local_ollama e

```bash
# launch the server
python -m r2r.examples.servers.config_pipeline --config=default --pipeline=hyde
python -m r2r.examples.servers.configurable_pipeline --config=default --pipeline=hyde
```

```bash
# ingests Lyft 10K, Uber 10K, and others
python -m r2r.examples.clients.run_qna_client ingest --document_filter=all
python -m r2r.examples.clients.qna_rag_client ingest --document_filter=all

# run the client
python -m r2r.examples.clients.run_qna_client search --query="What was lyft and ubers profit in 2020?"
python -m r2r.examples.clients.qna_rag_client search --query="What was lyft and ubers profit in 2020?"

# {... 'message': {'content': 'In 2020, Lyft reported a net loss of $1.7529 billion [8]. Uber also reported a significant loss for the year 2020, with its net loss improving by $1.8 billion from 2020, indicating a substantial loss for the year as well [38]. Neither company achieved a profit in 2020; instead, they both experienced considerable losses.' ...}
```
14 changes: 7 additions & 7 deletions docs/pages/getting-started/basic-example.mdx
@@ -16,7 +16,7 @@ export LOCAL_DB_PATH=local.sqlite
To launch the basic application server, run the following command:

```bash
python -m r2r.examples.servers.config_pipeline
python -m r2r.examples.servers.configurable_pipeline
```

This command starts the backend server with the Q&A RAG pipeline, which includes the ingestion, embedding, and RAG pipelines served via FastAPI.
@@ -26,7 +26,7 @@ This command starts the backend server with the Q&A RAG pipeline, which includes
To ingest the demo PDF data, use the following command:

```bash
python -m r2r.examples.clients.run_qna_client ingest meditations
python -m r2r.examples.clients.qna_rag_client ingest meditations
```

This command uploads the `meditations.pdf` file located in the `examples/data` directory and processes it using the ingestion pipeline. The output should be similar to:
@@ -40,7 +40,7 @@ Upload response = {'message': "File 'meditations.pdf' processed and saved at '/p
To perform a search on the ingested PDF data, run the following command:

```bash
python -m r2r.examples.clients.run_qna_client search "what is the meditations about?"
python -m r2r.examples.clients.qna_rag_client search "what is the meditations about?"
```

This command searches the ingested PDF data for information related to the query "what is the meditations about?". The output should include relevant text snippets from the PDF, such as:
@@ -56,7 +56,7 @@ virtue, vice ...
To generate a completion using the RAG (Retrieval-Augmented Generation) pipeline, run the following command:

```bash
python -m r2r.examples.clients.run_qna_client rag_completion "what is the meditations about?"
python -m r2r.examples.clients.qna_rag_client rag_completion "what is the meditations about?"
```

This command utilizes the RAG pipeline to generate a comprehensive answer to the query "what is the meditations about?". The output should include a detailed response based on the ingested PDF data, similar to:
@@ -74,20 +74,20 @@ The RAG pipeline retrieves relevant information from the ingested PDF data and g
## Step 5: Perform Streaming Completion with RAG
To generate a streaming completion, run the following command:
```bash
python -m r2r.examples.clients.run_qna_client rag_completion_streaming "what is the meditations about?"
python -m r2r.examples.clients.qna_rag_client rag_completion_streaming "what is the meditations about?"
```
You should see the response streamed to your console as it is generated.


## Step 6: Get Server logs
To get server logs using the client, run the following command:
```
python -m r2r.examples.clients.run_qna_client get_logs
python -m r2r.examples.clients.qna_rag_client get_logs
```

Or, if you just want the summary of the logs, run:
```
python -m r2r.examples.clients.run_qna_client get_logs_summary
python -m r2r.examples.clients.qna_rag_client get_logs_summary
```

## Optional Step 7: Connect to web application
2 changes: 1 addition & 1 deletion docs/pages/tutorials/configuring_your_rag_pipeline.mdx
@@ -133,7 +133,7 @@ At this point we're almost ready to use our new Qdrant vector database. We can u
To stand up our local server, we can run:

```bash
python -m r2r.examples.servers.config_pipeline
python -m r2r.examples.servers.configurable_pipeline
```

### Ingesting and Embedding Documents
10 changes: 5 additions & 5 deletions docs/pages/tutorials/local_rag.mdx
@@ -87,11 +87,11 @@ A local vector database will be used to store the embeddings. The current defaul
To stand up the server, we can readily choose from one of the two options below:

```bash
python -m r2r.examples.servers.config_pipeline --config local_ollama
python -m r2r.examples.servers.configurable_pipeline --config local_ollama
```

```bash
python -m r2r.examples.servers.config_pipeline --config local_llama_cpp
python -m r2r.examples.servers.configurable_pipeline --config local_llama_cpp
```

Note: to run with Llama.cpp you must download `tinyllama-1.1b-chat-v1.0.Q2_K.gguf` [here](https://huggingface.co/TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF/resolve/main/tinyllama-1.1b-chat-v1.0.Q2_K.gguf?download=true) and place the downloaded file into `~/cache/model/`. The choice between ollama and Llama.cpp depends on your preferred LLM provider.
@@ -105,7 +105,7 @@ With our environment set up and our server running in a separate process, we're
Run this command to ingest the document:

```bash
python -m r2r.examples.clients.run_qna_client ingest
python -m r2r.examples.clients.qna_rag_client ingest
```

The output should look something like this:
@@ -137,15 +137,15 @@ ollama serve llama2
Then, to ask a question, run:

```bash
python -m r2r.examples.clients.run_qna_client rag_completion \
python -m r2r.examples.clients.qna_rag_client rag_completion \
--query="What was Lyfts profit in 2020?" \
--model="ollama/llama2"
```

For Llama.cpp, run:

```bash
python -m r2r.examples.clients.run_qna_client rag_completion \
python -m r2r.examples.clients.qna_rag_client rag_completion \
--query="What was Lyfts profit in 2020?" \
--model=tinyllama-1.1b-chat-v1.0.Q2_K.gguf
```
4 changes: 3 additions & 1 deletion r2r/client/base.py
@@ -152,6 +152,7 @@ async def stream_rag_completion(
filters: Optional[Dict[str, Any]] = None,
settings: Optional[Dict[str, Any]] = None,
generation_config: Optional[Dict[str, Any]] = None,
timeout: int = 300
):
if not generation_config:
generation_config = {}
@@ -175,8 +176,9 @@ async def stream_rag_completion(
"settings": settings or {},
"generation_config": generation_config or {},
}
timeout_config = httpx.Timeout(timeout) # Configure the timeout

async with httpx.AsyncClient() as client:
async with httpx.AsyncClient(timeout=timeout_config) as client:
async with client.stream(
"POST", url, headers=headers, json=json_data
) as response:
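
For orientation, here is a self-contained sketch of the pattern this hunk introduces: without an explicit timeout, httpx's default 5-second read timeout can abort a long-lived streaming response, so a single `httpx.Timeout` built from the caller-supplied value is handed to the `AsyncClient`. The endpoint URL and payload below are placeholders, not the R2R API.

```python
import asyncio

import httpx


async def stream_post(url: str, payload: dict, timeout_seconds: int = 300):
    """Stream a POST response chunk by chunk with a generous read timeout."""
    # One value configures connect/read/write/pool timeouts; without it,
    # httpx's 5-second default read timeout can cut off slow token streams.
    timeout_config = httpx.Timeout(timeout_seconds)
    async with httpx.AsyncClient(timeout=timeout_config) as client:
        async with client.stream("POST", url, json=payload) as response:
            response.raise_for_status()
            async for chunk in response.aiter_text():
                yield chunk


async def main():
    # Placeholder endpoint and body; substitute the real server route.
    async for chunk in stream_post("http://localhost:8000/stream", {"query": "hi"}):
        print(chunk, end="", flush=True)


if __name__ == "__main__":
    asyncio.run(main())
```
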
4 changes: 2 additions & 2 deletions r2r/core/pipelines/rag.py
@@ -60,7 +60,7 @@ def ingress(self, message: str) -> Any:
return message

@log_execution_to_db
def transform_message(self, message: str) -> Any:
def transform_message(self, message: str, generation_config: Optional[GenerationConfig] = None) -> Any:
"""
Transforms the input query before retrieval, if necessary.
"""
@@ -232,7 +232,7 @@ def run_stream(

self.initialize_pipeline(message, search_only=False)

query = self.transform_message(message)
query = self.transform_message(message, generation_config)
search_results = self.search(query, filters, search_limit)
search_results = self.rerank_results(
query, search_results, rerank_limit
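
A minimal sketch of why `transform_message` gains an optional `generation_config` parameter (the class names and fields here are illustrative, not the actual r2r.core interfaces): the basic pipeline ignores it and passes the query through, while a HyDE-style subclass needs the model settings to call an LLM before retrieval.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class GenerationConfig:
    model: str = "gpt-4-turbo-preview"
    stream: bool = False


class BasicRAGPipeline:
    def transform_message(self, message, generation_config: Optional[GenerationConfig] = None):
        # Default behaviour: the query passes through untouched; the config
        # exists only so subclasses that call an LLM at this stage receive it.
        return message


class HydeLikePipeline(BasicRAGPipeline):
    def transform_message(self, message, generation_config: Optional[GenerationConfig] = None):
        # A HyDE-style pipeline would call the LLM here to produce several
        # hypothetical answers; stubbed for the sketch.
        config = generation_config or GenerationConfig()
        return [f"({config.model}) hypothetical answer #{i} to: {message}" for i in range(3)]


if __name__ == "__main__":
    print(BasicRAGPipeline().transform_message("What was Lyft's profit in 2020?"))
    print(HydeLikePipeline().transform_message("What was Lyft's profit in 2020?"))
```
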
2 changes: 1 addition & 1 deletion r2r/examples/clients/qna_rag_client.py
@@ -76,7 +76,7 @@ def rag_completion_streaming(self, query, model="gpt-4-turbo-preview"):
async def stream_rag_completion():
async for chunk in self.client.stream_rag_completion(
query,
5,
10,
filters={"user_id": self.user_id},
generation_config={"stream": True, "model": model},
):
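
Based on the call sites visible in this commit, a hedged usage sketch of the streaming client; the `R2RClient` import path, constructor, and base URL are assumptions, while the keyword arguments mirror `qna_rag_client.py` above and the new `timeout` parameter in `r2r/client/base.py`.

```python
import asyncio

from r2r.client import R2RClient  # import path assumed


async def stream_answer(query: str, user_id: str) -> None:
    client = R2RClient("http://localhost:8000")  # constructor signature assumed
    async for chunk in client.stream_rag_completion(
        query,
        10,  # second positional argument, raised from 5 to 10 in this commit
        filters={"user_id": user_id},
        generation_config={"stream": True, "model": "gpt-4-turbo-preview"},
        timeout=300,  # new keyword introduced in r2r/client/base.py
    ):
        print(chunk, end="", flush=True)


if __name__ == "__main__":
    asyncio.run(stream_answer("What was Lyft's profit in 2020?", "demo-user"))
```
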
File renamed without changes.
1 change: 1 addition & 0 deletions r2r/main/app.py
@@ -213,6 +213,7 @@ async def search(msg: RAGMessageModel):
msg.search_limit,
msg.rerank_limit,
search_only=True,
generation_config=msg.generation_config,
)
return rag_completion.search_results
except Exception as e:
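
For context, a rough, self-contained sketch of the shape of this endpoint change (the route path, request fields, and stub pipeline are assumptions; only the forwarding of `generation_config` into the search-only run mirrors the hunk above). Pipelines such as HyDE call an LLM even when only search results are requested, so the request's generation settings must reach `pipeline.run`.

```python
from dataclasses import dataclass, field
from typing import Any, Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel


class RAGMessageModel(BaseModel):
    # Field set is illustrative; only generation_config forwarding mirrors the hunk.
    message: str
    filters: dict[str, Any] = {}
    search_limit: int = 25
    rerank_limit: int = 15
    generation_config: Optional[dict[str, Any]] = None


@dataclass
class FakePipelineOutput:
    search_results: list[dict] = field(default_factory=list)


class FakePipeline:
    def run(self, message, filters, search_limit, rerank_limit,
            search_only=False, generation_config=None):
        # A real HyDE pipeline would use generation_config here to produce
        # hypothetical answers before searching.
        return FakePipelineOutput(search_results=[{"text": f"stub hit for: {message}"}])


app = FastAPI()
pipeline = FakePipeline()


@app.post("/search/")
async def search(msg: RAGMessageModel):
    try:
        output = pipeline.run(
            msg.message,
            msg.filters,
            msg.search_limit,
            msg.rerank_limit,
            search_only=True,
            generation_config=msg.generation_config,  # forwarded, as in this commit
        )
        return output.search_results
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
```
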
93 changes: 71 additions & 22 deletions r2r/pipelines/hyde/rag.py
@@ -1,5 +1,5 @@
import logging
from typing import Optional
from typing import Optional, Generator

from r2r.core import (
EmbeddingProvider,
@@ -93,13 +93,15 @@ def transform_message(self, message: str, generation_config: GenerationConfig) -
formatted_prompt = self.prompt_provider.get_prompt(
"hyde_prompt", {"message": message, "num_answers": num_answers}
)

print('formatted prompt = ', formatted_prompt)
completion = self.generate_completion(
formatted_prompt, generation_config
)
queries = completion.choices[0].message.content.strip().split("\n\n")
print('completion = ', completion)
answers = completion.choices[0].message.content.strip().split("\n\n")
generation_config.stream = orig_stream
return queries
print('answers = ', answers)
return answers

@log_execution_to_db
def construct_context(
@@ -138,41 +140,88 @@ def run(
"""
self.initialize_pipeline(message, search_only)

queries = self.transform_message(message, generation_config)
answers = self.transform_message(message, generation_config)
print('transformed answers = ', answers)
search_results_tuple = [
(
query,
answer,
self.rerank_results(
query,
self.search(query, filters, search_limit),
answer,
self.search(answer, filters, search_limit),
rerank_limit,
),
)
for query in queries
for answer in answers
]

if search_only:
flattened_results = [
result for _, search_results in search_results_tuple for result in search_results
]
search_results = [
result for _, search_results in search_results_tuple for result in search_results
]


if search_only:
return RAGPipelineOutput(
flattened_results,
search_results,
None,
None,
{"queries": queries},
{"answers": answers},
)

context = ""
for offset, (query, search_results) in enumerate(search_results_tuple):
context += self.construct_context(search_results, offset=offset, query=query) + "\n\n"

prompt = self.construct_prompt({"query": query, "context": context})
context = self._construct_joined_context(search_results_tuple)
prompt = self.construct_prompt({"query": message, "context": context})

if not generation_config.stream:
completion = self.generate_completion(prompt, generation_config)
return RAGPipelineOutput(search_results, context, completion, {"queries": queries})
return RAGPipelineOutput(search_results, context, completion, {"answers": answers})

return self._return_stream(
search_results, context, prompt, generation_config, metadata={"queries": queries},
search_results, context, prompt, generation_config, metadata={"answers": answers},
)

def run_stream(
self,
message,
generation_config: GenerationConfig,
filters={},
search_limit=25,
rerank_limit=15,
*args,
**kwargs,
) -> Generator[str, None, None]:
"""
Runs the completion pipeline for streaming execution.
"""
if not generation_config.stream:
raise ValueError(
"Streaming mode must be enabled when running `run_stream."
)

self.initialize_pipeline(message, search_only=False)

answers = self.transform_message(message, generation_config)
search_results_tuple = [
(
answer,
self.rerank_results(
answer,
self.search(answer, filters, search_limit),
rerank_limit,
),
)
for answer in answers
]
context = self._construct_joined_context(search_results_tuple)
prompt = self.construct_prompt({"query": message, "context": context})
search_results = [
result for _, search_results in search_results_tuple for result in search_results
]

return self._return_stream(
search_results, context, prompt, generation_config
)

def _construct_joined_context(self, search_results_tuple: list[tuple[str, list[VectorSearchResult]]]) -> str:
context = ""
for offset, (answer, search_results) in enumerate(search_results_tuple):
context += self.construct_context(search_results, offset=offset, query=answer) + "\n\n"
return context
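
Taken together, the new `run_stream` follows the same shape as `run`: generate hypothetical answers, retrieve per answer, join the per-answer contexts, and stream the final completion. A condensed, self-contained sketch of that control flow, with the LLM, search, and reranking steps stubbed out (only the structure mirrors the pipeline above):

```python
from typing import Generator


def generate_hypothetical_answers(message: str, num_answers: int = 3) -> list[str]:
    # Stand-in for the LLM call behind transform_message: HyDE asks the model
    # for plausible answers and uses them as richer search queries.
    return [f"Hypothetical answer {i} to: {message}" for i in range(num_answers)]


def search(query: str, limit: int) -> list[str]:
    # Stand-in for vector search over the ingested documents.
    return [f"passage retrieved for '{query}'"][:limit]


def construct_joined_context(results_per_answer: list[tuple[str, list[str]]]) -> str:
    # Mirrors _construct_joined_context: one context block per hypothetical answer.
    blocks = []
    for answer, results in results_per_answer:
        blocks.append(f"### Query: {answer}\n" + "\n".join(results))
    return "\n\n".join(blocks)


def stream_completion(prompt: str) -> Generator[str, None, None]:
    # Stand-in for the streaming LLM completion.
    for token in ("The", " answer", " is", " streamed", "."):
        yield token


def run_stream(message: str, search_limit: int = 25) -> Generator[str, None, None]:
    answers = generate_hypothetical_answers(message)
    results_per_answer = [(answer, search(answer, search_limit)) for answer in answers]
    context = construct_joined_context(results_per_answer)
    prompt = f"Query: {message}\n\nContext:\n{context}\n\nAnswer:"
    yield from stream_completion(prompt)


if __name__ == "__main__":
    for chunk in run_stream("What was Lyft's profit in 2020?"):
        print(chunk, end="")
    print()
```
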
2 changes: 1 addition & 1 deletion r2r/tests/end_to_end.py
@@ -26,7 +26,7 @@ def r2r_server():
"poetry",
"run",
"uvicorn",
"r2r.examples.servers.config_pipeline:app",
"r2r.examples.servers.configurable_pipeline:app",
"--port=8010",
"--workers=1",
]
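
For completeness, a hedged sketch of a pytest fixture that launches this server module for end-to-end tests; the naive startup wait and the yielded base URL are illustrative, while the command mirrors the fragment above.

```python
import subprocess
import time

import pytest


@pytest.fixture(scope="session")
def r2r_server():
    # Command mirrors the test fragment above.
    process = subprocess.Popen(
        [
            "poetry", "run", "uvicorn",
            "r2r.examples.servers.configurable_pipeline:app",
            "--port=8010",
            "--workers=1",
        ]
    )
    time.sleep(5)  # naive startup wait; a real fixture would poll the port
    yield "http://localhost:8010"
    process.terminate()
    process.wait()
```
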
