Skip to content

Commit

Permalink
Improve conditional waiting and shutdown handling (#2269)
Browse files Browse the repository at this point in the history
* Make use of conditional variables instead of polling loops
* Add awaitShutdown
* Improve shutdown logic to allow awaitShutdown to work
* Fire ShutdownEvent when requester is also shutdown
* Ensure shutdown is idempotent
  • Loading branch information
MinnDevelopment committed Jan 28, 2023
1 parent 299f4d0 commit 30ba384
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 74 deletions.
109 changes: 105 additions & 4 deletions src/main/java/net/dv8tion/jda/api/JDA.java
Expand Up @@ -55,12 +55,10 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.List;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
Expand Down Expand Up @@ -378,6 +376,109 @@ default JDA awaitReady() throws InterruptedException
return awaitStatus(Status.CONNECTED);
}

/**
* Blocks the current thread until {@link #getStatus()} returns {@link Status#SHUTDOWN}.
* <br>This can be useful in certain situations like disabling class loading.
*
* <p>Note that shutdown time depends on the length of the rate-limit queue.
* You can use {@link #shutdownNow()} to cancel all pending requests and immediately shutdown.
*
* <p><b>Example</b>
* <pre>{@code
* jda.shutdown();
* // Allow at most 10 seconds for remaining requests to finish
* if (!jda.awaitShutdown(10, TimeUnit.SECONDS)) {
* jda.shutdownNow(); // Cancel all remaining requests
* jda.awaitShutdown(); // Wait until shutdown is complete (indefinitely)
* }
* }</pre>
*
* <p><b>This will not implicitly call {@code shutdown()}, you are responsible to ensure that the shutdown process has started.</b>
*
* @param duration
* The maximum time to wait, or 0 to wait indefinitely
* @param unit
* The time unit for the duration
*
* @throws IllegalArgumentException
* If the provided unit is null
* @throws InterruptedException
* If the current thread is interrupted while waiting
*
* @return False, if the timeout has elapsed before the shutdown has completed, true otherwise.
*/
@CheckReturnValue
boolean awaitShutdown(long duration, @Nonnull TimeUnit unit) throws InterruptedException;

