Skip to content

Commit

Permalink
feat(engine): Introduce operation log for sync message correlations
Browse files Browse the repository at this point in the history
Related to #3900
  • Loading branch information
mboskamp committed Jan 19, 2024
1 parent 8e38d12 commit 5939769
Show file tree
Hide file tree
Showing 24 changed files with 1,030 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
*/
package org.camunda.bpm.engine.history;

import org.camunda.bpm.engine.EntityTypes;

import java.util.Date;
import org.camunda.bpm.engine.EntityTypes;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.camunda.bpm.engine.impl;

import java.net.URL;

import org.camunda.bpm.application.impl.ProcessApplicationLogger;
import org.camunda.bpm.container.impl.ContainerIntegrationLogger;
import org.camunda.bpm.engine.ProcessEngineConfiguration;
Expand Down Expand Up @@ -126,10 +125,10 @@ public class ProcessEngineLogger extends BaseLogger {
MigrationLogger.class, PROJECT_CODE, "org.camunda.bpm.engine.migration", "23");

public static final ExternalTaskLogger EXTERNAL_TASK_LOGGER = BaseLogger.createLogger(
ExternalTaskLogger.class, PROJECT_CODE, "org.camunda.bpm.engine.externaltask", "24");
ExternalTaskLogger.class, PROJECT_CODE, "org.camunda.bpm.engine.externaltask", "24");

public static final SecurityLogger SECURITY_LOGGER = BaseLogger.createLogger(
SecurityLogger.class, PROJECT_CODE, "org.camunda.bpm.engine.security", "25");
SecurityLogger.class, PROJECT_CODE, "org.camunda.bpm.engine.security", "25");

public static final IncidentLogger INCIDENT_LOGGER = BaseLogger.createLogger(
IncidentLogger.class, PROJECT_CODE, "org.camunda.bpm.engine.incident", "26");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,18 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
*/
protected boolean restrictUserOperationLogToAuthenticatedUsers = true;

/**
* Maximum number of operation log entries written per synchronous operation.
* APIs that can affect multiple entities can produce an operation log entry each.
* This property controls how the operation logs are handled. Possible values:
* <ul>
* <li>1 (Default): Always write one summary operation log including message name, number of affected entities, number of variables set by operation, and async=false</li>
* <lI>-1: Disable limiting of operation logs for synchronous APIs. Write one log entry per affected entity. Caution: This may impact performance with large data.</li>
* <li>1<x<=Long.MAX_VALUE: Write at most x operation logs per synchronous operation. If an operation exceeds x, a {@link ProcessEngineException} is thrown.</li>
* </ul>
*/
protected long logEntriesPerSyncOperationLimit = 1L;

protected boolean disableStrictCallActivityValidation = false;

protected boolean isBpmnStacktraceVerbose = false;
Expand Down Expand Up @@ -1194,6 +1206,7 @@ protected void init() {
initAdminUser();
initAdminGroups();
initPasswordPolicy();
initOperationLog();
invokePostInit();
}

Expand Down Expand Up @@ -2852,6 +2865,14 @@ protected void initDeploymentRegistration() {
}
}

protected void initOperationLog() {
if(logEntriesPerSyncOperationLimit < -1 || logEntriesPerSyncOperationLimit == 0) {
throw new ProcessEngineException(
"Invalid configuration for logEntriesPerSyncOperationLimit. Configured value needs to be either -1 or greater than 0 but was "
+ logEntriesPerSyncOperationLimit + ".");
}
}

// cache factory //////////////////////////////////////////////////////////

protected void initCacheFactory() {
Expand Down Expand Up @@ -4593,6 +4614,15 @@ public ProcessEngineConfigurationImpl setRestrictUserOperationLogToAuthenticated
return this;
}

public long getLogEntriesPerSyncOperationLimit() {
return logEntriesPerSyncOperationLimit;
}

public ProcessEngineConfigurationImpl setLogEntriesPerSyncOperationLimit(long logEntriesPerSyncOperationLimit) {
this.logEntriesPerSyncOperationLimit = logEntriesPerSyncOperationLimit;
return this;
}

