From ab4f791bdf5039ef73389a8fdfce1793f5205b25 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 24 Jan 2024 10:41:48 +0000 Subject: [PATCH 1/5] Add Compressor to GPRCMessageFramer --- Sources/GRPCHTTP2Core/GRPCMessageFramer.swift | 49 +++++++++++++------ .../GRPCMessageFramerTests.swift | 39 ++++++++++++++- 2 files changed, 71 insertions(+), 17 deletions(-) diff --git a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift index 0e7717d3d..828dfbc13 100644 --- a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift +++ b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift @@ -32,25 +32,36 @@ struct GRPCMessageFramer { /// reserves capacity in powers of 2. This way, we can take advantage of the whole buffer. static let maximumWriteBufferLength = 65_536 - private var pendingMessages: OneOrManyQueue - - private struct PendingMessage { - let bytes: [UInt8] - let compress: Bool - } + private var pendingMessages: OneOrManyQueue<[UInt8]> private var writeBuffer: ByteBuffer + + private var compressor: Zlib.Compressor? - init() { + /// Create a new ``GRPCMessageFramer``. + /// - Parameter compressor: An optional compressor to use when compressing messages. + /// - Important: The `compressor` must have been `initialized()`. + init(compressor: Zlib.Compressor? = nil) { self.pendingMessages = OneOrManyQueue() self.writeBuffer = ByteBuffer() + self.compressor = compressor + } + + /// Set a compressor on this ``GRPCMessageFramer``. + /// - Parameter compressor: An optional compressor to use when compressing messages. + /// - Important: The `compressor` must have been `initialized()`. + mutating func setCompressor(_ compressor: Zlib.Compressor?) { + self.compressor = compressor + } + + mutating func initialize() { + self.compressor?.initialize() } /// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`. /// The resulting data will be returned when calling ``GRPCMessageFramer/next()``. - /// If `compress` is true, then the given bytes will be compressed using the configured compression algorithm. - mutating func append(_ bytes: [UInt8], compress: Bool) { - self.pendingMessages.append(PendingMessage(bytes: bytes, compress: compress)) + mutating func append(_ bytes: [UInt8]) { + self.pendingMessages.append(bytes) } /// If there are pending messages to be framed, a `ByteBuffer` will be returned with the framed data. @@ -83,16 +94,24 @@ struct GRPCMessageFramer { return self.writeBuffer } - private mutating func encode(_ message: PendingMessage) throws { - if message.compress { + private mutating func encode(_ message: [UInt8]) throws { + if self.compressor != nil { self.writeBuffer.writeInteger(UInt8(1)) // Set compression flag - // TODO: compress message and write the compressed message length + bytes + + // Write zeroes as length - we'll write the actual compressed size after compression. + let lengthIndex = self.writeBuffer.writerIndex + self.writeBuffer.writeInteger(UInt32(0)) + + // This force-unwrap is safe, because we know `self.compressor` is not `nil`. + let writtenBytes = try self.compressor!.compress(message, into: &self.writeBuffer) + + self.writeBuffer.setInteger(UInt32(writtenBytes), at: lengthIndex) } else { self.writeBuffer.writeMultipleIntegers( UInt8(0), // Clear compression flag - UInt32(message.bytes.count) // Set message length + UInt32(message.count) // Set message length ) - self.writeBuffer.writeBytes(message.bytes) + self.writeBuffer.writeBytes(message) } } } diff --git a/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift index 95029536f..161c4d749 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift @@ -20,9 +20,10 @@ import XCTest @testable import GRPCHTTP2Core final class GRPCMessageFramerTests: XCTestCase { + func testSingleWrite() throws { var framer = GRPCMessageFramer() - framer.append(Array(repeating: 42, count: 128), compress: false) + framer.append(Array(repeating: 42, count: 128)) var buffer = try XCTUnwrap(framer.next()) let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader()) @@ -34,13 +35,45 @@ final class GRPCMessageFramerTests: XCTestCase { // No more bufers. XCTAssertNil(try framer.next()) } + + private func testSingleWrite(compressionMethod: Zlib.Method) throws { + var framer = GRPCMessageFramer(compressor: Zlib.Compressor(method: compressionMethod)) + framer.initialize() + + let message = [UInt8](repeating: 42, count: 128) + framer.append(message) + + var buffer = ByteBuffer() + var testCompressor = Zlib.Compressor(method: compressionMethod) + testCompressor.initialize() + let compressedSize = try testCompressor.compress(message, into: &buffer) + let compressedMessage = buffer.readSlice(length: compressedSize) + + buffer = try XCTUnwrap(framer.next()) + let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader()) + XCTAssertTrue(compressed) + XCTAssertEqual(length, UInt32(compressedSize)) + XCTAssertEqual(buffer.readSlice(length: Int(length)), compressedMessage) + XCTAssertEqual(buffer.readableBytes, 0) + + // No more bufers. + XCTAssertNil(try framer.next()) + } + + func testSingleWriteDeflateCompressed() throws { + try self.testSingleWrite(compressionMethod: .deflate) + } + + func testSingleWriteGZIPCompressed() throws { + try self.testSingleWrite(compressionMethod: .gzip) + } func testMultipleWrites() throws { var framer = GRPCMessageFramer() let messages = 100 for _ in 0 ..< messages { - framer.append(Array(repeating: 42, count: 128), compress: false) + framer.append(Array(repeating: 42, count: 128)) } var buffer = try XCTUnwrap(framer.next()) @@ -56,6 +89,8 @@ final class GRPCMessageFramerTests: XCTestCase { // No more bufers. XCTAssertNil(try framer.next()) } + + } extension ByteBuffer { From ae957ab3011d849e072d8cad72a149cb2742bee3 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 24 Jan 2024 13:34:37 +0000 Subject: [PATCH 2/5] Formatting --- Sources/GRPCHTTP2Core/GRPCMessageFramer.swift | 12 ++++++------ .../GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift | 13 ++++++------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift index 828dfbc13..dce2e1161 100644 --- a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift +++ b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift @@ -35,7 +35,7 @@ struct GRPCMessageFramer { private var pendingMessages: OneOrManyQueue<[UInt8]> private var writeBuffer: ByteBuffer - + private var compressor: Zlib.Compressor? /// Create a new ``GRPCMessageFramer``. @@ -46,14 +46,14 @@ struct GRPCMessageFramer { self.writeBuffer = ByteBuffer() self.compressor = compressor } - + /// Set a compressor on this ``GRPCMessageFramer``. /// - Parameter compressor: An optional compressor to use when compressing messages. /// - Important: The `compressor` must have been `initialized()`. mutating func setCompressor(_ compressor: Zlib.Compressor?) { self.compressor = compressor } - + mutating func initialize() { self.compressor?.initialize() } @@ -97,14 +97,14 @@ struct GRPCMessageFramer { private mutating func encode(_ message: [UInt8]) throws { if self.compressor != nil { self.writeBuffer.writeInteger(UInt8(1)) // Set compression flag - + // Write zeroes as length - we'll write the actual compressed size after compression. let lengthIndex = self.writeBuffer.writerIndex self.writeBuffer.writeInteger(UInt32(0)) - + // This force-unwrap is safe, because we know `self.compressor` is not `nil`. let writtenBytes = try self.compressor!.compress(message, into: &self.writeBuffer) - + self.writeBuffer.setInteger(UInt32(writtenBytes), at: lengthIndex) } else { self.writeBuffer.writeMultipleIntegers( diff --git a/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift index 161c4d749..a624e21d7 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift @@ -20,7 +20,7 @@ import XCTest @testable import GRPCHTTP2Core final class GRPCMessageFramerTests: XCTestCase { - + func testSingleWrite() throws { var framer = GRPCMessageFramer() framer.append(Array(repeating: 42, count: 128)) @@ -35,14 +35,14 @@ final class GRPCMessageFramerTests: XCTestCase { // No more bufers. XCTAssertNil(try framer.next()) } - + private func testSingleWrite(compressionMethod: Zlib.Method) throws { var framer = GRPCMessageFramer(compressor: Zlib.Compressor(method: compressionMethod)) framer.initialize() let message = [UInt8](repeating: 42, count: 128) framer.append(message) - + var buffer = ByteBuffer() var testCompressor = Zlib.Compressor(method: compressionMethod) testCompressor.initialize() @@ -59,11 +59,11 @@ final class GRPCMessageFramerTests: XCTestCase { // No more bufers. XCTAssertNil(try framer.next()) } - + func testSingleWriteDeflateCompressed() throws { try self.testSingleWrite(compressionMethod: .deflate) } - + func testSingleWriteGZIPCompressed() throws { try self.testSingleWrite(compressionMethod: .gzip) } @@ -89,8 +89,7 @@ final class GRPCMessageFramerTests: XCTestCase { // No more bufers. XCTAssertNil(try framer.next()) } - - + } extension ByteBuffer { From dd8106d748124c5f98726579a493c974898d1cd5 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 24 Jan 2024 14:20:56 +0000 Subject: [PATCH 3/5] Rebase --- Sources/GRPCHTTP2Core/Compression/Zlib.swift | 6 ++++++ Sources/GRPCHTTP2Core/GRPCMessageFramer.swift | 4 ---- .../GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift | 13 +++++++++---- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/Sources/GRPCHTTP2Core/Compression/Zlib.swift b/Sources/GRPCHTTP2Core/Compression/Zlib.swift index b94bd8a1d..7906aa743 100644 --- a/Sources/GRPCHTTP2Core/Compression/Zlib.swift +++ b/Sources/GRPCHTTP2Core/Compression/Zlib.swift @@ -38,6 +38,9 @@ extension Zlib { /// Creates a new compressor for the given compression format. /// /// This compressor is only suitable for compressing whole messages at a time. + /// + /// - Important: ``Compressor/end()`` must be called when the compressor is not needed + /// anymore, to deallocate any resources allocated by `Zlib`. struct Compressor { // TODO: Make this ~Copyable when 5.9 is the lowest supported Swift version. @@ -86,6 +89,9 @@ extension Zlib { /// Creates a new decompressor for the given compression format. /// /// This decompressor is only suitable for compressing whole messages at a time. + /// + /// - Important: ``Decompressor/end()`` must be called when the compressor is not needed + /// anymore, to deallocate any resources allocated by `Zlib`. struct Decompressor { // TODO: Make this ~Copyable when 5.9 is the lowest supported Swift version. diff --git a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift index dce2e1161..e92aa93ea 100644 --- a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift +++ b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift @@ -54,10 +54,6 @@ struct GRPCMessageFramer { self.compressor = compressor } - mutating func initialize() { - self.compressor?.initialize() - } - /// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`. /// The resulting data will be returned when calling ``GRPCMessageFramer/next()``. mutating func append(_ bytes: [UInt8]) { diff --git a/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift index a624e21d7..8f685ffa2 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift @@ -37,17 +37,22 @@ final class GRPCMessageFramerTests: XCTestCase { } private func testSingleWrite(compressionMethod: Zlib.Method) throws { - var framer = GRPCMessageFramer(compressor: Zlib.Compressor(method: compressionMethod)) - framer.initialize() + let compressor = Zlib.Compressor(method: compressionMethod) + defer { + compressor.end() + } + var framer = GRPCMessageFramer(compressor: compressor) let message = [UInt8](repeating: 42, count: 128) framer.append(message) var buffer = ByteBuffer() - var testCompressor = Zlib.Compressor(method: compressionMethod) - testCompressor.initialize() + let testCompressor = Zlib.Compressor(method: compressionMethod) let compressedSize = try testCompressor.compress(message, into: &buffer) let compressedMessage = buffer.readSlice(length: compressedSize) + defer { + testCompressor.end() + } buffer = try XCTUnwrap(framer.next()) let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader()) From 04bae73e1d9d9f0fb7b2fd2adf70769f39fe93d2 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 24 Jan 2024 15:06:40 +0000 Subject: [PATCH 4/5] Move compressor to next() --- Sources/GRPCHTTP2Core/GRPCMessageFramer.swift | 30 +++++++------------ .../GRPCMessageFramerTests.swift | 5 ++-- 2 files changed, 12 insertions(+), 23 deletions(-) diff --git a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift index e92aa93ea..017fd2a7f 100644 --- a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift +++ b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift @@ -36,22 +36,10 @@ struct GRPCMessageFramer { private var writeBuffer: ByteBuffer - private var compressor: Zlib.Compressor? - /// Create a new ``GRPCMessageFramer``. - /// - Parameter compressor: An optional compressor to use when compressing messages. - /// - Important: The `compressor` must have been `initialized()`. - init(compressor: Zlib.Compressor? = nil) { + init() { self.pendingMessages = OneOrManyQueue() self.writeBuffer = ByteBuffer() - self.compressor = compressor - } - - /// Set a compressor on this ``GRPCMessageFramer``. - /// - Parameter compressor: An optional compressor to use when compressing messages. - /// - Important: The `compressor` must have been `initialized()`. - mutating func setCompressor(_ compressor: Zlib.Compressor?) { - self.compressor = compressor } /// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`. @@ -62,8 +50,10 @@ struct GRPCMessageFramer { /// If there are pending messages to be framed, a `ByteBuffer` will be returned with the framed data. /// Data may also be compressed (if configured) and multiple frames may be coalesced into the same `ByteBuffer`. + /// - Parameter compressor: An optional compressor: if present, payloads will be compressed; otherwise + /// they'll be framed as-is. /// - Throws: If an error is encountered, such as a compression failure, an error will be thrown. - mutating func next() throws -> ByteBuffer? { + mutating func next(compressor: Zlib.Compressor? = nil) throws -> ByteBuffer? { if self.pendingMessages.isEmpty { // Nothing pending: exit early. return nil @@ -79,27 +69,27 @@ struct GRPCMessageFramer { var requiredCapacity = 0 for message in self.pendingMessages { - requiredCapacity += message.bytes.count + Self.metadataLength + requiredCapacity += message.count + Self.metadataLength } self.writeBuffer.clear(minimumCapacity: requiredCapacity) while let message = self.pendingMessages.pop() { - try self.encode(message) + try self.encode(message, compressor: compressor) } return self.writeBuffer } - private mutating func encode(_ message: [UInt8]) throws { - if self.compressor != nil { + private mutating func encode(_ message: [UInt8], compressor: Zlib.Compressor?) throws { + if compressor != nil { self.writeBuffer.writeInteger(UInt8(1)) // Set compression flag // Write zeroes as length - we'll write the actual compressed size after compression. let lengthIndex = self.writeBuffer.writerIndex self.writeBuffer.writeInteger(UInt32(0)) - // This force-unwrap is safe, because we know `self.compressor` is not `nil`. - let writtenBytes = try self.compressor!.compress(message, into: &self.writeBuffer) + // This force-unwrap is safe, because we know `compressor` is not `nil`. + let writtenBytes = try compressor!.compress(message, into: &self.writeBuffer) self.writeBuffer.setInteger(UInt32(writtenBytes), at: lengthIndex) } else { diff --git a/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift index 8f685ffa2..886e60319 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift @@ -20,7 +20,6 @@ import XCTest @testable import GRPCHTTP2Core final class GRPCMessageFramerTests: XCTestCase { - func testSingleWrite() throws { var framer = GRPCMessageFramer() framer.append(Array(repeating: 42, count: 128)) @@ -41,7 +40,7 @@ final class GRPCMessageFramerTests: XCTestCase { defer { compressor.end() } - var framer = GRPCMessageFramer(compressor: compressor) + var framer = GRPCMessageFramer() let message = [UInt8](repeating: 42, count: 128) framer.append(message) @@ -54,7 +53,7 @@ final class GRPCMessageFramerTests: XCTestCase { testCompressor.end() } - buffer = try XCTUnwrap(framer.next()) + buffer = try XCTUnwrap(framer.next(compressor: compressor)) let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader()) XCTAssertTrue(compressed) XCTAssertEqual(length, UInt32(compressedSize)) From e15f11f0b33d0385fbfbcc73de9298f61b709675 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 24 Jan 2024 15:23:14 +0000 Subject: [PATCH 5/5] Small nit --- Sources/GRPCHTTP2Core/GRPCMessageFramer.swift | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift index 017fd2a7f..590b5efeb 100644 --- a/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift +++ b/Sources/GRPCHTTP2Core/GRPCMessageFramer.swift @@ -81,16 +81,15 @@ struct GRPCMessageFramer { } private mutating func encode(_ message: [UInt8], compressor: Zlib.Compressor?) throws { - if compressor != nil { + if let compressor { self.writeBuffer.writeInteger(UInt8(1)) // Set compression flag // Write zeroes as length - we'll write the actual compressed size after compression. let lengthIndex = self.writeBuffer.writerIndex self.writeBuffer.writeInteger(UInt32(0)) - // This force-unwrap is safe, because we know `compressor` is not `nil`. - let writtenBytes = try compressor!.compress(message, into: &self.writeBuffer) - + // Compress and overwrite the payload length field with the right length. + let writtenBytes = try compressor.compress(message, into: &self.writeBuffer) self.writeBuffer.setInteger(UInt32(writtenBytes), at: lengthIndex) } else { self.writeBuffer.writeMultipleIntegers(