Skip to content

Commit

Permalink
closed #24
Browse files Browse the repository at this point in the history
  • Loading branch information
Fernando Cerqueira committed Dec 5, 2023
1 parent d213e78 commit 624aa4c
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public IEnumerable<WeatherForecast> Get(CancellationToken token)
{
buffer.Invalidate();
}
token.WaitHandle.WaitOne(100);
}
return Enumerable.Range(1, 5).Select(index => new WeatherForecast
{
Expand Down
4 changes: 2 additions & 2 deletions samples/RingBufferPlusBenchmarkSample/BenchmarkProgram.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class BenchmarkProgram
catch
{
}
cancellation.WaitHandle.WaitOne(TimeSpan.FromMilliseconds(5));
cancellation.WaitHandle.WaitOne(1);
}
return model;
}
Expand Down Expand Up @@ -89,7 +89,7 @@ public class BenchmarkProgram
catch
{
}
cancellation.WaitHandle.WaitOne(TimeSpan.FromMilliseconds(5));
cancellation.WaitHandle.WaitOne(1);
}
return model;
}
Expand Down
8 changes: 4 additions & 4 deletions samples/RingBufferPlusBenchmarkSample/ConsumerRoleProgram.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ internal class ConsumerRoleProgram
model.QueueDeclare("log", false, false, false);
break;
}
cancellation.WaitHandle.WaitOne(TimeSpan.FromMilliseconds(100));
cancellation.WaitHandle.WaitOne(1);
}
return model;
}
Expand All @@ -48,7 +48,7 @@ private static void Init()
UserName = "guest",
Password = "guest",
VirtualHost = "EnterpriseLog",
AutomaticRecoveryEnabled = true,
AutomaticRecoveryEnabled = false,
RequestedHeartbeat = TimeSpan.FromMinutes(1),
ClientProvidedName = "ConsumerRoleProgram"
};
Expand Down Expand Up @@ -96,7 +96,7 @@ public static void Start(ILogger logger, int delaysec)
Console.WriteLine($"Ring Buffer Capacity({connectionRingBuffer!.Capacity})");
Console.WriteLine($"Ring Buffer MinCapacity({connectionRingBuffer!.MinCapacity})");
Console.WriteLine($"Ring Buffer MaxCapacity({connectionRingBuffer!.MaxCapacity})");
Console.WriteLine($"Ring Buffer ScaleCapacity({connectionRingBuffer!.ScaleCapacity})");
Console.WriteLine($"Ring Buffer ScaleCapacity({connectionRingBuffer!.ScaleCapacity}/{connectionRingBuffer!.UserScale})");
Console.WriteLine($"Ring Buffer AccquireTimeout({connectionRingBuffer!.AccquireTimeout})");
Console.WriteLine($"Ring Buffer FactoryTimeout({connectionRingBuffer!.FactoryTimeout})");
Console.WriteLine($"Ring Buffer FactoryIdleRetry({connectionRingBuffer!.FactoryIdleRetry})");
Expand All @@ -121,7 +121,7 @@ public static void Start(ILogger logger, int delaysec)
Console.WriteLine($"Ring Buffer Capacity({modelRingBuffer.Capacity})");
Console.WriteLine($"Ring Buffer MinCapacity({modelRingBuffer.MinCapacity})");
Console.WriteLine($"Ring Buffer MaxCapacity({modelRingBuffer.MaxCapacity})");
Console.WriteLine($"Ring Buffer ScaleCapacity({modelRingBuffer.ScaleCapacity})");
Console.WriteLine($"Ring Buffer ScaleCapacity({modelRingBuffer.ScaleCapacity}/{modelRingBuffer.UserScale})");
Console.WriteLine($"Ring Buffer AccquireTimeout({modelRingBuffer.AccquireTimeout})");
Console.WriteLine($"Ring Buffer FactoryTimeout({modelRingBuffer.FactoryTimeout})");
Console.WriteLine($"Ring Buffer FactoryIdleRetry({modelRingBuffer.FactoryIdleRetry})");
Expand Down
7 changes: 4 additions & 3 deletions samples/RingBufferPlusBenchmarkSample/PublisherRoleProgram.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ internal class PublisherRoleProgram
{
//try again or timeout by RingbufferPlus
}
cancellation.WaitHandle.WaitOne(TimeSpan.FromMilliseconds(2));
cancellation.WaitHandle.WaitOne(1);
}
return model;
}
Expand Down Expand Up @@ -84,6 +84,7 @@ private static void Init()
log?.LogError($"{error.NameRingBuffer}: {error.Message}");
})
.Factory((cts) => ModelFactory(cts)!)
.FactoryErrorMaxPerc(40)
.BufferHealth((buffer) => buffer.IsOpen, TimeSpan.FromSeconds(5))
.ScaleUnit(ScaleMode.Automatic, 50, TimeSpan.FromSeconds(10))
.Slave(connectionRingBuffer)
Expand Down Expand Up @@ -121,7 +122,7 @@ public static void Start(ILogger logger,int delaysec)
Console.WriteLine($"Ring Buffer Capacity({connectionRingBuffer!.Capacity})");
Console.WriteLine($"Ring Buffer MinCapacity({connectionRingBuffer!.MinCapacity})");
Console.WriteLine($"Ring Buffer MaxCapacity({connectionRingBuffer!.MaxCapacity})");
Console.WriteLine($"Ring Buffer ScaleCapacity({connectionRingBuffer!.ScaleCapacity})");
Console.WriteLine($"Ring Buffer ScaleCapacity({connectionRingBuffer!.ScaleCapacity}/{connectionRingBuffer!.UserScale})");
Console.WriteLine($"Ring Buffer AccquireTimeout({connectionRingBuffer!.AccquireTimeout})");
Console.WriteLine($"Ring Buffer FactoryTimeout({connectionRingBuffer!.FactoryTimeout})");
Console.WriteLine($"Ring Buffer FactoryIdleRetry({connectionRingBuffer!.FactoryIdleRetry})");
Expand All @@ -145,7 +146,7 @@ public static void Start(ILogger logger,int delaysec)
Console.WriteLine($"Ring Buffer Capacity({modelRingBuffer.Capacity})");
Console.WriteLine($"Ring Buffer MinCapacity({modelRingBuffer.MinCapacity})");
Console.WriteLine($"Ring Buffer MaxCapacity({modelRingBuffer.MaxCapacity})");
Console.WriteLine($"Ring Buffer ScaleCapacity({modelRingBuffer.ScaleCapacity})");
Console.WriteLine($"Ring Buffer ScaleCapacity({modelRingBuffer.ScaleCapacity}/{modelRingBuffer!.UserScale})");
Console.WriteLine($"Ring Buffer AccquireTimeout({modelRingBuffer.AccquireTimeout})");
Console.WriteLine($"Ring Buffer FactoryTimeout({modelRingBuffer.FactoryTimeout})");
Console.WriteLine($"Ring Buffer FactoryIdleRetry({modelRingBuffer.FactoryIdleRetry})");
Expand Down
2 changes: 1 addition & 1 deletion samples/RingBufferPlusConsoleSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ static void Main(string[] args)
Console.WriteLine($"Ring Buffer Capacity({rb.Capacity})");
Console.WriteLine($"Ring Buffer MinCapacity({rb.MinCapacity})");
Console.WriteLine($"Ring Buffer MaxCapacity({rb.MaxCapacity})");
Console.WriteLine($"Ring Buffer ScaleCapacity({rb.ScaleCapacity})");
Console.WriteLine($"Ring Buffer ScaleCapacity({rb.ScaleCapacity}/{rb.UserScale})");
Console.WriteLine($"Ring Buffer AccquireTimeout({rb.AccquireTimeout})");
Console.WriteLine($"Ring Buffer FactoryTimeout({rb.FactoryTimeout})");
Console.WriteLine($"Ring Buffer FactoryIdleRetry({rb.FactoryIdleRetry})");
Expand Down
8 changes: 8 additions & 0 deletions src/Commands/IRingBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ public interface IRingBuffer<T>
/// <returns><see cref="IRingBuffer{T}"/>.</returns>
IRingBuffer<T> Factory(Func<CancellationToken,T> value, TimeSpan? timeout = null, TimeSpan? idleRetryError = null);