public ProcessEngineConfigurationImpl setTenantIdProvider(TenantIdProvider tenantIdProvider) {
this.tenantIdProvider = tenantIdProvider;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public abstract class AbstractCorrelateMessageCmd {
protected final MessageCorrelationBuilderImpl builder;

protected ExecutionVariableSnapshotObserver variablesListener;
protected boolean variablesEnabled = false;
protected boolean variablesInResultEnabled = false;
protected long variablesCount = 0;
protected boolean deserializeVariableValues = false;

/**
Expand All @@ -55,11 +56,13 @@ public abstract class AbstractCorrelateMessageCmd {
protected AbstractCorrelateMessageCmd(MessageCorrelationBuilderImpl builder) {
this.builder = builder;
this.messageName = builder.getMessageName();
countVariables();

}

protected AbstractCorrelateMessageCmd(MessageCorrelationBuilderImpl builder, boolean variablesEnabled, boolean deserializeVariableValues) {
this(builder);
this.variablesEnabled = variablesEnabled;
this.variablesInResultEnabled = variablesEnabled;
this.deserializeVariableValues = deserializeVariableValues;
}

Expand All @@ -81,11 +84,11 @@ protected ProcessInstance instantiateProcess(CommandContext commandContext, Corr
ActivityImpl messageStartEvent = processDefinitionEntity.findActivity(correlationResult.getStartEventActivityId());
ExecutionEntity processInstance = processDefinitionEntity.createProcessInstance(builder.getBusinessKey(), messageStartEvent);

if (variablesEnabled) {
if (variablesInResultEnabled) {
variablesListener = new ExecutionVariableSnapshotObserver(processInstance, false, deserializeVariableValues);
}

VariableMap startVariables = resolveStartVariables();
VariableMap startVariables = resolveVariables();

processInstance.start(startVariables);

Expand All @@ -112,7 +115,11 @@ protected MessageCorrelationResultImpl createMessageCorrelationResult(final Comm
MessageCorrelationResultImpl resultWithVariables = new MessageCorrelationResultImpl(handlerResult);
if (MessageCorrelationResultType.Execution.equals(handlerResult.getResultType())) {
ExecutionEntity execution = findProcessInstanceExecution(commandContext, handlerResult);
if (variablesEnabled && execution != null) {

ProcessInstance processInstance = execution.getProcessInstance();
resultWithVariables.setProcessInstance(processInstance);

if (variablesInResultEnabled && execution != null) {
variablesListener = new ExecutionVariableSnapshotObserver(execution, false, deserializeVariableValues);
}
triggerExecution(commandContext, handlerResult);
Expand All @@ -133,12 +140,23 @@ protected ExecutionEntity findProcessInstanceExecution(final CommandContext comm
return execution;
}

protected VariableMap resolveStartVariables() {
protected VariableMap resolveVariables() {
VariableMap mergedVariables = Variables.createVariables();
mergedVariables.putAll(builder.getPayloadProcessInstanceVariables());
mergedVariables.putAll(builder.getPayloadProcessInstanceVariablesLocal());
mergedVariables.putAll(builder.getPayloadProcessInstanceVariablesToTriggeredScope());
return mergedVariables;
}

protected void countVariables() {
if(builder.getPayloadProcessInstanceVariables() != null) {
variablesCount += builder.getPayloadProcessInstanceVariables().size();
}
if(builder.getPayloadProcessInstanceVariablesLocal() != null) {
variablesCount += builder.getPayloadProcessInstanceVariablesLocal().size();
}
if(builder.getPayloadProcessInstanceVariablesToTriggeredScope() != null) {
variablesCount += builder.getPayloadProcessInstanceVariablesToTriggeredScope().size();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,32 @@
package org.camunda.bpm.engine.impl.cmd;


import static org.camunda.bpm.engine.impl.util.EnsureUtil.ensureAtLeastOneNotNull;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import org.camunda.bpm.engine.history.UserOperationLogEntry;
import org.camunda.bpm.engine.impl.MessageCorrelationBuilderImpl;
import org.camunda.bpm.engine.impl.context.Context;
import org.camunda.bpm.engine.impl.history.SynchronousOperationLogProducer;
import org.camunda.bpm.engine.impl.interceptor.Command;
import org.camunda.bpm.engine.impl.interceptor.CommandContext;
import org.camunda.bpm.engine.impl.persistence.entity.PropertyChange;
import org.camunda.bpm.engine.impl.runtime.CorrelationHandler;
import org.camunda.bpm.engine.impl.runtime.CorrelationHandlerResult;
import org.camunda.bpm.engine.impl.runtime.CorrelationSet;
import org.camunda.bpm.engine.impl.runtime.MessageCorrelationResultImpl;
import org.camunda.bpm.engine.impl.runtime.CorrelationHandlerResult;
import static org.camunda.bpm.engine.impl.util.EnsureUtil.ensureAtLeastOneNotNull;
import org.camunda.bpm.engine.runtime.ProcessInstance;

/**
* @author Thorben Lindhauer
* @author Daniel Meyer
* @author Michael Scholz
*/
public class CorrelateAllMessageCmd extends AbstractCorrelateMessageCmd implements Command<List<MessageCorrelationResultImpl>> {
public class CorrelateAllMessageCmd extends AbstractCorrelateMessageCmd implements Command<List<MessageCorrelationResultImpl>>, SynchronousOperationLogProducer<MessageCorrelationResultImpl> {

/**
* Initialize the command with a builder
Expand Down Expand Up @@ -70,6 +76,54 @@ public List<CorrelationHandlerResult> call() throws Exception {
results.add(createMessageCorrelationResult(commandContext, correlationResult));
}

produceOperationLog(commandContext, results);

return results;
}

@Override
public void createOperationLogEntry(CommandContext commandContext, MessageCorrelationResultImpl result, List<PropertyChange> propChanges, boolean isSummary) {
String processInstanceId = null;
String processDefinitionId = null;
if(result.getProcessInstance() != null) {
if(!isSummary) {
processInstanceId = result.getProcessInstance().getId();
}
processDefinitionId = result.getProcessInstance().getProcessDefinitionId();
}
commandContext.getOperationLogManager()
.logProcessInstanceOperation(UserOperationLogEntry.OPERATION_TYPE_CORRELATE_MESSAGE, processInstanceId, processDefinitionId, null, propChanges);
}

@Override
public Map<MessageCorrelationResultImpl, List<PropertyChange>> getPropChangesForOperation(List<MessageCorrelationResultImpl> results) {
Map<MessageCorrelationResultImpl, List<PropertyChange>> resultPropChanges = new HashMap<>();
for (MessageCorrelationResultImpl messageCorrelationResultImpl : results) {
List<PropertyChange> propChanges = getGenericPropChangesForOperation();
ProcessInstance processInstance = messageCorrelationResultImpl.getProcessInstance();
if(processInstance != null) {
propChanges.add(new PropertyChange("processInstanceId", null, processInstance.getId()));
}
resultPropChanges.put(messageCorrelationResultImpl, propChanges);
}
return resultPropChanges;
}

@Override
public List<PropertyChange> getSummarizingPropChangesForOperation(List<MessageCorrelationResultImpl> results) {
List<PropertyChange> propChanges = getGenericPropChangesForOperation();
propChanges.add(new PropertyChange("nrOfInstances", null, results.size()));
return propChanges;
}

protected List<PropertyChange> getGenericPropChangesForOperation() {
ArrayList<PropertyChange> propChanges = new ArrayList<>();

propChanges.add(new PropertyChange("messageName", null, messageName));
if(variablesCount > 0) {
propChanges.add(new PropertyChange("nrOfVariables", null, variablesCount));
}

return propChanges;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.List;
import java.util.concurrent.Callable;

import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
import org.camunda.bpm.engine.impl.MessageCorrelationBuilderImpl;
import org.camunda.bpm.engine.impl.ProcessEngineLogger;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.camunda.bpm.engine.impl.history;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.camunda.bpm.engine.ProcessEngineException;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.interceptor.CommandContext;
import org.camunda.bpm.engine.impl.persistence.entity.PropertyChange;

/**
* Interface for Commands that synchronously modify multiple entities in one operation.
* The methods of this interface take care of producing operation log entries based on the
* {@link ProcessEngineConfigurationImpl#getLogEntriesPerSyncOperationLimit() logEntriesPerSyncOperationLimit} property.
*/
public interface SynchronousOperationLogProducer<T> {

Long SUMMARY_LOG = 1L;
Long UNLIMITED_LOG = -1L;

/**
* Returns a map containing a list of changed properties for every result of the operation.
* Used to produce an operation log entry per entry contained in the returned map.
*/
Map<T, List<PropertyChange>> getPropChangesForOperation(List<T> results);

/**
* Returns a list of changed properties summarizing the whole operation involving multiple entities.
*/
List<PropertyChange> getSummarizingPropChangesForOperation(List<T> results);

/**
* Calls the code that produces the operation log. Usually <code>commandContext.getOperationLogManager().log...</code>
*
* The implementation must be capable of producing a single, summarizing operation log that contain information about an operation
* spanning affecting multiple entities as well as producing a single, detailed operation log containing information about a single
* affected entity. This method is called by the {@link SynchronousOperationLogProducer#produceOperationLog(CommandContext, List) produceOperationLog}
* method.
*
* @param commandContext the current command context
* @param result An object resulting from the operation for which this method produces the operation log. In case the operation produced
* multiple objects, depending on the implementation a representative object from the list of results or null can be passed.
* @param propChanges property changes to be attached to the operation log
* @param isSummary indicates whether the implementation should produce a summary log or a detailed log
*/
void createOperationLogEntry(CommandContext commandContext, T result, List<PropertyChange> propChanges, boolean isSummary);

/**
* The implementing command can call this method to produce the operation log entries for the current operation.
*/
default void produceOperationLog(CommandContext commandContext, List<T> results) {
if(results == null || results.isEmpty()) {
return;
}

long logEntriesPerSyncOperationLimit = commandContext.getProcessEngineConfiguration()
.getLogEntriesPerSyncOperationLimit();
if(logEntriesPerSyncOperationLimit == SUMMARY_LOG && results.size() > 1) {
// create summary from multi-result operation
List<PropertyChange> propChangesForOperation = getSummarizingPropChangesForOperation(results);
if(propChangesForOperation == null) {
// convert null return value to empty list
propChangesForOperation = Collections.singletonList(PropertyChange.EMPTY_CHANGE);
}
// use first result as representative for summarized operation log entry
createOperationLogEntry(commandContext, results.get(0), propChangesForOperation, true);
} else {
// create detailed log for each operation result
Map<T, List<PropertyChange>> propChangesForOperation = getPropChangesForOperation(results);
if(propChangesForOperation == null ) {
// create a map with empty result lists for each result item
propChangesForOperation = results.stream().collect(Collectors.toMap(Function.identity(), (result) -> Collections.singletonList(PropertyChange.EMPTY_CHANGE)));
}
if (logEntriesPerSyncOperationLimit != UNLIMITED_LOG && logEntriesPerSyncOperationLimit < propChangesForOperation.size()) {
throw new ProcessEngineException(
"Maximum number of operation log entries for operation type synchronous APIs reached. Configured limit is "
+ logEntriesPerSyncOperationLimit + " but " + propChangesForOperation.size() + " entities were affected by API call.");
} else {
// produce one operation log per affected entity
for (Entry<T, List<PropertyChange>> propChanges : propChangesForOperation.entrySet()) {
createOperationLogEntry(commandContext, propChanges.getKey(), propChanges.getValue(), false);
}
}
}
}
}

0 comments on commit 5939769

Please sign in to comment.