-
Notifications
You must be signed in to change notification settings - Fork 412
/
BoundedModulePool.cs
96 lines (80 loc) · 2.8 KB
/
BoundedModulePool.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.JsonRpc.Exceptions;
namespace Nethermind.JsonRpc.Modules
{
/// <summary>
/// Static guard on the number of RPC calls that are currently queued.
/// The guard is active only when a positive limit has been set through <see cref="Init"/>;
/// with a zero or negative limit every method here is a no-op.
/// </summary>
public static class RpcLimits
{
    // Number of calls currently queued; only touched through Interlocked/Volatile.
    private static int _queuedCalls = 0;

    private static int Limit { get; set; }

    private static bool Enabled => Limit > 0;

    /// <summary>
    /// Sets the maximum number of queued calls. A value of zero or less disables the guard.
    /// </summary>
    /// <param name="limit">Queued-call limit.</param>
    /// <remarks>
    /// NOTE(review): increment and decrement are both gated on the limit being positive, so
    /// switching between enabled and disabled while calls are queued would leave the counter
    /// unbalanced. Presumably this is called once at startup - confirm against callers.
    /// </remarks>
    public static void Init(int limit)
    {
        Limit = limit;
    }

    /// <summary>
    /// Atomically records that one more call is queued (no-op when the guard is disabled).
    /// </summary>
    public static void IncrementQueuedCalls()
    {
        if (Enabled)
        {
            Interlocked.Increment(ref _queuedCalls);
        }
    }

    /// <summary>
    /// Atomically records that one call left the queue (no-op when the guard is disabled).
    /// </summary>
    public static void DecrementQueuedCalls()
    {
        if (Enabled)
        {
            Interlocked.Decrement(ref _queuedCalls);
        }
    }

    /// <summary>
    /// Throws when the number of queued calls is strictly greater than the configured limit.
    /// A count equal to the limit still passes.
    /// </summary>
    /// <exception cref="LimitExceededException">The queued-call count exceeds the limit.</exception>
    public static void EnsureLimits()
    {
        int limit = Limit;
        if (limit <= 0)
        {
            // Guard disabled.
            return;
        }

        // Take one snapshot of the counter so the value reported in the message is
        // the same value that was compared against the limit, even under concurrent updates.
        int queuedCalls = Volatile.Read(ref _queuedCalls);
        if (queuedCalls > limit)
        {
            throw new LimitExceededException($"Unable to start new queued requests. Too many queued requests. Queued calls {queuedCalls}.");
        }
    }
}
/// <summary>
/// Pool that hands out RPC module instances: one shared instance that is returned
/// immediately to callers that accept sharing, plus a fixed number of exclusive instances
/// that are rented one at a time and must be given back via <see cref="ReturnModule"/>.
/// </summary>
/// <typeparam name="T">RPC module type served by this pool.</typeparam>
public class BoundedModulePool<T> : IRpcModulePool<T> where T : IRpcModule
{
    // Timeout passed to SemaphoreSlim.WaitAsync(int), i.e. interpreted as milliseconds.
    private readonly int _timeout;
    private readonly T _shared;
    // Pre-built completed task so the shared path does not allocate per call.
    private readonly Task<T> _sharedAsTask;
    private readonly ConcurrentQueue<T> _pool = new();
    // Counts exclusive instances currently available in _pool.
    private readonly SemaphoreSlim _semaphore;

    /// <summary>
    /// Creates the pool, eagerly building <paramref name="exclusiveCapacity"/> exclusive
    /// instances followed by one shared instance, all from <paramref name="factory"/>.
    /// </summary>
    /// <param name="factory">Factory used to create every module instance.</param>
    /// <param name="exclusiveCapacity">Number of exclusive instances to create.</param>
    /// <param name="timeout">How long a rental may wait for a free exclusive instance.</param>
    public BoundedModulePool(IRpcModuleFactory<T> factory, int exclusiveCapacity, int timeout)
    {
        _timeout = timeout;
        Factory = factory;

        _semaphore = new SemaphoreSlim(exclusiveCapacity);
        for (int i = 0; i < exclusiveCapacity; i++)
        {
            _pool.Enqueue(Factory.Create());
        }

        _shared = factory.Create();
        _sharedAsTask = Task.FromResult(_shared);
    }

    /// <summary>
    /// Gets a module instance.
    /// </summary>
    /// <param name="canBeShared">
    /// <c>true</c> to receive the shared instance immediately; <c>false</c> to rent an
    /// exclusive instance, waiting up to the configured timeout for one to become free.
    /// </param>
    /// <returns>
    /// A task producing the module. For exclusive rentals the task faults with
    /// <see cref="LimitExceededException"/> when <see cref="RpcLimits.EnsureLimits"/> rejects the call,
    /// or with <see cref="ModuleRentalTimeoutException"/> when no instance became free in time.
    /// </returns>
    public Task<T> GetModule(bool canBeShared) => canBeShared ? _sharedAsTask : SlowPath();

    // Rents an exclusive instance, counting the call as "queued" for the duration of the wait.
    private async Task<T> SlowPath()
    {
        RpcLimits.EnsureLimits();
        RpcLimits.IncrementQueuedCalls();

        bool acquired;
        try
        {
            acquired = await _semaphore.WaitAsync(_timeout);
        }
        finally
        {
            // Always undo the increment, including when WaitAsync itself throws
            // (e.g. an invalid timeout or a disposed semaphore); otherwise the
            // queued-call counter would leak and eventually reject every request.
            RpcLimits.DecrementQueuedCalls();
        }

        if (!acquired)
        {
            throw new ModuleRentalTimeoutException($"Unable to rent an instance of {typeof(T).Name}. Too many concurrent requests.");
        }

        // ReturnModule enqueues before releasing the semaphore, so an acquired permit is
        // expected to have a matching instance in the queue. The TryDequeue result is not
        // checked; if the queue were empty, the default value of T would be returned.
        _pool.TryDequeue(out T result);
        return result;
    }

    /// <summary>
    /// Gives a previously rented exclusive instance back to the pool.
    /// Passing the shared instance is a no-op.
    /// </summary>
    /// <param name="module">The module obtained from <see cref="GetModule"/>.</param>
    public void ReturnModule(T module)
    {
        if (ReferenceEquals(module, _shared))
        {
            return;
        }

        // Enqueue first so a waiter woken by Release always finds an instance.
        _pool.Enqueue(module);
        _semaphore.Release();
    }

    /// <summary>Factory used to create the module instances held by this pool.</summary>
    public IRpcModuleFactory<T> Factory { get; }
}
}