Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a repeated scheduling mechanism #488

Merged
merged 1 commit into from Jul 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
111 changes: 111 additions & 0 deletions Sources/NIO/EventLoop.swift
Expand Up @@ -50,6 +50,83 @@ public struct Scheduled<T> {
}
}

/// Returned once a task was scheduled to be repeatedly executed on the `EventLoop`.
///
/// A `RepeatedTask` allows the user to `cancel()` the repeated scheduling of further tasks.
public final class RepeatedTask {
private let delay: TimeAmount
private let eventLoop: EventLoop
private var scheduled: Scheduled<EventLoopFuture<Void>>?
private var task: (() -> EventLoopFuture<Void>)?

internal init(interval: TimeAmount, eventLoop: EventLoop, task: @escaping () -> EventLoopFuture<Void>) {
self.delay = interval
self.eventLoop = eventLoop
self.task = task
}

internal func begin(in delay: TimeAmount) {
if self.eventLoop.inEventLoop {
self.begin0(in: delay)
} else {
self.eventLoop.execute {
self.begin0(in: delay)
}
}
}

private func begin0(in delay: TimeAmount) {
assert(self.eventLoop.inEventLoop)
guard let task = self.task else {
return
}
self.scheduled = eventLoop.scheduleTask(in: delay, task)
self.reschedule()
}

/// Try to cancel the execution of the repeated task.
///
/// Whether the execution of the task is immediately canceled depends on whether the execution of a task has already begun.
/// This means immediate cancellation is not guaranteed.
public func cancel() {
if self.eventLoop.inEventLoop {
self.cancel0()
} else {
self.eventLoop.execute {
self.cancel0()
}
}
}

private func cancel0() {
assert(self.eventLoop.inEventLoop)
self.scheduled?.cancel()
self.scheduled = nil
self.task = nil
}

private func reschedule() {
assert(self.eventLoop.inEventLoop)
guard let scheduled = self.scheduled else {
return
}

scheduled.futureResult.whenSuccess { future in
future.whenComplete {
self.reschedule0()
}
}
}

private func reschedule0() {
assert(self.eventLoop.inEventLoop)
guard let task = self.task else {
return
}
self.scheduled = self.eventLoop.scheduleTask(in: self.delay, task)
self.reschedule()
}
}
/// An EventLoop processes IO / tasks in an endless loop for `Channel`s until it's closed.
///
/// Usually multiple `Channel`s share the same `EventLoop` for processing IO / tasks and so share the same processing `Thread`.
Expand Down Expand Up @@ -222,6 +299,40 @@ extension EventLoop {
public func close() throws {
// Do nothing
}

/// Schedule a repeated task to be executed by the `EventLoop` with a fixed delay between the end and start of each task.
///
/// - parameters:
/// - initialDelay: The delay after which the first task is executed.
/// - delay: The delay between the end of one task and the start of the next.
/// - task: The closure that will be executed.
/// - return: `RepeatedTask`
@discardableResult
public func scheduleRepeatedTask(initialDelay: TimeAmount, delay: TimeAmount, _ task: @escaping () throws -> Void) -> RepeatedTask {
let futureTask: () -> EventLoopFuture<Void> = {
do {
try task()
return self.newSucceededFuture(result: ())
} catch {
return self.newFailedFuture(error: error)
}
}
return self.scheduleRepeatedTask(initialDelay: initialDelay, delay: delay, futureTask)
}

/// Schedule a repeated task to be executed by the `EventLoop` with a fixed delay between the end and start of each task.
///
/// - parameters:
/// - initialDelay: The delay after which the first task is executed.
/// - delay: The delay between the end of one task and the start of the next.
/// - task: The closure that will be executed.
/// - return: `RepeatedTask`
@discardableResult
public func scheduleRepeatedTask(initialDelay: TimeAmount, delay: TimeAmount, _ task: @escaping () -> EventLoopFuture<Void>) -> RepeatedTask {
let repeated = RepeatedTask(interval: delay, eventLoop: self, task: task)
repeated.begin(in: initialDelay)
return repeated
}
}

