Skip to content

Commit

Permalink
merge: #14903
Browse files Browse the repository at this point in the history
14903: [Backport stable/8.3] Replace incremental requestId in AtomixServerTransport with unique SnowflakeId r=npepinpe a=backport-action

# Description
Backport of #14857 to `stable/8.3`.

relates to #5624
original author: `@rodrigo-lourenco-lopes`

Co-authored-by: rodrigolourencolopes <rodrigo.lopes@camunda.com>
  • Loading branch information
2 parents 0d46073 + 942847f commit 7489c86
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ private void startServerTransport(
final var schedulingService = brokerStartupContext.getActorSchedulingService();
final var messagingService = brokerStartupContext.getApiMessagingService();

final var atomixServerTransport = new AtomixServerTransport(messagingService);
final var atomixServerTransport =
new AtomixServerTransport(messagingService, brokerInfo.getNodeId());

concurrencyControl.runOnCompletion(
schedulingService.submitActor(atomixServerTransport),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected void before() {
.build();
cluster.start().join();
final var transportFactory = new TransportFactory(scheduler);
serverTransport = transportFactory.createServerTransport(0, cluster.getMessagingService());
serverTransport = transportFactory.createServerTransport(nodeId, cluster.getMessagingService());

channelHandler = new StubRequestHandler(msgPackHelper);
serverTransport.subscribe(1, RequestType.COMMAND, channelHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public TransportFactory(final ActorSchedulingService actorSchedulingService) {

public ServerTransport createServerTransport(
final int nodeId, final MessagingService messagingService) {
final var atomixServerTransport = new AtomixServerTransport(messagingService);
final var atomixServerTransport = new AtomixServerTransport(messagingService, nodeId);
actorSchedulingService.submitActor(atomixServerTransport);
return atomixServerTransport;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,39 @@
import io.camunda.zeebe.transport.ServerTransport;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.IdGenerator;
import org.agrona.concurrent.SnowflakeIdGenerator;
import org.agrona.concurrent.SystemEpochClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

public class AtomixServerTransport extends Actor implements ServerTransport {

private static final Logger LOG = Loggers.TRANSPORT_LOGGER;
private static final String API_TOPIC_FORMAT = "%s-api-%d";
// Unix epoch time for January 1, 2023 1:00:00 AM GMT+01:00
private static final long TIMESTAMP_OFFSET_2023 = 1672531200000L;
private static final String ERROR_MSG_MISSING_PARTITON_MAP =
"Node already unsubscribed from partition %d, this can only happen when atomix does not cleanly remove its handlers.";

private final Int2ObjectHashMap<Long2ObjectHashMap<CompletableFuture<byte[]>>>
partitionsRequestMap;
private final AtomicLong requestCount;
private final MessagingService messagingService;

public AtomixServerTransport(final MessagingService messagingService) {
private final IdGenerator idGenerator;

public AtomixServerTransport(final MessagingService messagingService, final int nodeId) {
this.messagingService = messagingService;
partitionsRequestMap = new Int2ObjectHashMap<>();
requestCount = new AtomicLong(0);
this.idGenerator =
new SnowflakeIdGenerator(
SnowflakeIdGenerator.NODE_ID_BITS_DEFAULT,
SnowflakeIdGenerator.SEQUENCE_BITS_DEFAULT,
nodeId,
TIMESTAMP_OFFSET_2023,
SystemEpochClock.INSTANCE);
}

@Override
Expand Down Expand Up @@ -89,7 +100,7 @@ private void removePartition(final int partitionId) {
}
}

private void removeRequestHandlers(final int partitionId, RequestType requestType) {
private void removeRequestHandlers(final int partitionId, final RequestType requestType) {
final var topicName = topicName(partitionId, requestType);
LOG.trace("Unsubscribe from topic {}", topicName);
messagingService.unregisterHandler(topicName);
Expand All @@ -103,7 +114,7 @@ private CompletableFuture<byte[]> handleAtomixRequest(
final var completableFuture = new CompletableFuture<byte[]>();
actor.call(
() -> {
final var requestId = requestCount.getAndIncrement();
final long requestId = idGenerator.nextId();
final var requestMap = partitionsRequestMap.get(partitionId);
if (requestMap == null) {
final var errorMsg = String.format(ERROR_MSG_MISSING_PARTITON_MAP, partitionId);
Expand Down Expand Up @@ -147,7 +158,7 @@ public void sendResponse(final ServerResponse response) {
final var length = response.getLength();
final var bytes = new byte[length];

// here we can't reuse an buffer, because sendResponse can be called concurrently
// here we can't reuse a buffer, because sendResponse can be called concurrently
final var unsafeBuffer = new UnsafeBuffer(bytes);
response.write(unsafeBuffer, 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,29 @@ public void shouldOnlyHandleRequestsOfSubscribedTypes() {
assertThat(response.byteArray()).isEqualTo("messageABC".getBytes());
}

@Test
public void shouldCreateUniqueRequestsIds() {
final DirectlyResponder directlyResponder = new DirectlyResponder();

serverTransport.subscribe(0, RequestType.COMMAND, directlyResponder).join();

// when
final var requestFuture1 =
clientTransport.sendRequestWithRetry(
nodeAddressSupplier, new Request("messageABC"), REQUEST_TIMEOUT);
requestFuture1.join();
final long requestId1 = directlyResponder.serverResponse.getRequestId();

final var requestFuture2 =
clientTransport.sendRequestWithRetry(
nodeAddressSupplier, new Request("messageABC"), REQUEST_TIMEOUT);
requestFuture2.join();
final long requestId2 = directlyResponder.serverResponse.getRequestId();

// then
assertThat(requestId1).isNotEqualByComparingTo(requestId2);
}

private static final class Request implements ClientRequest {

private final String msg;
Expand Down Expand Up @@ -400,9 +423,10 @@ public void write(final MutableDirectBuffer buffer, final int offset) {
private static class DirectlyResponder implements RequestHandler {

private final Consumer<byte[]> requestConsumer;
private ServerResponseImpl serverResponse;

DirectlyResponder() {
this(bytes -> {});
this.requestConsumer = (bytes -> {});
}

DirectlyResponder(final Consumer<byte[]> requestConsumer) {
Expand All @@ -417,7 +441,7 @@ public void onRequest(
final DirectBuffer buffer,
final int offset,
final int length) {
final var serverResponse =
serverResponse =
new ServerResponseImpl()
.buffer(buffer, 0, length)
.setRequestId(requestId)
Expand Down

0 comments on commit 7489c86

Please sign in to comment.