Implement a repeated scheduling mechanism #488

Merged
merged 1 commit into from Jul 10, 2018

Contributor

@FranzBusch FranzBusch commented Jun 19, 2018

Create a way to schedule repeated tasks

Motivation:
I think it would be nice to have a method for repeatedly scheduling a task. This implementation provides only the basics and is definitely not the final solution. In my opinion, we should provide a method to schedule a task with at least these parameters:

  • delay
  • interval
  • numberOfTimes

The other thing I was thinking about: in my current implementation the scheduling is done by an EventLoop, but we could instead provide a method on RepeatedTask such as func begin(on: EventLoop). That would allow a task to be scheduled on multiple EventLoops at a time, though I don't know whether that would be thread-safe.

I just wanted to get early input on whether my approach is going in the right direction and whether you have any additional requirements for such an implementation.

@swift-nio-bot

Can one of the admins verify this patch?

@Lukasa Lukasa requested review from Lukasa, weissi and normanmaurer and removed request for Lukasa and weissi June 19, 2018 10:00
Contributor

@Lukasa Lukasa left a comment

I have some concerns about how this interacts with event loop teardown: in particular, I don't think our SelectableEventLoops stop accepting work when they're torn down. @weissi do you know?

}
private let task: () throws -> Void

public init(interval: TimeAmount, task: @escaping () throws -> Void) {
Contributor

No need for this init to be public.

reschedule()
}
}
private let task: () throws -> Void
Contributor

I'm wondering if this should return an EventLoopFuture<Void> that causes the reschedule to occur only after the future completes. That can help prevent problems in overload states.

We can still support the non-future returning task by creating a separate init for it and synthesising the returned Future.

Contributor Author

I was thinking about this as well. I think there should be two options: one where the task is scheduled at a fixed interval, and one where there is a fixed delay after each run completes. Both could make sense depending on what the user actually wants to achieve.
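To make the difference between the two options concrete, here is a minimal sketch that computes when each mode would start its runs. The function names and the plain-integer millisecond model are illustrative assumptions only; nothing here is part of this PR or of NIO.

```swift
/// Start times (in ms) under a fixed-rate policy: each run starts `period`
/// after the previous run *started*, no matter how long the task takes.
func fixedRateStarts(initialDelay: Int, period: Int, runs: Int) -> [Int] {
    return (0..<runs).map { initialDelay + $0 * period }
}

/// Start times (in ms) under a fixed-delay policy: each run starts `delay`
/// after the previous run *finished*, so slow tasks push later runs back.
func fixedDelayStarts(initialDelay: Int, delay: Int, taskDuration: Int, runs: Int) -> [Int] {
    var starts: [Int] = []
    var next = initialDelay
    for _ in 0..<runs {
        starts.append(next)
        next += taskDuration + delay
    }
    return starts
}

print(fixedRateStarts(initialDelay: 50, period: 100, runs: 3))                     // [50, 150, 250]
print(fixedDelayStarts(initialDelay: 50, delay: 100, taskDuration: 30, runs: 3))   // [50, 180, 310]
```

When `taskDuration` is zero the two schedules coincide, so the modes only differ for tasks that take a noticeable amount of time.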


scheduled?.futureResult.whenFailure { error in
guard case EventLoopError.cancelled = error else {
self.scheduled = self.scheduled?.futureResult.eventLoop.scheduleTask(in: self.interval, self.task)
Contributor

It's weird to see the main body of a method inside the else clause of a guard: mind refactoring this to use if instead?

@weissi
Member

weissi commented Jun 19, 2018

@Lukasa yes, they do continue accepting work (that they'll never run). The issue is that outside of the event loop thread you can't query the lifecycle state. We could and should probably fix this though. We could have another variable (like scheduledTasks) that mirrors (parts of) the lifecycleState but is readable from any thread when holding tasksLock.
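A rough sketch of that idea, assuming a flag that mirrors the "still accepting work" part of the lifecycle state and is only touched while holding the tasks lock. `TaskQueue`, `acceptingTasks` and `markShutDown` are hypothetical names for illustration; this is not how SelectableEventLoop is actually written.

```swift
import Foundation

/// Hypothetical stand-in for an event loop's task bookkeeping (not NIO code).
final class TaskQueue {
    private let tasksLock = NSLock()
    private var scheduledTasks: [() -> Void] = []
    /// Mirrors the relevant part of the lifecycle state; guarded by `tasksLock`,
    /// so it can be read from any thread that holds the lock.
    private var acceptingTasks = true

    /// Callable from any thread. Returns `false` once the loop has shut down,
    /// instead of silently accepting work that will never run.
    func schedule(_ task: @escaping () -> Void) -> Bool {
        self.tasksLock.lock()
        defer { self.tasksLock.unlock() }
        guard self.acceptingTasks else { return false }
        self.scheduledTasks.append(task)
        return true
    }

    /// Called by the loop as part of shutting down.
    func markShutDown() {
        self.tasksLock.lock()
        defer { self.tasksLock.unlock() }
        self.acceptingTasks = false
    }
}
```

With something along these lines, a caller on another thread could learn that scheduling failed rather than enqueueing work that is never executed.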

@Lukasa
Contributor

Lukasa commented Jun 19, 2018

So the main problem with them accepting work is that it means that they'll get stuck in a reference cycle and never be freed. Any scheduled task holds a future that fires on that event loop when the task is completed. This future keeps the loop alive. This is already pretty bad, but a repeated task essentially guarantees an event loop leak because it repeatedly reschedules work, ensuring there is always an event loop reference alive.

@FranzBusch
Contributor Author

So this is blocked until we change the EventLoop so that it is not retained by scheduledTasks, right?

@helje5
Contributor

helje5 commented Jun 19, 2018

Maybe I don't get the issue, but if I schedule a repeating timer, I most definitely would want the event loop to stay alive and not go away (and presumably my timer with it). No?

@FranzBusch
Contributor Author

I think it is quite reasonable to only have the repeating task scheduled as long as the event loop is actually running, since the task should not be responsible for the lifetime of the event loop, right?

@Lukasa
Contributor

Lukasa commented Jun 19, 2018

> Maybe I don't get the issue, but if I schedule a repeating timer, I most definitely would want the event loop to stay alive and not go away (and presumably my timer with it). No?

The problem is not that the event loop will go away, it's that the event loop won't tell the repeating task that it will never execute again. That is, if you do this:

loop.scheduleRepeatedTask(interval: .seconds(10)) { print("Still alive") }
sleep(1000)
loop.syncShutdownGracefully()

Then after 1000 seconds the repeating task will stop executing (as the loop is closed), but the repeating task will never be told. Worse, the loop will hold a reference to the scheduled task item, which holds a reference to the loop, causing the loop to be leaked.

This is not the end of the world in production applications because generally speaking they do not stop and start loops regularly, but it's very annoying in tests, as any use of repeating tasks will cause each test to leak the loops unless the user very aggressively cleans up their repeating tasks.

Put another way, in an ideal world the task would be told when the event loop has gone away, and any attempt to schedule new work would fail. Neither of those things happens in NIO today: when we shut the loop down we leave any pending tasks pending, and those pending tasks almost always keep the event loop alive.

I don't know that this blocks the proposal, as the problem already exists today with scheduled tasks, but repeating tasks take this possibility and make it an inevitability, which is not necessarily an improvement.
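The reference cycle described above can be reproduced with plain Swift classes. This is only a sketch of the ownership shape: `Loop`, `PendingTask` and `loopIsFreed` are hypothetical stand-ins, not NIO types.

```swift
/// Stand-in for an event loop that keeps its pending tasks alive.
final class Loop {
    var pending: [PendingTask] = []
    private let onDeinit: () -> Void
    init(onDeinit: @escaping () -> Void) { self.onDeinit = onDeinit }
    deinit { self.onDeinit() }
}

/// Stand-in for a scheduled task: like a future that fires on the loop,
/// it holds a strong reference back to the loop.
final class PendingTask {
    let loop: Loop
    init(loop: Loop) { self.loop = loop }
}

/// Returns whether the loop was deallocated once the caller dropped it.
func loopIsFreed(breakCycle: Bool) -> Bool {
    var freed = false
    do {
        let loop = Loop(onDeinit: { freed = true })
        loop.pending.append(PendingTask(loop: loop))   // loop -> task -> loop
        if breakCycle {
            loop.pending.removeAll()                   // task released, cycle gone
        }
    }
    return freed
}

print(loopIsFreed(breakCycle: false))  // false: the loop is leaked
print(loopIsFreed(breakCycle: true))   // true
```

A repeated task keeps putting a new entry into `pending`, which is why it turns a possible leak into a guaranteed one unless something breaks the cycle.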

@FranzBusch
Contributor Author

So would a fix be as simple as removing all scheduled tasks while shutting down?

@Lukasa
Contributor

Lukasa commented Jun 21, 2018

@FranzBusch That's not quite sufficient, because those shutdown tasks are likely to be necessary for some parts of cleanup. We probably need to design our way out of this, likely by using more than one task queue.

@FranzBusch
Contributor Author

FranzBusch commented Jun 28, 2018

I just updated the PR; it now includes methods to schedule a task with a fixedRate or fixedDelay. I also removed the retain cycle created by the repeated task. Some documentation and tests are still left to do.
@weissi and @Lukasa, could you take a quick look at whether the implementation seems good so far?

@weissi
Member

weissi commented Jun 29, 2018

@Lukasa & @FranzBusch do we actually need fixedRate? If we have an overloaded event loop and this keeps firing, it will lead to the EL becoming more overloaded.
Also, I'm not sure I like the names (fixedRate vs fixedDelay) too much. I think we could use long and descriptive names. But ideally we would only have what we call fixedDelay today.

@FranzBusch
Contributor Author

@weissi For my personal project, I only used fixedRate as it was used for a polling mechanism and should fire every second even though the last invocation may still be processing. I get your point that we may overload the EL if the triggered task is being queued again and again, but I see use cases where fixed rate makes sense.
Regarding the naming, I had a look at the netty implementation and went according to that, though I am open for better suggestions :)

@weissi
Member

weissi commented Jun 29, 2018

@FranzBusch makes sense. If you really want that, why don't you just use eventloop.execute { myActualWork }; return eventloop.newSucceededFuture(result: ()) or similar to perform the other stuff? Then it will still fire every second even though myActualWork might still be going on?

@FranzBusch
Contributor Author

FranzBusch commented Jun 29, 2018

@weissi Maybe I don't understand your example correctly, but how would a call to execute repeatedly schedule a task, and how would I cancel such a task sometime in the future?

@weissi
Member

weissi commented Jun 29, 2018

@FranzBusch Sorry, I wasn't quite so clear: My assumption is that .fixedDelay and .fixedRate are the same if the work item that is scheduled doesn't take any time to execute (i.e. succeeds the promise immediately). If that assumption is correct, you can just use .fixedDelay and make the work item take no time by enqueueing the real work item using el.execute. Does that make sense?

Isn't this just building .fixedRate from .fixedDelay?

extension EventLoop {
    func myFixedRate(initial: TimeAmount, delay: TimeAmount, workItem actualWorkItem: @escaping () -> Void) {
        self.scheduleTaskWithFixedDelay(initialDelay: initial, delay: delay) {
            // Enqueue the real work and return right away, so the next run is not held back by it.
            self.execute {
                actualWorkItem()
            }
            return self.newSucceededFuture(result: ())
        }
    }
}

so if a user really wants fixed rate (which is IMHO unadvisable) they can easily build it, no?

/// - task: The closure that will be executed.
/// - return: `RepeatedTask`
@discardableResult
public func scheduleTaskWithFixedDelay(initialDelay: TimeAmount, delay: TimeAmount, _ task: @escaping () throws -> EventLoopFuture<Void>) -> RepeatedTask {
Member

@weissi weissi Jun 29, 2018

  • we need a test for this
  • task shouldn't be throws (as it returns an EventLoopFuture)

/// - task: The closure that will be executed.
/// - return: `RepeatedTask`
@discardableResult
public func scheduleTaskWithFixedDelay(initialDelay: TimeAmount, delay: TimeAmount, _ task: @escaping () throws -> Void) -> RepeatedTask {
Member

we need a test for this

/// - task: The closure that will be executed.
/// - return: `RepeatedTask`
@discardableResult
public func scheduleTaskAtFixedRate(initialDelay: TimeAmount, period: TimeAmount, _ task: @escaping () throws -> EventLoopFuture<Void>) -> RepeatedTask {
Member

@weissi weissi Jun 29, 2018

  • we need a test for this
  • task shouldn't be throws (as it's returning an EventLoopFuture)

@discardableResult
public func scheduleTaskAtFixedRate(initialDelay: TimeAmount, period: TimeAmount, _ task: @escaping () throws -> Void) -> RepeatedTask {
let futureTask: () throws -> EventLoopFuture<Void> = {
try task()
Member

  • this should catch the error and make it a failed future
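As an illustration of that last point, here is a standalone sketch of turning a throwing, non-future task into one that reports failure as a value. It uses the standard library's `Result` as a stand-in for a failed EventLoopFuture so it runs without NIO; `wrap`, `isFailure` and `PollError` are hypothetical names.

```swift
struct PollError: Error {}

/// Wraps a throwing task so a thrown error becomes a failed outcome
/// instead of escaping from the scheduling machinery.
func wrap(_ task: @escaping () throws -> Void) -> () -> Result<Void, Error> {
    return {
        do {
            try task()
            return .success(())
        } catch {
            return .failure(error)   // in NIO this would be a failed future
        }
    }
}

/// Small helper to inspect the outcome.
func isFailure(_ result: Result<Void, Error>) -> Bool {
    if case .failure = result { return true }
    return false
}
```

In the PR itself the same shape would presumably produce a failed future on the event loop rather than a `Result`.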

@FranzBusch
Contributor Author

@weissi Thanks for clearing that up! This makes sense for work which succeeds the promise immediately, but in both of the implemented methods that cannot be assumed. Furthermore, execute is just an immediate scheduling mechanism, and my intention with the implementation was to provide a convenient way to schedule any repeating work with a delay and a rate/delay mode so that users don't have to implement that behaviour all on their own. Of course, you guys still decide if one of these methods actually risks overloading the EL and thus should not be provided.
Or was your intention to actually change the implementation side of the repeated scheduling?

@FranzBusch
Contributor Author

Just saw your Code example and I get what you mean. Yes your solution should be equal to mine :)

Member

@weissi weissi left a comment

thanks very much @FranzBusch . This looks really good and we're almost there. Just two things remaining:

  • explicit self. everywhere please
  • make all tests a bit faster by reducing the times from 50 and 100 ms to something shorter

/// Whether the execution of the task is immediately canceled depends on whether the execution of a task has already begun.
/// This means immediate cancellation is not guaranteed.
public func cancel() {
scheduled?.cancel()
Member

please prefix those (and the following) by self. (won't repeat this comment for all the other methods :) )

public func testScheduleRepeatedTask() throws {
let nanos = DispatchTime.now().uptimeNanoseconds
let initialDelay: TimeAmount = .milliseconds(50)
let delay: TimeAmount = .milliseconds(100)
Member

can we maybe make this 5 and 10 milliseconds? Otherwise this test alone will run almost 1s which is quite slow

@FranzBusch
Contributor Author

@weissi Done :)

Member

@weissi weissi left a comment

sorry, found a few more problems. Most importantly this isn't synchronised correctly...

@@ -50,6 +50,61 @@ public struct Scheduled<T> {
}
}

public class RepeatedTask {
Member

this needs to be final

@@ -50,6 +50,61 @@ public struct Scheduled<T> {
}
}

public class RepeatedTask {
private let delay: TimeAmount
private var scheduled: Scheduled<EventLoopFuture<Void>>? {
Member

oh sorry, didn't notice that both of those are var. Did you run the tests with TSan (thread sanitizer) enabled? I'm sure TSan will find the threading error here.

The assumption (and that should actually be documented) is that the user can cancel a RepeatedTask from any thread but this isn't the case here.

@@ -50,6 +50,61 @@ public struct Scheduled<T> {
}
}

public class RepeatedTask {
Member

and have doc comments

}

private func reschedule0() {
guard let task = task else {
Member

self. missing

guard let task = task else {
return
}
self.scheduled = self.scheduled?.futureResult.eventLoop.scheduleTask(in: delay, task)
Member

self. missing

if case EventLoopError.shutdown = error {
return
}
self.reschedule0()
Member

what other errors could we hit here?

Contributor Author

@FranzBusch FranzBusch Jul 2, 2018

There could be an EventLoopError.unsupportedOperation, EventLoopError.shutdownFailed or any of the user's thrown errors, right? Should I handle the two EventLoopError cases?

Contributor

I don't think it can. If you schedule a function that returns a Future, the user thrown errors from that function cannot appear in the outer future.

Contributor

I think it may be fair to say that if the scheduled futureResult fails, there is no reason to reschedule, right?

Contributor Author

Yes, with the current status the only way the futureResult fails is either during shutdown with .shutdown or if the task is cancelled with .cancelled. No other error is possible at the moment, so I can remove this whenFailure case.

Member

if you want to be sure it's one of those errors you could just assert that it's one of them:

assert(([EventLoopError.cancelled, EventLoopError.shutdown] as [Error]).contains(error), "unexpected error: \(error)")
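One caveat with the one-liner above: as far as I can tell, `contains(_:)` needs an Equatable element, and `Error` is not Equatable, so it may not compile as written. A pattern-matching variant avoids that. The sketch below uses a hypothetical `LoopError` enum as a stand-in for EventLoopError so that it runs on its own.

```swift
/// Hypothetical stand-in for EventLoopError; the case names mirror the ones discussed here.
enum LoopError: Error {
    case cancelled
    case shutdown
    case unsupportedOperation
}

/// True for the two errors a repeated task expects to see when it stops.
func isExpectedStopError(_ error: Error) -> Bool {
    switch error {
    case LoopError.cancelled, LoopError.shutdown:
        return true
    default:
        return false
    }
}

// Usage inside a failure callback could then look like:
// assert(isExpectedStopError(error), "unexpected error: \(error)")
```

This is the same enum-case matching the PR already uses in `if case EventLoopError.shutdown = error`.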

@FranzBusch
Contributor Author

FranzBusch commented Jul 2, 2018

@weissi I ran the tests with TSan and noticed multiple data races. One was just an error in the tests and I used an Atomic counter to fix it. The other was because of a cancel() call on a different thread as you described. To fix this I introduced a Lock in the RepeatedTask. Could you please take a look if the synchronization looks good?

private let delay: TimeAmount
private var scheduled: Scheduled<EventLoopFuture<Void>>?
private var task: (() throws -> EventLoopFuture<Void>)?
private let lock = Lock()
Contributor

I don't think we want to use a lock here: instead, we should use the event loop that this RepeatedTask belongs to to manage synchronization.

The TL;DR for this is that locking is hard, and right now this isn't handling it very well. In particular, begin does not handle the synchronisation correctly. The other advantage is that with using the parent event loop we can easily check whether we are already holding the "lock" and so provide a fast-path that does not involve cross-loop dispatch.

Contributor Author

I was thinking about the begin method as well but since this is only used internally and the user can only call cancel after reschedule was triggered once it should be fine.

Would synchronization over the EL work with executing begin, reschedule and cancel on the EL? @Lukasa

Contributor

Yup, should work fine. In fact, only cancel should require the explicit synchronisation: both begin and reschedule should by definition always be called on the loop, and so can just degenerately assert that that's what's actually happening.
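A minimal sketch of that pattern, using a serial DispatchQueue as a stand-in for the event loop so it can run without NIO. `MiniLoop`, `Repeater` and `runAndWait` are hypothetical names; the real RepeatedTask would use `eventLoop.inEventLoop` and `eventLoop.execute` instead.

```swift
import Dispatch

/// Stand-in for an event loop: a serial queue that knows whether the caller is already on it.
final class MiniLoop {
    private let queue = DispatchQueue(label: "mini-loop")
    private let key = DispatchSpecificKey<Bool>()

    init() {
        self.queue.setSpecific(key: self.key, value: true)
    }

    var inEventLoop: Bool {
        return DispatchQueue.getSpecific(key: self.key) == true
    }

    func execute(_ work: @escaping () -> Void) {
        self.queue.async(execute: work)
    }

    /// Helper for inspection: run `work` on the loop and wait for the result.
    func runAndWait<T>(_ work: () -> T) -> T {
        return self.inEventLoop ? work() : self.queue.sync(execute: work)
    }
}

final class Repeater {
    private let loop: MiniLoop
    private var cancelled = false   // only read or written on `loop`

    init(loop: MiniLoop) { self.loop = loop }

    /// Safe to call from any thread.
    func cancel() {
        if self.loop.inEventLoop {
            self.cancel0()                         // fast path: already on the loop
        } else {
            self.loop.execute { self.cancel0() }   // hop onto the loop
        }
    }

    private func cancel0() {
        assert(self.loop.inEventLoop)
        self.cancelled = true
    }

    var isCancelled: Bool {
        return self.loop.runAndWait { self.cancelled }
    }
}
```

Because all state changes happen on the loop, no separate lock is needed, and the on-loop check gives the fast path mentioned earlier in this thread.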

}

private func cancel0() {
assert(eventLoop.inEventLoop)
Contributor

Nit: self

}

private func reschedule() {
assert(eventLoop.inEventLoop)
Contributor

Nit: self

}

private func reschedule() {
assert(eventLoop.inEventLoop)
Contributor

Nit: We could guard let here to clean up the unwrapping code a bit.

}

private func reschedule0() {
assert(eventLoop.inEventLoop)
Contributor

Nit: self


private func reschedule0() {
assert(eventLoop.inEventLoop)
guard let task = self.task else {
Contributor

May as well also unwrap self.scheduled here too.

@@ -396,7 +515,10 @@ internal final class SelectableEventLoop: EventLoop {
promise.fail(error: error)
},`in`)

let scheduled = Scheduled(promise: promise, cancellationTask: {
let scheduled = Scheduled(promise: promise, cancellationTask: { [weak self] in
guard let `self` = self else {
Member

we don't normally use weak references because of their impact on performance. Why is this one necessary?

guard let task = self.task, let scheduled = self.scheduled else {
return
}
self.scheduled = scheduled.futureResult.eventLoop.scheduleTask(in: self.delay, task)
Contributor

Ah, we actually don't need the conditional binding of scheduled anymore, because we have an event loop on this object we can use.

@@ -93,7 +170,6 @@ public protocol EventLoop: EventLoopGroup {
/// Schedule a `task` that is executed by this `SelectableEventLoop` after the given amount of time.
func scheduleTask<T>(in: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled<T>
}

Contributor

Nit: replace this whitespace line.

Contributor Author

Can I leave this since it follows the overall formatting where no empty line is before the closing brackets?

Contributor

This line is after the closing brace.

@apple apple deleted a comment from Lukasa Jul 5, 2018
Member

@weissi weissi left a comment

This looks good to me now, thanks @FranzBusch

@weissi weissi added the 🔼 needs-minor-version-bump For PRs that when merged cause a bump of the minor version, ie. 1.x.0 -> 1.(x+1).0 label Jul 5, 2018
@weissi weissi added this to the 1.9.0 milestone Jul 5, 2018
private let delay: TimeAmount
private let eventLoop: EventLoop
private var scheduled: Scheduled<EventLoopFuture<Void>>?
private var task: (() throws -> EventLoopFuture<Void>)?
Contributor

I think we removed the need for this to be throws, didn't we? It doesn't actually manifest in the word try anywhere in the code for this object, but let's constrain the type further and not allow it to throw.

@weissi
Member

weissi commented Jul 6, 2018

@swift-nio-bot test this please

@Lukasa
Contributor

Lukasa commented Jul 6, 2018

Looks like testScheduleRepeatedTask is a bit racy and sometimes fails.

@weissi
Member

weissi commented Jul 6, 2018

@FranzBusch ok, we need to address the test flakiness first. I'd recommend to test it with a for _ in 0..<10000 { /* real test code here */ } and see if you can repro what our CI hit.
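One way to structure such a stress run, sketched with a hypothetical helper (`failingIterations` is not an XCTest or NIO API): run the body many times and record which iterations failed, rather than stopping at the first failure.

```swift
/// Runs `body` for each index in `0..<iterations` and returns the indices
/// for which it reported failure (returned `false`).
func failingIterations(iterations: Int, _ body: (Int) -> Bool) -> [Int] {
    return (0..<iterations).filter { !body($0) }
}

// Usage idea: wrap the real test logic so it returns whether the run passed.
// let failures = failingIterations(iterations: 10_000, { _ in runRepeatedTaskScenario() })
// where `runRepeatedTaskScenario` is a hypothetical function containing the real test code.
```

Knowing how often and at which iterations the failure shows up can help tell a genuine race from a timeout that is simply too tight.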

@FranzBusch
Contributor Author

I tried running the tests in a loop but couldn't reproduce the flakiness. I just increased the timeouts a bit; maybe they were too optimistic.

@Lukasa
Contributor

Lukasa commented Jul 8, 2018

You may want to go to more than a second. The timeout should only be hit in failing cases anyway.

let expect = expectation(description: "Is cancelling RepeatedTask")
var repeatedTask: RepeatedTask?
repeatedTask = eventLoopGroup.next().scheduleRepeatedTask(initialDelay: initialDelay, delay: delay) { () -> Void in
DispatchQueue.main.async {
Contributor

Wait, what is going on here? You can't dispatch to the main dispatch queue.

Contributor

I'm also not really sure what the utility of doing that is. Just create a separate serial dispatch queue to use. 😄
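For reference, a minimal sketch of what a dedicated serial queue could look like in a test, here guarding a counter that the repeated task bumps from the event loop thread. `CallCounter` and the queue label are hypothetical names, not code from this PR.

```swift
import Dispatch

/// Counts invocations from any thread; all access goes through one private serial queue.
final class CallCounter {
    private let queue = DispatchQueue(label: "test.call-counter")   // serial by default
    private var count = 0

    /// Safe to call from the event loop thread or anywhere else.
    func increment() {
        self.queue.async { self.count += 1 }
    }

    /// Waits for previously enqueued increments, then reads the value.
    var value: Int {
        return self.queue.sync { self.count }
    }
}
```

Unlike the main queue, a private queue does not depend on the main thread being free to service it, which it usually is not while a test is blocked waiting on an expectation.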

@Lukasa
Contributor

Lukasa commented Jul 9, 2018

@swift-nio-bot test this please

Contributor

@Lukasa Lukasa left a comment

Assuming the tests pass, I'm happy with this change.

Contributor

@Lukasa Lukasa left a comment

Agh, sorry @FranzBusch, one last note. Now that this patch is ready to land, can you squash all these commits down into a single commit and fill it out using our commit template?

Provide a mechanism to repeatedly schedule a task in the EventLoop

Motivation:
In some cases a certain task should run repeatedly, like polling an API or querying a database. For these scenarios a repeated scheduling mechanism is beneficial.

Modifications:
Added a method to EventLoop to schedule repeated tasks. 

Result:
It is now possible to schedule tasks with an initial delay and a fixed delay. Furthermore, these tasks can be cancelled.
@weissi
Member

weissi commented Jul 10, 2018

@Lukasa given that @FranzBusch has filled the PR description out with the commit template, we could just use the 'squash & merge' button and it'll have the same effect, right?

Squash & merge will make one commit out of all the commits and use the PR description as that commit's description, which is exactly what we need, right?

@Lukasa
Contributor

Lukasa commented Jul 10, 2018

@weissi That was my original plan, but actually the PR description contains only 1/3 of the commit template, which is why I didn't land on that plan.

@Lukasa
Contributor

Lukasa commented Jul 10, 2018

@swift-nio-bot test this please

@weissi
Member

weissi commented Jul 10, 2018

@Lukasa oh sorry, didn't notice that.

@Lukasa Lukasa merged commit c2c008e into apple:master Jul 10, 2018
@FranzBusch FranzBusch deleted the feature/RepeatedScheduling branch July 10, 2018 14:32
5 participants