Skip to content

Commit

Permalink
checkin progress
Browse files Browse the repository at this point in the history
  • Loading branch information
emrgnt-cmplxty committed Jun 17, 2024
1 parent 500903a commit 2d8e794
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 420 deletions.
11 changes: 4 additions & 7 deletions r2r/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,10 @@
TextParser,
XLSXParser,
)
from .pipeline.base_pipeline import (
EvalPipeline,
IngestionPipeline,
Pipeline,
RAGPipeline,
SearchPipeline,
)
from .pipeline.base_pipeline import EvalPipeline, Pipeline
from .pipeline.ingestion_pipeline import IngestionPipeline
from .pipeline.rag_pipeline import RAGPipeline
from .pipeline.search_pipeline import SearchPipeline
from .pipes.base_pipe import AsyncPipe, AsyncState, PipeType
from .pipes.loggable_pipe import LoggableAsyncPipe
from .providers.embedding_provider import EmbeddingConfig, EmbeddingProvider
Expand Down
286 changes: 0 additions & 286 deletions r2r/core/pipeline/base_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,9 @@

import asyncio
import logging
from asyncio import Queue
from enum import Enum
from typing import Any, AsyncGenerator, Optional

from ..abstractions.search import (
AggregateSearchResult,
KGSearchSettings,
VectorSearchSettings,
)
from ..logging.kv_logger import KVLoggingSingleton
from ..logging.run_manager import RunManager, manage_run
from ..pipes.base_pipe import AsyncPipe, AsyncState
Expand Down Expand Up @@ -235,283 +229,3 @@ async def dequeue_requests(queue: asyncio.Queue) -> AsyncGenerator:
if request is None:
break
yield request


class IngestionPipeline(Pipeline):
"""A pipeline for ingestion."""

pipeline_type: str = "ingestion"

def __init__(
self,
pipe_logger: Optional[KVLoggingSingleton] = None,
run_manager: Optional[RunManager] = None,
):
super().__init__(pipe_logger, run_manager)
self.parsing_pipe = None
self.embedding_pipeline = None
self.kg_pipeline = None

async def run(
self,
input: Any,
state: Optional[AsyncState] = None,
streaming: bool = False,
run_manager: Optional[RunManager] = None,
*args: Any,
**kwargs: Any,
) -> None:
self.state = state or AsyncState()

async with manage_run(run_manager, self.pipeline_type):
await run_manager.log_run_info(
key="pipeline_type",
value=self.pipeline_type,
is_info_log=True,
)
if self.parsing_pipe is None:
raise ValueError(
"parsing_pipeline must be set before running the ingestion pipeline"
)
if self.embedding_pipeline is None and self.kg_pipeline is None:
raise ValueError(
"At least one of embedding_pipeline or kg_pipeline must be set before running the ingestion pipeline"
)
# Use queues to duplicate the documents for each pipeline
embedding_queue = Queue()
kg_queue = Queue()

async def enqueue_documents():
async for document in await self.parsing_pipe.run(
self.parsing_pipe.Input(message=input),
state,
run_manager,
*args,
**kwargs,
):
if self.embedding_pipeline:
await embedding_queue.put(document)
if self.kg_pipeline:
await kg_queue.put(document)
await embedding_queue.put(None)
await kg_queue.put(None)

# Start the document enqueuing process
enqueue_task = asyncio.create_task(enqueue_documents())

# Start the embedding and KG pipelines in parallel
if self.embedding_pipeline:
embedding_task = asyncio.create_task(
self.embedding_pipeline.run(
dequeue_requests(embedding_queue),
state,
streaming,
run_manager,
*args,
**kwargs,
)
)

if self.kg_pipeline:
kg_task = asyncio.create_task(
self.kg_pipeline.run(
dequeue_requests(kg_queue),
state,
streaming,
run_manager,
*args,
**kwargs,
)
)

# Wait for the enqueueing task to complete
await enqueue_task

# Wait for the embedding and KG tasks to complete
if self.embedding_pipeline:
await embedding_task
if self.kg_pipeline:
await kg_task

def add_pipe(
self,
pipe: AsyncPipe,
add_upstream_outputs: Optional[list[dict[str, str]]] = None,
parsing_pipe: bool = False,
kg_pipe: bool = False,
embedding_pipe: bool = False,
*args,
**kwargs,
) -> None:
logger.debug(
f"Adding pipe {pipe.config.name} to the IngestionPipeline"
)

