-
Notifications
You must be signed in to change notification settings - Fork 435
Adds the ability to use different payloads #710
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds the ability to use different payloads #710
Conversation
Sources/GRPC/ReadWriteStates.swift
Outdated
| // Zero is fine: the writer will allocate the correct amount of space. | ||
| var buffer = allocator.buffer(capacity: 0) | ||
|
|
||
| do { | ||
| try message.serialize(into: &buffer) | ||
| } catch { | ||
| self = .notWriting | ||
| return .failure(.serializationFailed) | ||
| } | ||
|
|
||
| // Zero is fine: the writer will allocate the correct amount of space. | ||
| var buffer = allocator.buffer(capacity: 0) | ||
| let data = buffer.readData(length: buffer.readableBytes)! | ||
| buffer.clear() | ||
| writer.write(data, into: &buffer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a prettier way of doing this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but unfortunately we need to rework the writer API slightly. I have a PR up at the moment which adds support for compression which complicates things a little more. For the time being I think making the writer generic over the payload and having it serialize the payload into the buffer is sufficient.
cb2755e to
e4a5737
Compare
glbrntt
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great start @mustiikhalil! I've left some comments inline.
You may already be pretty close to the point where you can try out flatbuffers. If you look at e.g, the hello world example and switch the types to be flatbuffers and provide the conformance to Payload then it should work. Of course this is still missing all the flatbuffers codegen :)
Sources/GRPC/ReadWriteStates.swift
Outdated
| // Zero is fine: the writer will allocate the correct amount of space. | ||
| var buffer = allocator.buffer(capacity: 0) | ||
|
|
||
| do { | ||
| try message.serialize(into: &buffer) | ||
| } catch { | ||
| self = .notWriting | ||
| return .failure(.serializationFailed) | ||
| } | ||
|
|
||
| // Zero is fine: the writer will allocate the correct amount of space. | ||
| var buffer = allocator.buffer(capacity: 0) | ||
| let data = buffer.readData(length: buffer.readableBytes)! | ||
| buffer.clear() | ||
| writer.write(data, into: &buffer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but unfortunately we need to rework the writer API slightly. I have a PR up at the moment which adds support for compression which complicates things a little more. For the time being I think making the writer generic over the payload and having it serialize the payload into the buffer is sufficient.
| } | ||
| } | ||
|
|
||
| extension Echo_EchoRequest: Payload { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is generated, but it looks like these were added manually.
The implementation for the conformance to Payload shouldn't be in the generated code. Instead we can create another protocol (e.g. GRPCProtobufPayload) which conforms to Payload and Message. It should then provide the default implementation for Payload so that the generated messages can be extended to conform to that protocol.
You'll need to take a look at the code-gen (in protoc-gen-grpc-swift) to figure out how to do this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@glbrntt Yes, I just added them to see if they would pass the test cases (since they are still failing) and to have a draft of the PR. So we can talk about What we would be doing, and how.
we can have the following:
protocol GRPCProtobufPayload: GRPCPayload {}
extension GRPCProtobufPayload {
// comes the implementation of the functions
}and similarly in FlatBuffers:
protocol GRPCFlatbufPayload: GRPCPayload {}
extension GRPCFlatbufPayload {
// comes the implementation of the functions
}do you think we should generate the code? or implement it which the package itself?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah okay, no problem. Just an FYI if you want to run the tests locally you can set the ENABLE_TIMING_TESTS environment variable to false to skip some of the longer running tests (it takes the test time down to a couple of seconds).
The default implementation for GRPCProtobufPayload (which should also conform to SwiftProtobuf.Message) should be within GRPC since we already depend on SwiftProtobuf. This gives us more flexibility to change the implementation (i.e. if/when new Protobuf APIs become available) without regenerating the code.
For flatbuffers I think we have to generate the conformance to GRPCPayload each time since there's no dependency between flatbuffers and gRPC Swift here.
9439f1e to
8827fca
Compare
|
@glbrntt I think this is ready for review now! Thanks a million for the guidance though, did learn new things! |
b045bce to
287022f
Compare
glbrntt
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this @mustiikhalil, I've left some comments inline.
The CI is failing because the committed code for the Echo service doesn't match the code generated for that service; you'll need to regenerate it and commit it again.
Sources/GRPC/CallHandlers/BidirectionalStreamingCallHandler.swift
Outdated
Show resolved
Hide resolved
Sources/GRPC/GRPCServerCodec.swift
Outdated
| var buffer = ByteBufferAllocator().buffer(capacity: 0) | ||
| try message.serialize(into: &buffer) | ||
| context.write(self.wrapOutboundOut(.message(buffer.readData(length: buffer.readableBytes)!)), promise: promise) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because of this change, we have quite a few unnecessary copies here and on the client side (between Data and ByteBuffer). This might require a fair bit of refactoring; would you be interested in doing this? If not I'm happy to do this in a followup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would gladly do it but do you have a structure in mind or should I look into how we can do it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I'll try to explain the problem and what needs to be done to improve it.
Problem
As you know, before this change we explicitly expected payloads to be Message from SwiftProtobuf. The only serialization and deserialization methods from Message are via Data from Foundation. Internally we use NIO and our currency type for storing bytes is NIOs ByteBuffer.
Because we always expect a Data from Message, the write method on LengthPrefixedMessageWriter accepts a Data and writes into a ByteBuffer.
With the protocol we always serialize into a ByteBuffer (for Message we have to go via Data first). This means that we've gone from:
MessagetoData(serialization)DatatoByteBuffer(length prefixing)
to
MessagetoData(serialization)DatatoByteBuffer(we need to forGRPCPayload)ByteBuffertoData(we need to forLengthPrefixedMessageWriter.write)DatatoByteBuffer(length prefixing)
Clearly we don't want to copy bytes from Data to ByteBuffer and back from ByteBuffer to Data again (and we also don't want our API for GRPCPayload to be defined by Data).
Now I need to provide some background on the LenthPrefixedMessageWriter: gRPC payloads are sent with a 5-byte prefix (I might have explained this before but I'll repeat it for completeness). The first byte indicates whether the payload is compressed or not, the next four bytes indicate the length of the payload (the compressed length if it is compressed).
If the payload isn't compressed then this is actually really straightforward: we write a 0-byte we write the length of the payload as a UInt32 and then we write the payload. For compression we have to write the a 1-byte, leave a 4-byte gap for the length, compress the payload into the buffer and not how many bytes it was and then fill in the 4-byte gap.
Possible Solution
So I think the best we can do now (I'll use protobuf as an example again) is actually to pre-allocate the extra 5-byte space at the beginning of the buffer before we pass it into message.serialize(into:).
If we aren't using compression then all we need to do is fill in the first 5 bytes, easy! (And we have the Message to Data to ByteBuffer, as we had before, for Flatbuffers I expect we have something similar.)
When we use compression things are slightly different because we can't compress in place, so we have an extra buffer but in this case the same applies, we can avoid the extra hop via Data.
Hopefully this all makes sense so far?
What needs to be done?
LengthPrefixedMessageWriter.writeaccepts aData, instead I think the writer should accept aGRPCPayload(as the writer knows whether compression is being used so knows how to fill in the first 5-bytes of the payload etc, it can also choose whether it should preallocate 5 bytes, if we're using compression then we don't need to since we have to compress into a different buffer anyway).Zlib.deflatealso accepts aDataif I recall correctly, this will need to be changed to aByteBuffer
On the server side of things, this will require having to move some components around. At the moment the length prefixed writer lives in HTTP1ToRawGRPCServerCodec; this will have to move to GPRCServerCodec. This means the types that get sent between these handlers will also have to change (_RawGRPCServerResponsePart.message will have a ByteBuffer as its payload which will be a length prefixed message). I also expect a number of tests will have to be updated to support these changes.
167989f to
75f3133
Compare
|
I've fixed all the suggested things, and also refactored the code according to what we planned yesterday, however there is a point in the Furthermore, I ran both commands locally and there is no difference between both I am not sure why the CLI is having issues with it :/ |
|
Great, I'll take another look now.
Oh I think I see the problem, the CI looks like it isn't actually rebuilding the plugin (maybe the workspace gets cached between different commits for the same PR?!). If you update the rule in the ${PROTOC_GEN_GRPC_SWIFT}: Sources/protoc-gen-grpc-swift/*.swift |
|
let's see hopefully it works. |
… with GRPC-swift Renames all the Requests and Responses + adds comments + fixes generator code Updated the code to only use bytebuffer instead of foundation data Move the logic for reading the payload to GRPCServerCodec, and fixed the logic Updated the rule for travis ci
98ef91b to
19f2604
Compare
glbrntt
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've left some more comments @mustiikhalil -- great work!
Sources/GRPCInteroperabilityTestModels/Generated/test.grpc.swift
Outdated
Show resolved
Hide resolved
3f8effa to
00f9f91
Compare
…ming + generated code
95bcae7 to
b180af7
Compare
1135358 to
ff56fe4
Compare
ff56fe4 to
1cfb34c
Compare
glbrntt
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few more changes needed but this is getting pretty close though @mustiikhalil.
| } | ||
|
|
||
| let bytesRead = inputPointer.count - self.stream.availableInputBytes | ||
| return (bytesRead, writtenBytes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@glbrntt I think this should fix the issue with the pointer being accessed while we already moved out of the function
df14977 to
9ffa268
Compare
| buffer.reserveCapacity(MemoryLayout.size(ofValue: payload) + LengthPrefixedMessageWriter.metadataLength) | ||
|
|
||
| func write(_ payload: GRPCPayload, into buffer: inout ByteBuffer, disableCompression: Bool = false) throws {r | ||
| buffer.reserveCapacity(buffer.writerIndex + LengthPrefixedMessageWriter.metadataLength) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved the code to use the buffer.writerIndex instead of the MemoryLayout.size(of: payload) since I think this way it will be more robust knowing that it will always have those 5 extra bytes on the buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch, thanks!
9ffa268 to
c94ca28
Compare
c94ca28 to
052f513
Compare
glbrntt
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, thanks for sticking with this @mustiikhalil; great job!
| buffer.reserveCapacity(MemoryLayout.size(ofValue: payload) + LengthPrefixedMessageWriter.metadataLength) | ||
|
|
||
| func write(_ payload: GRPCPayload, into buffer: inout ByteBuffer, disableCompression: Bool = false) throws {r | ||
| buffer.reserveCapacity(buffer.writerIndex + LengthPrefixedMessageWriter.metadataLength) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch, thanks!
|
Wonderful! |

Good morning!
The following PR will be moving the
Message Protocolaway from the code base and introduces thePayload protocolwhich will allow users, to decode messages according to the Type they want to send. it addresses the following issue: #708, @glbrntt let's see how we can make this better since I found the following issue:which is not needed all the time since the following object didn't require it to work