Skip to content

Commit

Permalink
Remove timeout for rate-limit pool (#2458)
Browse files Browse the repository at this point in the history
* Remove timeout for rate-limit pool
* Improve shutdown logic
  • Loading branch information
MinnDevelopment committed May 2, 2023
1 parent ee59f09 commit ce27c30
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 34 deletions.
Expand Up @@ -113,18 +113,13 @@ public void stop(boolean shutdown, @Nonnull Runnable callback)
shutdownHandle.thenRun(callback);
if (!doShutdown)
{
int size = buckets.size();
int average = (int) Math.ceil(
buckets.values().stream()
.map(Bucket::getRequests)
.mapToInt(Collection::size)
.average().orElse(0)
);

if (size > 0 && average > 0)
log.info("Waiting for {} bucket(s) to finish. Average queue size of {} requests", size, average);
else if (size == 0)
doShutdown = true;
int count = buckets.values().stream()
.mapToInt(bucket -> bucket.getRequests().size())
.sum();

if (count > 0)
log.info("Waiting for {} requests to finish.", count);
doShutdown = count == 0;
}
}
if (doShutdown && !isShutdown)
Expand Down Expand Up @@ -184,19 +179,26 @@ private void cleanup()
bucket.requests.removeIf(Work::isSkipped); // Remove cancelled requests

// Check if the bucket is empty
if (bucket.isUninit() && bucket.requests.isEmpty())
entries.remove(); // remove uninit if requests are empty
// If the requests of the bucket are drained and the reset is expired the bucket has no valuable information
else if (bucket.requests.isEmpty() && bucket.reset <= getNow())
entries.remove();
// Remove empty buckets when the rate limiter is stopped
else if (bucket.requests.isEmpty() && isStopped)
entries.remove();
if (bucket.requests.isEmpty())
{
// remove uninit if requests are empty
if (bucket.isUninit())
entries.remove();
// If the requests of the bucket are drained and the reset is expired the bucket has no valuable information
else if (bucket.reset <= getNow())
entries.remove();
// Remove empty buckets when the rate limiter is stopped
else if (isStopped)
entries.remove();
}
}

// Log how many buckets were removed
size -= buckets.size();
if (size > 0)
log.debug("Removed {} expired buckets", size);
else if (isStopped && !isShutdown)
shutdown();
});
}

Expand Down
Expand Up @@ -20,7 +20,10 @@

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Supplier;

public class ThreadingConfig
Expand Down Expand Up @@ -97,19 +100,6 @@ public void shutdown()
eventPool.shutdown();
if (shutdownAudioPool && audioPool != null)
audioPool.shutdown();
if (shutdownRateLimitPool)
{
if (rateLimitPool instanceof ScheduledThreadPoolExecutor)
{
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) rateLimitPool;
executor.setKeepAliveTime(5L, TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(true);
}
else
{
rateLimitPool.shutdown();
}
}
}

public void shutdownRequester()
Expand Down

0 comments on commit ce27c30

Please sign in to comment.