Skip to content

Commit

Permalink
Merge pull request #139 from druids/CreateBatchQuerysetIterator
Browse files Browse the repository at this point in the history
Created batch cached iterator
  • Loading branch information
matllubos committed Nov 22, 2022
2 parents e6b9ae0 + 7b22c7a commit 7ef69d0
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/django.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
strategy:
max-parallel: 4
matrix:
python-version: [3.6, 3.7, 3.8]
python-version: [3.7, 3.8]

steps:
- uses: actions/checkout@v2
Expand Down
92 changes: 92 additions & 0 deletions chamber/models/batch_iterator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from datetime import datetime, timedelta

from django.core.cache import cache
from django.utils.timezone import localtime


class BatchCachedQuerysetIterator:
"""
Batch iterator that stores last iterated object in the cache.
The iterator may be used in different processes (running at a different time).
Iteration will continue from the last complete batch.
Eq
iterator = BatchCachedQuerysetIterator(
User.objects.all(), # iterated queryset
'users', # key used to store last object (cursor) in cache
batch_size=100, # size of one iteration
expiration=10 # expiration of cached cursor in seconds
)
# 100 users are returned (if there is more than 100 users)
list(iterator)
iterator2 = BatchCachedQuerysetIterator(User.objects.all(), 'users', batch_size=100, expiration=10)
# next 100 users are returned (users will be sorted by ID)
list(iterator2)
"""

def __init__(self, queryset, key, store_cursor_with_exception=False, batch_size=10000, expiration=None):
"""
Init batch cached queryset iterator
:param queryset: queryset which you should want to iterate. Queryset should be sortable by ID
:param key: unique key of your iterator which will be used for cursor caching
:param store_cursor_with_exception: store cursor in cache if exception will be occurred
:param batch_size: size of one batch
:param expiration: expiration in datatime, timedelta or integer (number of seconds) format
"""
self._batch_size = batch_size
self._cache_key = f'batch_queryset_iterator_{key}'
self._count_processed = 0
self._queryset = queryset.order_by('pk')
self._cursor = self._cached_cursor = self._get_cursor()
self._chunked_queryset = self._get_chunked_queryset(self._cursor)
self._expiration = self._compute_expiration(expiration)
self._store_cursor_with_exception = store_cursor_with_exception

def _compute_expiration(self, expiration):
if isinstance(expiration, datetime):
return expiration
elif isinstance(expiration, int):
return localtime() + timedelta(seconds=expiration)
elif isinstance(expiration, timedelta):
return localtime() + expiration
else:
raise AttributeError('invalid value of expiration it can be datetime, integer or timedelta')

def _get_cursor(self):
return cache.get(self._cache_key)

def _set_cursor(self):
if self._cursor != self._cached_cursor:
cache.set(self._cache_key, self._cursor, (self._expiration - localtime()).total_seconds())
self._cached_cursor = self._cursor

def _filter_queryset_by_cursor(self, cursor):
queryset = self._queryset
if cursor:
queryset = queryset.filter(pk__gt=cursor)
return queryset

def _get_chunked_queryset(self, cursor):
return self._filter_queryset_by_cursor(cursor)[:self._batch_size]

def __iter__(self):
try:
for obj in self._get_chunked_queryset(self._cursor):
yield obj
self._cursor = obj.pk
self._set_cursor()
finally:
if self._store_cursor_with_exception:
self._set_cursor()

def __len__(self):
return self._get_chunked_queryset(self._cursor).count()

@property
def total_number_of_objects(self):
return self._queryset.count()

@property
def remaining_number_of_objects(self):
return self._filter_queryset_by_cursor(cursor=self._cursor).count()
1 change: 1 addition & 0 deletions example/dj/apps/test_chamber/tests/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .dispatchers import * # NOQA
from .fields import * # NOQA
from .humanized_helpers import * # NOQA
from .batch_iterator import *


class NameComparator(Comparator):
Expand Down
114 changes: 114 additions & 0 deletions example/dj/apps/test_chamber/tests/models/batch_iterator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from datetime import timedelta

from django.core.cache import cache
from django.test import TransactionTestCase
from django.utils.timezone import now

from chamber.models.batch_iterator import BatchCachedQuerysetIterator

from germanium.tools import assert_equal, assert_raises, assert_is_none

from freezegun import freeze_time

from test_chamber.models import TestSmartModel


__all__ = (
'BatchCachedQuerysetIteratorTestCase',
)


class BatchCachedQuerysetIteratorTestCase(TransactionTestCase):

