Skip to content

Commit

Permalink
JVMCBC-1512 SDK fails to adapt to new cluster topology if only a port…
Browse files Browse the repository at this point in the history
… changed

Motivation
----------
The SDK wasn't noticing when a service moved
from one port to another on the same node.

Modifications
-------------
When adding a service, check for an existing
service on a different port. If one is found,
remove it before adding the new service.

Incidentally:

* Removed `synchronized` keyword from `addService()`
  and `removeService()`, since it was protecting
  only Mono creation, not execution. Config updates
  are serialized anyway; no synchronization needed.

* De-Mono-ify one of the `removeService` methods,
  so `addService` can call it easily.
  (Not a big change, since removeService was basically
  a blocking method wrapped in a Mono even before this.)

* Lift the service host:port calculation from
  `createService` up into into `addService`,
  so `addService` can compare against an existing
  service address.

* Add `Service.address()`, so `addService` can easily
  see the address of an existing service. This
  convenience method protects callers from having to
  dig around in the service context.

Change-Id: I93aa63ca5d313401a96d7df5ba369c10454fbf7e
Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/208846
Reviewed-by: Michael Reiche <michael.reiche@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
dnault committed Apr 19, 2024
1 parent 441adc1 commit 0b4d16c
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 54 deletions.
64 changes: 40 additions & 24 deletions core-io/src/main/java/com/couchbase/client/core/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import com.couchbase.client.core.service.ViewService;
import com.couchbase.client.core.service.ViewServiceConfig;
import com.couchbase.client.core.util.CompositeStateful;
import com.couchbase.client.core.util.HostAndPort;
import com.couchbase.client.core.util.NanoTimestamp;
import com.couchbase.client.core.util.Stateful;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -192,7 +193,7 @@ public synchronized Mono<Void> disconnect() {
return Flux
.fromIterable(entry.getValue().keySet())
.flatMap(serviceType ->
removeService(serviceType, Optional.of(entry.getKey()), true)
Mono.fromRunnable(() -> removeService(serviceType, Optional.of(entry.getKey()), true))
);
})
.then()
Expand Down Expand Up @@ -221,8 +222,11 @@ public synchronized Mono<Void> disconnect() {
* @param bucket the bucket name (if present).
* @return a {@link Mono} that completes once the service is added.
*/
public synchronized Mono<Void> addService(final ServiceType type, final int port,
final Optional<String> bucket) {
public Mono<Void> addService(
final ServiceType type,
final int port,
final Optional<String> bucket
) {
return Mono.fromRunnable(() -> {
if (disconnect.get()) {
ctx.environment().eventBus().publish(new ServiceAddIgnoredEvent(
Expand All @@ -233,9 +237,20 @@ public synchronized Mono<Void> addService(final ServiceType type, final int port
return;
}

HostAndPort newServiceAddress = new HostAndPort(
alternateAddress.orElseGet(identifier::address),
port
);

String name = type.scope() == ServiceScope.CLUSTER ? GLOBAL_SCOPE : bucket.orElse(BUCKET_GLOBAL_SCOPE);
Map<ServiceType, Service> localMap = services.computeIfAbsent(name, key -> new ConcurrentHashMap<>());

Service existingService = localMap.get(type);
if (existingService != null && !existingService.address().equals(newServiceAddress)) {
// Service moved to a different port. Remove the old one!
removeService(type, bucket, true);
}

if (localMap.containsKey(type)) {
ctx.environment().eventBus().publish(new ServiceAddIgnoredEvent(
Event.Severity.VERBOSE,
Expand All @@ -246,7 +261,7 @@ public synchronized Mono<Void> addService(final ServiceType type, final int port
}

NanoTimestamp start = NanoTimestamp.now();
Service service = createService(type, port, bucket);
Service service = createService(type, newServiceAddress, bucket);
serviceStates.register(service, service);
localMap.put(type, service);
enabledServices.set(enabledServices.get() | 1 << type.ordinal());
Expand All @@ -266,20 +281,21 @@ public synchronized Mono<Void> addService(final ServiceType type, final int port
* @return a mono once completed.
*/
public Mono<Void> removeService(final ServiceType type, final Optional<String> bucket) {
return removeService(type, bucket, false);
return Mono.fromRunnable(() -> removeService(type, bucket, false));
}

private synchronized Mono<Void> removeService(final ServiceType type,
final Optional<String> bucket,
boolean ignoreDisconnect) {
return Mono.defer(() -> {
private void removeService(
final ServiceType type,
final Optional<String> bucket,
final boolean ignoreDisconnect
) {
if (disconnect.get() && !ignoreDisconnect) {
ctx.environment().eventBus().publish(new ServiceRemoveIgnoredEvent(
Event.Severity.DEBUG,
ServiceRemoveIgnoredEvent.Reason.DISCONNECTED,
ctx
));
return Mono.empty();
return;
}

String name = type.scope() == ServiceScope.CLUSTER ? GLOBAL_SCOPE : bucket.orElse(BUCKET_GLOBAL_SCOPE);
Expand All @@ -290,7 +306,7 @@ private synchronized Mono<Void> removeService(final ServiceType type,
ServiceRemoveIgnoredEvent.Reason.NOT_PRESENT,
ctx
));
return Mono.empty();
return;
}

Service service = localMap.remove(type);
Expand All @@ -305,8 +321,6 @@ private synchronized Mono<Void> removeService(final ServiceType type,
ctx.environment().eventBus().publish(
new ServiceRemovedEvent(Duration.ofNanos(end - start), service.context())
);
return Mono.empty();
});
}

/**
Expand Down Expand Up @@ -422,50 +436,52 @@ public boolean hasServicesEnabled() {
* Helper method to create the {@link Service} based on the service type provided.
*
* @param serviceType the type of service to create.
* @param port the port for that service.
* @param address the host and port for that service.
* @param bucket optionally the bucket name.
* @return a created service, but not yet connected or anything.
*/
protected Service createService(final ServiceType serviceType, final int port,
protected Service createService(final ServiceType serviceType, final HostAndPort address,
final Optional<String> bucket) {
CoreEnvironment env = ctx.environment();
String address = alternateAddress.orElseGet(identifier::address);

String host = address.host();
int port = address.port();

switch (serviceType) {
case KV:
return new KeyValueService(
KeyValueServiceConfig.endpoints(env.ioConfig().numKvConnections()).build(), ctx, address, port, bucket, authenticator);
KeyValueServiceConfig.endpoints(env.ioConfig().numKvConnections()).build(), ctx, host, port, bucket, authenticator);
case MANAGER:
return new ManagerService(ctx, address, port);
return new ManagerService(ctx, host, port);
case QUERY:
return new QueryService(QueryServiceConfig
.maxEndpoints(env.ioConfig().maxHttpConnections())
.idleTime(env.ioConfig().idleHttpConnectionTimeout())
.build(),
ctx, address, port
ctx, host, port
);
case VIEWS:
return new ViewService(ViewServiceConfig
.maxEndpoints(env.ioConfig().maxHttpConnections())
.idleTime(env.ioConfig().idleHttpConnectionTimeout())
.build(),
ctx, address, port);
ctx, host, port);
case SEARCH:
return new SearchService(SearchServiceConfig
.maxEndpoints(env.ioConfig().maxHttpConnections())
.idleTime(env.ioConfig().idleHttpConnectionTimeout())
.build(),
ctx, address, port);
ctx, host, port);
case ANALYTICS:
return new AnalyticsService(AnalyticsServiceConfig
.maxEndpoints(env.ioConfig().maxHttpConnections())
.idleTime(env.ioConfig().idleHttpConnectionTimeout())
.build(),
ctx, address, port);
ctx, host, port);
case EVENTING:
return new EventingService(ctx, address, port);
return new EventingService(ctx, host, port);
case BACKUP:
return new BackupService(ctx, address, port);
return new BackupService(ctx, host, port);
default:
throw InvalidArgumentException.fromMessage("Unsupported ServiceType: " + serviceType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.couchbase.client.core.diagnostics.InternalEndpointDiagnostics;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.Response;
import com.couchbase.client.core.util.HostAndPort;
import com.couchbase.client.core.util.Stateful;

import java.util.stream.Stream;
Expand Down Expand Up @@ -79,6 +80,16 @@ public interface Service extends Stateful<ServiceState> {
*/
Stream<EndpointDiagnostics> diagnostics();

/**
* Returns the remote address for this service.
*/
default HostAndPort address() {
return new HostAndPort(
context().remoteHostname(),
context().remotePort()
);
}

@Stability.Internal
Stream<InternalEndpointDiagnostics> internalDiagnostics();
}

0 comments on commit 0b4d16c

Please sign in to comment.