From 2aa287b1839ce6282562474b328176b77e77cfe5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karel=20Kov=C3=A1=C5=99=C3=ADk?= Date: Wed, 14 Nov 2018 12:07:43 +0100 Subject: [PATCH] [OHFJIRA-6] : processing of messages in final states (OK, CANCEL, FAILED) - added job to search for messages - added implementation of processor to delete messages from db - new config params to control this feature added --- .../finalmessage/FinalMessageProcessor.java | 58 ++++ .../FinalMessagesProcessingService.java | 33 +++ .../api/asynch/finalmessage/package-info.java | 4 + .../openhub/api/configuration/CoreProps.java | 38 +++ .../core/common/asynch/LogContextHelper.java | 17 ++ .../AbstractFinalMessageProcessor.java | 91 +++++++ .../DeleteFinalMessageProcessor.java | 112 ++++++++ .../FinalMessageProcessingConfiguration.java | 56 ++++ .../FinalMessagesProcessingJob.java | 64 +++++ .../FinalMessagesProcessingServiceImpl.java | 206 ++++++++++++++ .../core/common/dao/ExternalCallDao.java | 7 + .../common/dao/ExternalCallDaoJpaImpl.java | 7 + .../openhub/core/common/dao/MessageDao.java | 7 + .../core/common/dao/MessageDaoJpaImpl.java | 69 ++++- .../core/common/dao/RequestResponseDao.java | 14 + .../common/dao/RequestResponseDaoJpaImpl.java | 14 + .../postgresql/V1_0_3__final_messages.sql | 24 ++ .../DeleteFinalMessageProcessorTest.java | 155 +++++++++++ ...inalMessagesProcessingServiceImplTest.java | 256 ++++++++++++++++++ .../config/application-test.properties | 2 + web/src/main/resources/application.properties | 5 + 21 files changed, 1224 insertions(+), 15 deletions(-) create mode 100644 core-api/src/main/java/org/openhubframework/openhub/api/asynch/finalmessage/FinalMessageProcessor.java create mode 100644 core-api/src/main/java/org/openhubframework/openhub/api/asynch/finalmessage/FinalMessagesProcessingService.java create mode 100644 core-api/src/main/java/org/openhubframework/openhub/api/asynch/finalmessage/package-info.java create mode 100644 core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/AbstractFinalMessageProcessor.java create mode 100644 core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/DeleteFinalMessageProcessor.java create mode 100644 core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessageProcessingConfiguration.java create mode 100644 core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessagesProcessingJob.java create mode 100644 core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessagesProcessingServiceImpl.java create mode 100644 core/src/main/resources/db/migration/postgresql/V1_0_3__final_messages.sql create mode 100644 core/src/test/java/org/openhubframework/openhub/core/common/asynch/finalmessage/DeleteFinalMessageProcessorTest.java create mode 100644 core/src/test/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessagesProcessingServiceImplTest.java diff --git a/core-api/src/main/java/org/openhubframework/openhub/api/asynch/finalmessage/FinalMessageProcessor.java b/core-api/src/main/java/org/openhubframework/openhub/api/asynch/finalmessage/FinalMessageProcessor.java new file mode 100644 index 00000000..540e41d7 --- /dev/null +++ b/core-api/src/main/java/org/openhubframework/openhub/api/asynch/finalmessage/FinalMessageProcessor.java @@ -0,0 +1,58 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openhubframework.openhub.api.asynch.finalmessage; + +import org.openhubframework.openhub.api.entity.Message; +import org.springframework.core.Ordered; + +/** + * Contract for final messages processor. + * Messages that are no-longer considered relevant for openhub async workflow are expected + * to be processed by this processor. + * + * There can be multiple processors, that are invoked in order defined in getOrder method. + * + * All processor are expected to be executed in transaction, if any should throw + * an exception, transaction will be rolled back. + * + * @author Karel Kovarik + * @since 2.1 + */ +@FunctionalInterface +public interface FinalMessageProcessor extends Ordered { + + /** + * Default order, if {@link FinalMessageProcessor#getOrder()} method is not implemented. + */ + int DEFAULT_ORDER = 0; + + /** + * Process message in OpenHub datastore. + * + * @param message the message to be processed. + */ + void processMessage(Message message); + + /** + * Get the order for processor. + * + * @return the order of processor, lower value is expected to be invoked first. + */ + default int getOrder() { + return DEFAULT_ORDER; + } +} diff --git a/core-api/src/main/java/org/openhubframework/openhub/api/asynch/finalmessage/FinalMessagesProcessingService.java b/core-api/src/main/java/org/openhubframework/openhub/api/asynch/finalmessage/FinalMessagesProcessingService.java new file mode 100644 index 00000000..f2662f53 --- /dev/null +++ b/core-api/src/main/java/org/openhubframework/openhub/api/asynch/finalmessage/FinalMessagesProcessingService.java @@ -0,0 +1,33 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openhubframework.openhub.api.asynch.finalmessage; + +/** + * Service to find final messages that are relevant for processing and + * invoke all {@link FinalMessageProcessor} beans on them. + * + * @author Karel Kovarik + * @since 2.1 + */ +@FunctionalInterface +public interface FinalMessagesProcessingService { + + /** + * Do trigger messages processing. + */ + void processMessages(); +} diff --git a/core-api/src/main/java/org/openhubframework/openhub/api/asynch/finalmessage/package-info.java b/core-api/src/main/java/org/openhubframework/openhub/api/asynch/finalmessage/package-info.java new file mode 100644 index 00000000..36fbac92 --- /dev/null +++ b/core-api/src/main/java/org/openhubframework/openhub/api/asynch/finalmessage/package-info.java @@ -0,0 +1,4 @@ +/** + * Final messages in async workflow handling. + */ +package org.openhubframework.openhub.api.asynch.finalmessage; diff --git a/core-api/src/main/java/org/openhubframework/openhub/api/configuration/CoreProps.java b/core-api/src/main/java/org/openhubframework/openhub/api/configuration/CoreProps.java index b61be05e..47d903e6 100644 --- a/core-api/src/main/java/org/openhubframework/openhub/api/configuration/CoreProps.java +++ b/core-api/src/main/java/org/openhubframework/openhub/api/configuration/CoreProps.java @@ -82,6 +82,44 @@ public class CoreProps { */ public static final String ASYNCH_POSTPONED_INTERVAL_WHEN_FAILED_SEC = PREFIX + "asynch.postponedIntervalWhenFailedSec"; + /** + * Final messages processing enabled or disabled. Note: cannot be defined in database, needs to be in properties. + */ + public static final String ASYNCH_FINAL_MESSAGES_PROCESSING_ENABLED = PREFIX + "asynch.finalMessages.processingEnabled"; + + /** + * Final messages processing job interval, in seconds. + */ + public static final String ASYNCH_FINAL_MESSAGES_PROCESSING_INTERVAL_SEC = PREFIX + "asynch.finalMessages.processingIntervalSec"; + + /** + * Maximum number of messages processed per job. + */ + public static final String ASYNCH_FINAL_MESSAGES_ITERATION_MESSAGE_LIMIT = PREFIX + "asynch.finalMessages.iterationMessageLimit"; + + /** + * Configuration of final message processor that does delete messages from the datastore. + * Note: final message processing needs to be enabled in order to do that. + */ + public static final String ASYNCH_FINAL_MESSAGES_DELETE_PROCESSOR_ENABLED = PREFIX + "asynch.finalMessages.deleteProcessor.enabled"; + + /** + * Prefix for setting duration to keep messages in final states in the datastore. + * After this period expires and there is no other action with the message, it will be processed as final (deleted probably). + * Setting to '0' means, that messages will be processed as soon as possible (next scheduled job). + * Setting to '-1' means, that messages in given state will NOT be processed. + * Property is set for each state separately, in pattern: + * ASYNCH_FINAL_MESSAGES_PREFIX + + ASYNCH_FINAL_MESSAGES_SAVE_TIME_IN_SEC_SUFFIX + * Example: + * ohf.asynch.finalMessages.ok.saveTimeInSec + */ + public static final String ASYNCH_FINAL_MESSAGES_PREFIX = PREFIX + "asynch.finalMessages."; + + /** + * Suffix to be used in conjuction with {@link CoreProps.ASYNCH_FINAL_MESSAGES_PREFIX}. + */ + public static final String ASYNCH_FINAL_MESSAGES_SAVE_TIME_IN_SEC_SUFFIX = ".saveTimeInSec"; + /** * Administrator email(s); if more emails, then separated them with semicolon, if empty then email won't be sent. */ diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/asynch/LogContextHelper.java b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/LogContextHelper.java index 36d0ec97..51bc126c 100644 --- a/core/src/main/java/org/openhubframework/openhub/core/common/asynch/LogContextHelper.java +++ b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/LogContextHelper.java @@ -72,4 +72,21 @@ public static void setLogContextParams(Message message, @Nullable String request LogContext.setContextValue(LogContextFilter.CTX_REQUEST_ID, new GUID().toString()); } } + + /** + * Remove context params, that are set in the {@link LogContextHelper#setLogContextParams(Message, String)}. + */ + public static void removeLogContextParams() { + // source system + LogContext.removeContextValue(LogContextFilter.CTX_SOURCE_SYSTEM); + + // correlation ID + LogContext.removeContextValue(LogContextFilter.CTX_CORRELATION_ID); + + // process ID + LogContext.removeContextValue(LogContextFilter.CTX_PROCESS_ID); + + // request ID + LogContext.removeContextValue(LogContextFilter.CTX_REQUEST_ID); + } } diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/AbstractFinalMessageProcessor.java b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/AbstractFinalMessageProcessor.java new file mode 100644 index 00000000..69c9250d --- /dev/null +++ b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/AbstractFinalMessageProcessor.java @@ -0,0 +1,91 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openhubframework.openhub.core.common.asynch.finalmessage; + +import javax.annotation.PostConstruct; + +import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessageProcessor; +import org.openhubframework.openhub.api.entity.Message; +import org.openhubframework.openhub.core.common.dao.MessageDao; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionTemplate; + +/** + * Abstract class for custom implementations of {@link FinalMessageProcessor}. + * + * @author Karel Kovarik + * @since 2.1 + */ +public abstract class AbstractFinalMessageProcessor implements FinalMessageProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractFinalMessageProcessor.class); + + @Autowired + protected MessageDao messageDao; + + // transactionTemplate + @Autowired + protected TransactionTemplate transactionTemplate; + + @Autowired + protected PlatformTransactionManager transactionManager; + + @PostConstruct + void initializeTransactionManager() { + this.transactionTemplate = new TransactionTemplate(transactionManager); + this.transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); + } + + @Override + public final void processMessage(final Message message) { + LOG.trace("Will process message with id [{}].", message.getId()); + + // execute in REQUIRED transaction + transactionTemplate.execute((TransactionStatus status) -> { + // fetch the message to have it in current hibernate session + final Message loaded = messageDao.findMessage(message.getId()); + + LOG.trace("Message fetched successfully"); + // invoke the processing method. + doProcessMessage(loaded); + return null; // callback without result + }); + + LOG.trace("Message processing finished."); + } + + /** + * Get instance of messageDao repository. + * + * @return the dao bean. + */ + protected MessageDao getMessageDao() { + return messageDao; + } + + /** + * Do process message. + * + * @param message the message entity. + */ + protected abstract void doProcessMessage(Message message); +} diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/DeleteFinalMessageProcessor.java b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/DeleteFinalMessageProcessor.java new file mode 100644 index 00000000..5dada5a0 --- /dev/null +++ b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/DeleteFinalMessageProcessor.java @@ -0,0 +1,112 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openhubframework.openhub.core.common.asynch.finalmessage; + +import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessageProcessor; +import org.openhubframework.openhub.api.entity.Message; +import org.openhubframework.openhub.api.entity.Request; +import org.openhubframework.openhub.core.common.dao.ExternalCallDao; +import org.openhubframework.openhub.core.common.dao.RequestResponseDao; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; + +/** + * FinalMessageProcessor implementation that does delete given message from datastore. + * + * It does delete message and all related entities as well. + * + * @author Karel Kovarik + * @since 2.1 + * @see FinalMessageProcessor + */ +@Qualifier(DeleteFinalMessageProcessor.QUALIFIER) +public class DeleteFinalMessageProcessor extends AbstractFinalMessageProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(DeleteFinalMessageProcessor.class); + + /** + * Qualifier used for the bean. + */ + public static final String QUALIFIER = "deleteFinalMessageProcessor"; + + /** + * The order used in FinalMessageProcessor hierarchy, see {@link FinalMessageProcessor#getOrder()}. + */ + public static final int ORDER = -1_000; + + @Autowired + protected RequestResponseDao requestResponseDao; + + @Autowired + protected ExternalCallDao externalCallDao; + + @Override + protected void doProcessMessage(final Message message) { + // delete external calls + deleteExternalCalls(message); + + // delete request responses + deleteRequestsResponses(message); + + // delete message entity itself + deleteMessageEntity(message); + + LOG.debug("Message [{}] deleted.", message.getId()); + } + + @Override + public int getOrder() { + return ORDER; + } + + /** + * Delete all external calls for given Message. + * + * @param message the message to be deleted. + */ + protected void deleteExternalCalls(final Message message) { + message.getExternalCalls().stream() + .peek(externalCall -> LOG.debug("Will delete external call [{}].", externalCall.getId())) + .forEach(externalCall -> externalCallDao.delete(externalCall)); + } + + /** + * Delete all request & responses for given Message. + * + * @param message the message to be deleted. + */ + protected void deleteRequestsResponses(final Message message) { + for (Request request : message.getRequests()) { + LOG.debug("Will delete request [{}] and response [{}].", + request.getId(), + request.getResponse() != null ? request.getResponse().getId() : null); + requestResponseDao.deleteResponse(request.getResponse()); + requestResponseDao.deleteRequest(request); + } + } + + /** + * Delete message entity itself. + * + * @param message the message to be deleted. + */ + protected void deleteMessageEntity(final Message message) { + getMessageDao().delete(message); + } +} diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessageProcessingConfiguration.java b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessageProcessingConfiguration.java new file mode 100644 index 00000000..310c4e33 --- /dev/null +++ b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessageProcessingConfiguration.java @@ -0,0 +1,56 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openhubframework.openhub.core.common.asynch.finalmessage; + +import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessageProcessor; +import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessagesProcessingService; +import org.openhubframework.openhub.api.configuration.CoreProps; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * Configuration for final messages processing. + * Does declare all the necessary beans. + * + * For more info about final messages processing, see javadoc of {@link FinalMessagesProcessingService} & + * {@link FinalMessageProcessor}. + * + * @author Karel Kovarik + * @since 2.1 + */ +@Configuration +@ConditionalOnProperty(value = CoreProps.ASYNCH_FINAL_MESSAGES_PROCESSING_ENABLED) +public class FinalMessageProcessingConfiguration { + + @Bean + public FinalMessagesProcessingJob finalMessagesProcessingJob() { + return new FinalMessagesProcessingJob(); + } + + @Bean + public FinalMessagesProcessingService finalMessagesProcessingService(PlatformTransactionManager transactionManager) { + return new FinalMessagesProcessingServiceImpl(transactionManager); + } + + @Bean + @ConditionalOnProperty(value = CoreProps.ASYNCH_FINAL_MESSAGES_DELETE_PROCESSOR_ENABLED) + public FinalMessageProcessor deleteFinalMessageProcessor() { + return new DeleteFinalMessageProcessor(); + } +} diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessagesProcessingJob.java b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessagesProcessingJob.java new file mode 100644 index 00000000..818040e4 --- /dev/null +++ b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessagesProcessingJob.java @@ -0,0 +1,64 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openhubframework.openhub.core.common.asynch.finalmessage; + +import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessagesProcessingService; +import org.openhubframework.openhub.api.common.quartz.JobExecuteTypeInCluster; +import org.openhubframework.openhub.api.common.quartz.QuartzSimpleTrigger; +import org.openhubframework.openhub.api.common.quartz.SimpleTriggerPropertyUnit; +import org.openhubframework.openhub.api.configuration.CoreProps; +import org.openhubframework.openhub.core.common.quartz.OpenHubQuartzJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.StopWatch; + +/** + * Definition of job to invoke final messages handling. + * + * @author Karel Kovarik + * @since 2.1 + * @see FinalMessagesProcessingService + */ +public class FinalMessagesProcessingJob { + private static final Logger LOG = LoggerFactory.getLogger(FinalMessagesProcessingJob.class); + + /** + * Unique job name. + */ + static final String JOB_NAME = "core_FinalMessageProcessing"; + + @Autowired + private FinalMessagesProcessingService finalMessagesProcessingService; + + @OpenHubQuartzJob( + name = JOB_NAME, + executeTypeInCluster = JobExecuteTypeInCluster.NOT_CONCURRENT, + simpleTriggers = @QuartzSimpleTrigger( + repeatIntervalProperty = CoreProps.ASYNCH_FINAL_MESSAGES_PROCESSING_INTERVAL_SEC, + intervalPropertyUnit = SimpleTriggerPropertyUnit.SECONDS) + ) + public final void processFinalMessages() { + LOG.trace("Start of job to process messages in final states."); + final StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + finalMessagesProcessingService.processMessages(); + stopWatch.stop(); + LOG.trace("Job finished successfully in '{}' millis.", stopWatch.getTotalTimeMillis()); + } + +} diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessagesProcessingServiceImpl.java b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessagesProcessingServiceImpl.java new file mode 100644 index 00000000..8843cd6f --- /dev/null +++ b/core/src/main/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessagesProcessingServiceImpl.java @@ -0,0 +1,206 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openhubframework.openhub.core.common.asynch.finalmessage; + +import java.text.MessageFormat; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessageProcessor; +import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessagesProcessingService; +import org.openhubframework.openhub.api.configuration.ConfigurableValue; +import org.openhubframework.openhub.api.configuration.ConfigurationItem; +import org.openhubframework.openhub.api.configuration.CoreProps; +import org.openhubframework.openhub.api.entity.Message; +import org.openhubframework.openhub.api.entity.MessageFilter; +import org.openhubframework.openhub.api.entity.MsgStateEnum; +import org.openhubframework.openhub.core.common.asynch.LogContextHelper; +import org.openhubframework.openhub.core.configuration.ConfigurationService; +import org.openhubframework.openhub.spi.msg.MessageService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionTemplate; +import org.springframework.util.Assert; + +/** + * Default implementation of {@link FinalMessagesProcessingService}. + * + * Does find messages in final states & configured last update time. + * After startup, fast check if all required params are filled is performed. + * + * Note: there is technical limit on how many messages for each state are processed at most. + * ({@link CoreProps#ASYNCH_FINAL_MESSAGES_ITERATION_MESSAGE_LIMIT}). + * + * @author Karel Kovarik + * @since 2.1 + */ +public class FinalMessagesProcessingServiceImpl implements FinalMessagesProcessingService { + private static final Logger LOG = LoggerFactory.getLogger(FinalMessagesProcessingServiceImpl.class); + + @Autowired + protected MessageService messageService; + + @Autowired + protected ConfigurationService configurationService; + + @ConfigurableValue(key = CoreProps.ASYNCH_FINAL_MESSAGES_ITERATION_MESSAGE_LIMIT) + protected ConfigurationItem messagesTechnicalLimit; + + // processors + @Autowired(required = false) + protected List finalMessageProcessorList = new ArrayList<>(); + + // final message state map + protected Map finalMessageStatesConfig = new HashMap<>(); + + // transactionTemplate + private final TransactionTemplate transactionTemplate; + + @Autowired + public FinalMessagesProcessingServiceImpl(PlatformTransactionManager transactionManager) { + Assert.notNull(transactionManager, "the transactionManager must not be null"); + + this.transactionTemplate = new TransactionTemplate(transactionManager); + this.transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + } + + // verify configuration + @EventListener + public void onApplicationEvent(ApplicationReadyEvent event) { + + final Set finalMsgStates = Arrays.stream(MsgStateEnum.values()) + .filter(MsgStateEnum::isFinal) + .collect(Collectors.toSet()); + + // fill map finalMessageStatesConfig, key is state, value is configurationKey + finalMsgStates + .forEach(msgStateEnum -> finalMessageStatesConfig.put(msgStateEnum, resolveConfigurationKey(msgStateEnum))); + + LOG.debug("Checking configuration for final messages processing."); + for (MsgStateEnum msgStateEnum : finalMsgStates) { + final String key = finalMessageStatesConfig.get(msgStateEnum); + Assert.notNull(configurationService.getValue(Long.class, key), + MessageFormat.format("Save time for messages in state {0} was not configured. " + + "Please set corresponding configuration property {1} accordingly.", msgStateEnum, key) + ); + } + } + + @Override + public void processMessages() { + Assert.notNull(finalMessageStatesConfig, "finalMessageStatesConfig must not be null"); + + for (Map.Entry msgStateEntry : finalMessageStatesConfig.entrySet()) { + // find messages + final List messageList = findMessagesForProcessing(msgStateEntry.getKey()); + LOG.trace("Will process '{}' messages in state '{}'.", messageList.size(), msgStateEntry.getKey()); + + for (Message message : messageList) { + // set log context + LogContextHelper.setLogContextParams(message, null); + try { + // execute in new transaction + transactionTemplate.execute((TransactionStatus status) -> { + // invoke all the processors + for (FinalMessageProcessor finalMessageProcessor : finalMessageProcessorList) { + finalMessageProcessor.processMessage(message); + } + return null; // callback without result + }); + + } catch (Exception ex) { + // continue with another message if handling of one fails. + LOG.error("Failed to process message : {}, will continue with next.", message, ex); + } + // clear MDC context + LogContextHelper.removeLogContextParams(); + } + } + } + + /** + * Find messages in given state that are eligible for processing. + * + * @param msgState the state. + * @return list of messages, empty if none messages are eligible. + */ + protected List findMessagesForProcessing(MsgStateEnum msgState) { + long saveTimeInSeconds = getSaveTimeInSeconds(msgState); + + if (saveTimeInSeconds < 0) { + LOG.trace("Messages in state [{}] will be skipped, as configured to be kept indefinitely.", msgState); + return Collections.emptyList(); + } + + final MessageFilter messageFilter = new MessageFilter(); + messageFilter.setState(msgState); + messageFilter.setLastChangeTo(Instant.now().minusSeconds(saveTimeInSeconds)); + + long limit = messagesTechnicalLimit.getValue(); + LOG.trace("Will search for messages with filter {}, and technical limit {}.", messageFilter, limit); + final List messageList = messageService.findMessagesByFilter(messageFilter, limit); + + if (limit == messageList.size()) { + LOG.info("Reached limit for one iteration of job {}, probably there are other messages eligible" + + "to be processed. Will be processed in the next iteration.", limit); + } + + return messageList; + } + + /** + * Get save time (meaning for how many seconds are messages kept in datastore, and ignored for the scope + * of final message processing). + * + * @param msgStateEnum the msg state. + * @return number of seconds to keep messages, before processing them as final. + */ + protected long getSaveTimeInSeconds(MsgStateEnum msgStateEnum) { + final String key = finalMessageStatesConfig.get(msgStateEnum); + Assert.hasText(key, "the key was not set"); + final Long value = configurationService.getValue(Long.class, key); + Assert.notNull(value, MessageFormat.format("The {0} is expected to be set.", key)); + return value; + } + + /** + * Get configuration key for final message save time. + * + * @param msgStateEnum the message state. + * @return key to configuration. + */ + protected String resolveConfigurationKey(MsgStateEnum msgStateEnum) { + final StringBuilder sb = new StringBuilder(); + sb.append(CoreProps.ASYNCH_FINAL_MESSAGES_PREFIX); + sb.append(msgStateEnum.name().toLowerCase()); + sb.append(CoreProps.ASYNCH_FINAL_MESSAGES_SAVE_TIME_IN_SEC_SUFFIX); + return sb.toString(); + } +} diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/dao/ExternalCallDao.java b/core/src/main/java/org/openhubframework/openhub/core/common/dao/ExternalCallDao.java index 3e6fde68..38a26947 100644 --- a/core/src/main/java/org/openhubframework/openhub/core/common/dao/ExternalCallDao.java +++ b/core/src/main/java/org/openhubframework/openhub/core/common/dao/ExternalCallDao.java @@ -46,6 +46,13 @@ public interface ExternalCallDao { */ void update(ExternalCall externalCall); + /** + * Delete external call. + * + * @param externalCall the external call to be deleted. + */ + void delete(ExternalCall externalCall); + /** * Finds an existing external call for the specified operation and entityId. * diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/dao/ExternalCallDaoJpaImpl.java b/core/src/main/java/org/openhubframework/openhub/core/common/dao/ExternalCallDaoJpaImpl.java index d6e5c90d..f08539a8 100644 --- a/core/src/main/java/org/openhubframework/openhub/core/common/dao/ExternalCallDaoJpaImpl.java +++ b/core/src/main/java/org/openhubframework/openhub/core/common/dao/ExternalCallDaoJpaImpl.java @@ -56,6 +56,13 @@ public void update(ExternalCall externalCall) { em.merge(externalCall); } + @Override + public void delete(final ExternalCall externalCall) { + Assert.notNull(externalCall, "the externalCall must not be null."); + + em.remove(em.contains(externalCall) ? externalCall : em.merge(externalCall)); + } + @Override @Nullable @Transactional(propagation = Propagation.SUPPORTS, readOnly = true) diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDao.java b/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDao.java index 5a63b645..388723b7 100644 --- a/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDao.java +++ b/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDao.java @@ -48,6 +48,13 @@ public interface MessageDao { */ void update(Message msg); + /** + * Delete message. + * + * @param msg the message. + */ + void delete(Message msg); + /** * Finds message by its ID. * diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDaoJpaImpl.java b/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDaoJpaImpl.java index fb2ae49b..e10a85d4 100644 --- a/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDaoJpaImpl.java +++ b/core/src/main/java/org/openhubframework/openhub/core/common/dao/MessageDaoJpaImpl.java @@ -23,6 +23,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.StringJoiner; +import java.util.stream.Stream; import javax.annotation.Nullable; import javax.persistence.EntityManager; import javax.persistence.LockModeType; @@ -68,6 +71,14 @@ public void update(Message msg) { em.merge(msg); } + @Override + @Transactional(propagation = Propagation.MANDATORY) + public void delete(final Message msg) { + Assert.notNull(msg, "the msg must not be null."); + + em.remove(em.contains(msg) ? msg : em.merge(msg)); + } + @Override @Nullable public Message findMessage(Long msgId) { @@ -395,52 +406,60 @@ public List getMessagesForGuaranteedOrderForFunnel(String funnelValue, @Override public List findMessagesByFilter(final MessageFilter filter, long limit) { Assert.notNull(filter, "the messageFilter must not be null"); + verifyMessageFilter(filter); String jSql = "SELECT m " + " FROM " + Message.class.getName() + " m " + - " WHERE m.receiveTimestamp >= :receivedFrom "; + " WHERE "; + final StringJoiner conditions = new StringJoiner(" AND "); // conditions + if (null != filter.getReceivedFrom()) { + conditions.add("m.receiveTimestamp >= :receivedFrom"); + } if (null != filter.getReceivedTo()) { - jSql += " AND m.receiveTimestamp <= :receivedTo"; + conditions.add("m.receiveTimestamp <= :receivedTo"); } if (null != filter.getLastChangeFrom()) { - jSql += " AND m.lastUpdateTimestamp >= :lastChangeFrom"; + conditions.add("m.lastUpdateTimestamp >= :lastChangeFrom"); } if (null != filter.getLastChangeTo()) { - jSql += " AND m.lastUpdateTimestamp <= :lastChangeTo"; + conditions.add("m.lastUpdateTimestamp <= :lastChangeTo"); } if (hasText(filter.getSourceSystem())) { - jSql += " AND m.sourceSystemInternal like :sourceSystem"; + conditions.add("m.sourceSystemInternal like :sourceSystem"); } if (hasText(filter.getCorrelationId())) { - jSql += " AND m.correlationId like :correlationId"; + conditions.add("m.correlationId like :correlationId"); } if (hasText(filter.getProcessId())) { - jSql += " AND m.processId like :processId"; + conditions.add("m.processId like :processId"); } if (null != filter.getState()) { - jSql += " AND m.state like :state"; + conditions.add("m.state like :state"); } if (hasText(filter.getErrorCode())) { - jSql += " AND m.failedErrorCodeInternal like :errorCode"; + conditions.add("m.failedErrorCodeInternal like :errorCode"); } if (hasText(filter.getServiceName())) { - jSql += " AND m.serviceInternal like :serviceName"; + conditions.add("m.serviceInternal like :serviceName"); } if (hasText(filter.getOperationName())) { - jSql += " AND m.operationName like :operationName"; + conditions.add("m.operationName like :operationName"); } // fulltext if (hasText(filter.getFulltext())) { - jSql += findMessagesByFilterFulltextSql("fulltext"); + conditions.add(findMessagesByFilterFulltextSql("fulltext")); } + // add conditions + jSql += conditions.toString(); jSql += " ORDER BY m.receiveTimestamp DESC"; TypedQuery q = em.createQuery(jSql, Message.class); - q.setParameter("receivedFrom", filter.getReceivedFrom()); - + if (null != filter.getReceivedFrom()) { + q.setParameter("receivedFrom", filter.getReceivedFrom()); + } if (null != filter.getReceivedTo()) { q.setParameter("receivedTo", filter.getReceivedTo()); } @@ -488,6 +507,26 @@ public List findMessagesByFilter(final MessageFilter filter, long limit * @return the sql with placeholder. */ protected String findMessagesByFilterFulltextSql(String placeholder) { - return " AND m.envelope like :" + placeholder; + return "m.envelope like :" + placeholder; + } + + private static void verifyMessageFilter(MessageFilter messageFilter) { + // verify at least one field in filled, otherwise it does not make sense + Assert.isTrue( + Stream.of( + messageFilter.getCorrelationId(), + messageFilter.getErrorCode(), + messageFilter.getFulltext(), + messageFilter.getLastChangeFrom(), + messageFilter.getLastChangeTo(), + messageFilter.getOperationName(), + messageFilter.getProcessId(), + messageFilter.getReceivedFrom(), + messageFilter.getReceivedTo(), + messageFilter.getServiceName(), + messageFilter.getSourceSystem(), + messageFilter.getState() + ).anyMatch(Objects::nonNull), + "Filter is not valid, at least one filter field must be set."); } } diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/dao/RequestResponseDao.java b/core/src/main/java/org/openhubframework/openhub/core/common/dao/RequestResponseDao.java index 6a4a5c08..761fae9f 100644 --- a/core/src/main/java/org/openhubframework/openhub/core/common/dao/RequestResponseDao.java +++ b/core/src/main/java/org/openhubframework/openhub/core/common/dao/RequestResponseDao.java @@ -69,4 +69,18 @@ public interface RequestResponseDao { * @return list of {@link Request} */ List findByCriteria(Instant from, Instant to, @Nullable String subUri, @Nullable String subRequest); + + /** + * Delete request. + * + * @param request the request to be deleted. + */ + void deleteRequest(Request request); + + /** + * Delete response. + * + * @param response the response to be deleted. + */ + void deleteResponse(Response response); } diff --git a/core/src/main/java/org/openhubframework/openhub/core/common/dao/RequestResponseDaoJpaImpl.java b/core/src/main/java/org/openhubframework/openhub/core/common/dao/RequestResponseDaoJpaImpl.java index cc2fc3ce..79ddd708 100644 --- a/core/src/main/java/org/openhubframework/openhub/core/common/dao/RequestResponseDaoJpaImpl.java +++ b/core/src/main/java/org/openhubframework/openhub/core/common/dao/RequestResponseDaoJpaImpl.java @@ -59,6 +59,20 @@ public void insertResponse(Response response) { em.persist(response); } + @Override + public void deleteRequest(final Request request) { + Assert.notNull(request, "the request must not be null."); + + em.remove(em.contains(request) ? request : em.merge(request)); + } + + @Override + public void deleteResponse(final Response response) { + Assert.notNull(response, "the response must not be null."); + + em.remove(em.contains(response) ? response : em.merge(response)); + } + @Nullable @Override public Request findLastRequest(String uri, String responseJoinId) { diff --git a/core/src/main/resources/db/migration/postgresql/V1_0_3__final_messages.sql b/core/src/main/resources/db/migration/postgresql/V1_0_3__final_messages.sql new file mode 100644 index 00000000..2eb34061 --- /dev/null +++ b/core/src/main/resources/db/migration/postgresql/V1_0_3__final_messages.sql @@ -0,0 +1,24 @@ +-- +-- core.async +-- + +-- final messages processing job interval, in seconds. +INSERT INTO configuration_item (code, category_code, current_value, default_value, data_type, mandatory, validation) + VALUES('ohf.asynch.finalMessages.processingIntervalSec', 'core.async', 3600, 3600, 'INT', true, null); + +-- maximum number of messages processed per job. +INSERT INTO configuration_item (code, category_code, current_value, default_value, data_type, mandatory, validation) + VALUES('ohf.asynch.finalMessages.iterationMessageLimit', 'core.async', 10000, 10000, 'INT', true, null); + +-- duration in seconds to keep messages in OK state in the datastore +INSERT INTO configuration_item (code, category_code, current_value, default_value, data_type, mandatory, validation) + VALUES('ohf.asynch.finalMessages.ok.saveTimeInSec', 'core.async', 0, 0, 'INT', true, null); + +-- duration in seconds to keep messages in FAILED state in the datastore +INSERT INTO configuration_item (code, category_code, current_value, default_value, data_type, mandatory, validation) + VALUES('ohf.asynch.finalMessages.failed.saveTimeInSec', 'core.async', 2592000, 2592000, 'INT', true, null); + +-- duration in seconds to keep messages in CANCEL state in the datastore +INSERT INTO configuration_item (code, category_code, current_value, default_value, data_type, mandatory, validation) + VALUES('ohf.asynch.finalMessages.cancel.saveTimeInSec', 'core.async', 2592000, 2592000, 'INT', true, null); + diff --git a/core/src/test/java/org/openhubframework/openhub/core/common/asynch/finalmessage/DeleteFinalMessageProcessorTest.java b/core/src/test/java/org/openhubframework/openhub/core/common/asynch/finalmessage/DeleteFinalMessageProcessorTest.java new file mode 100644 index 00000000..605048a8 --- /dev/null +++ b/core/src/test/java/org/openhubframework/openhub/core/common/asynch/finalmessage/DeleteFinalMessageProcessorTest.java @@ -0,0 +1,155 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openhubframework.openhub.core.common.asynch.finalmessage; + +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.*; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.times; + +import javax.persistence.TypedQuery; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; +import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessageProcessor; +import org.openhubframework.openhub.api.entity.ExternalCall; +import org.openhubframework.openhub.api.entity.Message; +import org.openhubframework.openhub.api.entity.Request; +import org.openhubframework.openhub.api.entity.Response; +import org.openhubframework.openhub.core.AbstractCoreDbTest; +import org.openhubframework.openhub.core.reqres.RequestResponseService; +import org.openhubframework.openhub.spi.extcall.ExternalCallService; +import org.openhubframework.openhub.spi.msg.MessageService; +import org.openhubframework.openhub.test.data.ExternalSystemTestEnum; +import org.openhubframework.openhub.test.data.ServiceTestEnum; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.test.context.TestPropertySource; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionTemplate; + +/** + * Simple test suite for {@link DeleteFinalMessageProcessor}. + * + * @author Karel Kovarik + */ +@TestPropertySource(properties = { + "ohf.asynch.finalMessages.processingEnabled=true", + "ohf.asynch.finalMessages.ok.saveTimeInSec=14400", + "ohf.asynch.finalMessages.failed.saveTimeInSec=28800", + "ohf.asynch.finalMessages.cancel.saveTimeInSec=7200", + "ohf.asynch.finalMessages.iterationMessageLimit=1000", + "ohf.asynch.finalMessages.deleteProcessor.enabled=true" +}) +public class DeleteFinalMessageProcessorTest extends AbstractCoreDbTest { + + @Autowired + private MessageService messageService; + + @Autowired + private ExternalCallService externalCallService; + + @Autowired + private RequestResponseService requestResponseService; + + @Autowired + private PlatformTransactionManager platformTransactionManager; + + private TransactionTemplate transactionTemplate; + + // tested + @Autowired + @Qualifier(DeleteFinalMessageProcessor.QUALIFIER) + private FinalMessageProcessor deleteFinalMessageProcessor; + + @Before + public void setup() { + transactionTemplate = new TransactionTemplate(transactionManager); + transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + } + + @Test + public void test_delete_simpleOk() { + Message message = createAndSaveMessage( + ExternalSystemTestEnum.CRM, + ServiceTestEnum.ACCOUNT, + "testOperation", + "payload"); + + Long id = message.getId(); + assertThat(messageService.findMessageById(id), notNullValue()); + assertThat(countInTable(Message.class), is(1L)); + + // invoke tested + transactionTemplate.execute((TransactionStatus status) -> { + deleteFinalMessageProcessor.processMessage(message); + return null; // without result + }); + + assertThat(messageService.findMessageById(id), nullValue()); + assertThat(countInTable(Message.class), is(0L)); + } + + @Test + public void test_delete_withAllEntities() { + final Message message = createAndSaveMessage( + ExternalSystemTestEnum.CRM, + ServiceTestEnum.ACCOUNT, + "testOperation", + "payload"); + final Long msgId = message.getId(); + + Request request = Request.createRequest("http://test.url", "join id", "request payload", message); + requestResponseService.insertRequest(request); + requestResponseService.insertResponse(Response.createResponse(request, "response payload", null, message)); + + externalCallService.prepare("operationUri", "operationKey", message); + + // verify data is present + Message loadedMessage = messageService.findMessageById(msgId); + assertThat(loadedMessage, notNullValue()); + assertThat(countInTable(Message.class), is(1L)); + assertThat(countInTable(ExternalCall.class), is(1L)); + assertThat(countInTable(Request.class), is(1L)); + assertThat(countInTable(Response.class), is(1L)); + + // invoke tested + transactionTemplate.execute((TransactionStatus status) -> { + deleteFinalMessageProcessor.processMessage(message); + return null; // without result + }); + + assertThat(messageService.findMessageById(msgId), nullValue()); + assertThat(countInTable(Message.class), is(0L)); + assertThat(countInTable(ExternalCall.class), is(0L)); + assertThat(countInTable(Request.class), is(0L)); + assertThat(countInTable(Response.class), is(0L)); + } + + + private long countInTable(Class clazz) { + TypedQuery query = em.createQuery( + "SELECT COUNT(t) FROM " + clazz.getName() + " t ", Long.class); + return query.getSingleResult(); + } +} diff --git a/core/src/test/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessagesProcessingServiceImplTest.java b/core/src/test/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessagesProcessingServiceImplTest.java new file mode 100644 index 00000000..4706d52a --- /dev/null +++ b/core/src/test/java/org/openhubframework/openhub/core/common/asynch/finalmessage/FinalMessagesProcessingServiceImplTest.java @@ -0,0 +1,256 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openhubframework.openhub.core.common.asynch.finalmessage; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Instant; + +import org.junit.Test; +import org.mockito.InOrder; +import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessageProcessor; +import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessagesProcessingService; +import org.openhubframework.openhub.api.entity.Message; +import org.openhubframework.openhub.api.entity.MsgStateEnum; +import org.openhubframework.openhub.core.AbstractCoreDbTest; +import org.openhubframework.openhub.core.configuration.FixedConfigurationItem; +import org.openhubframework.openhub.spi.msg.MessageService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.util.ReflectionTestUtils; + +/** + * Simple test suite for {@link FinalMessagesProcessingServiceImpl}. + * + * @author Karel Kovarik + */ +@TestPropertySource(properties = { + "ohf.asynch.finalMessages.ok.saveTimeInSec=14400", + "ohf.asynch.finalMessages.failed.saveTimeInSec=28800", + "ohf.asynch.finalMessages.cancel.saveTimeInSec=-1", + "ohf.asynch.finalMessages.processingEnabled=true", + "ohf.asynch.finalMessages.iterationMessageLimit=1000" +}) +public class FinalMessagesProcessingServiceImplTest extends AbstractCoreDbTest { + + @MockBean + @Qualifier("defaultFinalMessageProcessor") + private FinalMessageProcessor finalMessageProcessor; + + @MockBean + @Qualifier("additionalFinalMessageProcessor") + private FinalMessageProcessor additionalMessageProcessor; + + @Autowired + private MessageService messageService; + + // tested + @Autowired + private FinalMessagesProcessingService messagesProcessingService; + + @Value("${ohf.asynch.finalMessages.failed.saveTimeInSec}") + private int failedSaveTimeInSec; + + @Value("${ohf.asynch.finalMessages.ok.saveTimeInSec}") + private int okSaveTimeInSec; + + @Value("${ohf.asynch.finalMessages.cancel.saveTimeInSec}") + private int cancelSaveTimeInSec; + + @Test + public void test_timing() { + createAndSaveMessages(3, (message, order) -> { + switch (order) { + case 1: + message.setState(MsgStateEnum.OK); + message.setLastUpdateTimestamp(Instant.now().minusSeconds(okSaveTimeInSec + 1)); + break; + case 2: + message.setState(MsgStateEnum.OK); + message.setLastUpdateTimestamp(Instant.now().minusSeconds(okSaveTimeInSec)); + break; + case 3: + message.setState(MsgStateEnum.OK); + // updated too recently, should not be processed yet + message.setLastUpdateTimestamp(Instant.now()); + break; + } + }); + + messagesProcessingService.processMessages(); + + verify(finalMessageProcessor, times(2)).processMessage(any(Message.class)); + verify(additionalMessageProcessor, times(2)).processMessage(any(Message.class)); + } + + @Test + public void test_split() { + + createAndSaveMessages(2, (message, order) -> { + switch (order) { + case 1: + message.setState(MsgStateEnum.OK); + message.setLastUpdateTimestamp(Instant.now().minusSeconds(okSaveTimeInSec + 1)); + break; + case 2: + message.setState(MsgStateEnum.FAILED); + message.setLastUpdateTimestamp(Instant.now().minusSeconds(failedSaveTimeInSec + 1)); + break; + } + }); + + messagesProcessingService.processMessages(); + + verify(finalMessageProcessor, times(2)).processMessage(any(Message.class)); + verify(additionalMessageProcessor, times(2)).processMessage(any(Message.class)); + } + + @Test + public void test_state_ignoring() { + // none of the messages should be processed + createAndSaveMessages(5, (message, order) -> { + switch (order) { + case 1: + message.setState(MsgStateEnum.NEW); + break; + case 2: + message.setState(MsgStateEnum.IN_QUEUE); + break; + case 3: + message.setState(MsgStateEnum.PROCESSING); + break; + case 4: + message.setState(MsgStateEnum.WAITING); + break; + case 5: + message.setState(MsgStateEnum.PARTLY_FAILED); + break; + } + }); + + messagesProcessingService.processMessages(); + } + + @Test + public void test_technicalCountLimit() { + + // set technicalLimit + ReflectionTestUtils.setField(messagesProcessingService, "messagesTechnicalLimit", new FixedConfigurationItem<>(2L)); + + createAndSaveMessages(3, (message, order) -> { + switch (order) { + case 1: + message.setState(MsgStateEnum.OK); + message.setLastUpdateTimestamp(Instant.now().minusSeconds(okSaveTimeInSec + 1)); + break; + case 2: + message.setState(MsgStateEnum.OK); + message.setLastUpdateTimestamp(Instant.now().minusSeconds(okSaveTimeInSec + 1)); + break; + case 3: + message.setState(MsgStateEnum.OK); + message.setLastUpdateTimestamp(Instant.now().minusSeconds(okSaveTimeInSec + 1)); + break; + } + }); + + messagesProcessingService.processMessages(); + + // verify only two of the three eligible messages were processed, as technical limit is reached + verify(finalMessageProcessor, times(2)).processMessage(any(Message.class)); + } + + @Test + public void test_errorsAreSkipped() { + createAndSaveMessages(3, (message, order) -> { + switch (order) { + case 1: + message.setState(MsgStateEnum.OK); + message.setLastUpdateTimestamp(Instant.now().minusSeconds(okSaveTimeInSec + 1)); + break; + case 2: + message.setState(MsgStateEnum.OK); + message.setLastUpdateTimestamp(Instant.now().minusSeconds(okSaveTimeInSec + 1)); + break; + case 3: + message.setState(MsgStateEnum.OK); + message.setLastUpdateTimestamp(Instant.now().minusSeconds(okSaveTimeInSec + 1)); + break; + } + }); + + // processor will throw an exception + doThrow(new RuntimeException("General exception")).when(finalMessageProcessor).processMessage(any(Message.class)); + + messagesProcessingService.processMessages(); + + // verify all messages were processed regardless + verify(finalMessageProcessor, times(3)).processMessage(any(Message.class)); + } + + @Test + public void test_processorOrder() { + when(finalMessageProcessor.getOrder()).thenReturn(0); + when(additionalMessageProcessor.getOrder()).thenReturn(1); // should be second + + createAndSaveMessages(1, (message, order) -> { + switch (order) { + case 1: + message.setState(MsgStateEnum.OK); + message.setLastUpdateTimestamp(Instant.now().minusSeconds(okSaveTimeInSec + 1)); + break; + } + }); + + // call tested service + messagesProcessingService.processMessages(); + + InOrder inOrder = inOrder(finalMessageProcessor, additionalMessageProcessor); + + inOrder.verify(finalMessageProcessor, times(1)).processMessage(any(Message.class)); + inOrder.verify(additionalMessageProcessor, times(1)).processMessage(any(Message.class)); + } + + @Test + public void test_keepIndefinitely() { + createAndSaveMessages(2, (message, order) -> { + switch (order) { + case 1: + message.setState(MsgStateEnum.CANCEL); + message.setLastUpdateTimestamp(Instant.now().minusSeconds(1_000_000)); + break; + case 2: + message.setState(MsgStateEnum.CANCEL); + message.setLastUpdateTimestamp(Instant.now().minusSeconds(42)); + break; + } + }); + + messagesProcessingService.processMessages(); + + verify(finalMessageProcessor, times(0)).processMessage(any(Message.class)); + verify(additionalMessageProcessor, times(0)).processMessage(any(Message.class)); + } +} diff --git a/core/src/test/resources/config/application-test.properties b/core/src/test/resources/config/application-test.properties index 938615b2..4affc972 100644 --- a/core/src/test/resources/config/application-test.properties +++ b/core/src/test/resources/config/application-test.properties @@ -11,6 +11,8 @@ ohf.asynch.confirmation.repeatTimeSec = 1 # Interval (in seconds) between two tries of failed confirmations ohf.asynch.confirmation.intervalSec = 0 +# Final messages processing disabled by default +ohf.asynch.finalMessages.processingEnabled = false # ----------------------------------------------------------------------------- # test diff --git a/web/src/main/resources/application.properties b/web/src/main/resources/application.properties index b0292971..47055290 100644 --- a/web/src/main/resources/application.properties +++ b/web/src/main/resources/application.properties @@ -24,6 +24,11 @@ info.app.version= log.folder.path=@log.folder@ log.file.pattern=(^.*\\.log$|^.*\\.log\\.2\\d{3}-(0[1-9]|1[0-9])-[0|1|2|3]\\d?_\\d*\\.gz$) +# enable or disable final messages processing. +ohf.asynch.finalMessages.processingEnabled = false + +# enable/disable final messages processor implementation deleting messages. +ohf.asynch.finalMessages.deleteProcessor.enabled = true # =============================== # = LOGGING