/**
* Blocks the current thread until {@link #getStatus()} returns {@link Status#SHUTDOWN}.
* <br>This can be useful in certain situations like disabling class loading.
*
* <p>Note that shutdown time depends on the length of the rate-limit queue.
* You can use {@link #shutdownNow()} to cancel all pending requests and immediately shutdown.
*
* <p><b>Example</b>
* <pre>{@code
* jda.shutdown();
* // Allow at most 10 seconds for remaining requests to finish
* if (!jda.awaitShutdown(Duration.ofSeconds(10))) {
* jda.shutdownNow(); // Cancel all remaining requests
* jda.awaitShutdown(); // Wait until shutdown is complete (indefinitely)
* }
* }</pre>
*
* <p><b>This will not implicitly call {@code shutdown()}, you are responsible to ensure that the shutdown process has started.</b>
*
* @param timeout
* The maximum time to wait, or {@link Duration#ZERO} to wait indefinitely
*
* @throws IllegalArgumentException
* If the provided timeout is null
* @throws InterruptedException
* If the current thread is interrupted while waiting
*
* @return False, if the timeout has elapsed before the shutdown has completed, true otherwise.
*/
@CheckReturnValue
default boolean awaitShutdown(@Nonnull Duration timeout) throws InterruptedException
{
Checks.notNull(timeout, "Timeout");
return awaitShutdown(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

/**
* Blocks the current thread until {@link #getStatus()} returns {@link Status#SHUTDOWN}.
* <br>This can be useful in certain situations like disabling class loading.
*
* <p>This will wait indefinitely by default. Use {@link #awaitShutdown(Duration)} to set a timeout.
*
* <p>Note that shutdown time depends on the length of the rate-limit queue.
* You can use {@link #shutdownNow()} to cancel all pending requests and immediately shutdown.
*
* <p><b>Example</b>
* <pre>{@code
* jda.shutdown();
* // Allow at most 10 seconds for remaining requests to finish
* if (!jda.awaitShutdown(Duration.ofSeconds(10))) {
* jda.shutdownNow(); // Cancel all remaining requests
* jda.awaitShutdown(); // Wait until shutdown is complete (indefinitely)
* }
* }</pre>
*
* <p><b>This will not implicitly call {@code shutdown()}, you are responsible to ensure that the shutdown process has started.</b>
*
* @throws IllegalArgumentException
* If the provided timeout is null
* @throws InterruptedException
* If the current thread is interrupted while waiting
*
* @return Always true
*/
default boolean awaitShutdown() throws InterruptedException
{
return awaitShutdown(0, TimeUnit.MILLISECONDS);
}

/**
* Cancels all currently scheduled {@link RestAction} requests.
* <br>When a {@link RestAction} is cancelled, a {@link java.util.concurrent.CancellationException} will be provided
Expand Down
119 changes: 91 additions & 28 deletions src/main/java/net/dv8tion/jda/internal/JDAImpl.java
Expand Up @@ -35,6 +35,7 @@
import net.dv8tion.jda.api.events.GatewayPingEvent;
import net.dv8tion.jda.api.events.GenericEvent;
import net.dv8tion.jda.api.events.StatusChangeEvent;
import net.dv8tion.jda.api.events.session.ShutdownEvent;
import net.dv8tion.jda.api.exceptions.AccountTypeException;
import net.dv8tion.jda.api.exceptions.InvalidTokenException;
import net.dv8tion.jda.api.exceptions.ParsingException;
Expand Down Expand Up @@ -76,6 +77,7 @@
import net.dv8tion.jda.internal.requests.restaction.CommandEditActionImpl;
import net.dv8tion.jda.internal.requests.restaction.CommandListUpdateActionImpl;
import net.dv8tion.jda.internal.requests.restaction.GuildActionImpl;
import net.dv8tion.jda.internal.utils.Helpers;
import net.dv8tion.jda.internal.utils.*;
import net.dv8tion.jda.internal.utils.cache.AbstractCacheView;
import net.dv8tion.jda.internal.utils.cache.SnowflakeCacheViewImpl;
Expand All @@ -88,11 +90,13 @@
import org.slf4j.MDC;

import javax.annotation.Nonnull;
import java.time.OffsetDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

public class JDAImpl implements JDA
Expand Down Expand Up @@ -131,7 +135,6 @@ public class JDAImpl implements JDA
protected WebSocketClient client;
protected Requester requester;
protected IAudioSendFactory audioSendFactory = new DefaultSendFactory();
protected Status status = Status.INITIALIZING;
protected SelfUser selfUser;
protected ShardInfo shardInfo;
protected long responseTotal;
Expand All @@ -143,6 +146,12 @@ public class JDAImpl implements JDA
protected ShardManager shardManager = null;
protected MemberCachePolicy memberCachePolicy = MemberCachePolicy.ALL;

protected final AtomicReference<Status> status = new AtomicReference<>(Status.INITIALIZING);
protected final ReentrantLock statusLock = new ReentrantLock();
protected final Condition statusCondition = statusLock.newCondition();
protected final AtomicBoolean requesterShutdown = new AtomicBoolean(false);
protected final AtomicReference<ShutdownEvent> shutdownEvent = new AtomicReference<>(null);

public JDAImpl(AuthorizationConfig authConfig)
{
this(authConfig, null, null, null);
Expand All @@ -156,7 +165,7 @@ public JDAImpl(
this.threadConfig = threadConfig == null ? ThreadingConfig.getDefault() : threadConfig;
this.sessionConfig = sessionConfig == null ? SessionConfig.getDefault() : sessionConfig;
this.metaConfig = metaConfig == null ? MetaConfig.getDefault() : metaConfig;
this.shutdownHook = this.metaConfig.isUseShutdownHook() ? new Thread(this::shutdown, "JDA Shutdown Hook") : null;
this.shutdownHook = this.metaConfig.isUseShutdownHook() ? new Thread(this::shutdownNow, "JDA Shutdown Hook") : null;
this.presence = new PresenceImpl(this);
this.requester = new Requester(this);
this.requester.setRetryOnTimeout(this.sessionConfig.isRetryOnTimeout());
Expand Down Expand Up @@ -355,14 +364,15 @@ public void setToken(String token)

public void setStatus(Status status)
{
//noinspection SynchronizeOnNonFinalField
synchronized (this.status)
{
Status oldStatus = this.status;
this.status = status;
StatusChangeEvent event = MiscUtil.locked(statusLock, () -> {
Status oldStatus = this.status.getAndSet(status);
this.statusCondition.signalAll();

handleEvent(new StatusChangeEvent(this, status, oldStatus));
}
return new StatusChangeEvent(this, status, oldStatus);
});

if (event.getOldStatus() != event.getNewStatus())
handleEvent(event);
}

public void verifyToken()
Expand Down Expand Up @@ -437,7 +447,7 @@ public boolean isAutoReconnect()
@Override
public Status getStatus()
{
return status;
return status.get();
}

@Nonnull
Expand Down Expand Up @@ -480,22 +490,57 @@ public long getGatewayPing()
public JDA awaitStatus(@Nonnull Status status, @Nonnull Status... failOn) throws InterruptedException
{
Checks.notNull(status, "Status");
Checks.check(status.isInit(), "Cannot await the status %s as it is not part of the login cycle!", status);
if (getStatus() == Status.CONNECTED)
return this;
List<Status> failStatus = Arrays.asList(failOn);
while (!getStatus().isInit() // JDA might disconnect while starting
|| getStatus().ordinal() < status.ordinal()) // Wait until status is bypassed

MiscUtil.tryLock(statusLock);
try
{
if (getStatus() == Status.SHUTDOWN)
throw new IllegalStateException("Was shutdown trying to await status.\nReason: " + shutdownReason);
else if (failStatus.contains(getStatus()))
return this;
Thread.sleep(50);
EnumSet<Status> endCondition = EnumSet.of(status, failOn);
Status current = getStatus();
while (!current.isInit() // In case of disconnects during startup
|| current.ordinal() < status.ordinal()) // If we missed the status (e.g. LOGGING_IN -> CONNECTED happened while waiting for lock)
{
if (current == Status.SHUTDOWN)
throw new IllegalStateException("Was shutdown trying to await status.\nReason: " + shutdownReason);
if (endCondition.contains(current))
return this;

statusCondition.await();
current = getStatus();
}
}
finally
{
statusLock.unlock();
}

return this;
}

@Override
public boolean awaitShutdown(long timeout, @Nonnull TimeUnit unit) throws InterruptedException
{
timeout = unit.toMillis(timeout);
long deadline = timeout == 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeout;
MiscUtil.tryLock(statusLock);
try
{
Status current = getStatus();
while (current != Status.SHUTDOWN)
{
if (!statusCondition.await(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS))
return false;
current = getStatus();
}
return true;
}
finally
{
statusLock.unlock();
}
}

@Override
public int cancelRequests()
{
Expand Down Expand Up @@ -786,30 +831,34 @@ public synchronized void shutdownNow()
@Override
public synchronized void shutdown()
{
Status status = getStatus();
if (status == Status.SHUTDOWN || status == Status.SHUTTING_DOWN)
return;

setStatus(Status.SHUTTING_DOWN);
shutdownInternals();

WebSocketClient client = getClient();
if (client != null)
{
client.getChunkManager().shutdown();
client.shutdown();
}
else
{
shutdownInternals(new ShutdownEvent(this, OffsetDateTime.now(), 1000));
}
}

public synchronized void shutdownInternals()
public void shutdownInternals(ShutdownEvent event)
{
if (status == Status.SHUTDOWN)
if (getStatus() == Status.SHUTDOWN)
return;
//so we can shutdown from WebSocketClient properly
closeAudioConnections();
guildSetupController.close();

// stop accepting new requests
if (requester.stop()) // returns true if no more requests will be executed
if (requester.stop()) // returns true if no more requests will be executed
shutdownRequester(); // in that case shutdown entirely
threadConfig.shutdown();

Expand All @@ -822,14 +871,28 @@ public synchronized void shutdownInternals()
catch (Exception ignored) {}
}

setStatus(Status.SHUTDOWN);
// If the requester has been shutdown too, we can fire the shutdown event
boolean signal = MiscUtil.locked(statusLock, () -> shutdownEvent.getAndSet(event) == null && requesterShutdown.get());
if (signal)
signalShutdown();
}

public void shutdownRequester()
{
// Stop all request processing
requester.shutdown();
threadConfig.shutdownRequester();

// If the websocket has been shutdown too, we can fire the shutdown event
boolean signal = MiscUtil.locked(statusLock, () -> !requesterShutdown.getAndSet(true) && shutdownEvent.get() != null);
if (signal)
signalShutdown();
}

private void signalShutdown()
{
setStatus(Status.SHUTDOWN);
handleEvent(shutdownEvent.get());
}

private void closeAudioConnections()
Expand Down

0 comments on commit 30ba384

Please sign in to comment.