Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add clearsource history command #268

Merged
merged 13 commits into from
Nov 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ Changelog

v0.0.6 `2016-??-??`
-------------------
Includes i18n directory in package.
- Includes i18n directory in package.
- Adds a new `clearsource_history` command/operation.
8 changes: 8 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ The following operations can be run from the command line as described underneat
- clears all datasets, jobs and objects related to a harvest source,
but keeps the source itself

harvester clearsource_history [{source-id}]
- If no source id is given, the history for all harvest sources (maximum
  of 1000) will be cleared.
  Clears all jobs and objects related to a harvest source, but keeps the
  source itself. The datasets imported from the harvest source will
  **NOT** be deleted!
  If a source id is given, only the history of the harvest source with
  that id is cleared.

harvester sources [all]
- lists harvest sources
If 'all' is defined, it also shows the Inactive sources
Expand Down
37 changes: 35 additions & 2 deletions ckanext/harvest/commands/harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ class Harvester(CkanCommand):
- clears all datasets, jobs and objects related to a harvest source,
but keeps the source itself

harvester clearsource_history [{source-id}]
- If no source id is given the history for all harvest sources (maximum is 1000) will be cleared.
Clears all jobs and objects related to a harvest source, but keeps the source itself.
The datasets imported from the harvest source will NOT be deleted!!!
If a source id is given, it only clears the history of the harvest source with the given source id.

harvester sources [all]
- lists harvest sources
If 'all' is defined, it also shows the Inactive sources
Expand Down Expand Up @@ -153,6 +159,8 @@ def command(self):
self.remove_harvest_source()
elif cmd == 'clearsource':
self.clear_harvest_source()
elif cmd == 'clearsource_history':
self.clear_harvest_source_history()
elif cmd == 'sources':
self.list_harvest_sources()
elif cmd == 'job':
Expand Down Expand Up @@ -182,8 +190,7 @@ def command(self):
for method, header, body in consumer.consume(queue=get_fetch_queue_name()):
fetch_callback(consumer, method, header, body)
elif cmd == 'purge_queues':
from ckanext.harvest.queue import purge_queues
purge_queues()
self.purge_queues()
elif cmd == 'initdb':
self.initdb()
elif cmd == 'import':
Expand Down Expand Up @@ -288,6 +295,29 @@ def create_harvest_source(self):
print str(e.error_dict)
raise e

def clear_harvest_source_history(self):
source_id = None
if len(self.args) >= 2:
source_id = unicode(self.args[1])

context = {
'model': model,
'user': self.admin_user['name'],
'session': model.Session
}
if source_id is not None:
get_action('harvest_source_job_history_clear')(context,{'id':source_id})
print 'Cleared job history of harvest source: %s' % source_id
else:
'''
Purge queues, because we clean all harvest jobs and
objects in the database.
'''
self.purge_queues()
cleared_sources_dicts = get_action('harvest_sources_job_history_clear')(context,{})
print 'Cleared job history for all harvest sources: %s source(s)' % len(cleared_sources_dicts)


def show_harvest_source(self):

if len(self.args) >= 2:
Expand Down Expand Up @@ -465,6 +495,9 @@ def reindex(self):
context = {'model': model, 'user': self.admin_user['name']}
get_action('harvest_sources_reindex')(context,{})

def purge_queues(self):
    # Purge the harvest gather/fetch queues.
    # NOTE(review): imported lazily here rather than at module top —
    # presumably so loading this command module does not require the
    # queue backend to be importable; confirm before hoisting the import.
    from ckanext.harvest.queue import purge_queues
    purge_queues()

def print_harvest_sources(self, sources):
if sources:
Expand Down
67 changes: 67 additions & 0 deletions ckanext/harvest/logic/action/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,73 @@ def harvest_source_clear(context, data_dict):
return {'id': harvest_source_id}


def harvest_sources_job_history_clear(context, data_dict):
    '''
    Clears the history for all active harvest sources. All jobs and objects
    related to a harvest source will be cleared, but the source itself is
    kept.
    This is useful to clean the history of long running harvest sources to
    start again fresh.
    The datasets imported from the harvest sources will NOT be deleted!!!

    Note: only the first 1000 harvest sources returned by package_search
    are processed; 1000 is the hard upper limit of package_search's
    ``rows`` parameter.

    :returns: a list of {'id': source_id} dicts, one per cleared source
    '''
    check_access('harvest_sources_clear', context, data_dict)

    # We assume that the maximum of 1000 (hard limit) rows should be enough
    result = logic.get_action('package_search')(
        context, {'fq': '+dataset_type:harvest', 'rows': 1000})

    job_history_clear_results = []
    # NB: use a distinct loop variable — the original shadowed the
    # `data_dict` parameter, which is confusing and error-prone.
    for source_pkg in result['results']:
        try:
            clear_result = get_action('harvest_source_job_history_clear')(
                context, {'id': source_pkg['id']})
            job_history_clear_results.append(clear_result)
        except NotFound:
            # Ignore non-existent harvest sources caused by a possibly
            # corrupt search index; logging is done in the called action.
            pass

    return job_history_clear_results


