Skip to content

Commit

Permalink
Migrate CTS-related code into a separate app
Browse files Browse the repository at this point in the history
This avoids extra dependencies and the crashing celery task on
systems which do not use CTS.
  • Loading branch information
acdha committed Apr 30, 2020
1 parent b382068 commit 27050e1
Show file tree
Hide file tree
Showing 16 changed files with 133 additions and 91 deletions.
14 changes: 9 additions & 5 deletions conf/celeryconfig.py
@@ -1,9 +1,11 @@
import datetime
import logging
import os

from celery.schedules import crontab

from chronam.settings import * # NOQA
from chronam.settings import INSTALLED_APPS

APP_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))

Expand All @@ -14,11 +16,6 @@
CELERYD_CONCURRENCY = 2

CELERYBEAT_SCHEDULE = {
"poll_cts": {
"task": "chronam.core.tasks.poll_cts",
"schedule": datetime.timedelta(hours=4),
"args": ()
},
"load_essays": {
"task": "chronam.core.tasks.load_essays",
"schedule": crontab(hour=0, minute=0),
Expand All @@ -30,5 +27,12 @@
},
}

if 'chronam.loc_cts' in INSTALLED_APPS:
CELERYBEAT_SCHEDULE["poll_cts"] = {
"task": "chronam.core.tasks.poll_cts",
"schedule": datetime.timedelta(hours=4),
"args": ()
}

CELERYBEAT_LOG_FILE = os.path.join("/var/log/celery", "celerybeat.log")
CELERYBEAT_LOG_LEVEL = logging.INFO
63 changes: 0 additions & 63 deletions core/tasks.py
@@ -1,14 +1,10 @@
import logging
import os
import minicts

from celery.decorators import task
from django.core import management
from django.core.cache import cache

from chronam.core import cts
from chronam.core.batch_loader import BatchLoader
from chronam.core.models import Batch
from chronam.core.models import OcrDump

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -60,65 +56,6 @@ def purge_batch(batch, service_request=None):
if service_request:
service_request.fail(str(e))

@task
def poll_purge():
cts = minicts.CTS(settings.CTS_URL, settings.CTS_USERNAME, settings.CTS_PASSWORD)

queue = settings.CTS_QUEUE
purge_service_type = "purge.NdnpPurge.purge"
while True:
req = cts.next_service_request(queue, purge_service_type)
if req is None:
logger.info("no purge service requests")
break

logger.info('got purge service request: %s', req.url)
bag_instance_key = req.data['requestParameters']['baginstancekey']
bag_instance = cts.get_bag_instance(bag_instance_key)
batch_name = os.path.basename(bag_instance.data['filepath'])
logger.info('purging %s', batch_name)

# if the batch isn't there no need to purge
try:
if Batch.objects.filter(name=batch_name).count() == 0:
logger.info('no need to purge %s ; it is not loaded', batch_name)
logger.info('batch %s purged', batch_name)
else:
purge_batch(batch_name, req)
except Exception as e:
logger.exception("purge of %s failed", batch_name)
req.fail("purge of %s failed: %s" % (batch_name, e))


@task
def poll_cts():
if settings.MAX_BATCHES != 0 and Batch.objects.all().count() >= settings.MAX_BATCHES:
logger.debug("not loading more than %s batches", settings.MAX_BATCHES)
return None

c = cts.CTS(settings.CTS_USERNAME, settings.CTS_PASSWORD, settings.CTS_URL)

# 'ndnpstagingingestqueue', 'ingest.NdnpIngest.ingest'
sr = c.next_service_request(settings.CTS_QUEUE, settings.CTS_SERVICE_TYPE)

# no service request? whew, we're done.
if not sr:
logger.debug("no service requests")
return

# determine the location of the bag on the filesystem
logger.info("got service request: %s", sr)
bag_instance_id = sr.data['requestParameters']['baginstancekey']
bag = c.get_bag_instance(bag_instance_id)
bag_dir = bag.data['filepath']