if parsing_pipe:
self.parsing_pipe = pipe
elif kg_pipe:
if not self.kg_pipeline:
self.kg_pipeline = Pipeline()
self.kg_pipeline.add_pipe(
pipe, add_upstream_outputs, *args, **kwargs
)
elif embedding_pipe:
if not self.embedding_pipeline:
self.embedding_pipeline = Pipeline()
self.embedding_pipeline.add_pipe(
pipe, add_upstream_outputs, *args, **kwargs
)
else:
raise ValueError("Pipe must be a parsing, embedding, or KG pipe")


class SearchPipeline(Pipeline):
"""A pipeline for search."""

pipeline_type: str = "search"

def __init__(
self,
pipe_logger: Optional[KVLoggingSingleton] = None,
run_manager: Optional[RunManager] = None,
):
super().__init__(pipe_logger, run_manager)
self.parsing_pipe = None
self.vector_search_pipeline = None
self.kg_search_pipeline = None

async def run(
self,
input: Any,
state: Optional[AsyncState] = None,
streaming: bool = False,
vector_search_settings: Optional[
VectorSearchSettings
] = VectorSearchSettings(),
kg_search_settings: Optional[KGSearchSettings] = KGSearchSettings(),
run_manager: Optional[RunManager] = None,
*args: Any,
**kwargs: Any,
):
self.state = state or AsyncState()

async with manage_run(run_manager, self.pipeline_type):
await run_manager.log_run_info(
key="pipeline_type",
value=self.pipeline_type,
is_info_log=True,
)

vector_search_queue = Queue()
kg_queue = Queue()

async def enqueue_requests():
async for message in input:
if self.vector_search_pipeline:
await vector_search_queue.put(message)
if self.kg_search_pipeline:
await kg_queue.put(message)

await vector_search_queue.put(None)
await kg_queue.put(None)

# Create an async generator to dequeue requests
async def dequeue_requests(queue: Queue) -> AsyncGenerator:
while True:
request = await queue.get()
if request is None:
break
yield request

# Start the document enqueuing process
enqueue_task = asyncio.create_task(enqueue_requests())

# Start the embedding and KG pipelines in parallel
if self.vector_search_pipeline:
vector_search_task = asyncio.create_task(
self.vector_search_pipeline.run(
dequeue_requests(vector_search_queue),
state,
streaming,
run_manager,
vector_search_settings=vector_search_settings,
)
)

if self.kg_search_pipeline:
kg_task = asyncio.create_task(
self.kg_search_pipeline.run(
dequeue_requests(kg_queue),
state,
streaming,
run_manager,
kg_search_settings=kg_search_settings,
)
)

await enqueue_task

vector_search_results = (
await vector_search_task if self.vector_search_pipeline else None
)
kg_results = await kg_task if self.kg_search_pipeline else None

return AggregateSearchResult(
vector_search_results=vector_search_results,
kg_search_results=kg_results,
)

def add_pipe(
self,
pipe: AsyncPipe,
add_upstream_outputs: Optional[list[dict[str, str]]] = None,
kg_pipe: bool = False,
vector_search_pipe: bool = False,
*args,
**kwargs,
) -> None:
logger.debug(f"Adding pipe {pipe.config.name} to the SearchPipeline")

if kg_pipe:
if not self.kg_search_pipeline:
self.kg_search_pipeline = Pipeline()
self.kg_search_pipeline.add_pipe(
pipe, add_upstream_outputs, *args, **kwargs
)
elif vector_search_pipe:
if not self.vector_search_pipeline:
self.vector_search_pipeline = Pipeline()
self.vector_search_pipeline.add_pipe(
pipe, add_upstream_outputs, *args, **kwargs
)
else:
raise ValueError("Pipe must be a vector search or KG pipe")


class RAGPipeline(Pipeline):
"""A pipeline for RAG."""

pipeline_type: str = "rag"

async def run(
self,
input: Any,
state: Optional[AsyncState] = None,
streaming: bool = False,
run_manager: Optional[RunManager] = None,
*args: Any,
**kwargs: Any,
):
return await super().run(
input, state, streaming, run_manager, *args, **kwargs
)

def add_pipe(
self,
pipe: AsyncPipe,
add_upstream_outputs: Optional[list[dict[str, str]]] = None,
*args,
**kwargs,
) -> None:
logger.debug(f"Adding pipe {pipe.config.name} to the RAGPipeline")
return super().add_pipe(pipe, add_upstream_outputs, *args, **kwargs)
Loading

0 comments on commit 2d8e794

Please sign in to comment.