/// <summary>
/// Maximum error percentage during buffer creation to cancel the operation
/// <br> Default value is 33 (33% - 1/3)</br>
/// </summary>
/// <param name="value">The percentage (0 - Any error to 100 - None)</param>
/// <returns><see cref="IRingBuffer{T}"/>.</returns>
IRingBuffer<T> FactoryErrorMaxPerc(int value);

/// <summary>
/// Check buffer health with each acquisition or after timeout
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions src/internal/IRingBufferOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ internal interface IRingBufferOptions<T>
Func<T,bool> BufferHealthHandler { get; }
TimeSpan BufferHealtTimeout { get; }
TimeSpan FactoryTimeout { get; }
int ErrorMaxPerc { get; }
TimeSpan FactoryIdleRetryError { get; }
ILogger Logger { get; }
bool HasScaleCapacity { get; }
Expand Down
17 changes: 16 additions & 1 deletion src/internal/RingBufferBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ internal class RingBufferBuilder<T>(string uniquename, ILoggerFactory? loggerFac

public bool HasScaleCallbackEvent { get; private set; }

public int ErrorMaxPerc { get; private set; } = 33;

public int Capacity { get; private set; } = 2;

public int MinCapacity { get; private set; } = 2;
Expand Down Expand Up @@ -77,14 +79,27 @@ internal class RingBufferBuilder<T>(string uniquename, ILoggerFactory? loggerFac

#region IRingBuffer


IRingBuffer<T> IRingBuffer<T>.Logger(ILogger value)
{
Logger = value;
return this;
}


IRingBuffer<T> IRingBuffer<T>.FactoryErrorMaxPerc(int value)
{
if (value < 0)
{
throw new ArgumentException("Value must be greater than or equal to 0", nameof(value));
}
if (value > 100)
{
throw new ArgumentException("Value must be less than or equal to 100", nameof(value));
}
ErrorMaxPerc = value;
return this;
}

IRingBufferService<T> IRingBuffer<T>.BuildWarmup(out bool fullcapacity, TimeSpan? timeout)
{
return SharedBuildWarmup(out fullcapacity, timeout);
Expand Down
26 changes: 14 additions & 12 deletions src/internal/RingBufferManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ internal class RingBufferManager<T> : IRingBufferService<T>, IRingBufferWarmup<T
private Task _errorBufferThread;
private Task _metricBufferThread;
private Task _scaleCapacityThread;

private Task _bufferHealthThread;

private Task _reportscaleCapacityThread;
Expand All @@ -49,6 +48,7 @@ internal class RingBufferManager<T> : IRingBufferService<T>, IRingBufferWarmup<T
private readonly CancellationToken _apptoken;
private readonly CancellationTokenSource _managertoken;
private readonly ILogger? _logger;
private int _errorMaxPerc;
private bool _disposed;
private bool _WarmupComplete;
private bool _WarmupRunning;
Expand Down Expand Up @@ -90,6 +90,7 @@ public RingBufferManager(IRingBufferOptions<T> ringBufferOptions, CancellationTo
BufferHealtTimeout = ringBufferOptions.BufferHealtTimeout;
UserScale = ringBufferOptions.UserSwithScale;

_errorMaxPerc = ringBufferOptions.ErrorMaxPerc;
_logger = ringBufferOptions.Logger;
_factoryHandler = ringBufferOptions.FactoryHandler;
_healthHandler = ringBufferOptions.BufferHealthHandler;
Expand Down Expand Up @@ -233,7 +234,7 @@ public RingBufferValue<T> Accquire(CancellationToken? cancellation = null)
}
if (fullavailable != _currentCapacity)
{
localcancellation.WaitHandle.WaitOne(5);
localcancellation.WaitHandle.WaitOne(1);
return new RingBufferValue<T>(Name, TimeSpan.Zero, false, default, null);
}
lock (_lockAccquire)
Expand Down Expand Up @@ -332,7 +333,7 @@ public RingBufferValue<T> Accquire(CancellationToken? cancellation = null)
_blockexceptionsBuffer.Add(new RingBufferException(Name, $"Accquire timeout {sw.Elapsed}"), _managertoken.Token);
break;
}
localcancellation.WaitHandle.WaitOne(5);
localcancellation.WaitHandle.WaitOne(1);
}
}
//not ok
Expand Down Expand Up @@ -551,7 +552,7 @@ private void Startup(TimeSpan timeoutfullcapacity)
}
WriteLogInfo(DateTime.Now, $"{Name} Internal Buffer Health done");
}
_managertoken.Token.WaitHandle.WaitOne(100);
_managertoken.Token.WaitHandle.WaitOne(1);
}
WriteLogInfo(DateTime.Now, $"{Name} Buffer Health Thread Stoped");
}
Expand Down Expand Up @@ -784,7 +785,7 @@ private void Startup(TimeSpan timeoutfullcapacity)
{
while (!_managertoken.IsCancellationRequested && !_WarmupComplete)
{
_managertoken.Token.WaitHandle.WaitOne(5);
_managertoken.Token.WaitHandle.WaitOne(1);
}
}
while (ScaleCapacity && UserScale == ScaleMode.Automatic && !_managertoken.IsCancellationRequested)
Expand Down Expand Up @@ -939,7 +940,7 @@ private void Startup(TimeSpan timeoutfullcapacity)
warmupcts.CancelAfter(timeoutfullcapacity);
while (!warmupcts.Token.IsCancellationRequested && _counterBuffer != Capacity)
{
warmupcts.Token.WaitHandle.WaitOne(5);
warmupcts.Token.WaitHandle.WaitOne(1);
}

