Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,25 +152,18 @@ public void StartThread(Action<object> action, object state)

private sealed class Timer : IDisposable
{
private object _state;
private volatile object _state;
private Action<object> _action;
private volatile System.Threading.Timer _timer;
private IDisposable _timer;

private static readonly object DisposedState = new object();

public Timer(Action<object> action, object state, TimeSpan dueTime)
{
_state = state;
_action = action;

// Don't want the spin wait in Tick to get stuck if this thread gets aborted.
try { }
finally
{
//
// Rooting of the timer happens through the Timer's state
// which is the current instance and has a field to store the Timer instance.
//
_timer = new System.Threading.Timer(_ => Tick(_), this, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
}
Disposable.SetSingle(ref _timer, new System.Threading.Timer(_ => Tick(_), this, dueTime, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)));
}

private static void Tick(object state)
Expand All @@ -179,27 +172,24 @@ private static void Tick(object state)

try
{
timer._action(timer._state);
var timerState = timer._state;
if (timerState != DisposedState)
{
timer._action(timerState);
}
}
finally
{
SpinWait.SpinUntil(timer.IsTimerAssigned);
timer.Dispose();
Disposable.TryDispose(ref timer._timer);
}
}

private bool IsTimerAssigned() => _timer != null;

public void Dispose()
{
var timer = _timer;
if (timer != TimerStubs.Never)
if (Disposable.TryDispose(ref _timer))
{
_action = Stubs<object>.Ignore;
_timer = TimerStubs.Never;
_state = null;

timer.Dispose();
_state = DisposedState;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,36 @@ public void Virtual_ThreadSafety()
{
var scheduler = new TestScheduler();
var seq = Observable.Never<string>();
var d = default(IDisposable);

Task.Run(()=>
var sync = 2;

Task.Run(() =>
{
Task.Delay(50).Wait();
seq.Timeout(TimeSpan.FromSeconds(10), scheduler).Subscribe(s => { });
if (Interlocked.Decrement(ref sync) != 0)
{
while (Volatile.Read(ref sync) != 0) ;
}

Task.Delay(10).Wait();

d = seq.Timeout(TimeSpan.FromSeconds(5), scheduler).Subscribe(s => { });
});

var watch = scheduler.StartStopwatch();
try
{
while (watch.Elapsed < TimeSpan.FromSeconds(20))
if (Interlocked.Decrement(ref sync) != 0)
{
scheduler.AdvanceBy(10);
while (Volatile.Read(ref sync) != 0) ;
}

while (watch.Elapsed < TimeSpan.FromSeconds(11))
{
scheduler.AdvanceBy(50);
}

throw new Exception("Should have thrown!");
}
catch (TimeoutException)
{
Expand All @@ -158,6 +174,7 @@ public void Virtual_ThreadSafety()
{
Assert.True(false, string.Format("Virtual time {0}, exception {1}", watch.Elapsed, ex));
}
d?.Dispose();
}
}
}
Expand Down