Skip to content

Commit

Permalink
expand search pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
emrgnt-cmplxty committed Jun 17, 2024
1 parent e57a7af commit 4a189ce
Show file tree
Hide file tree
Showing 31 changed files with 348 additions and 292 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.html linguist-documentation
6 changes: 3 additions & 3 deletions docs/pages/cookbooks/knowledge-graph.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ from r2r import (
GenerationConfig,
Pipeline,
R2RAppBuilder,
KGAgentPipe,
KGAgentSearchPipe,
Relation,
run_pipeline,
)
Expand Down Expand Up @@ -445,13 +445,13 @@ Results:
Finally, we are in a position to automatically answer difficult to manage queries with a knowledge agent. The snippet below injects our custom schema into a generic few-shot prompt and uses gpt-4o to create a relevant query

```python filename="r2r/examples/scripts/advanced_kg_cookbook.py"
kg_agent_pipe = KGAgentPipe(
kg_agent_search_pipe = KGAgentSearchPipe(
r2r_app.providers.kg, r2r_app.providers.llm, r2r_app.providers.prompt
)

# Define the pipeline
kg_pipe = Pipeline()
kg_pipe.add_pipe(kg_agent_pipe)
kg_pipe.add_pipe(kg_agent_search_pipe)

kg.update_agent_prompt(prompt_provider, entity_types, relations)

Expand Down
4 changes: 2 additions & 2 deletions docs/pages/deep-dive/ingestion.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ The **R2RVectorStoragePipe** stores the generated embeddings in a vector databas
### Knowledge Graph Pipes
When the knowledge graph provider settings are non-null, the pipeline includes pipes for generating and storing knowledge graph data.

- **KGAgentPipe**: Generates Cypher queries to interact with a Neo4j knowledge graph.
- **KGAgentSearchPipe**: Generates Cypher queries to interact with a Neo4j knowledge graph.
- **KGStoragePipe**: Stores the generated knowledge graph data in the specified knowledge graph database.


Expand Down Expand Up @@ -72,7 +72,7 @@ custom_ingestion_pipeline = CustomIngestionPipeline()
pipelines = R2RPipelineFactory(config, pipes).create_pipelines(
ingestion_pipeline = custom_ingestion_pipeline
)
r2r = R2RApp(config, providers, pipelines)
r2r = R2RApp(config=config, providers=providers, pipes=pipes, pipelines=pipelines)
```

