diff --git a/Sources/DistributedActors/ActorSystem.swift b/Sources/DistributedActors/ActorSystem.swift index 148bb4be4..59db00c82 100644 --- a/Sources/DistributedActors/ActorSystem.swift +++ b/Sources/DistributedActors/ActorSystem.swift @@ -548,6 +548,8 @@ extension ActorSystem: ActorRefFactory { dispatcher = self.dispatcher case .callingThread: dispatcher = CallingThreadDispatcher() + case .dispatchQueue(let queue): + dispatcher = DispatchQueueDispatcher(queue: queue) case .nio(let group): dispatcher = NIOEventLoopGroupDispatcher(group) default: diff --git a/Sources/DistributedActors/Dispatchers.swift b/Sources/DistributedActors/Dispatchers.swift index 634172da3..306d771dd 100644 --- a/Sources/DistributedActors/Dispatchers.swift +++ b/Sources/DistributedActors/Dispatchers.swift @@ -15,6 +15,8 @@ import Dispatch import NIO +// TODO: Consider renaming to "ActorScheduler" perhaps? + /// An `Executor` is a low building block that is able to take blocks and schedule them for running public protocol MessageDispatcher { // TODO: we should make it dedicated to dispatch() rather than raw executing perhaps? This way it can take care of fairness things @@ -65,6 +67,7 @@ internal struct CallingThreadDispatcher: MessageDispatcher { } } +// ==== ---------------------------------------------------------------------------------------------------------------- // MARK: NIO Dispatcher only for internal use internal struct NIOEventLoopGroupDispatcher: MessageDispatcher { @@ -88,3 +91,24 @@ extension NIOEventLoopGroupDispatcher: InternalMessageDispatcher { self.group.shutdownGracefully(queue: DispatchQueue.global()) { _ in () } } } + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: DispatchQueue Dispatcher + +internal struct DispatchQueueDispatcher: MessageDispatcher { + let queue: DispatchQueue + + init(queue: DispatchQueue) { + self.queue = queue + } + + public var name: String { + "dispatchQueue:\(self.queue)" + } + + func execute(_ f: @escaping () -> Void) { + self.queue.async { + f() + } + } +} diff --git a/Sources/DistributedActors/Props.swift b/Sources/DistributedActors/Props.swift index 7872ca9e2..999875bee 100644 --- a/Sources/DistributedActors/Props.swift +++ b/Sources/DistributedActors/Props.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import Dispatch import NIO // ==== ---------------------------------------------------------------------------------------------------------------- @@ -83,6 +84,8 @@ public enum DispatcherProps { // I'd rather implement such style, as it actually is build "for" actors, and not accidentally running them well... // case OurOwnFancyActorSpecificDispatcher + case dispatchQueue(DispatchQueue) + /// WARNING: Use with Caution! /// /// This dispatcher will keep a real dedicated Thread for this actor. This is very rarely something you want, @@ -110,6 +113,7 @@ public enum DispatcherProps { case .default: return "default" case .pinnedThread: return "pinnedThread" case .nio: return "nioEventLoopGroup" + case .dispatchQueue: return "dispatchQueue" case .callingThread: return "callingThread" } } diff --git a/Tests/DistributedActorsTests/DispatcherTests.swift b/Tests/DistributedActorsTests/DispatcherTests.swift index 7d8abc2d3..3a90a76e4 100644 --- a/Tests/DistributedActorsTests/DispatcherTests.swift +++ b/Tests/DistributedActorsTests/DispatcherTests.swift @@ -60,4 +60,44 @@ final class DispatcherTests: ActorSystemXCTestCase { let dispatcher: String = try p.expectMessage() dispatcher.dropFirst("Dispatcher: ".count).shouldStartWith(prefix: "nio:") } + + // ==== ---------------------------------------------------------------------------------------------------------------- + // MARK: Grand Central Dispatch + + func test_runOn_dispatchQueue() throws { + let p = self.testKit.spawnTestProbe(expecting: String.self) + let behavior: Behavior = .receive { context, message in + context.log.info("HELLO") + p.tell("\(message)") + p.tell("\((context as! ActorShell)._dispatcher.name)") + return .same + } + + let global: DispatchQueue = .global() + let w = try system.spawn(.anonymous, props: .dispatcher(.dispatchQueue(global)), behavior) + w.tell("Hello") + w.tell("World") + + func expectWasOnDispatchQueue(p: ActorTestProbe) throws { + #if os(Linux) + try p.expectMessage().shouldContain("Dispatch.DispatchQueue") + #else + try p.expectMessage().shouldContain("OS_dispatch_queue_global:") + #endif + } + + try p.expectMessage("Hello") + try expectWasOnDispatchQueue(p: p) + + try p.expectMessage("World") + try expectWasOnDispatchQueue(p: p) + + for i in 1 ... 100 { + w.tell("\(i)") + } + for i in 1 ... 100 { + try p.expectMessage("\(i)") + try expectWasOnDispatchQueue(p: p) + } + } }