-
-
Notifications
You must be signed in to change notification settings - Fork 338
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add unsub cmds and lock for websocket.recv #256
Conversation
… streaming-addunsub
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think use of an asyncio.Lock
is quite appropriate and combined with the fact that websocket clients have an in-memory queue (defaults to about 128 MB of objects - which is hard to hit btw unless you are processing stuff on a potato :D ), and the queue would ensure the messages don't get dropped.
Either way, this certainly won't have that big of an impact overall on performance unless someone hitting the API concurrently which doesn't happen with websockets people.
We can always give it out with a soft warning that before you use the unsub methods, be careful of what you're getting yourself into.
examples/streaming/timesales.py
Outdated
@@ -81,14 +86,84 @@ async def handle_queue(self): | |||
msg = await self.queue.get() | |||
pprint.pprint(msg) | |||
|
|||
async def _dynamic_request(self, service, cmd, symbols): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on our offline conversation, let's cut the example from this PR.
tests/streaming_test.py
Outdated
socket = await self.login_and_get_socket(ws_connect) | ||
|
||
socket.recv.side_effect = [json.dumps(self.success_response( | ||
1, 'ACCT_ACTIVITY', 'UNSUBS'))] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That being said, let's investigate what TDA does when you login in and immediately unsub from a stream to which you're not already subbed. My gut tells me this is 1. something people will probably do by mistake and 2. a potentially interesting corner case that we don't understand and should test.
@@ -535,6 +559,18 @@ async def test_account_activity_subs_failure(self, ws_connect): | |||
with self.assertRaises(tda.streaming.UnexpectedResponseCode): | |||
await self.client.account_activity_sub() | |||
|
|||
@no_duplicates |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one might also be unrealistic. If you can figure out a way to trigger this, great.
Adding a sample log run here for unsub/sub scenarios. I've executed the below json array cmd, service, and symbols in sequential order and allowed some time for stream data incoming. Find my comments in =======, the first 2 scenario, i wanted to know td response if i unsubscribe from a service that hasn't been subscribed, the rest are just sub then unsub. Noticed that once I unsub from a service with the same symbol, I no longer receive data from that service for that symbol. If there's. Its not proven here but if you have a set of subscribe symbols and you unsub another set of symbols, its acts as Set difference. the client > and client < are the streaming logs that show raw data from the stream, look at these lines for the series of events. I left my application logs there which pipes all of the data to my sqs services.
And for the logs, starting with start of app ...
CASE : UNSUB from a service that has not been subscribe to
CASE : SUB THEN UNSUB from chart_equity
CASE : UNSUB from chart_futures when chart_futures has not been subscribed
CASE : SUB THEN UNSUB from chart_futures ES symbol
CASE : SUB THEN UNSUB from quote service symbol SPY
CASE : SUB THEN UNSUB from option service underlying NET
CASE : SUB THEN UNSUB from LEVELONE FUTURE symbol ES
Case: When no services are subscribed for 20 seconds - verify the last 2 messages from td
|
tests/streaming_test.py
Outdated
await self.client.handle_message() | ||
await self.client.news_headline_unsubs(['GOOG', 'MSFT']) | ||
|
||
self.assert_handler_called_once_with(handler, {"service": "NEWS_HEADLINE", "command": "SUBS", "timestamp": 1590186642440, 'content': {}}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's go ahead and factor this timestamp out into a global constant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These dicts are ugly, hard to read, and break line lengths. Please format, manually if necessary. Feel free to ignore conventions and such for now.
docs/streaming.rst
Outdated
@@ -578,6 +608,15 @@ engineering and crowdsourced experience. Take them with a grain of salt. | |||
If you have specific questions, please join our `Discord server | |||
<https://discord.gg/nfrd9gh>`__ to discuss with the community. | |||
|
|||
You can enable the websocket client debug to watch the incoming/outgoing messages on the stream by setting these |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add this in a separate PR.
tda/streaming.py
Outdated
@@ -882,6 +927,24 @@ async def level_one_equity_subs(self, symbols, *, fields=None): | |||
symbols, 'QUOTE', 'SUBS', self.LevelOneEquityFields, | |||
fields=fields) | |||
|
|||
async def level_one_equity_unsubs(self, symbols, *, fields=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this fields
parameter do? Let's verify that it's actually used.
tests/streaming_test.py
Outdated
@@ -13,6 +13,7 @@ | |||
|
|||
ACCOUNT_ID = 1000 | |||
TOKEN_TIMESTAMP = '2020-05-22T02:12:48+0000' | |||
UNIX_TIMESTAMP = 1590116673258 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alexgolec I've extracted all unix timestamp and replaced all hardcoded values with this UNIX_TIMESTAMP cont var
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the factor-out, but the name UNIX_TIMESTAMP
says nothing about how this variable is going to be used. How about something like REQUEST_TIMESTAMP
?
await self.client.chart_equity_unsubs(['GOOG,MSFT']) | ||
|
||
########################################################################### | ||
# Private member _service_op |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alexgolec I've added some service_op tests.
I've refactored _service_op to remove the "fields" parameter for any UNSUBS cmds and any _service_op calls with field_type=None. Although we do not have any sub/add cmds that make requests with field_type=None, it tests the behavior that fields parameter, which is optional, won't get sent if field_type=None
So it looks like subs commands may required the fields parameters. I tried sending a sub chart futures cmd without fields and it failed. Not that we are doing this anywhere in the code, but just wanted to see the behavior
|
tests/streaming_test.py
Outdated
socket.recv.side_effect = [json.dumps(self.success_response( | ||
1, 'CHART_EQUITY', 'UNSUBS'))] | ||
|
||
await self.client.chart_equity_unsubs(['GOOG', 'MSFT']) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's call _service_op
directly in these tests? This way we can be 100% certain that there's nothing method-specific interfering with us.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, i removed the first test that used a chart equity, once broken down to use _service_op, it had the same test effect as: test_service_op_no_fields_for_unsubs
tests/streaming_test.py
Outdated
# Private member _service_op | ||
# | ||
# Note: https://developer.tdameritrade.com/content/streaming-data#_Toc504640564 | ||
# parameters are optional and in the case of UNSUBS commands, fields should not be required |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
80 char lines
tda/streaming.py
Outdated
*, fields=None): | ||
if fields is None: | ||
# td fields parameter is optional and does not apply in the context of un-subscribing. | ||
if fields is None and field_type is not None and command != 'UNSUBS': |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like the constant checking for UNSUBS
here. This method is supposed to just do what it's told, and if it's told to do something bogus, then it's the problem of the calling method's implementation.
tda/streaming.py
Outdated
fields = field_type.all_fields() | ||
fields = sorted(self.convert_enum_iterable(fields, field_type)) | ||
|
||
if command != 'UNSUBS' and field_type is not None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's a suggested refactor that's easier to read and (I believe) equivalent. Note I've omitted the UNSUBS
check:
parameters = {
'keys': ','.join(symbols)
}
if field_type is not None:
if fields is None:
fields = field_type.all_fields()
else:
fields = sorted(self.convert_enum_iterable(fields, field_type))
parameters['fields'] = ','.join(str(f) for f in fields)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Used a much simpler equivalent.
parameters = {
'keys': ','.join(symbols)
}
if field_type is not None:
if fields is None:
fields = field_type.all_fields()
fields = sorted(self.convert_enum_iterable(fields, field_type))
parameters['fields'] = ','.join(str(f) for f in fields)
tests/streaming_test.py
Outdated
@@ -13,6 +13,7 @@ | |||
|
|||
ACCOUNT_ID = 1000 | |||
TOKEN_TIMESTAMP = '2020-05-22T02:12:48+0000' | |||
UNIX_TIMESTAMP = 1590116673258 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the factor-out, but the name UNIX_TIMESTAMP
says nothing about how this variable is going to be used. How about something like REQUEST_TIMESTAMP
?
Also note the failing docs generation test. |
docs/streaming.rst
Outdated
@@ -114,6 +114,21 @@ subscription methods again seems to clear the old subscription and create a new | |||
one. Note this behavior is not officially documented, so this interpretation may | |||
be incorrect. | |||
|
|||
---------------------- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
basically just the below change to fix the documentation check fail. @httran13
-------------------------
Un-Subscribing to Streams
-------------------------
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, you're good to go. Thanks for putting this together!
This PR adds an async.lock to streaming.py.
This lock achieves 2 goals:
Prevent websocket.recv() from being called from another coroutine while awaiting, so the locks are placed when retreiving response from the stream: https://websockets.readthedocs.io/en/stable/topics/design.html#concurrency
Ensures when making requests that sending requests and awaiting for responses occur synchronously.
With the lock in place, we can run, from our main program, concurrent programs without running into RuntimeError thrown by websocket.
I've also updated the timesales.py streaming example for a concurrent program to randomly make requests while an infinite loop is running to handle messages.
I've added unit tests for each unsub cmds and did not need to change any existing tests.