try:
logger.info("loading %s", bag_dir)
return load_batch.delay(bag_dir, sr)

except Exception as e:
logger.exception("loading batch failed!")
sr.fail(str(e))

@task
def delete_django_cache():
Expand Down
14 changes: 6 additions & 8 deletions core/tests/__init__.py
@@ -1,20 +1,18 @@
import logging

from title_loader_tests import *
from holding_loader_tests import *
from title_pull_tests import *
from api_tests import *
from batch_loader_tests import *
from browse_tests import *
from ocr_extractor_tests import *
from essay_loader_tests import *
from holding_loader_tests import *
from index_tests import *
from jp2_tests import *
from api_tests import *
from json_tests import *
from ocr_dump_tests import *
from ocr_extractor_tests import *
from rdf_tests import *
from system_tests import *
from cts_tests import *
from ocr_dump_tests import *

from title_loader_tests import *
from title_pull_tests import *

logging.basicConfig(filename="test.log", level=logging.DEBUG)
Empty file added loc_cts/__init__.py
Empty file.
2 changes: 1 addition & 1 deletion core/cts.py → loc_cts/cts.py
@@ -1,6 +1,6 @@
import json
import urllib
import logging
import urllib
import urlparse

import requests
Expand Down
1 change: 1 addition & 0 deletions loc_cts/management/__init__.py
@@ -0,0 +1 @@
#
20 changes: 20 additions & 0 deletions loc_cts/management/commands/__init__.py
@@ -0,0 +1,20 @@
from __future__ import absolute_import

import logging

from django.core.management.base import BaseCommand


class LoggingCommand(BaseCommand):
def execute(self, *args, **options):
verbosity = options.get("verbosity", 0)

if verbosity > 0:
log_level = logging.DEBUG if verbosity > 1 else logging.INFO
loggers = [logging.getLogger(), logging.getLogger("chronam")]
for logger in loggers:
logger.setLevel(log_level)
for handler in logger.handlers:
handler.setLevel(log_level)

return super(LoggingCommand, self).execute(*args, **options)
Expand Up @@ -2,7 +2,7 @@

from django.conf import settings

from chronam.core.cts import CTS
from chronam.loc_cts.cts import CTS

from . import LoggingCommand

Expand Down
Expand Up @@ -4,7 +4,7 @@

from django.core.management.base import CommandError

from chronam.core import tasks
from chronam.loc_cts.tasks import poll_cts

from . import LoggingCommand

Expand All @@ -16,7 +16,7 @@ class Command(LoggingCommand):

def handle(self, *args, **options):
try:
tasks.poll_cts.apply()
poll_cts.apply()
except Exception:
LOGGER.exception("Unable to load new batches from CTS")
raise CommandError("unable to load batches from CTS")
Expand Up @@ -4,7 +4,7 @@

from django.core.management.base import CommandError

from chronam.core import tasks
from chronam.loc_cts.tasks import poll_purge

from . import LoggingCommand

Expand All @@ -16,7 +16,7 @@ class Command(LoggingCommand):

def handle(self, *args, **options):
try:
tasks.poll_purge.apply()
poll_purge.apply()
except Exception:
LOGGER.exception("Unable to process purge_batch requests:")
raise CommandError("Unable to purge batches")
Expand Up @@ -2,7 +2,7 @@

from django.conf import settings

from chronam.core.cts import CTS
from chronam.loc_cts.cts import CTS

from . import LoggingCommand

Expand Down
73 changes: 73 additions & 0 deletions loc_cts/tasks.py
@@ -0,0 +1,73 @@
import logging
import os

import minicts
from celery.decorators import task
from django.conf import settings

from chronam.core import cts
from chronam.core.models import Batch
from chronam.core.tasks import load_batch, purge_batch

logger = logging.getLogger(__name__)


@task
def poll_purge():
cts = minicts.CTS(settings.CTS_URL, settings.CTS_USERNAME, settings.CTS_PASSWORD)

