diff --git a/src/mindtouch.dream/Tasking/TaskEnv.cs b/src/mindtouch.dream/Tasking/TaskEnv.cs index 4d75a02..086b8bc 100644 --- a/src/mindtouch.dream/Tasking/TaskEnv.cs +++ b/src/mindtouch.dream/Tasking/TaskEnv.cs @@ -525,134 +525,6 @@ public void Invoke(Action handler, T1 arg1) { } } - /// - /// Invoke a two argument action. - /// - /// Type of first argument. - /// Type of second argument. - /// Action to invoke. - /// First argument. - /// Second argument. - public void Invoke(Action handler, T1 arg1, T2 arg2) { - if(handler == null) { - throw new ArgumentNullException("handler"); - } - if(_referenceCount < 1) { - throw new InvalidOperationException("Cannot call invoke an unaquired TaskEnv"); - } -#pragma warning disable 219 - System.Diagnostics.StackTrace stacktrace = DebugUtil.GetStackTrace(); -#pragma warning restore 219 - - // check if handler can be invoked in-place or needs to queued up - if(_dispatchQueue != null) { - _dispatchQueue.QueueWorkItem(() => { - - // store current thread-specific settings - TaskEnv previousEnv = _currentEnv; - try { - - // set thread-specific settings - _currentEnv = this; - - // execute handler - handler(arg1, arg2); - } catch(Exception e) { - _log.WarnExceptionMethodCall(e, "Invoke: unhandled exception in handler"); - } finally { - Release(); - - // restore thread-specific settings - _currentEnv = previousEnv; - } - }); - } else { - - // store current thread-specific settings - TaskEnv previousEnv = _currentEnv; - try { - - // set thread-specific settings - _currentEnv = this; - - // execute handler - handler(arg1, arg2); - } catch(Exception e) { - _log.WarnExceptionMethodCall(e, "Invoke: unhandled exception in handler"); - } finally { - Release(); - - // restore thread-specific settings - _currentEnv = previousEnv; - } - } - } - - /// - /// Invoke a three argument action. - /// - /// Type of first argument. - /// Type of second argument. - /// Type of third argument. - /// Action to invoke. - /// First argument. - /// Second argument. - /// Third argument. - public void Invoke(Action handler, T1 arg1, T2 arg2, T3 arg3) { - if(handler == null) { - throw new ArgumentNullException("handler"); - } - if(_referenceCount < 1) { - throw new InvalidOperationException("Cannot call invoke an unaquired TaskEnv"); - } -#pragma warning disable 219 - System.Diagnostics.StackTrace stacktrace = DebugUtil.GetStackTrace(); -#pragma warning restore 219 - - // check if handler can be invoked in-place or needs to queued up - if(_dispatchQueue != null) { - _dispatchQueue.QueueWorkItem(() => { - - // store current thread-specific settings - TaskEnv previousEnv = _currentEnv; - try { - - // set thread-specific settings - _currentEnv = this; - - // execute handler - handler(arg1, arg2, arg3); - } catch(Exception e) { - _log.WarnExceptionMethodCall(e, "Invoke: unhandled exception in handler"); - } finally { - Release(); - - // restore thread-specific settings - _currentEnv = previousEnv; - } - }); - } else { - - // store current thread-specific settings - TaskEnv previousEnv = _currentEnv; - try { - - // set thread-specific settings - _currentEnv = this; - - // execute handler - handler(arg1, arg2, arg3); - } catch(Exception e) { - _log.WarnExceptionMethodCall(e, "Invoke: unhandled exception in handler"); - } finally { - Release(); - - // restore thread-specific settings - _currentEnv = previousEnv; - } - } - } - /// /// Wrap a method call, delegate or lambda in an environment for later invocation. /// diff --git a/src/mindtouch.dream/Tasking/TaskTimerFactory.cs b/src/mindtouch.dream/Tasking/TaskTimerFactory.cs index e011fef..30a4f24 100644 --- a/src/mindtouch.dream/Tasking/TaskTimerFactory.cs +++ b/src/mindtouch.dream/Tasking/TaskTimerFactory.cs @@ -306,11 +306,11 @@ public void Shutdown() { // (or maybe this should just be part of the thread clean-up) lock(_queue) { while(_queue.Count > 0) { - TaskTimer timer = _queue.Dequeue(); + var timer = _queue.Dequeue(); if(timer.TryLockPending()) { // retrieve the associated behavior and reset the timer - TaskEnv env = timer.Env; + var env = timer.Env; timer.Env = null; timer.SetStatus(TaskTimerStatus.Done); @@ -325,15 +325,14 @@ public void Shutdown() { // an indefinite time. // check if any timers were gathered for immediate execution if(timers != null) { - foreach(KeyValuePair entry in timers) { + foreach(var entry in timers) { entry.Key.Execute(entry.Value); } } - _running = false; } - private void Tick(DateTime now, TimeSpan elapsed) { + private void Tick(DateTime now, TimeSpan elapsed, bool fastforward) { // ignore ticks that come in after we've initialized a shutdown if(_shutdown) { @@ -348,13 +347,13 @@ private void Tick(DateTime now, TimeSpan elapsed) { // dequeue all timers that are ready to go while((_queue.Count > 0) && (_queue.Peek().When <= now)) { - TaskTimer timer = _queue.Dequeue(); + var timer = _queue.Dequeue(); // check if timer can be transitioned if(timer.TryLockQueued()) { // retrieve the associated behavior and reset the timer - TaskEnv env = timer.Env; + var env = timer.Env; timer.Env = null; timer.SetStatus(TaskTimerStatus.Done); @@ -367,15 +366,15 @@ private void Tick(DateTime now, TimeSpan elapsed) { // check if a maintance run is due if(_maintenance <= now) { _maintenance = now.AddSeconds(TaskTimer.QUEUE_RESCAN); - DateTime horizon = now.AddSeconds(TaskTimer.QUEUE_CUTOFF); + var horizon = now.AddSeconds(TaskTimer.QUEUE_CUTOFF); lock(_pending) { - List activate = new List(); - foreach(TaskTimer timer in _pending.Keys) { + var activate = new List(); + foreach(var timer in _pending.Keys) { if(timer.When <= horizon) { activate.Add(timer); } } - foreach(TaskTimer timer in activate) { + foreach(var timer in activate) { _pending.Remove(timer); if(timer.TryQueuePending()) { _queue.Enqueue(timer); @@ -387,8 +386,14 @@ private void Tick(DateTime now, TimeSpan elapsed) { // run schedule on its own thread to avoid re-entrancy issues if(timers != null) { - foreach(KeyValuePair entry in timers) { - entry.Key.Execute(entry.Value); + if(fastforward) { + foreach(var entry in timers) { + entry.Key.ExecuteNow(entry.Value); + } + } else { + foreach(var entry in timers) { + entry.Key.Execute(entry.Value); + } } } } diff --git a/src/mindtouch.dream/Tasking/timer.cs b/src/mindtouch.dream/Tasking/timer.cs index 317757e..73f1834 100644 --- a/src/mindtouch.dream/Tasking/timer.cs +++ b/src/mindtouch.dream/Tasking/timer.cs @@ -317,6 +317,10 @@ internal void Execute(TaskEnv env) { env.Invoke(_handler, this); } + internal void ExecuteNow(TaskEnv env) { + env.InvokeNow(() => _handler(this)); + } + internal void SetStatus(TaskTimerStatus status) { Interlocked.Exchange(ref _status, (int)status); } diff --git a/src/mindtouch.dream/Threading/DispatchThreadScheduler.cs b/src/mindtouch.dream/Threading/DispatchThreadScheduler.cs index 7b1ca24..1ed9faa 100644 --- a/src/mindtouch.dream/Threading/DispatchThreadScheduler.cs +++ b/src/mindtouch.dream/Threading/DispatchThreadScheduler.cs @@ -188,7 +188,10 @@ public static void UnregisterHost(IDispatchHost host) { } } - private static void Tick(DateTime now, TimeSpan elapsed) { + private static void Tick(DateTime now, TimeSpan elapsed, bool fastforward) { + + // NOTE (2015-07-25, steveb): 'fastforward' is not used by the DispatchThreadScheduler + lock(_syncRoot) { // check if resource manager has been idle for a while diff --git a/src/mindtouch.dream/mindtouch.dream.csproj b/src/mindtouch.dream/mindtouch.dream.csproj index 760d5bc..219c699 100644 --- a/src/mindtouch.dream/mindtouch.dream.csproj +++ b/src/mindtouch.dream/mindtouch.dream.csproj @@ -134,6 +134,7 @@ + diff --git a/src/mindtouch.dream/system/Collections/Generic/DictionaryUtil.cs b/src/mindtouch.dream/system/Collections/Generic/DictionaryUtil.cs index 4b0ce56..7eeca7d 100644 --- a/src/mindtouch.dream/system/Collections/Generic/DictionaryUtil.cs +++ b/src/mindtouch.dream/system/Collections/Generic/DictionaryUtil.cs @@ -39,5 +39,34 @@ public static Dictionary ToDictionaryWithDuplicateErrorCallback(t } return result; } + + public static IEnumerable CollectAllValues(this IDictionary> dictionary, IEnumerable keys, out IEnumerable missing) { + var result = new List(); + var missingResult = new List(); + foreach(var key in keys) { + ICollection value; + if(dictionary.TryGetValue(key, out value)) { + result.AddRange(value); + } else { + missingResult.Add(key); + } + } + missing = missingResult; + return result; + } + + public static Dictionary ToDictionary(this IEnumerable> source, bool overwriteDuplicates = false) { + var result = new Dictionary(); + if(overwriteDuplicates) { + foreach(var pair in source) { + result[pair.Key] = pair.Value; + } + } else { + foreach(var pair in source) { + result.Add(pair.Key, pair.Value); + } + } + return result; + } } } diff --git a/src/mindtouch.dream/system/DisposeCallback.cs b/src/mindtouch.dream/system/DisposeCallback.cs new file mode 100644 index 0000000..4020fa2 --- /dev/null +++ b/src/mindtouch.dream/system/DisposeCallback.cs @@ -0,0 +1,50 @@ +/* + * MindTouch Dream - a distributed REST framework + * Copyright (C) 2006-2014 MindTouch, Inc. + * www.mindtouch.com oss@mindtouch.com + * + * For community documentation and downloads visit mindtouch.com; + * please review the licensing section. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace System { + + /// + /// Class to trigger a callback when Dispose() is called. + /// + public class DisposeCallback : IDisposable { + + //--- Fields --- + private readonly Action _callback; + private bool _disposed; + + //--- Constructors --- + public DisposeCallback(Action callback) { + if(callback == null) { + throw new ArgumentNullException("callback"); + } + _callback = callback; + } + + //--- Methods --- + public void Dispose() { + if(_disposed) { + throw new ObjectDisposedException("DisposeCallback"); + } + _disposed = true; + _callback(); + } + } +} diff --git a/src/mindtouch.dream/system/GlobalClock.cs b/src/mindtouch.dream/system/GlobalClock.cs index a0cdc84..b0c187f 100644 --- a/src/mindtouch.dream/system/GlobalClock.cs +++ b/src/mindtouch.dream/system/GlobalClock.cs @@ -26,8 +26,8 @@ using MindTouch.Tasking; namespace System { - using ClockCallback = Action; - using NamedClockCallback = KeyValuePair>; + using ClockCallback = Action; + using NamedClockCallback = KeyValuePair>; /// /// Provides a global timing mechanism that accepts registration of callback to be invoked by the clock. In most cases, a @@ -46,6 +46,7 @@ public static class GlobalClock { private static readonly ManualResetEvent _stopped = new ManualResetEvent(false); private static NamedClockCallback[] _callbacks = new NamedClockCallback[INITIAL_CALLBACK_CAPACITY]; private static readonly int _intervalMilliseconds; + private static object _suspendedTime; private static int _timeOffset; //--- Class Constructor --- @@ -62,7 +63,7 @@ static GlobalClock() { } //--- Class Properties --- - public static DateTime UtcNow { get { return DateTime.UtcNow + TimeSpan.FromMilliseconds(_timeOffset); } } + public static DateTime UtcNow { get { return ((DateTime?)_suspendedTime) ?? (DateTime.UtcNow + TimeSpan.FromMilliseconds(_timeOffset)); } } //--- Class Methods --- @@ -123,12 +124,47 @@ public static void RemoveCallback(ClockCallback callback) { /// Fast-forward time for the global clock. /// /// Timespan to fast-forward the global clock (cannot be negative). + /// Optional callback to prematurely cancel the fast-fwoard operation. /// DO NOT USE FOR PRODUCTION CODE!!! - public static void FastForward(TimeSpan time) { - if(time < TimeSpan.Zero) { - throw new ArgumentException("time cannot be negative"); + public static DateTime FastForward(TimeSpan time, Func cancelFastForward = null) { + + // TODO (2015-07-30, steveb): add flag to detect if this code is running in production and throw an exception if it is! + + if(time <= TimeSpan.Zero) { + throw new ArgumentException("time must be positive"); + } + lock(_syncRoot) { + var timeMilliseconds = (int)time.TotalMilliseconds; + var intervalMilliseconds = _intervalMilliseconds; + do { + var elapsed = Math.Min(timeMilliseconds, intervalMilliseconds); + Interlocked.Add(ref _timeOffset, elapsed); + var now = UtcNow; + MasterTick(now, TimeSpan.FromMilliseconds(elapsed), true); + if((cancelFastForward != null) && cancelFastForward()) { + return now; + } + timeMilliseconds -= elapsed; + } while(timeMilliseconds > 0); + return UtcNow; } - Interlocked.Add(ref _timeOffset, (int)time.TotalMilliseconds); + } + + /// + /// Suspend the global clock from progressing. + /// + /// Object that when disposed resumes the global clock. + /// DO NOT USE FOR PRODUCTION CODE!!! + public static IDisposable Suspend() { + + // TODO (2015-07-30, steveb): add flag to detect if this code is running in production and throw an exception if it is! + + Monitor.Enter(_syncRoot); + var suspendedTime = Interlocked.Exchange(ref _suspendedTime, (DateTime?)UtcNow); + return new DisposeCallback(() => { + Interlocked.Exchange(ref _suspendedTime, suspendedTime); + Monitor.Exit(_syncRoot); + }); } internal static bool Shutdown(TimeSpan timeout) { @@ -157,21 +193,25 @@ private static void MasterTickThread() { // execute all callbacks lock(_syncRoot) { - var callbacks = _callbacks; - foreach(var callback in callbacks) { - if(callback.Value != null) { - try { - callback.Value(now, elapsed); - } catch(Exception e) { - _log.ErrorExceptionMethodCall(e, "GlobalClock callback failed", callback.Key); - } - } - } + MasterTick(now, elapsed, false); } } // indicate that this thread has exited _stopped.Set(); } + + private static void MasterTick(DateTime now, TimeSpan elapsed, bool fastforward) { + var callbacks = _callbacks; + foreach(var callback in callbacks) { + if(callback.Value != null) { + try { + callback.Value(now, elapsed, fastforward); + } catch(Exception e) { + _log.ErrorExceptionMethodCall(e, "GlobalClock callback failed", callback.Key); + } + } + } + } } } diff --git a/src/tests/DreamMisc/TaskTests.cs b/src/tests/DreamMisc/TaskTests.cs index 43277d7..1fb1bdc 100644 --- a/src/tests/DreamMisc/TaskTests.cs +++ b/src/tests/DreamMisc/TaskTests.cs @@ -374,18 +374,6 @@ public void Invoke1arg_an_unacquired_TaskEnv_throws() { TaskEnv.Current.Invoke(x => _log.Debug("invoke now"), 1); } - [Test] - [ExpectedException(typeof(InvalidOperationException))] - public void Invoke2arg_an_unacquired_TaskEnv_throws() { - TaskEnv.Current.Invoke((x, y) => _log.Debug("invoke now"), 1, 2); - } - - [Test] - [ExpectedException(typeof(InvalidOperationException))] - public void Invoke3arg_an_unacquired_TaskEnv_throws() { - TaskEnv.Current.Invoke((x, y, z) => _log.Debug("invoke now"), 1, 2, 3); - } - [Test] public void Invokenow_does_not_release() { var state = new TaskLifeSpanState("baz");