Skip to content

Commit

Permalink
merge: #9008 #9022
Browse files Browse the repository at this point in the history
9008: [Backport stable/1.3] fix: don't wait for actor being submitted r=deepthidevaki a=oleschoenburg

## Description

Manual backport of #8993 


9022: [Backport stable/1.3] chore(maven): add trailing slashes to new Artifactory URL r=oleschoenburg a=github-actions[bot]

# Description
Backport of #9017 to `stable/1.3`.

relates to 

Co-authored-by: Roman <roman.smirnov@camunda.com>
Co-authored-by: Christian Nicolai <christian.nicolai@camunda.com>
  • Loading branch information
3 people committed Mar 30, 2022
3 parents 5c6db9c + 9075101 + 7b144cc commit d8cc5ea
Show file tree
Hide file tree
Showing 16 changed files with 203 additions and 74 deletions.
4 changes: 2 additions & 2 deletions benchmarks/project/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
</snapshots>
<id>zeebe</id>
<name>Zeebe Repository</name>
<url>https://artifacts.camunda.com/artifactory/zeebe-io</url>
<url>https://artifacts.camunda.com/artifactory/zeebe-io/</url>
</repository>

<repository>
Expand All @@ -111,7 +111,7 @@
</snapshots>
<id>zeebe-snapshots</id>
<name>Zeebe Snapshot Repository</name>
<url>https://artifacts.camunda.com/artifactory/zeebe-io-snapshots</url>
<url>https://artifacts.camunda.com/artifactory/zeebe-io-snapshots/</url>
</repository>
</repositories>

Expand Down
8 changes: 4 additions & 4 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@

<properties>
<!-- release parent settings -->
<nexus.snapshot.repository>https://artifacts.camunda.com/artifactory/zeebe-io-snapshots</nexus.snapshot.repository>
<nexus.release.repository>https://artifacts.camunda.com/artifactory/zeebe-io</nexus.release.repository>
<nexus.snapshot.repository>https://artifacts.camunda.com/artifactory/zeebe-io-snapshots/</nexus.snapshot.repository>
<nexus.release.repository>https://artifacts.camunda.com/artifactory/zeebe-io/</nexus.release.repository>
<nexus.sonatype.url>https://s01.oss.sonatype.org</nexus.sonatype.url>

<version.java>11</version.java>
Expand Down Expand Up @@ -237,7 +237,7 @@
</snapshots>
<id>zeebe</id>
<name>Zeebe Repository</name>
<url>https://artifacts.camunda.com/artifactory/zeebe-io</url>
<url>https://artifacts.camunda.com/artifactory/zeebe-io/</url>
</repository>

<repository>
Expand All @@ -249,7 +249,7 @@
</snapshots>
<id>zeebe-snapshots</id>
<name>Zeebe Snapshot Repository</name>
<url>https://artifacts.camunda.com/artifactory/zeebe-io-snapshots</url>
<url>https://artifacts.camunda.com/artifactory/zeebe-io-snapshots/</url>
</repository>
</repositories>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,20 @@ void startupInternal(
clusterServices.getMembershipService(),
clusterServices.getEventService());

