Skip to content

Commit

Permalink
merge: #9360
Browse files Browse the repository at this point in the history
9360: Return empty topology response when not known r=remcowesterhoud a=remcowesterhoud

## Description

<!-- Please explain the changes you made here. -->
The topology is a way to view the cluster from the gateway's point of view. Returning UNAVAILABLE hides this and makes it hard to distinguish between a call where the gateway is unavailable and where the topology is simply not known. By returning an empty topology we can differentiate between an unavailable gateway and a not known topology.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #9096 



Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud committed May 11, 2022
2 parents 35713bd + 56d59cd commit db90b94
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 36 deletions.
41 changes: 17 additions & 24 deletions gateway/src/main/java/io/camunda/zeebe/gateway/EndpointManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobRetriesRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobRetriesResponse;
import io.camunda.zeebe.util.VersionUtil;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -276,35 +274,30 @@ public void topology(final ServerStreamObserver<TopologyResponse> responseObserv
final TopologyResponse.Builder topologyResponseBuilder = TopologyResponse.newBuilder();
final BrokerClusterState topology = topologyManager.getTopology();

if (topology == null) {
final StatusRuntimeException error =
Status.UNAVAILABLE.augmentDescription("No brokers available").asRuntimeException();
responseObserver.onError(error);
return;
}

topologyResponseBuilder
.setClusterSize(topology.getClusterSize())
.setPartitionsCount(topology.getPartitionsCount())
.setReplicationFactor(topology.getReplicationFactor());

final String gatewayVersion = VersionUtil.getVersion();
if (gatewayVersion != null && !gatewayVersion.isBlank()) {
topologyResponseBuilder.setGatewayVersion(gatewayVersion);
}

final ArrayList<BrokerInfo> brokers = new ArrayList<>();

topology
.getBrokers()
.forEach(
brokerId -> {
final Builder brokerInfo = BrokerInfo.newBuilder();
addBrokerInfo(brokerInfo, brokerId, topology);
addPartitionInfoToBrokerInfo(brokerInfo, brokerId, topology);

brokers.add(brokerInfo.build());
});
if (topology != null) {
topologyResponseBuilder
.setClusterSize(topology.getClusterSize())
.setPartitionsCount(topology.getPartitionsCount())
.setReplicationFactor(topology.getReplicationFactor());

topology
.getBrokers()
.forEach(
brokerId -> {
final Builder brokerInfo = BrokerInfo.newBuilder();
addBrokerInfo(brokerInfo, brokerId, topology);
addPartitionInfoToBrokerInfo(brokerInfo, brokerId, topology);

brokers.add(brokerInfo.build());
});
}

topologyResponseBuilder.addAllBrokers(brokers);
final TopologyResponse response = topologyResponseBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@
import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.command.ClientStatusException;
import io.camunda.zeebe.gateway.StandaloneGateway;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.grpc.Status.Code;
import io.restassured.builder.RequestSpecBuilder;
import io.restassured.filter.log.RequestLoggingFilter;
import io.restassured.filter.log.ResponseLoggingFilter;
Expand Down Expand Up @@ -70,16 +68,8 @@ void smokeTest() {
.then()
.statusCode(200);

// depending on how fast the gateway is, we may get an unavailable error or an empty topology
try {
final var result = topology.join(5L, TimeUnit.SECONDS);
assertThat(result.getBrokers()).as("there are no known brokers").isEmpty();
} catch (final ClientStatusException e) {
assertThat(e).hasRootCauseMessage("UNAVAILABLE: No brokers available");
assertThat(e.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
} catch (final Exception e) {
throw e;
}
final var result = topology.join(5L, TimeUnit.SECONDS);
assertThat(result.getBrokers()).as("there are no known brokers").isEmpty();
}
}

Expand Down

0 comments on commit db90b94

Please sign in to comment.