Skip to content

Commit

Permalink
redis: async error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
mcmonkey4eva committed Aug 14, 2022
1 parent 11cf4e1 commit 77e372a
Showing 1 changed file with 26 additions and 6 deletions.
Expand Up @@ -23,6 +23,7 @@
import redis.clients.jedis.util.SafeEncoder;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class RedisCommand extends AbstractCommand implements Holdable {

Expand All @@ -33,6 +34,7 @@ public RedisCommand() {
isProcedural = false;
setPrefixesHandled("auth", "port", "id", "message", "args");
setBooleansHandled("ssl");
isEnabled.set(true);
}

// <--[command]
Expand Down Expand Up @@ -141,8 +143,11 @@ public RedisCommand() {
public static Map<String, Jedis> connections = new HashMap<>();
public static Map<String, JedisPubSub> subscriptions = new HashMap<>();

public static AtomicBoolean isEnabled = new AtomicBoolean(true);

@Override
public void onDisable() {
isEnabled.set(false);
for (Map.Entry<String, JedisPubSub> entry : subscriptions.entrySet()) {
try {
entry.getValue().punsubscribe();
Expand Down Expand Up @@ -226,6 +231,22 @@ else if (response instanceof Long) {
}
}

public static void runChecked(Runnable r, ScriptEntry scriptEntry) {
DenizenCore.schedule(new AsyncSchedulable(new OneTimeSchedulable(() -> {
try {
r.run();
}
catch (Throwable ex) {
if (isEnabled.get()) { // Ignore errors when server is shutting down
DenizenCore.schedule(new OneTimeSchedulable(() -> {
Debug.echoError(ex);
scriptEntry.setFinished(true);
}, 0));
}
}
}, 0)));
}

@Override
public void execute(final ScriptEntry scriptEntry) {
if (!CoreConfiguration.allowRedis) {
Expand Down Expand Up @@ -266,7 +287,7 @@ public void execute(final ScriptEntry scriptEntry) {
scriptEntry.setFinished(true);
return;
}
DenizenCore.schedule(new AsyncSchedulable(new OneTimeSchedulable(() -> {
runChecked(() -> {
Jedis con = null;
if (CoreConfiguration.debugVerbose) {
Debug.echoDebug(scriptEntry, "Connecting to " + host + " on port " + port);
Expand Down Expand Up @@ -309,7 +330,7 @@ public void execute(final ScriptEntry scriptEntry) {
}
}, 0));
}
}, 0)));
}, scriptEntry);
}
else if (action.asString().equalsIgnoreCase("disconnect")) {
scriptEntry.setFinished(true);
Expand Down Expand Up @@ -354,8 +375,7 @@ else if (action.asString().equalsIgnoreCase("subscribe")) {
for (int i = 0; i < channels.size(); i++) {
channelArr[i] = CoreUtilities.toLowerCase(channels.get(i));
}
Thread thr = new Thread(() -> { con.psubscribe(jedisPubSub, channelArr); scriptEntry.setFinished(true); });
thr.start();
runChecked(() -> { con.psubscribe(jedisPubSub, channelArr); scriptEntry.setFinished(true); }, scriptEntry);
}
else if (action.asString().equalsIgnoreCase("unsubscribe")) {
scriptEntry.setFinished(true);
Expand Down Expand Up @@ -410,7 +430,7 @@ else if (action.asString().equalsIgnoreCase("publish")) {
}
};
if (scriptEntry.shouldWaitFor()) {
DenizenCore.schedule(new AsyncSchedulable(new OneTimeSchedulable(doQuery, 0)));
runChecked(doQuery, scriptEntry);
}
else {
doQuery.run();
Expand Down Expand Up @@ -462,7 +482,7 @@ else if (action.asString().equalsIgnoreCase("command")) {
}
};
if (scriptEntry.shouldWaitFor()) {
DenizenCore.schedule(new AsyncSchedulable(new OneTimeSchedulable(doQuery, 0)));
runChecked(doQuery, scriptEntry);
}
else {
doQuery.run();
Expand Down

0 comments on commit 77e372a

Please sign in to comment.