Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions Examples/ElizaSharedSources/AppSources/MenuView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ extension MessagingConnectionType: Identifiable {
}

struct MenuView: View {
private func createClient(withProtocol networkProtocol: NetworkProtocol)
-> Buf_Connect_Demo_Eliza_V1_ElizaServiceClient
{
let protocolClient = ProtocolClient(
httpClient: URLSessionHTTPClient(),
config: ProtocolClientConfig(
host: "https://demo.connect.build",
networkProtocol: networkProtocol,
codec: ProtoCodec() // Protobuf binary, or JSONCodec() for JSON
)
)
return Buf_Connect_Demo_Eliza_V1_ElizaServiceClient(client: protocolClient)
}

var body: some View {
NavigationView {
VStack(spacing: 15) {
Expand All @@ -51,7 +65,7 @@ struct MenuView: View {
destination: LazyNavigationView {
MessagingView(
viewModel: UnaryMessagingViewModel(
protocolOption: ConnectClientOption()
client: self.createClient(withProtocol: .connect)
)
)
}
Expand All @@ -64,7 +78,7 @@ struct MenuView: View {
destination: LazyNavigationView {
MessagingView(
viewModel: BidirectionalStreamingMessagingViewModel(
protocolOption: ConnectClientOption()
client: self.createClient(withProtocol: .connect)
)
)
}
Expand All @@ -77,7 +91,7 @@ struct MenuView: View {
destination: LazyNavigationView {
MessagingView(
viewModel: UnaryMessagingViewModel(
protocolOption: GRPCWebClientOption()
client: self.createClient(withProtocol: .grpcWeb)
)
)
}
Expand All @@ -90,7 +104,7 @@ struct MenuView: View {
destination: LazyNavigationView {
MessagingView(
viewModel: BidirectionalStreamingMessagingViewModel(
protocolOption: GRPCWebClientOption()
client: self.createClient(withProtocol: .grpcWeb)
)
)
}
Expand Down
32 changes: 8 additions & 24 deletions Examples/ElizaSharedSources/AppSources/MessagingViewModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,19 @@ protocol MessagingViewModel: ObservableObject {
/// View model that uses unary requests for messaging.
@MainActor
final class UnaryMessagingViewModel: MessagingViewModel {
private let protocolClient: ProtocolClient
private lazy var elizaClient = Buf_Connect_Demo_Eliza_V1_ElizaServiceClient(
client: self.protocolClient
)
private let client: Buf_Connect_Demo_Eliza_V1_ElizaServiceClientInterface

@Published private(set) var messages = [Message]()

init(protocolOption: ProtocolClientOption) {
self.protocolClient = ProtocolClient(
host: "https://demo.connect.build",
httpClient: URLSessionHTTPClient(),
ProtoClientOption(), // Send protobuf binary on the wire
protocolOption // Specify the protocol to use for the client
)
init(client: Buf_Connect_Demo_Eliza_V1_ElizaServiceClientInterface) {
self.client = client
}

func send(_ sentence: String) async {
let request = SayRequest.with { $0.sentence = sentence }
self.addMessage(Message(message: sentence, author: .user))

let response = await self.elizaClient.say(request: request)
let response = await self.client.say(request: request, headers: [:])
os_log(.debug, "Eliza unary response: %@", String(describing: response))
self.addMessage(Message(
message: response.message?.sentence ?? "No response", author: .eliza
Expand All @@ -78,21 +70,13 @@ final class UnaryMessagingViewModel: MessagingViewModel {
/// View model that uses bidirectional streaming for messaging.
@MainActor
final class BidirectionalStreamingMessagingViewModel: MessagingViewModel {
private let protocolClient: ProtocolClient
private lazy var elizaClient = Buf_Connect_Demo_Eliza_V1_ElizaServiceClient(
client: self.protocolClient
)
private lazy var elizaStream = self.elizaClient.converse()
private let client: Buf_Connect_Demo_Eliza_V1_ElizaServiceClientInterface
private lazy var elizaStream = self.client.converse(headers: [:])

@Published private(set) var messages = [Message]()

init(protocolOption: ProtocolClientOption) {
self.protocolClient = ProtocolClient(
host: "https://demo.connect.build",
httpClient: URLSessionHTTPClient(),
ProtoClientOption(), // Send protobuf binary on the wire
protocolOption // Specify the protocol to use for the client
)
init(client: Buf_Connect_Demo_Eliza_V1_ElizaServiceClientInterface) {
self.client = client
self.observeResponses()
}

Expand Down
30 changes: 4 additions & 26 deletions Libraries/Connect/Implementation/Codecs/Envelope.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,44 +49,22 @@ public enum Envelope {
return Int(messageLength)
}

/// Determines whether message data should be compressed.
///
/// - parameter source: The message payload to optionally be compressed.
/// - parameter compressionMinBytes: The minimum size of the input message for compression to be
/// applied.
///
/// - returns: Whether the message should be compressed.
public static func shouldCompress(_ source: Data, compressionMinBytes: Int?) -> Bool {
if source.isEmpty {
return false
}

if let minBytes = compressionMinBytes, source.count >= minBytes {
return true
}

return false
}

/// Packs a message into an "envelope", adding required header bytes and optionally
/// applying compression.
///
/// Compliant with Connect streams: https://connect.build/docs/protocol/#streaming-request
/// And gRPC: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
///
/// - parameter source: The input message data.
/// - parameter compressionPool: A compression pool that can be used for this envelope.
/// - parameter compressionMinBytes: The minimum size of the input message for compression to be
/// applied.
/// - parameter compression: Configuration to use for compressing the message.
///
/// - returns: Serialized/enveloped data for transmission.
public static func packMessage(
_ source: Data, compressionPool: CompressionPool?, compressionMinBytes: Int?
_ source: Data, using compression: ProtocolClientConfig.RequestCompression?
) -> Data {
var buffer = Data()
if self.shouldCompress(source, compressionMinBytes: compressionMinBytes),
let compressionPool = compressionPool,
let compressedSource = try? compressionPool.compress(data: source)
if let compression = compression, compression.shouldCompress(source),
let compressedSource = try? compression.pool.compress(data: source)
{
buffer.append(0b00000001) // 1 byte with the compression bit active
self.write(lengthOf: compressedSource, to: &buffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import Foundation
import zlib

/// Compression pool that handles gzip compression/decompression.
public struct GzipCompressionPool {
public struct GzipCompressionPool: Sendable {
public init() {}

public enum GzipError: Error {
Expand All @@ -26,7 +26,7 @@ public struct GzipCompressionPool {
}

extension GzipCompressionPool: CompressionPool {
public static func name() -> String {
public func name() -> String {
return "gzip"
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,9 @@

import Foundation

/// Enables the client to speak using the Connect protocol:
/// https://connect.build/docs
///
/// Should not be specified alongside other options like `GRPCWebClientOption`, as only one protocol
/// should be used per `ProtocolClient`.
public struct ConnectClientOption {
public init() {}
}

extension ConnectClientOption: ProtocolClientOption {
public func apply(_ config: ProtocolClientConfig) -> ProtocolClientConfig {
return config.clone(interceptors: [ConnectInterceptor.init] + config.interceptors)
}
}

/// The Connect protocol is implemented as an interceptor in the request/response chain.
private struct ConnectInterceptor {
/// Implementation of the Connect protocol as an interceptor.
/// https://connect.build/docs/protocol
struct ConnectInterceptor {
private let config: ProtocolClientConfig

private static let protocolVersion = "1"
Expand All @@ -50,14 +36,12 @@ extension ConnectInterceptor: Interceptor {

let requestBody = request.message ?? Data()
let finalRequestBody: Data
if Envelope.shouldCompress(
requestBody, compressionMinBytes: self.config.compressionMinBytes
), let compressionPool = self.config.requestCompressionPool() {
if let compression = self.config.requestCompression,
compression.shouldCompress(requestBody)
{
do {
headers[HeaderConstants.contentEncoding] = [
type(of: compressionPool).name(),
]
finalRequestBody = try compressionPool.compress(data: requestBody)
finalRequestBody = try compression.pool.compress(data: requestBody)
headers[HeaderConstants.contentEncoding] = [compression.pool.name()]
} catch {
finalRequestBody = requestBody
}
Expand Down Expand Up @@ -87,7 +71,7 @@ extension ConnectInterceptor: Interceptor {
})

if let encoding = response.headers[HeaderConstants.contentEncoding]?.first,
let compressionPool = self.config.compressionPools[encoding],
let compressionPool = self.config.responseCompressionPool(forName: encoding),
let message = response.message.flatMap({ data in
return try? compressionPool.decompress(data: data)
})
Expand Down Expand Up @@ -121,7 +105,7 @@ extension ConnectInterceptor: Interceptor {
var headers = request.headers
headers[HeaderConstants.connectProtocolVersion] = [Self.protocolVersion]
headers[HeaderConstants.connectStreamingContentEncoding] = self.config
.compressionName.map { [$0] }
.requestCompression.map { [$0.pool.name()] }
headers[HeaderConstants.connectStreamingAcceptEncoding] = self.config
.acceptCompressionPoolNames()
return HTTPRequest(
Expand All @@ -132,11 +116,7 @@ extension ConnectInterceptor: Interceptor {
)
},
requestDataFunction: { data in
return Envelope.packMessage(
data,
compressionPool: self.config.requestCompressionPool(),
compressionMinBytes: self.config.compressionMinBytes
)
return Envelope.packMessage(data, using: self.config.requestCompression)
},
streamResultFunc: { result in
switch result {
Expand All @@ -148,7 +128,7 @@ extension ConnectInterceptor: Interceptor {
do {
let responseCompressionPool = responseHeaders?[
HeaderConstants.connectStreamingContentEncoding
]?.first.flatMap { self.config.compressionPools[$0] }
]?.first.flatMap { self.config.responseCompressionPool(forName: $0) }
let (headerByte, message) = try Envelope.unpackMessage(
data, compressionPool: responseCompressionPool
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,9 @@

import Foundation

/// Enables the client to speak using the gRPC Web protocol:
/// Implementation of the gRPC-Web protocol as an interceptor.
/// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
///
/// Should not be specified alongside other options like `ConnectClientOption`, as only one protocol
/// should be used per `ProtocolClient`.
public struct GRPCWebClientOption {
public init() {}
}

extension GRPCWebClientOption: ProtocolClientOption {
public func apply(_ config: ProtocolClientConfig) -> ProtocolClientConfig {
return config.clone(interceptors: [GRPCWebInterceptor.init] + config.interceptors)
}
}

/// The gRPC Web protocol is implemented as an interceptor in the request/response chain.
private struct GRPCWebInterceptor {
struct GRPCWebInterceptor {
private let config: ProtocolClientConfig

init(config: ProtocolClientConfig) {
Expand All @@ -44,9 +30,7 @@ extension GRPCWebInterceptor: Interceptor {
requestFunction: { request in
// GRPC unary payloads are enveloped.
let envelopedRequestBody = Envelope.packMessage(
request.message ?? Data(),
compressionPool: self.config.requestCompressionPool(),
compressionMinBytes: self.config.compressionMinBytes
request.message ?? Data(), using: self.config.requestCompression
)

return HTTPRequest(
Expand Down Expand Up @@ -79,7 +63,7 @@ extension GRPCWebInterceptor: Interceptor {

let compressionPool = response.headers[HeaderConstants.grpcContentEncoding]?
.first
.flatMap { self.config.compressionPools[$0] }
.flatMap { self.config.responseCompressionPool(forName: $0) }
do {
// gRPC Web returns data in 2 chunks (either/both of which may be compressed):
// 1. OPTIONAL (when not trailers-only): The (headers and length prefixed)
Expand Down Expand Up @@ -137,11 +121,7 @@ extension GRPCWebInterceptor: Interceptor {
)
},
requestDataFunction: { data in
return Envelope.packMessage(
data,
compressionPool: self.config.requestCompressionPool(),
compressionMinBytes: self.config.compressionMinBytes
)
return Envelope.packMessage(data, using: self.config.requestCompression)
},
streamResultFunc: { result in
switch result {
Expand All @@ -162,7 +142,7 @@ extension GRPCWebInterceptor: Interceptor {
do {
let responseCompressionPool = responseHeaders?[
HeaderConstants.grpcContentEncoding
]?.first.flatMap { self.config.compressionPools[$0] }
]?.first.flatMap { self.config.responseCompressionPool(forName: $0) }
let (headerByte, unpackedData) = try Envelope.unpackMessage(
data, compressionPool: responseCompressionPool
)
Expand Down Expand Up @@ -206,8 +186,8 @@ private extension Headers {
var headers = self
headers[HeaderConstants.grpcAcceptEncoding] = config
.acceptCompressionPoolNames()
headers[HeaderConstants.grpcContentEncoding] = config.requestCompressionPool()
.map { [type(of: $0).name()] }
headers[HeaderConstants.grpcContentEncoding] = config.requestCompression
.map { [$0.pool.name()] }
headers[HeaderConstants.grpcTE] = ["trailers"]

// Note that we do not comply with the recommended structure for user-agent:
Expand Down
Loading