Skip to content

Commit

Permalink
Make connection pooling explicit in the code
Browse files Browse the repository at this point in the history
This commit makes it obvious in the code that all Redis clients
in fact use pooled connections. The pooled connection then provides
access to the actual ("internal") connection.

That also revealed an invalid class cast in `RedisSentinelClient`,
which is also fixed in this commit.
  • Loading branch information
Ladicek committed Jun 22, 2023
1 parent 50c3b1b commit 0dffca4
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public PooledRedisConnection(Lease<RedisConnectionInternal> lease, PoolMetrics<?
this.metric = metric;
}

public RedisConnection actual() {
public RedisConnectionInternal actual() {
return connection;
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/redis/client/impl/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public RedisClient(Vertx vertx, RedisOptions options) {
public Future<RedisConnection> connect() {
// so that the caller is called back on its original context
Promise<RedisConnection> promise = vertx.promise();
connectionManager.getConnection(defaultAddress, null).onComplete(promise);
connectionManager.getConnection(defaultAddress, null).onComplete((Promise) promise);
return promise.future();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private void connect(List<String> endpoints, int index, Handler<AsyncResult<Redi
// create a cluster connection
final AtomicBoolean failed = new AtomicBoolean(false);
final AtomicInteger counter = new AtomicInteger();
final Map<String, RedisConnection> connections = new HashMap<>();
final Map<String, PooledRedisConnection> connections = new HashMap<>();

// validate if the pool config is valid
final int totalUniqueEndpoints = slots.endpoints().length;
Expand Down Expand Up @@ -198,7 +198,7 @@ private void connect(List<String> endpoints, int index, Handler<AsyncResult<Redi
});
}

private void connectionComplete(AtomicInteger counter, Slots slots, Map<String, RedisConnection> connections, AtomicBoolean failed, Handler<AsyncResult<RedisConnection>> onConnect) {
private void connectionComplete(AtomicInteger counter, Slots slots, Map<String, PooledRedisConnection> connections, AtomicBoolean failed, Handler<AsyncResult<RedisConnection>> onConnect) {
if (counter.incrementAndGet() == slots.endpoints().length) {
// end condition
if (failed.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public static void addMasterOnlyCommand(Command command) {
private final VertxInternal vertx;
private final RedisOptions options;
private final Slots slots;
private final Map<String, RedisConnection> connections;
private final Map<String, PooledRedisConnection> connections;

RedisClusterConnection(Vertx vertx, RedisOptions options, Slots slots, Map<String, RedisConnection> connections) {
RedisClusterConnection(Vertx vertx, RedisOptions options, Slots slots, Map<String, PooledRedisConnection> connections) {
this.vertx = (VertxInternal) vertx;
this.options = options;
this.slots = slots;
Expand Down Expand Up @@ -249,7 +249,7 @@ private Map<Integer, Request> splitRequest(CommandImpl cmd, List<byte[]> args) {

private void send(String endpoint, int retries, Request command, Handler<AsyncResult<Response>> handler) {

final RedisConnection connection = connections.get(endpoint);
final PooledRedisConnection connection = connections.get(endpoint);

if (connection == null) {
handler.handle(Future.failedFuture("Missing connection to: " + endpoint));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ private Future<Void> setup(ContextInternal ctx, RedisConnection connection, Requ
}
}

public Future<RedisConnection> getConnection(String connectionString, Request setup) {
public Future<PooledRedisConnection> getConnection(String connectionString, Request setup) {
final ContextInternal context = vertx.getOrCreateContext();
final EventLoopContext eventLoopContext;
if (context instanceof EventLoopContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private void connect(List<String> endpoints, int index, Handler<AsyncResult<Redi
// create a cluster connection
final List<Node> nodes = getNodes.result();
final AtomicInteger counter = new AtomicInteger();
final List<RedisConnection> connections = new ArrayList<>();
final List<PooledRedisConnection> connections = new ArrayList<>();

// validate if the pool config is valid
final int totalUniqueEndpoints = nodes.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ public static void addMasterOnlyCommand(Command command) {
}

private final RedisOptions options;
private final RedisConnection master;
private final List<RedisConnection> replicas;
private final PooledRedisConnection master;
private final List<PooledRedisConnection> replicas;

RedisReplicationConnection(Vertx vertx, RedisOptions options, RedisConnection master, List<RedisConnection> replicas) {
RedisReplicationConnection(Vertx vertx, RedisOptions options, PooledRedisConnection master, List<PooledRedisConnection> replicas) {
this.options = options;
this.master = master;
this.replicas = replicas;
Expand Down Expand Up @@ -175,7 +175,7 @@ public boolean pendingQueueFull() {
return result;
}

private RedisConnection selectMasterOrReplicaEndpoint(boolean read, boolean forceMasterEndpoint) {
private PooledRedisConnection selectMasterOrReplicaEndpoint(boolean read, boolean forceMasterEndpoint) {
if (forceMasterEndpoint) {
return master;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ private static class Pair<L, R> {
private static final Logger LOG = LoggerFactory.getLogger(RedisSentinelClient.class);

private final RedisOptions options;
private RedisConnection sentinel;

public RedisSentinelClient(Vertx vertx, RedisOptions options) {
super(vertx, options);
Expand All @@ -71,7 +70,7 @@ public Future<RedisConnection> connect() {
return;
}

final RedisConnection conn = createConnection.result();
final PooledRedisConnection conn = createConnection.result();

createConnectionInternal(options, RedisRole.SENTINEL, create -> {
if (create.failed()) {
Expand All @@ -80,15 +79,15 @@ public Future<RedisConnection> connect() {
return;
}

sentinel = create.result();
PooledRedisConnection sentinel = create.result();

sentinel
.handler(msg -> {
if (msg.type() == ResponseType.MULTI) {
if ("MESSAGE".equalsIgnoreCase(msg.get(0).toString())) {
// we don't care about the payload
if (conn != null) {
((RedisStandaloneConnection) conn).fail(ErrorType.create("SWITCH-MASTER Received +switch-master message from Redis Sentinel."));
((RedisStandaloneConnection) conn.actual()).fail(ErrorType.create("SWITCH-MASTER Received +switch-master message from Redis Sentinel."));
} else {
LOG.warn("Received +switch-master message from Redis Sentinel.");
}
Expand All @@ -103,7 +102,7 @@ public Future<RedisConnection> connect() {

sentinel.exceptionHandler(t -> {
if (conn != null) {
((RedisStandaloneConnection) conn).fail(t);
((RedisStandaloneConnection) conn.actual()).fail(t);
} else {
LOG.error("Unhandled exception in Sentinel PUBSUB", t);
}
Expand All @@ -114,7 +113,7 @@ public Future<RedisConnection> connect() {
return promise.future();
}

private void createConnectionInternal(RedisOptions options, RedisRole role, Handler<AsyncResult<RedisConnection>> onCreate) {
private void createConnectionInternal(RedisOptions options, RedisRole role, Handler<AsyncResult<PooledRedisConnection>> onCreate) {

final Handler<AsyncResult<RedisURI>> createAndConnect = resolve -> {
if (resolve.failed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

public class RedisSentinelConnection implements RedisConnection {

private final RedisConnection connection;
private final RedisConnection sentinel;
private final PooledRedisConnection connection;
private final PooledRedisConnection sentinel;

public RedisSentinelConnection(RedisConnection connection, RedisConnection sentinel) {
public RedisSentinelConnection(PooledRedisConnection connection, PooledRedisConnection sentinel) {
this.connection = connection;
this.sentinel = sentinel;
}
Expand Down

0 comments on commit 0dffca4

Please sign in to comment.