Skip to content

Commit

Permalink
Add debug logging when processing events in the event registry
Browse files Browse the repository at this point in the history
  • Loading branch information
filiphr committed Nov 30, 2023
1 parent ed000a5 commit 5abe6ba
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.flowable.cmmn.api.CmmnRuntimeService;
import org.flowable.cmmn.api.repository.CaseDefinition;
import org.flowable.cmmn.api.runtime.CaseInstance;
import org.flowable.cmmn.api.runtime.CaseInstanceBuilder;
import org.flowable.cmmn.api.runtime.CaseInstanceQuery;
import org.flowable.cmmn.api.runtime.PlanItemInstanceState;
Expand Down Expand Up @@ -76,6 +77,9 @@ protected EventRegistryProcessingInfo eventReceived(EventInstance eventInstance)

Collection<CorrelationKey> correlationKeys = generateCorrelationKeys(eventInstance.getCorrelationParameterInstances());
List<EventSubscription> eventSubscriptions = findEventSubscriptions(ScopeTypes.CMMN, eventInstance, correlationKeys);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Found {} for {}", eventSubscriptions, eventInstance);
}
CmmnRuntimeService cmmnRuntimeService = cmmnEngineConfiguration.getCmmnRuntimeService();
for (EventSubscription eventSubscription : eventSubscriptions) {
EventConsumerInfo eventConsumerInfo = new EventConsumerInfo(eventSubscription.getId(), eventSubscription.getSubScopeId(),
Expand All @@ -93,39 +97,37 @@ protected EventRegistryProcessingInfo eventReceived(EventInstance eventInstance)
protected boolean handleEventSubscription(CmmnRuntimeService cmmnRuntimeService, EventSubscription eventSubscription,
EventInstance eventInstance, Collection<CorrelationKey> correlationKeys, EventConsumerInfo eventConsumerInfo) {

if (eventSubscription.getSubScopeId() != null) {
String planItemInstanceId = eventSubscription.getSubScopeId();
if (planItemInstanceId != null) {

// When a subscope id is set, this means that a plan item instance is waiting for the event

PlanItemInstanceEntity planItemInstanceEntity = (PlanItemInstanceEntity) cmmnRuntimeService.createPlanItemInstanceQuery().planItemInstanceId(eventSubscription.getSubScopeId()).singleResult();
PlanItemInstanceEntity planItemInstanceEntity = (PlanItemInstanceEntity) cmmnRuntimeService.createPlanItemInstanceQuery().planItemInstanceId(
planItemInstanceId).singleResult();
CmmnModel cmmnModel = cmmnEngineConfiguration.getCmmnRepositoryService().getCmmnModel(planItemInstanceEntity.getCaseDefinitionId());
PlanItem planItem = cmmnModel.findPlanItemByPlanItemDefinitionId(planItemInstanceEntity.getPlanItemDefinitionId());
if (PlanItemInstanceState.ACTIVE.equals(planItemInstanceEntity.getState())
|| (planItem != null && planItem.getPlanItemDefinition() instanceof EventListener
&& PlanItemInstanceState.AVAILABLE.equals(planItemInstanceEntity.getState()))) {

cmmnRuntimeService.createPlanItemInstanceTransitionBuilder(eventSubscription.getSubScopeId())

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Triggering {} with {}", planItemInstanceEntity, eventInstance);
}
cmmnRuntimeService.createPlanItemInstanceTransitionBuilder(planItemInstanceId)
.transientVariable(EventConstants.EVENT_INSTANCE, eventInstance)
.trigger();

} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignoring {} because {} was not in the right state", eventInstance, planItemInstanceEntity);
}
return false;
}

} else if (eventSubscription.getScopeDefinitionId() != null
&& eventSubscription.getScopeId() == null
&& eventSubscription.getSubScopeId() == null) {
} else if (eventSubscription.getScopeDefinitionId() != null && eventSubscription.getScopeId() == null) {

// If there is no scope/subscope id set, but there is a scope definition id set, it's an event that starts a case

CaseInstanceBuilder caseInstanceBuilder = cmmnRuntimeService.createCaseInstanceBuilder()
.caseDefinitionId(eventSubscription.getScopeDefinitionId())
.transientVariable(EventConstants.EVENT_INSTANCE, eventInstance);

if (eventInstance.getTenantId() != null && !Objects.equals(CmmnEngineConfiguration.NO_TENANT_ID, eventInstance.getTenantId())) {
caseInstanceBuilder.overrideCaseDefinitionTenantId(eventInstance.getTenantId());
}

if (correlationKeys != null) {
String startCorrelationConfiguration = getStartCorrelationConfiguration(eventSubscription);

Expand Down Expand Up @@ -182,7 +184,7 @@ protected boolean handleEventSubscription(CmmnRuntimeService cmmnRuntimeService,
return true;
}

startCaseInstance(caseInstanceBuilder, correlationKeyWithAllParameters.getValue(), ReferenceTypes.EVENT_CASE);
startCaseInstance(cmmnRuntimeService, eventSubscription, eventInstance, correlationKeyWithAllParameters);
return true;

} finally {
Expand All @@ -197,14 +199,14 @@ protected boolean handleEventSubscription(CmmnRuntimeService cmmnRuntimeService,
}

} else {
startCaseInstance(caseInstanceBuilder, correlationKeyWithAllParameters.getValue(), ReferenceTypes.EVENT_CASE);
startCaseInstance(cmmnRuntimeService, eventSubscription, eventInstance, correlationKeyWithAllParameters);
return true;
}

}
}