### Conclusion
Expand Down
2 changes: 1 addition & 1 deletion docs/public/swagger.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions r2r/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
"VectorEntry",
"VectorType",
"Vector",
"SearchRequest",
"SearchResult",
"VectorSearchRequest",
"VectorSearchResult",
"AsyncPipe",
"PipeType",
"AsyncState",
Expand Down Expand Up @@ -98,7 +98,7 @@
"R2RPromptProvider",
"WebSearchPipe",
"R2RAppBuilder",
"KGAgentPipe",
"KGAgentSearchPipe",
# Prebuilts
"MultiSearchPipe",
"R2RPipeFactoryWithMultiSearch",
Expand Down
6 changes: 3 additions & 3 deletions r2r/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from .abstractions.llama_abstractions import VectorStoreQuery
from .abstractions.llm import LLMChatCompletion, LLMChatCompletionChunk
from .abstractions.prompt import Prompt
from .abstractions.search import SearchRequest, SearchResult
from .abstractions.search import VectorSearchRequest, VectorSearchResult
from .abstractions.user import UserStats
from .abstractions.vector import Vector, VectorEntry, VectorType
from .logging.kv_logger import (
Expand Down Expand Up @@ -99,8 +99,8 @@
"VectorEntry",
"VectorType",
"Vector",
"SearchRequest",
"SearchResult",
"VectorSearchRequest",
"VectorSearchResult",
"AsyncPipe",
"PipeType",
"AsyncState",
Expand Down
40 changes: 35 additions & 5 deletions r2r/core/abstractions/search.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,65 @@
"""Abstractions for search functionality."""

import uuid
from typing import Any, Optional
from typing import Any, Dict, List, Optional

from pydantic import BaseModel


class SearchRequest(BaseModel):
class VectorSearchRequest(BaseModel):
"""Request for a search operation."""

query: str
limit: int
filters: Optional[dict[str, Any]] = None


class SearchResult(BaseModel):
class VectorSearchResult(BaseModel):
"""Result of a search operation."""

id: uuid.UUID
score: float
metadata: dict[str, Any]

def __str__(self) -> str:
return f"SearchResult(id={self.id}, score={self.score}, metadata={self.metadata})"
return f"VectorSearchResult(id={self.id}, score={self.score}, metadata={self.metadata})"

def __repr__(self) -> str:
return f"SearchResult(id={self.id}, score={self.score}, metadata={self.metadata})"
return f"VectorSearchResult(id={self.id}, score={self.score}, metadata={self.metadata})"

def dict(self) -> dict:
return {
"id": self.id,
"score": self.score,
"metadata": self.metadata,
}


class KGSearchRequest(BaseModel):
"""Request for a knowledge graph search operation."""

query: str


KGSearchResult = List[List[Dict[str, Any]]]


class AggregateSearchResult(BaseModel):
"""Result of an aggregate search operation."""

vector_search_results: Optional[List[VectorSearchResult]]
kg_search_results: Optional[KGSearchResult] = None

def __str__(self) -> str:
return f"AggregateSearchResult(vector_search_results={self.vector_search_results}, kg_search_results={self.kg_search_results})"

def __repr__(self) -> str:
return f"AggregateSearchResult(vector_search_results={self.vector_search_results}, kg_search_results={self.kg_search_results})"

def dict(self) -> dict:
return {
"vector_search_results": [
result.dict() for result in self.vector_search_results
],
"kg_search_results": self.kg_search_results,
}
137 changes: 115 additions & 22 deletions r2r/core/pipeline/base_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from enum import Enum
from typing import Any, AsyncGenerator, Optional

from ..abstractions.search import AggregateSearchResult
from ..logging.kv_logger import KVLoggingSingleton
from ..logging.run_manager import RunManager, manage_run
from ..pipes.base_pipe import AsyncPipe, AsyncState
Expand Down Expand Up @@ -223,6 +224,15 @@ def add_pipe(
return super().add_pipe(pipe, add_upstream_outputs, *args, **kwargs)


async def dequeue_requests(queue: asyncio.Queue) -> AsyncGenerator:
"""Create an async generator to dequeue requests."""
while True:
request = await queue.get()
if request is None:
break
yield request


class IngestionPipeline(Pipeline):
"""A pipeline for ingestion."""

Expand All @@ -246,7 +256,7 @@ async def run(
run_manager: Optional[RunManager] = None,
*args: Any,
**kwargs: Any,
):
) -> None:
self.state = state or AsyncState()

async with manage_run(run_manager, self.pipeline_type):
Expand Down Expand Up @@ -282,22 +292,14 @@ async def enqueue_documents():
await embedding_queue.put(None)
await kg_queue.put(None)

# Create an async generator to dequeue documents
async def dequeue_documents(queue: Queue) -> AsyncGenerator:
while True:
document = await queue.get()
if document is None:
break
yield document

# Start the document enqueuing process
enqueue_task = asyncio.create_task(enqueue_documents())

# Start the embedding and KG pipelines in parallel
if self.embedding_pipeline:
embedding_task = asyncio.create_task(
self.embedding_pipeline.run(
dequeue_documents(embedding_queue),
dequeue_requests(embedding_queue),
state,
streaming,
run_manager,
Expand All @@ -309,7 +311,7 @@ async def dequeue_documents(queue: Queue) -> AsyncGenerator:
if self.kg_pipeline:
kg_task = asyncio.create_task(
self.kg_pipeline.run(
dequeue_documents(kg_queue),
dequeue_requests(kg_queue),
state,
streaming,
run_manager,
Expand Down Expand Up @@ -359,10 +361,20 @@ def add_pipe(
raise ValueError("Pipe must be a parsing, embedding, or KG pipe")


class RAGPipeline(Pipeline):
"""A pipeline for RAG."""
class SearchPipeline(Pipeline):
"""A pipeline for search."""

pipeline_type: str = "rag"
pipeline_type: str = "search"

def __init__(
self,
pipe_logger: Optional[KVLoggingSingleton] = None,
run_manager: Optional[RunManager] = None,
):
super().__init__(pipe_logger, run_manager)
self.parsing_pipe = None
self.vector_search_pipeline = None
self.kg_search_pipeline = None

async def run(
self,
Expand All @@ -373,25 +385,106 @@ async def run(
*args: Any,
**kwargs: Any,
):
return await super().run(
input, state, streaming, run_manager, *args, **kwargs
self.state = state or AsyncState()

async with manage_run(run_manager, self.pipeline_type):
await run_manager.log_run_info(
key="pipeline_type",
value=self.pipeline_type,
is_info_log=True,
)

vector_search_queue = Queue()
kg_queue = Queue()

async def enqueue_requests():
async for message in input:
if self.vector_search_pipeline:
await vector_search_queue.put(message)
if self.kg_search_pipeline:
await kg_queue.put(message)

await vector_search_queue.put(None)
await kg_queue.put(None)

# Create an async generator to dequeue requests
async def dequeue_requests(queue: Queue) -> AsyncGenerator:
while True:
request = await queue.get()
if request is None:
break
yield request

# Start the document enqueuing process
enqueue_task = asyncio.create_task(enqueue_requests())

# Start the embedding and KG pipelines in parallel
if self.vector_search_pipeline:
vector_search_task = asyncio.create_task(
self.vector_search_pipeline.run(
dequeue_requests(vector_search_queue),
state,
streaming,
run_manager,
)
)

from ..providers.llm_provider import GenerationConfig

if self.kg_search_pipeline:
kg_task = asyncio.create_task(
self.kg_search_pipeline.run(
dequeue_requests(kg_queue),
state,
streaming,
run_manager,
rag_generation_config=GenerationConfig(model="gpt-4o"),
)
)

await enqueue_task

vector_search_results = (
await vector_search_task if self.vector_search_pipeline else None
)
kg_results = await kg_task if self.kg_search_pipeline else None

return AggregateSearchResult(
vector_search_results=vector_search_results,
kg_search_results=kg_results,
)

def add_pipe(
self,
pipe: AsyncPipe,
add_upstream_outputs: Optional[list[dict[str, str]]] = None,
kg_pipe: bool = False,
vector_search_pipe: bool = False,
*args,
**kwargs,
) -> None:
logger.debug(f"Adding pipe {pipe.config.name} to the RAGPipeline")
return super().add_pipe(pipe, add_upstream_outputs, *args, **kwargs)
logger.debug(f"Adding pipe {pipe.config.name} to the SearchPipeline")

if kg_pipe:
if not self.kg_search_pipeline:
self.kg_search_pipeline = Pipeline()
self.kg_search_pipeline.add_pipe(
pipe, add_upstream_outputs, *args, **kwargs
)
elif vector_search_pipe:
if not self.vector_search_pipeline:
self.vector_search_pipeline = Pipeline()
self.vector_search_pipeline.add_pipe(
pipe, add_upstream_outputs, *args, **kwargs
)
else:
raise ValueError("Pipe must be a vector search or KG pipe")

class SearchPipeline(Pipeline):
"""A pipeline for search."""

pipeline_type: str = "search"
class RAGPipeline(Pipeline):
"""A pipeline for RAG."""

pipeline_type: str = "rag"

async def run(
self,
Expand All @@ -413,5 +506,5 @@ def add_pipe(
*args,
**kwargs,
) -> None:
logger.debug(f"Adding pipe {pipe.config.name} to the SearchPipeline")
logger.debug(f"Adding pipe {pipe.config.name} to the RAGPipeline")
return super().add_pipe(pipe, add_upstream_outputs, *args, **kwargs)
4 changes: 2 additions & 2 deletions r2r/core/providers/embedding_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from enum import Enum
from typing import Optional

from ..abstractions.search import SearchResult
from ..abstractions.search import VectorSearchResult
from .base_provider import Provider, ProviderConfig


Expand Down Expand Up @@ -65,7 +65,7 @@ async def async_get_embeddings(
def rerank(
self,
query: str,
results: list[SearchResult],
results: list[VectorSearchResult],
stage: PipeStage = PipeStage.RERANK,
limit: int = 10,
):
Expand Down
Loading

0 comments on commit 4a189ce

Please sign in to comment.