From f32cf1103ab8991d0b547e311fae6021a5eb9622 Mon Sep 17 00:00:00 2001 From: Fabian von Feilitzsch Date: Mon, 16 Nov 2015 10:36:55 -0500 Subject: [PATCH 01/13] Update postgres models and migrations --- .../migrations/0002_auto_20151113_1515.py | 42 +++++++++++++++++++ api/webview/migrations/0003_version_status.py | 19 +++++++++ api/webview/models.py | 16 +++++++ 3 files changed, 77 insertions(+) create mode 100644 api/webview/migrations/0002_auto_20151113_1515.py create mode 100644 api/webview/migrations/0003_version_status.py diff --git a/api/webview/migrations/0002_auto_20151113_1515.py b/api/webview/migrations/0002_auto_20151113_1515.py new file mode 100644 index 00000000..11b63b8d --- /dev/null +++ b/api/webview/migrations/0002_auto_20151113_1515.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import models, migrations +import django_pgjson.fields + + +class Migration(migrations.Migration): + + dependencies = [ + ('webview', '0001_initial'), + ] + + operations = [ + migrations.CreateModel( + name='Version', + fields=[ + ('id', models.AutoField(primary_key=True, auto_created=True, verbose_name='ID', serialize=False)), + ('source', models.CharField(max_length=255)), + ('docID', models.TextField()), + ('timestamps', django_pgjson.fields.JsonField()), + ('providerUpdatedDateTime', models.DateTimeField(null=True)), + ('raw', django_pgjson.fields.JsonField()), + ('normalized', django_pgjson.fields.JsonField(null=True)), + ], + ), + migrations.AddField( + model_name='document', + name='status', + field=models.TextField(null=True), + ), + migrations.AddField( + model_name='document', + name='timestamps', + field=django_pgjson.fields.JsonField(null=True), + ), + migrations.AddField( + model_name='version', + name='key', + field=models.ForeignKey(to='webview.Document'), + ), + ] diff --git a/api/webview/migrations/0003_version_status.py b/api/webview/migrations/0003_version_status.py new file mode 100644 index 00000000..f0e4f28f --- /dev/null +++ b/api/webview/migrations/0003_version_status.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import models, migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('webview', '0002_auto_20151113_1515'), + ] + + operations = [ + migrations.AddField( + model_name='version', + name='status', + field=models.TextField(null=True), + ), + ] diff --git a/api/webview/models.py b/api/webview/models.py index 6e9a232c..3376270f 100644 --- a/api/webview/models.py +++ b/api/webview/models.py @@ -10,7 +10,9 @@ class Document(models.Model): providerUpdatedDateTime = models.DateTimeField(null=True) raw = JsonField() + timestamps = JsonField(null=True) normalized = JsonField(null=True) + status = models.TextField(null=True) def save(self, *args, **kwargs): if not self.key: @@ -22,6 +24,20 @@ def _make_key(cls, source, docID): return '|'.join((source, docID)) +class Version(models.Model): + key = models.ForeignKey(Document) + + source = models.CharField(max_length=255) + docID = models.TextField() + timestamps = JsonField() + + providerUpdatedDateTime = models.DateTimeField(null=True) + + raw = JsonField() + normalized = JsonField(null=True) + status = models.TextField(null=True) + + class HarvesterResponse(models.Model): key = models.TextField(primary_key=True) From 121550f5b07fdf38330c1fc28e6bbd1f5e112301 Mon Sep 17 00:00:00 2001 From: Fabian von Feilitzsch Date: Mon, 16 Nov 2015 10:44:43 -0500 Subject: [PATCH 02/13] Add get_versions method that will retrieve the versions for a given source/docID Still TODO: - implement the version retrieval for postgres - tests --- scrapi/processing/base.py | 10 +++++++++ scrapi/processing/cassandra.py | 22 +++++++++++------- scrapi/processing/postgres.py | 41 +++++++++++++++++++++++++++------- scrapi/processing/storage.py | 10 ++++----- 4 files changed, 61 insertions(+), 22 deletions(-) diff --git a/scrapi/processing/base.py b/scrapi/processing/base.py index 1ab45a1a..4693c9c8 100644 --- a/scrapi/processing/base.py +++ b/scrapi/processing/base.py @@ -21,6 +21,16 @@ def documents(self, *sources): ''' raise NotImplementedError + @abstractmethod + def get_versions(self, source, docID): + raise NotImplementedError + + def different(self, old, new): + try: + return not all([new[key] == old[key] or (not new[key] and not old[key]) for key in new.keys() if key != 'timestamps']) + except Exception: + return True # If the document fails to load/compare for some reason, accept a new version + class BaseDatabaseManager(object): '''A base class for database managers in the scrapi processing module diff --git a/scrapi/processing/cassandra.py b/scrapi/processing/cassandra.py index abb5c224..49067d4a 100644 --- a/scrapi/processing/cassandra.py +++ b/scrapi/processing/cassandra.py @@ -154,12 +154,6 @@ def send_to_database(self, docID, source, **kwargs): # create document return DocumentModel.create(docID=docID, source=source, **kwargs) - def different(self, old, new): - try: - return not all([new[key] == old[key] or (not new[key] and not old[key]) for key in new.keys() if key != 'timestamps']) - except Exception: - return True # If the document fails to load/compare for some reason, accept a new version - def documents(self, *sources): q = DocumentModel.objects.timeout(500).allow_filtering().all().limit(100) querysets = (q.filter(source=source) for source in sources) if sources else [q] @@ -181,8 +175,7 @@ def to_raw(self, doc): 'docID': doc.docID, 'source': doc.source, 'filetype': doc.filetype, - 'timestamps': doc.timestamps, - 'versions': doc.versions + 'timestamps': doc.timestamps }, validate=False, clean=False) def to_normalized(self, doc): @@ -229,6 +222,19 @@ def delete(self, source, docID): def create(self, attributes): DocumentModel.create(**attributes).save() + def get_versions(self, source, docID): + try: + doc = DocumentModel.get(source=source, docID=docID) + except DocumentModel.DoesNotExist: + return [] + for uuid in doc.versions: + try: + version = VersionModel.get(uuid) + except VersionModel.DoesNotExist: + continue + yield DocumentTuple(self.to_raw(version), self.to_normalized(version)) + yield DocumentTuple(self.to_raw(doc), self.to_normalized(doc)) + @DatabaseManager.registered_model class DocumentModel(models.Model): diff --git a/scrapi/processing/postgres.py b/scrapi/processing/postgres.py index d4a8aaa8..50322eea 100644 --- a/scrapi/processing/postgres.py +++ b/scrapi/processing/postgres.py @@ -8,7 +8,7 @@ import django -from api.webview.models import HarvesterResponse, Document +from api.webview.models import HarvesterResponse, Document, Version from scrapi import events from scrapi.util import json_without_bytes @@ -85,22 +85,22 @@ def HarvesterResponseModel(self): @events.logged(events.PROCESSING, 'raw.postgres') def process_raw(self, raw_doc): - source, docID = raw_doc['source'], raw_doc['docID'] - document = self._get_by_source_id(source, docID) or Document(source=source, docID=docID) - + document = self.version(raw=raw_doc) + timestamps = raw_doc.get('timestamps') modified_doc = copy.deepcopy(raw_doc.attributes) - if modified_doc.get('versions'): - modified_doc['versions'] = list(map(str, modified_doc['versions'])) document.raw = modified_doc + document.timestamps = timestamps document.save() @events.logged(events.PROCESSING, 'normalized.postgres') def process_normalized(self, raw_doc, normalized): - source, docID = raw_doc['source'], raw_doc['docID'] - document = self._get_by_source_id(source, docID) or Document(source=source, docID=docID) + document = self.version(raw=raw_doc, normalized=normalized) + timestamps = raw_doc.get('timestamps') or normalized.get('timestamps') + document.raw = raw_doc.attributes + document.timestamps = timestamps document.normalized = normalized.attributes document.providerUpdatedDateTime = normalized['providerUpdatedDateTime'] @@ -112,6 +112,31 @@ def _get_by_source_id(self, source, docID): except Document.DoesNotExist: return None + def version(self, raw=None, normalized=None): + old_doc = self._get_by_source_id(raw['source'], raw['docID']) + if old_doc: + raw_changed = raw and self.different(raw.attributes, old_doc.raw) + norm_changed = normalized and self.different(normalized.attributes, old_doc.normalized) + version = Version( + key=old_doc, + source=old_doc.source, + docID=old_doc.docID, + providerUpdatedDateTime=old_doc.providerUpdatedDateTime, + raw=old_doc.raw, + normalized=old_doc.normalized, + status=old_doc.status, + timestamps=old_doc.timestamps + ) + if raw_changed or norm_changed: + version.save() + return old_doc + else: + return Document(source=raw['source'], docID=raw['docID']) + + def get_versions(self, source, docID): + # import ipdb; ipdb.set_trace() + pass + class HarvesterResponseModel(BaseHarvesterResponse): diff --git a/scrapi/processing/storage.py b/scrapi/processing/storage.py index c8e10c7c..ec9b347c 100644 --- a/scrapi/processing/storage.py +++ b/scrapi/processing/storage.py @@ -7,7 +7,6 @@ """ import os -import copy import json from scrapi.util import json_without_bytes @@ -18,11 +17,7 @@ class StorageProcessor(BaseProcessor): NAME = 'storage' def process_raw(self, raw): - new_attrs = copy.deepcopy(raw.attributes) - if new_attrs.get('versions'): - new_attrs['versions'] = list(map(str, new_attrs['versions'])) - - self.write(raw['source'], raw['docID'], 'raw', new_attrs) + self.write(raw['source'], raw['docID'], 'raw', raw.attributes) def process_normalized(self, raw, normalized): self.write(raw['source'], raw['docID'], 'normalized', normalized.attributes) @@ -38,3 +33,6 @@ def write(self, source, doc_id, filename, content): def documents(self, *sources): raise NotImplementedError + + def get_versions(self, source, docID): + raise NotImplementedError From cc0829428a18ceeb36df64de13f8f81d7b727acc Mon Sep 17 00:00:00 2001 From: Fabian von Feilitzsch Date: Mon, 16 Nov 2015 10:46:37 -0500 Subject: [PATCH 03/13] start implementing cross_db migration including versions --- scrapi/migrations.py | 22 +++++++++++++++------- scrapi/tasks.py | 2 +- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/scrapi/migrations.py b/scrapi/migrations.py index b90a51dc..4aaa960d 100644 --- a/scrapi/migrations.py +++ b/scrapi/migrations.py @@ -36,7 +36,7 @@ def rename(docs, target=None, **kwargs): @tasks.task_autoretry(default_retry_delay=30, max_retries=5) -def cross_db(docs, target_db=None, index=None, **kwargs): +def cross_db(docs, source_db=None, target_db=None, index=None, versions=False, **kwargs): """ Migration to go between cassandra > postgres @@ -49,6 +49,8 @@ def cross_db(docs, target_db=None, index=None, **kwargs): """ assert target_db, 'Please specify a target db for the migration -- either postgres or elasticsearch' assert target_db in ['postgres', 'cassandra', 'elasticsearch'], 'Invalid target database - please specify either postgres, cassandra or elasticsearch' + source_processor = get_processor(source_db or settings.CANONICAL_PROCESSOR) + target_processor = get_processor(target_db) for doc in docs: try: if not doc.raw['doc']: @@ -60,14 +62,20 @@ def cross_db(docs, target_db=None, index=None, **kwargs): raw, normalized = doc.raw, doc.normalized - target_processor = get_processor(target_db) - if not kwargs.get('dry'): - target_processor.process_raw(raw) - if normalized: - target_processor.process_normalized(raw, normalized) + if versions: + for raw_version, norm_version in source_processor.get_versions(raw['source'], raw['docID']): + target_processor.process_raw(raw_version) + if normalized: + target_processor.process_normalized(raw_version, norm_version) + else: + logger.info('Not storing migrated normalized version with uuid {} from {} with id {}, document is not in approved set list.'.format(raw.attributes['source'], raw.attributes['docID'])) else: - logger.info('Not storing migrated normalized from {} with id {}, document is not in approved set list.'.format(raw.attributes['source'], raw.attributes['docID'])) + target_processor.process_raw(raw) + if normalized: + target_processor.process_normalized(raw, normalized) + else: + logger.info('Not storing migrated normalized from {} with id {}, document is not in approved set list.'.format(raw.attributes['source'], raw.attributes['docID'])) except Exception as e: logger.exception(e) log_to_sentry(e) diff --git a/scrapi/tasks.py b/scrapi/tasks.py index 831f2926..79ed3edc 100644 --- a/scrapi/tasks.py +++ b/scrapi/tasks.py @@ -134,7 +134,7 @@ def migrate(migration, source_db=None, sources=tuple(), async=False, dry=True, g if async: segment = list(islice(docs, group_size)) while segment: - migration.s(segment, sources=sources, dry=dry, **kwargs).apply_async() + migration.s(segment, sources=sources, dry=dry, source_db=source_db, **kwargs).apply_async() segment = list(islice(docs, group_size)) else: for doc in docs: From 09d860992934ee18acc8b7684a302bf0b8df4b80 Mon Sep 17 00:00:00 2001 From: Fabian von Feilitzsch Date: Mon, 16 Nov 2015 10:56:48 -0500 Subject: [PATCH 04/13] Fix return inside generator --- scrapi/processing/cassandra.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scrapi/processing/cassandra.py b/scrapi/processing/cassandra.py index 49067d4a..d6fd20f1 100644 --- a/scrapi/processing/cassandra.py +++ b/scrapi/processing/cassandra.py @@ -226,7 +226,7 @@ def get_versions(self, source, docID): try: doc = DocumentModel.get(source=source, docID=docID) except DocumentModel.DoesNotExist: - return [] + return for uuid in doc.versions: try: version = VersionModel.get(uuid) From c6310a2f8881bf4ff3d8f2f22f80bcfa3b99473b Mon Sep 17 00:00:00 2001 From: Fabian von Feilitzsch Date: Mon, 16 Nov 2015 13:26:11 -0500 Subject: [PATCH 05/13] fix non-lazy query, add version retrieval --- scrapi/processing/postgres.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/scrapi/processing/postgres.py b/scrapi/processing/postgres.py index 50322eea..8fb5e319 100644 --- a/scrapi/processing/postgres.py +++ b/scrapi/processing/postgres.py @@ -36,6 +36,12 @@ def celery_setup(self, *args, **kwargs): pass +def paginated(query, page_size=10): + for offset in range(0, query.count(), page_size): + for doc in query[offset:offset + page_size]: + yield doc + + class PostgresProcessor(BaseProcessor): NAME = 'postgres' @@ -45,7 +51,7 @@ def documents(self, *sources): q = Document.objects.all() querysets = (q.filter(source=source) for source in sources) if sources else [q] for query in querysets: - for doc in query: + for doc in paginated(query): try: raw = RawDocument(doc.raw, clean=False, validate=False) except AttributeError as e: @@ -134,8 +140,16 @@ def version(self, raw=None, normalized=None): return Document(source=raw['source'], docID=raw['docID']) def get_versions(self, source, docID): - # import ipdb; ipdb.set_trace() - pass + doc = self._get_by_source_id(source, docID) + for version in paginated(doc.version_set.all()): + yield DocumentTuple( + RawDocument(version.raw, clean=False, validate=False), + NormalizedDocument(version.normalized, clean=False, validate=False) + ) + yield DocumentTuple( + RawDocument(doc.raw, clean=False, validate=False), + NormalizedDocument(doc.normalized, clean=False, validate=False) + ) class HarvesterResponseModel(BaseHarvesterResponse): From 79c5081cf6ac0b5d8473e632c5df3ee715e1e87b Mon Sep 17 00:00:00 2001 From: Fabian von Feilitzsch Date: Mon, 16 Nov 2015 14:50:58 -0500 Subject: [PATCH 06/13] make sure attributes is a dict, even if it isn't provided --- scrapi/linter/document.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scrapi/linter/document.py b/scrapi/linter/document.py index 202fb34f..1ff22c85 100644 --- a/scrapi/linter/document.py +++ b/scrapi/linter/document.py @@ -54,6 +54,7 @@ def __init__(self, attributes, validate=True, clean=False): :param bool validate: If true, the object will be validated before creation :param bool clean: If true, optional fields that are null will be deleted ''' + attributes = attributes or {} # validate a version of the attributes that are safe to check # against the JSON schema From 8de32bcb48908a1e9d483a30bfdeb573fac0edd2 Mon Sep 17 00:00:00 2001 From: Fabian von Feilitzsch Date: Mon, 16 Nov 2015 14:51:21 -0500 Subject: [PATCH 07/13] fix incorrect format syntax --- scrapi/migrations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scrapi/migrations.py b/scrapi/migrations.py index 4aaa960d..faaba488 100644 --- a/scrapi/migrations.py +++ b/scrapi/migrations.py @@ -69,7 +69,7 @@ def cross_db(docs, source_db=None, target_db=None, index=None, versions=False, * if normalized: target_processor.process_normalized(raw_version, norm_version) else: - logger.info('Not storing migrated normalized version with uuid {} from {} with id {}, document is not in approved set list.'.format(raw.attributes['source'], raw.attributes['docID'])) + logger.info('Not storing migrated normalized version from {} with id {}, document is not in approved set list.'.format(raw.attributes['source'], raw.attributes['docID'])) else: target_processor.process_raw(raw) if normalized: From 9b7ae3c23e67b83460978f256bd695a53d0fc8c6 Mon Sep 17 00:00:00 2001 From: Fabian von Feilitzsch Date: Mon, 16 Nov 2015 14:51:40 -0500 Subject: [PATCH 08/13] Fix incorrect get query --- scrapi/processing/cassandra.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scrapi/processing/cassandra.py b/scrapi/processing/cassandra.py index d6fd20f1..4748959f 100644 --- a/scrapi/processing/cassandra.py +++ b/scrapi/processing/cassandra.py @@ -229,7 +229,7 @@ def get_versions(self, source, docID): return for uuid in doc.versions: try: - version = VersionModel.get(uuid) + version = VersionModel.get(key=uuid) except VersionModel.DoesNotExist: continue yield DocumentTuple(self.to_raw(version), self.to_normalized(version)) From 5a024fb8f4a01491ee66e22b98140c90d1eb2408 Mon Sep 17 00:00:00 2001 From: Fabian von Feilitzsch Date: Mon, 16 Nov 2015 15:23:28 -0500 Subject: [PATCH 09/13] typo --- scrapi/migrations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scrapi/migrations.py b/scrapi/migrations.py index faaba488..f7ea199b 100644 --- a/scrapi/migrations.py +++ b/scrapi/migrations.py @@ -66,7 +66,7 @@ def cross_db(docs, source_db=None, target_db=None, index=None, versions=False, * if versions: for raw_version, norm_version in source_processor.get_versions(raw['source'], raw['docID']): target_processor.process_raw(raw_version) - if normalized: + if norm_version: target_processor.process_normalized(raw_version, norm_version) else: logger.info('Not storing migrated normalized version from {} with id {}, document is not in approved set list.'.format(raw.attributes['source'], raw.attributes['docID'])) From 92c2e9cb87694da7b465b5f7d90d7b7b2df910d0 Mon Sep 17 00:00:00 2001 From: Fabian von Feilitzsch Date: Mon, 16 Nov 2015 15:24:06 -0500 Subject: [PATCH 10/13] add some more fields that should not be included in the normalized document to cassandra --- scrapi/processing/cassandra.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/scrapi/processing/cassandra.py b/scrapi/processing/cassandra.py index 4748959f..a39c5e49 100644 --- a/scrapi/processing/cassandra.py +++ b/scrapi/processing/cassandra.py @@ -163,7 +163,6 @@ def documents(self, *sources): for doc in page: doc.save() yield DocumentTuple(self.to_raw(doc), self.to_normalized(doc)) - logger.info('yielded 1000 pages') page = try_n_times(5, self.next_page, query, page) def next_page(self, query, page): @@ -181,7 +180,7 @@ def to_raw(self, doc): def to_normalized(self, doc): # make the new dict actually contain real items normed = {} - do_not_include = ['docID', 'doc', 'filetype', 'timestamps', 'source'] + do_not_include = ['docID', 'doc', 'filetype', 'timestamps', 'source', 'versions', 'key'] for key, value in dict(doc).items(): if value and key not in do_not_include: try: From 7f5c2b1f74534e66ef26ee68b6132f3f9f8ecfd2 Mon Sep 17 00:00:00 2001 From: Fabian von Feilitzsch Date: Tue, 17 Nov 2015 11:24:34 -0500 Subject: [PATCH 11/13] add correct booling to the document class for python2/3 --- scrapi/linter/document.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/scrapi/linter/document.py b/scrapi/linter/document.py index 1ff22c85..d2ca8485 100644 --- a/scrapi/linter/document.py +++ b/scrapi/linter/document.py @@ -84,6 +84,11 @@ def __setitem__(self, attr, val): def __delitem__(self, attr): del self.attributes[attr] + def __bool__(self): + return bool(self.attributes) + + __nonzero__ = __bool__ + class RawDocument(BaseDocument): From bfbd71c91f1449e5655329201e24a72f0d9d76c2 Mon Sep 17 00:00:00 2001 From: Fabian von Feilitzsch Date: Tue, 17 Nov 2015 11:26:21 -0500 Subject: [PATCH 12/13] enforce order of postgres versions --- scrapi/processing/postgres.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scrapi/processing/postgres.py b/scrapi/processing/postgres.py index 8fb5e319..31af65ce 100644 --- a/scrapi/processing/postgres.py +++ b/scrapi/processing/postgres.py @@ -141,7 +141,7 @@ def version(self, raw=None, normalized=None): def get_versions(self, source, docID): doc = self._get_by_source_id(source, docID) - for version in paginated(doc.version_set.all()): + for version in doc.version_set.all().order_by('id'): yield DocumentTuple( RawDocument(version.raw, clean=False, validate=False), NormalizedDocument(version.normalized, clean=False, validate=False) From 573a0fc07ad8e1e0da4cae85ce069bbaaefe4ed1 Mon Sep 17 00:00:00 2001 From: Fabian von Feilitzsch Date: Tue, 17 Nov 2015 11:26:36 -0500 Subject: [PATCH 13/13] add tests for cross_db migration with versions --- tests/test_migrations.py | 46 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/test_migrations.py b/tests/test_migrations.py index 261059f8..d2e10eda 100644 --- a/tests/test_migrations.py +++ b/tests/test_migrations.py @@ -122,6 +122,52 @@ def test_renormalize(processor_name, monkeypatch): processor.delete(docID='get_the_tables', source=RAW['source']) +@pytest.mark.django_db +@pytest.mark.cassandra +@pytest.mark.parametrize('canonical', ['postgres', 'cassandra']) +@pytest.mark.parametrize('destination', ['postgres', 'cassandra']) +def test_cross_db_with_versions(canonical, destination, monkeypatch, index='test'): + new_title = 'How to be really good at Zoo Tycoon: The Definitive Guide' + + if canonical == destination: + return + + monkeypatch.setattr('scrapi.settings.CANONICAL_PROCESSOR', canonical) + + # Get the test documents into the canonical processor + canonical_processor = get_processor(canonical) + canonical_processor.process_raw(RAW) + canonical_processor.process_normalized(RAW, NORMALIZED) + + # Get a version in there too + new_normalized = copy.deepcopy(NORMALIZED.attributes) + new_normalized['title'] = new_title + canonical_processor.process_normalized(RAW, NormalizedDocument(new_normalized)) + + destination_processor = get_processor(destination) + + # Check to see canonical_processor versions are there, and destination are not + canonical_versions = list(canonical_processor.get_versions(docID=RAW['docID'], source=RAW['source'])) + assert len(canonical_versions) == 3 + assert canonical_versions[1].normalized['title'] == NORMALIZED['title'] + assert canonical_versions[2].normalized['title'] == new_title + + destination_doc = destination_processor.get(docID=RAW['docID'], source=RAW['source']) + assert not destination_doc + + # Migrate from the canonical to the destination + tasks.migrate(cross_db, target_db=destination, dry=False, sources=['test'], index=index, versions=True) + + # Check to see if the document made it to the destinaton, and is still in the canonical + destination_versions = list(destination_processor.get_versions(docID=RAW['docID'], source=RAW['source'])) + assert len(destination_versions) == 3 + assert destination_versions[1].normalized['title'] == NORMALIZED['title'] + assert destination_versions[2].normalized['title'] == new_title + + canonical_doc = canonical_processor.get(docID=RAW['docID'], source=RAW['source']) + assert canonical_doc + + @pytest.mark.django_db @pytest.mark.cassandra @pytest.mark.elasticsearch