brokerStartupContext.setEmbeddedGatewayService(embeddedGatewayService);
final var embeddedGatewayServiceFuture = embeddedGatewayService.start();
concurrencyControl.runOnCompletion(
embeddedGatewayServiceFuture,
(gateway, error) -> {
if (error != null) {
startupFuture.completeExceptionally(error);
return;
}

final var springBridge = brokerStartupContext.getSpringBrokerBridge();
final var gateway = embeddedGatewayService.get();
springBridge.registerBrokerClient(gateway::getBrokerClient);

startupFuture.complete(brokerStartupContext);
brokerStartupContext.setEmbeddedGatewayService(embeddedGatewayService);
final var springBridge = brokerStartupContext.getSpringBrokerBridge();
springBridge.registerBrokerClient(gateway::getBrokerClient);
startupFuture.complete(brokerStartupContext);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
import io.camunda.zeebe.gateway.impl.broker.BrokerClientImpl;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.util.sched.ActorScheduler;
import java.io.IOException;
import java.io.UncheckedIOException;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import java.util.function.Function;

public final class EmbeddedGatewayService implements AutoCloseable {
Expand All @@ -34,7 +33,6 @@ public EmbeddedGatewayService(
new BrokerClientImpl(
cfg, messagingService, membershipService, eventService, actorScheduler, false);
gateway = new Gateway(configuration.getGateway(), brokerClientFactory, actorScheduler);
startGateway();
}

@Override
Expand All @@ -48,11 +46,7 @@ public Gateway get() {
return gateway;
}

private void startGateway() {
try {
gateway.start();
} catch (final IOException e) {
throw new UncheckedIOException("Gateway was not able to start", e);
}
public ActorFuture<Gateway> start() {
return gateway.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.camunda.zeebe.broker.system.monitoring.BrokerHealthCheckService;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorScheduler;
import io.camunda.zeebe.util.sched.TestConcurrencyControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;
Expand Down Expand Up @@ -58,6 +59,7 @@ class StartupBehavior {

private ActorFuture<BrokerStartupContext> startupFuture;
private ActorScheduler actorScheduler;
private Actor actor;

@BeforeEach
void setUp() {
Expand All @@ -81,6 +83,9 @@ void setUp() {
final var port = SocketUtil.getNextAddress().getPort();
final var commandApiCfg = TEST_BROKER_CONFIG.getGateway().getNetwork();
commandApiCfg.setPort(port);

actor = Actor.newActor().build();
actorScheduler.submitActor(actor);
}

@AfterEach
Expand All @@ -95,7 +100,10 @@ void tearDown() {
@Test
void shouldCompleteFuture() {
// when
sut.startupInternal(testBrokerStartupContext, CONCURRENCY_CONTROL, startupFuture);
actor.run(
() -> {
sut.startupInternal(testBrokerStartupContext, CONCURRENCY_CONTROL, startupFuture);
});

// then
assertThat(startupFuture).succeedsWithin(TIME_OUT);
Expand All @@ -105,7 +113,10 @@ void shouldCompleteFuture() {
@Test
void shouldStartAndInstallEmbeddedGatewayService() {
// when
sut.startupInternal(testBrokerStartupContext, CONCURRENCY_CONTROL, startupFuture);
actor.run(
() -> {
sut.startupInternal(testBrokerStartupContext, CONCURRENCY_CONTROL, startupFuture);
});
await().until(startupFuture::isDone);

// then
Expand Down
81 changes: 61 additions & 20 deletions gateway/src/main/java/io/camunda/zeebe/gateway/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@
import io.camunda.zeebe.gateway.interceptors.impl.DecoratedInterceptor;
import io.camunda.zeebe.gateway.interceptors.impl.InterceptorRepository;
import io.camunda.zeebe.gateway.query.impl.QueryApiImpl;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.ActorSchedulingService;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.NettyServerBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import me.dinowernli.grpc.prometheus.Configuration;
Expand Down Expand Up @@ -110,31 +110,65 @@ public BrokerClient getBrokerClient() {
return brokerClient;
}

public void start() throws IOException {
public ActorFuture<Gateway> start() {
final var resultFuture = new CompletableActorFuture<Gateway>();

healthManager.setStatus(Status.STARTING);
brokerClient = buildBrokerClient();

final var activateJobsHandler = buildActivateJobsHandler(brokerClient);
submitActorToActivateJobs((Consumer<ActorControl>) activateJobsHandler);
createAndStartActivateJobsHandler(brokerClient)
.whenComplete(
(activateJobsHandler, error) -> {
if (error != null) {
resultFuture.completeExceptionally(error);
return;
}

final var serverResult = createAndStartServer(activateJobsHandler);
if (serverResult.isLeft()) {
final var exception = serverResult.getLeft();
resultFuture.completeExceptionally(exception);
} else {
server = serverResult.get();
healthManager.setStatus(Status.RUNNING);
resultFuture.complete(this);
}
});

return resultFuture;
}

private Either<Exception, Server> createAndStartServer(
final ActivateJobsHandler activateJobsHandler) {
final EndpointManager endpointManager = new EndpointManager(brokerClient, activateJobsHandler);
final GatewayGrpcService gatewayGrpcService = new GatewayGrpcService(endpointManager);
final ServerBuilder<?> serverBuilder = serverBuilderFactory.apply(gatewayCfg);

try {
final var serverBuilder = serverBuilderFactory.apply(gatewayCfg);
applySecurityConfigurationIfEnabled(serverBuilder);
final var server = buildServer(serverBuilder, gatewayGrpcService);
server.start();
return Either.right(server);
} catch (Exception e) {
return Either.left(e);
}
}

private void applySecurityConfigurationIfEnabled(final ServerBuilder<?> serverBuilder) {
final SecurityCfg securityCfg = gatewayCfg.getSecurity();
if (securityCfg.isEnabled()) {
setSecurityConfig(serverBuilder, securityCfg);
}
}

server =
serverBuilder
.addService(applyInterceptors(gatewayGrpcService))
.addService(
ServerInterceptors.intercept(
healthManager.getHealthService(), MONITORING_SERVER_INTERCEPTOR))
.build();
server.start();
healthManager.setStatus(Status.RUNNING);
private Server buildServer(
final ServerBuilder<?> serverBuilder, final BindableService interceptorService) {
return serverBuilder
.addService(applyInterceptors(interceptorService))
.addService(
ServerInterceptors.intercept(
healthManager.getHealthService(), MONITORING_SERVER_INTERCEPTOR))
.build();
}

private static NettyServerBuilder setNetworkConfig(final NetworkCfg cfg) {
Expand Down Expand Up @@ -186,15 +220,22 @@ private BrokerClient buildBrokerClient() {
return brokerClientFactory.apply(gatewayCfg);
}

private void submitActorToActivateJobs(final Consumer<ActorControl> consumer) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
private CompletableFuture<ActivateJobsHandler> createAndStartActivateJobsHandler(
final BrokerClient brokerClient) {
final var handler = buildActivateJobsHandler(brokerClient);
return submitActorToActivateJobs(handler);
}

private CompletableFuture<ActivateJobsHandler> submitActorToActivateJobs(
final ActivateJobsHandler handler) {
final var future = new CompletableFuture<ActivateJobsHandler>();
final var actor =
Actor.newActor()
.name("ActivateJobsHandler")
.actorStartedHandler(consumer.andThen(actorStartedFuture::complete))
.actorStartedHandler(handler.andThen(t -> future.complete(handler)))
.build();
actorSchedulingService.submitActor(actor);
actorStartedFuture.join();
return future;
}

private ActivateJobsHandler buildActivateJobsHandler(final BrokerClient brokerClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import io.camunda.zeebe.gateway.grpc.ServerStreamObserver;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.util.sched.ActorControl;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Can handle an 'activate jobs' request from a client. */
public interface ActivateJobsHandler {
public interface ActivateJobsHandler extends Consumer<ActorControl> {

static final AtomicLong ACTIVATE_JOBS_REQUEST_ID_GENERATOR = new AtomicLong(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;

/**
* Adds long polling to the handling of activate job requests. When there are no jobs available to
* activate, the response will be kept open.
*/
public final class LongPollingActivateJobsHandler
implements ActivateJobsHandler, Consumer<ActorControl> {
public final class LongPollingActivateJobsHandler implements ActivateJobsHandler {

private static final String JOBS_AVAILABLE_TOPIC = "jobsAvailable";
private static final Logger LOG = Loggers.GATEWAY_LOGGER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@
* Iterates in round-robin fashion over partitions to activate jobs. Uses a map from job type to
* partition-IDs to determine the next partition to use.
*/
public final class RoundRobinActivateJobsHandler
implements ActivateJobsHandler, Consumer<ActorControl> {
public final class RoundRobinActivateJobsHandler implements ActivateJobsHandler {

private static final String ACTIVATE_JOB_NOT_SENT_MSG = "Failed to send activated jobs to client";
private static final String ACTIVATE_JOB_NOT_SENT_MSG_WITH_REASON =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ static void setUp() throws IOException {
cluster.getMembershipService(),
cluster.getEventService(),
actorScheduler);
gateway.start();
gateway.start().join();

final String gatewayAddress = NetUtil.toSocketAddressString(networkCfg.toSocketAddress());
client = ZeebeClient.newClientBuilder().gatewayAddress(gatewayAddress).usePlaintext().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
import io.camunda.zeebe.util.sched.testing.ActorSchedulerRule;
import io.grpc.Status.Code;
Expand Down Expand Up @@ -991,13 +990,13 @@ public BrokerResponse<?> handle(final BrokerActivateJobsRequest request)
}

private void submitActorToActivateJobs(final LongPollingActivateJobsHandler handler) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
final var future = new CompletableFuture<>();
final var actor =
Actor.newActor()
.name("LongPollingHandler-Test")
.actorStartedHandler(handler.andThen(actorStartedFuture::complete))
.actorStartedHandler(handler.andThen(future::complete))
.build();
actorSchedulerRule.submitActor(actor);
actorStartedFuture.join();
future.join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ public GatewayBlockingStub buildClient() {
}

private void submitActorToActivateJobs(final Consumer<ActorControl> consumer) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
final var future = new CompletableFuture<>();
final var actor =
Actor.newActor()
.name("ActivateJobsHandler")
.actorStartedHandler(consumer.andThen(actorStartedFuture::complete))
.actorStartedHandler(consumer.andThen(future::complete))
.build();
actorScheduler.submitActor(actor);
actorStartedFuture.join();
future.join();
}

private ActivateJobsHandler buildActivateJobsHandler(final BrokerClient brokerClient) {
Expand Down
Loading

0 comments on commit d8cc5ea

Please sign in to comment.