queue = settings.CTS_QUEUE
purge_service_type = "purge.NdnpPurge.purge"
while True:
req = cts.next_service_request(queue, purge_service_type)
if req is None:
logger.info("no purge service requests")
break

logger.info('got purge service request: %s', req.url)
bag_instance_key = req.data['requestParameters']['baginstancekey']
bag_instance = cts.get_bag_instance(bag_instance_key)
batch_name = os.path.basename(bag_instance.data['filepath'])
logger.info('purging %s', batch_name)

# if the batch isn't there no need to purge
try:
if Batch.objects.filter(name=batch_name).count() == 0:
logger.info('no need to purge %s ; it is not loaded', batch_name)
logger.info('batch %s purged', batch_name)
else:
purge_batch(batch_name, req)
except Exception as e:
logger.exception("purge of %s failed", batch_name)
req.fail("purge of %s failed: %s" % (batch_name, e))


@task
def poll_cts():
if settings.MAX_BATCHES != 0 and Batch.objects.all().count() >= settings.MAX_BATCHES:
logger.debug("not loading more than %s batches", settings.MAX_BATCHES)
return None

c = cts.CTS(settings.CTS_USERNAME, settings.CTS_PASSWORD, settings.CTS_URL)

# 'ndnpstagingingestqueue', 'ingest.NdnpIngest.ingest'
sr = c.next_service_request(settings.CTS_QUEUE, settings.CTS_SERVICE_TYPE)

# no service request? whew, we're done.
if not sr:
logger.debug("no service requests")
return

# determine the location of the bag on the filesystem
logger.info("got service request: %s", sr)
bag_instance_id = sr.data['requestParameters']['baginstancekey']
bag = c.get_bag_instance(bag_instance_id)
bag_dir = bag.data['filepath']

try:
logger.info("loading %s", bag_dir)
return load_batch.delay(bag_dir, sr)

except Exception as e:
logger.exception("loading batch failed!")
sr.fail(str(e))
1 change: 1 addition & 0 deletions loc_cts/tests/__init__.py
@@ -0,0 +1 @@
from cts_tests import *
4 changes: 2 additions & 2 deletions core/tests/cts_tests.py → loc_cts/tests/cts_tests.py
@@ -1,7 +1,7 @@
from django.test import TestCase
from django.conf import settings
from django.test import TestCase

from chronam.core.cts import CTS
from chronam.loc_cts.cts import CTS


class CTSTest(TestCase):
Expand Down
7 changes: 7 additions & 0 deletions settings_loc.py
Expand Up @@ -18,3 +18,10 @@
IS_PRODUCTION = True
OMNITURE_SCRIPT = "https://cdn.loc.gov/js/global/metrics/sc/s_code.js"
SHARETOOL_URL = "https://cdn.loc.gov/sites/chronicling-america.js"

CTS_USERNAME = "username"
CTS_PASSWORD = "password"
CTS_PROJECT_ID = "ndnp"
CTS_QUEUE = "ndnpingestqueue"
CTS_SERVICE_TYPE = "ingest.NdnpIngest.ingest"
CTS_URL = "https://cts.loc.gov/transfer/"
13 changes: 7 additions & 6 deletions settings_template.py
Expand Up @@ -138,12 +138,13 @@

IS_PRODUCTION = False

CTS_USERNAME = "username"
CTS_PASSWORD = "password"
CTS_PROJECT_ID = "ndnp"
CTS_QUEUE = "ndnpingestqueue"
CTS_SERVICE_TYPE = "ingest.NdnpIngest.ingest"
CTS_URL = "https://cts.loc.gov/transfer/"
# These are only relevant to Library of Congress internal users
CTS_USERNAME = ""
CTS_PASSWORD = ""
CTS_PROJECT_ID = ""
CTS_QUEUE = ""
CTS_SERVICE_TYPE = ""
CTS_URL = ""

MAX_BATCHES = 0

Expand Down

0 comments on commit 27050e1

Please sign in to comment.