Skip to content

Commit 7ea32ae

Browse files
committed
More refactoring, and working timeout support on ManagedIORing
1 parent ef94a37 commit 7ea32ae

File tree

7 files changed

+208
-39
lines changed

7 files changed

+208
-39
lines changed

Sources/System/AsyncFileDescriptor.swift

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ public struct AsyncFileDescriptor: ~Copyable {
2121
let cstr = path.withCString {
2222
return $0 // bad
2323
}
24-
let res = await ring.submitAndWait(
25-
IORequest(
24+
let res = try await ring.submit(request: IORequest(
2625
opening: cstr,
2726
in: directory,
2827
into: fileSlot,
@@ -46,7 +45,7 @@ public struct AsyncFileDescriptor: ~Copyable {
4645

4746
@inlinable @inline(__always)
4847
public consuming func close(isolation actor: isolated (any Actor)? = #isolation) async throws {
49-
let res = await ring.submitAndWait(IORequest(closing: fileSlot))
48+
let res = try await ring.submit(request: IORequest(closing: fileSlot))
5049
if res.result < 0 {
5150
throw Errno(rawValue: -res.result)
5251
}
@@ -59,8 +58,7 @@ public struct AsyncFileDescriptor: ~Copyable {
5958
atAbsoluteOffset offset: UInt64 = UInt64.max,
6059
isolation actor: isolated (any Actor)? = #isolation
6160
) async throws -> UInt32 {
62-
let res = await ring.submitAndWait(
63-
IORequest(
61+
let res = try await ring.submit(request: IORequest(
6462
reading: fileSlot,
6563
into: buffer,
6664
at: offset
@@ -78,8 +76,7 @@ public struct AsyncFileDescriptor: ~Copyable {
7876
atAbsoluteOffset offset: UInt64 = UInt64.max,
7977
isolation actor: isolated (any Actor)? = #isolation
8078
) async throws -> UInt32 {
81-
let res = await ring.submitAndWait(
82-
IORequest(
79+
let res = try await ring.submit(request: IORequest(
8380
reading: fileSlot,
8481
into: buffer,
8582
at: offset

Sources/System/IORing.swift

Lines changed: 66 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -138,54 +138,77 @@ internal func _blockingGetSubmissionEntry(ring: inout SQRing, submissionQueueEnt
138138
io_uring_sqe
139139
> {
140140
while true {
141-
if let entry = _getSubmissionEntry(ring: &ring, submissionQueueEntries: submissionQueueEntries) {
141+
if let entry = _getSubmissionEntry(
142+
ring: &ring,
143+
submissionQueueEntries: submissionQueueEntries
144+
) {
142145
return entry
143146
}
144147
// TODO: actually block here instead of spinning
145148
}
146149

147150
}
148151

149-
internal func _submitRequests(ring: inout SQRing, ringDescriptor: Int32) {
150-
let flushedEvents = _flushQueue(ring: &ring)
151-
152-
// Ring always needs enter right now;
152+
//TODO: omitting signal mask for now
153+
//Tell the kernel that we've submitted requests and/or are waiting for completions
154+
internal func _enter(
155+
ringDescriptor: Int32,
156+
numEvents: UInt32,
157+
minCompletions: UInt32,
158+
flags: UInt32
159+
) throws -> Int32 {
160+
// Ring always needs enter right now;
153161
// TODO: support SQPOLL here
154162
while true {
155-
let ret = io_uring_enter(ringDescriptor, flushedEvents, 0, 0, nil)
163+
let ret = io_uring_enter(ringDescriptor, numEvents, minCompletions, flags, nil)
156164
// error handling:
157165
// EAGAIN / EINTR (try again),
158166
// EBADF / EBADFD / EOPNOTSUPP / ENXIO
159167
// (failure in ring lifetime management, fatal),
160168
// EINVAL (bad constant flag?, fatal),
161169
// EFAULT (bad address for argument from library, fatal)
162170
if ret == -EAGAIN || ret == -EINTR {
171+
//TODO: should we wait a bit on AGAIN?
163172
continue
164173
} else if ret < 0 {
165174
fatalError(
166175
"fatal error in submitting requests: " + Errno(rawValue: -ret).debugDescription
167176
)
168177
} else {
169-
break
178+
return ret
170179
}
171180
}
172181
}
173182

183+
internal func _submitRequests(ring: inout SQRing, ringDescriptor: Int32) throws {
184+
let flushedEvents = _flushQueue(ring: &ring)
185+
_ = try _enter(ringDescriptor: ringDescriptor, numEvents: flushedEvents, minCompletions: 0, flags: 0)
186+
}
187+
188+
internal func _getUnconsumedSubmissionCount(ring: inout SQRing) -> UInt32 {
189+
return ring.userTail - ring.kernelHead.pointee.load(ordering: .acquiring)
190+
}
191+
192+
internal func _getUnconsumedCompletionCount(ring: inout CQRing) -> UInt32 {
193+
return ring.kernelTail.pointee.load(ordering: .acquiring) - ring.kernelHead.pointee.load(ordering: .acquiring)
194+
}
195+
196+
//TODO: pretty sure this is supposed to do more than it does
174197
internal func _flushQueue(ring: inout SQRing) -> UInt32 {
175198
ring.kernelTail.pointee.store(
176-
ring.userTail, ordering: .relaxed
199+
ring.userTail, ordering: .releasing
177200
)
178-
return ring.userTail - ring.kernelHead.pointee.load(ordering: .relaxed)
201+
return _getUnconsumedSubmissionCount(ring: &ring)
179202
}
180203

181204
@usableFromInline @inline(__always)
182205
internal func _getSubmissionEntry(ring: inout SQRing, submissionQueueEntries: UnsafeMutableBufferPointer<io_uring_sqe>) -> UnsafeMutablePointer<
183206
io_uring_sqe
184207
>? {
185-
let next = ring.userTail + 1
208+
let next = ring.userTail &+ 1 //this is expected to wrap
186209

187210
// FEAT: smp load when SQPOLL in use (not in MVP)
188-
let kernelHead = ring.kernelHead.pointee.load(ordering: .relaxed)
211+
let kernelHead = ring.kernelHead.pointee.load(ordering: .acquiring)
189212

190213
// FEAT: 128-bit event support (not in MVP)
191214
if next - kernelHead <= ring.array.count {
@@ -383,18 +406,45 @@ public struct IORing: @unchecked Sendable, ~Copyable {
383406

384407
func _tryConsumeCompletion(ring: inout CQRing) -> IOCompletion? {
385408
let tail = ring.kernelTail.pointee.load(ordering: .acquiring)
386-
let head = ring.kernelHead.pointee.load(ordering: .relaxed)
409+
let head = ring.kernelHead.pointee.load(ordering: .acquiring)
387410

388411
if tail != head {
389412
// 32 byte copy - oh well
390413
let res = ring.cqes[Int(head & ring.ringMask)]
391-
ring.kernelHead.pointee.store(head + 1, ordering: .relaxed)
414+
ring.kernelHead.pointee.store(head &+ 1, ordering: .releasing)
392415
return IOCompletion(rawValue: res)
393416
}
394417

395418
return nil
396419
}
397420

421+
internal func handleRegistrationResult(_ result: Int32) throws {
422+
//TODO: error handling
423+
}
424+
425+
public mutating func registerEventFD(ring: inout IORing, _ descriptor: FileDescriptor) throws {
426+
var rawfd = descriptor.rawValue
427+
let result = withUnsafePointer(to: &rawfd) { fdptr in
428+
return io_uring_register(
429+
ring.ringDescriptor,
430+
IORING_REGISTER_EVENTFD,
431+
UnsafeMutableRawPointer(mutating: fdptr),
432+
1
433+
)
434+
}
435+
try handleRegistrationResult(result)
436+
}
437+
438+
public mutating func unregisterEventFD(ring: inout IORing) throws {
439+
let result = io_uring_register(
440+
ring.ringDescriptor,
441+
IORING_UNREGISTER_EVENTFD,
442+
nil,
443+
0
444+
)
445+
try handleRegistrationResult(result)
446+
}
447+
398448
public mutating func registerFiles(count: UInt32) {
399449
guard self.registeredFiles == nil else { fatalError() }
400450
let fileBuf = UnsafeMutableBufferPointer<UInt32>.allocate(capacity: Int(count))
@@ -445,9 +495,9 @@ public struct IORing: @unchecked Sendable, ~Copyable {
445495
fatalError("failed to unregister buffers: TODO")
446496
}
447497

448-
public func submitRequests() {
449-
submissionMutex.withLock { ring in
450-
_submitRequests(ring: &ring, ringDescriptor: ringDescriptor)
498+
public func submitRequests() throws {
499+
try submissionMutex.withLock { ring in
500+
try _submitRequests(ring: &ring, ringDescriptor: ringDescriptor)
451501
}
452502
}
453503

Sources/System/IORingError.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1-
enum IORingError: Error {
1+
//TODO: make this not an enum
2+
public enum IORingError: Error, Equatable {
23
case missingRequiredFeatures
4+
case operationCanceled
5+
case unknown
36
}

Sources/System/ManagedIORing.swift

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
fileprivate func handleCompletionError(
2+
_ result: Int32,
3+
for continuation: UnsafeContinuation<IOCompletion, any Error>) {
4+
var error: IORingError = .unknown
5+
switch result {
6+
case -(_ECANCELED):
7+
error = .operationCanceled
8+
default:
9+
error = .unknown
10+
}
11+
continuation.resume(throwing: error)
12+
}
13+
114
final public class ManagedIORing: @unchecked Sendable {
215
var internalRing: IORing
316

@@ -11,25 +24,64 @@ final public class ManagedIORing: @unchecked Sendable {
1124
private func startWaiter() {
1225
Task.detached {
1326
while !Task.isCancelled {
27+
//TODO: should timeout handling be sunk into IORing?
1428
let cqe = self.internalRing.blockingConsumeCompletion()
1529

30+
if cqe.userData == 0 {
31+
continue
32+
}
1633
let cont = unsafeBitCast(
17-
cqe.userData, to: UnsafeContinuation<IOCompletion, Never>.self)
18-
cont.resume(returning: cqe)
34+
cqe.userData, to: UnsafeContinuation<IOCompletion, any Error>.self)
35+
36+
if cqe.result < 0 {
37+
var err = system_strerror(cqe.result * -1)
38+
let len = system_strlen(err!)
39+
err!.withMemoryRebound(to: UInt8.self, capacity: len) {
40+
let errStr = String(decoding: UnsafeBufferPointer(start: $0, count: len), as: UTF8.self)
41+
print("\(errStr)")
42+
}
43+
handleCompletionError(cqe.result, for: cont)
44+
} else {
45+
cont.resume(returning: cqe)
46+
}
1947
}
2048
}
2149
}
2250

23-
public func submitAndWait(_ request: __owned IORequest, isolation actor: isolated (any Actor)? = #isolation) async -> IOCompletion {
51+
public func submit(
52+
request: __owned IORequest,
53+
timeout: Duration? = nil,
54+
isolation actor: isolated (any Actor)? = #isolation
55+
) async throws -> IOCompletion {
2456
var consumeOnceWorkaround: IORequest? = request
25-
return await withUnsafeContinuation { cont in
26-
return internalRing.submissionMutex.withLock { ring in
27-
let request = consumeOnceWorkaround.take()!
28-
let entry = _blockingGetSubmissionEntry(
29-
ring: &ring, submissionQueueEntries: internalRing.submissionQueueEntries)
30-
entry.pointee = request.makeRawRequest().rawValue
31-
entry.pointee.user_data = unsafeBitCast(cont, to: UInt64.self)
32-
_submitRequests(ring: &ring, ringDescriptor: internalRing.ringDescriptor)
57+
return try await withUnsafeThrowingContinuation { cont in
58+
do {
59+
try internalRing.submissionMutex.withLock { ring in
60+
let request = consumeOnceWorkaround.take()!
61+
let entry = _blockingGetSubmissionEntry(
62+
ring: &ring, submissionQueueEntries: internalRing.submissionQueueEntries)
63+
entry.pointee = request.makeRawRequest().rawValue
64+
entry.pointee.user_data = unsafeBitCast(cont, to: UInt64.self)
65+
if let timeout {
66+
//TODO: if IORING_FEAT_MIN_TIMEOUT is supported we can do this more efficiently
67+
let timeoutEntry = _blockingGetSubmissionEntry(
68+
ring: &ring,
69+
submissionQueueEntries: internalRing.submissionQueueEntries
70+
)
71+
try RawIORequest.withTimeoutRequest(
72+
linkedTo: entry,
73+
in: timeoutEntry,
74+
duration: timeout,
75+
flags: .relativeTime
76+
) {
77+
try _submitRequests(ring: &ring, ringDescriptor: internalRing.ringDescriptor)
78+
}
79+
} else {
80+
try _submitRequests(ring: &ring, ringDescriptor: internalRing.ringDescriptor)
81+
}
82+
}
83+
} catch (let e) {
84+
cont.resume(throwing: e)
3385
}
3486
}
3587

Sources/System/RawIORequest.swift

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ extension RawIORequest {
2424
case sendMessage = 9
2525
case receiveMessage = 10
2626
// ...
27+
case link_timeout = 15
28+
// ...
2729
case openAt = 18
2830
case close = 19
2931
case filesUpdate = 20
@@ -103,7 +105,7 @@ extension RawIORequest {
103105
// poll32_events
104106
// sync_range_flags
105107
// msg_flags
106-
// timeout_flags
108+
case timeoutFlags(TimeOutFlags)
107109
// accept_flags
108110
// cancel_flags
109111
case openFlags(FileDescriptor.OpenOptions)
@@ -132,12 +134,48 @@ extension RawIORequest {
132134
// append to end of the file
133135
public static let append = ReadWriteFlags(rawValue: 1 << 4)
134136
}
137+
138+
public struct TimeOutFlags: OptionSet, Hashable, Codable {
139+
public var rawValue: UInt32
140+
141+
public init(rawValue: UInt32) {
142+
self.rawValue = rawValue
143+
}
144+
145+
public static let relativeTime: RawIORequest.TimeOutFlags = TimeOutFlags(rawValue: 0)
146+
public static let absoluteTime: RawIORequest.TimeOutFlags = TimeOutFlags(rawValue: 1 << 0)
147+
}
135148
}
136149

137150
extension RawIORequest {
138151
static func nop() -> RawIORequest {
139-
var req = RawIORequest()
152+
var req: RawIORequest = RawIORequest()
140153
req.operation = .nop
141154
return req
142155
}
156+
157+
//TODO: typed errors
158+
static func withTimeoutRequest<R>(
159+
linkedTo opEntry: UnsafeMutablePointer<io_uring_sqe>,
160+
in timeoutEntry: UnsafeMutablePointer<io_uring_sqe>,
161+
duration: Duration,
162+
flags: TimeOutFlags,
163+
work: () throws -> R) rethrows -> R {
164+
165+
opEntry.pointee.flags |= Flags.linkRequest.rawValue
166+
opEntry.pointee.off = 1
167+
var ts = __kernel_timespec(
168+
tv_sec: duration.components.seconds,
169+
tv_nsec: duration.components.attoseconds / 1_000_000_000
170+
)
171+
return try withUnsafePointer(to: &ts) { tsPtr in
172+
var req: RawIORequest = RawIORequest()
173+
req.operation = .link_timeout
174+
req.rawValue.timeout_flags = flags.rawValue
175+
req.rawValue.len = 1
176+
req.rawValue.addr = UInt64(UInt(bitPattern: tsPtr))
177+
timeoutEntry.pointee = req.rawValue
178+
return try work()
179+
}
180+
}
143181
}

Tests/SystemTests/IORingTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ final class IORingTests: XCTestCase {
1414
func testNop() throws {
1515
var ring = try IORing(queueDepth: 32)
1616
ring.writeRequest(IORequest())
17-
ring.submitRequests()
17+
try ring.submitRequests()
1818
let completion = ring.blockingConsumeCompletion()
1919
XCTAssertEqual(completion.result, 0)
2020
}

0 commit comments

Comments
 (0)