diff --git a/compose/local/django/celery/worker/start b/compose/local/django/celery/worker/start
index d7b63cd..e89cc9e 100644
--- a/compose/local/django/celery/worker/start
+++ b/compose/local/django/celery/worker/start
@@ -4,4 +4,4 @@
 set -o errexit
 set -o nounset
 
-watchgod celery.__main__.main --args -A config.celery_app worker -l INFO
+watchgod celery.__main__.main --args -A config.celery_app worker -l INFO --concurrency=1
diff --git a/config/graphql/base.py b/config/graphql/base.py
index 0d5493c..30a7b4c 100644
--- a/config/graphql/base.py
+++ b/config/graphql/base.py
@@ -148,7 +148,7 @@ def mutate(cls, root, info, *args, **kwargs):
             logger.info("No user")
             raise ValueError("No user in this request...")
 
-        logger.info(f"DRFMutation - kwargs: {kwargs}")
+        # logger.info(f"DRFMutation - kwargs: {kwargs}")
 
         serializer = cls.IOSettings.serializer
         if hasattr(cls.IOSettings, "pk_fields"):
@@ -207,9 +207,9 @@ def mutate(cls, root, info, *args, **kwargs):
                 logger.info("Succeeded updating obj")
             else:
-                logger.info(
-                    f"No lookup field specified... create obj with kwargs: {kwargs}"
-                )
+                # logger.info(
+                #     f"No lookup field specified... create obj with kwargs: {kwargs}"
+                # )
                 obj_serializer = serializer(data=kwargs)
                 obj_serializer.is_valid(raise_exception=True)
                 obj = obj_serializer.save()
 
diff --git a/config/settings/base.py b/config/settings/base.py
index 8d98406..071dbfe 100644
--- a/config/settings/base.py
+++ b/config/settings/base.py
@@ -409,7 +409,9 @@
 # CELERY_TASK_SOFT_TIME_LIMIT = 3600
 # http://docs.celeryproject.org/en/latest/userguide/configuration.html#beat-scheduler
 CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
-CELERY_WORKER_MAX_MEMORY_PER_CHILD = 1024000
+CELERY_WORKER_MAX_MEMORY_PER_CHILD = 14240000  # 14 GB (thousands of kilobytes)
+CELERY_MAX_TASKS_PER_CHILD = 4
+CELERY_PREFETCH_MULTIPLIER = 1
 # django-rest-framework
 # -------------------------------------------------------------------------------
 # django-rest-framework - https://www.django-rest-framework.org/api-guide/settings/
diff --git a/opencontractserver/tasks/__init__.py b/opencontractserver/tasks/__init__.py
index 559b114..4b1a133 100644
--- a/opencontractserver/tasks/__init__.py
+++ b/opencontractserver/tasks/__init__.py
@@ -1,5 +1,5 @@
 from .cleanup_tasks import delete_analysis_and_annotations_task
-from .doc_tasks import base_64_encode_bytes, burn_doc_annotations
+from .doc_tasks import burn_doc_annotations
 from .export_tasks import package_annotated_docs
 from .fork_tasks import fork_corpus
 from .import_tasks import import_corpus
@@ -15,7 +15,6 @@
 __all__ = [
     "package_annotated_docs",
     "burn_doc_annotations",
-    "base_64_encode_bytes",
     "fork_corpus",
     "build_label_lookups_task",
     "import_corpus",
diff --git a/opencontractserver/tasks/doc_tasks.py b/opencontractserver/tasks/doc_tasks.py
index 32a6771..9952993 100644
--- a/opencontractserver/tasks/doc_tasks.py
+++ b/opencontractserver/tasks/doc_tasks.py
@@ -41,7 +41,9 @@ class TaskStates(str, enum.Enum):
 TEMP_DIR = "./tmp"
 
 
-@celery_app.task()
+@celery_app.task(
+    autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={"max_retries": 5}
+)
 def process_pdf_page(
     total_page_count: int, page_num: int, page_path: str, user_id: int
 ) -> tuple[int, str, str]:
@@ -62,7 +64,7 @@ def process_pdf_page(
         with open(page_path, "rb") as page_file:
             page_data = page_file.read()
 
-        logger.info(f"Page data: {page_data}")
+        # logger.info(f"Page data: {page_data}")
 
         annotations = extract_pawls_from_pdfs_bytes(pdf_bytes=page_data)
         logger.info(
@@ -96,7 +98,9 @@ def process_pdf_page(
     return page_num, pawls_fragment_path, page_path
 
 
-@celery_app.task()
+@celery_app.task(
+    autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={"max_retries": 5}
+)
 def reassemble_extracted_pdf_parts(
     doc_parts: list[list[int, str, str]],
     doc_id: int,
@@ -143,8 +147,9 @@ def set_doc_lock_state(*args, locked: bool, doc_id: int):
     document.save()
 
 
-@celery_app.task()
-# @validate_arguments
+@celery_app.task(
+    autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={"max_retries": 5}
+)
 def split_pdf_for_processing(user_id: int, doc_id: int) -> list[tuple[int, str]]:
     logger.info(f"split_pdf_for_processing() - split doc {doc_id} for user {user_id}")
 
diff --git a/opencontractserver/utils/pdf.py b/opencontractserver/utils/pdf.py
index ac46c7f..3a16ddd 100644
--- a/opencontractserver/utils/pdf.py
+++ b/opencontractserver/utils/pdf.py
@@ -1,5 +1,7 @@
 import base64
 import logging
+import pathlib
+import uuid
 
 from PyPDF2.generic import (
     ArrayObject,
@@ -92,17 +94,20 @@ def add_highlight_to_page(highlight: DictionaryObject, page):
 
 
 def extract_pawls_from_pdfs_bytes(
-    pdf_bytes: bytes, TEMP_DIR: str = "./tmp"
+    pdf_bytes: bytes,
 ) -> list[PawlsPagePythonType]:
 
-    import tempfile
-
     from pawls.commands.preprocess import process_tesseract
 
-    with tempfile.NamedTemporaryFile(suffix=".pdf", prefix=TEMP_DIR) as tf:
-        print(tf.name)
-        print(type(tf))
-        tf.write(pdf_bytes)
-        annotations: list = process_tesseract(tf.name)
+    pdf_fragment_folder_path = pathlib.Path("/tmp/user_0/pdf_fragments")
+    pdf_fragment_folder_path.mkdir(parents=True, exist_ok=True)
+    pdf_fragment_path = pdf_fragment_folder_path / f"{uuid.uuid4()}.pdf"
+    with pdf_fragment_path.open("wb") as f:
+        f.write(pdf_bytes)
+
+    page_path = pdf_fragment_path.resolve().__str__()
+    annotations: list = process_tesseract(page_path)
+
+    pdf_fragment_path.unlink()
 
     return annotations
diff --git a/requirements/base.txt b/requirements/base.txt
index 8b32649..9d54961 100644
--- a/requirements/base.txt
+++ b/requirements/base.txt
@@ -38,7 +38,7 @@ drf-extra-fields==3.4.1  # https://github.com/Hipo/drf-extra-fields
 # ------------------------------------------------------------------------------
 # Pawls preprocessors are available as a command line utility in their repo for now
 # BUT we can install them from their github repo subdirectory using the syntax below:
-git+https://github.com/allenai/pawls.git#egg=pawls&subdirectory=cli
+git+https://github.com/JSv4/pawls#egg=pawls&subdirectory=cli
 scikit-learn==1.1.3
 pdfplumber
 pytesseract
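
The retry-enabled page tasks above are designed to be fanned out per page and joined once every page finishes. The orchestration code is not part of this diff; the sketch below is only an illustration of how the pieces could be wired with a Celery chord, and the helper name, page-path source, and callback keyword arguments are assumptions.

# Hypothetical wiring sketch (not in this diff): fan pages out to process_pdf_page,
# then let reassemble_extracted_pdf_parts run on the collected results.
from celery import chord

from opencontractserver.tasks.doc_tasks import (
    process_pdf_page,
    reassemble_extracted_pdf_parts,
)


def enqueue_page_processing(doc_id: int, user_id: int, page_paths: list[str]) -> None:
    # Assumed helper: page_paths would come from split_pdf_for_processing's output.
    total_page_count = len(page_paths)
    header = [
        process_pdf_page.si(total_page_count, page_num, page_path, user_id)
        for page_num, page_path in enumerate(page_paths, start=1)
    ]
    # Each header task autoretries (up to 5 times, with backoff) before the chord
    # callback receives the list of (page_num, pawls_fragment_path, page_path) tuples.
    # The callback's real keyword arguments may differ from this assumption.
    chord(header)(reassemble_extracted_pdf_parts.s(doc_id=doc_id))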