1515using System . IO ;
1616using System . Linq ;
1717using System . Threading ;
18+ using System . Threading . Tasks ;
19+ using Nito . AsyncEx ;
1820using ServiceStack . Caching ;
1921using ServiceStack . Logging ;
2022using ServiceStack . Text ;
@@ -34,6 +36,9 @@ public partial class PooledRedisClientManager
3436 private const string PoolTimeoutError =
3537 "Redis Timeout expired. The timeout period elapsed prior to obtaining a connection from the pool. This may have occurred because all pooled connections were in use." ;
3638
39+ private AsyncManualResetEvent readAsyncEvent ;
40+ private AsyncManualResetEvent writeAsyncEvent ;
41+
3742 protected readonly int PoolSizeMultiplier = 20 ;
3843 public int RecheckPoolAfterMs = 100 ;
3944 public int ? PoolTimeout { get ; set ; }
@@ -42,7 +47,7 @@ public partial class PooledRedisClientManager
4247 public int ? SocketReceiveTimeout { get ; set ; }
4348 public int ? IdleTimeOutSecs { get ; set ; }
4449 public bool AssertAccessOnlyOnSameThread { get ; set ; }
45-
50+
4651 /// <summary>
4752 /// Gets or sets object key prefix.
4853 /// </summary>
@@ -212,13 +217,136 @@ protected virtual void OnStart()
212217 this . Start ( ) ;
213218 }
214219
220+ private void pulseAllRead ( )
221+ {
222+ readAsyncEvent ? . Set ( ) ;
223+ readAsyncEvent ? . Reset ( ) ;
224+ Monitor . PulseAll ( readClients ) ;
225+ }
226+
227+ private void pulseAllWrite ( )
228+ {
229+ writeAsyncEvent ? . Set ( ) ;
230+ writeAsyncEvent ? . Reset ( ) ;
231+ Monitor . PulseAll ( writeClients ) ;
232+ }
233+
234+ private async Task < bool > waitForWriter ( int msTimeout )
235+ {
236+ if ( writeAsyncEvent == null ) // If we're not doing async, no need to create this till we need it.
237+ writeAsyncEvent = new AsyncManualResetEvent ( false ) ;
238+ var cts = new CancellationTokenSource ( TimeSpan . FromMilliseconds ( msTimeout ) ) ;
239+ try
240+ {
241+ await writeAsyncEvent . WaitAsync ( cts . Token ) ;
242+ }
243+ catch ( OperationCanceledException ) { return false ; }
244+ return true ;
245+ }
246+
215247 /// <summary>
216248 /// Returns a Read/Write client (The default) using the hosts defined in ReadWriteHosts
217249 /// </summary>
218250 /// <returns></returns>
219- public IRedisClient GetClient ( ) => GetClient ( false ) ;
251+ public IRedisClient GetClient ( ) => GetClientBlocking ( ) ;
220252
221- private RedisClient GetClient ( bool forAsync )
253+ private async ValueTask < IRedisClientAsync > GetClientAsync ( )
254+ {
255+ try
256+ {
257+ var inactivePoolIndex = - 1 ;
258+ do
259+ {
260+ RedisClient inActiveClient ;
261+ lock ( writeClients )
262+ {
263+ AssertValidReadWritePool ( ) ;
264+
265+ // If it's -1, then we want to try again after a delay of some kind. So if it's NOT negative one, process it...
266+ if ( ( inactivePoolIndex = GetInActiveWriteClient ( out inActiveClient ) ) != - 1 )
267+ {
268+ //inActiveClient != null only for Valid InActive Clients
269+ if ( inActiveClient != null )
270+ {
271+ WritePoolIndex ++ ;
272+ inActiveClient . Activate ( ) ;
273+
274+ InitClient ( inActiveClient ) ;
275+
276+ return inActiveClient ;
277+ }
278+ else
279+ {
280+ // Still need to be in lock for this!
281+ break ;
282+ }
283+ }
284+ }
285+
286+ if ( PoolTimeout . HasValue )
287+ {
288+ // We have a timeout value set - so try to not wait longer than this.
289+ if ( ! await waitForWriter ( PoolTimeout . Value ) )
290+ {
291+ throw new TimeoutException ( PoolTimeoutError ) ;
292+ }
293+ }
294+ else
295+ {
296+ // Wait forever, so just retry till we get one.
297+ await waitForWriter ( RecheckPoolAfterMs ) ;
298+ }
299+ } while ( true ) ; // Just keep repeating until we get a slot.
300+
301+ //Reaches here when there's no Valid InActive Clients, but we have a slot for one!
302+ try
303+ {
304+ //inactivePoolIndex = index of reservedSlot || index of invalid client
305+ var existingClient = writeClients [ inactivePoolIndex ] ;
306+ if ( existingClient != null && existingClient != reservedSlot && existingClient . HadExceptions )
307+ {
308+ RedisState . DeactivateClient ( existingClient ) ;
309+ }
310+
311+ var newClient = InitNewClient ( RedisResolver . CreateMasterClient ( inactivePoolIndex ) ) ;
312+
313+ //Put all blocking I/O or potential Exceptions before lock
314+ lock ( writeClients )
315+ {
316+ //If existingClient at inactivePoolIndex changed (failover) return new client outside of pool
317+ if ( writeClients [ inactivePoolIndex ] != existingClient )
318+ {
319+ if ( Log . IsDebugEnabled )
320+ Log . Debug ( "writeClients[inactivePoolIndex] != existingClient: {0}" . Fmt ( writeClients [ inactivePoolIndex ] ) ) ;
321+
322+ return newClient ; //return client outside of pool
323+ }
324+
325+ WritePoolIndex ++ ;
326+ writeClients [ inactivePoolIndex ] = newClient ;
327+
328+ return ( ! AssertAccessOnlyOnSameThread )
329+ ? newClient
330+ : newClient . LimitAccessToThread ( Thread . CurrentThread . ManagedThreadId , Environment . StackTrace ) ;
331+ }
332+ }
333+ catch
334+ {
335+ //Revert free-slot for any I/O exceptions that can throw (before lock)
336+ lock ( writeClients )
337+ {
338+ writeClients [ inactivePoolIndex ] = null ; //free slot
339+ }
340+ throw ;
341+ }
342+ }
343+ finally
344+ {
345+ RedisState . DisposeExpiredClients ( ) ;
346+ }
347+ }
348+
349+ private RedisClient GetClientBlocking ( )
222350 {
223351 try
224352 {
@@ -254,7 +382,7 @@ private RedisClient GetClient(bool forAsync)
254382
255383 InitClient ( inActiveClient ) ;
256384
257- return ( ! AssertAccessOnlyOnSameThread || forAsync )
385+ return ( ! AssertAccessOnlyOnSameThread )
258386 ? inActiveClient
259387 : inActiveClient . LimitAccessToThread ( Thread . CurrentThread . ManagedThreadId , Environment . StackTrace ) ;
260388 }
@@ -290,7 +418,7 @@ private RedisClient GetClient(bool forAsync)
290418 WritePoolIndex ++ ;
291419 writeClients [ inactivePoolIndex ] = newClient ;
292420
293- return ( ! AssertAccessOnlyOnSameThread || forAsync )
421+ return ( ! AssertAccessOnlyOnSameThread )
294422 ? newClient
295423 : newClient . LimitAccessToThread ( Thread . CurrentThread . ManagedThreadId , Environment . StackTrace ) ;
296424 }
@@ -546,7 +674,7 @@ public void DisposeClient(RedisNativeClient client)
546674 client . Deactivate ( ) ;
547675 }
548676
549- Monitor . PulseAll ( readClients ) ;
677+ pulseAllRead ( ) ;
550678 return ;
551679 }
552680 }
@@ -567,16 +695,21 @@ public void DisposeClient(RedisNativeClient client)
567695 client . Deactivate ( ) ;
568696 }
569697
570- Monitor . PulseAll ( writeClients ) ;
698+ pulseAllWrite ( ) ;
571699 return ;
572700 }
573701 }
574702
575703 //Client not found in any pool, pulse both pools.
576704 lock ( readClients )
577- Monitor . PulseAll ( readClients ) ;
705+ {
706+ pulseAllRead ( ) ;
707+ }
708+
578709 lock ( writeClients )
579- Monitor . PulseAll ( writeClients ) ;
710+ {
711+ pulseAllWrite ( ) ;
712+ }
580713 }
581714
582715 /// <summary>
@@ -588,7 +721,7 @@ public void DisposeReadOnlyClient(RedisNativeClient client)
588721 lock ( readClients )
589722 {
590723 client . Deactivate ( ) ;
591- Monitor . PulseAll ( readClients ) ;
724+ pulseAllRead ( ) ;
592725 }
593726 }
594727
@@ -601,7 +734,7 @@ public void DisposeWriteClient(RedisNativeClient client)
601734 lock ( writeClients )
602735 {
603736 client . Deactivate ( ) ;
604- Monitor . PulseAll ( writeClients ) ;
737+ pulseAllWrite ( ) ;
605738 }
606739 }
607740
0 commit comments