Skip to content

Commit

Permalink
merge: #8295
Browse files Browse the repository at this point in the history
8295: Expose Broker API to step down if not leader  r=oleschoenburg a=oleschoenburg

## Description

This introduces a new Admin API for the Broker.  The only supported request so far is `STEP_DOWN_IF_NOT_LEADER`. 
The handler has access to the Raft partitions by getting the partition manager injected after partitions are started. 

The handler is a `PartitionListener` so it subscribes and unsubscribes to messages when it transitions between leader, follower and inactive. This is optional behavior and could be removed as the `stepDownIfNotLeader` is a no-op when the broker is not the leader for this partition. 

## Related issues

closes #8224



Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
zeebe-bors-cloud[bot] and lenaschoenburg committed Dec 1, 2021
2 parents bc62298 + 7c35186 commit b698000
Show file tree
Hide file tree
Showing 21 changed files with 539 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ public interface PartitionGroup extends Configured<PartitionGroupConfig> {
*/
String name();

/**
* Returns a partition by ID. Assumes that the partition ID belongs to this group.
*
* @param partitionId the partition identifier
* @return the partition or {@code null} if no partition with the given identifier exists
*/
Partition getPartition(int partitionId);

/**
* Returns a partition by ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public String name() {
return name;
}

@Override
public RaftPartition getPartition(final int partitionId) {
return getPartition(PartitionId.from(name, partitionId));
}

@Override
public RaftPartition getPartition(final PartitionId partitionId) {
return partitions.get(partitionId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker.bootstrap;

import io.camunda.zeebe.broker.transport.adminapi.AdminApiRequestHandler;
import io.camunda.zeebe.util.sched.ConcurrencyControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;

public class AdminApiServiceStep extends AbstractBrokerStartupStep {

@Override
public String getName() {
return "Admin API";
}

@Override
void startupInternal(
final BrokerStartupContext brokerStartupContext,
final ConcurrencyControl concurrencyControl,
final ActorFuture<BrokerStartupContext> startupFuture) {
final var schedulingService = brokerStartupContext.getActorSchedulingService();
final var transport = brokerStartupContext.getCommandApiServerTransport();
final var handler = new AdminApiRequestHandler(transport);

concurrencyControl.runOnCompletion(
schedulingService.submitActor(handler),
proceed(
() -> {
if (brokerStartupContext.getAdminApiService() == null) {
brokerStartupContext.setAdminApiService(handler);
}
startupFuture.complete(brokerStartupContext);
},
startupFuture));
}

@Override
void shutdownInternal(
final BrokerStartupContext brokerShutdownContext,
final ConcurrencyControl concurrencyControl,
final ActorFuture<BrokerStartupContext> shutdownFuture) {
final var service = brokerShutdownContext.getAdminApiService();
if (service == null) {
shutdownFuture.complete(brokerShutdownContext);
return;
}
concurrencyControl.runOnCompletion(
service.closeAsync(),
proceed(
() -> {
brokerShutdownContext.setAdminApiService(null);
shutdownFuture.complete(brokerShutdownContext);
},
shutdownFuture));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.camunda.zeebe.broker.system.monitoring.BrokerHealthCheckService;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageMonitor;
import io.camunda.zeebe.broker.transport.adminapi.AdminApiRequestHandler;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiServiceImpl;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.transport.impl.AtomixServerTransport;
Expand Down Expand Up @@ -69,6 +70,10 @@ public interface BrokerStartupContext {

void setCommandApiService(CommandApiServiceImpl commandApiService);

AdminApiRequestHandler getAdminApiService();

void setAdminApiService(AdminApiRequestHandler adminApiService);

AtomixServerTransport getCommandApiServerTransport();

void setCommandApiServerTransport(AtomixServerTransport commandApiServerTransport);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.camunda.zeebe.broker.system.monitoring.BrokerHealthCheckService;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageMonitor;
import io.camunda.zeebe.broker.transport.adminapi.AdminApiRequestHandler;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiServiceImpl;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.transport.impl.AtomixServerTransport;
Expand Down Expand Up @@ -51,6 +52,7 @@ public final class BrokerStartupContextImpl implements BrokerStartupContext {
private AtomixServerTransport commandApiServerTransport;
private ManagedMessagingService commandApiMessagingService;
private CommandApiServiceImpl commandApiService;
private AdminApiRequestHandler adminApiService;
private SubscriptionApiCommandMessageHandlerService subscriptionApiService;
private EmbeddedGatewayService embeddedGatewayService;
private LeaderManagementRequestHandler leaderManagementRequestHandler;
Expand Down Expand Up @@ -163,6 +165,16 @@ public void setCommandApiService(final CommandApiServiceImpl commandApiService)
this.commandApiService = commandApiService;
}

@Override
public AdminApiRequestHandler getAdminApiService() {
return adminApiService;
}

@Override
public void setAdminApiService(final AdminApiRequestHandler adminApiService) {
this.adminApiService = adminApiService;
}

@Override
public AtomixServerTransport getCommandApiServerTransport() {
return commandApiServerTransport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ private List<StartupStep<BrokerStartupContext>> buildStartupSteps(final BrokerCf

result.add(new ApiMessagingServiceStep());
result.add(new CommandApiServiceStep());
result.add(new AdminApiServiceStep());
result.add(new SubscriptionApiStep());
result.add(new LeaderManagementRequestHandlerStep());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ void startupInternal(
() ->
forwardExceptions(
() -> {
final var adminApi =
brokerStartupContext.getAdminApiService();
adminApi.injectPartitionManager(partitionManager);
final var adminService =
brokerStartupContext.getBrokerAdminService();
adminService.injectAdminAccess(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public ErrorResponseWriter(final ServerOutput output) {
this.output = output;
}

public <T> ErrorResponseWriter unsupportedMessage(
final String actualType, final T... expectedTypes) {
public <T> ErrorResponseWriter unsupportedMessage(final T actualType, final T... expectedTypes) {
return errorCode(ErrorCode.UNSUPPORTED_MESSAGE)
.errorMessage(
String.format(UNSUPPORTED_MESSAGE_FORMAT, Arrays.toString(expectedTypes), actualType));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker.transport.adminapi;

import io.atomix.primitive.partition.PartitionId;
import io.atomix.raft.partition.RaftPartitionGroup;
import io.camunda.zeebe.broker.partitioning.PartitionManagerImpl;
import io.camunda.zeebe.broker.transport.ApiRequestHandler;
import io.camunda.zeebe.broker.transport.ErrorResponseWriter;
import io.camunda.zeebe.protocol.record.AdminRequestType;
import io.camunda.zeebe.transport.RequestType;
import io.camunda.zeebe.transport.impl.AtomixServerTransport;
import io.camunda.zeebe.util.Either;

public class AdminApiRequestHandler extends ApiRequestHandler<ApiRequestReader, ApiResponseWriter> {
private RaftPartitionGroup partitionGroup;
private final AtomixServerTransport transport;

public AdminApiRequestHandler(final AtomixServerTransport transport) {
super(new ApiRequestReader(), new ApiResponseWriter());
this.transport = transport;
}

@Override
protected Either<ErrorResponseWriter, ApiResponseWriter> handle(
final int partitionId,
final long requestId,
final ApiRequestReader requestReader,
final ApiResponseWriter responseWriter,
final ErrorResponseWriter errorWriter) {
if (requestReader.getMessageDecoder().type() == AdminRequestType.STEP_DOWN_IF_NOT_PRIMARY) {
return stepDownIfNotPrimary(responseWriter, partitionId, errorWriter);
}
return unknownRequest(errorWriter, requestReader.getMessageDecoder().type());
}

private Either<ErrorResponseWriter, ApiResponseWriter> unknownRequest(
final ErrorResponseWriter errorWriter, final AdminRequestType type) {
errorWriter.unsupportedMessage(type, AdminRequestType.values());
return Either.left(errorWriter);
}

private Either<ErrorResponseWriter, ApiResponseWriter> stepDownIfNotPrimary(
final ApiResponseWriter responseWriter,
final int partitionId,
final ErrorResponseWriter errorWriter) {
if (partitionGroup == null) {
errorWriter.partitionLeaderMismatch(partitionId);
return Either.left(errorWriter);
}
final var partition = partitionGroup.getPartition(partitionId);
partition.stepDownIfNotPrimary();

return Either.right(responseWriter);
}

public void injectPartitionManager(final PartitionManagerImpl partitionManager) {
partitionGroup = (RaftPartitionGroup) partitionManager.getPartitionGroup();
partitionGroup.getPartitionIds().stream()
.map(PartitionId::id)
.forEach(partitionId -> transport.subscribe(partitionId, RequestType.ADMIN, this));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker.transport.adminapi;

import io.camunda.zeebe.broker.transport.ApiRequestHandler.RequestReader;
import io.camunda.zeebe.protocol.record.AdminRequestDecoder;
import io.camunda.zeebe.protocol.record.MessageHeaderDecoder;
import org.agrona.DirectBuffer;

public class ApiRequestReader implements RequestReader<AdminRequestDecoder> {
private final MessageHeaderDecoder headerDecoder = new MessageHeaderDecoder();
private final AdminRequestDecoder messageDecoder = new AdminRequestDecoder();

@Override
public void reset() {}

@Override
public AdminRequestDecoder getMessageDecoder() {
return messageDecoder;
}

@Override
public void wrap(final DirectBuffer buffer, final int offset, final int length) {
messageDecoder.wrapAndApplyHeader(buffer, offset, headerDecoder);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker.transport.adminapi;

import io.camunda.zeebe.broker.transport.ApiRequestHandler.ResponseWriter;
import io.camunda.zeebe.protocol.record.AdminResponseEncoder;
import io.camunda.zeebe.protocol.record.ExecuteQueryResponseEncoder;
import io.camunda.zeebe.protocol.record.MessageHeaderEncoder;
import io.camunda.zeebe.transport.ServerOutput;
import io.camunda.zeebe.transport.impl.ServerResponseImpl;
import org.agrona.MutableDirectBuffer;

public class ApiResponseWriter implements ResponseWriter {
private final MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder();
private final AdminResponseEncoder responseEncoder = new AdminResponseEncoder();
private final ServerResponseImpl response = new ServerResponseImpl();

@Override
public void tryWriteResponse(
final ServerOutput output, final int partitionId, final long requestId) {
try {
response.reset().writer(this).setPartitionId(partitionId).setRequestId(requestId);
output.sendResponse(response);
} finally {
reset();
}
}

@Override
public void reset() {}

@Override
public int getLength() {
return MessageHeaderEncoder.ENCODED_LENGTH + ExecuteQueryResponseEncoder.BLOCK_LENGTH;
}

@Override
public void write(final MutableDirectBuffer buffer, int offset) {
headerEncoder.wrap(buffer, offset);

headerEncoder
.blockLength(responseEncoder.sbeBlockLength())
.templateId(responseEncoder.sbeTemplateId())
.schemaId(responseEncoder.sbeSchemaId())
.version(responseEncoder.sbeSchemaVersion());

offset += headerEncoder.encodedLength();

responseEncoder.wrap(buffer, offset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ private void cleanLeadingPartition(final int partitionId) {

private void removeForPartitionId(final int partitionId) {
limiter.removePartition(partitionId);
serverTransport.unsubscribe(partitionId);
serverTransport.unsubscribe(partitionId, RequestType.COMMAND);
serverTransport.unsubscribe(partitionId, RequestType.QUERY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.camunda.zeebe.broker.system.management.BrokerAdminServiceImpl;
import io.camunda.zeebe.broker.system.management.LeaderManagementRequestHandler;
import io.camunda.zeebe.broker.system.monitoring.BrokerHealthCheckService;
import io.camunda.zeebe.broker.transport.adminapi.AdminApiRequestHandler;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.camunda.zeebe.util.sched.ActorScheduler;
Expand Down Expand Up @@ -83,6 +84,7 @@ void setUp() {
Collections.emptyList());
testBrokerStartupContext.setLeaderManagementRequestHandler(
mock(LeaderManagementRequestHandler.class, RETURNS_DEEP_STUBS));
testBrokerStartupContext.setAdminApiService(mock(AdminApiRequestHandler.class));
testBrokerStartupContext.setBrokerAdminService(mock(BrokerAdminServiceImpl.class));
testBrokerStartupContext.setClusterServices(
mock(ClusterServicesImpl.class, RETURNS_DEEP_STUBS));
Expand Down
Loading

0 comments on commit b698000

Please sign in to comment.