-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[redis] allow the client to reconnect on redis exceptions #1306
base: main
Are you sure you want to change the base?
Changes from 4 commits
7cf2087
a90df00
93c44ca
4786d11
e33903c
f14fc93
be77c5b
fe8113a
b8d2ae5
64442e9
0d9bfa8
ebce4bb
d0b8f79
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,20 +16,37 @@ | |
|
||
import io.grpc.Status; | ||
import io.grpc.Status.Code; | ||
import io.prometheus.client.Counter; | ||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.net.ConnectException; | ||
import java.net.SocketException; | ||
import java.net.SocketTimeoutException; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.Consumer; | ||
import java.util.function.Supplier; | ||
import redis.clients.jedis.JedisCluster; | ||
import redis.clients.jedis.exceptions.JedisConnectionException; | ||
import redis.clients.jedis.exceptions.JedisDataException; | ||
import redis.clients.jedis.exceptions.JedisException; | ||
import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException; | ||
|
||
/** | ||
* @class RedisClient | ||
* @brief Responsible for making calls to redis. | ||
*/ | ||
public class RedisClient implements Closeable { | ||
// Metrics to detect any kind of redis failures. | ||
// Often due to network issues or the redis cluster going down. | ||
private static final Counter redisErrorCounter = | ||
Counter.build().name("redis_client_error").help("Count of redis client failures").register(); | ||
private static final Counter redisClientRebuildErrorCounter = | ||
Counter.build() | ||
.name("redis_client_rebuild_error") | ||
.help("Count of failures rebuilding redis client") | ||
.register(); | ||
private boolean restablishClientOnFailures = false; | ||
|
||
private static final String MISCONF_RESPONSE = "MISCONF"; | ||
|
||
@FunctionalInterface | ||
|
@@ -56,14 +73,27 @@ public JedisMisconfigurationException(final String message, final Throwable caus | |
} | ||
} | ||
|
||
private final JedisCluster jedis; | ||
// We store the factory in case we want to re-create the jedis client. | ||
private Supplier<JedisCluster> jedisClusterFactory; | ||
|
||
// The jedis client. | ||
private JedisCluster jedis; | ||
|
||
private boolean closed = false; | ||
|
||
public RedisClient(JedisCluster jedis) { | ||
this.jedis = jedis; | ||
} | ||
|
||
public RedisClient( | ||
JedisCluster jedis, | ||
Supplier<JedisCluster> jedisClusterFactory, | ||
boolean restablishClientOnFailures) { | ||
this.jedis = jedis; | ||
this.jedisClusterFactory = jedisClusterFactory; | ||
this.restablishClientOnFailures = restablishClientOnFailures; | ||
} | ||
|
||
@Override | ||
public synchronized void close() { | ||
closed = true; | ||
|
@@ -81,7 +111,7 @@ private synchronized void throwIfClosed() throws IOException { | |
} | ||
|
||
public void run(Consumer<JedisCluster> withJedis) throws IOException { | ||
call( | ||
callImpl( | ||
(JedisContext<Void>) | ||
jedis -> { | ||
withJedis.accept(jedis); | ||
|
@@ -91,9 +121,14 @@ public void run(Consumer<JedisCluster> withJedis) throws IOException { | |
|
||
public <T> T blockingCall(JedisInterruptibleContext<T> withJedis) | ||
throws IOException, InterruptedException { | ||
return defaultBlockingCall(withJedis); | ||
} | ||
|
||
private <T> T defaultBlockingCall(JedisInterruptibleContext<T> withJedis) | ||
throws IOException, InterruptedException { | ||
AtomicReference<InterruptedException> interruption = new AtomicReference<>(null); | ||
T result = | ||
call( | ||
callImpl( | ||
jedis -> { | ||
try { | ||
return withJedis.run(jedis); | ||
|
@@ -109,8 +144,47 @@ public <T> T blockingCall(JedisInterruptibleContext<T> withJedis) | |
return result; | ||
} | ||
|
||
@SuppressWarnings("ConstantConditions") | ||
public <T> T call(JedisContext<T> withJedis) throws IOException { | ||
return callImpl(withJedis); | ||
} | ||
|
||
private <T> T callImpl(JedisContext<T> withJedis) throws IOException { | ||
// Typical configuration that does not handle exceptions | ||
// or try to re-establish the jedis client on failures. | ||
if (!restablishClientOnFailures) { | ||
return defaultCall(withJedis); | ||
} | ||
|
||
// Alternatively, | ||
// Capture all redis problems at the client level. | ||
// Try to re-establish the client and log all issues. | ||
// This will block the overall thread until redis can be connected to. | ||
// It may be a useful strategy for gaining stability on a poorly performing network, | ||
// or a redis cluster that goes down. | ||
while (true) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would prefer the configuration to be a number of reconnects. If it's 0 or null then we don't retry; otherwise we retry up to that number of times. Bonus if there is some backoff here so that we don't spam retries continuously. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Agree. Switched to retry amount + duration between retries. We have a |
||
try { | ||
return defaultCall(withJedis); | ||
} catch (Exception e) { | ||
redisErrorCounter.inc(); | ||
System.out.println("Failure in RedisClient::call"); | ||
System.out.println(e.toString()); | ||
rebuildJedisCluser(); | ||
} | ||
} | ||
} | ||
|
||
private void rebuildJedisCluser() { | ||
try { | ||
System.out.println("Rebuilding redis client"); | ||
jedis = jedisClusterFactory.get(); | ||
} catch (Exception e) { | ||
redisClientRebuildErrorCounter.inc(); | ||
System.out.println("Failed to rebuild redis client"); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should this be plumbed into the log? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. fixed |
||
System.out.println(e.toString()); | ||
} | ||
} | ||
|
||
private <T> T defaultCall(JedisContext<T> withJedis) throws IOException { | ||
throwIfClosed(); | ||
try { | ||
try { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: do we need to make a separate method for this still?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed. callImpl folded into call