Skip to content

Commit

Permalink
Merge #4126
Browse files Browse the repository at this point in the history
4126: 3803 job retries feel expression r=pihme a=pihme

## Description

Added FEEL expressions for job retries

## Related issues

closes #3803

#

Co-authored-by: pihme <pihme@users.noreply.github.com>
  • Loading branch information
zeebe-bors[bot] and pihme committed Mar 26, 2020
2 parents 919d29e + 333709f commit 79ca231
Show file tree
Hide file tree
Showing 23 changed files with 322 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,17 @@ public B zeebeJobTypeExpression(final String expression) {
return zeebeJobType(asZeebeExpression(expression));
}

public B zeebeTaskRetries(final int retries) {
public B zeebeJobRetries(final String retries) {
final ZeebeTaskDefinition taskDefinition =
getCreateSingleExtensionElement(ZeebeTaskDefinition.class);
taskDefinition.setRetries(retries);
return myself;
}

public B zeebeJobRetriesExpression(final String expression) {
return zeebeJobRetries(asZeebeExpression(expression));
}

public B zeebeTaskHeader(final String key, final String value) {
final ZeebeTaskHeaders taskHeaders = getCreateSingleExtensionElement(ZeebeTaskHeaders.class);
final ZeebeHeader header = createChild(taskHeaders, ZeebeHeader.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ZeebeTaskDefinitionImpl extends BpmnModelElementInstanceImpl
implements ZeebeTaskDefinition {

protected static Attribute<String> typeAttribute;
protected static Attribute<Integer> retriesAttribute;
protected static Attribute<String> retriesAttribute;

public ZeebeTaskDefinitionImpl(final ModelTypeInstanceContext instanceContext) {
super(instanceContext);
Expand All @@ -45,12 +45,12 @@ public void setType(final String type) {
}

@Override
public Integer getRetries() {
public String getRetries() {
return retriesAttribute.getValue(this);
}

@Override
public void setRetries(final int retries) {
public void setRetries(final String retries) {
retriesAttribute.setValue(this, retries);
}

Expand All @@ -70,7 +70,7 @@ public static void registerType(final ModelBuilder modelBuilder) {

retriesAttribute =
typeBuilder
.integerAttribute(ZeebeConstants.ATTRIBUTE_RETRIES)
.stringAttribute(ZeebeConstants.ATTRIBUTE_RETRIES)
.defaultValue(ZeebeTaskDefinition.DEFAULT_RETRIES)
.namespace(BpmnModelConstants.ZEEBE_NS)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

public interface ZeebeTaskDefinition extends BpmnModelElementInstance {

int DEFAULT_RETRIES = 3;
String DEFAULT_RETRIES = "3";

String getType();

void setType(String type);

Integer getRetries();
String getRetries();

void setRetries(int retries);
void setRetries(String retries);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void validate(
}

if (element.getRetries() == null) {
validationResultCollector.addError(0, "Task retries must be an integer");
validationResultCollector.addError(0, "Task retries must be present and not empty");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void shouldBuildServiceTask() {
"foo",
b ->
b.zeebeJobType("taskType")
.zeebeTaskRetries(5)
.zeebeJobRetries("5")
.zeebeTaskHeader("foo", "f")
.zeebeTaskHeader("bar", "b"))
.endEvent()
Expand All @@ -57,7 +57,7 @@ public void shouldBuildServiceTask() {
final ZeebeTaskDefinition taskDefinition =
getExtensionElement(serviceTask, ZeebeTaskDefinition.class);
assertThat(taskDefinition.getType()).isEqualTo("taskType");
assertThat(taskDefinition.getRetries()).isEqualTo(5);
assertThat(taskDefinition.getRetries()).isEqualTo("5");

final ZeebeTaskHeaders taskHeaders = getExtensionElement(serviceTask, ZeebeTaskHeaders.class);
final Collection<ZeebeHeader> headerCollection = taskHeaders.getHeaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ public Collection<ChildElementAssumption> getChildElementAssumptions() {
public Collection<AttributeAssumption> getAttributesAssumptions() {
return Arrays.asList(
new AttributeAssumption(BpmnModelConstants.ZEEBE_NS, "type", false, true),
new AttributeAssumption(BpmnModelConstants.ZEEBE_NS, "retries", false, false, 3));
new AttributeAssumption(BpmnModelConstants.ZEEBE_NS, "retries", false, false, "3"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import static java.util.Collections.singletonList;

import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.impl.BpmnModelConstants;
import io.zeebe.model.bpmn.impl.ZeebeConstants;
import io.zeebe.model.bpmn.instance.zeebe.ZeebeTaskDefinition;
import org.junit.runners.Parameterized.Parameters;

Expand All @@ -42,22 +40,6 @@ public static Object[][] parameters() {
.endEvent()
.done(),
singletonList(expect(ZeebeTaskDefinition.class, "Task type must be present and not empty"))
},
{
Bpmn.createExecutableProcess("process")
.startEvent()
.serviceTask("task")
.addExtensionElement(
ZeebeTaskDefinition.class,
b ->
b.setAttributeValueNs(
BpmnModelConstants.ZEEBE_NS,
ZeebeConstants.ATTRIBUTE_RETRIES,
"notANumber"))
.zeebeJobType("type")
.endEvent()
.done(),
singletonList(expect(ZeebeTaskDefinition.class, "Task retries must be an integer"))
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,25 @@ public Optional<DirectBuffer> evaluateStringExpression(
.map(this::wrapResult);
}

/**
* Evaluates the given expression and returns the result as long. If the evaluation fails or the
* result is not a number then an incident is raised.
*
* @param expression the expression to evaluate
* @param context the element context to load the variables from
* @return the evaluation result as long, or {@link Optional#empty()} if an incident is raised
*/
public Optional<Long> evaluateLongExpression(
final Expression expression, final BpmnStepContext<?> context) {

final var evaluationResult = evaluateExpression(expression, context.getKey());
return failureCheck(evaluationResult, ErrorType.EXTRACT_VALUE_ERROR, context)
.flatMap(
result -> typeCheck(result, ResultType.NUMBER, ErrorType.EXTRACT_VALUE_ERROR, context))
.map(EvaluationResult::getNumber)
.map(Number::longValue);
}

/**
* Evaluates the given expression and returns the result as boolean. If the evaluation fails or
* the result is not a boolean then an incident is raised.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
public class ExecutableServiceTask extends ExecutableActivity {

private Expression type;
private int retries;
private Expression retries;
private DirectBuffer encodedHeaders = JobRecord.NO_HEADERS;

public ExecutableServiceTask(final String id) {
Expand All @@ -29,11 +29,11 @@ public void setType(final Expression type) {
this.type = type;
}

public int getRetries() {
public Expression getRetries() {
return retries;
}

public void setRetries(final int retries) {
public void setRetries(final Expression retries) {
this.retries = retries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.zeebe.util.buffer.BufferUtil.wrapString;

import io.zeebe.el.Expression;
import io.zeebe.el.ExpressionLanguage;
import io.zeebe.engine.Loggers;
import io.zeebe.engine.processor.workflow.deployment.model.BpmnStep;
import io.zeebe.engine.processor.workflow.deployment.model.element.ExecutableServiceTask;
Expand Down Expand Up @@ -71,11 +72,16 @@ private void transformTaskDefinition(
final ZeebeTaskDefinition taskDefinition =
element.getSingleExtensionElement(ZeebeTaskDefinition.class);

final ExpressionLanguage expressionLanguage = context.getExpressionLanguage();
final Expression jobTypeExpression =
context.getExpressionLanguage().parseExpression(taskDefinition.getType());
expressionLanguage.parseExpression(taskDefinition.getType());

serviceTask.setType(jobTypeExpression);
serviceTask.setRetries(taskDefinition.getRetries());

final Expression retriesExpression =
expressionLanguage.parseExpression(taskDefinition.getRetries());

serviceTask.setRetries(retriesExpression);
}

private void transformTaskHeaders(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public static final Collection<ModelElementValidator<?>> getValidators(
.build(expressionLanguage),
ZeebeExpressionValidator.verifyThat(ZeebeTaskDefinition.class)
.hasValidExpression(ZeebeTaskDefinition::getType)
.hasValidExpression(ZeebeTaskDefinition::getRetries)
.build(expressionLanguage));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ private ServiceTaskBuilder addServiceTask(
final AbstractFlowNodeBuilder<?, ?> builder, final YamlTask task) {
final String id = task.getId();
final String taskType = task.getType();
final int taskRetries = task.getRetries();
final String taskRetries = task.getRetries();

final ServiceTaskBuilder serviceTaskBuilder =
builder.serviceTask(id).zeebeJobType(taskType).zeebeTaskRetries(taskRetries);
builder.serviceTask(id).zeebeJobType(taskType).zeebeJobRetries(taskRetries);

for (final Entry<String, String> header : task.getHeaders().entrySet()) {
serviceTaskBuilder.zeebeTaskHeader(header.getKey(), header.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public final class YamlTask {
private String id = "";

private String type = "";
private int retries = ZeebeTaskDefinition.DEFAULT_RETRIES;
private String retries = ZeebeTaskDefinition.DEFAULT_RETRIES;

private Map<String, String> headers = new HashMap<>();

Expand Down Expand Up @@ -49,11 +49,11 @@ public void setType(final String type) {
this.type = type;
}

public int getRetries() {
public String getRetries() {
return retries;
}

public void setRetries(final int retries) {
public void setRetries(final String retries) {
this.retries = retries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@ protected boolean handleState(final BpmnStepContext<T> context) {
final Optional<DirectBuffer> optJobType =
expressionProcessor.evaluateStringExpression(serviceTask.getType(), context);

if (optJobType.isPresent()) {
populateJobFromTask(context, value, optJobType.get(), serviceTask);
final Optional<Long> optRetries =
expressionProcessor.evaluateLongExpression(serviceTask.getRetries(), context);

if (optJobType.isPresent() && optRetries.isPresent()) {
populateJobFromTask(
context, value, optJobType.get(), optRetries.get().intValue(), serviceTask);
context.getCommandWriter().appendNewCommand(JobIntent.CREATE, jobCommand);
}

Expand All @@ -59,13 +63,14 @@ private void populateJobFromTask(
final BpmnStepContext<T> context,
final WorkflowInstanceRecord value,
final DirectBuffer jobType,
final int retries,
final ExecutableServiceTask serviceTask) {
final DirectBuffer headers = serviceTask.getEncodedHeaders();

jobCommand.reset();
jobCommand
.setType(jobType)
.setRetries(serviceTask.getRetries())
.setRetries(retries)
.setVariables(DocumentValue.EMPTY_DOCUMENT)
.setCustomHeaders(headers)
.setBpmnProcessId(value.getBpmnProcessIdBuffer())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void shouldCreateJobFromServiceTaskWithJobTypeExpression() {
// given
ENGINE
.deployment()
.withXmlResource(workflow(t -> t.zeebeJobTypeExpression("\"test\"").zeebeTaskRetries(5)))
.withXmlResource(workflow(t -> t.zeebeJobTypeExpression("\"test\"")))
.deploy();

// when
Expand All @@ -67,6 +67,26 @@ public void shouldCreateJobFromServiceTaskWithJobTypeExpression() {
Assertions.assertThat(job.getValue()).hasType("test");
}

@Test
public void shouldCreateJobFromServiceTaskWithJobRetriesExpression() {
// given
ENGINE
.deployment()
.withXmlResource(workflow(t -> t.zeebeJobType("test").zeebeJobRetriesExpression("5+3")))
.deploy();

// when
final long workflowInstanceKey = ENGINE.workflowInstance().ofBpmnProcessId(PROCESS_ID).create();

// then
final Record<JobRecordValue> job =
RecordingExporter.jobRecords(JobIntent.CREATE)
.withWorkflowInstanceKey(workflowInstanceKey)
.getFirst();

Assertions.assertThat(job.getValue()).hasRetries(8);
}

@Test
public void shouldActivateServiceTask() {
// given
Expand Down Expand Up @@ -105,7 +125,7 @@ public void shouldCreateJob() {
// given
ENGINE
.deployment()
.withXmlResource(workflow(t -> t.zeebeJobType("test").zeebeTaskRetries(5)))
.withXmlResource(workflow(t -> t.zeebeJobType("test").zeebeJobRetries("5")))
.deploy();

// when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,14 @@ public static Object[][] parameters() {
.done(),
Arrays.asList(expect(ZeebeTaskDefinition.class, INVALID_EXPRESSION_MESSAGE))
},
{
// invalid job retries expression
Bpmn.createExecutableProcess("process")
.startEvent()
.serviceTask("task", t -> t.zeebeJobRetriesExpression(INVALID_EXPRESSION))
.done(),
Arrays.asList(expect(ZeebeTaskDefinition.class, INVALID_EXPRESSION_MESSAGE))
},
{
// output element expression is not supported
Bpmn.createExecutableProcess("process")
Expand Down

0 comments on commit 79ca231

Please sign in to comment.