def harvest_source_job_history_clear(context, data_dict):
    '''
    Clears all jobs and objects related to a harvest source, but keeps the
    source itself.
    This is useful to clean the history of long running harvest sources to
    start again fresh.
    The datasets imported from the harvest source will NOT be deleted!!!

    :param id: the id of the harvest source to clear
    :type id: string

    :raises NotFound: if no harvest source with the given id exists
    :returns: {'id': harvest_source_id}
    '''
    check_access('harvest_source_clear', context, data_dict)

    harvest_source_id = data_dict.get('id')

    source = HarvestSource.get(harvest_source_id)
    if not source:
        log.error('Harvest source %s does not exist', harvest_source_id)
        raise NotFound('Harvest source %s does not exist' % harvest_source_id)

    # Use the canonical id from the source object from here on.
    harvest_source_id = source.id

    model = context['model']

    # Bind the id as a parameter instead of interpolating it into the SQL.
    # The id comes from the database record, so injection is unlikely, but
    # parameterising costs nothing and is safer by construction.
    sql = '''begin;
    delete from harvest_object_error where harvest_object_id in (select id from harvest_object where harvest_source_id = :id);
    delete from harvest_object_extra where harvest_object_id in (select id from harvest_object where harvest_source_id = :id);
    delete from harvest_object where harvest_source_id = :id;
    delete from harvest_gather_error where harvest_job_id in (select id from harvest_job where source_id = :id);
    delete from harvest_job where source_id = :id;
    commit;
    '''
    model.Session.execute(sql, {'id': harvest_source_id})

    # Refresh the index for this source to update the status object
    get_action('harvest_source_reindex')(context, {'id': harvest_source_id})

    return {'id': harvest_source_id}


def harvest_source_index_clear(context, data_dict):
'''
Clears all datasets, jobs and objects related to a harvest source, but
Expand Down
11 changes: 11 additions & 0 deletions ckanext/harvest/logic/auth/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ def harvest_source_update(context, data_dict):
return {'success': False,
'msg': pt._('User {0} not authorized to update harvest source {1}').format(user, source_id)}

def harvest_sources_clear(context, data_dict):
    '''
    Authorization check for clearing history for all harvest sources

    Only sysadmins can do it
    '''
    # Guard clause: sysadmins pass straight through.
    if user_is_sysadmin(context):
        return {'success': True}
    return {'success': False,
            'msg': pt._('Only sysadmins can clear history for all harvest jobs')}

def harvest_source_clear(context, data_dict):
'''
Authorization check for clearing a harvest source
Expand Down
104 changes: 80 additions & 24 deletions ckanext/harvest/tests/test_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ def test_patch(self):

class TestActions(ActionBase):
def test_harvest_source_clear(self):
source = factories.HarvestSourceObj(**SOURCE_DICT)
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
Expand All @@ -372,50 +372,106 @@ def test_harvest_source_clear(self):
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
assert_equal(model.Package.get(dataset['id']), None)

def test_harvest_source_job_history_clear(self):
    # Set up one source with a job, a harvest object and an imported dataset.
    src = factories.HarvestSourceObj(**SOURCE_DICT.copy())
    harvest_job = factories.HarvestJobObj(source=src)
    pkg = ckan_factories.Dataset()
    obj = factories.HarvestObjectObj(job=harvest_job, source=src,
                                     package_id=pkg['id'])

    # Clear this source's job history.
    ctx = {'model': model, 'session': model.Session,
           'ignore_auth': True, 'user': ''}
    cleared = toolkit.get_action('harvest_source_job_history_clear')(
        ctx, {'id': src.id})

    # The source survives, its job and object are gone, and the imported
    # dataset is untouched.
    assert_equal(cleared, {'id': src.id})
    assert harvest_model.HarvestSource.get(src.id)
    assert_equal(harvest_model.HarvestJob.get(harvest_job.id), None)
    assert_equal(harvest_model.HarvestObject.get(obj.id), None)
    pkg_after = model.Package.get(pkg['id'])
    assert pkg_after, 'is None'
    assert_equal(pkg_after.id, pkg['id'])

