Skip to content

Commit

Permalink
[#21] Improve gather stage error handling
Browse files Browse the repository at this point in the history
See issue for full details. Basically we don't want to catch any
exception at the queue.py level, as they prevent debugging. Harvesters
should deal with them and return a list of ids or an empty list if no
objects need to be fetched.
Also improved the debug messages.
  • Loading branch information
amercader committed Mar 14, 2013
1 parent 91f18bf commit d77f16a
Showing 1 changed file with 55 additions and 45 deletions.
100 changes: 55 additions & 45 deletions ckanext/harvest/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,54 +97,64 @@ def gather_callback(channel, method, header, body):
try:
id = json.loads(body)['harvest_job_id']
log.debug('Received harvest job id: %s' % id)

# Get a publisher for the fetch queue
publisher = get_fetch_publisher()

try:
job = HarvestJob.get(id)
except:
log.error('Harvest job does not exist: %s' % id)
else:
# Send the harvest job to the plugins that implement
# the Harvester interface, only if the source type
# matches
harvester_found = False
for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == job.source.type:
harvester_found = True
# Get a list of harvest object ids from the plugin
job.gather_started = datetime.datetime.now()
try:
harvest_object_ids = harvester.gather_stage(job)
except Exception, e:
log.error('Gather stage failed unexpectedly: %s' % e)
job.status = 'Errored'
job.save()
continue
job.gather_finished = datetime.datetime.now()
job.save()
log.debug('Received from plugin''s gather_stage: %r' % harvest_object_ids)
if harvest_object_ids and len(harvest_object_ids) > 0:
for id in harvest_object_ids:
# Send the id to the fetch queue
publisher.send({'harvest_object_id':id})
log.debug('Sent object %s to the fetch queue' % id)

if not harvester_found:
msg = 'No harvester could be found for source type %s' % job.source.type
err = HarvestGatherError(message=msg,job=job)
err.save()
log.error(msg)

finally:
publisher.close()

except KeyError:
log.error('No harvest job id received')
finally:
model.Session.remove()
channel.basic_ack(method.delivery_tag)
return False

# Get a publisher for the fetch queue
publisher = get_fetch_publisher()

job = HarvestJob.get(id)

if not job:
log.error('Harvest job does not exist: %s' % id)
channel.basic_ack(method.delivery_tag)
return False

# Send the harvest job to the plugins that implement
# the Harvester interface, only if the source type
# matches
harvester_found = False
for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == job.source.type:
harvester_found = True
# Get a list of harvest object ids from the plugin
job.gather_started = datetime.datetime.now()

harvest_object_ids = harvester.gather_stage(job)

job.gather_finished = datetime.datetime.now()
job.save()

if not isinstance(harvest_object_ids, list):
log.error('Gather stage failed')
publisher.close()
channel.basic_ack(method.delivery_tag)
return False

if len(harvest_object_ids) == 0:
log.info('No harvest objects to fetch')
publisher.close()
channel.basic_ack(method.delivery_tag)
return False

log.debug('Received from plugin gather_stage: {0} objects (first: {1} last: {2})'.format(
len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
for id in harvest_object_ids:
# Send the id to the fetch queue
publisher.send({'harvest_object_id':id})
log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids)))

if not harvester_found:
msg = 'No harvester could be found for source type %s' % job.source.type
err = HarvestGatherError(message=msg,job=job)
err.save()
log.error(msg)

model.Session.remove()
publisher.close()
channel.basic_ack(method.delivery_tag)


def fetch_callback(channel, method, header, body):
Expand Down

0 comments on commit d77f16a

Please sign in to comment.