1515using System . IO ;
1616using System . Linq ;
1717using System . Threading ;
18+ using System . Threading . Tasks ;
19+ using Cleary . AsyncExtensions ;
1820using ServiceStack . Caching ;
1921using ServiceStack . Logging ;
2022using ServiceStack . Text ;
@@ -34,6 +36,9 @@ public partial class PooledRedisClientManager
3436 private const string PoolTimeoutError =
3537 "Redis Timeout expired. The timeout period elapsed prior to obtaining a connection from the pool. This may have occurred because all pooled connections were in use." ;
3638
39+ private AsyncMonitor readMonitor ;
40+ private AsyncMonitor writeMonitor ;
41+
3742 protected readonly int PoolSizeMultiplier = 20 ;
3843 public int RecheckPoolAfterMs = 100 ;
3944 public int ? PoolTimeout { get ; set ; }
@@ -42,7 +47,7 @@ public partial class PooledRedisClientManager
4247 public int ? SocketReceiveTimeout { get ; set ; }
4348 public int ? IdleTimeOutSecs { get ; set ; }
4449 public bool AssertAccessOnlyOnSameThread { get ; set ; }
45-
50+
4651 /// <summary>
4752 /// Gets or sets object key prefix.
4853 /// </summary>
@@ -211,15 +216,128 @@ protected virtual void OnStart()
211216 {
212217 this . Start ( ) ;
213218 }
219+
220+ private async Task < bool > waitForWriter ( int msTimeout )
221+ {
222+ writeMonitor = new AsyncMonitor ( ) ;
223+ var cts = new CancellationTokenSource ( ) ;
224+ cts . CancelAfter ( msTimeout ) ;
225+ await writeMonitor . WaitAsync ( cts . Token ) ;
226+ if ( cts . IsCancellationRequested ) return false ;
227+ cts . Cancel ( ) ;
228+ return true ;
229+ }
214230
215231 /// <summary>
216232 /// Returns a Read/Write client (The default) using the hosts defined in ReadWriteHosts
217233 /// </summary>
218234 /// <returns></returns>
219235 public IRedisClient GetClient ( ) => GetClient ( false ) ;
220236
237+ private async ValueTask < IRedisClientAsync > GetClientAsync ( )
238+ {
239+ try
240+ {
241+ var poolTimedOut = false ;
242+ var inactivePoolIndex = - 1 ;
243+
244+ do
245+ {
246+ RedisClient inActiveClient ;
247+ lock ( writeClients )
248+ {
249+ AssertValidReadWritePool ( ) ;
250+
251+ // If it's -1, then we want to try again after a delay of some kind. So if it's NOT negative one, process it...
252+ if ( ( inactivePoolIndex = GetInActiveWriteClient ( out inActiveClient ) ) != - 1 )
253+ {
254+ //inActiveClient != null only for Valid InActive Clients
255+ if ( inActiveClient != null )
256+ {
257+ WritePoolIndex ++ ;
258+ inActiveClient . Activate ( ) ;
259+
260+ InitClient ( inActiveClient ) ;
261+
262+ return inActiveClient ;
263+ }
264+ else
265+ {
266+ // Still need to be in lock for this!
267+ break ;
268+ }
269+ }
270+ }
271+
272+ // Didn't get one, so let's wait a bit for a new one.
273+ if ( PoolTimeout . HasValue )
274+ {
275+ if ( ! await waitForWriter ( PoolTimeout . Value ) )
276+ {
277+ poolTimedOut = true ;
278+ break ;
279+ }
280+ }
281+ else
282+ {
283+ await waitForWriter ( RecheckPoolAfterMs ) ;
284+ }
285+ } while ( true ) ; // Just keep repeating until we get a
286+
287+ if ( poolTimedOut )
288+ throw new TimeoutException ( PoolTimeoutError ) ;
289+
290+ //Reaches here when there's no Valid InActive Clients
291+ try
292+ {
293+ //inactivePoolIndex = index of reservedSlot || index of invalid client
294+ var existingClient = writeClients [ inactivePoolIndex ] ;
295+ if ( existingClient != null && existingClient != reservedSlot && existingClient . HadExceptions )
296+ {
297+ RedisState . DeactivateClient ( existingClient ) ;
298+ }
299+
300+ var newClient = InitNewClient ( RedisResolver . CreateMasterClient ( inactivePoolIndex ) ) ;
301+
302+ //Put all blocking I/O or potential Exceptions before lock
303+ lock ( writeClients )
304+ {
305+ //If existingClient at inactivePoolIndex changed (failover) return new client outside of pool
306+ if ( writeClients [ inactivePoolIndex ] != existingClient )
307+ {
308+ if ( Log . IsDebugEnabled )
309+ Log . Debug ( "writeClients[inactivePoolIndex] != existingClient: {0}" . Fmt ( writeClients [ inactivePoolIndex ] ) ) ;
310+
311+ return newClient ; //return client outside of pool
312+ }
313+
314+ WritePoolIndex ++ ;
315+ writeClients [ inactivePoolIndex ] = newClient ;
316+
317+ return ( ! AssertAccessOnlyOnSameThread )
318+ ? newClient
319+ : newClient . LimitAccessToThread ( Thread . CurrentThread . ManagedThreadId , Environment . StackTrace ) ;
320+ }
321+ }
322+ catch
323+ {
324+ //Revert free-slot for any I/O exceptions that can throw (before lock)
325+ lock ( writeClients )
326+ {
327+ writeClients [ inactivePoolIndex ] = null ; //free slot
328+ }
329+ throw ;
330+ }
331+ }
332+ finally
333+ {
334+ RedisState . DisposeExpiredClients ( ) ;
335+ }
336+ }
337+
221338 private RedisClient GetClient ( bool forAsync )
222339 {
340+ if ( forAsync ) throw new Exception ( "Call GetClientAsync instead" ) ;
223341 try
224342 {
225343 var poolTimedOut = false ;
@@ -547,6 +665,7 @@ public void DisposeClient(RedisNativeClient client)
547665 }
548666
549667 Monitor . PulseAll ( readClients ) ;
668+ readMonitor ? . PulseAll ( ) ;
550669 return ;
551670 }
552671 }
@@ -568,15 +687,23 @@ public void DisposeClient(RedisNativeClient client)
568687 }
569688
570689 Monitor . PulseAll ( writeClients ) ;
690+ writeMonitor ? . PulseAll ( ) ;
571691 return ;
572692 }
573693 }
574694
575695 //Client not found in any pool, pulse both pools.
576696 lock ( readClients )
577- Monitor . PulseAll ( readClients ) ;
697+ {
698+ Monitor . PulseAll ( readClients ) ;
699+ readMonitor ? . PulseAll ( ) ;
700+ }
701+
578702 lock ( writeClients )
579- Monitor . PulseAll ( writeClients ) ;
703+ {
704+ Monitor . PulseAll ( writeClients ) ;
705+ writeMonitor ? . PulseAll ( ) ;
706+ }
580707 }
581708
582709 /// <summary>
@@ -589,6 +716,7 @@ public void DisposeReadOnlyClient(RedisNativeClient client)
589716 {
590717 client . Deactivate ( ) ;
591718 Monitor . PulseAll ( readClients ) ;
719+ readMonitor ? . PulseAll ( ) ;
592720 }
593721 }
594722
@@ -602,6 +730,7 @@ public void DisposeWriteClient(RedisNativeClient client)
602730 {
603731 client . Deactivate ( ) ;
604732 Monitor . PulseAll ( writeClients ) ;
733+ writeMonitor ? . PulseAll ( ) ;
605734 }
606735 }
607736
0 commit comments