Skip to content

Commit

Permalink
Merge pull request #113 from liampauling/streaming-fixes-2
Browse files Browse the repository at this point in the history
Streaming fixes 2
  • Loading branch information
liampauling committed Aug 6, 2017
2 parents 5ef2214 + 0362d3b commit e16e0d9
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 46 deletions.
2 changes: 1 addition & 1 deletion betfairlightweight/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from . import filters

__title__ = 'betfairlightweight'
__version__ = '1.4.1'
__version__ = '1.4.2'
__author__ = 'Liam Pauling'

# Set default logging handler to avoid "No handler found" warnings.
Expand Down
10 changes: 10 additions & 0 deletions betfairlightweight/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ def __init__(self, message):
super(SocketError, self).__init__(message)


class ListenerError(BetfairError):
"""
Exception raised if error with listener.
"""

def __init__(self, connection_id, data):
message = 'connection_id: %s, data: %s' % (connection_id, data)
super(ListenerError, self).__init__(message)


class RaceCardError(BetfairError):
"""
Exception raised if error with race card request.
Expand Down
18 changes: 15 additions & 3 deletions betfairlightweight/streaming/betfairstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
import datetime
import collections

from ..exceptions import SocketError
from ..exceptions import (
SocketError,
ListenerError,
)
from ..compat import is_py3


Expand Down Expand Up @@ -115,7 +118,11 @@ def subscribe_to_markets(self, market_filter, market_data_filter, initial_clk=No
'heartbeatMs': heartbeat_ms,
'segmentationEnabled': segmentation_enabled,
}
self.listener.register_stream(unique_id, 'marketSubscription')
if initial_clk and clk:
# if resubscribe only update unique_id
self.listener.stream_unique_id = unique_id
else:
self.listener.register_stream(unique_id, 'marketSubscription')
self._send(message)
return unique_id

Expand Down Expand Up @@ -143,7 +150,11 @@ def subscribe_to_orders(self, order_filter=None, initial_clk=None, clk=None, con
'heartbeatMs': heartbeat_ms,
'segmentationEnabled': segmentation_enabled,
}
self.listener.register_stream(unique_id, 'orderSubscription')
if initial_clk and clk:
# if resubscribe only update unique_id
self.listener.stream_unique_id = unique_id
else:
self.listener.register_stream(unique_id, 'orderSubscription')
self._send(message)
return unique_id

Expand Down Expand Up @@ -209,6 +220,7 @@ def _data(self, received_data):
"""
if self.listener.on_data(received_data) is False:
self.stop()
raise ListenerError(self.listener.connection_id, received_data)

