Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 0 additions & 63 deletions Sources/NTPClient/Deadline.swift

This file was deleted.

3 changes: 1 addition & 2 deletions Sources/NTPClient/NTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ public struct NTPClient: Sendable {
/// - Parameter timeout: A duration after which the operation will timeout.
/// - Returns: response from the NTP server with some NTP specific calculations.
public func query(timeout: Duration) async throws -> NTPResponse {
let deadlineInstant: ContinuousClock.Instant = ContinuousClock.Instant.now + timeout
return try await withDeadline(deadlineInstant, clock: ContinuousClock()) {
try await withTimeout(in: timeout, clock: .continuous) {
let bootstrap = DatagramBootstrap(
group: MultiThreadedEventLoopGroup.singleton
).channelInitializer { channel in
Expand Down
98 changes: 98 additions & 0 deletions Sources/NTPClient/Timeout.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

private enum TaskResult<T: Sendable>: Sendable {
case success(T)
case error(any Error)
case timedOut
case cancelled
}

package struct TimeOutError: Error, CustomStringConvertible, CustomDebugStringConvertible {
var underlying: any Error

package var description: String {
"TimeOutError(\(String(describing: underlying))"
}

package var debugDescription: String {
description
}
}

@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *)
package func withTimeout<T: Sendable, Clock: _Concurrency.Clock>(
in timeout: Clock.Duration,
clock: Clock,
isolation: isolated (any Actor)? = #isolation,
body: sending @escaping @isolated(any) () async throws -> T
) async throws -> T {
// This is needed so we can make body sending since we don't have call-once closures yet
let body = { body }
let result: Result<T, any Error> = await withTaskGroup(of: TaskResult<T>.self) { group in
let body = body()
group.addTask {
do {
return .success(try await body())
} catch {
return .error(error)
}
}
group.addTask {
do {
try await clock.sleep(for: timeout, tolerance: .zero)
return .timedOut
} catch {
return .cancelled
}
}

switch await group.next() {
case .success(let result):
// Work returned a result. Cancel the timer task and return
group.cancelAll()
return .success(result)
case .error(let error):
// Work threw. Cancel the timer task and rethrow
group.cancelAll()
return .failure(error)
case .timedOut:
// Timed out, cancel the work task.
group.cancelAll()

switch await group.next() {
case .success(let result):
return .success(result)
case .error(let error):
return .failure(TimeOutError(underlying: error))
case .timedOut, .cancelled, .none:
// We already got a result from the sleeping task so we can't get another one or none.
fatalError("Unexpected task result")
}
case .cancelled:
switch await group.next() {
case .success(let result):
return .success(result)
case .error(let error):
return .failure(TimeOutError(underlying: error))
case .timedOut, .cancelled, .none:
// We already got a result from the sleeping task so we can't get another one or none.
fatalError("Unexpected task result")
}
case .none:
fatalError("Unexpected task result")
}
}
return try result.get()
}
2 changes: 1 addition & 1 deletion Tests/NTPClientTests/NTPClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import Testing
)
func testNTPQueryTimeout(d: Duration) async {
let ntp = NTPClient(config: .init(), server: "169.254.0.1")
await #expect(throws: DeadlineFailure.self, "deadline should be thrown in \(d) seconds") {
await #expect(throws: TimeOutError.self, "notEnoughBytes") {
let _ = try await ntp.query(timeout: d)
}
}
80 changes: 80 additions & 0 deletions Tests/NTPClientTests/TimeoutTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NTPClient
import Testing

@Suite
struct TimeoutTests {
@Test
func workCompletes() async throws {
let expectedValue = "success"

let result = try await withTimeout(in: .seconds(1), clock: .continuous) {
expectedValue
}

#expect(result == expectedValue)
}

@Test
func workTimesOut() async throws {

let result = await withThrowingTaskGroup(of: Void.self) { group in
// Task to run the test
group.addTask {
_ = try await withTimeout(in: .seconds(1), clock: .continuous) {
// Task that will take longer than the timeout
try await Task.sleep(for: .seconds(10), clock: .continuous)
Issue.record("Should not be reached")
}
}

return await group.nextResult()
}
#expect(throws: TimeOutError.self) {
try result?.get()
}
}

@Test
func workThrowsError() async throws {
struct TestError: Error {
var message: String
}
await #expect(throws: TestError.self) {
_ = try await withTimeout(in: .seconds(1), clock: .continuous) {
throw TestError(message: "hi")
}
}
}

@Test
func overallCancelled() async throws {
// Run a task that will not finish for a long time
let workTask = Task {
try await withTimeout(in: .seconds(100), clock: .continuous) {
try await Task.sleep(for: .seconds(10_000))
}
}
// Cancel it
workTask.cancel()

// It should throw an error
await #expect(throws: (any Error).self) {
try await workTask.value
}
}

}