From 5a495bf0bd483e25586592dca2628cde9326e068 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Mon, 22 Jan 2024 16:50:11 +0000 Subject: [PATCH 1/5] Add GRPCMessageDeframer --- .../GRPCHTTP2Core/GRPCMessageDeframer.swift | 89 ++++++++ .../GRPCMessageDeframerTests.swift | 200 ++++++++++++++++++ .../Test Utilities/XCTest+Utilities.swift | 13 ++ 3 files changed, 302 insertions(+) create mode 100644 Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift create mode 100644 Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift diff --git a/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift b/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift new file mode 100644 index 000000000..7ced043b7 --- /dev/null +++ b/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift @@ -0,0 +1,89 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import NIOCore +import GRPCCore + +/// A ``GRPCMessageDeframer`` helps with the deframing of gRPC data frames: +/// - It reads the frame's metadata to know whether the message payload is compressed or not, and its length +/// - It reads and decompresses the payload, if compressed +/// - It helps put together frames that have been split across multiple `ByteBuffers` by the underlying transport +struct GRPCMessageDeframer: NIOSingleStepByteToMessageDecoder { + /// Length of the gRPC message header (1 compression byte, 4 bytes for the length). + static let metadataLength = 5 + static let defaultMaximumPayloadSize = Int.max + + typealias InboundOut = [UInt8] + + private let decompressor: Zlib.Decompressor? + private let maximumPayloadSize: Int? + private var effectiveMaximumPayloadSize: Int { + self.maximumPayloadSize ?? Self.defaultMaximumPayloadSize + } + + /// Create a new ``GRPCMessageDeframer``. + /// - Parameters: + /// - maximumPayloadSize: The maximum size a message payload can be. + /// - decompressor: A `Zlib.Decompressor` to use when decompressing compressed gRPC messages. + /// - Important: You must call `end()` on the `decompressor` when you're done using it, to clean + /// up any resources allocated by `Zlib`. + init(maximumPayloadSize: Int? = nil, decompressor: Zlib.Decompressor? = nil) { + self.maximumPayloadSize = maximumPayloadSize + self.decompressor = decompressor + } + + mutating func decode(buffer: inout ByteBuffer) throws -> InboundOut? { + guard buffer.readableBytes >= Self.metadataLength else { + // If we cannot read enough bytes to cover the metadata's length, then we + // need to wait for more bytes to become available to us. + return nil + } + + // Store the current reader index in case we don't yet have enough + // bytes in the buffer to decode a full frame, and need to reset it. + let originalReaderIndex = buffer.readerIndex + let isMessageCompressed = buffer.readInteger(as: UInt8.self)! == 1 + + guard var message = buffer.readLengthPrefixedSlice(as: UInt32.self) else { + // We have moved the reader index, but we don't have enough bytes to + // read the full message payload, so reset it to its previous, original + // position for now, and return. We'll try decoding again, once more + // bytes become available in our buffer. + buffer.moveReaderIndex(to: originalReaderIndex) + return nil + } + + if isMessageCompressed { + guard let decompressor = self.decompressor else { + // We cannot decompress the payload - discard the message. + return nil + } + return try decompressor.decompress(&message, limit: self.effectiveMaximumPayloadSize) + } else { + if message.readableBytes > self.effectiveMaximumPayloadSize { + throw RPCError( + code: .resourceExhausted, + message: "Message has exceeded the configured maximum payload size (max: \(self.effectiveMaximumPayloadSize), actual: \(message.readableBytes))" + ) + } + return Array(buffer: message) + } + } + + mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? { + try self.decode(buffer: &buffer) + } +} diff --git a/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift new file mode 100644 index 000000000..1f4202ced --- /dev/null +++ b/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift @@ -0,0 +1,200 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import NIOCore +import XCTest + +@testable import GRPCHTTP2Core + +final class GRPCMessageDeframerTests: XCTestCase { + func testReadMultipleMessagesWithoutCompression() throws { + let deframer = GRPCMessageDeframer() + let processor = NIOSingleStepByteToMessageProcessor(deframer) + + var buffer = ByteBuffer() + buffer.writeInteger(UInt8(0)) + buffer.writeInteger(UInt32(16)) + buffer.writeRepeatingByte(42, count: 16) + + buffer.writeInteger(UInt8(0)) + buffer.writeInteger(UInt32(8)) + buffer.writeRepeatingByte(43, count: 8) + + var messages = [[UInt8]]() + try processor.process(buffer: buffer) { message in + messages.append(message) + } + + XCTAssertEqual(messages, [ + Array(repeating: 42, count: 16), + Array(repeating: 43, count: 8) + ]) + } + + func testReadMessageOverSizeLimitWithoutCompression() throws { + let deframer = GRPCMessageDeframer(maximumPayloadSize: 100) + let processor = NIOSingleStepByteToMessageProcessor(deframer) + + var buffer = ByteBuffer() + buffer.writeInteger(UInt8(0)) + buffer.writeInteger(UInt32(101)) + buffer.writeRepeatingByte(42, count: 101) + + XCTAssertThrowsRPCError ( + try processor.process(buffer: buffer) { _ in + XCTFail("No message should be produced.") + } + ) { error in + XCTAssertEqual(error.code, .resourceExhausted) + XCTAssertEqual(error.message, "Message has exceeded the configured maximum payload size (max: 100, actual: 101)") + } + } + + func testReadSingleMessageWithoutCompressionSplitAcrossMultipleBuffers() throws { + let deframer = GRPCMessageDeframer() + let processor = NIOSingleStepByteToMessageProcessor(deframer) + + var buffer = ByteBuffer() + + // We want to write the following gRPC frame: + // - Compression flag unset + // - Message length = 120 + // - 120 bytes of data for the message + // The header will be split in two (the first 3 bytes in a buffer, the + // remaining 2 in another one); the first chunk of the message will follow + // the second part of the metadata in the second buffer; and finally + // the rest of the message bytes in a third buffer. + // The purpose of this test is to make sure that we are correctly stitching + // together the frame. + + // Write compression flag (unset) + buffer.writeInteger(UInt8(0)) + // Write the first two bytes of the length field + buffer.writeInteger(UInt16(0)) + // Make sure we don't produce a message, since we've got incomplete data. + try processor.process(buffer: buffer) { message in + XCTAssertNil(message) + } + + buffer.clear() + // Write the next two bytes of the length field + buffer.writeInteger(UInt16(120)) + // Write the first half of the message data + buffer.writeRepeatingByte(42, count: 60) + // Again, make sure we don't produce a message, since we don't have enough + // message bytes to read (only have 60 so far, but need 120). + try processor.process(buffer: buffer) { message in + XCTAssertNil(message) + } + + buffer.clear() + // Write remaining 60 bytes of the message. + buffer.writeRepeatingByte(43, count: 60) + + // Now we should be reading the full message. + var messages = [[UInt8]]() + try processor.process(buffer: buffer) { message in + messages.append(message) + } + let expectedMessage = { + var firstHalf = Array(repeating: UInt8(42), count: 60) + firstHalf.append(contentsOf: Array(repeating: 43, count: 60)) + return firstHalf + }() + XCTAssertEqual(messages, [expectedMessage]) + } + + func testReadMultipleMessagesWithCompression() throws { + let decompressor = Zlib.Decompressor(method: .deflate) + let deframer = GRPCMessageDeframer(maximumPayloadSize: 1000, decompressor: decompressor) + let processor = NIOSingleStepByteToMessageProcessor(deframer) + let compressor = Zlib.Compressor(method: .deflate) + var framer = GRPCMessageFramer() + + framer.append(Array(repeating: 42, count: 100)) + var framedMessage = try framer.next(compressor: compressor)! + + var messages = [[UInt8]]() + try processor.process(buffer: framedMessage) { message in + messages.append(message) + } + + framer.append(Array(repeating: 43, count: 110)) + framedMessage = try framer.next(compressor: compressor)! + try processor.process(buffer: framedMessage) { message in + messages.append(message) + } + + XCTAssertEqual(messages, [ + Array(repeating: 42, count: 100), + Array(repeating: 43, count: 110) + ]) + } + + func testReadMessageOverSizeLimitWithCompression() throws { + let decompressor = Zlib.Decompressor(method: .deflate) + let deframer = GRPCMessageDeframer(maximumPayloadSize: 100, decompressor: decompressor) + let processor = NIOSingleStepByteToMessageProcessor(deframer) + + let compressor = Zlib.Compressor(method: .deflate) + var framer = GRPCMessageFramer() + framer.append(Array(repeating: 42, count: 101)) + var framedMessage = try framer.next(compressor: compressor)! + + XCTAssertThrowsRPCError ( + try processor.process(buffer: framedMessage) { _ in + XCTFail("No message should be produced.") + } + ) { error in + XCTAssertEqual(error.code, .resourceExhausted) + XCTAssertEqual(error.message, "Message is too large to decompress.") + } + } + + func testReadSingleMessageWithCompressionSplitAcrossMultipleBuffers() throws { + let decompressor = Zlib.Decompressor(method: .deflate) + let deframer = GRPCMessageDeframer(maximumPayloadSize: 100, decompressor: decompressor) + let processor = NIOSingleStepByteToMessageProcessor(deframer) + let compressor = Zlib.Compressor(method: .deflate) + var framer = GRPCMessageFramer() + + framer.append(Array(repeating: 42, count: 100)) + var framedMessage = try framer.next(compressor: compressor)! + var firstBuffer = ByteBuffer(buffer: framedMessage.readSlice(length: 3)!) + var secondBuffer = ByteBuffer(buffer: framedMessage.readSlice(length: 3)!) + var thirdBuffer = ByteBuffer(buffer: framedMessage) + framedMessage.moveReaderIndex(to: 0) + + // Make sure we don't produce a message, since we've got incomplete data. + try processor.process(buffer: firstBuffer) { message in + XCTFail("No message should be produced.") + } + + // Again, make sure we don't produce a message, since we don't have enough + // message bytes to read. + try processor.process(buffer: secondBuffer) { message in + XCTFail("No message should be produced.") + } + + // Now we should be reading the full message. + var messages = [[UInt8]]() + try processor.process(buffer: thirdBuffer) { message in + messages.append(message) + } + // Assert the retrieved message matches the uncompressed original message. + XCTAssertEqual(messages, [Array(repeating: 42, count: 100)]) + } +} diff --git a/Tests/GRPCHTTP2CoreTests/Test Utilities/XCTest+Utilities.swift b/Tests/GRPCHTTP2CoreTests/Test Utilities/XCTest+Utilities.swift index 4cef512d0..878e4b62f 100644 --- a/Tests/GRPCHTTP2CoreTests/Test Utilities/XCTest+Utilities.swift +++ b/Tests/GRPCHTTP2CoreTests/Test Utilities/XCTest+Utilities.swift @@ -28,3 +28,16 @@ func XCTAssertThrowsError( errorHandler(error) } } + +func XCTAssertThrowsRPCError( + _ expression: @autoclosure () throws -> T, + _ errorHandler: (RPCError) -> Void +) { + XCTAssertThrowsError(try expression()) { error in + guard let error = error as? RPCError else { + return XCTFail("Error had unexpected type '\(type(of: error))'") + } + + errorHandler(error) + } +} From acf9f049c0bc724271f759f9ef0dc39c6bd3d1a7 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 25 Jan 2024 13:54:43 +0000 Subject: [PATCH 2/5] Formatting --- .../GRPCHTTP2Core/GRPCMessageDeframer.swift | 15 ++-- .../GRPCMessageDeframerTests.swift | 83 ++++++++++--------- 2 files changed, 54 insertions(+), 44 deletions(-) diff --git a/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift b/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift index 7ced043b7..4ee418219 100644 --- a/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift +++ b/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift @@ -14,8 +14,8 @@ * limitations under the License. */ -import NIOCore import GRPCCore +import NIOCore /// A ``GRPCMessageDeframer`` helps with the deframing of gRPC data frames: /// - It reads the frame's metadata to know whether the message payload is compressed or not, and its length @@ -27,13 +27,13 @@ struct GRPCMessageDeframer: NIOSingleStepByteToMessageDecoder { static let defaultMaximumPayloadSize = Int.max typealias InboundOut = [UInt8] - + private let decompressor: Zlib.Decompressor? private let maximumPayloadSize: Int? private var effectiveMaximumPayloadSize: Int { self.maximumPayloadSize ?? Self.defaultMaximumPayloadSize } - + /// Create a new ``GRPCMessageDeframer``. /// - Parameters: /// - maximumPayloadSize: The maximum size a message payload can be. @@ -44,7 +44,7 @@ struct GRPCMessageDeframer: NIOSingleStepByteToMessageDecoder { self.maximumPayloadSize = maximumPayloadSize self.decompressor = decompressor } - + mutating func decode(buffer: inout ByteBuffer) throws -> InboundOut? { guard buffer.readableBytes >= Self.metadataLength else { // If we cannot read enough bytes to cover the metadata's length, then we @@ -65,7 +65,7 @@ struct GRPCMessageDeframer: NIOSingleStepByteToMessageDecoder { buffer.moveReaderIndex(to: originalReaderIndex) return nil } - + if isMessageCompressed { guard let decompressor = self.decompressor else { // We cannot decompress the payload - discard the message. @@ -76,13 +76,14 @@ struct GRPCMessageDeframer: NIOSingleStepByteToMessageDecoder { if message.readableBytes > self.effectiveMaximumPayloadSize { throw RPCError( code: .resourceExhausted, - message: "Message has exceeded the configured maximum payload size (max: \(self.effectiveMaximumPayloadSize), actual: \(message.readableBytes))" + message: + "Message has exceeded the configured maximum payload size (max: \(self.effectiveMaximumPayloadSize), actual: \(message.readableBytes))" ) } return Array(buffer: message) } } - + mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? { try self.decode(buffer: &buffer) } diff --git a/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift index 1f4202ced..5c007bdd2 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift @@ -23,52 +23,58 @@ final class GRPCMessageDeframerTests: XCTestCase { func testReadMultipleMessagesWithoutCompression() throws { let deframer = GRPCMessageDeframer() let processor = NIOSingleStepByteToMessageProcessor(deframer) - + var buffer = ByteBuffer() buffer.writeInteger(UInt8(0)) buffer.writeInteger(UInt32(16)) buffer.writeRepeatingByte(42, count: 16) - + buffer.writeInteger(UInt8(0)) buffer.writeInteger(UInt32(8)) buffer.writeRepeatingByte(43, count: 8) - + var messages = [[UInt8]]() try processor.process(buffer: buffer) { message in messages.append(message) } - - XCTAssertEqual(messages, [ - Array(repeating: 42, count: 16), - Array(repeating: 43, count: 8) - ]) + + XCTAssertEqual( + messages, + [ + Array(repeating: 42, count: 16), + Array(repeating: 43, count: 8), + ] + ) } - + func testReadMessageOverSizeLimitWithoutCompression() throws { let deframer = GRPCMessageDeframer(maximumPayloadSize: 100) let processor = NIOSingleStepByteToMessageProcessor(deframer) - + var buffer = ByteBuffer() buffer.writeInteger(UInt8(0)) buffer.writeInteger(UInt32(101)) buffer.writeRepeatingByte(42, count: 101) - - XCTAssertThrowsRPCError ( + + XCTAssertThrowsRPCError( try processor.process(buffer: buffer) { _ in XCTFail("No message should be produced.") } ) { error in XCTAssertEqual(error.code, .resourceExhausted) - XCTAssertEqual(error.message, "Message has exceeded the configured maximum payload size (max: 100, actual: 101)") + XCTAssertEqual( + error.message, + "Message has exceeded the configured maximum payload size (max: 100, actual: 101)" + ) } } - + func testReadSingleMessageWithoutCompressionSplitAcrossMultipleBuffers() throws { let deframer = GRPCMessageDeframer() let processor = NIOSingleStepByteToMessageProcessor(deframer) - + var buffer = ByteBuffer() - + // We want to write the following gRPC frame: // - Compression flag unset // - Message length = 120 @@ -79,7 +85,7 @@ final class GRPCMessageDeframerTests: XCTestCase { // the rest of the message bytes in a third buffer. // The purpose of this test is to make sure that we are correctly stitching // together the frame. - + // Write compression flag (unset) buffer.writeInteger(UInt8(0)) // Write the first two bytes of the length field @@ -88,7 +94,7 @@ final class GRPCMessageDeframerTests: XCTestCase { try processor.process(buffer: buffer) { message in XCTAssertNil(message) } - + buffer.clear() // Write the next two bytes of the length field buffer.writeInteger(UInt16(120)) @@ -99,11 +105,11 @@ final class GRPCMessageDeframerTests: XCTestCase { try processor.process(buffer: buffer) { message in XCTAssertNil(message) } - + buffer.clear() // Write remaining 60 bytes of the message. buffer.writeRepeatingByte(43, count: 60) - + // Now we should be reading the full message. var messages = [[UInt8]]() try processor.process(buffer: buffer) { message in @@ -116,45 +122,48 @@ final class GRPCMessageDeframerTests: XCTestCase { }() XCTAssertEqual(messages, [expectedMessage]) } - + func testReadMultipleMessagesWithCompression() throws { let decompressor = Zlib.Decompressor(method: .deflate) let deframer = GRPCMessageDeframer(maximumPayloadSize: 1000, decompressor: decompressor) let processor = NIOSingleStepByteToMessageProcessor(deframer) let compressor = Zlib.Compressor(method: .deflate) var framer = GRPCMessageFramer() - + framer.append(Array(repeating: 42, count: 100)) var framedMessage = try framer.next(compressor: compressor)! - + var messages = [[UInt8]]() try processor.process(buffer: framedMessage) { message in messages.append(message) } - + framer.append(Array(repeating: 43, count: 110)) framedMessage = try framer.next(compressor: compressor)! try processor.process(buffer: framedMessage) { message in messages.append(message) } - - XCTAssertEqual(messages, [ - Array(repeating: 42, count: 100), - Array(repeating: 43, count: 110) - ]) + + XCTAssertEqual( + messages, + [ + Array(repeating: 42, count: 100), + Array(repeating: 43, count: 110), + ] + ) } - + func testReadMessageOverSizeLimitWithCompression() throws { let decompressor = Zlib.Decompressor(method: .deflate) let deframer = GRPCMessageDeframer(maximumPayloadSize: 100, decompressor: decompressor) let processor = NIOSingleStepByteToMessageProcessor(deframer) - + let compressor = Zlib.Compressor(method: .deflate) var framer = GRPCMessageFramer() framer.append(Array(repeating: 42, count: 101)) var framedMessage = try framer.next(compressor: compressor)! - - XCTAssertThrowsRPCError ( + + XCTAssertThrowsRPCError( try processor.process(buffer: framedMessage) { _ in XCTFail("No message should be produced.") } @@ -163,21 +172,21 @@ final class GRPCMessageDeframerTests: XCTestCase { XCTAssertEqual(error.message, "Message is too large to decompress.") } } - + func testReadSingleMessageWithCompressionSplitAcrossMultipleBuffers() throws { let decompressor = Zlib.Decompressor(method: .deflate) let deframer = GRPCMessageDeframer(maximumPayloadSize: 100, decompressor: decompressor) let processor = NIOSingleStepByteToMessageProcessor(deframer) let compressor = Zlib.Compressor(method: .deflate) var framer = GRPCMessageFramer() - + framer.append(Array(repeating: 42, count: 100)) var framedMessage = try framer.next(compressor: compressor)! var firstBuffer = ByteBuffer(buffer: framedMessage.readSlice(length: 3)!) var secondBuffer = ByteBuffer(buffer: framedMessage.readSlice(length: 3)!) var thirdBuffer = ByteBuffer(buffer: framedMessage) framedMessage.moveReaderIndex(to: 0) - + // Make sure we don't produce a message, since we've got incomplete data. try processor.process(buffer: firstBuffer) { message in XCTFail("No message should be produced.") @@ -188,7 +197,7 @@ final class GRPCMessageDeframerTests: XCTestCase { try processor.process(buffer: secondBuffer) { message in XCTFail("No message should be produced.") } - + // Now we should be reading the full message. var messages = [[UInt8]]() try processor.process(buffer: thirdBuffer) { message in From c660d09b9a39777dd1bb247c45a8d20b5247a86f Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 25 Jan 2024 17:28:16 +0000 Subject: [PATCH 3/5] PR changes --- Package.swift | 2 + .../GRPCHTTP2Core/GRPCMessageDeframer.swift | 23 +- .../GRPCMessageDeframerTests.swift | 214 +++++++----------- .../Test Utilities/XCTest+Utilities.swift | 13 -- 4 files changed, 100 insertions(+), 152 deletions(-) diff --git a/Package.swift b/Package.swift index f61589f4d..4b194d25b 100644 --- a/Package.swift +++ b/Package.swift @@ -128,6 +128,7 @@ extension Target.Dependency { name: "NIOTransportServices", package: "swift-nio-transport-services" ) + static let nioTestUtils: Self = .product(name: "NIOTestUtils", package: "swift-nio") static let logging: Self = .product(name: "Logging", package: "swift-log") static let protobuf: Self = .product(name: "SwiftProtobuf", package: "swift-protobuf") static let protobufPluginLibrary: Self = .product( @@ -312,6 +313,7 @@ extension Target { .nioCore, .nioHTTP2, .nioEmbedded, + .nioTestUtils, ] ) diff --git a/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift b/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift index 4ee418219..5572d3e29 100644 --- a/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift +++ b/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift @@ -29,10 +29,7 @@ struct GRPCMessageDeframer: NIOSingleStepByteToMessageDecoder { typealias InboundOut = [UInt8] private let decompressor: Zlib.Decompressor? - private let maximumPayloadSize: Int? - private var effectiveMaximumPayloadSize: Int { - self.maximumPayloadSize ?? Self.defaultMaximumPayloadSize - } + private let maximumPayloadSize: Int /// Create a new ``GRPCMessageDeframer``. /// - Parameters: @@ -40,7 +37,10 @@ struct GRPCMessageDeframer: NIOSingleStepByteToMessageDecoder { /// - decompressor: A `Zlib.Decompressor` to use when decompressing compressed gRPC messages. /// - Important: You must call `end()` on the `decompressor` when you're done using it, to clean /// up any resources allocated by `Zlib`. - init(maximumPayloadSize: Int? = nil, decompressor: Zlib.Decompressor? = nil) { + init( + maximumPayloadSize: Int = Self.defaultMaximumPayloadSize, + decompressor: Zlib.Decompressor? = nil + ) { self.maximumPayloadSize = maximumPayloadSize self.decompressor = decompressor } @@ -68,16 +68,19 @@ struct GRPCMessageDeframer: NIOSingleStepByteToMessageDecoder { if isMessageCompressed { guard let decompressor = self.decompressor else { - // We cannot decompress the payload - discard the message. - return nil + // We cannot decompress the payload - throw an error. + throw RPCError( + code: .internalError, + message: "Received a compressed message payload, but no decompressor has been configured." + ) } - return try decompressor.decompress(&message, limit: self.effectiveMaximumPayloadSize) + return try decompressor.decompress(&message, limit: self.maximumPayloadSize) } else { - if message.readableBytes > self.effectiveMaximumPayloadSize { + if message.readableBytes > self.maximumPayloadSize { throw RPCError( code: .resourceExhausted, message: - "Message has exceeded the configured maximum payload size (max: \(self.effectiveMaximumPayloadSize), actual: \(message.readableBytes))" + "Message has exceeded the configured maximum payload size (max: \(self.maximumPayloadSize), actual: \(message.readableBytes))" ) } return Array(buffer: message) diff --git a/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift index 5c007bdd2..460b883dd 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift @@ -14,37 +14,38 @@ * limitations under the License. */ +import GRPCCore import NIOCore +import NIOTestUtils import XCTest @testable import GRPCHTTP2Core final class GRPCMessageDeframerTests: XCTestCase { func testReadMultipleMessagesWithoutCompression() throws { - let deframer = GRPCMessageDeframer() - let processor = NIOSingleStepByteToMessageProcessor(deframer) - - var buffer = ByteBuffer() - buffer.writeInteger(UInt8(0)) - buffer.writeInteger(UInt32(16)) - buffer.writeRepeatingByte(42, count: 16) - - buffer.writeInteger(UInt8(0)) - buffer.writeInteger(UInt32(8)) - buffer.writeRepeatingByte(43, count: 8) + let firstMessage = { + var buffer = ByteBuffer() + buffer.writeInteger(UInt8(0)) + buffer.writeInteger(UInt32(16)) + buffer.writeRepeatingByte(42, count: 16) + return buffer + }() - var messages = [[UInt8]]() - try processor.process(buffer: buffer) { message in - messages.append(message) - } + let secondMessage = { + var buffer = ByteBuffer() + buffer.writeInteger(UInt8(0)) + buffer.writeInteger(UInt32(8)) + buffer.writeRepeatingByte(43, count: 8) + return buffer + }() - XCTAssertEqual( - messages, - [ - Array(repeating: 42, count: 16), - Array(repeating: 43, count: 8), - ] - ) + try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [ + (firstMessage, [Array(repeating: UInt8(42), count: 16)]), + (secondMessage, [Array(repeating: UInt8(43), count: 8)]), + ]) { + GRPCMessageDeframer() + } } func testReadMessageOverSizeLimitWithoutCompression() throws { @@ -56,7 +57,8 @@ final class GRPCMessageDeframerTests: XCTestCase { buffer.writeInteger(UInt32(101)) buffer.writeRepeatingByte(42, count: 101) - XCTAssertThrowsRPCError( + XCTAssertThrowsError( + ofType: RPCError.self, try processor.process(buffer: buffer) { _ in XCTFail("No message should be produced.") } @@ -69,101 +71,81 @@ final class GRPCMessageDeframerTests: XCTestCase { } } - func testReadSingleMessageWithoutCompressionSplitAcrossMultipleBuffers() throws { - let deframer = GRPCMessageDeframer() + func testCompressedMessageWithoutConfiguringDecompressor() throws { + let deframer = GRPCMessageDeframer(maximumPayloadSize: 100) let processor = NIOSingleStepByteToMessageProcessor(deframer) var buffer = ByteBuffer() + buffer.writeInteger(UInt8(1)) + buffer.writeInteger(UInt32(101)) + buffer.writeRepeatingByte(42, count: 101) - // We want to write the following gRPC frame: - // - Compression flag unset - // - Message length = 120 - // - 120 bytes of data for the message - // The header will be split in two (the first 3 bytes in a buffer, the - // remaining 2 in another one); the first chunk of the message will follow - // the second part of the metadata in the second buffer; and finally - // the rest of the message bytes in a third buffer. - // The purpose of this test is to make sure that we are correctly stitching - // together the frame. - - // Write compression flag (unset) - buffer.writeInteger(UInt8(0)) - // Write the first two bytes of the length field - buffer.writeInteger(UInt16(0)) - // Make sure we don't produce a message, since we've got incomplete data. - try processor.process(buffer: buffer) { message in - XCTAssertNil(message) + XCTAssertThrowsError( + ofType: RPCError.self, + try processor.process(buffer: buffer) { _ in + XCTFail("No message should be produced.") + } + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual( + error.message, + "Received a compressed message payload, but no decompressor has been configured." + ) } + } - buffer.clear() - // Write the next two bytes of the length field - buffer.writeInteger(UInt16(120)) - // Write the first half of the message data - buffer.writeRepeatingByte(42, count: 60) - // Again, make sure we don't produce a message, since we don't have enough - // message bytes to read (only have 60 so far, but need 120). - try processor.process(buffer: buffer) { message in - XCTAssertNil(message) + private func testReadMultipleMessagesWithCompression(method: Zlib.Method) throws { + let decompressor = Zlib.Decompressor(method: method) + let compressor = Zlib.Compressor(method: method) + var framer = GRPCMessageFramer() + defer { + decompressor.end() + compressor.end() } - buffer.clear() - // Write remaining 60 bytes of the message. - buffer.writeRepeatingByte(43, count: 60) - - // Now we should be reading the full message. - var messages = [[UInt8]]() - try processor.process(buffer: buffer) { message in - messages.append(message) - } - let expectedMessage = { - var firstHalf = Array(repeating: UInt8(42), count: 60) - firstHalf.append(contentsOf: Array(repeating: 43, count: 60)) - return firstHalf + let firstMessage = try { + framer.append(Array(repeating: 42, count: 100)) + return try framer.next(compressor: compressor)! }() - XCTAssertEqual(messages, [expectedMessage]) - } - - func testReadMultipleMessagesWithCompression() throws { - let decompressor = Zlib.Decompressor(method: .deflate) - let deframer = GRPCMessageDeframer(maximumPayloadSize: 1000, decompressor: decompressor) - let processor = NIOSingleStepByteToMessageProcessor(deframer) - let compressor = Zlib.Compressor(method: .deflate) - var framer = GRPCMessageFramer() - framer.append(Array(repeating: 42, count: 100)) - var framedMessage = try framer.next(compressor: compressor)! + let secondMessage = try { + framer.append(Array(repeating: 43, count: 110)) + return try framer.next(compressor: compressor)! + }() - var messages = [[UInt8]]() - try processor.process(buffer: framedMessage) { message in - messages.append(message) - } + try ByteToMessageDecoderVerifier.verifyDecoder( + inputOutputPairs: [ + (firstMessage, [Array(repeating: 42, count: 100)]), + (secondMessage, [Array(repeating: 43, count: 110)]), + ]) { + GRPCMessageDeframer(maximumPayloadSize: 1000, decompressor: decompressor) + } + } - framer.append(Array(repeating: 43, count: 110)) - framedMessage = try framer.next(compressor: compressor)! - try processor.process(buffer: framedMessage) { message in - messages.append(message) - } + func testReadMultipleMessagesWithDeflateCompression() throws { + try self.testReadMultipleMessagesWithCompression(method: .deflate) + } - XCTAssertEqual( - messages, - [ - Array(repeating: 42, count: 100), - Array(repeating: 43, count: 110), - ] - ) + func testReadMultipleMessagesWithGZIPCompression() throws { + try self.testReadMultipleMessagesWithCompression(method: .gzip) } - func testReadMessageOverSizeLimitWithCompression() throws { - let decompressor = Zlib.Decompressor(method: .deflate) + private func testReadMessageOverSizeLimitWithCompression(method: Zlib.Method) throws { + let decompressor = Zlib.Decompressor(method: method) let deframer = GRPCMessageDeframer(maximumPayloadSize: 100, decompressor: decompressor) let processor = NIOSingleStepByteToMessageProcessor(deframer) - - let compressor = Zlib.Compressor(method: .deflate) + let compressor = Zlib.Compressor(method: method) var framer = GRPCMessageFramer() + defer { + decompressor.end() + compressor.end() + } + framer.append(Array(repeating: 42, count: 101)) - var framedMessage = try framer.next(compressor: compressor)! + let framedMessage = try framer.next(compressor: compressor)! - XCTAssertThrowsRPCError( + XCTAssertThrowsError( + ofType: RPCError.self, try processor.process(buffer: framedMessage) { _ in XCTFail("No message should be produced.") } @@ -173,37 +155,11 @@ final class GRPCMessageDeframerTests: XCTestCase { } } - func testReadSingleMessageWithCompressionSplitAcrossMultipleBuffers() throws { - let decompressor = Zlib.Decompressor(method: .deflate) - let deframer = GRPCMessageDeframer(maximumPayloadSize: 100, decompressor: decompressor) - let processor = NIOSingleStepByteToMessageProcessor(deframer) - let compressor = Zlib.Compressor(method: .deflate) - var framer = GRPCMessageFramer() - - framer.append(Array(repeating: 42, count: 100)) - var framedMessage = try framer.next(compressor: compressor)! - var firstBuffer = ByteBuffer(buffer: framedMessage.readSlice(length: 3)!) - var secondBuffer = ByteBuffer(buffer: framedMessage.readSlice(length: 3)!) - var thirdBuffer = ByteBuffer(buffer: framedMessage) - framedMessage.moveReaderIndex(to: 0) - - // Make sure we don't produce a message, since we've got incomplete data. - try processor.process(buffer: firstBuffer) { message in - XCTFail("No message should be produced.") - } - - // Again, make sure we don't produce a message, since we don't have enough - // message bytes to read. - try processor.process(buffer: secondBuffer) { message in - XCTFail("No message should be produced.") - } + func testReadMessageOverSizeLimitWithDeflateCompression() throws { + try self.testReadMessageOverSizeLimitWithCompression(method: .deflate) + } - // Now we should be reading the full message. - var messages = [[UInt8]]() - try processor.process(buffer: thirdBuffer) { message in - messages.append(message) - } - // Assert the retrieved message matches the uncompressed original message. - XCTAssertEqual(messages, [Array(repeating: 42, count: 100)]) + func testReadMessageOverSizeLimitWithGZIPCompression() throws { + try self.testReadMessageOverSizeLimitWithCompression(method: .gzip) } } diff --git a/Tests/GRPCHTTP2CoreTests/Test Utilities/XCTest+Utilities.swift b/Tests/GRPCHTTP2CoreTests/Test Utilities/XCTest+Utilities.swift index 878e4b62f..4cef512d0 100644 --- a/Tests/GRPCHTTP2CoreTests/Test Utilities/XCTest+Utilities.swift +++ b/Tests/GRPCHTTP2CoreTests/Test Utilities/XCTest+Utilities.swift @@ -28,16 +28,3 @@ func XCTAssertThrowsError( errorHandler(error) } } - -func XCTAssertThrowsRPCError( - _ expression: @autoclosure () throws -> T, - _ errorHandler: (RPCError) -> Void -) { - XCTAssertThrowsError(try expression()) { error in - guard let error = error as? RPCError else { - return XCTFail("Error had unexpected type '\(type(of: error))'") - } - - errorHandler(error) - } -} From e01b429de8cc793b9369188461de39f277680c13 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Fri, 26 Jan 2024 12:01:26 +0000 Subject: [PATCH 4/5] PR changes --- .../GRPCHTTP2Core/GRPCMessageDeframer.swift | 34 +++++++++------ .../GRPCMessageDeframerTests.swift | 43 ++++++++++++++++--- 2 files changed, 58 insertions(+), 19 deletions(-) diff --git a/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift b/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift index 5572d3e29..af8282bb4 100644 --- a/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift +++ b/Sources/GRPCHTTP2Core/GRPCMessageDeframer.swift @@ -54,14 +54,31 @@ struct GRPCMessageDeframer: NIOSingleStepByteToMessageDecoder { // Store the current reader index in case we don't yet have enough // bytes in the buffer to decode a full frame, and need to reset it. + // The force-unwraps for the compression flag and message length are safe, + // because we've checked just above that we've got at least enough bytes to + // read all of the metadata. let originalReaderIndex = buffer.readerIndex let isMessageCompressed = buffer.readInteger(as: UInt8.self)! == 1 + let messageLength = buffer.readInteger(as: UInt32.self)! - guard var message = buffer.readLengthPrefixedSlice(as: UInt32.self) else { - // We have moved the reader index, but we don't have enough bytes to - // read the full message payload, so reset it to its previous, original - // position for now, and return. We'll try decoding again, once more - // bytes become available in our buffer. + if messageLength > self.maximumPayloadSize { + throw RPCError( + code: .resourceExhausted, + message: """ + Message has exceeded the configured maximum payload size \ + (max: \(self.maximumPayloadSize), actual: \(messageLength)) + """ + ) + } + + guard var message = buffer.readSlice(length: Int(messageLength)) else { + // `ByteBuffer/readSlice(length:)` returns nil when there are not enough + // bytes to read the requested length. This can happen if we don't yet have + // enough bytes buffered to read the full message payload. + // By reading the metadata though, we have already moved the reader index, + // so we must reset it to its previous, original position for now, + // and return. We'll try decoding again, once more bytes become available + // in our buffer. buffer.moveReaderIndex(to: originalReaderIndex) return nil } @@ -76,13 +93,6 @@ struct GRPCMessageDeframer: NIOSingleStepByteToMessageDecoder { } return try decompressor.decompress(&message, limit: self.maximumPayloadSize) } else { - if message.readableBytes > self.maximumPayloadSize { - throw RPCError( - code: .resourceExhausted, - message: - "Message has exceeded the configured maximum payload size (max: \(self.maximumPayloadSize), actual: \(message.readableBytes))" - ) - } return Array(buffer: message) } } diff --git a/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift index 460b883dd..acd20832b 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift @@ -77,8 +77,8 @@ final class GRPCMessageDeframerTests: XCTestCase { var buffer = ByteBuffer() buffer.writeInteger(UInt8(1)) - buffer.writeInteger(UInt32(101)) - buffer.writeRepeatingByte(42, count: 101) + buffer.writeInteger(UInt32(10)) + buffer.writeRepeatingByte(42, count: 10) XCTAssertThrowsError( ofType: RPCError.self, @@ -130,7 +130,36 @@ final class GRPCMessageDeframerTests: XCTestCase { try self.testReadMultipleMessagesWithCompression(method: .gzip) } - private func testReadMessageOverSizeLimitWithCompression(method: Zlib.Method) throws { + func testReadCompressedMessageOverSizeLimitBeforeDecompressing() throws { + let deframer = GRPCMessageDeframer(maximumPayloadSize: 1) + let processor = NIOSingleStepByteToMessageProcessor(deframer) + let compressor = Zlib.Compressor(method: .gzip) + var framer = GRPCMessageFramer() + defer { + compressor.end() + } + + framer.append(Array(repeating: 42, count: 100)) + let framedMessage = try framer.next(compressor: compressor)! + + XCTAssertThrowsError( + ofType: RPCError.self, + try processor.process(buffer: framedMessage) { _ in + XCTFail("No message should be produced.") + } + ) { error in + XCTAssertEqual(error.code, .resourceExhausted) + XCTAssertEqual( + error.message, + """ + Message has exceeded the configured maximum payload size \ + (max: 1, actual: \(framedMessage.readableBytes - GRPCMessageDeframer.metadataLength)) + """ + ) + } + } + + private func testReadDecompressedMessageOverSizeLimit(method: Zlib.Method) throws { let decompressor = Zlib.Decompressor(method: method) let deframer = GRPCMessageDeframer(maximumPayloadSize: 100, decompressor: decompressor) let processor = NIOSingleStepByteToMessageProcessor(deframer) @@ -155,11 +184,11 @@ final class GRPCMessageDeframerTests: XCTestCase { } } - func testReadMessageOverSizeLimitWithDeflateCompression() throws { - try self.testReadMessageOverSizeLimitWithCompression(method: .deflate) + func testReadDecompressedMessageOverSizeLimitWithDeflateCompression() throws { + try self.testReadDecompressedMessageOverSizeLimit(method: .deflate) } - func testReadMessageOverSizeLimitWithGZIPCompression() throws { - try self.testReadMessageOverSizeLimitWithCompression(method: .gzip) + func testReadDecompressedMessageOverSizeLimitWithGZIPCompression() throws { + try self.testReadDecompressedMessageOverSizeLimit(method: .gzip) } } From 4161301d0ff4a641c799dab944b0be1dedf30bac Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Fri, 26 Jan 2024 14:46:12 +0000 Subject: [PATCH 5/5] Add one more test --- .../GRPCMessageDeframerTests.swift | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift index acd20832b..98d5fc046 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCMessageDeframerTests.swift @@ -71,6 +71,31 @@ final class GRPCMessageDeframerTests: XCTestCase { } } + func testReadMessageOverSizeLimitButWithoutActualMessageBytes() throws { + let deframer = GRPCMessageDeframer(maximumPayloadSize: 100) + let processor = NIOSingleStepByteToMessageProcessor(deframer) + + var buffer = ByteBuffer() + buffer.writeInteger(UInt8(0)) + // Set the message length field to be over the maximum payload size, but + // don't write the actual message bytes. This is to ensure that the payload + // size limit is enforced _before_ the payload is actually read. + buffer.writeInteger(UInt32(101)) + + XCTAssertThrowsError( + ofType: RPCError.self, + try processor.process(buffer: buffer) { _ in + XCTFail("No message should be produced.") + } + ) { error in + XCTAssertEqual(error.code, .resourceExhausted) + XCTAssertEqual( + error.message, + "Message has exceeded the configured maximum payload size (max: 100, actual: 101)" + ) + } + } + func testCompressedMessageWithoutConfiguringDecompressor() throws { let deframer = GRPCMessageDeframer(maximumPayloadSize: 100) let processor = NIOSingleStepByteToMessageProcessor(deframer)