Skip to content

Commit

Permalink
Merge 4f04c62 into 4bcb92f
Browse files Browse the repository at this point in the history
  • Loading branch information
Heston Liebowitz committed Apr 1, 2020
2 parents 4bcb92f + 4f04c62 commit c99e3c1
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 14 deletions.
53 changes: 41 additions & 12 deletions firebasedata/live.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@
import threading

from blinker.base import Namespace
from urllib3.exceptions import HTTPError

from . import data
from . import watcher

logger = logging.getLogger(__name__)
RETRY_INTERVAL = datetime.timedelta(minutes=1)


class LiveData(object):
def __init__(self, pyrebase_app, root_path, ttl=None):
def __init__(self, pyrebase_app, root_path, ttl=None, retry_interval=None):
self._app = pyrebase_app
self._root_path = root_path
self._ttl = ttl
self._retry_interval = (
RETRY_INTERVAL if retry_interval is None else retry_interval
)
self._db = self._app.database()
self._streams = {}
self._gc_streams = queue.Queue()
Expand All @@ -31,10 +36,14 @@ def __init__(self, pyrebase_app, root_path, ttl=None):
def get_data(self):
if self._cache is None:
# Fetch data now
value = self._db.child(self._root_path).get().val()
self._cache = data.FirebaseData(value)
# Listen for updates
self.listen()
try:
value = self._db.child(self._root_path).get().val()
except HTTPError:
logger.exception('Error getting data')
else:
self._cache = data.FirebaseData(value)
# Listen for updates
self.listen()

return self._cache

Expand Down Expand Up @@ -67,18 +76,38 @@ def signal(self, path, doc=None):
return self.events.signal(norm_path, doc=doc)

def listen(self):
stream = self._db.child(self._root_path).stream(self._stream_handler)
self._streams[id(stream)] = stream
self._start_stream_gc()
try:
stream = self._db.child(self._root_path).stream(self._stream_handler)
except HTTPError:
logger.exception('Error starting stream')
else:
self._streams[id(stream)] = stream
self._start_stream_gc()
watcher.watch(
id(self),
self.is_stale,
self.restart,
interval=self._ttl
)
self.cancel_metawatcher()

def get_metawatcher_name(self):
return 'meta_{}'.format(id(self))

def start_metawatcher(self):
watcher.watch(
id(self),
self.is_stale,
self.restart,
interval=self._ttl
self.get_metawatcher_name(),
lambda: self._cache is None,
self.get_data,
interval=self._retry_interval
)

def cancel_metawatcher(self):
watcher.cancel(self.get_metawatcher_name())

def restart(self):
self.reset()
self.start_metawatcher()
self.get_data()

def reset(self):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
argh==0.26.2
attrs==17.4.0
blinker==1.4
callee==0.3.1
certifi==2018.1.18
chardet==3.0.4
colorama==0.3.9
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setup(
name='FirebaseData',
version='0.5.2',
version='0.6.0',
packages=find_packages(),
install_requires=[
'blinker>=1.4',
Expand Down
36 changes: 36 additions & 0 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import datetime
import time

import callee
import pytest
from urllib3.exceptions import HTTPError

from firebasedata import data, live

Expand Down Expand Up @@ -125,3 +128,36 @@ def handler(*args, **kwargs):
path='/foo/bar/baz',
value='hola'
)


@pytest.mark.slow
def test_connection_recovery(livedata, mocker):
watch_mock = mocker.Mock(wraps=live.watcher.watch)
cancel_mock = mocker.Mock(wraps=live.watcher.cancel)
live.watcher.watch = watch_mock
live.watcher.cancel = cancel_mock
livedata._cache = None
livedata._ttl = datetime.timedelta(seconds=1)
livedata._retry_interval = datetime.timedelta(seconds=2)
livedata.is_stale = lambda: True
livedata._db.child = mocker.Mock(side_effect=HTTPError('Test error'))

livedata.restart()
time.sleep(3)

watch_mock.assert_any_call(
livedata.get_metawatcher_name(),
callee.functions.Callable(),
livedata.get_data,
interval=livedata._retry_interval
)

livedata._db.child = mocker.Mock()
livedata.is_stale = lambda: False

time.sleep(3)

cancel_mock.assert_any_call(
'meta_{}'.format(id(livedata))
)
assert isinstance(livedata._cache, data.FirebaseData)
61 changes: 60 additions & 1 deletion tests/test_live.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import datetime

import blinker.base
import callee
import pytest
from urllib3.exceptions import HTTPError

from firebasedata import data, live

Expand Down Expand Up @@ -62,6 +64,14 @@ def test_warm_cache(self, livedata):

assert result is cached

def test_connection_error(self, livedata, logger, mocker):
livedata._db.child = mocker.Mock(side_effect=HTTPError('Test error'))

result = livedata.get_data()

assert result is None
assert logger.exception.called


class Test_set_data:
def test_set_root(self, livedata):
Expand Down Expand Up @@ -199,10 +209,25 @@ def test_watcher_is_started(self, livedata, mocker):

def test_stream_gc_is_started(self, livedata, mocker):
livedata._start_stream_gc = mocker.Mock()

livedata.listen()

assert livedata._start_stream_gc.called

def test_metawatcher_is_canceled(self, livedata, mocker):
livedata.cancel_metawatcher = mocker.Mock()

livedata.listen()

assert livedata.cancel_metawatcher.called

def test_connection_error(self, livedata, mocker, logger):
livedata._db.child = mocker.Mock(side_effect=HTTPError('Test error'))

livedata.listen()

assert logger.exception.called


class Test_reset:
def test_calls_hangup(self, livedata, mocker):
Expand All @@ -221,17 +246,51 @@ def test_resets_cache(self, livedata):
class Test_restart:
def test_calls_reset(self, livedata, mocker):
livedata.reset = mocker.Mock()

livedata.restart()

assert livedata.reset.called

def test_resets_get_data(self, livedata, mocker):
def test_calls_start_metawatcher(self, livedata, mocker):
livedata.start_metawatcher = mocker.Mock()

livedata.restart()

assert livedata.start_metawatcher.called

def test_calls_get_data(self, livedata, mocker):
livedata.get_data = mocker.Mock()

livedata.restart()

assert livedata.get_data.called


class Test_metawatcher:
def test_get_metawatcher_name(self, livedata):
name = livedata.get_metawatcher_name()
assert 'meta_{}'.format(id(livedata)) == name

def test_start_metawatcher(self, livedata, mocker):
watcher_mock = mocker.patch('firebasedata.live.watcher.watch')
livedata.start_metawatcher()

watcher_mock.assert_called_with(
livedata.get_metawatcher_name(),
callee.functions.Callable(),
livedata.get_data,
interval=callee.types.InstanceOf(datetime.timedelta)
)

def test_cancel_metawatcher(self, livedata, mocker):
name = 'metawatcher'
watcher_mock = mocker.patch('firebasedata.live.watcher.cancel')
livedata.get_metawatcher_name = lambda: name
livedata.cancel_metawatcher()

watcher_mock.assert_called_with(name)


class Test_hangup:
def test_cancel_watcher(self, livedata, mocker):
watcher_mock = mocker.patch('firebasedata.live.watcher')
Expand Down

0 comments on commit c99e3c1

Please sign in to comment.