/// Internal representation of a `Registration` to an `Selector`.
Expand Down
4 changes: 4 additions & 0 deletions Tests/NIOTests/EventLoopTest+XCTest.swift
Expand Up @@ -29,6 +29,10 @@ extension EventLoopTest {
("testSchedule", testSchedule),
("testScheduleWithDelay", testScheduleWithDelay),
("testScheduleCancelled", testScheduleCancelled),
("testScheduleRepeatedTask", testScheduleRepeatedTask),
("testScheduleRepeatedTaskCancelFromDifferentThread", testScheduleRepeatedTaskCancelFromDifferentThread),
("testScheduleRepeatedTaskToNotRetainRepeatedTask", testScheduleRepeatedTaskToNotRetainRepeatedTask),
("testScheduleRepeatedTaskToNotRetainEventLoop", testScheduleRepeatedTaskToNotRetainEventLoop),
("testMultipleShutdown", testMultipleShutdown),
("testShuttingDownFailsRegistration", testShuttingDownFailsRegistration),
("testEventLoopThreads", testEventLoopThreads),
Expand Down
82 changes: 82 additions & 0 deletions Tests/NIOTests/EventLoopTest.swift
Expand Up @@ -91,6 +91,88 @@ public class EventLoopTest : XCTestCase {
XCTAssertFalse(ran.load())
}

public func testScheduleRepeatedTask() throws {
let nanos = DispatchTime.now().uptimeNanoseconds
let initialDelay: TimeAmount = .milliseconds(5)
let delay: TimeAmount = .milliseconds(10)
let count = 5
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully())
}

let expect = expectation(description: "Is cancelling RepatedTask")
let counter = Atomic<Int>(value: 0)
var repeatedTask: RepeatedTask?
repeatedTask = eventLoopGroup.next().scheduleRepeatedTask(initialDelay: initialDelay, delay: delay) { () -> Void in
if counter.load() == 0 {
XCTAssertTrue(DispatchTime.now().uptimeNanoseconds - nanos >= initialDelay.nanoseconds)
} else if counter.load() == count {
expect.fulfill()
repeatedTask?.cancel()
}
counter.store(counter.load() + 1)
}

waitForExpectations(timeout: 1) { _ in
XCTAssertEqual(counter.load(), count + 1)
XCTAssertTrue(DispatchTime.now().uptimeNanoseconds - nanos >= initialDelay.nanoseconds + count * delay.nanoseconds)
}
}

public func testScheduleRepeatedTaskCancelFromDifferentThread() throws {
let nanos = DispatchTime.now().uptimeNanoseconds
let initialDelay: TimeAmount = .milliseconds(5)
let delay: TimeAmount = .milliseconds(10)
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully())
}

let expect = expectation(description: "Is cancelling RepatedTask")
var repeatedTask: RepeatedTask?
repeatedTask = eventLoopGroup.next().scheduleRepeatedTask(initialDelay: initialDelay, delay: delay) { () -> Void in
DispatchQueue(label: "nio.repeatedtask.test").async {
repeatedTask?.cancel()
expect.fulfill()
}
}

waitForExpectations(timeout: 1) { _ in
XCTAssertTrue(DispatchTime.now().uptimeNanoseconds - nanos >= initialDelay.nanoseconds)
}
}

public func testScheduleRepeatedTaskToNotRetainRepeatedTask() throws {
let initialDelay: TimeAmount = .milliseconds(5)
let delay: TimeAmount = .milliseconds(10)
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)

var repeated: RepeatedTask?
weak var weakRepeated: RepeatedTask?
repeated = eventLoopGroup.next().scheduleRepeatedTask(initialDelay: initialDelay, delay: delay) { () -> Void in }
weakRepeated = repeated
XCTAssertNotNil(weakRepeated)
repeated?.cancel()
XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully())
repeated = nil
XCTAssertNil(weakRepeated)
}

public func testScheduleRepeatedTaskToNotRetainEventLoop() throws {
let initialDelay: TimeAmount = .milliseconds(5)
let delay: TimeAmount = .milliseconds(10)
var eventLoopGroup: EventLoopGroup? = MultiThreadedEventLoopGroup(numberOfThreads: 1)
weak var weakEventLoop = eventLoopGroup?.next()

eventLoopGroup?.next().scheduleRepeatedTask(initialDelay: initialDelay, delay: delay) { () -> Void in }
XCTAssertNoThrow(try eventLoopGroup?.syncShutdownGracefully())
XCTAssertNotNil(weakEventLoop)
eventLoopGroup = nil
Thread.sleep(forTimeInterval: 0.01)
XCTAssertNil(weakEventLoop)
}

public func testMultipleShutdown() throws {
// This test catches a regression that causes it to intermittently fail: it reveals bugs in synchronous shutdown.
// Do not ignore intermittent failures in this test!
Expand Down