Events API project #1349
Conversation
Codecov Report
@@            Coverage Diff             @@
##           master    #1349      +/-   ##
==========================================
+ Coverage   98.21%   98.37%    +0.15%
==========================================
  Files          54       56        +2
  Lines        2469     2587      +118
==========================================
+ Hits         2425     2545      +120
+ Misses         44       42        -2
==========================================

Continue to review the full report at Codecov.
Updated tests.
bigchaindb/web/websocket_server.py
Outdated
    queue.put_nowait(value)
    return
except asyncio.QueueFull:
    queue.get_nowait()
A try/except/else pattern could be used here, for readability and style reasons, since obviously not much can go wrong with the return statement :).
try:
queue.put_nowait(value)
except asyncio.QueueFull:
queue.get_nowait()
else:
return
some related links:
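The suggested pattern can be fleshed out into a runnable sketch (the function name and the small queue are illustrative, not from the PR):

```python
import asyncio

def put_into_capped_queue(queue, value):
    """Keep trying to enqueue `value`, evicting the oldest item on overflow."""
    while True:
        try:
            queue.put_nowait(value)
        except asyncio.QueueFull:
            # Queue is at capacity: drop the oldest item and retry.
            queue.get_nowait()
        else:
            # put_nowait succeeded; nothing can go wrong here.
            return

queue = asyncio.Queue(maxsize=2)
for item in ('a', 'b', 'c'):
    put_into_capped_queue(queue, item)

remaining = [queue.get_nowait(), queue.get_nowait()]  # the oldest item 'a' was evicted
```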
code cov!
tests/pipelines/test_election.py
Outdated
# put a valid block event in the queue
e.handle_block_events({'status': Bigchain.BLOCK_VALID}, block_id)
event = e.event_handler.get_event()
assert event.type == EventTypes.BLOCK_VALID
We can treat this in a separate issue, as a clean up task, but I'll take the opportunity to share some thoughts on one possible approach regarding the imports in tests. There's this idea of not importing a module-under-test (MUT) at module scope. One helpful reference for this approach is at http://pylonsproject.org/community-unit-testing-guidelines.html
The module-under-test (MUT) can be a function, class, or method, for instance. In the test above one could say that the MUT is the Election class. As per the recommendation of the approach being discussed here, one would then import the Election class inside the test function testing it. The reason given for doing so is:
Import failures in the module-under-test (MUT) should cause individual test cases to fail, and they should never prevent those tests from being run. Depending on the test runner, import problems may be much harder to distinguish at a glance than normal test failures.
So that's it. We do not need to be maniacal about it though 😄 as this is most likely debatable.
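For illustration, a minimal self-contained example of the guideline, using collections.OrderedDict as a stand-in for the real module under test (e.g. Election):

```python
def test_module_under_test_imported_at_test_scope():
    # Import the module under test inside the test function rather than at
    # module scope: if the import fails, only this test errors, and the
    # problem shows up as a regular test failure in the report.
    from collections import OrderedDict  # stand-in for e.g. Election

    d = OrderedDict([('a', 1), ('b', 2)])
    assert list(d) == ['a', 'b']

test_module_under_test_imported_at_test_scope()
```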
I'm not too hot on this requirement. I've yet to see a circumstance where having the imports at the top of the file is masking a problem.
I personally like the MUT approach; it makes the unit test self-contained. I'd defer it to another PR though.
test that the process is started with the events_queue kwargs
tests/web/test_websocket_server.py
Outdated
from bigchaindb import events
from bigchaindb.web.websocket_server import init_app, POISON_PILL, EVENTS_ENDPOINT

    event_source = asyncio.Queue(loop=loop)
    app = init_app(event_source, loop=loop)
    client = yield from test_client(app)
    ws = yield from client.ws_connect(EVENTS_ENDPOINT)
    block = create_block(b, 10).to_dict()
    block = _block.to_dict()
General note for this commit:
Using a fixture is proposed here instead of a function because we already have some tests that also rely on having a block, and these tests also have their own function to create the dummy block. See https://github.com/bigchaindb/bigchaindb/blob/master/tests/db/test_bigchain_api.py#L26-L31 for instance.
We now have the opportunity to take care of this, hence this proposed change. The fixture could perhaps be renamed, and moved up into the main conftest
module. If the proposed change is accepted, then both the rename and move can be easily taken care of.
OK but I believe the previous solution was more understandable:
def create_block(b, total=1):
# ...
def test_websocket_block_event(b, test_client, loop):
# ...
block = create_block(b, 10)
# ...
vs
@pytest.fixture
def _block(b, request):
total = getattr(request, 'param', 1)
# ...
@pytest.mark.parametrize('_block', (10,), indirect=('_block',), ids=('block',))
def test_websocket_block_event(b, _block, test_client, loop):
# ...
block = _block.to_dict()
# ...
Yes, I understand.
For this specific case, if we make the default number of transactions in a block 10, then the decorator part is not needed. Moreover, the fixture name could be simplified, such that we end up with something like:
@pytest.fixture
def block(b, request):
total = getattr(request, 'param', 10)
# ...
def test_websocket_block_event(b, block, test_client, loop):
# ...
block_dict = block.to_dict()
# ...
The whole point of this approach is that it provides us with a fixture we can re-use, and when we need to fine-tune the quantity of transactions the block contains, we can do so with pytest's indirect parametrization mechanism.
I think the complexity is worth it in this case given what it provides us with, but that is my opinion. Lastly, tests in which this pattern is encountered are not uncommon.
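A sketch of the resulting pattern (the fixture body is simplified to a plain dict here; the real fixture would build an actual block):

```python
import pytest

@pytest.fixture
def block(request):
    # `request.param` is set only when a test parametrizes this fixture
    # indirectly; otherwise fall back to the default of 10 transactions.
    total = getattr(request, 'param', 10)
    return {'transactions': list(range(total))}

def test_block_has_default_size(block):
    assert len(block['transactions']) == 10

# Fine-tune the number of transactions via indirect parametrization:
@pytest.mark.parametrize('block', (1, 50), indirect=('block',))
def test_block_has_custom_size(block):
    assert len(block['transactions']) in (1, 50)
```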
sync_queue.put('we')
sync_queue.put('are')
sync_queue.put('the')
sync_queue.put('robots')
sync_queue.put('dreaming')
sync_queue.put('we')
sync_queue.put('are')
sync_queue.put('humans')
@@ -59,6 +59,10 @@
    'workers': None,  # if none, the value will be cpu_count * 2 + 1
    'threads': None,  # if none, the value will be cpu_count * 2 + 1
},
'wsserver': {
    'host': os.environ.get('BIGCHAINDB_WSSERVER_HOST') or 'localhost',
My personal preference would be to s/wsserver/websocket/g, a little bit more readable, but that's personal.
I noticed that the ws prefix is used quite a lot to name libraries and variables. I was debating between using websocket or wsserver. In the end I thought that websocket is the name of a protocol, while wsserver is more aligned with the configuration key server. I'd like to rename server to apiserver actually 😅.
Oh ok.
self.events_queue = events_queue

def put_event(self, event, timeout=None):
    # TODO: handle timeouts
Will these TODOs be addressed before merging? I don't think we should have "TODO" comments in master, but there should be issues addressing the requirement.
@libscott: hybrid approach, it's fine to have TODOs as long as they reference an issue in our tracker. What do you think?
@r-marques: can you write an issue for this TODO please?
I do not mind TODOs.
Personally I use TODO to indicate something I need to address in a PR. If there are TODOs in master it's no problem; I will choose another random word to use.
bigchaindb/web/views/info.py
Outdated
'_links': {
    'docs': ''.join(docs_url),
    'self': api_root,
    'statuses': api_root + 'statuses/',
    'transactions': api_root + 'transactions/',
    # TODO: The version should probably not be hardcoded
I think it should be hardcoded, the point is to have a stable API.
logger = logging.getLogger(__name__)
POISON_PILL = 'POISON_PILL'
🦇
bigchaindb/web/websocket_server.py
Outdated
for tx in block['block']['transactions']:
    asset_id = tx['id'] if tx['operation'] == 'CREATE' else tx['asset']['id']
    data = {'blockid': block['id'],
I'd prefer block_id, asset_id, tx_id, though I've been guilty of using txid in the past.
I just followed the specification... but 💯% agree here 💃 🕺
Lemme quickly ask the team.
Question
Given the original message:
{
"txid": "<sha3-256 hash>",
"assetid": "<sha3-256 hash>",
"blockid": "<sha3-256 hash>"
}
Do you agree to changing the message to use these new keys?
{
"tx_id": "<sha3-256 hash>",
"asset_id": "<sha3-256 hash>",
"block_id": "<sha3-256 hash>"
}
Yes: 👍
No: 👎
I initially voted down because our current HTTP API uses txid in multiple response payloads. I thought that we should be consistent, such that if we use tx_id we use it in all responses.
But as @ttmc pointed out, and as #1134 points out, the long term points in the direction of using underscores.
In broader terms, I see that as an API design specification question, and it has already been answered for the HTTP API, and in some way also for the websocket API. It is too late right now to change it I think.
If we wish to change the specifications (HTTP and websocket APIs) we can:
- do so after the planned release v0.10
- change everything now (both HTTP and websocket APIs)
- only change the websocket API now for v0.10 and change the HTTP API for v0.11
In any case, a change to the HTTP API will break clients.
link to HTTP API docs (grep txid): https://docs.bigchaindb.com/projects/server/en/latest/drivers-clients/http-client-server-api.html
We're already inconsistent with txid and tx_id. We use tx_id in HTTP API endpoints, but txid inside transactions (e.g. inputs.fulfills.txid); see issue #1134
Yup. How about putting a stake in the ground now:
- Long term we want to separate all our object IDs with an underscore: object_id (e.g. block_id, tx_id, ...)
- #1134 ("[http api] query parameters are "tx_id" meanwhile the payloads are "txid"") is completed by renaming all ids to the above schema
- For this PR, we change the implementation and its specification now, because we just agreed on this schema.
OK?
I agree with using tx_id in place of txid from here on, at least as far as APIs are concerned (local variables are another thing).
Updated my comments and vote. Basically going for:
only change the websocket API now for v0.10 and change the HTTP API for v0.11
Yes, I agree that consistency is better (at least in the publicly-visible stuff), and tx_id is easier to read.
def _put_into_capped_queue(queue, value):
    """Put a new item in a capped queue.

    If the queue reached its limit, get the first element
So, to clarify, this queue will lose messages rather than block if the consumer doesn't empty it fast enough? If that's the case, why is that the desired behaviour?
Uhm, I see what you mean. I don't have a good answer for that. My assumption is that creating blocks is computationally more intense than dispatching messages to the connected clients, but I've never tested it. Let's address this issue when we have more tools to do stress testing.
I think since we don't have the problem of high throughput right now, it might be safer to make it a blocking queue, ie, doesn't drop messages. If a client is not reading fast enough, then the server may get blocked waiting for their socket to become writeable. In that case, the slow client socket should be somehow dealt with. If it isn't dealt with, a slow client may interfere with what other clients see just by connecting and reading slowly.
In that case, the slow client socket should be somehow dealt with. If it isn't dealt with, a slow client may interfere with what other clients see just by connecting and reading slowly.
AFAIK this should be handled by aiohttp itself. If a slow client cannot keep up with the stream, the output buffer will be drained.
I'll investigate a bit more and comment again.
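For contrast, a minimal sketch (all names hypothetical) of the blocking alternative described above, where awaiting queue.put() applies backpressure to the producer instead of dropping messages:

```python
import asyncio

async def produce(queue, items):
    for item in items:
        # Awaiting put() suspends the producer while the queue is full,
        # so it naturally slows down to the consumer's pace.
        await queue.put(item)

async def main():
    queue = asyncio.Queue(maxsize=2)  # small cap to force backpressure
    consumed = []

    async def consume():
        for _ in range(5):
            consumed.append(await queue.get())

    await asyncio.gather(produce(queue, range(5)), consume())
    return consumed

consumed = asyncio.run(main())  # no message is lost
```

The trade-off @libscott mentions still applies: a slow client can now stall the producer, so the slow socket has to be dealt with elsewhere.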
@libscott: I spent quite some time on it but I'm still not 100% convinced.
I've opened an issue in the aiohttp repo:
@@ -31,5 +31,6 @@ def test_api_v1_endpoint(client):
    'self': 'http://localhost/api/v1/',
    'statuses': 'http://localhost/api/v1/statuses/',
    'transactions': 'http://localhost/api/v1/transactions/',
    'streams_v1': 'ws://localhost:9985/api/v1/streams/valid_tx',
Perhaps a trailing forward slash, to be consistent with the other paths?
There is a bit of personal interpretation in the following statement.
The difference is that http://localhost/api/v1/transactions/ represents a "directory". Under it you can find the transactions, which are the "files".
On the other side, ws://localhost:9985/api/v1/streams/valid_tx is a "file", since it's the stream endpoint.
We could argue whether streams_v1 should point to ws://localhost:9985/api/v1/streams/valid_tx or http://localhost:9985/api/v1/streams/
Oh, that kinda makes sense then
Please address codecov 👍
fyi: I am fixing the codecov problem
for _, websocket in self.subscribers.items():
    for str_item in str_buffer:
        websocket.send_str(str_item)
Are these blocking calls?
Nope! Weird, right? I didn't have time to dig into the implementation details. I guess that when you call send_str you are writing to a buffer that will eventually be emptied.
Pending item is to make it so that queue does not drop messages
This PR is the hub for all the sub-PRs related to #1086, specifically the WebSocket event stream API.