Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Sources/GRPCHTTP2Core/Compression/Zlib.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ extension Zlib {
/// Creates a new compressor for the given compression format.
///
/// This compressor is only suitable for compressing whole messages at a time.
///
/// - Important: ``Compressor/end()`` must be called when the compressor is not needed
/// anymore, to deallocate any resources allocated by `Zlib`.
struct Compressor {
// TODO: Make this ~Copyable when 5.9 is the lowest supported Swift version.

Expand Down Expand Up @@ -86,6 +89,9 @@ extension Zlib {
/// Creates a new decompressor for the given compression format.
///
/// This decompressor is only suitable for compressing whole messages at a time.
///
/// - Important: ``Decompressor/end()`` must be called when the compressor is not needed
/// anymore, to deallocate any resources allocated by `Zlib`.
struct Decompressor {
// TODO: Make this ~Copyable when 5.9 is the lowest supported Swift version.

Expand Down
38 changes: 21 additions & 17 deletions Sources/GRPCHTTP2Core/GRPCMessageFramer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,28 @@ struct GRPCMessageFramer {
/// reserves capacity in powers of 2. This way, we can take advantage of the whole buffer.
static let maximumWriteBufferLength = 65_536

private var pendingMessages: OneOrManyQueue<PendingMessage>

private struct PendingMessage {
let bytes: [UInt8]
let compress: Bool
}
private var pendingMessages: OneOrManyQueue<[UInt8]>

private var writeBuffer: ByteBuffer

/// Create a new ``GRPCMessageFramer``.
init() {
self.pendingMessages = OneOrManyQueue()
self.writeBuffer = ByteBuffer()
}

/// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`.
/// The resulting data will be returned when calling ``GRPCMessageFramer/next()``.
/// If `compress` is true, then the given bytes will be compressed using the configured compression algorithm.
mutating func append(_ bytes: [UInt8], compress: Bool) {
self.pendingMessages.append(PendingMessage(bytes: bytes, compress: compress))
mutating func append(_ bytes: [UInt8]) {
self.pendingMessages.append(bytes)
}

/// If there are pending messages to be framed, a `ByteBuffer` will be returned with the framed data.
/// Data may also be compressed (if configured) and multiple frames may be coalesced into the same `ByteBuffer`.
/// - Parameter compressor: An optional compressor: if present, payloads will be compressed; otherwise
/// they'll be framed as-is.
/// - Throws: If an error is encountered, such as a compression failure, an error will be thrown.
mutating func next() throws -> ByteBuffer? {
mutating func next(compressor: Zlib.Compressor? = nil) throws -> ByteBuffer? {
if self.pendingMessages.isEmpty {
// Nothing pending: exit early.
return nil
Expand All @@ -72,27 +69,34 @@ struct GRPCMessageFramer {

var requiredCapacity = 0
for message in self.pendingMessages {
requiredCapacity += message.bytes.count + Self.metadataLength
requiredCapacity += message.count + Self.metadataLength
}
self.writeBuffer.clear(minimumCapacity: requiredCapacity)

while let message = self.pendingMessages.pop() {
try self.encode(message)
try self.encode(message, compressor: compressor)
}

return self.writeBuffer
}

private mutating func encode(_ message: PendingMessage) throws {
if message.compress {
private mutating func encode(_ message: [UInt8], compressor: Zlib.Compressor?) throws {
if let compressor {
self.writeBuffer.writeInteger(UInt8(1)) // Set compression flag
// TODO: compress message and write the compressed message length + bytes

// Write zeroes as length - we'll write the actual compressed size after compression.
let lengthIndex = self.writeBuffer.writerIndex
self.writeBuffer.writeInteger(UInt32(0))

// Compress and overwrite the payload length field with the right length.
let writtenBytes = try compressor.compress(message, into: &self.writeBuffer)
self.writeBuffer.setInteger(UInt32(writtenBytes), at: lengthIndex)
} else {
self.writeBuffer.writeMultipleIntegers(
UInt8(0), // Clear compression flag
UInt32(message.bytes.count) // Set message length
UInt32(message.count) // Set message length
)
self.writeBuffer.writeBytes(message.bytes)
self.writeBuffer.writeBytes(message)
}
}
}
42 changes: 40 additions & 2 deletions Tests/GRPCHTTP2CoreTests/GRPCMessageFramerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import XCTest
final class GRPCMessageFramerTests: XCTestCase {
func testSingleWrite() throws {
var framer = GRPCMessageFramer()
framer.append(Array(repeating: 42, count: 128), compress: false)
framer.append(Array(repeating: 42, count: 128))

var buffer = try XCTUnwrap(framer.next())
let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader())
Expand All @@ -35,12 +35,49 @@ final class GRPCMessageFramerTests: XCTestCase {
XCTAssertNil(try framer.next())
}

private func testSingleWrite(compressionMethod: Zlib.Method) throws {
let compressor = Zlib.Compressor(method: compressionMethod)
defer {
compressor.end()
}
var framer = GRPCMessageFramer()

let message = [UInt8](repeating: 42, count: 128)
framer.append(message)

var buffer = ByteBuffer()
let testCompressor = Zlib.Compressor(method: compressionMethod)
let compressedSize = try testCompressor.compress(message, into: &buffer)
let compressedMessage = buffer.readSlice(length: compressedSize)
defer {
testCompressor.end()
}

buffer = try XCTUnwrap(framer.next(compressor: compressor))
let (compressed, length) = try XCTUnwrap(buffer.readMessageHeader())
XCTAssertTrue(compressed)
XCTAssertEqual(length, UInt32(compressedSize))
XCTAssertEqual(buffer.readSlice(length: Int(length)), compressedMessage)
XCTAssertEqual(buffer.readableBytes, 0)

// No more bufers.
XCTAssertNil(try framer.next())
}

func testSingleWriteDeflateCompressed() throws {
try self.testSingleWrite(compressionMethod: .deflate)
}

func testSingleWriteGZIPCompressed() throws {
try self.testSingleWrite(compressionMethod: .gzip)
}

func testMultipleWrites() throws {
var framer = GRPCMessageFramer()

let messages = 100
for _ in 0 ..< messages {
framer.append(Array(repeating: 42, count: 128), compress: false)
framer.append(Array(repeating: 42, count: 128))
}

var buffer = try XCTUnwrap(framer.next())
Expand All @@ -56,6 +93,7 @@ final class GRPCMessageFramerTests: XCTestCase {
// No more bufers.
XCTAssertNil(try framer.next())
}

}

extension ByteBuffer {
Expand Down