Skip to content
This repository has been archived by the owner on Apr 30, 2021. It is now read-only.

Commit

Permalink
Added AutoFlushContainer<T>, which is the foundation for TimedAccumul…
Browse files Browse the repository at this point in the history
…ator<T>.
  • Loading branch information
bjorg committed Jan 26, 2016
1 parent 2d0b6f6 commit 89316af
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 95 deletions.
1 change: 1 addition & 0 deletions src/mindtouch.dream/mindtouch.dream.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
<Compile Include="dream\RegistrationInspector.cs" />
<Compile Include="Extensions\ReflectionEx.cs" />
<Compile Include="Numeric.cs" />
<Compile Include="system\Collections\AutoFlushContainer.cs" />
<Compile Include="system\DisposeCallback.cs" />
<Compile Include="system\GitBranchAttribute.cs" />
<Compile Include="system\GitRevisionAttribute.cs" />
Expand Down
122 changes: 122 additions & 0 deletions src/mindtouch.dream/system/Collections/AutoFlushContainer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* MindTouch Dream - a distributed REST framework
* Copyright (C) 2006-2014 MindTouch, Inc.
* www.mindtouch.com oss@mindtouch.com
*
* For community documentation and downloads visit mindtouch.com;
* please review the licensing section.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using MindTouch;
using MindTouch.Dream;
using MindTouch.Tasking;
using log4net;

namespace System.Collections {
public class AutoFlushContainer<T> : IDisposable {

//--- Types ---
public delegate void FlushDelegate(T state, bool disposing);

//--- Class Fields ---
private readonly ILog _log = LogUtils.CreateLog();

//--- Fields ---
private readonly object _syncRoot = new object();
private readonly T _state;
private readonly FlushDelegate _flush;
private readonly int _maxUpdates;
private readonly TimeSpan _autoFlushDelay;
private readonly TaskTimer _autoFlushTimer;
private int _pendingUpdates;
private bool _disposed;

//--- Constructors ---
public AutoFlushContainer(T initialState, FlushDelegate flush, int maxUpdates, TimeSpan autoFlushDelay, TaskTimerFactory timerFactory) {
if(flush == null) {
throw new ArgumentNullException("flush");
}
if(maxUpdates <= 0) {
throw new ArgumentException("maxItems must be positive", "maxUpdates");
}
if(autoFlushDelay <= TimeSpan.Zero) {
throw new ArgumentException("maxDelay must be positive", "autoFlushDelay");
}
_state = initialState;
_flush = flush;
_maxUpdates = maxUpdates;
_autoFlushDelay = autoFlushDelay;
_autoFlushTimer = timerFactory.New(DateTime.MaxValue, AutoFlushCallback, null, TaskEnv.None);
}

//--- Methods ---
public void Do(Action<T> callback) {
lock(_syncRoot) {
if(_disposed) {
throw new ObjectDisposedException("instance has been disposed");
}
callback(_state);
var updatesCounter = ++_pendingUpdates;
if((updatesCounter == 1) && (_maxUpdates > 1)) {
_autoFlushTimer.Change(_autoFlushDelay, TaskEnv.None);
} else if(updatesCounter == _maxUpdates) {
Flush();
}
}
}

public V Get<V>(Func<T, V> callback) {
lock(_syncRoot) {
if(_disposed) {
throw new ObjectDisposedException("instance has been disposed");
}
return callback(_state);
}
}

public void Flush() {
lock (_syncRoot) {
if(_pendingUpdates > 0) {
_autoFlushTimer.Cancel();
_pendingUpdates = 0;
try {
_flush(_state, _disposed);
} catch(Exception e) {
_log.Error("flush handler failed", e);
}
}
}
}

public void Dispose() {
lock(_syncRoot) {
if(_disposed) {
return;
}
_disposed = true;
Flush();
}
}

private void AutoFlushCallback(TaskTimer timer) {
lock(_syncRoot) {
if(_disposed) {
return;
}
Flush();
}
}
}
}
115 changes: 20 additions & 95 deletions src/mindtouch.dream/system/Collections/TimedAccumulator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,115 +20,40 @@
*/