def test_batch_cached_iterator_should_return_all_items_and_set_cursor_to_last_element(self):
test_objs = [TestSmartModel.objects.create(name=str(i)) for i in range(10)]
iterator = BatchCachedQuerysetIterator(TestSmartModel.objects.all(), 'full', batch_size=100, expiration=10)

assert_equal(iterator.total_number_of_objects, 10)
assert_equal(iterator.remaining_number_of_objects, 10)

iterated_objs = [obj for obj in iterator]

assert_equal(test_objs, iterated_objs)
assert_equal(iterator.total_number_of_objects, 10)
assert_equal(iterator.remaining_number_of_objects, 0)
assert_equal(cache.get('batch_queryset_iterator_full'), test_objs[-1].pk)

def test_batch_cached_iterator_should_iterate_over_objects_by_chunks(self):
test_objs = [TestSmartModel.objects.create(name=str(i)) for i in range(20)]
iterator = BatchCachedQuerysetIterator(TestSmartModel.objects.all(), 'chunks', batch_size=10, expiration=10)

assert_equal(iterator.total_number_of_objects, 20)
assert_equal(iterator.remaining_number_of_objects, 20)

iterated_objs = [obj for obj in iterator]
assert_equal(test_objs[:10], iterated_objs)
assert_equal(iterator.total_number_of_objects, 20)
assert_equal(iterator.remaining_number_of_objects, 10)
assert_equal(cache.get('batch_queryset_iterator_chunks'), test_objs[9].pk)

iterator = BatchCachedQuerysetIterator(TestSmartModel.objects.all(), 'chunks', batch_size=10, expiration=10)
iterated_objs = [obj for obj in iterator]
assert_equal(test_objs[10:], iterated_objs)
assert_equal(iterator.total_number_of_objects, 20)
assert_equal(iterator.remaining_number_of_objects, 0)
assert_equal(cache.get('batch_queryset_iterator_chunks'), test_objs[19].pk)

def test_batch_cached_iterator_should_store_last_cursor_for_raised_exception(self):
test_objs = [TestSmartModel.objects.create(name=str(i)) for i in range(20)]
iterator = BatchCachedQuerysetIterator(TestSmartModel.objects.all(), 'exception', batch_size=10, expiration=10)

with assert_raises(RuntimeError):
for i, obj in enumerate(iterator):
if i == 2:
raise RuntimeError
assert_is_none(cache.get('batch_queryset_iterator_exception'))

iterator = BatchCachedQuerysetIterator(
TestSmartModel.objects.all(), 'exception', batch_size=10, expiration=10, store_cursor_with_exception=True
)

with assert_raises(RuntimeError):
for i, obj in enumerate(iterator):
if i == 2:
raise RuntimeError
assert_equal(cache.get('batch_queryset_iterator_exception'), test_objs[1].pk)

iterator = BatchCachedQuerysetIterator(
TestSmartModel.objects.all(), 'exception', batch_size=100, expiration=10
)
iterated_objs = [obj for obj in iterator]
assert_equal(test_objs[2:], iterated_objs)

def test_batch_cached_iterator_should_be_completed_after_exception(self):
test_objs = [TestSmartModel.objects.create(name=str(i)) for i in range(10)]
iterator = BatchCachedQuerysetIterator(
TestSmartModel.objects.all(), 'complete_exception', batch_size=10, expiration=10
)

iterated_objects = []
with assert_raises(RuntimeError):
for i, obj in enumerate(iterator):
if i == 2:
raise RuntimeError
else:
iterated_objects.append(obj)

assert_equal(iterator.total_number_of_objects, 10)
assert_equal(iterator.remaining_number_of_objects, 8)
iterated_objects += [obj for obj in iterator]
assert_equal(iterator.total_number_of_objects, 10)
assert_equal(iterator.remaining_number_of_objects, 0)
assert_equal(test_objs, iterated_objects)

def test_batch_cached_iterator_should_expire_cache(self):
test_objs = [TestSmartModel.objects.create(name=str(i)) for i in range(10)]
iterator = BatchCachedQuerysetIterator(TestSmartModel.objects.all(), 'expiration', batch_size=5, expiration=5)
iterated_objs = [obj for obj in iterator]
assert_equal(cache.get('batch_queryset_iterator_expiration'), test_objs[4].pk)

with freeze_time(now() + timedelta(seconds=5)):
iterator = BatchCachedQuerysetIterator(
TestSmartModel.objects.all(), 'expiration', batch_size=5, expiration=5
)
assert_equal(iterated_objs, list(iterator))
1 change: 1 addition & 0 deletions example/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
Django==3.1.13
flake8
freezegun==1.1.0
coveralls
diff-match-patch==20110725.1
django-germanium==2.1.0
Expand Down

0 comments on commit 7ef69d0

Please sign in to comment.