Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't wait for actor being submitted #8993

Merged
merged 1 commit into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,20 @@ void startupInternal(
clusterServices.getMembershipService(),
clusterServices.getEventService());

brokerStartupContext.setEmbeddedGatewayService(embeddedGatewayService);
final var embeddedGatewayServiceFuture = embeddedGatewayService.start();
concurrencyControl.runOnCompletion(
embeddedGatewayServiceFuture,
(gateway, error) -> {
if (error != null) {
startupFuture.completeExceptionally(error);
return;
}

final var springBridge = brokerStartupContext.getSpringBrokerBridge();
final var gateway = embeddedGatewayService.get();
springBridge.registerBrokerClient(gateway::getBrokerClient);

startupFuture.complete(brokerStartupContext);
brokerStartupContext.setEmbeddedGatewayService(embeddedGatewayService);
final var springBridge = brokerStartupContext.getSpringBrokerBridge();
springBridge.registerBrokerClient(gateway::getBrokerClient);
startupFuture.complete(brokerStartupContext);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
import io.camunda.zeebe.gateway.impl.broker.BrokerClientImpl;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.util.sched.ActorScheduler;
import java.io.IOException;
import java.io.UncheckedIOException;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import java.util.function.Function;

public final class EmbeddedGatewayService implements AutoCloseable {
Expand All @@ -34,7 +33,6 @@ public EmbeddedGatewayService(
new BrokerClientImpl(
cfg, messagingService, membershipService, eventService, actorScheduler, false);
gateway = new Gateway(configuration.getGateway(), brokerClientFactory, actorScheduler);
startGateway();
}

@Override
Expand All @@ -48,11 +46,7 @@ public Gateway get() {
return gateway;
}

private void startGateway() {
try {
gateway.start();
} catch (final IOException e) {
throw new UncheckedIOException("Gateway was not able to start", e);
}
public ActorFuture<Gateway> start() {
return gateway.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.camunda.zeebe.broker.system.monitoring.BrokerHealthCheckService;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorScheduler;
import io.camunda.zeebe.util.sched.TestConcurrencyControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;
Expand Down Expand Up @@ -58,6 +59,7 @@ class StartupBehavior {

private ActorFuture<BrokerStartupContext> startupFuture;
private ActorScheduler actorScheduler;
private Actor actor;

@BeforeEach
void setUp() {
Expand All @@ -81,6 +83,9 @@ void setUp() {
final var port = SocketUtil.getNextAddress().getPort();
final var commandApiCfg = TEST_BROKER_CONFIG.getGateway().getNetwork();
commandApiCfg.setPort(port);

actor = Actor.newActor().build();
actorScheduler.submitActor(actor);
}

@AfterEach
Expand All @@ -95,7 +100,10 @@ void tearDown() {
@Test
void shouldCompleteFuture() {
// when
sut.startupInternal(testBrokerStartupContext, CONCURRENCY_CONTROL, startupFuture);
actor.run(
() -> {
sut.startupInternal(testBrokerStartupContext, CONCURRENCY_CONTROL, startupFuture);
});

// then
assertThat(startupFuture).succeedsWithin(TIME_OUT);
Expand All @@ -105,7 +113,10 @@ void shouldCompleteFuture() {
@Test
void shouldStartAndInstallEmbeddedGatewayService() {
// when
sut.startupInternal(testBrokerStartupContext, CONCURRENCY_CONTROL, startupFuture);
actor.run(
() -> {
sut.startupInternal(testBrokerStartupContext, CONCURRENCY_CONTROL, startupFuture);
});
await().until(startupFuture::isDone);

// then
Expand Down
81 changes: 61 additions & 20 deletions gateway/src/main/java/io/camunda/zeebe/gateway/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@
import io.camunda.zeebe.gateway.interceptors.impl.DecoratedInterceptor;
import io.camunda.zeebe.gateway.interceptors.impl.InterceptorRepository;
import io.camunda.zeebe.gateway.query.impl.QueryApiImpl;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.ActorSchedulingService;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.NettyServerBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import me.dinowernli.grpc.prometheus.Configuration;
Expand Down Expand Up @@ -110,31 +110,65 @@ public BrokerClient getBrokerClient() {
return brokerClient;
}

public void start() throws IOException {
public ActorFuture<Gateway> start() {
final var resultFuture = new CompletableActorFuture<Gateway>();

healthManager.setStatus(Status.STARTING);
brokerClient = buildBrokerClient();

final var activateJobsHandler = buildActivateJobsHandler(brokerClient);
submitActorToActivateJobs((Consumer<ActorControl>) activateJobsHandler);
createAndStartActivateJobsHandler(brokerClient)
.whenComplete(
(activateJobsHandler, error) -> {
if (error != null) {
resultFuture.completeExceptionally(error);
return;
}

final var serverResult = createAndStartServer(activateJobsHandler);
if (serverResult.isLeft()) {
final var exception = serverResult.getLeft();
resultFuture.completeExceptionally(exception);
} else {
server = serverResult.get();
healthManager.setStatus(Status.RUNNING);
resultFuture.complete(this);
}
});

return resultFuture;
}

private Either<Exception, Server> createAndStartServer(
final ActivateJobsHandler activateJobsHandler) {
final EndpointManager endpointManager = new EndpointManager(brokerClient, activateJobsHandler);
final GatewayGrpcService gatewayGrpcService = new GatewayGrpcService(endpointManager);
final ServerBuilder<?> serverBuilder = serverBuilderFactory.apply(gatewayCfg);

try {
final var serverBuilder = serverBuilderFactory.apply(gatewayCfg);
applySecurityConfigurationIfEnabled(serverBuilder);
final var server = buildServer(serverBuilder, gatewayGrpcService);
server.start();
return Either.right(server);
} catch (Exception e) {
return Either.left(e);
}
}

private void applySecurityConfigurationIfEnabled(final ServerBuilder<?> serverBuilder) {
final SecurityCfg securityCfg = gatewayCfg.getSecurity();
if (securityCfg.isEnabled()) {
setSecurityConfig(serverBuilder, securityCfg);
}
}

server =
serverBuilder
.addService(applyInterceptors(gatewayGrpcService))
.addService(
ServerInterceptors.intercept(
healthManager.getHealthService(), MONITORING_SERVER_INTERCEPTOR))
.build();
server.start();
healthManager.setStatus(Status.RUNNING);
private Server buildServer(
final ServerBuilder<?> serverBuilder, final BindableService interceptorService) {
return serverBuilder
.addService(applyInterceptors(interceptorService))
.addService(
ServerInterceptors.intercept(
healthManager.getHealthService(), MONITORING_SERVER_INTERCEPTOR))
.build();
}

private static NettyServerBuilder setNetworkConfig(final NetworkCfg cfg) {
Expand Down Expand Up @@ -186,15 +220,22 @@ private BrokerClient buildBrokerClient() {
return brokerClientFactory.apply(gatewayCfg);
}

private void submitActorToActivateJobs(final Consumer<ActorControl> consumer) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
private CompletableFuture<ActivateJobsHandler> createAndStartActivateJobsHandler(
final BrokerClient brokerClient) {
final var handler = buildActivateJobsHandler(brokerClient);
return submitActorToActivateJobs(handler);
}

private CompletableFuture<ActivateJobsHandler> submitActorToActivateJobs(
final ActivateJobsHandler handler) {
final var future = new CompletableFuture<ActivateJobsHandler>();
final var actor =
Actor.newActor()
.name("ActivateJobsHandler")
.actorStartedHandler(consumer.andThen(actorStartedFuture::complete))
.actorStartedHandler(handler.andThen(t -> future.complete(handler)))
.build();
actorSchedulingService.submitActor(actor);
actorStartedFuture.join();
return future;
}

private ActivateJobsHandler buildActivateJobsHandler(final BrokerClient brokerClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import io.camunda.zeebe.gateway.grpc.ServerStreamObserver;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.util.sched.ActorControl;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Can handle an 'activate jobs' request from a client. */
public interface ActivateJobsHandler {
public interface ActivateJobsHandler extends Consumer<ActorControl> {

static final AtomicLong ACTIVATE_JOBS_REQUEST_ID_GENERATOR = new AtomicLong(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;

/**
* Adds long polling to the handling of activate job requests. When there are no jobs available to
* activate, the response will be kept open.
*/
public final class LongPollingActivateJobsHandler
implements ActivateJobsHandler, Consumer<ActorControl> {
public final class LongPollingActivateJobsHandler implements ActivateJobsHandler {

private static final String JOBS_AVAILABLE_TOPIC = "jobsAvailable";
private static final Logger LOG = Loggers.GATEWAY_LOGGER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@
* Iterates in round-robin fashion over partitions to activate jobs. Uses a map from job type to
* partition-IDs to determine the next partition to use.
*/
public final class RoundRobinActivateJobsHandler
implements ActivateJobsHandler, Consumer<ActorControl> {
public final class RoundRobinActivateJobsHandler implements ActivateJobsHandler {

private static final String ACTIVATE_JOB_NOT_SENT_MSG = "Failed to send activated jobs to client";
private static final String ACTIVATE_JOB_NOT_SENT_MSG_WITH_REASON =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ static void setUp() throws IOException {
cluster.getMembershipService(),
cluster.getEventService(),
actorScheduler);
gateway.start();
gateway.start().join();

final String gatewayAddress = NetUtil.toSocketAddressString(networkCfg.toSocketAddress());
client = ZeebeClient.newClientBuilder().gatewayAddress(gatewayAddress).usePlaintext().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
import io.camunda.zeebe.util.sched.testing.ActorSchedulerRule;
import io.grpc.Status.Code;
Expand Down Expand Up @@ -991,13 +990,13 @@ public BrokerResponse<?> handle(final BrokerActivateJobsRequest request)
}

private void submitActorToActivateJobs(final LongPollingActivateJobsHandler handler) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
final var future = new CompletableFuture<>();
final var actor =
Actor.newActor()
.name("LongPollingHandler-Test")
.actorStartedHandler(handler.andThen(actorStartedFuture::complete))
.actorStartedHandler(handler.andThen(future::complete))
.build();
actorSchedulerRule.submitActor(actor);
actorStartedFuture.join();
future.join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ public GatewayBlockingStub buildClient() {
}

private void submitActorToActivateJobs(final Consumer<ActorControl> consumer) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
final var future = new CompletableFuture<>();
final var actor =
Actor.newActor()
.name("ActivateJobsHandler")
.actorStartedHandler(consumer.andThen(actorStartedFuture::complete))
.actorStartedHandler(consumer.andThen(future::complete))
.build();
actorScheduler.submitActor(actor);
actorStartedFuture.join();
future.join();
}

private ActivateJobsHandler buildActivateJobsHandler(final BrokerClient brokerClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void shouldAbortDeploymentCalls() throws IOException {
config.getInterceptors().add(interceptorCfg);

// when
gateway.start();
gateway.start().join();
try (final var client = createZeebeClient()) {
final Future<DeploymentEvent> result =
client
Expand All @@ -110,7 +110,7 @@ void shouldInjectQueryApiViaContext() throws IOException {
config.getInterceptors().add(interceptorCfg);

// when
gateway.start();
gateway.start().join();
try (final var client = createZeebeClient()) {
try {
client.newTopologyRequest().send().join();
Expand Down
Loading