Skip to content
This repository has been archived by the owner on Feb 13, 2021. It is now read-only.

fix: Events related to different process instances may be collected in one bunch #257

Merged
merged 6 commits into from Mar 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,7 +21,6 @@
import org.activiti.cloud.api.model.shared.events.CloudRuntimeEvent;
import org.activiti.cloud.api.model.shared.impl.events.CloudRuntimeEventImpl;
import org.activiti.cloud.services.events.ProcessEngineChannels;
import org.activiti.cloud.services.events.converter.ExecutionContextInfoAppender;
import org.activiti.cloud.services.events.converter.RuntimeBundleInfoAppender;
import org.activiti.cloud.services.events.message.MessageBuilderChainFactory;
import org.activiti.engine.impl.context.ExecutionContext;
Expand All @@ -33,7 +32,6 @@
public class MessageProducerCommandContextCloseListener implements CommandContextCloseListener {

public static final String PROCESS_ENGINE_EVENTS = "processEngineEvents";
public static final String EXECUTION_CONTEXT = "executionContext";

private final ProcessEngineChannels producer;
private final MessageBuilderChainFactory<ExecutionContext> messageBuilderChainFactory;
Expand All @@ -54,29 +52,21 @@ public MessageProducerCommandContextCloseListener(ProcessEngineChannels producer
this.runtimeBundleInfoAppender = runtimeBundleInfoAppender;
}

protected ExecutionContextInfoAppender createExecutionContextInfoAppender(ExecutionContext executionContext) {
return new ExecutionContextInfoAppender(executionContext);
}

@Override
public void closed(CommandContext commandContext) {
List<CloudRuntimeEvent<?, ?>> events = commandContext.getGenericAttribute(PROCESS_ENGINE_EVENTS);

if (events != null && !events.isEmpty()) {
ExecutionContext executionContext = commandContext.getGenericAttribute(EXECUTION_CONTEXT);
erdemedeiros marked this conversation as resolved.
Show resolved Hide resolved

ExecutionContextInfoAppender executionContextInfoAppender = createExecutionContextInfoAppender(executionContext);

// Add execution context attributes to every event
// Add runtime bundle context attributes to every event
CloudRuntimeEvent<?, ?>[] payload = events.stream()
.filter(CloudRuntimeEventImpl.class::isInstance)
.map(CloudRuntimeEventImpl.class::cast)
.map(runtimeBundleInfoAppender::appendRuntimeBundleInfoTo)
.map(executionContextInfoAppender::appendExecutionContextInfoTo)
.toArray(CloudRuntimeEvent<?, ?>[]::new);

// Inject message headers from execution context
Message<CloudRuntimeEvent<?, ?>[]> message = messageBuilderChainFactory.create(executionContext)
// Inject message headers with null execution context as there may be events from several process instances
Message<CloudRuntimeEvent<?, ?>[]> message = messageBuilderChainFactory.create(null)
.withPayload(payload)
.build();
// Send message to audit producer channel
Expand Down
Expand Up @@ -17,12 +17,21 @@
package org.activiti.cloud.services.events.listeners;

import org.activiti.api.model.shared.model.VariableInstance;
import org.activiti.api.process.model.BPMNActivity;
import org.activiti.api.process.model.BPMNSequenceFlow;
import org.activiti.api.task.model.Task;
import org.activiti.cloud.api.model.shared.events.CloudRuntimeEvent;
import org.activiti.cloud.api.model.shared.events.CloudVariableEvent;
import org.activiti.cloud.api.model.shared.impl.events.CloudRuntimeEventImpl;
import org.activiti.cloud.api.process.model.events.CloudBPMNActivityEvent;
import org.activiti.cloud.api.process.model.events.CloudIntegrationEvent;
import org.activiti.cloud.api.process.model.events.CloudProcessRuntimeEvent;
import org.activiti.cloud.api.process.model.events.CloudSequenceFlowEvent;
import org.activiti.cloud.api.task.model.events.CloudTaskCandidateGroupEvent;
import org.activiti.cloud.api.task.model.events.CloudTaskCandidateUserEvent;
import org.activiti.cloud.api.task.model.events.CloudTaskRuntimeEvent;
import org.activiti.cloud.services.events.converter.CachingExecutionContext;
import org.activiti.cloud.services.events.converter.ExecutionContextInfoAppender;
import org.activiti.engine.impl.context.ExecutionContext;
import org.activiti.engine.impl.interceptor.CommandContext;
import org.activiti.engine.impl.persistence.entity.ExecutionEntity;
Expand Down Expand Up @@ -52,25 +61,48 @@ protected String getAttributeKey() {

@Override
public void add(CloudRuntimeEvent<?, ?> element) {
super.add(element);

CommandContext commandContext = getCurrentCommandContext();

// Let's try resolve underlying execution Id
String executionId = resolveExecutionId(element);

// Let's find and cache ExecutionContext for executionId
if(executionId != null && commandContext.getGenericAttribute(MessageProducerCommandContextCloseListener.EXECUTION_CONTEXT) == null) {
ExecutionContext executionContext = resolveExecutionContext(commandContext, executionId);

// Let's inject execution context info into event using event execution process context
if(executionContext != null) {
ExecutionContextInfoAppender executionContextInfoAppender = createExecutionContextInfoAppender(executionContext);

CloudRuntimeEventImpl<?,?> event = CloudRuntimeEventImpl.class.cast(element);

element = executionContextInfoAppender.appendExecutionContextInfoTo(event);
}

super.add(element);

}

protected ExecutionContext resolveExecutionContext(CommandContext commandContext, String executionId) {

if(executionId != null && commandContext.getGenericAttribute(executionId) == null) {
ExecutionEntity executionEntity = commandContext.getExecutionEntityManager()
.findById(executionId);

ExecutionContext executionContext = createExecutionContext(executionEntity);

if (executionEntity != null) {
commandContext.addAttribute(MessageProducerCommandContextCloseListener.EXECUTION_CONTEXT,
commandContext.addAttribute(executionId,
executionContext);
}
}

return commandContext.getGenericAttribute(executionId);

}


protected ExecutionContextInfoAppender createExecutionContextInfoAppender(ExecutionContext executionContext) {
return new ExecutionContextInfoAppender(executionContext);
}

protected ExecutionContext createExecutionContext(ExecutionEntity executionEntity) {
Expand All @@ -84,6 +116,16 @@ protected String resolveExecutionId(CloudRuntimeEvent<?, ?> element) {
return ((VariableInstance) element.getEntity()).getProcessInstanceId();
} else if(element instanceof CloudTaskRuntimeEvent) {
return ((Task) element.getEntity()).getProcessInstanceId();
} else if(element instanceof CloudBPMNActivityEvent) {
return ((BPMNActivity) element.getEntity()).getProcessInstanceId();
} else if(element instanceof CloudSequenceFlowEvent) {
return ((BPMNSequenceFlow) element.getEntity()).getProcessInstanceId();
} else if(element instanceof CloudIntegrationEvent) {
return ((CloudIntegrationEvent) element).getEntity().getProcessInstanceId();
} else if(element instanceof CloudTaskCandidateUserEvent) {
return ((CloudTaskCandidateUserEvent) element).getProcessInstanceId();
} else if(element instanceof CloudTaskCandidateGroupEvent) {
return ((CloudTaskCandidateGroupEvent) element).getProcessInstanceId();
}

return null;
Expand Down
Expand Up @@ -21,12 +21,10 @@
public class AuditProducerRoutingKeyResolver extends AbstractMessageHeadersRoutingKeyResolver {

public final String ROUTING_KEY_PREFIX = "engineEvents";

public final String[] HEADER_KEYS = {RuntimeBundleInfoMessageHeaders.SERVICE_NAME,
RuntimeBundleInfoMessageHeaders.APP_NAME,
ExecutionContextMessageHeaders.PROCESS_DEFINITION_KEY,
ExecutionContextMessageHeaders.PROCESS_INSTANCE_ID,
ExecutionContextMessageHeaders.BUSINESS_KEY
};
RuntimeBundleInfoMessageHeaders.APP_NAME
};

@Override
public String resolve(Map<String, Object> headers) {
Expand Down
Expand Up @@ -45,13 +45,14 @@
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

public class MessageProducerCommandContextCloseListenerTest {

private static final String MOCK_ROUTING_KEY = "engineEvents.springAppName.appName.mockProcessDefinitionKey.mockProcessInstanceId.mockBusinessKey";
private static final String MOCK_ROUTING_KEY = "engineEvents.springAppName.appName";
private static final String MOCK_PARENT_PROCESS_NAME = "mockParentProcessName";
private static final String LORG_ACTIVITI_CLOUD_API_MODEL_SHARED_EVENTS_CLOUD_RUNTIME_EVENT = "[Lorg.activiti.cloud.api.model.shared.events.CloudRuntimeEvent;";
private static final String MOCK_PROCESS_NAME = "mockProcessName";
Expand Down Expand Up @@ -92,10 +93,13 @@ public class MessageProducerCommandContextCloseListenerTest {
private ExecutionContextMessageBuilderFactory messageBuilderChainFactory =
new ExecutionContextMessageBuilderFactory(properties);

private ProcessEngineEventsAggregator processEngineEventsAggregator;

@Spy
private RuntimeBundleInfoAppender runtimeBundleInfoAppender =
new RuntimeBundleInfoAppender(properties);


@Mock
private MessageChannel auditChannel;

Expand All @@ -113,16 +117,21 @@ public void setUp() throws Exception {
event = new CloudProcessCreatedEventImpl();

when(producer.auditProducer()).thenReturn(auditChannel);

processEngineEventsAggregator = Mockito.spy(new ProcessEngineEventsAggregator(closeListener));

when(processEngineEventsAggregator.getCurrentCommandContext()).thenReturn(commandContext);

ExecutionContext executionContext = mockExecutionContext();
given(commandContext.getGenericAttribute(MessageProducerCommandContextCloseListener.EXECUTION_CONTEXT))
given(commandContext.getGenericAttribute(event.getEntityId()))
.willReturn(executionContext);

}

@Test
public void closedShouldSendEventsRegisteredOnTheCommandContext() {
// given
processEngineEventsAggregator.add(event);
given(commandContext.getGenericAttribute(MessageProducerCommandContextCloseListener.PROCESS_ENGINE_EVENTS))
.willReturn(Collections.singletonList(event));

Expand Down Expand Up @@ -194,16 +203,6 @@ public void closedShouldSendMessageHeadersWithExecutionContext() {
assertThat(messageArgumentCaptor.getValue()
.getHeaders()).containsEntry("routingKey", MOCK_ROUTING_KEY)
.containsEntry("messagePayloadType",LORG_ACTIVITI_CLOUD_API_MODEL_SHARED_EVENTS_CLOUD_RUNTIME_EVENT)
.containsEntry("businessKey",MOCK_BUSINESS_KEY)
.containsEntry("processInstanceId",MOCK_PROCESS_INSTANCE_ID)
.containsEntry("processName",MOCK_PROCESS_NAME)
.containsEntry("parentProcessInstanceId",MOCK_PARENT_PROCESS_INSTANCE_ID)
.containsEntry("parentProcessInstanceName",MOCK_PARENT_PROCESS_NAME)
.containsEntry("processDefinitionId",MOCK_PROCESS_DEFINITION_ID)
.containsEntry("processDefinitionKey",MOCK_PROCESS_DEFINITION_KEY)
.containsEntry("processDefinitionVersion", MOCK_PROCESS_DEFINITION_VERSION)
.containsEntry("deploymentId",MOCK_DEPLOYMENT_ID)
.containsEntry("deploymentName",MOCK_DEPLOYMENT_NAME)
.containsEntry("appName", APP_NAME)
.containsEntry("appVersion", APP_VERSION)
.containsEntry("serviceName",SPRING_APP_NAME)
Expand Down
Expand Up @@ -30,67 +30,55 @@ public class AuditProducerRoutingKeyResolverTest {
public void testResolveRoutingKeyFromValidHeadersInAnyOrder() {
// given
Map<String, Object> headers = MapBuilder.<String, Object> map(RuntimeBundleInfoMessageHeaders.APP_NAME, "app-name")
.with(RuntimeBundleInfoMessageHeaders.SERVICE_NAME, "service-name")
.with(ExecutionContextMessageHeaders.BUSINESS_KEY, "business-key")
.with(ExecutionContextMessageHeaders.PROCESS_INSTANCE_ID, "process-instance-id")
.with(ExecutionContextMessageHeaders.PROCESS_DEFINITION_KEY, "process-definition-key");
.with(RuntimeBundleInfoMessageHeaders.SERVICE_NAME, "service-name");

// when
String routingKey = subject.resolve(headers);

// then
assertThat(routingKey).isEqualTo("engineEvents.service-name.app-name.process-definition-key.process-instance-id.business-key");
assertThat(routingKey).isEqualTo("engineEvents.service-name.app-name");

}

@Test
public void testResolveRoutingKeyFromEmptyHeaders() {
// given
Map<String, Object> headers = MapBuilder.<String, Object> map(RuntimeBundleInfoMessageHeaders.APP_NAME, "app-name")
.with(RuntimeBundleInfoMessageHeaders.SERVICE_NAME, "service-name")
.with(ExecutionContextMessageHeaders.PROCESS_DEFINITION_KEY, "process-definition-key")
.with(ExecutionContextMessageHeaders.PROCESS_INSTANCE_ID, "process-instance-id")
.with(ExecutionContextMessageHeaders.BUSINESS_KEY, "");
Map<String, Object> headers = MapBuilder.<String, Object> map(RuntimeBundleInfoMessageHeaders.APP_NAME, "")
.with(RuntimeBundleInfoMessageHeaders.SERVICE_NAME, "service-name");

// when
String routingKey = subject.resolve(headers);

// then
assertThat(routingKey).isEqualTo("engineEvents.service-name.app-name.process-definition-key.process-instance-id._");
assertThat(routingKey).isEqualTo("engineEvents.service-name._");

}

@Test
public void testResolveRoutingKeyFromNullHeaders() {
// given
Map<String, Object> headers = MapBuilder.<String, Object> map(RuntimeBundleInfoMessageHeaders.APP_NAME, "app-name")
.with(RuntimeBundleInfoMessageHeaders.SERVICE_NAME, "service-name")
.with(ExecutionContextMessageHeaders.PROCESS_DEFINITION_KEY, "process-definition-key")
.with(ExecutionContextMessageHeaders.PROCESS_INSTANCE_ID, "process-instance-id")
.with(ExecutionContextMessageHeaders.BUSINESS_KEY, null);
Map<String, Object> headers = MapBuilder.<String, Object> map(RuntimeBundleInfoMessageHeaders.APP_NAME, null)
.with(RuntimeBundleInfoMessageHeaders.SERVICE_NAME, "service-name");

// when
String routingKey = subject.resolve(headers);

// then
assertThat(routingKey).isEqualTo("engineEvents.service-name.app-name.process-definition-key.process-instance-id._");
assertThat(routingKey).isEqualTo("engineEvents.service-name._");

}

@Test
public void testResolveRoutingKeyWithEscapedValues() {
// given
Map<String, Object> headers = MapBuilder.<String, Object> map(RuntimeBundleInfoMessageHeaders.APP_NAME, "app:name")
.with(RuntimeBundleInfoMessageHeaders.SERVICE_NAME, "service.name")
.with(ExecutionContextMessageHeaders.PROCESS_DEFINITION_KEY, "process#definition-key")
.with(ExecutionContextMessageHeaders.PROCESS_INSTANCE_ID, "process*instance*id")
.with(ExecutionContextMessageHeaders.BUSINESS_KEY, "business key");
Map<String, Object> headers = MapBuilder.<String, Object> map(RuntimeBundleInfoMessageHeaders.APP_NAME, "app:na#me")
.with(RuntimeBundleInfoMessageHeaders.SERVICE_NAME, "ser.vice*na me");

// when
String routingKey = subject.resolve(headers);

// then
assertThat(routingKey).isEqualTo("engineEvents.service-name.app-name.process-definition-key.process-instance-id.business-key");
assertThat(routingKey).isEqualTo("engineEvents.ser-vice-na-me.app-na-me");

}

Expand All @@ -103,7 +91,7 @@ public void testResolveRoutingKeyWithNonExistingHeaders() {
String routingKey = subject.resolve(headers);

// then
assertThat(routingKey).isEqualTo("engineEvents._._._._._");
assertThat(routingKey).isEqualTo("engineEvents._._");

}

Expand Down