Skip to content

Commit

Permalink
Merge 8684f6f into 4bcb92f
Browse files Browse the repository at this point in the history
  • Loading branch information
Heston Liebowitz committed Apr 2, 2020
2 parents 4bcb92f + 8684f6f commit bf2fe2a
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 4 deletions.
33 changes: 31 additions & 2 deletions firebasedata/live.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@
import threading

from blinker.base import Namespace
from urllib3.exceptions import HTTPError

from . import data
from . import watcher

logger = logging.getLogger(__name__)
RETRY_INTERVAL = datetime.timedelta(minutes=1)


class LiveData(object):
def __init__(self, pyrebase_app, root_path, ttl=None):
def __init__(self, pyrebase_app, root_path, ttl=None, retry_interval=None):
self._app = pyrebase_app
self._root_path = root_path
self._ttl = ttl
self._retry_interval = (
RETRY_INTERVAL if retry_interval is None else retry_interval
)
self._db = self._app.database()
self._streams = {}
self._gc_streams = queue.Queue()
Expand All @@ -38,6 +43,12 @@ def get_data(self):

return self._cache

def get_data_silent(self):
try:
return self.get_data()
except HTTPError:
logger.exception('Error getting data')

def set_data(self, path, value):
path_list = data.get_path_list(path)
child = self._db.child(self._root_path)
Expand Down Expand Up @@ -76,10 +87,28 @@ def listen(self):
self.restart,
interval=self._ttl
)
# If the stream and stale watcher are established,
# the metawatcher is no longer needed.
self.cancel_metawatcher()

def get_metawatcher_name(self):
return 'meta_{}'.format(id(self))

def start_metawatcher(self):
watcher.watch(
self.get_metawatcher_name(),
lambda: self._cache is None,
self.get_data_silent,
interval=self._retry_interval
)

def cancel_metawatcher(self):
watcher.cancel(self.get_metawatcher_name())

def restart(self):
self.reset()
self.get_data()
self.start_metawatcher()
self.get_data_silent()

def reset(self):
logger.debug('Resetting all data')
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
argh==0.26.2
attrs==17.4.0
blinker==1.4
callee==0.3.1
certifi==2018.1.18
chardet==3.0.4
colorama==0.3.9
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setup(
name='FirebaseData',
version='0.5.2',
version='0.6.0',
packages=find_packages(),
install_requires=[
'blinker>=1.4',
Expand Down
36 changes: 36 additions & 0 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import datetime
import time

import callee
import pytest
from urllib3.exceptions import HTTPError

from firebasedata import data, live

Expand Down Expand Up @@ -125,3 +128,36 @@ def handler(*args, **kwargs):
path='/foo/bar/baz',
value='hola'
)


@pytest.mark.slow
def test_connection_recovery(livedata, mocker):
watch_mock = mocker.Mock(wraps=live.watcher.watch)
cancel_mock = mocker.Mock(wraps=live.watcher.cancel)
live.watcher.watch = watch_mock
live.watcher.cancel = cancel_mock
livedata._cache = None
livedata._ttl = datetime.timedelta(seconds=1)
livedata._retry_interval = datetime.timedelta(seconds=2)
livedata.is_stale = lambda: True
livedata._db.child = mocker.Mock(side_effect=HTTPError('Test error'))

livedata.restart()
time.sleep(3)

watch_mock.assert_any_call(
livedata.get_metawatcher_name(),
callee.functions.Callable(),
livedata.get_data_silent,
interval=livedata._retry_interval
)

livedata._db.child = mocker.Mock()
livedata.is_stale = lambda: False

time.sleep(3)

cancel_mock.assert_any_call(
'meta_{}'.format(id(livedata))
)
assert isinstance(livedata._cache, data.FirebaseData)
66 changes: 65 additions & 1 deletion tests/test_live.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import datetime

import blinker.base
import callee
import pytest
from urllib3.exceptions import HTTPError

from firebasedata import data, live

Expand Down Expand Up @@ -62,6 +64,20 @@ def test_warm_cache(self, livedata):

assert result is cached

def test_connection_error(self, livedata, logger, mocker):
livedata._db.child = mocker.Mock(side_effect=HTTPError('Test error'))

with pytest.raises(HTTPError):
livedata.get_data()

def test_get_data_silent_connection_error(self, livedata, logger, mocker):
livedata._db.child = mocker.Mock(side_effect=HTTPError('Test error'))

result = livedata.get_data_silent()

assert result is None
assert logger.exception.called


class Test_set_data:
def test_set_root(self, livedata):
Expand Down Expand Up @@ -199,10 +215,24 @@ def test_watcher_is_started(self, livedata, mocker):

def test_stream_gc_is_started(self, livedata, mocker):
livedata._start_stream_gc = mocker.Mock()

livedata.listen()

assert livedata._start_stream_gc.called

def test_metawatcher_is_canceled(self, livedata, mocker):
livedata.cancel_metawatcher = mocker.Mock()

livedata.listen()

assert livedata.cancel_metawatcher.called

def test_connection_error(self, livedata, mocker, logger):
livedata._db.child = mocker.Mock(side_effect=HTTPError('Test error'))

with pytest.raises(HTTPError):
livedata.listen()


class Test_reset:
def test_calls_hangup(self, livedata, mocker):
Expand All @@ -221,17 +251,51 @@ def test_resets_cache(self, livedata):
class Test_restart:
def test_calls_reset(self, livedata, mocker):
livedata.reset = mocker.Mock()

livedata.restart()

assert livedata.reset.called

def test_resets_get_data(self, livedata, mocker):
def test_calls_start_metawatcher(self, livedata, mocker):
livedata.start_metawatcher = mocker.Mock()

livedata.restart()

assert livedata.start_metawatcher.called

def test_calls_get_data(self, livedata, mocker):
livedata.get_data = mocker.Mock()

livedata.restart()

assert livedata.get_data.called


class Test_metawatcher:
def test_get_metawatcher_name(self, livedata):
name = livedata.get_metawatcher_name()
assert 'meta_{}'.format(id(livedata)) == name

def test_start_metawatcher(self, livedata, mocker):
watcher_mock = mocker.patch('firebasedata.live.watcher.watch')
livedata.start_metawatcher()

watcher_mock.assert_called_with(
livedata.get_metawatcher_name(),
callee.functions.Callable(),
livedata.get_data_silent,
interval=callee.types.InstanceOf(datetime.timedelta)
)

def test_cancel_metawatcher(self, livedata, mocker):
name = 'metawatcher'
watcher_mock = mocker.patch('firebasedata.live.watcher.cancel')
livedata.get_metawatcher_name = lambda: name
livedata.cancel_metawatcher()

watcher_mock.assert_called_with(name)


class Test_hangup:
def test_cancel_watcher(self, livedata, mocker):
watcher_mock = mocker.patch('firebasedata.live.watcher')
Expand Down

0 comments on commit bf2fe2a

Please sign in to comment.