Skip to content

Commit

Permalink
feat(corpus_importer): Add Celery to recap import
Browse files Browse the repository at this point in the history
Move importer to task and add celery
  • Loading branch information
flooie committed Jun 14, 2024
1 parent 3b1e373 commit 7d83ab9
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 74 deletions.
101 changes: 27 additions & 74 deletions cl/corpus_importer/management/commands/recap_into_opinions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import eyecite
from asgiref.sync import async_to_sync
from django.core.management import BaseCommand
from django.db import transaction
from eyecite.tokenizers import HyperscanTokenizer
from httpx import (
HTTPStatusError,
Expand All @@ -11,17 +9,12 @@
TimeoutException,
)

from cl.citations.utils import filter_out_non_case_law_citations
from cl.corpus_importer.tasks import ingest_recap_document
from cl.lib.celery_utils import CeleryThrottle
from cl.lib.command_utils import logger
from cl.lib.decorators import retry
from cl.lib.microservice_utils import microservice
from cl.search.models import (
SOURCES,
Court,
Opinion,
OpinionCluster,
RECAPDocument,
)
from cl.search.models import SOURCES, Court, OpinionCluster, RECAPDocument

HYPERSCAN_TOKENIZER = HyperscanTokenizer(cache_dir=".hyperscan")

Expand Down Expand Up @@ -53,14 +46,18 @@ def extract_recap_document(rd: RECAPDocument) -> Response:
return response


def import_opinions_from_recap(court=None, total_count=0):
def import_opinions_from_recap(
court=None,
total_count=0,
queue="batch1",
) -> None:
"""Import recap documents into opinion db
:param court: Court ID if any
:param total_count: The number of new opinions to add
:param queue: The queue to use for celery
:return: None
"""
count = 0
if not court:
courts = Court.objects.filter(
jurisdiction=Court.FEDERAL_DISTRICT
Expand All @@ -69,6 +66,8 @@ def import_opinions_from_recap(court=None, total_count=0):
) # orld is historical and we gather dcd opinions from the court
else:
courts = Court.objects.filter(pk=court)

count = 0
for court in courts:
logger.info(f"Importing RECAP documents for {court}")
cluster = (
Expand All @@ -85,70 +84,17 @@ def import_opinions_from_recap(court=None, total_count=0):
is_free_on_pacer=True,
).order_by("id")

throttle = CeleryThrottle(queue_name=queue)
for recap_document in documents.iterator():
logger.info(f"Importing recap document {recap_document.id}")
docket = recap_document.docket_entry.docket
if "cv" not in docket.docket_number.lower():
logger.info("Skipping non-civil opinion")
continue

ops = Opinion.objects.filter(sha1=recap_document.sha1)
if ops.count() > 0:
throttle.maybe_wait()
ingest_recap_document.apply_async(
args=[recap_document], queue=queue
)
if total_count > 0 and count >= total_count:
logger.info(
f"Skipping previously imported opinion: {ops[0].id}"
f"RECAP import completed for {total_count} documents"
)
continue
response = extract_recap_document(rd=recap_document)

try:
citations = eyecite.get_citations(
response.json()["content"], tokenizer=HYPERSCAN_TOKENIZER
)
except AttributeError:
# Tokenizer fails with some unicode characters
# Ex. 42\u2009U.S.C.\u2009§\u200912131 \u2009 is a small space
# fallback to regular citation match
logger.warning(
"Hyperscan failed for {}, trying w/o tokenizer".format(
recap_document
)
)
citations = eyecite.get_citations(response.json()["content"])

case_law_citations = filter_out_non_case_law_citations(citations)
if len(case_law_citations) == 0:
logger.info(f"No citation found for rd: {recap_document.id}")
continue

with transaction.atomic():
cluster = OpinionCluster.objects.create(
case_name_full=docket.case_name_full,
case_name=docket.case_name,
case_name_short=docket.case_name_short,
docket=docket,
date_filed=recap_document.docket_entry.date_filed,
source=SOURCES.RECAP,
)
Opinion.objects.create(
cluster=cluster,
type=Opinion.TRIAL_COURT,
plain_text=response.json()["content"],
page_count=recap_document.page_count,
sha1=recap_document.sha1,
local_path=recap_document.filepath_local,
)

logger.info(
"Successfully imported https://www.courtlistener.com/opinion/{}/decision/".format(
cluster.id
)
)
count += 1
if total_count > 0 and count >= total_count:
logger.info(
f"RECAP import completed for {total_count} documents"
)
return
return


class Command(BaseCommand):
Expand All @@ -171,8 +117,15 @@ def add_arguments(self, parser):
default=10,
required=False,
)
parser.add_argument(
"--queue",
default="batch1",
help="The celery queue where the tasks should be processed",
)

def handle(self, *args, **options):
court = options.get("court")
total_count = options.get("total")
import_opinions_from_recap(court, total_count)
queue = options.get("queue")

import_opinions_from_recap(court, total_count, queue)
107 changes: 107 additions & 0 deletions cl/corpus_importer/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from tempfile import NamedTemporaryFile
from typing import Any, Dict, List, Optional, Pattern, Tuple, Union

