42 changes: 42 additions & 0 deletions api/webview/migrations/0002_auto_20151113_1515.py
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.db import models, migrations
import django_pgjson.fields


class Migration(migrations.Migration):

    dependencies = [
        ('webview', '0001_initial'),
    ]

    operations = [
        migrations.CreateModel(
            name='Version',
            fields=[
                ('id', models.AutoField(primary_key=True, auto_created=True, verbose_name='ID', serialize=False)),
                ('source', models.CharField(max_length=255)),
                ('docID', models.TextField()),
                ('timestamps', django_pgjson.fields.JsonField()),
                ('providerUpdatedDateTime', models.DateTimeField(null=True)),
                ('raw', django_pgjson.fields.JsonField()),
                ('normalized', django_pgjson.fields.JsonField(null=True)),
            ],
        ),
        migrations.AddField(
            model_name='document',
            name='status',
            field=models.TextField(null=True),
        ),
        migrations.AddField(
            model_name='document',
            name='timestamps',
            field=django_pgjson.fields.JsonField(null=True),
        ),
        migrations.AddField(
            model_name='version',
            name='key',
            field=models.ForeignKey(to='webview.Document'),
        ),
    ]
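
Review note: for anyone sanity-checking this locally, Django's own management commands will apply the migration and show the generated SQL. A minimal sketch, assuming DJANGO_SETTINGS_MODULE already points at the api project's settings:

import django
django.setup()  # requires DJANGO_SETTINGS_MODULE to be configured

from django.core.management import call_command

call_command('migrate', 'webview')              # apply the webview migrations
call_command('sqlmigrate', 'webview', '0002')   # show this migration's CREATE/ALTER statements
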
19 changes: 19 additions & 0 deletions api/webview/migrations/0003_version_status.py
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.db import models, migrations


class Migration(migrations.Migration):

    dependencies = [
        ('webview', '0002_auto_20151113_1515'),
    ]

    operations = [
        migrations.AddField(
            model_name='version',
            name='status',
            field=models.TextField(null=True),
        ),
    ]
16 changes: 16 additions & 0 deletions api/webview/models.py
@@ -10,7 +10,9 @@ class Document(models.Model):
     providerUpdatedDateTime = models.DateTimeField(null=True)
 
     raw = JsonField()
+    timestamps = JsonField(null=True)
     normalized = JsonField(null=True)
+    status = models.TextField(null=True)
 
     def save(self, *args, **kwargs):
         if not self.key:
@@ -22,6 +24,20 @@ def _make_key(cls, source, docID):
         return '|'.join((source, docID))
 
 
+class Version(models.Model):
+    key = models.ForeignKey(Document)
+
+    source = models.CharField(max_length=255)
+    docID = models.TextField()
+    timestamps = JsonField()
+
+    providerUpdatedDateTime = models.DateTimeField(null=True)
+
+    raw = JsonField()
+    normalized = JsonField(null=True)
+    status = models.TextField(null=True)
+
+
 class HarvesterResponse(models.Model):
 
     key = models.TextField(primary_key=True)
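
Review note: Version is a point-in-time copy of a Document row, linked back to it through the `key` foreign key (Document's primary key is the 'source|docID' string built by `_make_key`). A minimal sketch of the relationship, with illustrative values and a configured Django environment assumed:

from api.webview.models import Document, Version

doc = Document.objects.get(key='example_source|example_id')  # hypothetical key

# Snapshot the current state before the live row is overwritten in place.
Version(
    key=doc,
    source=doc.source,
    docID=doc.docID,
    timestamps=doc.timestamps,
    providerUpdatedDateTime=doc.providerUpdatedDateTime,
    raw=doc.raw,
    normalized=doc.normalized,
    status=doc.status,
).save()

doc.version_set.count()  # reverse relation used by get_versions() below
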
6 changes: 6 additions & 0 deletions scrapi/linter/document.py
@@ -54,6 +54,7 @@ def __init__(self, attributes, validate=True, clean=False):
         :param bool validate: If true, the object will be validated before creation
         :param bool clean: If true, optional fields that are null will be deleted
         '''
+        attributes = attributes or {}
         # validate a version of the attributes that are safe to check
         # against the JSON schema
 
@@ -83,6 +84,11 @@ def __setitem__(self, attr, val):
     def __delitem__(self, attr):
         del self.attributes[attr]
 
+    def __bool__(self):
+        return bool(self.attributes)
+
+    __nonzero__ = __bool__
+
 
 class RawDocument(BaseDocument):
 
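
Together with the `attributes = attributes or {}` guard in __init__, the __bool__/__nonzero__ pair (the alias covers Python 2) makes empty documents falsey, which is what lets the migration code write `if norm_version:` instead of poking at `.attributes`. A quick illustration, hedged, with validation disabled:

from scrapi.linter.document import RawDocument

assert not RawDocument(None, validate=False)        # None becomes {} and is falsey
assert RawDocument({'docID': '1'}, validate=False)  # any non-empty attributes are truthy
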
22 changes: 15 additions & 7 deletions scrapi/migrations.py
@@ -36,7 +36,7 @@ def rename(docs, target=None, **kwargs):


 @tasks.task_autoretry(default_retry_delay=30, max_retries=5)
-def cross_db(docs, target_db=None, index=None, **kwargs):
+def cross_db(docs, source_db=None, target_db=None, index=None, versions=False, **kwargs):
     """
     Migration to go between
     cassandra > postgres
@@ -49,6 +49,8 @@ def cross_db(docs, target_db=None, index=None, **kwargs):
"""
assert target_db, 'Please specify a target db for the migration -- either postgres or elasticsearch'
assert target_db in ['postgres', 'cassandra', 'elasticsearch'], 'Invalid target database - please specify either postgres, cassandra or elasticsearch'
source_processor = get_processor(source_db or settings.CANONICAL_PROCESSOR)
target_processor = get_processor(target_db)
for doc in docs:
try:
if not doc.raw['doc']:
@@ -60,14 +62,20 @@ def cross_db(docs, target_db=None, index=None, **kwargs):

             raw, normalized = doc.raw, doc.normalized
 
-            target_processor = get_processor(target_db)
-
             if not kwargs.get('dry'):
-                target_processor.process_raw(raw)
-                if normalized:
-                    target_processor.process_normalized(raw, normalized)
+                if versions:
+                    for raw_version, norm_version in source_processor.get_versions(raw['source'], raw['docID']):
+                        target_processor.process_raw(raw_version)
+                        if norm_version:
+                            target_processor.process_normalized(raw_version, norm_version)
+                        else:
+                            logger.info('Not storing migrated normalized version from {} with id {}, document is not in approved set list.'.format(raw.attributes['source'], raw.attributes['docID']))
                 else:
-                    logger.info('Not storing migrated normalized from {} with id {}, document is not in approved set list.'.format(raw.attributes['source'], raw.attributes['docID']))
+                    target_processor.process_raw(raw)
+                    if normalized:
+                        target_processor.process_normalized(raw, normalized)
+                    else:
+                        logger.info('Not storing migrated normalized from {} with id {}, document is not in approved set list.'.format(raw.attributes['source'], raw.attributes['docID']))
         except Exception as e:
             logger.exception(e)
             log_to_sentry(e)
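
Putting the new flags together, a history-preserving copy into postgres might be kicked off like this. A sketch: cross_db's own arguments come from the diff above, while the invocation style follows tasks.migrate's signature further down; how kwargs reach the non-async path is not shown in this diff:

from scrapi import migrations, tasks

tasks.migrate(
    migrations.cross_db,
    source_db='cassandra',  # omitted -> falls back to settings.CANONICAL_PROCESSOR
    target_db='postgres',
    versions=True,          # replay each document's stored versions before the live copy
    dry=False,              # the default dry=True makes this a no-op
)
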
10 changes: 10 additions & 0 deletions scrapi/processing/base.py
@@ -21,6 +21,16 @@ def documents(self, *sources):
         '''
         raise NotImplementedError
 
+    @abstractmethod
+    def get_versions(self, source, docID):
+        raise NotImplementedError
+
+    def different(self, old, new):
+        try:
+            return not all([new[key] == old[key] or (not new[key] and not old[key]) for key in new.keys() if key != 'timestamps'])
+        except Exception:
+            return True  # If the document fails to load/compare for some reason, accept a new version
+
 
 class BaseDatabaseManager(object):
     '''A base class for database managers in the scrapi processing module
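
`different` skips the 'timestamps' key and treats falsey values (None, '', []) as interchangeable, so a re-harvest that only refreshes timestamps does not trigger a new version; if the comparison blows up (say, a key missing from `old`), it errs on the side of versioning. Illustrative values, with `processor` standing in for any concrete BaseProcessor subclass:

old = {'title': 'A', 'tags': [],   'timestamps': {'harvestFinished': '2015-11-12'}}
new = {'title': 'A', 'tags': None, 'timestamps': {'harvestFinished': '2015-11-13'}}

processor.different(old, new)             # False: timestamps ignored, [] ~ None
processor.different(old, {'title': 'B'})  # True: a substantive change
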
25 changes: 15 additions & 10 deletions scrapi/processing/cassandra.py
@@ -154,12 +154,6 @@ def send_to_database(self, docID, source, **kwargs):
         # create document
         return DocumentModel.create(docID=docID, source=source, **kwargs)
 
-    def different(self, old, new):
-        try:
-            return not all([new[key] == old[key] or (not new[key] and not old[key]) for key in new.keys() if key != 'timestamps'])
-        except Exception:
-            return True  # If the document fails to load/compare for some reason, accept a new version
-
     def documents(self, *sources):
         q = DocumentModel.objects.timeout(500).allow_filtering().all().limit(100)
         querysets = (q.filter(source=source) for source in sources) if sources else [q]
@@ -169,7 +163,6 @@ def documents(self, *sources):
             for doc in page:
                 doc.save()
                 yield DocumentTuple(self.to_raw(doc), self.to_normalized(doc))
-            logger.info('yielded 1000 pages')
             page = try_n_times(5, self.next_page, query, page)
 
     def next_page(self, query, page):
@@ -181,14 +174,13 @@ def to_raw(self, doc):
             'docID': doc.docID,
             'source': doc.source,
             'filetype': doc.filetype,
-            'timestamps': doc.timestamps,
-            'versions': doc.versions
+            'timestamps': doc.timestamps
         }, validate=False, clean=False)
 
     def to_normalized(self, doc):
         # make the new dict actually contain real items
         normed = {}
-        do_not_include = ['docID', 'doc', 'filetype', 'timestamps', 'source']
+        do_not_include = ['docID', 'doc', 'filetype', 'timestamps', 'source', 'versions', 'key']
         for key, value in dict(doc).items():
             if value and key not in do_not_include:
                 try:
@@ -229,6 +221,19 @@ def delete(self, source, docID):
     def create(self, attributes):
         DocumentModel.create(**attributes).save()
 
+    def get_versions(self, source, docID):
+        try:
+            doc = DocumentModel.get(source=source, docID=docID)
+        except DocumentModel.DoesNotExist:
+            return
+        for uuid in doc.versions:
+            try:
+                version = VersionModel.get(key=uuid)
+            except VersionModel.DoesNotExist:
+                continue
+            yield DocumentTuple(self.to_raw(version), self.to_normalized(version))
+        yield DocumentTuple(self.to_raw(doc), self.to_normalized(doc))
+
 
 @DatabaseManager.registered_model
 class DocumentModel(models.Model):
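
Note the ordering contract this establishes (and which the postgres processor mirrors below): historical snapshots are yielded first, in the order their UUIDs appear on doc.versions, and the live document always comes last; dangling version UUIDs are skipped instead of aborting the generator. So a consumer may do, assuming the document exists:

versions = list(processor.get_versions('example_source', 'example_id'))  # illustrative IDs
history, current = versions[:-1], versions[-1]  # the final tuple is the live state
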
57 changes: 48 additions & 9 deletions scrapi/processing/postgres.py
@@ -8,7 +8,7 @@

 import django
 
-from api.webview.models import HarvesterResponse, Document
+from api.webview.models import HarvesterResponse, Document, Version
 
 from scrapi import events
 from scrapi.util import json_without_bytes
@@ -36,6 +36,12 @@ def celery_setup(self, *args, **kwargs):
         pass
 
 
+def paginated(query, page_size=10):
+    for offset in range(0, query.count(), page_size):
+        for doc in query[offset:offset + page_size]:
+            yield doc
+
+
 class PostgresProcessor(BaseProcessor):
     NAME = 'postgres'
 
@@ -45,7 +51,7 @@ def documents(self, *sources):
         q = Document.objects.all()
         querysets = (q.filter(source=source) for source in sources) if sources else [q]
         for query in querysets:
-            for doc in query:
+            for doc in paginated(query):
                 try:
                     raw = RawDocument(doc.raw, clean=False, validate=False)
                 except AttributeError as e:
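
The paginated() helper keeps documents() from evaluating the entire Document table as one queryset; each page becomes its own LIMIT/OFFSET query. One trade-off worth noting: offset pagination can skip or repeat rows if the table changes mid-iteration, which is acceptable for a batch migration but not for live traffic. Usage mirrors the call above (sketch):

for doc in paginated(Document.objects.filter(source='example_source'), page_size=100):
    process(doc)  # at most page_size rows fetched per query; `process` is a stand-in
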
@@ -85,22 +91,22 @@ def HarvesterResponseModel(self):

     @events.logged(events.PROCESSING, 'raw.postgres')
     def process_raw(self, raw_doc):
-        source, docID = raw_doc['source'], raw_doc['docID']
-        document = self._get_by_source_id(source, docID) or Document(source=source, docID=docID)
-
+        document = self.version(raw=raw_doc)
         timestamps = raw_doc.get('timestamps')
         modified_doc = copy.deepcopy(raw_doc.attributes)
         if modified_doc.get('versions'):
             modified_doc['versions'] = list(map(str, modified_doc['versions']))
 
         document.raw = modified_doc
         document.timestamps = timestamps
 
         document.save()
 
     @events.logged(events.PROCESSING, 'normalized.postgres')
     def process_normalized(self, raw_doc, normalized):
-        source, docID = raw_doc['source'], raw_doc['docID']
-        document = self._get_by_source_id(source, docID) or Document(source=source, docID=docID)
+        document = self.version(raw=raw_doc, normalized=normalized)
         timestamps = raw_doc.get('timestamps') or normalized.get('timestamps')
 
-        document.raw = raw_doc.attributes
-        document.timestamps = timestamps
         document.normalized = normalized.attributes
         document.providerUpdatedDateTime = normalized['providerUpdatedDateTime']

@@ -112,6 +118,39 @@ def _get_by_source_id(self, source, docID):
         except Document.DoesNotExist:
             return None
 
+    def version(self, raw=None, normalized=None):
+        old_doc = self._get_by_source_id(raw['source'], raw['docID'])
+        if old_doc:
+            raw_changed = raw and self.different(raw.attributes, old_doc.raw)
+            norm_changed = normalized and self.different(normalized.attributes, old_doc.normalized)
+            version = Version(
+                key=old_doc,
+                source=old_doc.source,
+                docID=old_doc.docID,
+                providerUpdatedDateTime=old_doc.providerUpdatedDateTime,
+                raw=old_doc.raw,
+                normalized=old_doc.normalized,
+                status=old_doc.status,
+                timestamps=old_doc.timestamps
+            )
+            if raw_changed or norm_changed:
+                version.save()
+            return old_doc
+        else:
+            return Document(source=raw['source'], docID=raw['docID'])
+
+    def get_versions(self, source, docID):
+        doc = self._get_by_source_id(source, docID)
+        for version in doc.version_set.all().order_by('id'):
+            yield DocumentTuple(
+                RawDocument(version.raw, clean=False, validate=False),
+                NormalizedDocument(version.normalized, clean=False, validate=False)
+            )
+        yield DocumentTuple(
+            RawDocument(doc.raw, clean=False, validate=False),
+            NormalizedDocument(doc.normalized, clean=False, validate=False)
+        )
+
 
 class HarvesterResponseModel(BaseHarvesterResponse):

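
A subtlety in version() worth calling out: the Version snapshot records the row's previous state, and it is only saved when different() reports a real change to raw or normalized, so reprocessing identical content should not grow the history. A sketch, assuming the document already exists and `processor` is a PostgresProcessor:

processor.process_raw(raw_doc)  # snapshots the prior state iff the raw payload changed
processor.process_raw(raw_doc)  # identical payload: different() is False, no Version saved

doc = processor._get_by_source_id(raw_doc['source'], raw_doc['docID'])
doc.version_set.count()         # unchanged by the second call
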
10 changes: 4 additions & 6 deletions scrapi/processing/storage.py
@@ -7,7 +7,6 @@
"""

import os
import copy
import json

from scrapi.util import json_without_bytes
@@ -18,11 +17,7 @@ class StorageProcessor(BaseProcessor):
     NAME = 'storage'
 
     def process_raw(self, raw):
-        new_attrs = copy.deepcopy(raw.attributes)
-        if new_attrs.get('versions'):
-            new_attrs['versions'] = list(map(str, new_attrs['versions']))
-
-        self.write(raw['source'], raw['docID'], 'raw', new_attrs)
+        self.write(raw['source'], raw['docID'], 'raw', raw.attributes)
 
     def process_normalized(self, raw, normalized):
         self.write(raw['source'], raw['docID'], 'normalized', normalized.attributes)
@@ -38,3 +33,6 @@ def write(self, source, doc_id, filename, content):

     def documents(self, *sources):
         raise NotImplementedError
 
+    def get_versions(self, source, docID):
+        raise NotImplementedError
2 changes: 1 addition & 1 deletion scrapi/tasks.py
@@ -134,7 +134,7 @@ def migrate(migration, source_db=None, sources=tuple(), async=False, dry=True, g
     if async:
         segment = list(islice(docs, group_size))
         while segment:
-            migration.s(segment, sources=sources, dry=dry, **kwargs).apply_async()
+            migration.s(segment, sources=sources, dry=dry, source_db=source_db, **kwargs).apply_async()
             segment = list(islice(docs, group_size))
     else:
         for doc in docs: