-
Notifications
You must be signed in to change notification settings - Fork 24.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add class for serializing message to bytes #29384
Conversation
Pinging @elastic/es-core-infra |
default void close() throws IOException {} | ||
} | ||
|
||
public interface FlushProducer extends AutoCloseable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really wonder what this buys us. I think it would be enough to have a method on WriteOperation
to get a Writeable
of some sort? That seems liek much simpler since we safe an object per operation potentially?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Http Responses are not necessarily individually serializable. For example in the pipelining case we need need to know what other responses have been submitted before handling the current response.
If you ‘queue’ http response number 3, but have not received 1 or 2 yet, the ‘poll’ operation will return nothing as we are not ready to write anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Additionally we have to fuse this on-top of a netty http serializing pipeline which is a stateful thing that you put a response into and get bytes out of. It is not really a "per-response" thing.
If we want to avoid different flush and write objects we could eventually optimize that with like an internal ByteBuffer
setter that can only be accessed by the flush producer. And you could put the write operation into the producer/pipeline and when it comes out it has byte buffers and is flush ready. But I feel like that could be an optimization / follow-up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i will need to think about this for a bit...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really wonder if we need to separate this out and make object creation and complexity necessary in all cases. This new abstraction confuses me a lot. I wonder if we should think more in the way of composing messages. ie.
class Writeable {
//this may be called with subsequent messages until we have all and then we write it back
// optimizations can apply in here and depend on the context.
public Writeable compose(Writeable writeable);
}
This will all for only adding the complexity when it's really needed in this pipelining edgecase.
WDYT
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean I don't really follow.
We're sitting here in the SocketChannelContext
. Someone has given us a Writeable
to queue to flush to the channel. How do we know if this Writeable
is ready to flush? How do we know if we are supposed to compose
it with a subsequent Writeable
? And I guess in this scenario, instead of the http serializing work living in the FlushProducer
, we have to pass that around so that it will be available to put in the Writeable
at outbound?
I also don't completely understanding why a WriteProducer
is considered "complex". It is just a pipeline that aggregates and serializes outbound messages. And is specific to the protocol in question (our protocol or http). Is it a naming issue? OutboundPipeline
? OutboundSerializer
? WriteSerializer
? It could even be a single method if you want that returns ready messages (opposed to write
and poll
):
public List<FlushOperation> serialize(WriteOperation writeOp);
I just did not do that to avoid creating a new List
every call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok so I have a couple of issues:
-
I have a hard time to see how you need to use it in the http case. Here we only have one implementation and nothing shows how it's used in the http case. This also means I have no idea it's sufficient for that usecase. I think we should never introduce an abstraction that has only one implementation.
- we can fix that by adding a test that show for example how http use it or we go and simplify it by making it concrete for now and then intoduce the abstaction when we do the http one. I'd opt for the latter and remove the inferface and make
BytesFlushProducer
a concrete inner class inSocketChannelContext
- we can fix that by adding a test that show for example how http use it or we go and simplify it by making it concrete for now and then intoduce the abstaction when we do the http one. I'd opt for the latter and remove the inferface and make
-
I don't like that we have to create a new object every time here for no obvious reason. (http might add one) can't we make
WriteOperation#getObject()
return aFlushOperation
and then we call itFlushable getFlushable()
our default one would then simply callreturn this
and we don't create any new objects unless needed? -
if we do this we can special case the piplining usecase, no?
I hope this helps.
@s1monw I made some changes based on your last comment. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left two comments / quesitons
|
||
SocketChannelContext getChannel(); | ||
public WriteOperation(SocketChannelContext channelContext, Object writeObject, BiConsumer<Void, Throwable> listener) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we have to loose all type safety here. Can't we fix the interface to return FlushOperation
and return this?
@@ -108,14 +126,82 @@ public boolean connect() throws IOException { | |||
return isConnected; | |||
} | |||
|
|||
public abstract int read() throws IOException; | |||
public void sendMessage(Object message, BiConsumer<Void, Throwable> listener) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this one could accept WriteOperation and then we could just add the context and the listener to it via setters?
I'll make a decision tomorrow, but I am leaning towards closing this PR. I'm not sure it has simplified the process to open a PR for abstractions without the Http work. I think I will probably just submit a PR that includes both the abstractions and the basic http work (with follow-up PRs for more advanced features like pipeline, cors, etc). |
Closing. Will submit different PR. |
This is related to #28898. In the tcp transport when data is queued for
writing to a channel it is always bytes. However, for the http transport
this data will be http response objects. These objects will need to be
serialized to bytes on the transport thread prior to flushing.
This commit resolves this by separating flush and write operations. A
write operation can be any object. When queuing a write operation with
a context, the context will use the provider class to serialize the
message and produce a flush operation. Currently there is only a single
class (
BytesFlushProducer
) which only supports flush-ready writeoperations (operations that are already byte buffers).