import eyecite
import internetarchive as ia
import requests
from asgiref.sync import async_to_sync
Expand All @@ -20,6 +21,13 @@
from django.db.models import Prefetch
from django.db.models.query import prefetch_related_objects
from django.utils.timezone import now
from eyecite.tokenizers import HyperscanTokenizer
from httpx import (
HTTPStatusError,
NetworkError,
RemoteProtocolError,
TimeoutException,
)
from juriscraper.lib.exceptions import PacerLoginException, ParsingException
from juriscraper.lib.string_utils import CaseNameTweaker, harmonize
from juriscraper.pacer import (
Expand Down Expand Up @@ -51,11 +59,13 @@
from cl.alerts.tasks import enqueue_docket_alert, send_alert_and_webhook
from cl.audio.models import Audio
from cl.celery_init import app
from cl.citations.utils import filter_out_non_case_law_citations
from cl.corpus_importer.api_serializers import IADocketSerializer
from cl.corpus_importer.utils import mark_ia_upload_needed
from cl.custom_filters.templatetags.text_filters import best_case_name
from cl.lib.celery_utils import throttle_task
from cl.lib.crypto import sha1
from cl.lib.decorators import retry
from cl.lib.microservice_utils import microservice
from cl.lib.pacer import (
get_blocked_status,
Expand Down Expand Up @@ -98,15 +108,20 @@
from cl.scrapers.models import PACERFreeDocumentLog, PACERFreeDocumentRow
from cl.scrapers.tasks import extract_recap_pdf_base
from cl.search.models import (
SOURCES,
ClaimHistory,
Court,
Docket,
DocketEntry,
Opinion,
OpinionCluster,
RECAPDocument,
Tag,
)
from cl.search.tasks import add_items_to_solr

HYPERSCAN_TOKENIZER = HyperscanTokenizer(cache_dir=".hyperscan")

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -2294,3 +2309,95 @@ def query_and_save_list_of_creditors(
delete_redis_semaphore(
"CACHE", make_list_of_creditors_key(court_id, d_number_file_name)
)


@retry(
    ExceptionToCheck=(
        NetworkError,
        TimeoutException,
        RemoteProtocolError,
        HTTPStatusError,
    ),
    tries=3,
    delay=5,
    backoff=2,
    logger=logger,
)
def extract_recap_document(rd: RECAPDocument) -> Response:
    """Extract a RECAP document's text via the doctor recap-extract service.

    Transient network failures and bad HTTP statuses are retried by the
    @retry decorator (3 tries, exponential backoff).

    :param rd: the recap document to extract
    :return: Response object from the microservice
    """
    extract = async_to_sync(microservice)
    result = extract(
        service="recap-extract",
        item=rd,
        params={"strip_margin": True},
    )
    # Raise so the retry decorator can catch HTTPStatusError and retry.
    result.raise_for_status()
    return result


@app.task(bind=True, max_retries=5, ignore_result=True)
def ingest_recap_document(self, recap_document: RECAPDocument) -> None:
    """Ingest a RECAP document into Opinions.

    Skips documents on non-civil dockets, documents already imported
    (matched by sha1), and documents containing no case-law citations.
    Creates the OpinionCluster and Opinion atomically.

    :param recap_document: The document to inspect and import
    :return: None
    """
    logger.info(f"Importing recap document {recap_document.id}")
    docket = recap_document.docket_entry.docket
    # Only civil cases ("cv" in the docket number) are imported.
    if "cv" not in docket.docket_number.lower():
        logger.info("Skipping non-civil opinion")
        return

    # An Opinion sharing this sha1 means the PDF was already imported.
    # first() issues a single LIMIT-1 query instead of count() + ops[0].
    previous = Opinion.objects.filter(sha1=recap_document.sha1).first()
    if previous is not None:
        logger.info(f"Skipping previously imported opinion: {previous.id}")
        return

    response = extract_recap_document(rd=recap_document)
    r = response.json()

    try:
        citations = eyecite.get_citations(
            r["content"], tokenizer=HYPERSCAN_TOKENIZER
        )
    except AttributeError:
        # Tokenizer fails with some unicode characters
        # Ex. 42\u2009U.S.C.\u2009§\u200912131 \u2009 is a small space
        # fallback to regular citation match
        logger.warning(
            f"Hyperscan failed for {recap_document}, trying w/o tokenizer"
        )
        citations = eyecite.get_citations(r["content"])

    case_law_citations = filter_out_non_case_law_citations(citations)
    if not case_law_citations:
        logger.info(f"No citation found for rd: {recap_document.id}")
        return

    # Create the cluster and its opinion in one transaction so we can
    # never end up with a cluster that has no opinion.
    with transaction.atomic():
        cluster = OpinionCluster.objects.create(
            case_name_full=docket.case_name_full,
            case_name=docket.case_name,
            case_name_short=docket.case_name_short,
            docket=docket,
            date_filed=recap_document.docket_entry.date_filed,
            source=SOURCES.RECAP,
        )
        Opinion.objects.create(
            cluster=cluster,
            type=Opinion.TRIAL_COURT,
            plain_text=r["content"],
            page_count=recap_document.page_count,
            sha1=recap_document.sha1,
            local_path=recap_document.filepath_local,
            extracted_by_ocr=r["extracted_by_ocr"],
        )

    logger.info(
        "Successfully imported https://www.courtlistener.com/opinion/{}/decision/".format(
            cluster.id
        )
    )

0 comments on commit 7d83ab9

Please sign in to comment.