
Scaling KAZU to run over millions of abstracts #18

Open
kajocina opened this issue Jan 30, 2024 · 5 comments


@kajocina

kajocina commented Jan 30, 2024

Hi, thanks again for sharing KAZU; it looks like exactly the tool I need for setting up a simple biomed NLP system.

I am planning to use KAZU to add ontology terms to the whole PubMed abstract dump and write the results out to disk. I am a bit concerned that the single-document approach shown in the tutorial (the EGFR query) will not scale well, and the Ray tutorial page still says TBA.

Do you have any general suggestions for scaling it to my task? I don't need specific code examples unless you already have something to share; even something "quick and dirty" would help.

@RichJackson
Collaborator

Hiya!

Yes, sorry about the delay in producing the Ray tutorial. To answer your question: you have multiple options for scaling Kazu over a large dataset. It integrates quite well with Spark's .mapPartitions concept, and also with Ray. How do you intend to serialise the results? To MongoDB, or to disk?
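The point of .mapPartitions is that the (expensive) pipeline start-up cost is paid once per partition rather than once per document. A minimal stand-in sketch of that shape, with plain Python lists in place of a real Spark RDD and a hypothetical FakePipeline in place of Kazu's Pipeline (with Spark you would call rdd.mapPartitions(annotate_partition) instead of looping):

```python
from typing import Iterable, Iterator, List

class FakePipeline:
    """Stand-in for kazu.pipeline.Pipeline: expensive to build, cheap to call."""
    instantiations = 0

    def __init__(self) -> None:
        FakePipeline.instantiations += 1  # models heavyweight start-up

    def __call__(self, docs: List[str]) -> List[str]:
        return [doc.upper() for doc in docs]  # placeholder for real NER/linking

def annotate_partition(rows: Iterable[str]) -> Iterator[str]:
    # One pipeline per partition: this is the whole point of mapPartitions.
    pipeline = FakePipeline()
    yield from pipeline(list(rows))

# Simulating two partitions of abstracts:
partitions = [["abstract one", "abstract two"], ["abstract three"]]
results = [list(annotate_partition(p)) for p in partitions]
print(results)                       # [['ABSTRACT ONE', 'ABSTRACT TWO'], ['ABSTRACT THREE']]
print(FakePipeline.instantiations)   # 2 -- once per partition, not per document
```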

@kajocina
Author

I wanted to start simple by just dumping the data to disk as JSON. Would there be a large speed advantage to using MongoDB instead?

@RichJackson
Collaborator

It depends on your setup. Mongo is a great way of centralising the data so that it can be easily queried afterwards (Elasticsearch is good too). However, the DB can become a bottleneck if you have many Kazu workers and only a single Mongo instance.
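If you do start with plain files, writing each processed batch as JSON Lines keeps things append-only and avoids any central bottleneck. A minimal sketch, assuming each processed Document has already been converted to a dict (Kazu's own serialisation step is not shown; the dicts below are illustrative):

```python
import json
import tempfile
from pathlib import Path
from typing import Iterable

def write_batch(path: Path, docs: Iterable[dict]) -> int:
    """Append one processed batch to a JSON Lines file; returns docs written."""
    count = 0
    with path.open("a", encoding="utf-8") as f:
        for doc in docs:
            f.write(json.dumps(doc) + "\n")
            count += 1
    return count

out = Path(tempfile.mkdtemp()) / "annotations.jsonl"
n1 = write_batch(out, [{"id": "pmid:1", "entities": ["EGFR"]}])
n2 = write_batch(out, [{"id": "pmid:2", "entities": []}])
lines = out.read_text(encoding="utf-8").splitlines()
print(len(lines))  # 2 -- one JSON document per line
```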

In terms of batch processing with Ray, I've found it useful to use the Ray Queue concept.

You can then wrap Kazu in Ray actors, as follows:

import logging
from typing import Any, Iterable, List, Optional, Protocol, cast

import ray
from hydra.utils import instantiate
from omegaconf import DictConfig
from ray import ObjectRef
from ray.util.queue import Queue, Full, Empty
from tqdm import tqdm
from kazu.pipeline import Pipeline
from kazu.data.data import Document

logger = logging.getLogger(__name__)


def await_put(queue: Queue, items: Any, timeout: Optional[int] = 1) -> None:
    attempt = 1
    while True:
        try:
            queue.put(items, block=True, timeout=timeout)
            return
        except Full:
            logger.info(f"cannot put into queue: Full. Attempt: {attempt}")
            attempt += 1


def await_get(queue: Queue, timeout: Optional[int] = 1) -> List[Document]:
    attempt = 1
    while True:
        try:
            items = cast(list[Document], queue.get(block=True, timeout=timeout))
            return items
        except Empty:
            logger.info(f"cannot get from queue: Empty. Attempt: {attempt}")
            attempt += 1

class DocumentLoader(Protocol):
    """Protocol to load documents from a source, and converts them into
    :class:`.Document`."""

    def load(self) -> Iterable[List[Document]]:
        """Convert documents from a source into :class:`.Document`, and yield a list."""
        ...

    def batch_size(self) -> int:
        """Number of documents produced per batch."""
        ...

    def total_documents(self) -> Optional[int]:
        """Total Documents in this data source, if known."""
        ...

    def total_batches_if_available(self) -> Optional[int]:
        maybe_total = self.total_documents()
        if maybe_total is not None:
            # ceiling division, so a final partial batch is counted
            total = -(-maybe_total // self.batch_size())
        else:
            total = None
        return total

class RayPipelineQueueWorker:
    """Wraps a Kazu :class:`Pipeline`: pulls batches of Documents from an input
    queue, processes them, and pushes the results to an output queue."""

    def __init__(self, cfg: DictConfig):
        self.cfg = cfg
        self.pipeline: Pipeline = instantiate(cfg.Pipeline)
        self.logger = logging.getLogger(__name__)
        self.in_queue: Optional[Queue] = None
        self.out_queue: Optional[Queue] = None

    def run(self) -> None:
        while True:
            docs: list[Document] = await_get(queue=self.in_queue, timeout=None)  # type: ignore[arg-type]
            self.pipeline(docs)
            await_put(self.out_queue, docs, timeout=None)  # type: ignore[arg-type]

    def set_queues(self, in_queue: Queue, out_queue: Queue) -> None:
        self.in_queue = in_queue
        self.out_queue = out_queue


class RayBatchRunner:
    """Drives batch processing: loads documents, feeds them to a pool of
    :class:`RayPipelineQueueWorker` actors via queues, and yields processed batches."""

    def __init__(
            self,
            cfg: DictConfig,
    ):
        self.cfg = cfg
        ray.init(num_cpus=cfg.batch_processing.worker_count)
        # write your own implementation of a document loader, to create Kazu documents from source data
        self.loader: DocumentLoader = instantiate(cfg.batch_processing.document_loader)
        self.in_queue: Queue = instantiate(cfg.batch_processing.in_queue)
        self.out_queue: Queue = instantiate(cfg.batch_processing.out_queue)
        self.workers: List[ObjectRef[None]] = self.instantiate_workers()

    def instantiate_workers(self) -> List[ObjectRef]:  # type: ignore[type-arg]
        worker_refs = []
        for i in range(self.cfg.batch_processing.worker_count):
            worker_refs.append(self._instantiate_worker())
        return worker_refs

    def _instantiate_worker(self) -> ObjectRef:  # type: ignore[type-arg]
        # .options() returns a new actor class, so its result must be kept
        PipelineQueueWorkerActor = ray.remote(RayPipelineQueueWorker).options(num_cpus=1)
        worker = PipelineQueueWorkerActor.remote(self.cfg)
        worker.set_queues.remote(self.in_queue, self.out_queue)
        task: ObjectRef = worker.run.remote()  # type: ignore[type-arg]
        logger.info("worker started")
        return task

    def start(self) -> Iterable[List[Document]]:
        docs_wanted = 0
        responses = 0
        for i, docs in enumerate(
                tqdm(self.loader.load(), smoothing=0.1, total=self.loader.total_batches_if_available())
        ):
            docs_wanted += len(docs)
            await_put(queue=self.in_queue, items=docs, timeout=5)
            # handle processed batches of docs here - perhaps with additional actors or simply write to disk
            result = await_get(queue=self.out_queue, timeout=5)
            responses += len(result)
            yield result

        while docs_wanted != responses:
            logger.info("awaiting final batches: %s / %s", docs_wanted, responses)
            result = await_get(queue=self.out_queue, timeout=None)
            responses += len(result)
            yield result
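The two bounded queues above are what give you backpressure: when the workers fall behind, `put` blocks and the loader stops reading ahead. The same shape with stdlib `queue.Queue` and a thread standing in for a Ray actor (all names here are illustrative, not Kazu or Ray API):

```python
import queue
import threading
from typing import List, Optional

in_q: "queue.Queue[Optional[List[str]]]" = queue.Queue(maxsize=2)  # bounded: backpressure
out_q: "queue.Queue[List[str]]" = queue.Queue(maxsize=2)

def worker() -> None:
    # Stand-in for RayPipelineQueueWorker.run(): pull a batch, process, push.
    while True:
        batch = in_q.get()
        if batch is None:  # poison pill to stop the worker
            return
        out_q.put([doc.upper() for doc in batch])  # placeholder for the pipeline

t = threading.Thread(target=worker, daemon=True)
t.start()

batches = [["doc a", "doc b"], ["doc c"]]
results = []
for batch in batches:
    in_q.put(batch)              # blocks if the worker is saturated
    results.append(out_q.get())  # collect a processed batch, as start() does
in_q.put(None)
t.join()
print(results)  # [['DOC A', 'DOC B'], ['DOC C']]
```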

@kajocina
Author

kajocina commented Jan 30, 2024

For processing millions of abstracts: if I use Ray on a single multi-CPU node (essentially mimicking multiprocessing) but without multiple GPUs, do you think Kazu can still scale reasonably to my task?

@RichJackson
Collaborator

Yes, Kazu is designed to run without a GPU.
