Conversation
Force-pushed from 29c1481 to e1d4269
Force-pushed from e1d4269 to 829695e
I need to go through it again, especially for the read/write ops. Here's a first step.
grpc/grpc.go
Outdated
```go
closed:       false,
done:         make(chan struct{}),
writeQueueCh: make(chan message, 10),
```
Why 10? Do we plan to log when a write is attempted while the queue is still full? If we make it configurable, it would help users adapt this value to their ongoing rate.
10 is a simple buffer. There are no plans to add logging or configuration for now, but if you think it could be an issue, I can try to adjust the logic.
I got it, but is it a benchmarked number or just a value to get some buffering? I think we should start with just 1 for the minimum async (or the number of senders, if it differs from 1) and increase it if we can't get a good rate during benchmarks.
Otherwise, this way, we could hide that the writer(s) got stuck and not realize it.
I believe it's just a value to get some buffering. It's the same number we use in xk6-websockets, for the same purpose: https://github.com/grafana/xk6-websockets/blob/main/websockets/websockets.go#L123 I don't think we benchmarked it there.
To be honest, I see your concerns, but I'm not sure they are significant enough to warrant an adjustment in this particular PR. Are you fine with creating a separate task and investigating/addressing them there?
I prefer the opposite, as described in my previous comment (increasing when we see the need), but I don't want to block you on this. I'm fine with your suggestion too.
@mstoykov, what do you think? If you share Ivan's concerns, I can rethink it in this PR.
@codebien seems to be right. I don't remember why I chose 10 here, and there is an additional queue either way.
I doubt there is any real impact from having both bufferings or only one of them. And this likely should be "fixed" in the websockets code as well.
👍
Okay, I will add the go leak check here and adopt the latest websocket changes in this PR. So I will fix it in the following commits 👍
Force-pushed from e60deac to 3c36fce
I think it would be good to have a PR with tests open before we merge, or to have a few directly here.
grpc/stream.go
Outdated
```go
select {
case writeChannel <- msg:
default:
	queue = append(queue, msg)
}
```
Why do we require one more queue? The writeQueueCh should already be enough for it; then we could call s.stream.Send(msg.msg) directly here.
Otherwise, this sounds like a good candidate for a memory leak if we can't sustain a reasonable rate.
Like the buffered channel for writing, this is a copy from the websockets, and to be honest, this design with the buffers and queues doesn't look suspicious to me, since that's usually how we coordinate async tasks.
I'm not certain about the memory leak without benchmarking the code, but as with the writeChannel, this could be a point for a separate investigation.
> this is a copy from the websockets and to be honest
> that's usually how we coordinate async tasks

I can't entirely agree, and if there isn't a rationale behind it, I suggest we refactor the WebSocket code as well. I'm not an expert on the new WebSocket codebase, so I could be wrong and there might be a specific reason.
If you prefer to investigate it in a specific PR, I'm okay with it.
I would argue this is likely the better place for this to stay, as it can grow and shrink dynamically.

> memory leak

We let users write JS code - they can already "leak memory" by just appending to an array indefinitely. I am not against there being some limit, but I have no idea what it should be or what .send in websockets (for example) should do. Given that the operations are asynchronous, I expect users to know that send will not wait, and that if they keep calling send in a loop, it will fill something up somewhere.
The WebSocket specification specifically lets you know how many bytes are not yet written. I would argue the gRPC API should have something similar, so that users can check whether they are trying to write way more than what is possible. No idea if Node.js, Deno, or some other gRPC JS lib has something we can take inspiration from.

@codebien can't agree with you. I'll do these PRs separately, but as I said, my plan is to merge this PR first since it will be the basis for any following PR. Otherwise, it will be a mess to maintain such open PRs with no benefit. The whole idea of breaking down the PRs is to speed up development by keeping the scope smaller and moving faster.
Force-pushed from 3c36fce to 5b3a18c
Force-pushed from 5b3a18c to 68b3988
Turning this into a draft before applying the latest changes related to grafana/k6#3004 and also some adjustments similar to the xk6-websockets ones (grafana/xk6-websockets#39 and grafana/xk6-websockets#42)
Force-pushed from b56a05b to 466a720
This is already buffered in a slice which can grow and shrink as needed. Discussed in grafana/xk6-grpc#1 (comment)
Force-pushed from 63f0482 to 890301f
Force-pushed from 076aa15 to 2a351f2
Force-pushed from 665898f to dff42cf
Force-pushed from dff42cf to d4b511a
LGTM
Force-pushed from d4b511a to 976a2f0
Just for the record, I've squashed the commits into a single one.
LGTM, except for the last waitgroup change
Force-pushed from 976a2f0 to de04ac4
I made a very basic attempt here https://github.com/grafana/xk6-grpc/compare/feat/grpc-streaming...codebien-test?expand=1, WDYT? It sounds a bit more readable to me; feel free to discard it if it's useless.
grpc/stream.go
Outdated
```go
if !ok {
	continue
}

if err := s.stream.CloseSend(); err != nil {
	s.logger.WithError(err).Error("an error happened during stream closing")
}

close(s.closeSend)
```
I somehow missed this, but ... uh.
In general, writers close channels, not readers. This also will loop and keep entering the !ok branch, then go into the select and enter again ... and again.
This is why I set the channel to nil in the websocket code base.
🤔 right; moreover, end might not be called. So let me rethink the approach.
I don't think end not being called is valid code at all. There is no way for us to know whether a stream should end if it is not called, or whether it should continue longer.
So it is expected that if end is not called, k6 will just wait for it indefinitely ... except, I guess, if there is an error that closes the stream 🤷
> I don't think end not being called is valid code at all. There is no way for us to know whether a stream should end if it is not called or should continue longer.

It's only for the server-side streaming case.

> except I guess if there is an error that closes the stream

Exactly; in that case, we should rely only on the io.EOF during the reading of the stream.
grpc/stream.go
Outdated
```go
select {
case <-s.closeSend:
	return
default:
}

s.closeSend <- struct{}{}
```
Looking at this again - this is racy.
There is nothing stopping one call to end() from having passed the select while the loop is handling it, and, while that is happening, a second call to end from getting through the select above before the channel is closed. After the second call gets through the select, the close can happen, and then the write here goes to a closed channel -> panic.
So I guess we can just close it here?
I do prefer the usage of a state field, though, as in websockets. Also, I would argue that once the stream is closing, using it to write or calling end again should return an exception.
I see why you prefer the state over the channel: a data race would highlight that the usage is wrong, since using an event loop shouldn't cause such things.
So in the latest push, I've tried to switch to it. Let me know your thoughts on that.
Co-authored-by: Mihail Stoykov <312246+mstoykov@users.noreply.github.com>
Force-pushed from 06e3f3e to 48b4065
@codebien I've checked the version and still prefer to keep the current one, but thanks for the effort 👍 🙇 @mstoykov the latest force-push should resolve your latest open points.
LGTM!
A single comment which likely needs more tests; we can have it in a separate PR so that we stop having this fairly big one re-reviewed ;)
```go
func (mi *ModuleInstance) stream(c goja.ConstructorCall) *goja.Object {
	rt := mi.vu.Runtime()

	client, ok := c.Argument(0).ToObject(rt).Export().(*Client)
```
Not certain, but I am pretty sure this will panic if there are 0 arguments provided 🤔
We can fix this in a separate PR
Nah, it will be a TypeError, but I'll fix it in the following PRs, where I'll work more on this constructor.
What?
We are introducing GRPC streaming support, highly inspired by the xk6-websockets.
This is the base version, with working examples (after `make build`), but without the metrics and some other functionality (parameterization, metadata, and, of course, tests) that will be introduced in the following PRs. Trying to keep the PR smaller.
Note: the original code of this repo was copied from k6 with minor modifications.
Based on #4
Why?
Users have requested it.