Skip to content
This repository has been archived by the owner on Apr 16, 2019. It is now read-only.

Commit

Permalink
Merge 6b7167c into d3710e5
Browse files Browse the repository at this point in the history
  • Loading branch information
czue committed Nov 12, 2015
2 parents d3710e5 + 6b7167c commit 06a7baf
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
15 changes: 14 additions & 1 deletion fluff/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from fluff import exceptions
from fluff.exceptions import EmitterValidationError
from fluff.signals import BACKEND_SQL, BACKEND_COUCH
from pillowtop.listener import PythonPillow
from pillowtop.checkpoints.manager import get_default_django_checkpoint_for_legacy_pillow_class
from pillowtop.listener import PythonPillow, PYTHONPILLOW_CHUNK_SIZE
from .signals import indicator_document_updated
import fluff.util

Expand Down Expand Up @@ -687,6 +688,7 @@ def aggregate_all_results(cls, keys, reduce=True, date_range=None):
class Meta:
app_label = 'fluff'


class FluffPillow(PythonPillow):
document_filter = None
wrapper = None
Expand All @@ -699,6 +701,17 @@ class FluffPillow(PythonPillow):
# see explanation in IndicatorDocument for how this is used
deleted_types = ()

def __init__(self, chunk_size=None, checkpoint=None):
# the arguments to this function are just for tests.
# explicitly check against None since we want to pass chunk_size=0 through
chunk_size = chunk_size if chunk_size is not None else PYTHONPILLOW_CHUNK_SIZE
# fluff pillows should default to SQL checkpoints
checkpoint = checkpoint or get_default_django_checkpoint_for_legacy_pillow_class(self.__class__)
super(FluffPillow, self).__init__(
chunk_size=chunk_size,
checkpoint=checkpoint,
)

@classmethod
def get_sql_engine(cls):
engine = getattr(cls, '_engine', None)
Expand Down
21 changes: 16 additions & 5 deletions tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def test_indicator_calculation(self):
)
for cls in [MockIndicators, MockIndicatorsWithGetters]:
classname = cls.__name__
pillow = cls.pillow()(chunk_size=0)
pillow = cls.pillow()(chunk_size=0, checkpoint=mock_checkpoint())
pillow.processor(change_from_couch_row({'changes': [], 'id': '123', 'seq': 1, 'doc': doc}),
PillowRuntimeContext())
indicator = self.fakedb.mock_docs.get("%s-123" % classname, None)
Expand Down Expand Up @@ -521,7 +521,7 @@ def test_deleting_on_doc_type_change(self):
)
for cls in [MockIndicators, MockIndicatorsWithGetters]:
classname = cls.__name__
pillow = cls.pillow()(chunk_size=0)
pillow = cls.pillow()(chunk_size=0, checkpoint=mock_checkpoint())
pillow.processor(change_from_couch_row({'changes': [], 'id': '123', 'seq': 1, 'doc': doc}),
PillowRuntimeContext())
indicator = self.fakedb.mock_docs.get("%s-123" % classname, None)
Expand All @@ -530,7 +530,7 @@ def test_deleting_on_doc_type_change(self):
doc['doc_type'] = 'MockArchive'
for cls in [MockIndicators, MockIndicatorsWithGetters]:
classname = cls.__name__
pillow = cls.pillow()(chunk_size=0)
pillow = cls.pillow()(chunk_size=0, checkpoint=mock_checkpoint())
pillow.processor(change_from_couch_row({'changes': [], 'id': '123', 'seq': 1, 'doc': doc}),
PillowRuntimeContext())
indicator = self.fakedb.mock_docs.get("%s-123" % classname, None)
Expand All @@ -547,7 +547,7 @@ def test_deleting_on_doc_type_change_sql(self):
)

for cls in [MockIndicatorsSql]:
pillow = cls.pillow()(chunk_size=0)
pillow = cls.pillow()(chunk_size=0, checkpoint=mock_checkpoint())
pillow.processor(change_from_couch_row({'changes': [], 'id': '123', 'seq': 1, 'doc': doc}),
PillowRuntimeContext())
with self.engine.begin() as connection:
Expand All @@ -556,13 +556,23 @@ def test_deleting_on_doc_type_change_sql(self):

doc['doc_type'] = 'MockArchive'
for cls in [MockIndicatorsSql]:
pillow = cls.pillow()(chunk_size=0)
pillow = cls.pillow()(chunk_size=0, checkpoint=mock_checkpoint())
pillow.processor(change_from_couch_row({'changes': [], 'id': '123', 'seq': 1, 'doc': doc}),
PillowRuntimeContext())
with self.engine.begin() as connection:
rows = connection.execute(sqlalchemy.select([cls._table]))
self.assertEqual(rows.rowcount, 0)


def mock_checkpoint():
# for some reason the testrunner chokes on these if they are not defined inline.
# in the future we may want to explicitly use django tests instead of regular tests
# if we're going to depend on a django environment.
from pillowtop.checkpoints.manager import PillowCheckpoint
from pillowtop.dao.mock import MockDocumentStore
return PillowCheckpoint(MockDocumentStore(), 'mock-checkpoint')


class MockDoc(Document):
_doc_type = "Mock"

Expand Down Expand Up @@ -652,6 +662,7 @@ class MockIndicatorsSql(fluff.IndicatorDocument):
class Meta:
app_label = 'Mock'


class MockIndicatorsSqlWithFlatFields(fluff.IndicatorDocument):

document_class = MockDoc
Expand Down

0 comments on commit 06a7baf

Please sign in to comment.