Skip to content

Commit

Permalink
Fix #800 apply a timeout on full member list requests
Browse files Browse the repository at this point in the history
Resume timeout error with empty Mono in StoreEntityRetriever
  • Loading branch information
quanticc committed Nov 17, 2020
1 parent 41af466 commit b36af22
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 25 deletions.
37 changes: 25 additions & 12 deletions core/src/main/java/discord4j/core/GatewayDiscordClient.java
Expand Up @@ -37,11 +37,7 @@
import discord4j.core.spec.GuildCreateSpec;
import discord4j.core.spec.UserEditSpec;
import discord4j.core.util.ValidationUtil;
import discord4j.discordjson.json.ActivityUpdateRequest;
import discord4j.discordjson.json.EmojiData;
import discord4j.discordjson.json.GuildData;
import discord4j.discordjson.json.GuildUpdateData;
import discord4j.discordjson.json.RoleData;
import discord4j.discordjson.json.*;
import discord4j.discordjson.json.gateway.GuildMembersChunk;
import discord4j.discordjson.json.gateway.RequestGuildMembers;
import discord4j.discordjson.json.gateway.StatusUpdate;
Expand All @@ -62,16 +58,20 @@
import reactor.util.Logger;
import reactor.util.Loggers;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static discord4j.common.LogUtil.format;

/**
* An aggregation of all dependencies Discord4J requires to operate with the Discord Gateway, REST API and Voice
* Gateway. Maintains a set of connections to every shard logged in from the same {@link GatewayBootstrap} and therefore
Expand Down Expand Up @@ -505,19 +505,27 @@ public Flux<Member> requestMembers(Snowflake guildId) {
* the {@link Flux}.
*/
public Flux<Member> requestMembers(Snowflake guildId, Set<Snowflake> userIds) {
return requestMembers(RequestGuildMembers.builder()
.guildId(guildId.asString())
.userIds(userIds.stream().map(Snowflake::asString).collect(Collectors.toList()))
.limit(0)
.build());
return Flux.fromIterable(userIds)
.map(Snowflake::asString)
.buffer(100)
.concatMap(userIdBuffer -> requestMembers(RequestGuildMembers.builder()
.guildId(guildId.asString())
.userIds(userIdBuffer)
.limit(0)
.build()));
}

/**
* Submit a {@link RequestGuildMembers} payload using the current Gateway connection and wait for its completion,
* delivering {@link Member} elements asynchronously through a {@link Flux}. This method performs a check to
* validate whether the given guild's data can be obtained from this {@link GatewayDiscordClient}.
* <p>
* A timeout given by is used to fail this request if the operation is unable to complete due to disallowed or
* disabled members intent. This is particularly relevant when requesting a complete member list. If the timeout is
* triggered, a {@link TimeoutException} is forwarded through the {@link Flux}.
*
* @param request the member request to submit. Create one using {@link RequestGuildMembers#builder()}.
* {@link Flux#timeout(Duration)}
* @return a {@link Flux} of {@link Member} for the given {@link Guild}. If an error occurs, it is emitted through
* the {@link Flux}.
*/
Expand All @@ -542,13 +550,18 @@ public Flux<Member> requestMembers(RequestGuildMembers request) {
.map(data -> new Member(this, data, guildId.asLong()))
.collect(Collectors.toList())))
.orElseThrow(() -> new IllegalStateException("Unable to find gateway client"));
return getGuildById(guildId) // check if this operation is valid otherwise request+waiting will hang
Duration timeout = gatewayResources.getMemberRequestTimeout();
return Flux.deferWithContext(ctx -> getGuildById(guildId)
.then(gatewayClientGroup.unicast(ShardGatewayPayload.requestGuildMembers(
RequestGuildMembers.builder()
.from(request)
.nonce(nonce)
.build(), shardId)))
.thenMany(Flux.defer(incomingMembers));
.thenMany(Flux.defer(incomingMembers))
.transform(flux -> ValidationUtil.isRequestingEntireList(request) ? flux.timeout(timeout) : flux)
.doOnComplete(() -> log.debug(format(ctx, "Member request completed: {}"), request))
.doOnError(TimeoutException.class,
t -> log.warn(format(ctx, "Member request timed out: {}"), request)));
}

