Skip to content

Commit

Permalink
feat(recap->op): Add db flag
Browse files Browse the repository at this point in the history
Merge feat-add-flag branch into pr
  • Loading branch information
flooie committed Jun 19, 2024
1 parent 92f5ded commit 4ef18f4
Showing 1 changed file with 33 additions and 10 deletions.
43 changes: 33 additions & 10 deletions cl/corpus_importer/management/commands/recap_into_opinions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional

from asgiref.sync import async_to_sync
from django.conf import settings
from django.core.management import BaseCommand
from django.db.models import Q
from eyecite.tokenizers import HyperscanTokenizer
Expand Down Expand Up @@ -54,24 +55,31 @@ def import_opinions_from_recap(
skip_until: Optional[str] = None,
total_count: int = 0,
queue: str = "batch1",
db_connection: str = "default",
) -> None:
"""Import recap documents into opinion db
:param court: Court ID if any
:param skip_until: Court ID to re-start at
:param total_count: The number of new opinions to add
:param queue: The queue to use for celery
:param db_connection: The db to use
:return: None
"""
court_query = Court.objects.using(db_connection)

if not court_str:
filter_conditions = Q(jurisdiction=Court.FEDERAL_DISTRICT) & ~Q(
id__in=["orld", "dcd"]
id__in=[
"orld",
"dcd",
] # orld is historical and we gather dcd opinions from the court
)
if skip_until:
filter_conditions &= Q(id__gte=skip_until)
else:
filter_conditions = Q(pk=court_str)
courts = Court.objects.filter(filter_conditions).order_by("id")
courts = court_query.filter(filter_conditions).order_by("id")

count = 0
for court in courts:
Expand All @@ -80,7 +88,7 @@ def import_opinions_from_recap(
# Manually select the replica db which has an addt'l index added to
# improve this query.
latest_date_filed = (
OpinionCluster.objects.using("replica")
OpinionCluster.objects.using(db_connection)
.filter(docket__court=court)
.exclude(source=SOURCES.RECAP)
.order_by("-date_filed")
Expand All @@ -93,8 +101,9 @@ def import_opinions_from_recap(
)
continue

recap_document_ids = (
RECAPDocument.objects.filter(
recap_documents = (
RECAPDocument.objects.using(db_connection)
.filter(
docket_entry__docket__court=court,
docket_entry__date_filed__gt=latest_date_filed,
is_available=True,
Expand All @@ -105,15 +114,15 @@ def import_opinions_from_recap(
)

throttle = CeleryThrottle(queue_name=queue)
for recap_document_id in recap_document_ids.iterator():
count += 1
for recap_document in recap_documents.iterator():
logger.info(
f"{count}: Importing rd {recap_document_id.id} in {court.id}"
f"{count}: Importing rd {recap_document.id} in {court.id}"
)
throttle.maybe_wait()
ingest_recap_document.apply_async(
args=[recap_document_id], queue=queue
args=[recap_document.id], queue=queue
)
count += 1
if total_count > 0 and count >= total_count:
logger.info(
f"RECAP import completed for {total_count} documents"
Expand Down Expand Up @@ -152,10 +161,24 @@ def add_arguments(self, parser):
default="batch1",
help="The celery queue where the tasks should be processed",
)
parser.add_argument(
"--use-replica",
action="store_true",
default=False,
help="Use this flag to run the queries in the replica db",
)

def handle(self, *args, **options):
court = options.get("court")
skip_until = options.get("skip_until")
total_count = options.get("total")
queue = options.get("queue")
import_opinions_from_recap(court, skip_until, total_count, queue)
db_connection = (
"replica"
if options.get("use_replica") and "replica" in settings.DATABASES
else "default"
)

import_opinions_from_recap(
court, skip_until, total_count, queue, db_connection
)

0 comments on commit 4ef18f4

Please sign in to comment.