def _send(self, message):
"""If not running connects socket and
Expand Down
19 changes: 13 additions & 6 deletions betfairlightweight/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class BaseListener(object):
def __init__(self, max_latency=0.5):
self.max_latency = max_latency

self.connection_id = None
self.stream = None
self.stream_type = None # marketSubscription/orderSubscription
self.stream_unique_id = None
Expand All @@ -34,6 +35,16 @@ def snap(self, market_ids=None):
else:
return []

@property
def initial_clk(self):
if self.stream is not None:
return self.stream._initial_clk

@property
def clk(self):
if self.stream is not None:
return self.stream._clk

def _add_stream(self, unique_id, operation):
logger.info('Register: %s %s' % (operation, unique_id))

Expand Down Expand Up @@ -120,13 +131,9 @@ def _on_change_message(self, data, unique_id):

def _add_stream(self, unique_id, stream_type):
if stream_type == 'marketSubscription':
return MarketStream(
unique_id, self.output_queue, self.max_latency, self.lightweight
)
return MarketStream(self)
elif stream_type == 'orderSubscription':
return OrderStream(
unique_id, self.output_queue, self.max_latency, self.lightweight
)
return OrderStream(self)

@staticmethod
def _error_handler(data, unique_id):
Expand Down
34 changes: 24 additions & 10 deletions betfairlightweight/streaming/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@ class BaseStream(object):

_lookup = 'mc'

def __init__(self, unique_id, output_queue, max_latency, lightweight):
self.unique_id = unique_id
self.output_queue = output_queue
self._max_latency = max_latency
self._lightweight = lightweight
def __init__(self, listener):
self._listener = listener

self._initial_clk = None
self._clk = None
Expand All @@ -35,16 +32,16 @@ def on_subscribe(self, data):
self._update_clk(data)
publish_time = data.get('pt')

book_data = data.get(self._lookup)
if book_data:
self._process(book_data, publish_time)
if self._lookup in data:
self._process(data[self._lookup], publish_time)
logger.info('[Stream: %s]: %s %s added' % (self.unique_id, len(self._caches), self._lookup))

def on_heartbeat(self, data):
self._update_clk(data)

def on_resubscribe(self, data):
self._update_clk(data)
self.on_update(data)
logger.info('[Stream: %s]: %s %s resubscribed' % (self.unique_id, len(self._caches), self._lookup))

def on_update(self, data):
self._update_clk(data)
Expand All @@ -54,7 +51,8 @@ def on_update(self, data):
if latency > self._max_latency:
logger.warning('[Stream: %s]: Latency high: %s' % (self.unique_id, latency))

self._process(data[self._lookup], publish_time)
if self._lookup in data:
self._process(data[self._lookup], publish_time)

def clear_cache(self):
self._caches.clear()
Expand All @@ -80,6 +78,22 @@ def _update_clk(self, data):
self._clk = clk
self.time_updated = datetime.datetime.utcnow()

@property
def unique_id(self):
return self._listener.stream_unique_id

@property
def output_queue(self):
return self._listener.output_queue

@property
def _max_latency(self):
return self._listener.max_latency

@property
def _lightweight(self):
return self._listener.lightweight

@staticmethod
def _calc_latency(publish_time):
return time.time() - publish_time / 1e3
Expand Down
28 changes: 23 additions & 5 deletions tests/test_betfairstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
from tests import mock

from betfairlightweight.streaming.betfairstream import BetfairStream
from betfairlightweight.exceptions import SocketError
from betfairlightweight.exceptions import (
SocketError,
ListenerError,
)


class BetfairStreamTest(unittest.TestCase):
Expand Down Expand Up @@ -96,19 +99,33 @@ def test_heartbeat(self, mock_send):

@mock.patch('betfairlightweight.streaming.betfairstream.BetfairStream._send')
def test_subscribe_to_markets(self, mock_send):
market_filter = {'test': 123}
market_data_filter = {'another_test': 123}
self.betfair_stream.subscribe_to_markets(market_filter, market_data_filter, heartbeat_ms=1, conflate_ms=2,
segmentation_enabled=False)

mock_send.assert_called_with(
{'op': 'marketSubscription', 'marketFilter': market_filter, 'id': self.betfair_stream._unique_id,
'marketDataFilter': market_data_filter, 'initialClk': None, 'clk': None,
'heartbeatMs': 1, 'conflateMs': 2, 'segmentationEnabled': False}
)
self.mock_listener.register_stream.assert_called_with(self.betfair_stream._unique_id, 'marketSubscription')

@mock.patch('betfairlightweight.streaming.betfairstream.BetfairStream._send')
def test_resubscribe_to_markets(self, mock_send):
market_filter = {'test': 123}
market_data_filter = {'another_test': 123}
initial_clk = 'abcdef'
clk = 'abc'
self.betfair_stream.subscribe_to_markets(market_filter, market_data_filter, initial_clk, clk,
heartbeat_ms=1, conflate_ms=2, segmentation_enabled=False)
self.betfair_stream.subscribe_to_markets(market_filter, market_data_filter, initial_clk, clk, heartbeat_ms=1,
conflate_ms=2, segmentation_enabled=False)

mock_send.assert_called_with(
{'op': 'marketSubscription', 'marketFilter': market_filter, 'id': self.betfair_stream._unique_id,
'marketDataFilter': market_data_filter, 'initialClk': initial_clk, 'clk': clk,
'heartbeatMs': 1, 'conflateMs': 2, 'segmentationEnabled': False}
)
self.mock_listener.register_stream.assert_called_with(self.betfair_stream._unique_id, 'marketSubscription')
assert not self.mock_listener.register_stream.called

@mock.patch('betfairlightweight.streaming.betfairstream.BetfairStream._send')
def test_subscribe_to_orders(self, mock_send):
Expand Down Expand Up @@ -181,7 +198,8 @@ def test_receive_all(self):
@mock.patch('betfairlightweight.streaming.betfairstream.BetfairStream.stop')
def test_data(self, mock_stop):
received_data = {"op": "status"}
self.betfair_stream._data(received_data)
with self.assertRaises(ListenerError):
self.betfair_stream._data(received_data)

self.mock_listener.on_data.assert_called_with(received_data)
assert mock_stop.called
Expand Down
15 changes: 13 additions & 2 deletions tests/test_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def setUp(self):
self.base_listener = BaseListener()

def test_init(self):
assert self.base_listener.connection_id is None
assert self.base_listener.stream is None
assert self.base_listener.stream_unique_id is None
assert self.base_listener.stream_type is None
Expand Down Expand Up @@ -54,6 +55,16 @@ def test_snap(self):
self.base_listener.stream = mock_stream
assert self.base_listener.snap() == mock_stream.snap(None)

def test_props(self):
assert self.base_listener.initial_clk is None
assert self.base_listener.clk is None
stream = mock.Mock()
stream._initial_clk = 2
stream._clk = 1
self.base_listener.stream = stream
assert self.base_listener.initial_clk == 2
assert self.base_listener.clk == 1

def test_str(self):
assert str(self.base_listener) == 'BaseListener'

Expand Down Expand Up @@ -137,11 +148,11 @@ def test_on_change_message(self):
def test_add_stream(self, mock_market_stream, mock_order_stream):
new_stream = self.stream_listener._add_stream(1, 'marketSubscription')
assert new_stream == 123
mock_market_stream.assert_called_with(1, self.output_queue, self.max_latency, False)
mock_market_stream.assert_called_with(self.stream_listener)

new_stream = self.stream_listener._add_stream(1, 'orderSubscription')
assert new_stream == 456
mock_order_stream.assert_called_with(1, self.output_queue, self.max_latency, False)
mock_order_stream.assert_called_with(self.stream_listener)

def test_error_handler(self):
mock_response = create_mock_json('tests/resources/streaming_connection.json')
Expand Down
43 changes: 24 additions & 19 deletions tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@
class BaseStreamTest(unittest.TestCase):

def setUp(self):
self.output_queue = mock.Mock()
self.unique_id = 1
self.max_latency = 1.5
self.stream = BaseStream(self.unique_id, self.output_queue, self.max_latency, False)
self.listener = mock.Mock()
self.listener.max_latency = 0.5
self.stream = BaseStream(self.listener)

def test_init(self):
assert self.stream.unique_id == self.unique_id
assert self.stream.output_queue == self.output_queue
assert self.stream._max_latency == self.max_latency
assert self.stream._listener == self.listener

assert self.stream._initial_clk is None
assert self.stream._clk is None
Expand All @@ -39,10 +36,10 @@ def test_on_heartbeat(self, mock_update_clk):
self.stream.on_heartbeat({})
mock_update_clk.assert_called_once_with({})

@mock.patch('betfairlightweight.streaming.stream.BaseStream._update_clk')
def test_on_resubscribe(self, mock_update_clk):
@mock.patch('betfairlightweight.streaming.stream.BaseStream.on_update')
def test_on_resubscribe(self, mock_on_update):
self.stream.on_resubscribe({})
mock_update_clk.assert_called_once_with({})
mock_on_update.assert_called_once_with({})

@mock.patch('betfairlightweight.streaming.stream.BaseStream._process')
@mock.patch('betfairlightweight.streaming.stream.BaseStream._calc_latency', return_value=0.1)
Expand Down Expand Up @@ -72,7 +69,7 @@ def test_process(self):

def test_on_process(self):
self.stream.on_process([1, 2])
self.output_queue.put.assert_called_with([1, 2])
self.stream.output_queue.put.assert_called_with([1, 2])

def test_update_clk(self):
self.stream._update_clk({'initialClk': 1234})
Expand All @@ -81,6 +78,18 @@ def test_update_clk(self):
self.stream._update_clk({'clk': 123})
assert self.stream._clk == 123

def test_unique_id(self):
assert self.stream.unique_id == self.listener.stream_unique_id

def test_output_queue(self):
assert self.stream.output_queue == self.listener.output_queue

def test_max_latency(self):
assert self.stream._max_latency == self.listener.max_latency

def test_lightweight(self):
assert self.stream._lightweight == self.listener.lightweight

@mock.patch('time.time', return_value=1485554805.107185)
def test_calc_latency(self, mock_time):
pt = 1485554796455
Expand All @@ -100,10 +109,8 @@ def test_repr(self):
class MarketStreamTest(unittest.TestCase):

def setUp(self):
self.output_queue = mock.Mock()
self.unique_id = 1
self.max_latency = 1.5
self.stream = MarketStream(self.unique_id, self.output_queue, self.max_latency, False)
self.listener = mock.Mock()
self.stream = MarketStream(self.listener)

@mock.patch('betfairlightweight.streaming.stream.MarketStream._process')
@mock.patch('betfairlightweight.streaming.stream.MarketStream._update_clk')
Expand Down Expand Up @@ -131,10 +138,8 @@ def test_repr(self):
class OrderStreamTest(unittest.TestCase):

def setUp(self):
self.output_queue = mock.Mock()
self.unique_id = 1
self.max_latency = 1.5
self.stream = OrderStream(self.unique_id, self.output_queue, self.max_latency, False)
self.listener = mock.Mock()
self.stream = OrderStream(self.listener)

def test_process(self):
pass
Expand Down

0 comments on commit e16e0d9

Please sign in to comment.