-
Notifications
You must be signed in to change notification settings - Fork 435
Add GRPCMessageDeframer
#1776
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Add GRPCMessageDeframer
#1776
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| /* | ||
| * Copyright 2024, gRPC Authors All rights reserved. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| import GRPCCore | ||
| import NIOCore | ||
|
|
||
| /// A ``GRPCMessageDeframer`` helps with the deframing of gRPC data frames: | ||
| /// - It reads the frame's metadata to know whether the message payload is compressed or not, and its length | ||
| /// - It reads and decompresses the payload, if compressed | ||
| /// - It helps put together frames that have been split across multiple `ByteBuffers` by the underlying transport | ||
| struct GRPCMessageDeframer: NIOSingleStepByteToMessageDecoder { | ||
| /// Length of the gRPC message header (1 compression byte, 4 bytes for the length). | ||
| static let metadataLength = 5 | ||
| static let defaultMaximumPayloadSize = Int.max | ||
|
|
||
| typealias InboundOut = [UInt8] | ||
|
|
||
| private let decompressor: Zlib.Decompressor? | ||
| private let maximumPayloadSize: Int | ||
|
|
||
| /// Create a new ``GRPCMessageDeframer``. | ||
| /// - Parameters: | ||
| /// - maximumPayloadSize: The maximum size a message payload can be. | ||
| /// - decompressor: A `Zlib.Decompressor` to use when decompressing compressed gRPC messages. | ||
| /// - Important: You must call `end()` on the `decompressor` when you're done using it, to clean | ||
| /// up any resources allocated by `Zlib`. | ||
| init( | ||
| maximumPayloadSize: Int = Self.defaultMaximumPayloadSize, | ||
| decompressor: Zlib.Decompressor? = nil | ||
| ) { | ||
| self.maximumPayloadSize = maximumPayloadSize | ||
| self.decompressor = decompressor | ||
| } | ||
|
|
||
| mutating func decode(buffer: inout ByteBuffer) throws -> InboundOut? { | ||
| guard buffer.readableBytes >= Self.metadataLength else { | ||
| // If we cannot read enough bytes to cover the metadata's length, then we | ||
| // need to wait for more bytes to become available to us. | ||
| return nil | ||
| } | ||
|
|
||
| // Store the current reader index in case we don't yet have enough | ||
| // bytes in the buffer to decode a full frame, and need to reset it. | ||
| // The force-unwraps for the compression flag and message length are safe, | ||
| // because we've checked just above that we've got at least enough bytes to | ||
| // read all of the metadata. | ||
| let originalReaderIndex = buffer.readerIndex | ||
| let isMessageCompressed = buffer.readInteger(as: UInt8.self)! == 1 | ||
| let messageLength = buffer.readInteger(as: UInt32.self)! | ||
|
|
||
| if messageLength > self.maximumPayloadSize { | ||
| throw RPCError( | ||
| code: .resourceExhausted, | ||
| message: """ | ||
| Message has exceeded the configured maximum payload size \ | ||
| (max: \(self.maximumPayloadSize), actual: \(messageLength)) | ||
| """ | ||
| ) | ||
| } | ||
|
|
||
| guard var message = buffer.readSlice(length: Int(messageLength)) else { | ||
| // `ByteBuffer/readSlice(length:)` returns nil when there are not enough | ||
| // bytes to read the requested length. This can happen if we don't yet have | ||
| // enough bytes buffered to read the full message payload. | ||
| // By reading the metadata though, we have already moved the reader index, | ||
| // so we must reset it to its previous, original position for now, | ||
| // and return. We'll try decoding again, once more bytes become available | ||
| // in our buffer. | ||
| buffer.moveReaderIndex(to: originalReaderIndex) | ||
| return nil | ||
| } | ||
|
|
||
| if isMessageCompressed { | ||
| guard let decompressor = self.decompressor else { | ||
| // We cannot decompress the payload - throw an error. | ||
| throw RPCError( | ||
| code: .internalError, | ||
| message: "Received a compressed message payload, but no decompressor has been configured." | ||
| ) | ||
| } | ||
| return try decompressor.decompress(&message, limit: self.maximumPayloadSize) | ||
| } else { | ||
| return Array(buffer: message) | ||
| } | ||
| } | ||
|
|
||
| mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? { | ||
| try self.decode(buffer: &buffer) | ||
| } | ||
| } | ||
219 changes: 219 additions & 0 deletions
219
Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,219 @@ | ||
| /* | ||
| * Copyright 2024, gRPC Authors All rights reserved. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| import GRPCCore | ||
| import NIOCore | ||
| import NIOTestUtils | ||
| import XCTest | ||
|
|
||
| @testable import GRPCHTTP2Core | ||
|
|
||
| final class GRPCMessageDeframerTests: XCTestCase { | ||
gjcairo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| func testReadMultipleMessagesWithoutCompression() throws { | ||
| let firstMessage = { | ||
| var buffer = ByteBuffer() | ||
| buffer.writeInteger(UInt8(0)) | ||
| buffer.writeInteger(UInt32(16)) | ||
| buffer.writeRepeatingByte(42, count: 16) | ||
| return buffer | ||
| }() | ||
|
|
||
| let secondMessage = { | ||
| var buffer = ByteBuffer() | ||
| buffer.writeInteger(UInt8(0)) | ||
| buffer.writeInteger(UInt32(8)) | ||
| buffer.writeRepeatingByte(43, count: 8) | ||
| return buffer | ||
| }() | ||
|
|
||
| try ByteToMessageDecoderVerifier.verifyDecoder( | ||
| inputOutputPairs: [ | ||
| (firstMessage, [Array(repeating: UInt8(42), count: 16)]), | ||
| (secondMessage, [Array(repeating: UInt8(43), count: 8)]), | ||
| ]) { | ||
| GRPCMessageDeframer() | ||
| } | ||
| } | ||
|
|
||
| func testReadMessageOverSizeLimitWithoutCompression() throws { | ||
| let deframer = GRPCMessageDeframer(maximumPayloadSize: 100) | ||
| let processor = NIOSingleStepByteToMessageProcessor(deframer) | ||
|
|
||
| var buffer = ByteBuffer() | ||
| buffer.writeInteger(UInt8(0)) | ||
| buffer.writeInteger(UInt32(101)) | ||
| buffer.writeRepeatingByte(42, count: 101) | ||
gjcairo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| XCTAssertThrowsError( | ||
| ofType: RPCError.self, | ||
| try processor.process(buffer: buffer) { _ in | ||
| XCTFail("No message should be produced.") | ||
| } | ||
| ) { error in | ||
| XCTAssertEqual(error.code, .resourceExhausted) | ||
| XCTAssertEqual( | ||
| error.message, | ||
| "Message has exceeded the configured maximum payload size (max: 100, actual: 101)" | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| func testReadMessageOverSizeLimitButWithoutActualMessageBytes() throws { | ||
| let deframer = GRPCMessageDeframer(maximumPayloadSize: 100) | ||
| let processor = NIOSingleStepByteToMessageProcessor(deframer) | ||
|
|
||
| var buffer = ByteBuffer() | ||
| buffer.writeInteger(UInt8(0)) | ||
| // Set the message length field to be over the maximum payload size, but | ||
| // don't write the actual message bytes. This is to ensure that the payload | ||
| // size limit is enforced _before_ the payload is actually read. | ||
| buffer.writeInteger(UInt32(101)) | ||
|
|
||
| XCTAssertThrowsError( | ||
| ofType: RPCError.self, | ||
| try processor.process(buffer: buffer) { _ in | ||
| XCTFail("No message should be produced.") | ||
| } | ||
| ) { error in | ||
| XCTAssertEqual(error.code, .resourceExhausted) | ||
| XCTAssertEqual( | ||
| error.message, | ||
| "Message has exceeded the configured maximum payload size (max: 100, actual: 101)" | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| func testCompressedMessageWithoutConfiguringDecompressor() throws { | ||
| let deframer = GRPCMessageDeframer(maximumPayloadSize: 100) | ||
| let processor = NIOSingleStepByteToMessageProcessor(deframer) | ||
|
|
||
| var buffer = ByteBuffer() | ||
| buffer.writeInteger(UInt8(1)) | ||
| buffer.writeInteger(UInt32(10)) | ||
| buffer.writeRepeatingByte(42, count: 10) | ||
|
|
||
| XCTAssertThrowsError( | ||
| ofType: RPCError.self, | ||
| try processor.process(buffer: buffer) { _ in | ||
| XCTFail("No message should be produced.") | ||
| } | ||
| ) { error in | ||
| XCTAssertEqual(error.code, .internalError) | ||
| XCTAssertEqual( | ||
| error.message, | ||
| "Received a compressed message payload, but no decompressor has been configured." | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| private func testReadMultipleMessagesWithCompression(method: Zlib.Method) throws { | ||
| let decompressor = Zlib.Decompressor(method: method) | ||
| let compressor = Zlib.Compressor(method: method) | ||
| var framer = GRPCMessageFramer() | ||
| defer { | ||
| decompressor.end() | ||
| compressor.end() | ||
| } | ||
|
|
||
| let firstMessage = try { | ||
| framer.append(Array(repeating: 42, count: 100)) | ||
| return try framer.next(compressor: compressor)! | ||
| }() | ||
|
|
||
| let secondMessage = try { | ||
| framer.append(Array(repeating: 43, count: 110)) | ||
| return try framer.next(compressor: compressor)! | ||
| }() | ||
|
|
||
| try ByteToMessageDecoderVerifier.verifyDecoder( | ||
| inputOutputPairs: [ | ||
| (firstMessage, [Array(repeating: 42, count: 100)]), | ||
| (secondMessage, [Array(repeating: 43, count: 110)]), | ||
| ]) { | ||
| GRPCMessageDeframer(maximumPayloadSize: 1000, decompressor: decompressor) | ||
| } | ||
| } | ||
|
|
||
| func testReadMultipleMessagesWithDeflateCompression() throws { | ||
| try self.testReadMultipleMessagesWithCompression(method: .deflate) | ||
| } | ||
|
|
||
| func testReadMultipleMessagesWithGZIPCompression() throws { | ||
| try self.testReadMultipleMessagesWithCompression(method: .gzip) | ||
| } | ||
|
|
||
| func testReadCompressedMessageOverSizeLimitBeforeDecompressing() throws { | ||
| let deframer = GRPCMessageDeframer(maximumPayloadSize: 1) | ||
| let processor = NIOSingleStepByteToMessageProcessor(deframer) | ||
| let compressor = Zlib.Compressor(method: .gzip) | ||
| var framer = GRPCMessageFramer() | ||
| defer { | ||
| compressor.end() | ||
| } | ||
|
|
||
| framer.append(Array(repeating: 42, count: 100)) | ||
| let framedMessage = try framer.next(compressor: compressor)! | ||
|
|
||
| XCTAssertThrowsError( | ||
| ofType: RPCError.self, | ||
| try processor.process(buffer: framedMessage) { _ in | ||
| XCTFail("No message should be produced.") | ||
| } | ||
| ) { error in | ||
| XCTAssertEqual(error.code, .resourceExhausted) | ||
| XCTAssertEqual( | ||
| error.message, | ||
| """ | ||
| Message has exceeded the configured maximum payload size \ | ||
| (max: 1, actual: \(framedMessage.readableBytes - GRPCMessageDeframer.metadataLength)) | ||
| """ | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| private func testReadDecompressedMessageOverSizeLimit(method: Zlib.Method) throws { | ||
| let decompressor = Zlib.Decompressor(method: method) | ||
| let deframer = GRPCMessageDeframer(maximumPayloadSize: 100, decompressor: decompressor) | ||
| let processor = NIOSingleStepByteToMessageProcessor(deframer) | ||
| let compressor = Zlib.Compressor(method: method) | ||
| var framer = GRPCMessageFramer() | ||
| defer { | ||
| decompressor.end() | ||
| compressor.end() | ||
| } | ||
|
|
||
| framer.append(Array(repeating: 42, count: 101)) | ||
| let framedMessage = try framer.next(compressor: compressor)! | ||
|
|
||
| XCTAssertThrowsError( | ||
| ofType: RPCError.self, | ||
| try processor.process(buffer: framedMessage) { _ in | ||
| XCTFail("No message should be produced.") | ||
| } | ||
| ) { error in | ||
| XCTAssertEqual(error.code, .resourceExhausted) | ||
| XCTAssertEqual(error.message, "Message is too large to decompress.") | ||
| } | ||
| } | ||
|
|
||
| func testReadDecompressedMessageOverSizeLimitWithDeflateCompression() throws { | ||
| try self.testReadDecompressedMessageOverSizeLimit(method: .deflate) | ||
| } | ||
|
|
||
| func testReadDecompressedMessageOverSizeLimitWithGZIPCompression() throws { | ||
| try self.testReadDecompressedMessageOverSizeLimit(method: .gzip) | ||
| } | ||
| } | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.