using System.Collections.Generic;
using MindTouch;
using MindTouch.Dream;
using System.Linq;
using MindTouch.Tasking;
using log4net;

namespace System.Collections {
public class TimedAccumulator<T> : IDisposable {

//--- Class Fields ---
private readonly ILog _log = LogUtils.CreateLog();

//--- Class Methods ---
private static List<T> ExtractItems(List<T> items, int count) {
if(count > 0) {
var result = items.GetRange(0, count);
items.RemoveRange(0, count);
return result;
}
return null;
}

//--- Fields ---
private readonly List<T> _items = new List<T>();
private readonly Action<IEnumerable<T>> _handler;
private readonly int _maxItems;
private readonly TimeSpan _autoFlushDelay;
private readonly TaskTimer _autoFlushTimer;
private volatile bool _disposed;
private readonly AutoFlushContainer<List<T>> _accumulator;

//--- Constructors ---
public TimedAccumulator(Action<IEnumerable<T>> handler, int maxItems, TimeSpan autoFlushDelay, TaskTimerFactory timerFactory) {
if(handler == null) {
throw new ArgumentNullException("handler");
}
if(maxItems <= 0) {
throw new ArgumentException("maxItems must be positive", "maxItems");
}
if(autoFlushDelay <= TimeSpan.Zero) {
throw new ArgumentException("maxDelay must be positive", "autoFlushDelay");
}
_handler = handler;
_maxItems = maxItems;
_autoFlushDelay = autoFlushDelay;

// kick off timer
_autoFlushTimer = timerFactory.New(DateTime.MaxValue, AutoFlushCallback, null, TaskEnv.None);
}

//--- Methods ---
public void Enqueue(T item) {
if(_disposed) {
throw new ObjectDisposedException("The TimedAccumulator has been disposed");
}

// add item to queue
List<T> dispatch = null;
lock(_items) {
if((_items.Count == 0) && (_maxItems > 1)) {
_autoFlushTimer.Change(_autoFlushDelay, TaskEnv.None);
}
_items.Add(item);

// check if we have enough items to dispatch some
if(_items.Count >= _maxItems) {
dispatch = ExtractItems(_items, _maxItems);
}
}
CallHandler(dispatch);
_accumulator = new AutoFlushContainer<List<T>>(
initialState: new List<T>(),
flush: (list, disposing) => {
if(list.Any()) {
var items = list.GetRange(0, Math.Min(list.Count, maxItems));
list.RemoveRange(0, items.Count);
handler(items);
}
},
maxUpdates: maxItems,
autoFlushDelay: autoFlushDelay,
timerFactory: timerFactory
);
}

public void Dispose() {
if(_disposed) {
return;
}
_disposed = true;

// first we cancel the timer
_autoFlushTimer.Cancel();

// extract all items from the queue and dispatch them
List<T> dispatch;
lock(_items) {
dispatch = ExtractItems(_items, _items.Count);
}
CallHandler(dispatch);
}

private void CallHandler(IEnumerable<T> items) {
if(items.NullOrEmpty()) {
return;
}
try {
_handler(items);
} catch(Exception e) {
_log.Error("TimedAccumulator handler failed", e);
}
}

private void AutoFlushCallback(TaskTimer timer) {
if(_disposed) {
return;
}
//--- Properties ---
public int Count { get { return _accumulator.Get(list => list.Count); } }

// check if we have enough items to dispatch some
List<T> dispatch;
lock(_items) {
dispatch = ExtractItems(_items, _items.Count);
}
CallHandler(dispatch);
}
//--- Methods ---
public void Enqueue(T item) { _accumulator.Do(list => list.Add(item)); }
public void Dispose() { _accumulator.Dispose(); }
}
}

0 comments on commit 89316af

Please sign in to comment.