Skip to content

Commit

Permalink
Finally have good, robust performance for document parsing pipeline u…
Browse files Browse the repository at this point in the history
…sing page-wise queueing. Tuning Celery workers is an ongoing process. For now, trying concurrency=1 and then scaling the celeryworker instances via docker-compose. May be able to increase concurrency. Should document this tuning process for others with different envs (more cores, fewer cores, whatever).
  • Loading branch information
JSv4 committed Feb 26, 2023
1 parent 3bd6690 commit e7548c5
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 22 deletions.
2 changes: 1 addition & 1 deletion compose/local/django/celery/worker/start
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ set -o errexit
set -o nounset


watchgod celery.__main__.main --args -A config.celery_app worker -l INFO
watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1
8 changes: 4 additions & 4 deletions config/graphql/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def mutate(cls, root, info, *args, **kwargs):
logger.info("No user")
raise ValueError("No user in this request...")

logger.info(f"DRFMutation - kwargs: {kwargs}")
# logger.info(f"DRFMutation - kwargs: {kwargs}")
serializer = cls.IOSettings.serializer

if hasattr(cls.IOSettings, "pk_fields"):
Expand Down Expand Up @@ -207,9 +207,9 @@ def mutate(cls, root, info, *args, **kwargs):
logger.info("Succeeded updating obj")

else:
logger.info(
f"No lookup field specified... create obj with kwargs: {kwargs}"
)
# logger.info(
# f"No lookup field specified... create obj with kwargs: {kwargs}"
# )
obj_serializer = serializer(data=kwargs)
obj_serializer.is_valid(raise_exception=True)
obj = obj_serializer.save()
Expand Down
4 changes: 3 additions & 1 deletion config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,9 @@
# CELERY_TASK_SOFT_TIME_LIMIT = 3600
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#beat-scheduler
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
CELERY_WORKER_MAX_MEMORY_PER_CHILD = 1024000
CELERY_WORKER_MAX_MEMORY_PER_CHILD = 14240000 # 14 GB (thousands of kilobytes)
CELERY_MAX_TASKS_PER_CHILD = 4
CELERY_PREFETCH_MULTIPLIER = 1
# django-rest-framework
# -------------------------------------------------------------------------------
# django-rest-framework - https://www.django-rest-framework.org/api-guide/settings/
Expand Down
3 changes: 1 addition & 2 deletions opencontractserver/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .cleanup_tasks import delete_analysis_and_annotations_task
from .doc_tasks import base_64_encode_bytes, burn_doc_annotations
from .doc_tasks import burn_doc_annotations
from .export_tasks import package_annotated_docs
from .fork_tasks import fork_corpus
from .import_tasks import import_corpus
Expand All @@ -15,7 +15,6 @@
__all__ = [
"package_annotated_docs",
"burn_doc_annotations",
"base_64_encode_bytes",
"fork_corpus",
"build_label_lookups_task",
"import_corpus",
Expand Down
15 changes: 10 additions & 5 deletions opencontractserver/tasks/doc_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ class TaskStates(str, enum.Enum):
TEMP_DIR = "./tmp"


@celery_app.task()
@celery_app.task(
autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={"max_retries": 5}
)
def process_pdf_page(
total_page_count: int, page_num: int, page_path: str, user_id: int
) -> tuple[int, str, str]:
Expand All @@ -62,7 +64,7 @@ def process_pdf_page(
with open(page_path, "rb") as page_file:
page_data = page_file.read()

logger.info(f"Page data: {page_data}")
# logger.info(f"Page data: {page_data}")
annotations = extract_pawls_from_pdfs_bytes(pdf_bytes=page_data)

logger.info(
Expand Down Expand Up @@ -96,7 +98,9 @@ def process_pdf_page(
return page_num, pawls_fragment_path, page_path


@celery_app.task()
@celery_app.task(
autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={"max_retries": 5}
)
def reassemble_extracted_pdf_parts(
doc_parts: list[list[int, str, str]],
doc_id: int,
Expand Down Expand Up @@ -143,8 +147,9 @@ def set_doc_lock_state(*args, locked: bool, doc_id: int):
document.save()


@celery_app.task()
# @validate_arguments
@celery_app.task(
autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={"max_retries": 5}
)
def split_pdf_for_processing(user_id: int, doc_id: int) -> list[tuple[int, str]]:

logger.info(f"split_pdf_for_processing() - split doc {doc_id} for user {user_id}")
Expand Down
21 changes: 13 additions & 8 deletions opencontractserver/utils/pdf.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import base64
import logging
import pathlib
import uuid

from PyPDF2.generic import (
ArrayObject,
Expand Down Expand Up @@ -92,17 +94,20 @@ def add_highlight_to_page(highlight: DictionaryObject, page):


def extract_pawls_from_pdfs_bytes(
pdf_bytes: bytes, TEMP_DIR: str = "./tmp"
pdf_bytes: bytes,
) -> list[PawlsPagePythonType]:

import tempfile

from pawls.commands.preprocess import process_tesseract

with tempfile.NamedTemporaryFile(suffix=".pdf", prefix=TEMP_DIR) as tf:
print(tf.name)
print(type(tf))
tf.write(pdf_bytes)
annotations: list = process_tesseract(tf.name)
pdf_fragment_folder_path = pathlib.Path("/tmp/user_0/pdf_fragments")
pdf_fragment_folder_path.mkdir(parents=True, exist_ok=True)
pdf_fragment_path = pdf_fragment_folder_path / f"{uuid.uuid4()}.pdf"
with pdf_fragment_path.open("wb") as f:
f.write(pdf_bytes)

page_path = pdf_fragment_path.resolve().__str__()
annotations: list = process_tesseract(page_path)

pdf_fragment_path.unlink()

return annotations
2 changes: 1 addition & 1 deletion requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ drf-extra-fields==3.4.1 # https://github.com/Hipo/drf-extra-fields
# ------------------------------------------------------------------------------
# Pawls preprocessors are available as a command line utility in their repo for now
# BUT we can install them from their github repo subdirectory using the syntax below:
git+https://github.com/allenai/pawls.git#egg=pawls&subdirectory=cli
git+https://github.com/JSv4/pawls#egg=pawls&subdirectory=cli
scikit-learn==1.1.3
pdfplumber
pytesseract
Expand Down

0 comments on commit e7548c5

Please sign in to comment.