Skip to content

Commit

Permalink
Use NIOThreadPool in NIOFileSystem (#2692)
Browse files Browse the repository at this point in the history
Motivation:

There are two thread pools: one in NIOFileSystem and one in NIOPosix; we
should avoid the duplication.

Modifications:

- Switch to using NIOThreadPool
- Remove the old thread pool and associated code
- Add an init to FileSystem which takes a thread pool

Result:

Less duplication
  • Loading branch information
glbrntt committed Mar 26, 2024
1 parent 6b29fc3 commit 2d840c5
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 1,055 deletions.
2 changes: 2 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ let package = Package(
name: "_NIOFileSystem",
dependencies: [
"NIOCore",
"NIOPosix",
"CNIOLinux",
"CNIODarwin",
swiftAtomics,
Expand Down Expand Up @@ -488,6 +489,7 @@ let package = Package(
name: "NIOFileSystemIntegrationTests",
dependencies: [
"NIOCore",
"NIOPosix",
"_NIOFileSystem",
"NIOFoundationCompat",
],
Expand Down
62 changes: 32 additions & 30 deletions Sources/NIOFileSystem/DirectoryEntries.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import CNIODarwin
import CNIOLinux
import NIOConcurrencyHelpers
import NIOPosix
@preconcurrency import SystemPackage

/// An `AsyncSequence` of entries in a directory.
Expand Down Expand Up @@ -151,16 +152,14 @@ extension BufferedStream where Element == [DirectoryEntry] {
)

source.onTermination = {
guard let executor = protectedState.withLockedValue({ $0.executorForClosing() }) else {
guard let threadPool = protectedState.withLockedValue({ $0.threadPoolForClosing() }) else {
return
}

executor.execute {
threadPool.submit { _ in // always run, even if cancelled
protectedState.withLockedValue { state in
state.closeIfNecessary()
}
} onCompletion: { _ in
// Ignore the result.
}
}

Expand Down Expand Up @@ -189,16 +188,21 @@ private struct DirectoryEntryProducer {
/// source it will either produce more or be scheduled to produce more. Stopping production
/// is signalled via the stream's 'onTermination' handler.
func produceMore() {
let executor = self.state.withLockedValue { state in
let threadPool = self.state.withLockedValue { state in
state.produceMore()
}

// No executor means we're done.
guard let executor = executor else { return }
// No thread pool means we're done.
guard let threadPool = threadPool else { return }

executor.execute {
try self.nextBatch()
} onCompletion: { result in
threadPool.submit {
let result: Result<[DirectoryEntry], Error>
switch $0 {
case .active:
result = Result { try self.nextBatch() }
case .cancelled:
result = .failure(CancellationError())
}
self.onNextBatchResult(result)
}
}
Expand Down Expand Up @@ -261,16 +265,14 @@ private struct DirectoryEntryProducer {
}

private func close() {
guard let executor = self.state.withLockedValue({ $0.executorForClosing() }) else {
guard let threadPool = self.state.withLockedValue({ $0.threadPoolForClosing() }) else {
return
}

executor.execute {
threadPool.submit { _ in // always run, even if cancelled
self.state.withLockedValue { state in
state.closeIfNecessary()
}
} onCompletion: { _ in
// Ignore.
}
}
}
Expand All @@ -283,7 +285,7 @@ private struct DirectoryEnumerator: Sendable {
private enum State: @unchecked Sendable {
case modifying
case idle(SystemFileHandle.SendableView, recursive: Bool)
case open(IOExecutor, Source, [DirectoryEntry])
case open(NIOThreadPool, Source, [DirectoryEntry])
case done
}

Expand Down Expand Up @@ -351,23 +353,23 @@ private struct DirectoryEnumerator: Sendable {
self.path = handle.path
}

internal func produceMore() -> IOExecutor? {
internal func produceMore() -> NIOThreadPool? {
switch self.state {
case let .idle(handle, _):
return handle.executor
case let .open(executor, _, _):
return executor
return handle.threadPool
case let .open(threadPool, _, _):
return threadPool
case .done:
return nil
case .modifying:
fatalError()
}
}

internal func executorForClosing() -> IOExecutor? {
internal func threadPoolForClosing() -> NIOThreadPool? {
switch self.state {
case let .open(executor, _, _):
return executor
case let .open(threadPool, _, _):
return threadPool
case .idle, .done:
// Don't need to close in the idle state: we don't own the handle.
return nil
Expand Down Expand Up @@ -449,7 +451,7 @@ private struct DirectoryEnumerator: Sendable {
}

private mutating func processOpenState(
executor: IOExecutor,
threadPool: NIOThreadPool,
dir: CInterop.DirPointer,
entries: inout [DirectoryEntry],
count: Int
Expand Down Expand Up @@ -499,11 +501,11 @@ private struct DirectoryEnumerator: Sendable {
}

// We must have hit our 'count' limit.
return (.open(executor, .readdir(dir), entries), .yield(.success(entries)))
return (.open(threadPool, .readdir(dir), entries), .yield(.success(entries)))
}

private mutating func processOpenState(
executor: IOExecutor,
threadPool: NIOThreadPool,
fts: CInterop.FTSPointer,
entries: inout [DirectoryEntry],
count: Int
Expand Down Expand Up @@ -580,7 +582,7 @@ private struct DirectoryEnumerator: Sendable {
}

// We must have hit our 'count' limit.
return (.open(executor, .fts(fts), entries), .yield(.success(entries)))
return (.open(threadPool, .fts(fts), entries), .yield(.success(entries)))
}

private mutating func process(_ count: Int) -> ProcessResult {
Expand All @@ -596,21 +598,21 @@ private struct DirectoryEnumerator: Sendable {

switch result {
case let .success(source):
self.state = .open(handle.executor, source, [])
self.state = .open(handle.threadPool, source, [])
return .continue

case let .failure(error):
self.state = .done
return .yield(.failure(error))
}

case .open(let executor, let mode, var entries):
case .open(let threadPool, let mode, var entries):
self.state = .modifying

switch mode {
case .readdir(let dir):
let (state, result) = self.processOpenState(
executor: executor,
threadPool: threadPool,
dir: dir,
entries: &entries,
count: count
Expand All @@ -620,7 +622,7 @@ private struct DirectoryEnumerator: Sendable {

case .fts(let fts):
let (state, result) = self.processOpenState(
executor: executor,
threadPool: threadPool,
fts: fts,
entries: &entries,
count: count
Expand Down
22 changes: 14 additions & 8 deletions Sources/NIOFileSystem/FileChunks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) || os(Linux) || os(Android)
import NIOConcurrencyHelpers
import NIOCore
import NIOPosix
@preconcurrency import SystemPackage

/// An `AsyncSequence` of ordered chunks read from a file.
Expand Down Expand Up @@ -140,16 +141,21 @@ private struct FileChunkProducer: Sendable {
/// source it will either produce more or be scheduled to produce more. Stopping production
/// is signalled via the stream's 'onTermination' handler.
func produceMore() {
let executor = self.state.withLockedValue { state in
let threadPool = self.state.withLockedValue { state in
state.shouldProduceMore()
}

// No executor means we're done.
guard let executor = executor else { return }

executor.execute {
try self.readNextChunk()
} onCompletion: { result in
guard let threadPool = threadPool else { return }

threadPool.submit {
let result: Result<ByteBuffer, Error>
switch $0 {
case .active:
result = Result { try self.readNextChunk() }
case .cancelled:
result = .failure(CancellationError())
}
self.onReadNextChunkResult(result)
}
}
Expand Down Expand Up @@ -273,10 +279,10 @@ private struct ProducerState: Sendable {
}
}

mutating func shouldProduceMore() -> IOExecutor? {
mutating func shouldProduceMore() -> NIOThreadPool? {
switch self.state {
case let .producing(state):
return state.handle.executor
return state.handle.threadPool
case .done:
return nil
}
Expand Down

0 comments on commit 2d840c5

Please sign in to comment.