/**
Expand Down
30 changes: 27 additions & 3 deletions core/src/main/java/discord4j/core/GatewayResources.java
Expand Up @@ -31,6 +31,8 @@
import discord4j.store.api.Store;
import discord4j.voice.VoiceReactorResources;

import java.time.Duration;

/**
* A set of dependencies required to build and coordinate multiple {@link GatewayClient} instances.
*/
Expand All @@ -44,6 +46,7 @@ public class GatewayResources {
private final VoiceReactorResources voiceReactorResources;
private final ReconnectOptions voiceReconnectOptions;
private final Possible<IntentSet> intents;
private final Duration memberRequestTimeout;

/**
* Create a new {@link GatewayResources} with the given parameters.
Expand All @@ -55,12 +58,18 @@ public class GatewayResources {
* @param gatewayReactorResources a custom set of Reactor resources targeting Gateway operations
* @param voiceReactorResources a set of Reactor resources targeting Voice Gateway operations
* @param voiceReconnectOptions a reconnection policy for Voice Gateway connections
* @param intents an optional set of events to subscribe when connecting to the Gateway
* @param memberRequestTimeout a {@link Duration} to limit the time member list requests take
*/
public GatewayResources(StateView stateView, EventDispatcher eventDispatcher,
ShardCoordinator shardCoordinator, MemberRequestFilter memberRequestFilter,
public GatewayResources(StateView stateView,
EventDispatcher eventDispatcher,
ShardCoordinator shardCoordinator,
MemberRequestFilter memberRequestFilter,
GatewayReactorResources gatewayReactorResources,
VoiceReactorResources voiceReactorResources,
ReconnectOptions voiceReconnectOptions, Possible<IntentSet> intents) {
ReconnectOptions voiceReconnectOptions,
Possible<IntentSet> intents,
Duration memberRequestTimeout) {
this.stateView = stateView;
this.eventDispatcher = eventDispatcher;
this.shardCoordinator = shardCoordinator;
Expand All @@ -69,6 +78,7 @@ public GatewayResources(StateView stateView, EventDispatcher eventDispatcher,
this.voiceReactorResources = voiceReactorResources;
this.voiceReconnectOptions = voiceReconnectOptions;
this.intents = intents;
this.memberRequestTimeout = memberRequestTimeout;
}

/**
Expand Down Expand Up @@ -146,4 +156,18 @@ public VoiceReactorResources getVoiceReactorResources() {
public ReconnectOptions getVoiceReconnectOptions() {
return voiceReconnectOptions;
}

/**
* Return a {@link Duration} to be used when the target client requests a complete list of guild members through
* the Gateway. Such requests might never be fulfilled if Gateway Intents are not used and privileged guild
* members intent is not enabled in the developer panel.
*
* @return the default timeout to be applied on complete member list requests
* @deprecated to be removed in v3.2, as Gateway Intents are mandatory and client-side validations can be
* reliably performed
*/
@Deprecated
public Duration getMemberRequestTimeout() {
return memberRequestTimeout;
}
}
Expand Up @@ -16,19 +16,20 @@
*/
package discord4j.core.retriever;

import discord4j.common.util.Snowflake;
import discord4j.core.GatewayDiscordClient;
import discord4j.core.object.entity.*;
import discord4j.core.object.entity.channel.Channel;
import discord4j.core.object.entity.channel.GuildChannel;
import discord4j.core.state.StateView;
import discord4j.core.util.EntityUtil;
import discord4j.gateway.intent.Intent;
import discord4j.common.util.Snowflake;
import discord4j.store.api.util.LongLongTuple2;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Collections;
import java.util.concurrent.TimeoutException;

public class StoreEntityRetriever implements EntityRetriever {

Expand Down Expand Up @@ -114,7 +115,8 @@ public Flux<Member> getGuildMembers(Snowflake guildId) {
.flatMap(memberId -> stateView.getMemberStore()
.find(LongLongTuple2.of(guildId.asLong(), Snowflake.asLong(memberId))))
.map(member -> new Member(gateway, member, guildId.asLong())))
.switchIfEmpty(gateway.requestMembers(guildId));
.switchIfEmpty(gateway.requestMembers(guildId))
.onErrorResume(TimeoutException.class, t -> Mono.empty());
}

@Override
Expand Down
20 changes: 19 additions & 1 deletion core/src/main/java/discord4j/core/shard/GatewayBootstrap.java
Expand Up @@ -141,6 +141,7 @@ public class GatewayBootstrap<O extends GatewayOptions> {
private DispatchEventMapper dispatchEventMapper = null;
private int maxMissedHeartbeatAck = 1;
private Function<EventDispatcher, Publisher<?>> dispatcherFunction;
private Duration memberRequestTimeout = Duration.ofSeconds(10);

/**
* Create a default {@link GatewayBootstrap} based off the given {@link DiscordClient} that provides an instance
Expand Down Expand Up @@ -186,6 +187,7 @@ public static GatewayBootstrap<GatewayOptions> create(DiscordClient client) {
this.dispatchEventMapper = source.dispatchEventMapper;
this.maxMissedHeartbeatAck = source.maxMissedHeartbeatAck;
this.dispatcherFunction = source.dispatcherFunction;
this.memberRequestTimeout = source.memberRequestTimeout;
}

/**
Expand Down Expand Up @@ -603,6 +605,22 @@ public GatewayBootstrap<O> setMaxMissedHeartbeatAck(int maxMissedHeartbeatAck) {
return this;
}

/**
* Set a {@link Duration} to apply a timeout on Gateway member list requests. Defaults to 10 seconds and it is
* useful to detect scenarios when no intents are used but the guild members privileged intent is not enabled in the
* bot developer portal.
*
* @param memberRequestTimeout the maximum {@link Duration} allowed between response member chunks
* @return this builder
* @deprecated for removal in v3.2, as Gateway Intents become mandatory there is no need to apply a forced timeout
* on a request as invalid ones will be rejected upon subscription.
*/
@Deprecated
public GatewayBootstrap<O> setMemberRequestTimeout(Duration memberRequestTimeout) {
this.memberRequestTimeout = Objects.requireNonNull(memberRequestTimeout);
return this;
}

/**
* Set an initial subscriber to the bootstrapped {@link EventDispatcher} to gain access to early startup events. The
* subscriber is derived from the given {@link Function} which returns a {@link Publisher} that is subscribed early
Expand Down Expand Up @@ -689,7 +707,7 @@ public Mono<GatewayDiscordClient> login(Function<O, GatewayClient> clientFactory
VoiceReactorResources voiceReactorResources = b.initVoiceReactorResources();
GatewayResources resources = new GatewayResources(stateView, eventDispatcher, shardCoordinator,
b.memberRequestFilter, gatewayReactorResources, b.initVoiceReactorResources(),
b.initReconnectOptions(voiceReactorResources), b.intents);
b.initReconnectOptions(voiceReactorResources), b.intents, b.memberRequestTimeout);
MonoProcessor<Void> closeProcessor = MonoProcessor.create();
EntityRetrievalStrategy entityRetrievalStrategy = b.initEntityRetrievalStrategy();
DispatchEventMapper dispatchMapper = b.initDispatchEventMapper();
Expand Down
19 changes: 15 additions & 4 deletions core/src/main/java/discord4j/core/util/ValidationUtil.java
Expand Up @@ -23,12 +23,14 @@
import discord4j.gateway.intent.IntentSet;

public class ValidationUtil {

/**
* Throws if the request is invalid given the current intents.
*
* @param request The request to validate
* @param possibleIntents The current intents
* @see <a href="https://discord.com/developers/docs/topics/gateway#request-guild-members">https://discord.com/developers/docs/topics/gateway#request-guild-members</a>
* @see
* <a href="https://discord.com/developers/docs/topics/gateway#request-guild-members">https://discord.com/developers/docs/topics/gateway#request-guild-members</a>
*/
public static void validateRequestGuildMembers(RequestGuildMembers request, Possible<IntentSet> possibleIntents) {
if (request.query().isAbsent() == request.userIds().isAbsent()) {
Expand All @@ -46,10 +48,19 @@ public static void validateRequestGuildMembers(RequestGuildMembers request, Poss
throw new IllegalArgumentException("GUILD_PRESENCES intent is required to set presences = true.");
}

boolean requestingEntireList = request.query().toOptional().map(String::isEmpty).orElse(false)
&& request.limit() == 0;
if (requestingEntireList && !intents.contains(Intent.GUILD_MEMBERS)) {
if (isRequestingEntireList(request) && !intents.contains(Intent.GUILD_MEMBERS)) {
throw new IllegalArgumentException("GUILD_MEMBERS intent is required to request the entire member list");
}
}

/**
* Return whether the given {@link RequestGuildMembers} instance is requesting an entire guild's list of members.
*
* @param request the request to check
* @return {@code true} if this request will attempt to retrieve the complete list of guild members, and {@code
* false} otherwise
*/
public static boolean isRequestingEntireList(RequestGuildMembers request) {
return request.query().toOptional().map(String::isEmpty).orElse(false) && request.limit() == 0;
}
}
6 changes: 3 additions & 3 deletions core/src/test/resources/logback.xml
Expand Up @@ -2,16 +2,16 @@
<configuration scan="true">
<logger name="io.netty" level="INFO"/>
<logger name="io.lettuce" level="INFO"/>
<!-- <logger name="reactor" level="INFO"/>-->
<!-- <logger name="reactor.netty" level="DEBUG"/>-->
<!-- <logger name="reactor" level="INFO"/>-->
<!-- <logger name="reactor.netty" level="DEBUG"/>-->
<logger name="reactor.retry" level="INFO"/>

<logger name="discord4j.core" level="DEBUG"/>
<logger name="discord4j.gateway.protocol.sender" level="TRACE"/>
<logger name="discord4j.gateway.protocol.receiver" level="TRACE"/>
<logger name="discord4j.voice.protocol.sender" level="TRACE"/>
<logger name="discord4j.voice.protocol.receiver" level="TRACE"/>
<!-- <logger name="discord4j.voice.protocol.udp.sender" level="TRACE"/>-->
<!-- <logger name="discord4j.voice.protocol.udp.sender" level="TRACE"/>-->
<logger name="discord4j.voice.protocol.udp.receiver" level="TRACE"/>
<logger name="discord4j.rest.http.client.DiscordWebClient" level="INFO"/>
<logger name="discord4j.rest.request" level="DEBUG"/>
Expand Down

0 comments on commit b36af22

Please sign in to comment.