From 173a55083e55076e45dc19a9ff4b26b4390eff21 Mon Sep 17 00:00:00 2001 From: cnkwocha Date: Tue, 23 Jul 2024 09:27:00 +0100 Subject: [PATCH 01/10] Add new target for the Health service tests Motivation: This is needed to compile the tests and their dependencies into a test suite. Modifications: - Add a new target "grpcHealthTests" with its necessary dependencies to Package@swift-6.swift. Result: The tests will be able to access their dependencies. --- Package@swift-6.swift | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/Package@swift-6.swift b/Package@swift-6.swift index 31c28b55b..b78a35c5b 100644 --- a/Package@swift-6.swift +++ b/Package@swift-6.swift @@ -154,6 +154,7 @@ extension Target.Dependency { static var grpcHTTP2Core: Self { .target(name: "GRPCHTTP2Core") } static var grpcHTTP2TransportNIOPosix: Self { .target(name: "GRPCHTTP2TransportNIOPosix") } static var grpcHTTP2TransportNIOTransportServices: Self { .target(name: "GRPCHTTP2TransportNIOTransportServices") } + static var grpcHealth: Self { .target(name: "GRPCHealth") } } // MARK: - Targets @@ -468,6 +469,19 @@ extension Target { ) } + static var grpcHealthTests: Target { + .testTarget( + name: "GRPCHealthTests", + dependencies: [ + .grpcHealth, + .grpcProtobuf, + .grpcInProcessTransport + ], + path: "Tests/Services/HealthTests", + swiftSettings: [.swiftLanguageVersion(.v5), .enableUpcomingFeature("ExistentialAny")] + ) + } + static var interopTestModels: Target { .target( name: "GRPCInteroperabilityTestModels", @@ -988,6 +1002,7 @@ let package = Package( .grpcInterceptorsTests, .grpcHTTP2CoreTests, .grpcHTTP2TransportTests, + .grpcHealthTests, .grpcProtobufTests, .grpcProtobufCodeGenTests, .inProcessInteroperabilityTests, From 2a78c8a80e878b48a4702afcb3117fd601e1c36e Mon Sep 17 00:00:00 2001 From: cnkwocha Date: Tue, 23 Jul 2024 09:43:01 +0100 Subject: [PATCH 02/10] Adjust Protos/generate.sh for the Health service tests Motivation: The tests for the Health service need access to the client and server stubs for the service. Only the server stub was generated but its visibility is `internal`. Therefore, the tests cannot access it. Modifications: - Adjust the function that generates the stubs to: - make the visibility of the server stub `package`. - generate the client stub too. Result: The tests will now have access to both the client and server stubs. --- Protos/generate.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Protos/generate.sh b/Protos/generate.sh index 4be32ec1d..3e5c05631 100755 --- a/Protos/generate.sh +++ b/Protos/generate.sh @@ -244,8 +244,8 @@ function generate_health_service { local proto="$here/upstream/grpc/health/v1/health.proto" local output="$root/Sources/Services/Health/Generated" - generate_message "$proto" "$(dirname "$proto")" "$output" "Visibility=Internal" - generate_grpc "$proto" "$(dirname "$proto")" "$output" "Visibility=Internal" "Client=false" "Server=true" "_V2=true" + generate_message "$proto" "$(dirname "$proto")" "$output" "Visibility=Package" + generate_grpc "$proto" "$(dirname "$proto")" "$output" "Visibility=Package" "Client=true" "Server=true" "_V2=true" } #------------------------------------------------------------------------------ From 209e48b97f69ad8f8f2d9e9c3770904f76db18e9 Mon Sep 17 00:00:00 2001 From: cnkwocha Date: Tue, 23 Jul 2024 09:57:44 +0100 Subject: [PATCH 03/10] Generate stubs --- .../Health/Generated/health.grpc.swift | 187 ++++++++++++++++-- .../Services/Health/Generated/health.pb.swift | 50 ++--- 2 files changed, 194 insertions(+), 43 deletions(-) diff --git a/Sources/Services/Health/Generated/health.grpc.swift b/Sources/Services/Health/Generated/health.grpc.swift index 769b3a893..05c4387b4 100644 --- a/Sources/Services/Health/Generated/health.grpc.swift +++ b/Sources/Services/Health/Generated/health.grpc.swift @@ -27,40 +27,44 @@ import GRPCCore import GRPCProtobuf -internal enum Grpc_Health_V1_Health { - internal enum Method { - internal enum Check { - internal typealias Input = Grpc_Health_V1_HealthCheckRequest - internal typealias Output = Grpc_Health_V1_HealthCheckResponse - internal static let descriptor = MethodDescriptor( +package enum Grpc_Health_V1_Health { + package enum Method { + package enum Check { + package typealias Input = Grpc_Health_V1_HealthCheckRequest + package typealias Output = Grpc_Health_V1_HealthCheckResponse + package static let descriptor = MethodDescriptor( service: "grpc.health.v1.Health", method: "Check" ) } - internal enum Watch { - internal typealias Input = Grpc_Health_V1_HealthCheckRequest - internal typealias Output = Grpc_Health_V1_HealthCheckResponse - internal static let descriptor = MethodDescriptor( + package enum Watch { + package typealias Input = Grpc_Health_V1_HealthCheckRequest + package typealias Output = Grpc_Health_V1_HealthCheckResponse + package static let descriptor = MethodDescriptor( service: "grpc.health.v1.Health", method: "Watch" ) } - internal static let descriptors: [MethodDescriptor] = [ + package static let descriptors: [MethodDescriptor] = [ Check.descriptor, Watch.descriptor ] } @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) - internal typealias StreamingServiceProtocol = Grpc_Health_V1_HealthStreamingServiceProtocol + package typealias StreamingServiceProtocol = Grpc_Health_V1_HealthStreamingServiceProtocol @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) - internal typealias ServiceProtocol = Grpc_Health_V1_HealthServiceProtocol + package typealias ServiceProtocol = Grpc_Health_V1_HealthServiceProtocol + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) + package typealias ClientProtocol = Grpc_Health_V1_HealthClientProtocol + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) + package typealias Client = Grpc_Health_V1_HealthClient } /// Health is gRPC's mechanism for checking whether a server is able to handle /// RPCs. Its semantics are documented in /// https://github.com/grpc/grpc/blob/master/doc/health-checking.md. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) -internal protocol Grpc_Health_V1_HealthStreamingServiceProtocol: GRPCCore.RegistrableRPCService { +package protocol Grpc_Health_V1_HealthStreamingServiceProtocol: GRPCCore.RegistrableRPCService { /// Check gets the health of the specified service. If the requested service /// is unknown, the call will fail with status NOT_FOUND. If the caller does /// not specify a service name, the server should respond with its overall @@ -94,7 +98,7 @@ internal protocol Grpc_Health_V1_HealthStreamingServiceProtocol: GRPCCore.Regist @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) extension Grpc_Health_V1_Health.StreamingServiceProtocol { @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) - internal func registerMethods(with router: inout GRPCCore.RPCRouter) { + package func registerMethods(with router: inout GRPCCore.RPCRouter) { router.registerHandler( forMethod: Grpc_Health_V1_Health.Method.Check.descriptor, deserializer: ProtobufDeserializer(), @@ -118,7 +122,7 @@ extension Grpc_Health_V1_Health.StreamingServiceProtocol { /// RPCs. Its semantics are documented in /// https://github.com/grpc/grpc/blob/master/doc/health-checking.md. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) -internal protocol Grpc_Health_V1_HealthServiceProtocol: Grpc_Health_V1_Health.StreamingServiceProtocol { +package protocol Grpc_Health_V1_HealthServiceProtocol: Grpc_Health_V1_Health.StreamingServiceProtocol { /// Check gets the health of the specified service. If the requested service /// is unknown, the call will fail with status NOT_FOUND. If the caller does /// not specify a service name, the server should respond with its overall @@ -151,13 +155,160 @@ internal protocol Grpc_Health_V1_HealthServiceProtocol: Grpc_Health_V1_Health.St /// Partial conformance to `Grpc_Health_V1_HealthStreamingServiceProtocol`. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) extension Grpc_Health_V1_Health.ServiceProtocol { - internal func check(request: ServerRequest.Stream) async throws -> ServerResponse.Stream { + package func check(request: ServerRequest.Stream) async throws -> ServerResponse.Stream { let response = try await self.check(request: ServerRequest.Single(stream: request)) return ServerResponse.Stream(single: response) } - internal func watch(request: ServerRequest.Stream) async throws -> ServerResponse.Stream { + package func watch(request: ServerRequest.Stream) async throws -> ServerResponse.Stream { let response = try await self.watch(request: ServerRequest.Single(stream: request)) return response } +} + +/// Health is gRPC's mechanism for checking whether a server is able to handle +/// RPCs. Its semantics are documented in +/// https://github.com/grpc/grpc/blob/master/doc/health-checking.md. +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +package protocol Grpc_Health_V1_HealthClientProtocol: Sendable { + /// Check gets the health of the specified service. If the requested service + /// is unknown, the call will fail with status NOT_FOUND. If the caller does + /// not specify a service name, the server should respond with its overall + /// health status. + /// + /// Clients should set a deadline when calling Check, and can declare the + /// server unhealthy if they do not receive a timely response. + /// + /// Check implementations should be idempotent and side effect free. + func check( + request: ClientRequest.Single, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + options: CallOptions, + _ body: @Sendable @escaping (ClientResponse.Single) async throws -> R + ) async throws -> R where R: Sendable + + /// Performs a watch for the serving status of the requested service. + /// The server will immediately send back a message indicating the current + /// serving status. It will then subsequently send a new message whenever + /// the service's serving status changes. + /// + /// If the requested service is unknown when the call is received, the + /// server will send a message setting the serving status to + /// SERVICE_UNKNOWN but will *not* terminate the call. If at some + /// future point, the serving status of the service becomes known, the + /// server will send a new message with the service's serving status. + /// + /// If the call terminates with status UNIMPLEMENTED, then clients + /// should assume this method is not supported and should not retry the + /// call. If the call terminates with any other status (including OK), + /// clients should retry the call with appropriate exponential backoff. + func watch( + request: ClientRequest.Single, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + options: CallOptions, + _ body: @Sendable @escaping (ClientResponse.Stream) async throws -> R + ) async throws -> R where R: Sendable +} + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +extension Grpc_Health_V1_Health.ClientProtocol { + package func check( + request: ClientRequest.Single, + options: CallOptions = .defaults, + _ body: @Sendable @escaping (ClientResponse.Single) async throws -> R + ) async throws -> R where R: Sendable { + try await self.check( + request: request, + serializer: ProtobufSerializer(), + deserializer: ProtobufDeserializer(), + options: options, + body + ) + } + + package func watch( + request: ClientRequest.Single, + options: CallOptions = .defaults, + _ body: @Sendable @escaping (ClientResponse.Stream) async throws -> R + ) async throws -> R where R: Sendable { + try await self.watch( + request: request, + serializer: ProtobufSerializer(), + deserializer: ProtobufDeserializer(), + options: options, + body + ) + } +} + +/// Health is gRPC's mechanism for checking whether a server is able to handle +/// RPCs. Its semantics are documented in +/// https://github.com/grpc/grpc/blob/master/doc/health-checking.md. +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +package struct Grpc_Health_V1_HealthClient: Grpc_Health_V1_Health.ClientProtocol { + private let client: GRPCCore.GRPCClient + + package init(client: GRPCCore.GRPCClient) { + self.client = client + } + + /// Check gets the health of the specified service. If the requested service + /// is unknown, the call will fail with status NOT_FOUND. If the caller does + /// not specify a service name, the server should respond with its overall + /// health status. + /// + /// Clients should set a deadline when calling Check, and can declare the + /// server unhealthy if they do not receive a timely response. + /// + /// Check implementations should be idempotent and side effect free. + package func check( + request: ClientRequest.Single, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + options: CallOptions = .defaults, + _ body: @Sendable @escaping (ClientResponse.Single) async throws -> R + ) async throws -> R where R: Sendable { + try await self.client.unary( + request: request, + descriptor: Grpc_Health_V1_Health.Method.Check.descriptor, + serializer: serializer, + deserializer: deserializer, + options: options, + handler: body + ) + } + + /// Performs a watch for the serving status of the requested service. + /// The server will immediately send back a message indicating the current + /// serving status. It will then subsequently send a new message whenever + /// the service's serving status changes. + /// + /// If the requested service is unknown when the call is received, the + /// server will send a message setting the serving status to + /// SERVICE_UNKNOWN but will *not* terminate the call. If at some + /// future point, the serving status of the service becomes known, the + /// server will send a new message with the service's serving status. + /// + /// If the call terminates with status UNIMPLEMENTED, then clients + /// should assume this method is not supported and should not retry the + /// call. If the call terminates with any other status (including OK), + /// clients should retry the call with appropriate exponential backoff. + package func watch( + request: ClientRequest.Single, + serializer: some MessageSerializer, + deserializer: some MessageDeserializer, + options: CallOptions = .defaults, + _ body: @Sendable @escaping (ClientResponse.Stream) async throws -> R + ) async throws -> R where R: Sendable { + try await self.client.serverStreaming( + request: request, + descriptor: Grpc_Health_V1_Health.Method.Watch.descriptor, + serializer: serializer, + deserializer: deserializer, + options: options, + handler: body + ) + } } \ No newline at end of file diff --git a/Sources/Services/Health/Generated/health.pb.swift b/Sources/Services/Health/Generated/health.pb.swift index 883a55443..f95e659f8 100644 --- a/Sources/Services/Health/Generated/health.pb.swift +++ b/Sources/Services/Health/Generated/health.pb.swift @@ -37,29 +37,29 @@ fileprivate struct _GeneratedWithProtocGenSwiftVersion: SwiftProtobuf.ProtobufAP typealias Version = _2 } -struct Grpc_Health_V1_HealthCheckRequest: Sendable { +package struct Grpc_Health_V1_HealthCheckRequest: Sendable { // SwiftProtobuf.Message conformance is added in an extension below. See the // `Message` and `Message+*Additions` files in the SwiftProtobuf library for // methods supported on all messages. - var service: String = String() + package var service: String = String() - var unknownFields = SwiftProtobuf.UnknownStorage() + package var unknownFields = SwiftProtobuf.UnknownStorage() - init() {} + package init() {} } -struct Grpc_Health_V1_HealthCheckResponse: Sendable { +package struct Grpc_Health_V1_HealthCheckResponse: Sendable { // SwiftProtobuf.Message conformance is added in an extension below. See the // `Message` and `Message+*Additions` files in the SwiftProtobuf library for // methods supported on all messages. - var status: Grpc_Health_V1_HealthCheckResponse.ServingStatus = .unknown + package var status: Grpc_Health_V1_HealthCheckResponse.ServingStatus = .unknown - var unknownFields = SwiftProtobuf.UnknownStorage() + package var unknownFields = SwiftProtobuf.UnknownStorage() - enum ServingStatus: SwiftProtobuf.Enum, Swift.CaseIterable { - typealias RawValue = Int + package enum ServingStatus: SwiftProtobuf.Enum, Swift.CaseIterable { + package typealias RawValue = Int case unknown // = 0 case serving // = 1 case notServing // = 2 @@ -68,11 +68,11 @@ struct Grpc_Health_V1_HealthCheckResponse: Sendable { case serviceUnknown // = 3 case UNRECOGNIZED(Int) - init() { + package init() { self = .unknown } - init?(rawValue: Int) { + package init?(rawValue: Int) { switch rawValue { case 0: self = .unknown case 1: self = .serving @@ -82,7 +82,7 @@ struct Grpc_Health_V1_HealthCheckResponse: Sendable { } } - var rawValue: Int { + package var rawValue: Int { switch self { case .unknown: return 0 case .serving: return 1 @@ -93,7 +93,7 @@ struct Grpc_Health_V1_HealthCheckResponse: Sendable { } // The compiler won't synthesize support with the UNRECOGNIZED case. - static let allCases: [Grpc_Health_V1_HealthCheckResponse.ServingStatus] = [ + package static let allCases: [Grpc_Health_V1_HealthCheckResponse.ServingStatus] = [ .unknown, .serving, .notServing, @@ -102,7 +102,7 @@ struct Grpc_Health_V1_HealthCheckResponse: Sendable { } - init() {} + package init() {} } // MARK: - Code below here is support for the SwiftProtobuf runtime. @@ -110,12 +110,12 @@ struct Grpc_Health_V1_HealthCheckResponse: Sendable { fileprivate let _protobuf_package = "grpc.health.v1" extension Grpc_Health_V1_HealthCheckRequest: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { - static let protoMessageName: String = _protobuf_package + ".HealthCheckRequest" - static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + package static let protoMessageName: String = _protobuf_package + ".HealthCheckRequest" + package static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ 1: .same(proto: "service"), ] - mutating func decodeMessage(decoder: inout D) throws { + package mutating func decodeMessage(decoder: inout D) throws { while let fieldNumber = try decoder.nextFieldNumber() { // The use of inline closures is to circumvent an issue where the compiler // allocates stack space for every case branch when no optimizations are @@ -127,14 +127,14 @@ extension Grpc_Health_V1_HealthCheckRequest: SwiftProtobuf.Message, SwiftProtobu } } - func traverse(visitor: inout V) throws { + package func traverse(visitor: inout V) throws { if !self.service.isEmpty { try visitor.visitSingularStringField(value: self.service, fieldNumber: 1) } try unknownFields.traverse(visitor: &visitor) } - static func ==(lhs: Grpc_Health_V1_HealthCheckRequest, rhs: Grpc_Health_V1_HealthCheckRequest) -> Bool { + package static func ==(lhs: Grpc_Health_V1_HealthCheckRequest, rhs: Grpc_Health_V1_HealthCheckRequest) -> Bool { if lhs.service != rhs.service {return false} if lhs.unknownFields != rhs.unknownFields {return false} return true @@ -142,12 +142,12 @@ extension Grpc_Health_V1_HealthCheckRequest: SwiftProtobuf.Message, SwiftProtobu } extension Grpc_Health_V1_HealthCheckResponse: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { - static let protoMessageName: String = _protobuf_package + ".HealthCheckResponse" - static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + package static let protoMessageName: String = _protobuf_package + ".HealthCheckResponse" + package static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ 1: .same(proto: "status"), ] - mutating func decodeMessage(decoder: inout D) throws { + package mutating func decodeMessage(decoder: inout D) throws { while let fieldNumber = try decoder.nextFieldNumber() { // The use of inline closures is to circumvent an issue where the compiler // allocates stack space for every case branch when no optimizations are @@ -159,14 +159,14 @@ extension Grpc_Health_V1_HealthCheckResponse: SwiftProtobuf.Message, SwiftProtob } } - func traverse(visitor: inout V) throws { + package func traverse(visitor: inout V) throws { if self.status != .unknown { try visitor.visitSingularEnumField(value: self.status, fieldNumber: 1) } try unknownFields.traverse(visitor: &visitor) } - static func ==(lhs: Grpc_Health_V1_HealthCheckResponse, rhs: Grpc_Health_V1_HealthCheckResponse) -> Bool { + package static func ==(lhs: Grpc_Health_V1_HealthCheckResponse, rhs: Grpc_Health_V1_HealthCheckResponse) -> Bool { if lhs.status != rhs.status {return false} if lhs.unknownFields != rhs.unknownFields {return false} return true @@ -174,7 +174,7 @@ extension Grpc_Health_V1_HealthCheckResponse: SwiftProtobuf.Message, SwiftProtob } extension Grpc_Health_V1_HealthCheckResponse.ServingStatus: SwiftProtobuf._ProtoNameProviding { - static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + package static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ 0: .same(proto: "UNKNOWN"), 1: .same(proto: "SERVING"), 2: .same(proto: "NOT_SERVING"), From b08fddee3bac604577d8bb00ca5b548f497d56bd Mon Sep 17 00:00:00 2001 From: cnkwocha Date: Tue, 23 Jul 2024 09:58:51 +0100 Subject: [PATCH 04/10] Implement Health service Motivation: Implement the Health service defined in Protos/upstream/grpc/health/v1/health.proto. Modifications: - Implement the check and watch methods for the service. - Implement providers (public APIs) to interact with the service. - Add tests. Result: The Health service will be available for use in grpc-swift. --- Sources/Services/Health/Health.swift | 31 +++ Sources/Services/Health/HealthProvider.swift | 55 ++++ Sources/Services/Health/HealthService.swift | 27 ++ .../Health/InternalHealthService.swift | 131 ++++++++++ Sources/Services/Health/ServingStatus.swift | 35 +++ Tests/Services/HealthTests/HealthTests.swift | 236 ++++++++++++++++++ 6 files changed, 515 insertions(+) create mode 100644 Sources/Services/Health/Health.swift create mode 100644 Sources/Services/Health/HealthProvider.swift create mode 100644 Sources/Services/Health/HealthService.swift create mode 100644 Sources/Services/Health/InternalHealthService.swift create mode 100644 Sources/Services/Health/ServingStatus.swift create mode 100644 Tests/Services/HealthTests/HealthTests.swift diff --git a/Sources/Services/Health/Health.swift b/Sources/Services/Health/Health.swift new file mode 100644 index 000000000..5f79b3d76 --- /dev/null +++ b/Sources/Services/Health/Health.swift @@ -0,0 +1,31 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// A coupled Health service and provider. +@available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +public struct Health { + /// A registerable RPC service to probe whether a server is able to handle RPCs. + public let service: HealthService + + /// Provides handlers to interact with the coupled Health service. + public let provider: HealthProvider + + /// Constructs a new ``Health``, coupling a ``HealthService`` and a ``HealthProvider``. + public init() { + self.service = HealthService() + self.provider = HealthProvider(healthService: self.service) + } +} diff --git a/Sources/Services/Health/HealthProvider.swift b/Sources/Services/Health/HealthProvider.swift new file mode 100644 index 000000000..21955a9e4 --- /dev/null +++ b/Sources/Services/Health/HealthProvider.swift @@ -0,0 +1,55 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore + +/// Provides handlers to interact with a Health service. +@available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +public struct HealthProvider: Sendable { + private let healthService: HealthService + + /// Updates the status of a service in the Health service. + public func updateService( + descriptor: ServiceDescriptor, + status: ServingStatus + ) throws { + try self.healthService.service.updateService( + descriptor: descriptor, + status: Grpc_Health_V1_HealthCheckResponse.ServingStatus(from: status) + ) + } + + /// Constructs a new ``HealthProvider``. + /// + /// - Parameters: + /// - healthService: The Health service to handle. + internal init(healthService: HealthService) { + self.healthService = healthService + } +} + +extension Grpc_Health_V1_HealthCheckResponse.ServingStatus { + /// Constructs a new ``Grpc_Health_V1_HealthCheckResponse.ServingStatus`` from ``ServingStatus``. + /// + /// - Parameters: + /// - from: The base status. + package init(from status: ServingStatus) { + switch status.value { + case .serving: self = .serving + case .notServing: self = .notServing + } + } +} diff --git a/Sources/Services/Health/HealthService.swift b/Sources/Services/Health/HealthService.swift new file mode 100644 index 000000000..819d9f4ae --- /dev/null +++ b/Sources/Services/Health/HealthService.swift @@ -0,0 +1,27 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore + +/// A registerable RPC service to probe whether a server is able to handle RPCs. +@available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +public final class HealthService: RegistrableRPCService { + internal let service = InternalHealthService() + + public func registerMethods(with router: inout RPCRouter) { + self.service.registerMethods(with: &router) + } +} diff --git a/Sources/Services/Health/InternalHealthService.swift b/Sources/Services/Health/InternalHealthService.swift new file mode 100644 index 000000000..0ea6d5ff8 --- /dev/null +++ b/Sources/Services/Health/InternalHealthService.swift @@ -0,0 +1,131 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore + +@available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +internal final class InternalHealthService: Grpc_Health_V1_HealthServiceProtocol { + private let lockedStorage = LockedValueBox([String: StatusAndContinuations]()) + + /// Creates a response with `status` and writes that response to a stream. + private func writeResponseToStream( + writer: RPCWriter, + status: Grpc_Health_V1_HealthCheckResponse.ServingStatus + ) async throws { + var response = Grpc_Health_V1_HealthCheckResponse() + response.status = status + + try await writer.write(response) + } + + func check( + request: ServerRequest.Single + ) async throws -> ServerResponse.Single { + let service = request.message.service + + if let statusAndContinuations = self.lockedStorage.withLockedValue({ $0[service] }) { + var response = Grpc_Health_V1_HealthCheckResponse() + response.status = statusAndContinuations.status + + return ServerResponse.Single(message: response) + } + + return ServerResponse.Single( + error: RPCError(status: Status(code: .notFound, message: "Requested service unknown."))! + ) + } + + func watch( + request: ServerRequest.Single + ) async throws -> ServerResponse.Stream { + let service = request.message.service + + let statusStream = AsyncStream { + continuation in + + self.lockedStorage.withLockedValue { storage in + if storage[service] == nil { + storage[service] = StatusAndContinuations(status: .serviceUnknown) + } + + storage[service]!.addContinuation(continuation) + } + } + + return ServerResponse.Stream( + of: Grpc_Health_V1_HealthCheckResponse.self + ) { writer in + try await self.writeResponseToStream( + writer: writer, + status: self.lockedStorage.withLockedValue { storage in + assert(storage[service] != nil) + + return storage[service]!.status + } + ) + + for await status in statusStream { + try await self.writeResponseToStream(writer: writer, status: status) + } + + return Metadata() + } + } + + /// Updates the status of a service in the storage. + func updateService( + descriptor: ServiceDescriptor, + status: Grpc_Health_V1_HealthCheckResponse.ServingStatus + ) throws { + try self.lockedStorage.withLockedValue { storage in + if storage[descriptor.fullyQualifiedService] == nil { + storage[descriptor.fullyQualifiedService] = StatusAndContinuations(status: status) + } else { + try storage[descriptor.fullyQualifiedService]!.update(status) + } + } + } + + /// The status of a service, and the continuation of its "watch" streams. + private struct StatusAndContinuations { + private(set) var status: Grpc_Health_V1_HealthCheckResponse.ServingStatus + private var continuations = [ + AsyncStream.Continuation + ]() + + /// Updates the status and provides values to the streams. + fileprivate mutating func update( + _ status: Grpc_Health_V1_HealthCheckResponse.ServingStatus + ) throws { + self.status = status + + for continuation in self.continuations { + continuation.yield(status) + } + } + + /// Adds a continuation for a stream of statuses. + fileprivate mutating func addContinuation( + _ continuation: AsyncStream.Continuation + ) { + self.continuations.append(continuation) + } + + fileprivate init(status: Grpc_Health_V1_HealthCheckResponse.ServingStatus) { + self.status = status + } + } +} diff --git a/Sources/Services/Health/ServingStatus.swift b/Sources/Services/Health/ServingStatus.swift new file mode 100644 index 000000000..c15b67ef1 --- /dev/null +++ b/Sources/Services/Health/ServingStatus.swift @@ -0,0 +1,35 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// The status of a service. +public struct ServingStatus: Sendable, Hashable { + package enum Value: Sendable, Hashable { + case serving + case notServing + } + + /// A status indicating that a service is healthy. + public static let serving = ServingStatus(.serving) + + /// A status indicating that a service unhealthy. + public static let notServing = ServingStatus(.notServing) + + package var value: Value + + private init(_ value: Value) { + self.value = value + } +} diff --git a/Tests/Services/HealthTests/HealthTests.swift b/Tests/Services/HealthTests/HealthTests.swift new file mode 100644 index 000000000..a0e16b7f3 --- /dev/null +++ b/Tests/Services/HealthTests/HealthTests.swift @@ -0,0 +1,236 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCHealth +import GRPCInProcessTransport +import XCTest + +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +final class HealthTests: XCTestCase { + private func withHealthClient( + _ body: @Sendable (Grpc_Health_V1_HealthClient, HealthProvider) async throws -> Void + ) async throws { + let health = Health() + let inProcess = InProcessTransport.makePair() + let server = GRPCServer(transport: inProcess.server, services: [health.service]) + let client = GRPCClient(transport: inProcess.client) + let healthClient = Grpc_Health_V1_HealthClient(client: client) + + try await withThrowingDiscardingTaskGroup { group in + group.addTask { + try await server.run() + } + + group.addTask { + try await client.run() + } + + do { + try await body(healthClient, health.provider) + } catch { + XCTFail("Unexpected error: \(error)") + } + + group.cancelAll() + } + } + + func testCheckOnKnownService() async throws { + try await withHealthClient { (healthClient, healthProvider) in + let testServiceDescriptor = ServiceDescriptor(package: "test.package", service: "TestService") + + try healthProvider.updateService( + descriptor: testServiceDescriptor, + status: .serving + ) + + try healthProvider.updateService( + descriptor: ServiceDescriptor(package: "package.to.be.ignored", service: "IgnoredService"), + status: .notServing + ) + + var message = Grpc_Health_V1_HealthCheckRequest() + message.service = testServiceDescriptor.fullyQualifiedService + + try await healthClient.check(request: ClientRequest.Single(message: message)) { response in + try XCTAssertEqual(response.message.status, .serving) + } + } + } + + func testCheckOnUnknownService() async throws { + try await withHealthClient { (healthClient, healthProvider) in + try healthProvider.updateService( + descriptor: ServiceDescriptor(package: "package.to.be.ignored", service: "IgnoredService"), + status: .notServing + ) + + var message = Grpc_Health_V1_HealthCheckRequest() + message.service = + ServiceDescriptor(package: "does.not", service: "Exist").fullyQualifiedService + + try await healthClient.check(request: ClientRequest.Single(message: message)) { response in + try XCTAssertThrowsError(response.message) { error in + XCTAssertTrue(error is RPCError) + XCTAssertEqual((error as! RPCError).code, .notFound) + } + } + } + } + + func testWatchOnKnownService() async throws { + try await withHealthClient { (healthClient, healthProvider) in + let testServiceDescriptor = ServiceDescriptor(package: "test.package", service: "TestService") + let ignoredServiceDescriptor = ServiceDescriptor( + package: "package.to.be.ignored", + service: "IgnoredService" + ) + + let statuses: [ServingStatus] = [.serving, .notServing, .serving, .serving, .notServing] + + try healthProvider.updateService( + descriptor: testServiceDescriptor, + status: statuses[0] + ) + + try healthProvider.updateService( + descriptor: ignoredServiceDescriptor, + status: .notServing + ) + + var message = Grpc_Health_V1_HealthCheckRequest() + message.service = testServiceDescriptor.fullyQualifiedService + + try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in + var responseStreamIterator = response.messages.makeAsyncIterator() + + for i in 0 ..< statuses.count { + let next = try await responseStreamIterator.next()! + let expectedStatus = Grpc_Health_V1_HealthCheckResponse.ServingStatus(from: statuses[i]) + + XCTAssertEqual(next.status, expectedStatus) + + if i < statuses.count - 1 { + try healthProvider.updateService( + descriptor: testServiceDescriptor, + status: statuses[i + 1] + ) + + try healthProvider.updateService( + descriptor: ignoredServiceDescriptor, + status: .notServing + ) + } + } + } + } + } + + func testWatchOnUnknownService() async throws { + try await withHealthClient { (healthClient, healthProvider) in + try healthProvider.updateService( + descriptor: ServiceDescriptor(package: "package.to.be.ignored", service: "IgnoredService"), + status: .serving + ) + + let testServiceDescriptor = ServiceDescriptor(package: "test.package", service: "TestService") + + var message = Grpc_Health_V1_HealthCheckRequest() + message.service = testServiceDescriptor.fullyQualifiedService + + try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in + var responseStreamIterator = response.messages.makeAsyncIterator() + var next = try await responseStreamIterator.next()! + + XCTAssertEqual(next.status, .serviceUnknown) + + try healthProvider.updateService( + descriptor: testServiceDescriptor, + status: .notServing + ) + + next = try await responseStreamIterator.next()! + + XCTAssertEqual(next.status, .notServing) + } + } + } + + func testMultipleWatchOnTheSameService() async throws { + try await withHealthClient { (healthClient, healthProvider) in + let testServiceDescriptor = ServiceDescriptor(package: "test.package", service: "TestService") + + let receivedStatuses1 = AsyncStream + .makeStream() + let receivedStatuses2 = AsyncStream + .makeStream() + + let statusesToBeSent: [ServingStatus] = [ + .serving, + .notServing, + .serving, + .serving, + .notServing, + ] + + @Sendable func runWatch( + continuation: AsyncStream.Continuation + ) async throws { + var message = Grpc_Health_V1_HealthCheckRequest() + message.service = testServiceDescriptor.fullyQualifiedService + + try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in + var responseStreamIterator = response.messages.makeAsyncIterator() + + // Since responseStreamIterator.next() will never be nil (as the "watch" response stream + // is always open), the iteration cannot be based on when responseStreamIterator.next() + // is nil. Else, the iteration infinitely awaits and the test never finishes. Hence, it is + // based on the expected number of statuses to be received. + for _ in 0 ..< statusesToBeSent.count { + try await continuation.yield(responseStreamIterator.next()!.status) + } + } + } + + try await withThrowingDiscardingTaskGroup { group in + group.addTask { + try await runWatch(continuation: receivedStatuses1.continuation) + } + + group.addTask { + try await runWatch(continuation: receivedStatuses2.continuation) + } + + var iterator1 = receivedStatuses1.stream.makeAsyncIterator() + var iterator2 = receivedStatuses2.stream.makeAsyncIterator() + + for status in statusesToBeSent { + try healthProvider.updateService( + descriptor: testServiceDescriptor, + status: status + ) + + let sent = Grpc_Health_V1_HealthCheckResponse.ServingStatus(from: status) + let received1 = await iterator1.next() + let received2 = await iterator2.next() + + XCTAssertEqual(sent, received1) + XCTAssertEqual(sent, received2) + } + } + } + } +} From 0acd2079755f345346babd70146c32050f319acf Mon Sep 17 00:00:00 2001 From: cnkwocha Date: Mon, 29 Jul 2024 12:48:46 +0100 Subject: [PATCH 05/10] Implement feedback --- Package@swift-6.swift | 2 +- Sources/Services/Health/Health.swift | 67 ++++- Sources/Services/Health/HealthProvider.swift | 55 ---- .../Health/InternalHealthService.swift | 126 ++++---- Tests/Services/HealthTests/HealthTests.swift | 284 ++++++++++++------ .../Test Utilities/XCTest+Utilities.swift | 19 +- 6 files changed, 323 insertions(+), 230 deletions(-) delete mode 100644 Sources/Services/Health/HealthProvider.swift rename Sources/Services/Health/HealthService.swift => Tests/Services/HealthTests/Test Utilities/XCTest+Utilities.swift (62%) diff --git a/Package@swift-6.swift b/Package@swift-6.swift index b78a35c5b..32214cab9 100644 --- a/Package@swift-6.swift +++ b/Package@swift-6.swift @@ -478,7 +478,7 @@ extension Target { .grpcInProcessTransport ], path: "Tests/Services/HealthTests", - swiftSettings: [.swiftLanguageVersion(.v5), .enableUpcomingFeature("ExistentialAny")] + swiftSettings: [.swiftLanguageVersion(.v6), .enableUpcomingFeature("ExistentialAny")] ) } diff --git a/Sources/Services/Health/Health.swift b/Sources/Services/Health/Health.swift index 5f79b3d76..65902cd15 100644 --- a/Sources/Services/Health/Health.swift +++ b/Sources/Services/Health/Health.swift @@ -14,18 +14,73 @@ * limitations under the License. */ +import GRPCCore + /// A coupled Health service and provider. @available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public struct Health { +public struct Health: Sendable { + private let internalHealthService = InternalHealthService() + /// A registerable RPC service to probe whether a server is able to handle RPCs. - public let service: HealthService + public let service: Health.Service /// Provides handlers to interact with the coupled Health service. - public let provider: HealthProvider + public let provider: Provider - /// Constructs a new ``Health``, coupling a ``HealthService`` and a ``HealthProvider``. + /// Constructs a new ``Health``, coupling a ``Health.Service`` and a ``Health.Provider``. public init() { - self.service = HealthService() - self.provider = HealthProvider(healthService: self.service) + self.service = Health.Service(internalHealthService: self.internalHealthService) + self.provider = Health.Provider(internalHealthService: self.internalHealthService) + } +} + +@available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension Health { + /// A registerable RPC service to probe whether a server is able to handle RPCs. + public struct Service: RegistrableRPCService, Sendable { + private let internalHealthService: InternalHealthService + + public func registerMethods(with router: inout RPCRouter) { + self.internalHealthService.registerMethods(with: &router) + } + + fileprivate init(internalHealthService: InternalHealthService) { + self.internalHealthService = internalHealthService + } + } + + /// Provides handlers to interact with a Health service. + public struct Provider: Sendable { + private let internalHealthService: InternalHealthService + + /// Updates the status of a service in the Health service. + public func updateStatus( + _ status: ServingStatus, + ofService service: ServiceDescriptor + ) { + self.internalHealthService.updateStatus( + Grpc_Health_V1_HealthCheckResponse.ServingStatus(status), + ofService: service.fullyQualifiedService + ) + } + + fileprivate init(internalHealthService: InternalHealthService) { + self.internalHealthService = internalHealthService + } + } +} + +extension Grpc_Health_V1_HealthCheckResponse.ServingStatus { + /// Constructs a new ``Grpc_Health_V1_HealthCheckResponse.ServingStatus`` from ``ServingStatus``. + /// + /// - Parameters: + /// - status: The base status. + package init(_ status: ServingStatus) { + switch status.value { + case .serving: + self = .serving + case .notServing: + self = .notServing + } } } diff --git a/Sources/Services/Health/HealthProvider.swift b/Sources/Services/Health/HealthProvider.swift deleted file mode 100644 index 21955a9e4..000000000 --- a/Sources/Services/Health/HealthProvider.swift +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2024, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import GRPCCore - -/// Provides handlers to interact with a Health service. -@available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public struct HealthProvider: Sendable { - private let healthService: HealthService - - /// Updates the status of a service in the Health service. - public func updateService( - descriptor: ServiceDescriptor, - status: ServingStatus - ) throws { - try self.healthService.service.updateService( - descriptor: descriptor, - status: Grpc_Health_V1_HealthCheckResponse.ServingStatus(from: status) - ) - } - - /// Constructs a new ``HealthProvider``. - /// - /// - Parameters: - /// - healthService: The Health service to handle. - internal init(healthService: HealthService) { - self.healthService = healthService - } -} - -extension Grpc_Health_V1_HealthCheckResponse.ServingStatus { - /// Constructs a new ``Grpc_Health_V1_HealthCheckResponse.ServingStatus`` from ``ServingStatus``. - /// - /// - Parameters: - /// - from: The base status. - package init(from status: ServingStatus) { - switch status.value { - case .serving: self = .serving - case .notServing: self = .notServing - } - } -} diff --git a/Sources/Services/Health/InternalHealthService.swift b/Sources/Services/Health/InternalHealthService.swift index 0ea6d5ff8..c37a2a2e2 100644 --- a/Sources/Services/Health/InternalHealthService.swift +++ b/Sources/Services/Health/InternalHealthService.swift @@ -18,102 +18,105 @@ import GRPCCore @available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) internal final class InternalHealthService: Grpc_Health_V1_HealthServiceProtocol { - private let lockedStorage = LockedValueBox([String: StatusAndContinuations]()) - - /// Creates a response with `status` and writes that response to a stream. - private func writeResponseToStream( - writer: RPCWriter, - status: Grpc_Health_V1_HealthCheckResponse.ServingStatus - ) async throws { - var response = Grpc_Health_V1_HealthCheckResponse() - response.status = status - - try await writer.write(response) - } + private let state = InternalHealthService.State() func check( request: ServerRequest.Single ) async throws -> ServerResponse.Single { let service = request.message.service - if let statusAndContinuations = self.lockedStorage.withLockedValue({ $0[service] }) { + if let status = self.state.getCurrentStatus(ofService: service) { var response = Grpc_Health_V1_HealthCheckResponse() - response.status = statusAndContinuations.status + response.status = status return ServerResponse.Single(message: response) } - return ServerResponse.Single( - error: RPCError(status: Status(code: .notFound, message: "Requested service unknown."))! - ) + throw RPCError(code: .notFound, message: "Requested service unknown.") } func watch( request: ServerRequest.Single ) async throws -> ServerResponse.Stream { let service = request.message.service + let statuses = AsyncStream.makeStream(of: Grpc_Health_V1_HealthCheckResponse.ServingStatus.self) - let statusStream = AsyncStream { - continuation in - - self.lockedStorage.withLockedValue { storage in - if storage[service] == nil { - storage[service] = StatusAndContinuations(status: .serviceUnknown) - } - - storage[service]!.addContinuation(continuation) - } - } + self.state.addContinuation(statuses.continuation, forService: service) return ServerResponse.Stream( of: Grpc_Health_V1_HealthCheckResponse.self ) { writer in - try await self.writeResponseToStream( - writer: writer, - status: self.lockedStorage.withLockedValue { storage in - assert(storage[service] != nil) - - return storage[service]!.status - } - ) + var response = Grpc_Health_V1_HealthCheckResponse() - for await status in statusStream { - try await self.writeResponseToStream(writer: writer, status: status) + for await status in statuses.stream { + response.status = status + try await writer.write(response) } - return Metadata() + return [:] } } - /// Updates the status of a service in the storage. - func updateService( - descriptor: ServiceDescriptor, - status: Grpc_Health_V1_HealthCheckResponse.ServingStatus - ) throws { - try self.lockedStorage.withLockedValue { storage in - if storage[descriptor.fullyQualifiedService] == nil { - storage[descriptor.fullyQualifiedService] = StatusAndContinuations(status: status) - } else { - try storage[descriptor.fullyQualifiedService]!.update(status) + func updateStatus( + _ status: Grpc_Health_V1_HealthCheckResponse.ServingStatus, + ofService service: String + ) { + self.state.updateStatus(status, ofService: service) + } +} + +@available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension InternalHealthService { + private struct State: Sendable { + private let lockedStorage = LockedValueBox([String: ServiceState]()) + + fileprivate func getCurrentStatus( + ofService service: String + ) -> Grpc_Health_V1_HealthCheckResponse.ServingStatus? { + return self.lockedStorage.withLockedValue { $0[service]?.currentStatus } + } + + fileprivate func updateStatus( + _ status: Grpc_Health_V1_HealthCheckResponse.ServingStatus, + ofService service: String + ) { + self.lockedStorage.withLockedValue { storage in + if storage[service] == nil { + storage[service] = ServiceState(status: status) + } else { + storage[service]!.updateStatus(status) + } + } + } + + fileprivate func addContinuation( + _ continuation: AsyncStream.Continuation, + forService service: String + ) { + self.lockedStorage.withLockedValue { storage in + storage[service, default: ServiceState(status: .serviceUnknown)] + .addContinuation(continuation) + continuation.yield(storage[service]!.currentStatus) } } } - /// The status of a service, and the continuation of its "watch" streams. - private struct StatusAndContinuations { - private(set) var status: Grpc_Health_V1_HealthCheckResponse.ServingStatus - private var continuations = [ - AsyncStream.Continuation - ]() + /// Encapsulates the current status of a service and the continuations of its "watch" streams. + private struct ServiceState: Sendable { + private(set) var currentStatus: Grpc_Health_V1_HealthCheckResponse.ServingStatus + private var continuations: + [AsyncStream.Continuation] /// Updates the status and provides values to the streams. - fileprivate mutating func update( + fileprivate mutating func updateStatus( _ status: Grpc_Health_V1_HealthCheckResponse.ServingStatus - ) throws { - self.status = status + ) { + if self.currentStatus != status { + self.currentStatus = status - for continuation in self.continuations { - continuation.yield(status) + for continuation in self.continuations { + continuation.yield(status) + } } } @@ -125,7 +128,8 @@ internal final class InternalHealthService: Grpc_Health_V1_HealthServiceProtocol } fileprivate init(status: Grpc_Health_V1_HealthCheckResponse.ServingStatus) { - self.status = status + self.currentStatus = status + self.continuations = [] } } } diff --git a/Tests/Services/HealthTests/HealthTests.swift b/Tests/Services/HealthTests/HealthTests.swift index a0e16b7f3..47cda3edc 100644 --- a/Tests/Services/HealthTests/HealthTests.swift +++ b/Tests/Services/HealthTests/HealthTests.swift @@ -21,7 +21,7 @@ import XCTest @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) final class HealthTests: XCTestCase { private func withHealthClient( - _ body: @Sendable (Grpc_Health_V1_HealthClient, HealthProvider) async throws -> Void + _ body: @Sendable (Grpc_Health_V1_HealthClient, Health.Provider) async throws -> Void ) async throws { let health = Health() let inProcess = InProcessTransport.makePair() @@ -50,17 +50,9 @@ final class HealthTests: XCTestCase { func testCheckOnKnownService() async throws { try await withHealthClient { (healthClient, healthProvider) in - let testServiceDescriptor = ServiceDescriptor(package: "test.package", service: "TestService") + let testServiceDescriptor = ServiceDescriptor.testService - try healthProvider.updateService( - descriptor: testServiceDescriptor, - status: .serving - ) - - try healthProvider.updateService( - descriptor: ServiceDescriptor(package: "package.to.be.ignored", service: "IgnoredService"), - status: .notServing - ) + healthProvider.updateStatus(.serving, ofService: testServiceDescriptor) var message = Grpc_Health_V1_HealthCheckRequest() message.service = testServiceDescriptor.fullyQualifiedService @@ -73,43 +65,38 @@ final class HealthTests: XCTestCase { func testCheckOnUnknownService() async throws { try await withHealthClient { (healthClient, healthProvider) in - try healthProvider.updateService( - descriptor: ServiceDescriptor(package: "package.to.be.ignored", service: "IgnoredService"), - status: .notServing - ) - var message = Grpc_Health_V1_HealthCheckRequest() message.service = - ServiceDescriptor(package: "does.not", service: "Exist").fullyQualifiedService + ServiceDescriptor(package: "does.not", service: "Exist") + .fullyQualifiedService try await healthClient.check(request: ClientRequest.Single(message: message)) { response in - try XCTAssertThrowsError(response.message) { error in - XCTAssertTrue(error is RPCError) - XCTAssertEqual((error as! RPCError).code, .notFound) + try XCTAssertThrowsError(ofType: RPCError.self, response.message) { error in + XCTAssertEqual(error.code, .notFound) } } } } - func testWatchOnKnownService() async throws { + func testCheckOnServer() async throws { try await withHealthClient { (healthClient, healthProvider) in - let testServiceDescriptor = ServiceDescriptor(package: "test.package", service: "TestService") - let ignoredServiceDescriptor = ServiceDescriptor( - package: "package.to.be.ignored", - service: "IgnoredService" - ) + healthProvider.updateStatus(.notServing, ofService: ServiceDescriptor.server) + + let message = Grpc_Health_V1_HealthCheckRequest() + + try await healthClient.check(request: ClientRequest.Single(message: message)) { response in + try XCTAssertEqual(response.message.status, .notServing) + } + } + } - let statuses: [ServingStatus] = [.serving, .notServing, .serving, .serving, .notServing] + func testWatchOnKnownService() async throws { + try await withHealthClient { (healthClient, healthProvider) in + let testServiceDescriptor = ServiceDescriptor.testService - try healthProvider.updateService( - descriptor: testServiceDescriptor, - status: statuses[0] - ) + let statusesToBeSent: [ServingStatus] = [.serving, .notServing, .serving] - try healthProvider.updateService( - descriptor: ignoredServiceDescriptor, - status: .notServing - ) + healthProvider.updateStatus(statusesToBeSent[0], ofService: testServiceDescriptor) var message = Grpc_Health_V1_HealthCheckRequest() message.service = testServiceDescriptor.fullyQualifiedService @@ -117,120 +104,219 @@ final class HealthTests: XCTestCase { try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in var responseStreamIterator = response.messages.makeAsyncIterator() - for i in 0 ..< statuses.count { - let next = try await responseStreamIterator.next()! - let expectedStatus = Grpc_Health_V1_HealthCheckResponse.ServingStatus(from: statuses[i]) - - XCTAssertEqual(next.status, expectedStatus) + for i in 0 ..< statusesToBeSent.count { + let next = try await responseStreamIterator.next() + let message = try XCTUnwrap(next) + let expectedStatus = Grpc_Health_V1_HealthCheckResponse.ServingStatus(statusesToBeSent[i]) - if i < statuses.count - 1 { - try healthProvider.updateService( - descriptor: testServiceDescriptor, - status: statuses[i + 1] - ) + XCTAssertEqual(message.status, expectedStatus) - try healthProvider.updateService( - descriptor: ignoredServiceDescriptor, - status: .notServing - ) + if i < statusesToBeSent.count - 1 { + healthProvider.updateStatus(statusesToBeSent[i + 1], ofService: testServiceDescriptor) } } } } } - func testWatchOnUnknownService() async throws { + func testWatchOnUnknownServiceDoesNotTerminateTheRPC() async throws { try await withHealthClient { (healthClient, healthProvider) in - try healthProvider.updateService( - descriptor: ServiceDescriptor(package: "package.to.be.ignored", service: "IgnoredService"), - status: .serving - ) - - let testServiceDescriptor = ServiceDescriptor(package: "test.package", service: "TestService") + let testServiceDescriptor = ServiceDescriptor.testService var message = Grpc_Health_V1_HealthCheckRequest() message.service = testServiceDescriptor.fullyQualifiedService try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in var responseStreamIterator = response.messages.makeAsyncIterator() - var next = try await responseStreamIterator.next()! + var next = try await responseStreamIterator.next() + var message = try XCTUnwrap(next) - XCTAssertEqual(next.status, .serviceUnknown) + XCTAssertEqual(message.status, .serviceUnknown) - try healthProvider.updateService( - descriptor: testServiceDescriptor, - status: .notServing - ) + healthProvider.updateStatus(.notServing, ofService: testServiceDescriptor) - next = try await responseStreamIterator.next()! + next = try await responseStreamIterator.next() + message = try XCTUnwrap(next) - XCTAssertEqual(next.status, .notServing) + // The RPC was not terminated and a status update was received successfully. + XCTAssertEqual(message.status, .notServing) } } } func testMultipleWatchOnTheSameService() async throws { try await withHealthClient { (healthClient, healthProvider) in - let testServiceDescriptor = ServiceDescriptor(package: "test.package", service: "TestService") + let testServiceDescriptor = ServiceDescriptor.testService + + let statusesToBeSent: [ServingStatus] = [.serving, .notServing, .serving] + + try await withThrowingTaskGroup( + of: [Grpc_Health_V1_HealthCheckResponse.ServingStatus].self + ) { group in + var message = Grpc_Health_V1_HealthCheckRequest() + message.service = testServiceDescriptor.fullyQualifiedService + + let immutableMessage = message + + /// The `continuation` of this stream is used to signal when the "watch" response streams are up and ready. + let signal = AsyncStream.makeStream(of: Void.self) + let numberOfWatches = 2 + + for _ in 0 ..< numberOfWatches { + group.addTask { + return try await healthClient.watch( + request: ClientRequest.Single(message: immutableMessage) + ) { response in + signal.continuation.yield() // Make signal + + var statuses = [Grpc_Health_V1_HealthCheckResponse.ServingStatus]() + var responseStreamIterator = response.messages.makeAsyncIterator() + + // Since responseStreamIterator.next() will never be nil (ideally, as the response + // stream is always open), the iteration cannot be based on when + // responseStreamIterator.next() is nil. Else, the iteration infinitely awaits and the + // test never finishes. Hence, it is based on the expected number of statuses to be + // received. + for _ in 0 ..< statusesToBeSent.count + 1 { + // As the service will be "watched" before being updated, the first status received + // should be `.serviceUnknown`. Hence, the range of this iteration is increased by 1. + + let next = try await responseStreamIterator.next() + let message = try XCTUnwrap(next) + statuses.append(message.status) + } + + return statuses + } + } + } + + // Wait until all the "watch" streams are up and ready. + for await _ in signal.stream.prefix(numberOfWatches) {} + + for status in statusesToBeSent { + healthProvider.updateStatus(status, ofService: testServiceDescriptor) + } + + for try await receivedStatuses in group { + XCTAssertEqual(receivedStatuses[0], .serviceUnknown) + + for i in 0 ..< statusesToBeSent.count { + let sentStatus = Grpc_Health_V1_HealthCheckResponse.ServingStatus(statusesToBeSent[i]) + XCTAssertEqual(sentStatus, receivedStatuses[i + 1]) + } + } + } + } + } - let receivedStatuses1 = AsyncStream - .makeStream() - let receivedStatuses2 = AsyncStream - .makeStream() + func testWatchWithUnchangingStatusUpdates() async throws { + try await withHealthClient { (healthClient, healthProvider) in + let testServiceDescriptor = ServiceDescriptor.testService let statusesToBeSent: [ServingStatus] = [ .serving, .notServing, + .notServing, + .notServing, .serving, + ] + + let expectedReceivedStatuses: [Grpc_Health_V1_HealthCheckResponse.ServingStatus] = [ + // As the service will be "watched" before being updated, the first status received should + // be .serviceUnknown. + .serviceUnknown, + .serving, + .notServing, // The repeated `.notServing` updates should be received only once. .serving, - .notServing, ] - @Sendable func runWatch( - continuation: AsyncStream.Continuation - ) async throws { + try await withThrowingTaskGroup( + of: [Grpc_Health_V1_HealthCheckResponse.ServingStatus].self + ) { group in var message = Grpc_Health_V1_HealthCheckRequest() message.service = testServiceDescriptor.fullyQualifiedService - try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in - var responseStreamIterator = response.messages.makeAsyncIterator() + let immutableMessage = message - // Since responseStreamIterator.next() will never be nil (as the "watch" response stream - // is always open), the iteration cannot be based on when responseStreamIterator.next() - // is nil. Else, the iteration infinitely awaits and the test never finishes. Hence, it is - // based on the expected number of statuses to be received. - for _ in 0 ..< statusesToBeSent.count { - try await continuation.yield(responseStreamIterator.next()!.status) + /// The `continuation` of this stream is used to signal when the "watch" response stream is up and ready. + let signal = AsyncStream.makeStream(of: Void.self) + + group.addTask { + return try await healthClient.watch( + request: ClientRequest.Single(message: immutableMessage) + ) { response in + signal.continuation.finish() // Make signal + + var statuses = [Grpc_Health_V1_HealthCheckResponse.ServingStatus]() + var responseStreamIterator = response.messages.makeAsyncIterator() + + // Since responseStreamIterator.next() will never be nil (ideally, as the response + // stream is always open), the iteration cannot be based on when + // responseStreamIterator.next() is nil. Else, the iteration infinitely awaits and the + // test never finishes. Hence, it is based on the expected number of statuses to be + // received. + for _ in 0 ..< expectedReceivedStatuses.count { + let next = try await responseStreamIterator.next() + let message = try XCTUnwrap(next) + statuses.append(message.status) + } + + return statuses } } - } - try await withThrowingDiscardingTaskGroup { group in - group.addTask { - try await runWatch(continuation: receivedStatuses1.continuation) + // Wait until the "watch" stream is up and ready. + for await _ in signal.stream {} + + for status in statusesToBeSent { + healthProvider.updateStatus(status, ofService: testServiceDescriptor) } - group.addTask { - try await runWatch(continuation: receivedStatuses2.continuation) + for try await receivedStatuses in group { + XCTAssertEqual(receivedStatuses.count, expectedReceivedStatuses.count) + + for i in 0 ..< receivedStatuses.count { + XCTAssertEqual(receivedStatuses[i], expectedReceivedStatuses[i]) + } } + } + } + } - var iterator1 = receivedStatuses1.stream.makeAsyncIterator() - var iterator2 = receivedStatuses2.stream.makeAsyncIterator() + func testWatchOnServer() async throws { + try await withHealthClient { (healthClient, healthProvider) in + let statusesToBeSent: [ServingStatus] = [.serving, .notServing, .serving] - for status in statusesToBeSent { - try healthProvider.updateService( - descriptor: testServiceDescriptor, - status: status - ) + healthProvider.updateStatus(statusesToBeSent[0], ofService: ServiceDescriptor.server) + + let message = Grpc_Health_V1_HealthCheckRequest() + + try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in + var responseStreamIterator = response.messages.makeAsyncIterator() - let sent = Grpc_Health_V1_HealthCheckResponse.ServingStatus(from: status) - let received1 = await iterator1.next() - let received2 = await iterator2.next() + for i in 0 ..< statusesToBeSent.count { + let next = try await responseStreamIterator.next() + let message = try XCTUnwrap(next) + let expectedStatus = Grpc_Health_V1_HealthCheckResponse.ServingStatus(statusesToBeSent[i]) - XCTAssertEqual(sent, received1) - XCTAssertEqual(sent, received2) + XCTAssertEqual(message.status, expectedStatus) + + if i < statusesToBeSent.count - 1 { + healthProvider.updateStatus( + statusesToBeSent[i + 1], + ofService: ServiceDescriptor.server + ) + } } } } } } + +extension ServiceDescriptor { + fileprivate static let testService = ServiceDescriptor(package: "test", service: "Service") + + /// An unspecified service name refers to the server. + fileprivate static let server = ServiceDescriptor(package: "", service: "") +} diff --git a/Sources/Services/Health/HealthService.swift b/Tests/Services/HealthTests/Test Utilities/XCTest+Utilities.swift similarity index 62% rename from Sources/Services/Health/HealthService.swift rename to Tests/Services/HealthTests/Test Utilities/XCTest+Utilities.swift index 819d9f4ae..3848388bc 100644 --- a/Sources/Services/Health/HealthService.swift +++ b/Tests/Services/HealthTests/Test Utilities/XCTest+Utilities.swift @@ -14,14 +14,17 @@ * limitations under the License. */ -import GRPCCore +import XCTest -/// A registerable RPC service to probe whether a server is able to handle RPCs. -@available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public final class HealthService: RegistrableRPCService { - internal let service = InternalHealthService() - - public func registerMethods(with router: inout RPCRouter) { - self.service.registerMethods(with: &router) +func XCTAssertThrowsError( + ofType: E.Type, + _ expression: @autoclosure () throws -> T, + _ errorHandler: (E) -> Void +) { + XCTAssertThrowsError(try expression()) { error in + guard let error = error as? E else { + return XCTFail("Error had unexpected type '\(type(of: error))'") + } + errorHandler(error) } } From 3c40aee3c1d46dfae4fbaf819449543da137e8c8 Mon Sep 17 00:00:00 2001 From: cnkwocha Date: Wed, 31 Jul 2024 11:00:08 +0100 Subject: [PATCH 06/10] Implement feedback --- Package@swift-6.swift | 2 +- Sources/GRPCCore/ServiceDescriptor.swift | 7 + Sources/Services/Health/Health.swift | 45 +++--- ...ealthService.swift => HealthService.swift} | 50 +++---- Sources/Services/Health/ServingStatus.swift | 3 + Tests/Services/HealthTests/HealthTests.swift | 136 +++++++----------- 6 files changed, 114 insertions(+), 129 deletions(-) rename Sources/Services/Health/{InternalHealthService.swift => HealthService.swift} (77%) diff --git a/Package@swift-6.swift b/Package@swift-6.swift index a713c6c9d..4cb284ca2 100644 --- a/Package@swift-6.swift +++ b/Package@swift-6.swift @@ -492,7 +492,7 @@ extension Target { .grpcInProcessTransport ], path: "Tests/Services/HealthTests", - swiftSettings: [.swiftLanguageVersion(.v6), .enableUpcomingFeature("ExistentialAny")] + swiftSettings: [._swiftLanguageMode(.v6), .enableUpcomingFeature("ExistentialAny")] ) } diff --git a/Sources/GRPCCore/ServiceDescriptor.swift b/Sources/GRPCCore/ServiceDescriptor.swift index b09730c3b..fa41164bc 100644 --- a/Sources/GRPCCore/ServiceDescriptor.swift +++ b/Sources/GRPCCore/ServiceDescriptor.swift @@ -43,3 +43,10 @@ public struct ServiceDescriptor: Sendable, Hashable { self.service = service } } + +extension ServiceDescriptor { + /// The descriptor for a server. + /// + /// An unspecified service name refers to the server. + public static let server = ServiceDescriptor(package: "", service: "") +} diff --git a/Sources/Services/Health/Health.swift b/Sources/Services/Health/Health.swift index 65902cd15..62c73d1cb 100644 --- a/Sources/Services/Health/Health.swift +++ b/Sources/Services/Health/Health.swift @@ -16,21 +16,26 @@ import GRPCCore -/// A coupled Health service and provider. +/// ``Health`` is gRPC’s mechanism for checking whether a server is able to handle RPCs. Its semantics are documented in +/// https://github.com/grpc/grpc/blob/master/doc/health-checking.md. +/// +/// `Health` initializes a new `Health.Service` and a `Health.Provider`. +/// - `Health.Service` is a registerable RPC service to probe whether a server is able to handle RPCs. +/// - `Health.Provider` provides handlers to interact with `Health.Service`. @available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) public struct Health: Sendable { - private let internalHealthService = InternalHealthService() - /// A registerable RPC service to probe whether a server is able to handle RPCs. public let service: Health.Service /// Provides handlers to interact with the coupled Health service. - public let provider: Provider + public let provider: Health.Provider - /// Constructs a new ``Health``, coupling a ``Health.Service`` and a ``Health.Provider``. + /// Constructs a new `Health`, coupling a `Health.Service` and a `Health.Provider`. public init() { - self.service = Health.Service(internalHealthService: self.internalHealthService) - self.provider = Health.Provider(internalHealthService: self.internalHealthService) + let healthService = HealthService() + + self.service = Health.Service(healthService: healthService) + self.provider = Health.Provider(healthService: healthService) } } @@ -38,40 +43,44 @@ public struct Health: Sendable { extension Health { /// A registerable RPC service to probe whether a server is able to handle RPCs. public struct Service: RegistrableRPCService, Sendable { - private let internalHealthService: InternalHealthService + private let healthService: HealthService public func registerMethods(with router: inout RPCRouter) { - self.internalHealthService.registerMethods(with: &router) + self.healthService.registerMethods(with: &router) } - fileprivate init(internalHealthService: InternalHealthService) { - self.internalHealthService = internalHealthService + fileprivate init(healthService: HealthService) { + self.healthService = healthService } } /// Provides handlers to interact with a Health service. public struct Provider: Sendable { - private let internalHealthService: InternalHealthService + private let healthService: HealthService /// Updates the status of a service in the Health service. + /// + /// - Parameters: + /// - status: The status of the service. + /// - service: The description of the service. public func updateStatus( _ status: ServingStatus, - ofService service: ServiceDescriptor + forService service: ServiceDescriptor ) { - self.internalHealthService.updateStatus( + self.healthService.updateStatus( Grpc_Health_V1_HealthCheckResponse.ServingStatus(status), - ofService: service.fullyQualifiedService + forService: service.fullyQualifiedService ) } - fileprivate init(internalHealthService: InternalHealthService) { - self.internalHealthService = internalHealthService + fileprivate init(healthService: HealthService) { + self.healthService = healthService } } } extension Grpc_Health_V1_HealthCheckResponse.ServingStatus { - /// Constructs a new ``Grpc_Health_V1_HealthCheckResponse.ServingStatus`` from ``ServingStatus``. + /// Constructs a new ``Grpc_Health_V1_HealthCheckResponse/ServingStatus`` from ``ServingStatus``. /// /// - Parameters: /// - status: The base status. diff --git a/Sources/Services/Health/InternalHealthService.swift b/Sources/Services/Health/HealthService.swift similarity index 77% rename from Sources/Services/Health/InternalHealthService.swift rename to Sources/Services/Health/HealthService.swift index c37a2a2e2..a9296ec9b 100644 --- a/Sources/Services/Health/InternalHealthService.swift +++ b/Sources/Services/Health/HealthService.swift @@ -17,22 +17,22 @@ import GRPCCore @available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -internal final class InternalHealthService: Grpc_Health_V1_HealthServiceProtocol { - private let state = InternalHealthService.State() +internal final class HealthService: Grpc_Health_V1_HealthServiceProtocol { + private let state = HealthService.State() func check( request: ServerRequest.Single ) async throws -> ServerResponse.Single { let service = request.message.service - if let status = self.state.getCurrentStatus(ofService: service) { - var response = Grpc_Health_V1_HealthCheckResponse() - response.status = status - - return ServerResponse.Single(message: response) + guard let status = self.state.currentStatus(ofService: service) else { + throw RPCError(code: .notFound, message: "Requested service unknown.") } - throw RPCError(code: .notFound, message: "Requested service unknown.") + var response = Grpc_Health_V1_HealthCheckResponse() + response.status = status + + return ServerResponse.Single(message: response) } func watch( @@ -59,18 +59,20 @@ internal final class InternalHealthService: Grpc_Health_V1_HealthServiceProtocol func updateStatus( _ status: Grpc_Health_V1_HealthCheckResponse.ServingStatus, - ofService service: String + forService service: String ) { - self.state.updateStatus(status, ofService: service) + self.state.updateStatus(status, forService: service) } } @available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension InternalHealthService { +extension HealthService { + /// The state of the Health service. private struct State: Sendable { + /// A locked value box of `["service name": ServiceState]`. private let lockedStorage = LockedValueBox([String: ServiceState]()) - fileprivate func getCurrentStatus( + fileprivate func currentStatus( ofService service: String ) -> Grpc_Health_V1_HealthCheckResponse.ServingStatus? { return self.lockedStorage.withLockedValue { $0[service]?.currentStatus } @@ -78,14 +80,10 @@ extension InternalHealthService { fileprivate func updateStatus( _ status: Grpc_Health_V1_HealthCheckResponse.ServingStatus, - ofService service: String + forService service: String ) { self.lockedStorage.withLockedValue { storage in - if storage[service] == nil { - storage[service] = ServiceState(status: status) - } else { - storage[service]!.updateStatus(status) - } + storage[service, default: ServiceState(status: status)].updateStatus(status) } } @@ -96,12 +94,11 @@ extension InternalHealthService { self.lockedStorage.withLockedValue { storage in storage[service, default: ServiceState(status: .serviceUnknown)] .addContinuation(continuation) - continuation.yield(storage[service]!.currentStatus) } } } - /// Encapsulates the current status of a service and the continuations of its "watch" streams. + /// Encapsulates the current status of a service and the continuations of its watch streams. private struct ServiceState: Sendable { private(set) var currentStatus: Grpc_Health_V1_HealthCheckResponse.ServingStatus private var continuations: @@ -111,12 +108,14 @@ extension InternalHealthService { fileprivate mutating func updateStatus( _ status: Grpc_Health_V1_HealthCheckResponse.ServingStatus ) { - if self.currentStatus != status { - self.currentStatus = status + guard status != self.currentStatus else { + return + } + + self.currentStatus = status - for continuation in self.continuations { - continuation.yield(status) - } + for continuation in self.continuations { + continuation.yield(status) } } @@ -125,6 +124,7 @@ extension InternalHealthService { _ continuation: AsyncStream.Continuation ) { self.continuations.append(continuation) + continuation.yield(self.currentStatus) } fileprivate init(status: Grpc_Health_V1_HealthCheckResponse.ServingStatus) { diff --git a/Sources/Services/Health/ServingStatus.swift b/Sources/Services/Health/ServingStatus.swift index c15b67ef1..f0ac30344 100644 --- a/Sources/Services/Health/ServingStatus.swift +++ b/Sources/Services/Health/ServingStatus.swift @@ -15,6 +15,9 @@ */ /// The status of a service. +/// +/// - `ServingStatus.serving` indicates that a service is healthy. +/// - `ServingStatus.notServing` indicates that a service is unhealthy. public struct ServingStatus: Sendable, Hashable { package enum Value: Sendable, Hashable { case serving diff --git a/Tests/Services/HealthTests/HealthTests.swift b/Tests/Services/HealthTests/HealthTests.swift index 47cda3edc..9c0e068ef 100644 --- a/Tests/Services/HealthTests/HealthTests.swift +++ b/Tests/Services/HealthTests/HealthTests.swift @@ -52,10 +52,11 @@ final class HealthTests: XCTestCase { try await withHealthClient { (healthClient, healthProvider) in let testServiceDescriptor = ServiceDescriptor.testService - healthProvider.updateStatus(.serving, ofService: testServiceDescriptor) + healthProvider.updateStatus(.serving, forService: testServiceDescriptor) - var message = Grpc_Health_V1_HealthCheckRequest() - message.service = testServiceDescriptor.fullyQualifiedService + let message = Grpc_Health_V1_HealthCheckRequest.with { + $0.service = testServiceDescriptor.fullyQualifiedService + } try await healthClient.check(request: ClientRequest.Single(message: message)) { response in try XCTAssertEqual(response.message.status, .serving) @@ -65,10 +66,10 @@ final class HealthTests: XCTestCase { func testCheckOnUnknownService() async throws { try await withHealthClient { (healthClient, healthProvider) in - var message = Grpc_Health_V1_HealthCheckRequest() - message.service = - ServiceDescriptor(package: "does.not", service: "Exist") - .fullyQualifiedService + let message = Grpc_Health_V1_HealthCheckRequest.with { + $0.service = + ServiceDescriptor(package: "does.not", service: "Exist").fullyQualifiedService + } try await healthClient.check(request: ClientRequest.Single(message: message)) { response in try XCTAssertThrowsError(ofType: RPCError.self, response.message) { error in @@ -80,7 +81,7 @@ final class HealthTests: XCTestCase { func testCheckOnServer() async throws { try await withHealthClient { (healthClient, healthProvider) in - healthProvider.updateStatus(.notServing, ofService: ServiceDescriptor.server) + healthProvider.updateStatus(.notServing, forService: ServiceDescriptor.server) let message = Grpc_Health_V1_HealthCheckRequest() @@ -96,10 +97,12 @@ final class HealthTests: XCTestCase { let statusesToBeSent: [ServingStatus] = [.serving, .notServing, .serving] - healthProvider.updateStatus(statusesToBeSent[0], ofService: testServiceDescriptor) + // Before watching the service, make the status of the service known to the Health service. + healthProvider.updateStatus(statusesToBeSent[0], forService: testServiceDescriptor) - var message = Grpc_Health_V1_HealthCheckRequest() - message.service = testServiceDescriptor.fullyQualifiedService + let message = Grpc_Health_V1_HealthCheckRequest.with { + $0.service = testServiceDescriptor.fullyQualifiedService + } try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in var responseStreamIterator = response.messages.makeAsyncIterator() @@ -112,7 +115,7 @@ final class HealthTests: XCTestCase { XCTAssertEqual(message.status, expectedStatus) if i < statusesToBeSent.count - 1 { - healthProvider.updateStatus(statusesToBeSent[i + 1], ofService: testServiceDescriptor) + healthProvider.updateStatus(statusesToBeSent[i + 1], forService: testServiceDescriptor) } } } @@ -123,17 +126,20 @@ final class HealthTests: XCTestCase { try await withHealthClient { (healthClient, healthProvider) in let testServiceDescriptor = ServiceDescriptor.testService - var message = Grpc_Health_V1_HealthCheckRequest() - message.service = testServiceDescriptor.fullyQualifiedService + let message = Grpc_Health_V1_HealthCheckRequest.with { + $0.service = testServiceDescriptor.fullyQualifiedService + } try await healthClient.watch(request: ClientRequest.Single(message: message)) { response in var responseStreamIterator = response.messages.makeAsyncIterator() var next = try await responseStreamIterator.next() var message = try XCTUnwrap(next) + // As the service was watched before being updated, the first status received should be + // .serviceUnknown. XCTAssertEqual(message.status, .serviceUnknown) - healthProvider.updateStatus(.notServing, ofService: testServiceDescriptor) + healthProvider.updateStatus(.notServing, forService: testServiceDescriptor) next = try await responseStreamIterator.next() message = try XCTUnwrap(next) @@ -153,19 +159,18 @@ final class HealthTests: XCTestCase { try await withThrowingTaskGroup( of: [Grpc_Health_V1_HealthCheckResponse.ServingStatus].self ) { group in - var message = Grpc_Health_V1_HealthCheckRequest() - message.service = testServiceDescriptor.fullyQualifiedService - - let immutableMessage = message + let message = Grpc_Health_V1_HealthCheckRequest.with { + $0.service = testServiceDescriptor.fullyQualifiedService + } - /// The `continuation` of this stream is used to signal when the "watch" response streams are up and ready. + /// The continuation of this stream will be used to signal when the watch response streams are up and ready. let signal = AsyncStream.makeStream(of: Void.self) let numberOfWatches = 2 for _ in 0 ..< numberOfWatches { group.addTask { return try await healthClient.watch( - request: ClientRequest.Single(message: immutableMessage) + request: ClientRequest.Single(message: message) ) { response in signal.continuation.yield() // Make signal @@ -178,8 +183,8 @@ final class HealthTests: XCTestCase { // test never finishes. Hence, it is based on the expected number of statuses to be // received. for _ in 0 ..< statusesToBeSent.count + 1 { - // As the service will be "watched" before being updated, the first status received - // should be `.serviceUnknown`. Hence, the range of this iteration is increased by 1. + // As the service will be watched before being updated, the first status received + // should be .serviceUnknown. Hence, the range of this iteration is increased by 1. let next = try await responseStreamIterator.next() let message = try XCTUnwrap(next) @@ -191,11 +196,11 @@ final class HealthTests: XCTestCase { } } - // Wait until all the "watch" streams are up and ready. + // Wait until all the watch streams are up and ready. for await _ in signal.stream.prefix(numberOfWatches) {} for status in statusesToBeSent { - healthProvider.updateStatus(status, ofService: testServiceDescriptor) + healthProvider.updateStatus(status, forService: testServiceDescriptor) } for try await receivedStatuses in group { @@ -214,71 +219,35 @@ final class HealthTests: XCTestCase { try await withHealthClient { (healthClient, healthProvider) in let testServiceDescriptor = ServiceDescriptor.testService - let statusesToBeSent: [ServingStatus] = [ - .serving, - .notServing, - .notServing, - .notServing, - .serving, - ] + let statusesToBeSent: [ServingStatus] = [.notServing, .notServing, .notServing, .serving] - let expectedReceivedStatuses: [Grpc_Health_V1_HealthCheckResponse.ServingStatus] = [ - // As the service will be "watched" before being updated, the first status received should - // be .serviceUnknown. + // The repeated .notServing updates should be received only once. Also, as the service will + // be watched before being updated, the first status received should be .serviceUnknown. + let expectedStatuses: [Grpc_Health_V1_HealthCheckResponse.ServingStatus] = [ .serviceUnknown, - .serving, - .notServing, // The repeated `.notServing` updates should be received only once. + .notServing, .serving, ] - try await withThrowingTaskGroup( - of: [Grpc_Health_V1_HealthCheckResponse.ServingStatus].self - ) { group in - var message = Grpc_Health_V1_HealthCheckRequest() - message.service = testServiceDescriptor.fullyQualifiedService - - let immutableMessage = message - - /// The `continuation` of this stream is used to signal when the "watch" response stream is up and ready. - let signal = AsyncStream.makeStream(of: Void.self) - - group.addTask { - return try await healthClient.watch( - request: ClientRequest.Single(message: immutableMessage) - ) { response in - signal.continuation.finish() // Make signal - - var statuses = [Grpc_Health_V1_HealthCheckResponse.ServingStatus]() - var responseStreamIterator = response.messages.makeAsyncIterator() - - // Since responseStreamIterator.next() will never be nil (ideally, as the response - // stream is always open), the iteration cannot be based on when - // responseStreamIterator.next() is nil. Else, the iteration infinitely awaits and the - // test never finishes. Hence, it is based on the expected number of statuses to be - // received. - for _ in 0 ..< expectedReceivedStatuses.count { - let next = try await responseStreamIterator.next() - let message = try XCTUnwrap(next) - statuses.append(message.status) - } - - return statuses - } - } - - // Wait until the "watch" stream is up and ready. - for await _ in signal.stream {} + let message = Grpc_Health_V1_HealthCheckRequest.with { + $0.service = testServiceDescriptor.fullyQualifiedService + } + try await healthClient.watch( + request: ClientRequest.Single(message: message) + ) { response in + // Send all status updates. for status in statusesToBeSent { - healthProvider.updateStatus(status, ofService: testServiceDescriptor) + healthProvider.updateStatus(status, forService: testServiceDescriptor) } - for try await receivedStatuses in group { - XCTAssertEqual(receivedStatuses.count, expectedReceivedStatuses.count) + var responseStreamIterator = response.messages.makeAsyncIterator() - for i in 0 ..< receivedStatuses.count { - XCTAssertEqual(receivedStatuses[i], expectedReceivedStatuses[i]) - } + for i in 0 ..< expectedStatuses.count { + let next = try await responseStreamIterator.next() + let message = try XCTUnwrap(next) + + XCTAssertEqual(message.status, expectedStatuses[i]) } } } @@ -288,7 +257,7 @@ final class HealthTests: XCTestCase { try await withHealthClient { (healthClient, healthProvider) in let statusesToBeSent: [ServingStatus] = [.serving, .notServing, .serving] - healthProvider.updateStatus(statusesToBeSent[0], ofService: ServiceDescriptor.server) + healthProvider.updateStatus(statusesToBeSent[0], forService: ServiceDescriptor.server) let message = Grpc_Health_V1_HealthCheckRequest() @@ -305,7 +274,7 @@ final class HealthTests: XCTestCase { if i < statusesToBeSent.count - 1 { healthProvider.updateStatus( statusesToBeSent[i + 1], - ofService: ServiceDescriptor.server + forService: ServiceDescriptor.server ) } } @@ -316,7 +285,4 @@ final class HealthTests: XCTestCase { extension ServiceDescriptor { fileprivate static let testService = ServiceDescriptor(package: "test", service: "Service") - - /// An unspecified service name refers to the server. - fileprivate static let server = ServiceDescriptor(package: "", service: "") } From ad0968c11abef3fcdb701a8397dbe7598282b081 Mon Sep 17 00:00:00 2001 From: cnkwocha Date: Wed, 31 Jul 2024 17:29:58 +0100 Subject: [PATCH 07/10] Implement feedback --- Sources/GRPCCore/ServiceDescriptor.swift | 7 ---- Sources/Services/Health/Health.swift | 41 +++++++++++++------- Sources/Services/Health/HealthService.swift | 9 ++--- Sources/Services/Health/ServingStatus.swift | 4 +- Tests/Services/HealthTests/HealthTests.swift | 22 ++++++++--- 5 files changed, 49 insertions(+), 34 deletions(-) diff --git a/Sources/GRPCCore/ServiceDescriptor.swift b/Sources/GRPCCore/ServiceDescriptor.swift index fa41164bc..b09730c3b 100644 --- a/Sources/GRPCCore/ServiceDescriptor.swift +++ b/Sources/GRPCCore/ServiceDescriptor.swift @@ -43,10 +43,3 @@ public struct ServiceDescriptor: Sendable, Hashable { self.service = service } } - -extension ServiceDescriptor { - /// The descriptor for a server. - /// - /// An unspecified service name refers to the server. - public static let server = ServiceDescriptor(package: "", service: "") -} diff --git a/Sources/Services/Health/Health.swift b/Sources/Services/Health/Health.swift index 62c73d1cb..63005b9d2 100644 --- a/Sources/Services/Health/Health.swift +++ b/Sources/Services/Health/Health.swift @@ -19,18 +19,37 @@ import GRPCCore /// ``Health`` is gRPC’s mechanism for checking whether a server is able to handle RPCs. Its semantics are documented in /// https://github.com/grpc/grpc/blob/master/doc/health-checking.md. /// -/// `Health` initializes a new `Health.Service` and a `Health.Provider`. -/// - `Health.Service` is a registerable RPC service to probe whether a server is able to handle RPCs. -/// - `Health.Provider` provides handlers to interact with `Health.Service`. +/// `Health` initializes a new ``Health/Service-swift.struct`` and ``Health/Provider-swift.struct``. +/// - `Health.Service` implements the Health service from the `grpc.health.v1` package and can be registered with a server +/// like any other service. +/// - `Health.Provider` provides status updates to `Health.Service`. `Health.Service` doesn't know about the other +/// services running on a server so it must be provided with status updates via `Health.Provider`. To make specifying the service +/// being updated easier, the generated code for services includes an extension to `ServiceDescriptor`. +/// +/// The following shows an example of initializing a Health service and updating the status of the `Foo` service in the `bar` package. +/// +/// ```swift +/// let health = Health() +/// let server = GRPCServer( +/// transport: transport, +/// services: [health.service, FooService()] +/// ) +/// +/// health.provider.updateStatus( +/// .serving, +/// forService: .bar_Foo +/// ) +/// ``` @available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) public struct Health: Sendable { - /// A registerable RPC service to probe whether a server is able to handle RPCs. + /// An implementation of the `grpc.health.v1.Health` service. public let service: Health.Service - /// Provides handlers to interact with the coupled Health service. + /// Provides status updates to the Health service. public let provider: Health.Provider - /// Constructs a new `Health`, coupling a `Health.Service` and a `Health.Provider`. + /// Constructs a new ``Health``, initializing a ``Health/Service-swift.struct`` and a + /// ``Health/Provider-swift.struct``. public init() { let healthService = HealthService() @@ -41,7 +60,7 @@ public struct Health: Sendable { @available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) extension Health { - /// A registerable RPC service to probe whether a server is able to handle RPCs. + /// An implementation of the `grpc.health.v1.Health` service. public struct Service: RegistrableRPCService, Sendable { private let healthService: HealthService @@ -54,11 +73,11 @@ extension Health { } } - /// Provides handlers to interact with a Health service. + /// Provides status updates to ``Health/Service-swift.struct``. public struct Provider: Sendable { private let healthService: HealthService - /// Updates the status of a service in the Health service. + /// Updates the status of a service. /// /// - Parameters: /// - status: The status of the service. @@ -80,10 +99,6 @@ extension Health { } extension Grpc_Health_V1_HealthCheckResponse.ServingStatus { - /// Constructs a new ``Grpc_Health_V1_HealthCheckResponse/ServingStatus`` from ``ServingStatus``. - /// - /// - Parameters: - /// - status: The base status. package init(_ status: ServingStatus) { switch status.value { case .serving: diff --git a/Sources/Services/Health/HealthService.swift b/Sources/Services/Health/HealthService.swift index a9296ec9b..f3aec0ffb 100644 --- a/Sources/Services/Health/HealthService.swift +++ b/Sources/Services/Health/HealthService.swift @@ -67,9 +67,8 @@ internal final class HealthService: Grpc_Health_V1_HealthServiceProtocol { @available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) extension HealthService { - /// The state of the Health service. private struct State: Sendable { - /// A locked value box of `["service name": ServiceState]`. + // The state of each service keyed by the fully qualified service name. private let lockedStorage = LockedValueBox([String: ServiceState]()) fileprivate func currentStatus( @@ -98,13 +97,12 @@ extension HealthService { } } - /// Encapsulates the current status of a service and the continuations of its watch streams. + // Encapsulates the current status of a service and the continuations of its watch streams. private struct ServiceState: Sendable { private(set) var currentStatus: Grpc_Health_V1_HealthCheckResponse.ServingStatus private var continuations: [AsyncStream.Continuation] - /// Updates the status and provides values to the streams. fileprivate mutating func updateStatus( _ status: Grpc_Health_V1_HealthCheckResponse.ServingStatus ) { @@ -119,7 +117,6 @@ extension HealthService { } } - /// Adds a continuation for a stream of statuses. fileprivate mutating func addContinuation( _ continuation: AsyncStream.Continuation ) { @@ -127,7 +124,7 @@ extension HealthService { continuation.yield(self.currentStatus) } - fileprivate init(status: Grpc_Health_V1_HealthCheckResponse.ServingStatus) { + fileprivate init(status: Grpc_Health_V1_HealthCheckResponse.ServingStatus = .unknown) { self.currentStatus = status self.continuations = [] } diff --git a/Sources/Services/Health/ServingStatus.swift b/Sources/Services/Health/ServingStatus.swift index f0ac30344..1128e6ad7 100644 --- a/Sources/Services/Health/ServingStatus.swift +++ b/Sources/Services/Health/ServingStatus.swift @@ -16,8 +16,8 @@ /// The status of a service. /// -/// - `ServingStatus.serving` indicates that a service is healthy. -/// - `ServingStatus.notServing` indicates that a service is unhealthy. +/// - ``ServingStatus/serving`` indicates that a service is healthy. +/// - ``ServingStatus/notServing`` indicates that a service is unhealthy. public struct ServingStatus: Sendable, Hashable { package enum Value: Sendable, Hashable { case serving diff --git a/Tests/Services/HealthTests/HealthTests.swift b/Tests/Services/HealthTests/HealthTests.swift index 9c0e068ef..d4130bdd5 100644 --- a/Tests/Services/HealthTests/HealthTests.swift +++ b/Tests/Services/HealthTests/HealthTests.swift @@ -67,8 +67,7 @@ final class HealthTests: XCTestCase { func testCheckOnUnknownService() async throws { try await withHealthClient { (healthClient, healthProvider) in let message = Grpc_Health_V1_HealthCheckRequest.with { - $0.service = - ServiceDescriptor(package: "does.not", service: "Exist").fullyQualifiedService + $0.service = "does.not/Exist" } try await healthClient.check(request: ClientRequest.Single(message: message)) { response in @@ -81,7 +80,11 @@ final class HealthTests: XCTestCase { func testCheckOnServer() async throws { try await withHealthClient { (healthClient, healthProvider) in - healthProvider.updateStatus(.notServing, forService: ServiceDescriptor.server) + // An unspecified service refers to the server. + healthProvider.updateStatus( + .notServing, + forService: ServiceDescriptor(package: "", service: "") + ) let message = Grpc_Health_V1_HealthCheckRequest() @@ -163,7 +166,8 @@ final class HealthTests: XCTestCase { $0.service = testServiceDescriptor.fullyQualifiedService } - /// The continuation of this stream will be used to signal when the watch response streams are up and ready. + // The continuation of this stream will be used to signal when the watch response streams + // are up and ready. let signal = AsyncStream.makeStream(of: Void.self) let numberOfWatches = 2 @@ -257,7 +261,13 @@ final class HealthTests: XCTestCase { try await withHealthClient { (healthClient, healthProvider) in let statusesToBeSent: [ServingStatus] = [.serving, .notServing, .serving] - healthProvider.updateStatus(statusesToBeSent[0], forService: ServiceDescriptor.server) + // An unspecified service refers to the server. + let serverDescriptor = ServiceDescriptor(package: "", service: "") + + healthProvider.updateStatus( + statusesToBeSent[0], + forService: serverDescriptor + ) let message = Grpc_Health_V1_HealthCheckRequest() @@ -274,7 +284,7 @@ final class HealthTests: XCTestCase { if i < statusesToBeSent.count - 1 { healthProvider.updateStatus( statusesToBeSent[i + 1], - forService: ServiceDescriptor.server + forService: serverDescriptor ) } } From 3dd3d0888c9360c8f88d08f958e7d9e7b020437e Mon Sep 17 00:00:00 2001 From: cnkwocha Date: Fri, 2 Aug 2024 10:43:10 +0100 Subject: [PATCH 08/10] Implement feedback --- Sources/Services/Health/Health.swift | 17 +++++++++++++++++ Sources/Services/Health/HealthService.swift | 2 +- Tests/Services/HealthTests/HealthTests.swift | 19 ++++--------------- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/Sources/Services/Health/Health.swift b/Sources/Services/Health/Health.swift index 63005b9d2..f0cdb1a34 100644 --- a/Sources/Services/Health/Health.swift +++ b/Sources/Services/Health/Health.swift @@ -92,6 +92,23 @@ extension Health { ) } + /// Updates the status of a service. + /// + /// - Parameters: + /// - status: The status of the service. + /// - service: The fully qualified service name in the format: + /// - "package.service": if the service is part of a package. For example, "helloworld.Greeter". + /// - "service": if the service is not part of a package. For example, "Greeter". + public func updateStatus( + _ status: ServingStatus, + forService service: String + ) { + self.healthService.updateStatus( + Grpc_Health_V1_HealthCheckResponse.ServingStatus(status), + forService: service + ) + } + fileprivate init(healthService: HealthService) { self.healthService = healthService } diff --git a/Sources/Services/Health/HealthService.swift b/Sources/Services/Health/HealthService.swift index f3aec0ffb..b4da79fbf 100644 --- a/Sources/Services/Health/HealthService.swift +++ b/Sources/Services/Health/HealthService.swift @@ -17,7 +17,7 @@ import GRPCCore @available(macOS 15.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -internal final class HealthService: Grpc_Health_V1_HealthServiceProtocol { +internal struct HealthService: Grpc_Health_V1_HealthServiceProtocol { private let state = HealthService.State() func check( diff --git a/Tests/Services/HealthTests/HealthTests.swift b/Tests/Services/HealthTests/HealthTests.swift index d4130bdd5..8e8517b0c 100644 --- a/Tests/Services/HealthTests/HealthTests.swift +++ b/Tests/Services/HealthTests/HealthTests.swift @@ -67,7 +67,7 @@ final class HealthTests: XCTestCase { func testCheckOnUnknownService() async throws { try await withHealthClient { (healthClient, healthProvider) in let message = Grpc_Health_V1_HealthCheckRequest.with { - $0.service = "does.not/Exist" + $0.service = "does.not.Exist" } try await healthClient.check(request: ClientRequest.Single(message: message)) { response in @@ -81,10 +81,7 @@ final class HealthTests: XCTestCase { func testCheckOnServer() async throws { try await withHealthClient { (healthClient, healthProvider) in // An unspecified service refers to the server. - healthProvider.updateStatus( - .notServing, - forService: ServiceDescriptor(package: "", service: "") - ) + healthProvider.updateStatus(.notServing, forService: "") let message = Grpc_Health_V1_HealthCheckRequest() @@ -262,12 +259,7 @@ final class HealthTests: XCTestCase { let statusesToBeSent: [ServingStatus] = [.serving, .notServing, .serving] // An unspecified service refers to the server. - let serverDescriptor = ServiceDescriptor(package: "", service: "") - - healthProvider.updateStatus( - statusesToBeSent[0], - forService: serverDescriptor - ) + healthProvider.updateStatus(statusesToBeSent[0], forService: "") let message = Grpc_Health_V1_HealthCheckRequest() @@ -282,10 +274,7 @@ final class HealthTests: XCTestCase { XCTAssertEqual(message.status, expectedStatus) if i < statusesToBeSent.count - 1 { - healthProvider.updateStatus( - statusesToBeSent[i + 1], - forService: serverDescriptor - ) + healthProvider.updateStatus(statusesToBeSent[i + 1], forService: "") } } } From 5fa5cb15439e6b61fa7d7e85ae570f6267dd3087 Mon Sep 17 00:00:00 2001 From: cnkwocha Date: Mon, 5 Aug 2024 15:43:50 +0100 Subject: [PATCH 09/10] Implement feedback --- Sources/Services/Health/HealthService.swift | 2 +- Sources/Services/Health/ServingStatus.swift | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/Services/Health/HealthService.swift b/Sources/Services/Health/HealthService.swift index b4da79fbf..b8b54e80c 100644 --- a/Sources/Services/Health/HealthService.swift +++ b/Sources/Services/Health/HealthService.swift @@ -37,7 +37,7 @@ internal struct HealthService: Grpc_Health_V1_HealthServiceProtocol { func watch( request: ServerRequest.Single - ) async throws -> ServerResponse.Stream { + ) async -> ServerResponse.Stream { let service = request.message.service let statuses = AsyncStream.makeStream(of: Grpc_Health_V1_HealthCheckResponse.ServingStatus.self) diff --git a/Sources/Services/Health/ServingStatus.swift b/Sources/Services/Health/ServingStatus.swift index 1128e6ad7..cc0fd5b15 100644 --- a/Sources/Services/Health/ServingStatus.swift +++ b/Sources/Services/Health/ServingStatus.swift @@ -19,7 +19,7 @@ /// - ``ServingStatus/serving`` indicates that a service is healthy. /// - ``ServingStatus/notServing`` indicates that a service is unhealthy. public struct ServingStatus: Sendable, Hashable { - package enum Value: Sendable, Hashable { + internal enum Value: Sendable, Hashable { case serving case notServing } @@ -30,7 +30,7 @@ public struct ServingStatus: Sendable, Hashable { /// A status indicating that a service unhealthy. public static let notServing = ServingStatus(.notServing) - package var value: Value + internal var value: Value private init(_ value: Value) { self.value = value From 236677b86abf347b8fc5ff1c0457756ced528db0 Mon Sep 17 00:00:00 2001 From: cnkwocha Date: Tue, 6 Aug 2024 09:30:43 +0100 Subject: [PATCH 10/10] Remove `.enableUpcomingFeature("InternalImportsByDefault")` from `grpcHealth` target --- Package@swift-6.swift | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Package@swift-6.swift b/Package@swift-6.swift index ab718a95b..5cbb769bd 100644 --- a/Package@swift-6.swift +++ b/Package@swift-6.swift @@ -907,8 +907,7 @@ extension Target { path: "Sources/Services/Health", swiftSettings: [ ._swiftLanguageMode(.v6), - .enableUpcomingFeature("ExistentialAny"), - .enableUpcomingFeature("InternalImportsByDefault") + .enableUpcomingFeature("ExistentialAny") ] ) }