def test_harvest_sources_job_history_clear(self):
    # prepare: two sources, each with a job, an object and a dataset
    data_dict = SOURCE_DICT.copy()
    source_1 = factories.HarvestSourceObj(**data_dict)
    data_dict['name'] = 'another-source'
    data_dict['url'] = 'http://another-url'
    source_2 = factories.HarvestSourceObj(**data_dict)

    job_1 = factories.HarvestJobObj(source=source_1)
    dataset_1 = ckan_factories.Dataset()
    object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1,
                                           package_id=dataset_1['id'])
    job_2 = factories.HarvestJobObj(source=source_2)
    dataset_2 = ckan_factories.Dataset()
    object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2,
                                           package_id=dataset_2['id'])

    # execute
    context = {'model': model, 'session': model.Session,
               'ignore_auth': True, 'user': ''}
    result = toolkit.get_action('harvest_sources_job_history_clear')(
        context, {})

    # verify: both sources reported cleared, both survive, their jobs and
    # objects are deleted, and both imported datasets are untouched
    assert_equal(
        sorted(result),
        sorted([{'id': source_1.id}, {'id': source_2.id}]))
    source_1 = harvest_model.HarvestSource.get(source_1.id)
    assert source_1
    assert_equal(harvest_model.HarvestJob.get(job_1.id), None)
    assert_equal(harvest_model.HarvestObject.get(object_1_.id), None)
    dataset_from_db_1 = model.Package.get(dataset_1['id'])
    assert dataset_from_db_1, 'is None'
    assert_equal(dataset_from_db_1.id, dataset_1['id'])
    # BUG FIX: the original looked up source_1.id here, so source_2's
    # survival was never actually tested.
    source_2 = harvest_model.HarvestSource.get(source_2.id)
    assert source_2
    assert_equal(harvest_model.HarvestJob.get(job_2.id), None)
    assert_equal(harvest_model.HarvestObject.get(object_2_.id), None)
    dataset_from_db_2 = model.Package.get(dataset_2['id'])
    assert dataset_from_db_2, 'is None'
    assert_equal(dataset_from_db_2.id, dataset_2['id'])

def test_harvest_source_create_twice_with_unique_url(self):
    # don't use factory because it looks for the existing source
    # NOTE(review): the two consecutive `data_dict` assignments below look
    # like the old/new sides of a diff merged together by page extraction —
    # confirm against the repository before relying on this text.
    data_dict = SOURCE_DICT
    data_dict = SOURCE_DICT.copy()
    factories.HarvestSourceObj(**data_dict)
    site_user = toolkit.get_action('get_site_user')(
        {'model': model, 'ignore_auth': True}, {})['name']

    toolkit.get_action('harvest_source_create')(
        {'user': site_user}, data_dict)

    # NOTE(review): 'another-source1' vs 'another-source' likewise appear
    # to be duplicated old/new diff lines.
    data_dict['name'] = 'another-source1'
    data_dict['name'] = 'another-source'
    data_dict['url'] = 'http://another-url'
    toolkit.get_action('harvest_source_create')(
        {'user': site_user}, data_dict)

def test_harvest_source_create_twice_with_same_url(self):
    # don't use factory because it looks for the existing source
    # NOTE(review): the duplicated assignments below (SOURCE_DICT vs
    # SOURCE_DICT.copy(), 'another-source2' vs 'another-source') appear to
    # be merged old/new diff lines from the extracted page — verify against
    # the repository.
    data_dict = SOURCE_DICT
    data_dict = SOURCE_DICT.copy()
    factories.HarvestSourceObj(**data_dict)

    site_user = toolkit.get_action('get_site_user')(
        {'model': model, 'ignore_auth': True}, {})['name']

    toolkit.get_action('harvest_source_create')(
        {'user': site_user}, data_dict)

    data_dict['name'] = 'another-source2'
    data_dict['name'] = 'another-source'
    # Same URL as the existing source: creation must be rejected.
    assert_raises(toolkit.ValidationError,
                  toolkit.get_action('harvest_source_create'),
                  {'user': site_user}, data_dict)

def test_harvest_source_create_twice_with_unique_url_and_config(self):
    # don't use factory because it looks for the existing source
    # NOTE(review): the duplicated assignments below (SOURCE_DICT vs
    # SOURCE_DICT.copy(), 'another-source3' vs 'another-source') appear to
    # be merged old/new diff lines from the extracted page — verify against
    # the repository.
    data_dict = SOURCE_DICT
    data_dict = SOURCE_DICT.copy()
    factories.HarvestSourceObj(**data_dict)

    site_user = toolkit.get_action('get_site_user')(
        {'model': model, 'ignore_auth': True}, {})['name']

    toolkit.get_action('harvest_source_create')(
        {'user': site_user}, data_dict)

    data_dict['name'] = 'another-source3'
    data_dict['name'] = 'another-source'
    data_dict['config'] = '{"something": "new"}'
    toolkit.get_action('harvest_source_create')(
        {'user': site_user}, data_dict)

def test_harvest_job_create_as_sysadmin(self):
source = factories.HarvestSource(**SOURCE_DICT)
source = factories.HarvestSource(**SOURCE_DICT.copy())

site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
Expand Down Expand Up @@ -538,4 +594,4 @@ def test_harvest_db_logger(self):
per_page = 1
data = toolkit.get_action('harvest_log_list')(context, {'level': 'info', 'per_page': per_page})
self.assertEqual(len(data), per_page)
self.assertEqual(data[0]['level'], 'INFO')
self.assertEqual(data[0]['level'], 'INFO')