startCaseInstance(caseInstanceBuilder, null, null);
startCaseInstance(cmmnRuntimeService, eventSubscription, eventInstance, null);
}

return true;
Expand All @@ -225,19 +227,42 @@ protected long countCaseInstances(CmmnRuntimeService cmmnRuntimeService, EventIn
return caseInstanceQuery.count();
}

protected void startCaseInstance(CaseInstanceBuilder caseInstanceBuilder, String referenceId, String referenceType) {
protected void startCaseInstance(CmmnRuntimeService cmmnRuntimeService, EventSubscription eventSubscription, EventInstance eventInstance,
CorrelationKey correlationKey) {
CaseInstanceBuilder caseInstanceBuilder = cmmnRuntimeService.createCaseInstanceBuilder()
.caseDefinitionId(eventSubscription.getScopeDefinitionId())
.transientVariable(EventConstants.EVENT_INSTANCE, eventInstance);

if (referenceId != null) {
caseInstanceBuilder.referenceId(referenceId);
if (eventInstance.getTenantId() != null && !Objects.equals(CmmnEngineConfiguration.NO_TENANT_ID, eventInstance.getTenantId())) {
caseInstanceBuilder.overrideCaseDefinitionTenantId(eventInstance.getTenantId());
}
if (referenceType != null) {
caseInstanceBuilder.referenceType(referenceType);

if (correlationKey != null) {
caseInstanceBuilder.referenceId(correlationKey.getValue())
.referenceType(ReferenceTypes.EVENT_CASE);
}

boolean debugLoggingEnabled = LOGGER.isDebugEnabled();
if (cmmnEngineConfiguration.isEventRegistryStartCaseInstanceAsync()) {
caseInstanceBuilder.startAsync();
if (debugLoggingEnabled) {
LOGGER.debug("Async starting case instance for {} with {}", eventSubscription, eventInstance);
}

CaseInstance caseInstance = caseInstanceBuilder.startAsync();

if (debugLoggingEnabled) {
LOGGER.debug("Started {} async for {} with {}", caseInstance, eventSubscription, eventInstance);
}
} else {
caseInstanceBuilder.start();
if (debugLoggingEnabled) {
LOGGER.debug("Starting case instance for {} with {}", eventSubscription, eventInstance);
}

CaseInstance caseInstance = caseInstanceBuilder.start();

if (debugLoggingEnabled) {
LOGGER.debug("Started {} for {} with {}", caseInstance, eventSubscription, eventInstance);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.flowable.engine.RuntimeService;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.engine.repository.ProcessDefinition;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.engine.runtime.ProcessInstanceBuilder;
import org.flowable.engine.runtime.ProcessInstanceQuery;
import org.flowable.eventregistry.api.EventConsumerInfo;
Expand Down Expand Up @@ -74,6 +75,9 @@ protected EventRegistryProcessingInfo eventReceived(EventInstance eventInstance)

Collection<CorrelationKey> correlationKeys = generateCorrelationKeys(eventInstance.getCorrelationParameterInstances());
List<EventSubscription> eventSubscriptions = findEventSubscriptions(ScopeTypes.BPMN, eventInstance, correlationKeys);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Found {} for {}", eventSubscriptions, eventInstance);
}
RuntimeService runtimeService = processEngineConfiguration.getRuntimeService();
for (EventSubscription eventSubscription : eventSubscriptions) {
EventConsumerInfo eventConsumerInfo = new EventConsumerInfo(eventSubscription.getId(), eventSubscription.getExecutionId(),
Expand All @@ -88,31 +92,22 @@ protected EventRegistryProcessingInfo eventReceived(EventInstance eventInstance)
protected void handleEventSubscription(RuntimeService runtimeService, EventSubscription eventSubscription,
EventInstance eventInstance, Collection<CorrelationKey> correlationKeys, EventConsumerInfo eventConsumerInfo) {

if (eventSubscription.getExecutionId() != null) {
String executionId = eventSubscription.getExecutionId();
if (executionId != null) {

// When an executionId is set, this means that the process instance is waiting at that step for an event

Map<String, Object> transientVariableMap = new HashMap<>();
transientVariableMap.put(EventConstants.EVENT_INSTANCE, eventInstance);
runtimeService.trigger(eventSubscription.getExecutionId(), null, transientVariableMap);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Triggering execution {} with {}", executionId, eventInstance);
}
runtimeService.trigger(executionId, null, transientVariableMap);

} else if (eventSubscription.getProcessDefinitionId() != null
&& eventSubscription.getProcessInstanceId() == null && eventSubscription.getExecutionId() == null) {
} else if (eventSubscription.getProcessDefinitionId() != null && eventSubscription.getProcessInstanceId() == null) {

// If there is no execution/process instance set, but a definition id is set, this means that it's a start event

ProcessInstanceBuilder processInstanceBuilder = runtimeService.createProcessInstanceBuilder()
.processDefinitionId(eventSubscription.getProcessDefinitionId())
.transientVariable(EventConstants.EVENT_INSTANCE, eventInstance);

if (StringUtils.isNotEmpty(eventSubscription.getActivityId())) {
processInstanceBuilder.startEventId(eventSubscription.getActivityId());
}

if (eventInstance.getTenantId() != null && !Objects.equals(ProcessEngineConfiguration.NO_TENANT_ID, eventInstance.getTenantId())) {
processInstanceBuilder.overrideProcessDefinitionTenantId(eventInstance.getTenantId());
}

if (correlationKeys != null) {
String startCorrelationConfiguration = getStartCorrelationConfiguration(eventSubscription);

Expand Down Expand Up @@ -170,7 +165,7 @@ protected void handleEventSubscription(RuntimeService runtimeService, EventSubsc
return;
}

startProcessInstance(processInstanceBuilder, correlationKeyWithAllParameters.getValue(), ReferenceTypes.EVENT_PROCESS);
startProcessInstance(runtimeService, eventSubscription, eventInstance, correlationKeyWithAllParameters);
return;

} finally {
Expand All @@ -188,16 +183,18 @@ protected void handleEventSubscription(RuntimeService runtimeService, EventSubsc


} else {
startProcessInstance(processInstanceBuilder, correlationKeyWithAllParameters.getValue(), ReferenceTypes.EVENT_PROCESS);
startProcessInstance(runtimeService, eventSubscription, eventInstance, correlationKeyWithAllParameters);
return;
}

}

}

startProcessInstance(processInstanceBuilder, null, null);
startProcessInstance(runtimeService, eventSubscription, eventInstance, null);

} else {
LOGGER.warn("Ignoring {}. It was acquired by the bpmn event consumer, but it is not used", eventSubscription);
}

}
Expand All @@ -217,19 +214,47 @@ protected long countProcessInstances(RuntimeService runtimeService, EventInstanc
return processInstanceQuery.count();
}

protected void startProcessInstance(ProcessInstanceBuilder processInstanceBuilder, String referenceId, String referenceType) {
protected void startProcessInstance(RuntimeService runtimeService, EventSubscription eventSubscription, EventInstance eventInstance,
CorrelationKey correlationKey) {
ProcessInstanceBuilder processInstanceBuilder = runtimeService.createProcessInstanceBuilder()
.processDefinitionId(eventSubscription.getProcessDefinitionId())
.transientVariable(EventConstants.EVENT_INSTANCE, eventInstance);

if (referenceId != null) {
processInstanceBuilder.referenceId(referenceId);
if (StringUtils.isNotEmpty(eventSubscription.getActivityId())) {
processInstanceBuilder.startEventId(eventSubscription.getActivityId());
}
if (referenceType != null) {
processInstanceBuilder.referenceType(referenceType);

if (eventInstance.getTenantId() != null && !Objects.equals(ProcessEngineConfiguration.NO_TENANT_ID, eventInstance.getTenantId())) {
processInstanceBuilder.overrideProcessDefinitionTenantId(eventInstance.getTenantId());
}

if (correlationKey != null) {
processInstanceBuilder
.referenceId(correlationKey.getValue())
.referenceType(ReferenceTypes.EVENT_PROCESS);
}

boolean debugLoggingEnabled = LOGGER.isDebugEnabled();
if (processEngineConfiguration.isEventRegistryStartProcessInstanceAsync()) {
processInstanceBuilder.startAsync();
if (debugLoggingEnabled) {
LOGGER.debug("Async starting process instance for {} with {}", eventSubscription, eventInstance);
}

ProcessInstance processInstance = processInstanceBuilder.startAsync();

if (debugLoggingEnabled) {
LOGGER.debug("Started {} async for {} with {}", processInstance, eventSubscription, eventInstance);
}
} else {
processInstanceBuilder.start();
if (debugLoggingEnabled) {
LOGGER.debug("Starting process instance for {} with {}", eventSubscription, eventInstance);
}

ProcessInstance processInstance = processInstanceBuilder.start();

if (debugLoggingEnabled) {
LOGGER.debug("Started {} for {} with {}", processInstance, eventSubscription, eventInstance);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package org.flowable.eventregistry.api;

import java.util.StringJoiner;

public class EventConsumerInfo {

protected String eventSubscriptionId;
Expand Down Expand Up @@ -59,4 +61,15 @@ public boolean isHasExistingInstancesForUniqueCorrelation() {
public void setHasExistingInstancesForUniqueCorrelation(boolean hasExistingInstancesForUniqueCorrelation) {
this.hasExistingInstancesForUniqueCorrelation = hasExistingInstancesForUniqueCorrelation;
}

@Override
public String toString() {
return new StringJoiner(", ", getClass().getSimpleName() + "[", "]")
.add("eventSubscriptionId='" + eventSubscriptionId + "'")
.add("subScopeId='" + subScopeId + "'")
.add("scopeType='" + scopeType + "'")
.add("scopeDefinitionId='" + scopeDefinitionId + "'")
.add("hasExistingInstancesForUniqueCorrelation=" + hasExistingInstancesForUniqueCorrelation)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

public class EventRegistryProcessingInfo {

Expand All @@ -38,4 +39,11 @@ public List<EventConsumerInfo> getEventConsumerInfos() {
public void setEventConsumerInfos(List<EventConsumerInfo> eventConsumerInfos) {
this.eventConsumerInfos = eventConsumerInfos;
}

@Override
public String toString() {
return new StringJoiner(", ", getClass().getSimpleName() + "[", "]")
.add("eventConsumerInfos=" + eventConsumerInfos)
.toString();
}
}
Loading

0 comments on commit 5abe6ba

Please sign in to comment.