Skip to content

Commit

Permalink
Add timeout mechanic for chunk requests (#788)
Browse files Browse the repository at this point in the history
Now the controller will attempt to request chunks again if more than 10 seconds have passed since the last request. This should eliminate issues with resumes where its unclear if the requests were actually received.
  • Loading branch information
MinnDevelopment committed Oct 2, 2018
1 parent 03c752b commit caf8aae
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 47 deletions.
2 changes: 2 additions & 0 deletions src/main/java/net/dv8tion/jda/core/entities/impl/JDAImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,8 @@ public synchronized void shutdownInternals()
audioManagers.forEach(AudioManager::closeAudioConnection);
audioManagers.clear();

guildSetupController.close();

if (audioLifeCyclePool != null)
audioLifeCyclePool.shutdownNow();

Expand Down
131 changes: 84 additions & 47 deletions src/main/java/net/dv8tion/jda/core/handle/GuildSetupController.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package net.dv8tion.jda.core.handle;

import gnu.trove.iterator.TLongIterator;
import gnu.trove.iterator.TLongLongIterator;
import gnu.trove.iterator.TLongObjectIterator;
import gnu.trove.map.TLongLongMap;
import gnu.trove.map.TLongObjectMap;
import gnu.trove.map.hash.TLongLongHashMap;
import gnu.trove.map.hash.TLongObjectHashMap;
import gnu.trove.set.TLongSet;
import gnu.trove.set.hash.TLongHashSet;
Expand All @@ -34,23 +37,29 @@
import org.slf4j.Logger;

import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@SuppressWarnings("WeakerAccess")
public class GuildSetupController
{
protected static final int CHUNK_TIMEOUT = 10000;
protected static final Logger log = JDALogger.getLog(GuildSetupController.class);

private final UpstreamReference<JDAImpl> api;
private final TLongObjectMap<GuildSetupNode> setupNodes = new TLongObjectHashMap<>();
private final TLongSet chunkingGuilds = new TLongHashSet();
private final TLongLongMap pendingChunks = new TLongLongHashMap();
private final TLongSet syncingGuilds;

private int incompleteCount = 0;
private int syncingCount = 0;

StatusListener listener = (id, oldStatus, newStatus) -> log.trace("[{}] Updated status {}->{}", id, oldStatus, newStatus);
private Future<?> timeoutHandle;

protected StatusListener listener = (id, oldStatus, newStatus) -> log.trace("[{}] Updated status {}->{}", id, oldStatus, newStatus);

public GuildSetupController(JDAImpl api)
{
Expand All @@ -63,10 +72,7 @@ public GuildSetupController(JDAImpl api)

JDAImpl getJDA()
{
JDAImpl tmp = api.get();
if (tmp == null)
throw new IllegalStateException();
return tmp;
return api.get();
}

boolean isClient()
Expand Down Expand Up @@ -115,10 +121,6 @@ void remove(long id)
setupNodes.remove(id);
}

// Called by:

// - ReadyHandler
// - GuildSetupNode
public void ready(long id)
{
setupNodes.remove(id);
Expand All @@ -129,7 +131,6 @@ public void ready(long id)
tryChunking();
}

// - ReadyHandler
public boolean setIncompleteCount(int count)
{
log.debug("Setting incomplete count to {}", count);
Expand All @@ -138,10 +139,11 @@ public boolean setIncompleteCount(int count)
boolean ready = count == 0;
if (ready)
getJDA().getClient().ready();
else
startTimeout();
return !ready;
}

// - ReadyHandler
public void onReady(long id, JSONObject obj)
{
log.trace("Adding id to setup cache {}", id);
Expand All @@ -160,33 +162,6 @@ public void onReady(long id, JSONObject obj)
}
}

// // - WebSocketClient
// public void onResume(boolean isInit)
// {
// if (setupNodes.isEmpty())
// return;
// if (isInit && incompleteCount > 0)
// {
// //Override current chunking and syncing state - we were interrupted
// // this count will be adjusted by addGuildForX(id, join) later, we need to fix the displacement here
// Set<GuildSetupNode> joinedGuilds = setupNodes.valueCollection().stream().filter((node) -> node.join).collect(Collectors.toSet());
// long displacementChunking = joinedGuilds.stream().filter((node) -> node.requestedChunk).count();
// long displacementSyncing = joinedGuilds.stream().filter((node) -> node.sync).count();
// this.incompleteCount -= (int) displacementChunking;
// this.syncingCount -= (int) displacementSyncing;
// }
//
// setupNodes.forEachEntry((id, node) -> {
// if (node.sync)
// addGuildForSyncing(id, node.join);
// if (node.requestedChunk)
// addGuildForChunking(id, node.join);
// return true;
// });
// }

// - ReadyHandler (for client accounts)
// - GuildCreateHandler
public void onCreate(long id, JSONObject obj)
{
boolean available = obj.isNull("unavailable") || !obj.getBoolean("unavailable");
Expand All @@ -210,7 +185,6 @@ else if (node.markedUnavailable && available && incompleteCount > 0)
node.handleCreate(obj);
}

// - GuildDeleteHandler
public boolean onDelete(long id, JSONObject obj)
{
boolean available = obj.isNull("unavailable") || !obj.getBoolean("unavailable");
Expand Down Expand Up @@ -249,16 +223,18 @@ public boolean onDelete(long id, JSONObject obj)
return true;
}

// - GuildMemberChunkHandler
public void onMemberChunk(long id, JSONArray chunk)
{
log.debug("Received member chunk for guild id: {} size: {}", id, chunk.length());
synchronized (pendingChunks)
{
pendingChunks.remove(id);
}
GuildSetupNode node = setupNodes.get(id);
if (node != null)
node.handleMemberChunk(chunk);
}

// - GuildMemberAddHandler
public boolean onAddMember(long id, JSONObject member)
{
GuildSetupNode node = setupNodes.get(id);
Expand All @@ -269,7 +245,6 @@ public boolean onAddMember(long id, JSONObject member)
return true;
}

// - GuildMemberRemoveHandler
public boolean onRemoveMember(long id, JSONObject member)
{
GuildSetupNode node = setupNodes.get(id);
Expand All @@ -287,8 +262,6 @@ public void onSync(long id, JSONObject obj)
node.handleSync(obj);
}

// Anywhere \\

public boolean isLocked(long id)
{
return setupNodes.containsKey(id);
Expand All @@ -308,6 +281,17 @@ public void clearCache()
setupNodes.clear();
chunkingGuilds.clear();
incompleteCount = 0;
close();
synchronized (pendingChunks)
{
pendingChunks.clear();
}
}

public void close()
{
if (timeoutHandle != null)
timeoutHandle.cancel(false);
}

public boolean containsMember(long userId, @Nullable GuildSetupNode excludedNode)
Expand Down Expand Up @@ -353,6 +337,21 @@ private void sendChunkRequest(Object obj)
{
log.debug("Sending chunking requests for {} guilds", obj instanceof JSONArray ? ((JSONArray) obj).length() : 1);

long timeout = System.currentTimeMillis() + CHUNK_TIMEOUT;
synchronized (pendingChunks)
{
if (obj instanceof JSONArray)
{
JSONArray arr = (JSONArray) obj;
for (Object o : arr)
pendingChunks.put((long) o, timeout);
}
else
{
pendingChunks.put((long) obj, timeout);
}
}

getJDA().getClient().chunkOrSyncRequest(
new JSONObject()
.put("op", WebSocketCode.MEMBER_CHUNK_REQUEST)
Expand Down Expand Up @@ -388,6 +387,11 @@ private void tryChunking()
}
}

private void startTimeout()
{
timeoutHandle = getJDA().getGatewayPool().scheduleAtFixedRate(new ChunkTimeout(), CHUNK_TIMEOUT, CHUNK_TIMEOUT, TimeUnit.MILLISECONDS);
}

// Syncing

private void sendSyncRequest(JSONArray arr)
Expand Down Expand Up @@ -443,4 +447,37 @@ public interface StatusListener
{
void onStatusChange(long guildId, Status oldStatus, Status newStatus);
}

private class ChunkTimeout implements Runnable
{
@Override
public void run()
{
if (pendingChunks.isEmpty())
return;
synchronized (pendingChunks)
{
TLongLongIterator it = pendingChunks.iterator();
List<JSONArray> requests = new LinkedList<>();
JSONArray arr = new JSONArray();
while (it.hasNext())
{
// key=guild_id, value=timeout
it.advance();
if (System.currentTimeMillis() <= it.value())
continue;
arr.put(it.key());

if (arr.length() == 50)
{
requests.add(arr);
arr = new JSONArray();
}
}
if (arr.length() > 0)
requests.add(arr);
requests.forEach(GuildSetupController.this::sendChunkRequest);
}
}
}
}

0 comments on commit caf8aae

Please sign in to comment.