Skip to content

Commit

Permalink
merge: #10705 #10743
Browse files Browse the repository at this point in the history
10705: feat(client-java): support set variables when failing a job r=korthout a=skayliu

## Description

Add support set variables when failing a job.

`client.newFailCommand(...).variables(...) `

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #9404 

depends on #10701, #10702 



10743: feat(broker): reduce log level of received exporter position r=oleschoenburg a=deepthidevaki

## Description

Logging for every update when receiving exporter position was flooding the logs. To reduce the noise, the log level is reduced to trace. In addition, added a new log which will be logged periodically at every 60 seconds. This would be enough in most cases to monitor the exporter position.

There are two logs because in case if it is needed during debugging, the log level of the trace log can be increased to debug dynamically, which will then logged for every message. The frequency of periodic logger cannot be increased dynamically.


Co-authored-by: skayliu <skay463@163.com>
Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
  • Loading branch information
3 people committed Oct 18, 2022
3 parents 0d9dba6 + 0965015 + 0b15483 commit ff72a85
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@

import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.system.partitions.PartitionMessagingService;
import io.camunda.zeebe.util.logging.ThrottledLogger;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

public class ExporterPositionsDistributionService implements AutoCloseable {

private static final Logger PERIODIC_LOGGER =
new ThrottledLogger(Loggers.EXPORTER_LOGGER, Duration.ofSeconds(60));
private final PartitionMessagingService partitionMessagingService;
private final String exporterPositionsTopic;
private final BiConsumer<String, Long> exporterPositionConsumer;
Expand All @@ -41,7 +46,8 @@ private void storeExporterPositions(final ByteBuffer byteBuffer) {

final var exporterPositions = exportPositionsMessage.getExporterPositions();

Loggers.EXPORTER_LOGGER.debug("Received new exporter state {}", exporterPositions);
Loggers.EXPORTER_LOGGER.trace("Received new exporter state {}", exporterPositions);
PERIODIC_LOGGER.debug("Current exporter state {}", exporterPositions);

exporterPositions.forEach(exporterPositionConsumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package io.camunda.zeebe.client.api.command;

import io.camunda.zeebe.client.api.response.FailJobResponse;
import java.io.InputStream;
import java.time.Duration;
import java.util.Map;

public interface FailJobCommandStep1 {

Expand Down Expand Up @@ -56,5 +58,41 @@ interface FailJobCommandStep2 extends FinalCommandStep<FailJobResponse> {
* it to the broker.
*/
FailJobCommandStep2 errorMessage(String errorMsg);

/**
* Set the variables of this job.
*
* @param variables the variables (JSON) as stream
* @return the builder for this command. Call {@link #send()} to complete the command and send
* it to the broker.
*/
FailJobCommandStep2 variables(InputStream variables);

/**
* Set the variables of this job.
*
* @param variables the variables (JSON) as String
* @return the builder for this command. Call {@link #send()} to complete the command and send
* it to the broker.
*/
FailJobCommandStep2 variables(String variables);

/**
* Set the variables of this job.
*
* @param variables the variables as map
* @return the builder for this command. Call {@link #send()} to complete the command and send
* it to the broker.
*/
FailJobCommandStep2 variables(Map<String, Object> variables);

/**
* Set the variables of this job.
*
* @param variables the variables as object
* @return the builder for this command. Call {@link #send()} to complete the command and send
* it to the broker.
*/
FailJobCommandStep2 variables(Object variables);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.camunda.zeebe.client.impl.command;

import io.camunda.zeebe.client.api.JsonMapper;
import io.camunda.zeebe.client.api.ZeebeFuture;
import io.camunda.zeebe.client.api.command.FailJobCommandStep1;
import io.camunda.zeebe.client.api.command.FailJobCommandStep1.FailJobCommandStep2;
Expand All @@ -31,7 +32,8 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

public final class FailJobCommandImpl implements FailJobCommandStep1, FailJobCommandStep2 {
public final class FailJobCommandImpl extends CommandWithVariables<FailJobCommandStep2>
implements FailJobCommandStep1, FailJobCommandStep2 {

private final GatewayStub asyncStub;
private final Builder builder;
Expand All @@ -40,9 +42,11 @@ public final class FailJobCommandImpl implements FailJobCommandStep1, FailJobCom

public FailJobCommandImpl(
final GatewayStub asyncStub,
final JsonMapper jsonMapper,
final long key,
final Duration requestTimeout,
final Predicate<Throwable> retryPredicate) {
super(jsonMapper);
this.asyncStub = asyncStub;
this.requestTimeout = requestTimeout;
this.retryPredicate = retryPredicate;
Expand All @@ -68,6 +72,12 @@ public FailJobCommandStep2 errorMessage(final String errorMsg) {
return this;
}

@Override
public FailJobCommandStep2 setVariablesInternal(final String variables) {
builder.setVariables(variables);
return this;
}

@Override
public FinalCommandStep<FailJobResponse> requestTimeout(final Duration requestTimeout) {
this.requestTimeout = requestTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public CompleteJobCommandStep1 newCompleteCommand(ActivatedJob job) {
@Override
public FailJobCommandStep1 newFailCommand(final long jobKey) {
return new FailJobCommandImpl(
asyncStub, jobKey, config.getDefaultRequestTimeout(), retryPredicate);
asyncStub, jsonMapper, jobKey, config.getDefaultRequestTimeout(), retryPredicate);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.response.FailJobResponse;
import io.camunda.zeebe.client.util.ClientTest;
import io.camunda.zeebe.client.util.JsonUtil;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobRequest;
import java.time.Duration;
import java.util.Collections;
import org.junit.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -147,4 +149,23 @@ public void shouldNotHaveNullResponse() {
// then
assertThat(response).isNotNull();
}

@Test
public void shouldFailJobWithJsonStringVariables() {
// given
final long jobKey = 12;
final int newRetries = 0;

final String json = JsonUtil.toJson(Collections.singletonMap("key", "val"));

// when
final FailJobResponse response =
client.newFailCommand(jobKey).retries(newRetries).variables(json).send().join();

// then
final FailJobRequest request = gatewayService.getLastRequest();
assertThat(request.getJobKey()).isEqualTo(jobKey);
JsonUtil.assertEquality(request.getVariables(), json);
assertThat(response).isNotNull();
}
}

0 comments on commit ff72a85

Please sign in to comment.