Skip to content

Commit

Permalink
merge: #9435
Browse files Browse the repository at this point in the history
9435: [Backport stable/1.3] Java client accepts topologies with `DEAD` partitions r=saig0 a=backport-action

# Description
Backport of #9410 to `stable/1.3`.

relates to #9387

Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and lenaschoenburg committed May 24, 2022
2 parents 9b8dcfd + 210dcf1 commit 54d3ada
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@

public enum PartitionBrokerHealth {
HEALTHY,
UNHEALTHY
UNHEALTHY,
DEAD,
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public class PartitionInfoImpl implements PartitionInfo {
public PartitionInfoImpl(final GatewayOuterClass.Partition partition) {
partitionId = partition.getPartitionId();

if (partition.getRole() == GatewayOuterClass.Partition.PartitionBrokerRole.LEADER) {
if (partition.getRole() == Partition.PartitionBrokerRole.LEADER) {
role = PartitionBrokerRole.LEADER;
} else if (partition.getRole() == GatewayOuterClass.Partition.PartitionBrokerRole.FOLLOWER) {
} else if (partition.getRole() == Partition.PartitionBrokerRole.FOLLOWER) {
role = PartitionBrokerRole.FOLLOWER;
} else if (partition.getRole() == Partition.PartitionBrokerRole.INACTIVE) {
role = PartitionBrokerRole.INACTIVE;
Expand All @@ -43,11 +43,13 @@ public PartitionInfoImpl(final GatewayOuterClass.Partition partition) {
"Unexpected partition broker role %s, should be one of %s",
partition.getRole(), Arrays.toString(PartitionBrokerRole.values())));
}
if (partition.getHealth() == GatewayOuterClass.Partition.PartitionBrokerHealth.HEALTHY) {
this.partitionBrokerHealth = PartitionBrokerHealth.HEALTHY;
} else if (partition.getHealth()
== GatewayOuterClass.Partition.PartitionBrokerHealth.UNHEALTHY) {
this.partitionBrokerHealth = PartitionBrokerHealth.UNHEALTHY;

if (partition.getHealth() == Partition.PartitionBrokerHealth.HEALTHY) {
partitionBrokerHealth = PartitionBrokerHealth.HEALTHY;
} else if (partition.getHealth() == Partition.PartitionBrokerHealth.UNHEALTHY) {
partitionBrokerHealth = PartitionBrokerHealth.UNHEALTHY;
} else if (partition.getHealth() == Partition.PartitionBrokerHealth.DEAD) {
partitionBrokerHealth = PartitionBrokerHealth.DEAD;
} else {
throw new RuntimeException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static io.camunda.zeebe.client.util.RecordingGatewayService.broker;
import static io.camunda.zeebe.client.util.RecordingGatewayService.partition;
import static io.camunda.zeebe.gateway.protocol.GatewayOuterClass.Partition.PartitionBrokerHealth.DEAD;
import static io.camunda.zeebe.gateway.protocol.GatewayOuterClass.Partition.PartitionBrokerHealth.HEALTHY;
import static io.camunda.zeebe.gateway.protocol.GatewayOuterClass.Partition.PartitionBrokerHealth.UNHEALTHY;
import static io.camunda.zeebe.gateway.protocol.GatewayOuterClass.Partition.PartitionBrokerRole.FOLLOWER;
Expand Down Expand Up @@ -119,6 +120,25 @@ public void shouldRequestTopology() {
tuple(1, PartitionBrokerRole.INACTIVE, PartitionBrokerHealth.UNHEALTHY));
}

@Test
public void shouldAcceptDeadPartitions() {
// given
gatewayService.onTopologyRequest(
1,
1,
1,
"1.22.3-SNAPSHOT",
broker(0, "host1", 123, "1.22.3-SNAPSHOT", partition(0, LEADER, DEAD)));

// when
final Topology topology = client.newTopologyRequest().send().join();

// then
assertThat(topology.getBrokers().get(0).getPartitions().get(0))
.extracting(PartitionInfo::getHealth)
.isEqualTo(PartitionBrokerHealth.DEAD);
}

@Test
public void shouldRaiseExceptionOnError() {
// given
Expand Down

0 comments on commit 54d3ada

Please sign in to comment.