WriteLogInfo(DateTime.Now, $"{Name} Warmup complete with {_counterBuffer} items of {Capacity}");
Expand Down Expand Up @@ -1006,8 +1007,9 @@ private void RemoveAllBuffer(int? maxcount)
private void TryLoadBufferAsync(int diff)
{
var qtderr = 0;
//33% error stop!
var maxerr = (int)Math.Ceiling(diff / 3.0);
var qtdok = 0;
//error stop!
var maxerr = (int)Math.Ceiling(diff * _errorMaxPerc / 100.0);
var sw = Stopwatch.StartNew();
for (int i = 0; i < diff; i++)
{
Expand All @@ -1017,8 +1019,8 @@ private void TryLoadBufferAsync(int diff)
{
var tk = Task.Run(() =>
{
//33% error stop!
if (qtderr > maxerr)
//% error stop!
if (qtderr >= maxerr)
{
//force error
throw new RingBufferException(Name, "TryLoadBufferAsync Factory Exception");
Expand All @@ -1042,6 +1044,7 @@ private void TryLoadBufferAsync(int diff)
}
else
{
Interlocked.Increment(ref qtdok);
_availableBuffer.Enqueue(value);
_counterBuffer++;
WriteLogDebug(DateTime.Now, $"{Name} TryLoadBufferAsync Added New Item To Buffer : {_availableBuffer.Count} , Available : {_counterBuffer} Unavailable : {_counterAccquire}");
Expand All @@ -1053,9 +1056,8 @@ private void TryLoadBufferAsync(int diff)
}
catch (OperationCanceledException)
{
qtderr++;
//Send to retry
WriteLogWarning(DateTime.Now, $"{Name} TryLoadBufferAsync Send Factory to Retry (Timeout)");
WriteLogWarning(DateTime.Now, $"{Name} TryLoadBufferAsync Send Factory to Retry (Timeout). Created {qtdok}/{diff}");
_blockRetryFactoryBuffer.Add(DateTime.Now.Add(FactoryIdleRetry));
}
catch (Exception ex)
Expand Down

0 comments on commit 624aa4c

Please sign in to comment.