Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ extension ConnectInterceptor: Interceptor {
tracingInfo: response.tracingInfo
)
}
}
},
responseMetricsFunction: { $0 }
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ extension GRPCWebInterceptor: Interceptor {
tracingInfo: response.tracingInfo
)
}
}
},
responseMetricsFunction: { $0 }
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ struct InterceptorChain {
interceptors.reversed().map(\.responseFunction),
initial: response
)
},
responseMetricsFunction: { metrics in
return executeInterceptors(
interceptors.reversed().map(\.responseMetricsFunction),
initial: metrics
)
}
)
}
Expand Down
63 changes: 35 additions & 28 deletions Libraries/Connect/Implementation/ProtocolClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,49 +69,56 @@ extension ProtocolClient: ProtocolClientInterface {
headers: headers,
message: data
))
return self.httpClient.unary(request: request) { response in
let response = chain.responseFunction(response)
let responseMessage: ResponseMessage<Output>
if response.code != .ok {
let error = (response.error as? ConnectError)
return self.httpClient.unary(
request: request,
onMetrics: { metrics in
// Response is unused, but metrics are passed to interceptors
_ = chain.responseMetricsFunction(metrics)
},
onResponse: { response in
let response = chain.responseFunction(response)
let responseMessage: ResponseMessage<Output>
if response.code != .ok {
let error = (response.error as? ConnectError)
?? ConnectError.from(
code: response.code, headers: response.headers, source: response.message
)
responseMessage = ResponseMessage(
code: response.code,
headers: response.headers,
result: .failure(error),
trailers: response.trailers
)
} else if let message = response.message {
do {
responseMessage = ResponseMessage(
code: response.code,
headers: response.headers,
result: .success(try codec.deserialize(source: message)),
result: .failure(error),
trailers: response.trailers
)
} catch let error {
} else if let message = response.message {
do {
responseMessage = ResponseMessage(
code: response.code,
headers: response.headers,
result: .success(try codec.deserialize(source: message)),
trailers: response.trailers
)
} catch let error {
responseMessage = ResponseMessage(
code: response.code,
headers: response.headers,
result: .failure(ConnectError(
code: response.code, message: nil, exception: error,
details: [], metadata: response.headers
)),
trailers: response.trailers
)
}
} else {
responseMessage = ResponseMessage(
code: response.code,
headers: response.headers,
result: .failure(ConnectError(
code: response.code, message: nil, exception: error,
details: [], metadata: response.headers
)),
result: .success(.init()),
trailers: response.trailers
)
}
} else {
responseMessage = ResponseMessage(
code: response.code,
headers: response.headers,
result: .success(.init()),
trailers: response.trailers
)
completion(responseMessage)
}
completion(responseMessage)
}
)
}

public func bidirectionalStream<
Expand Down
24 changes: 20 additions & 4 deletions Libraries/Connect/Implementation/URLSessionHTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import os.log
open class URLSessionHTTPClient: NSObject, HTTPClientInterface {
/// Lock used for safely accessing stream storage.
private let lock = Lock()
/// Closures stored for notifying when metrics are available.
private var metricsClosures = [Int: (HTTPMetrics) -> Void]()
/// Force unwrapped to allow using `self` as the delegate.
private var session: URLSession!
/// List of active streams.
Expand All @@ -39,12 +41,14 @@ open class URLSessionHTTPClient: NSObject, HTTPClientInterface {

@discardableResult
open func unary(
request: HTTPRequest, completion: @Sendable @escaping (HTTPResponse) -> Void
request: HTTPRequest,
onMetrics: @Sendable @escaping (HTTPMetrics) -> Void,
onResponse: @Sendable @escaping (HTTPResponse) -> Void
) -> Cancelable {
let urlRequest = URLRequest(httpRequest: request)
let task = self.session.dataTask(with: urlRequest) { data, urlResponse, error in
if let httpURLResponse = urlResponse as? HTTPURLResponse {
completion(HTTPResponse(
onResponse(HTTPResponse(
code: Code.fromURLSessionCode(httpURLResponse.statusCode),
headers: httpURLResponse.formattedLowercasedHeaders(),
message: data,
Expand All @@ -54,7 +58,7 @@ open class URLSessionHTTPClient: NSObject, HTTPClientInterface {
))
} else if let error = error {
let code = Code.fromURLSessionCode((error as NSError).code)
completion(HTTPResponse(
onResponse(HTTPResponse(
code: code,
headers: [:],
message: data,
Expand All @@ -67,7 +71,7 @@ open class URLSessionHTTPClient: NSObject, HTTPClientInterface {
tracingInfo: nil
))
} else {
completion(HTTPResponse(
onResponse(HTTPResponse(
code: .unknown,
headers: [:],
message: data,
Expand All @@ -81,6 +85,7 @@ open class URLSessionHTTPClient: NSObject, HTTPClientInterface {
))
}
}
self.lock.perform { self.metricsClosures[task.taskIdentifier] = onMetrics }
task.resume()
return Cancelable(cancel: task.cancel)
}
Expand Down Expand Up @@ -142,6 +147,17 @@ extension URLSessionHTTPClient: URLSessionTaskDelegate {
let stream = self.lock.perform { self.streams.removeValue(forKey: task.taskIdentifier) }
stream?.handleCompletion(error: error)
}

open func urlSession(
_ session: URLSession, task: URLSessionTask,
didFinishCollecting metrics: URLSessionTaskMetrics
) {
if let metricsClosure = self.lock.perform(
action: { self.metricsClosures.removeValue(forKey: task.taskIdentifier) }
) {
metricsClosure(HTTPMetrics(taskMetrics: metrics))
}
}
}

extension HTTPURLResponse {
Expand Down
11 changes: 8 additions & 3 deletions Libraries/Connect/Interfaces/HTTPClientInterface.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ public protocol HTTPClientInterface {
/// Perform a unary HTTP request.
///
/// - parameter request: The outbound request headers and data.
/// - parameter completion: Closure that should be called upon completion of the request.
/// - parameter onMetrics: Closure that should be called when metrics are finalized. This may be
/// called before or after `onResponse`.
/// - parameter onResponse: Closure that should be called when a response is received.
///
/// - returns: A type which can be used to cancel the outbound request.
@discardableResult
func unary(request: HTTPRequest, completion: @Sendable @escaping (HTTPResponse) -> Void)
-> Cancelable
func unary(
request: HTTPRequest,
onMetrics: @Sendable @escaping (HTTPMetrics) -> Void,
onResponse: @Sendable @escaping (HTTPResponse) -> Void
) -> Cancelable

/// Initialize a new HTTP stream.
///
Expand Down
24 changes: 24 additions & 0 deletions Libraries/Connect/Interfaces/HTTPMetrics.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022-2023 Buf Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import Foundation

/// Contains metrics collected during the span of an HTTP request.
public struct HTTPMetrics: Sendable {
public let taskMetrics: URLSessionTaskMetrics?

public init(taskMetrics: URLSessionTaskMetrics?) {
self.taskMetrics = taskMetrics
}
}
5 changes: 4 additions & 1 deletion Libraries/Connect/Interfaces/Interceptor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ public protocol Interceptor {
public struct UnaryFunction {
public let requestFunction: (HTTPRequest) -> HTTPRequest
public let responseFunction: (HTTPResponse) -> HTTPResponse
public let responseMetricsFunction: (HTTPMetrics) -> HTTPMetrics

public init(
requestFunction: @escaping (HTTPRequest) -> HTTPRequest,
responseFunction: @escaping (HTTPResponse) -> HTTPResponse
responseFunction: @escaping (HTTPResponse) -> HTTPResponse,
responseMetricsFunction: @escaping (HTTPMetrics) -> HTTPMetrics = { $0 }
) {
self.requestFunction = requestFunction
self.responseFunction = responseFunction
self.responseMetricsFunction = responseMetricsFunction
}
}

Expand Down
18 changes: 16 additions & 2 deletions Tests/ConnectLibraryTests/ConnectTests/InterceptorChainTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ private struct MockUnaryInterceptor: Interceptor {
let headerID: String
let requestExpectation: XCTestExpectation
let responseExpectation: XCTestExpectation
let responseMetricsExpectation: XCTestExpectation

func unaryFunction() -> Connect.UnaryFunction {
return UnaryFunction(
Expand All @@ -45,6 +46,10 @@ private struct MockUnaryInterceptor: Interceptor {
error: response.error,
tracingInfo: .init(httpStatus: 200)
)
},
responseMetricsFunction: { metrics in
self.responseMetricsExpectation.fulfill()
return metrics
}
)
}
Expand Down Expand Up @@ -112,20 +117,24 @@ final class InterceptorChainTests: XCTestCase {
let bRequestExpectation = self.expectation(description: "Filter B called with request")
let aResponseExpectation = self.expectation(description: "Filter A called with response")
let bResponseExpectation = self.expectation(description: "Filter B called with response")
let aMetricsExpectation = self.expectation(description: "Filter A called with metrics")
let bMetricsExpectation = self.expectation(description: "Filter B called with metrics")
let chain = InterceptorChain(
interceptors: [
{ _ in
return MockUnaryInterceptor(
headerID: "filter-a",
requestExpectation: aRequestExpectation,
responseExpectation: aResponseExpectation
responseExpectation: aResponseExpectation,
responseMetricsExpectation: aMetricsExpectation
)
},
{ _ in
return MockUnaryInterceptor(
headerID: "filter-b",
requestExpectation: bRequestExpectation,
responseExpectation: bResponseExpectation
responseExpectation: bResponseExpectation,
responseMetricsExpectation: bMetricsExpectation
)
},
],
Expand All @@ -150,11 +159,16 @@ final class InterceptorChainTests: XCTestCase {
))
XCTAssertEqual(interceptedResponse.headers["filter-chain"], ["filter-b", "filter-a"])

let interceptedMetrics = chain.responseMetricsFunction(HTTPMetrics(taskMetrics: nil))
XCTAssertNil(interceptedMetrics.taskMetrics)

XCTAssertEqual(XCTWaiter().wait(for: [
aRequestExpectation,
bRequestExpectation,
bResponseExpectation,
aResponseExpectation,
bMetricsExpectation,
aMetricsExpectation,
], timeout: 1.0, enforceOrder: true), .completed)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import XCTest

private struct NoopInterceptor: Interceptor {
func unaryFunction() -> UnaryFunction {
return .init(requestFunction: { $0 }, responseFunction: { $0 })
return .init(
requestFunction: { $0 }, responseFunction: { $0 }, responseMetricsFunction: { $0 }
)
}

func streamFunction() -> StreamFunction {
Expand Down