Skip to content

Commit

Permalink
Merge a3f115c into f257ba4
Browse files Browse the repository at this point in the history
  • Loading branch information
kkovarik committed Nov 14, 2018
2 parents f257ba4 + a3f115c commit c233505
Show file tree
Hide file tree
Showing 20 changed files with 1,192 additions and 15 deletions.
@@ -0,0 +1,57 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openhubframework.openhub.api.asynch.finalmessage;

import org.openhubframework.openhub.api.entity.Message;

/**
* Contract for final messages processor.
* Messages that are no-longer considered relevant for openhub async workflow are expected
* to be processed by this processor.
*
* There can be multiple processors, that are invoked in order defined in getOrder method.
*
* All processor are expected to be executed in transaction, if any should throw
* an exception, transaction will be rolled back.
*
* @author Karel Kovarik
* @since 2.1
*/
@FunctionalInterface
public interface FinalMessageProcessor {

/**
* Default order, if {@link FinalMessageProcessor#getOrder()} method is not implemented.
*/
int DEFAULT_ORDER = 0;

/**
* Process message in OpenHub datastore.
*
* @param message the message to be processed.
*/
void processMessage(Message message);

/**
* Get the order for processor.
*
* @return the order of processor, lower value is expected to be invoked first.
*/
default int getOrder() {
return DEFAULT_ORDER;
}
}
@@ -0,0 +1,33 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openhubframework.openhub.api.asynch.finalmessage;

/**
* Service to find final messages that are relevant for processing and
* invoke all {@link FinalMessageProcessor} beans on them.
*
* @author Karel Kovarik
* @since 2.1
*/
@FunctionalInterface
public interface FinalMessagesProcessingService {

/**
* Do trigger messages processing.
*/
void processMessages();
}
@@ -0,0 +1,4 @@
/**
* Final messages in async workflow handling.
*/
package org.openhubframework.openhub.api.asynch.finalmessage;
Expand Up @@ -82,6 +82,43 @@ public class CoreProps {
*/
public static final String ASYNCH_POSTPONED_INTERVAL_WHEN_FAILED_SEC = PREFIX + "asynch.postponedIntervalWhenFailedSec";

/**
* Final messages processing enabled or disabled. Note: cannot be defined in database, needs to be in properties.
*/
public static final String ASYNCH_FINAL_MESSAGES_PROCESSING_ENABLED = PREFIX + "asynch.finalMessages.processingEnabled";

/**
* Final messages processing job interval, in seconds.
*/
public static final String ASYNCH_FINAL_MESSAGES_PROCESSING_INTERVAL_SEC = PREFIX + "asynch.finalMessages.processingIntervalSec";

/**
* Maximum number of messages processed per job.
*/
public static final String ASYNCH_FINAL_MESSAGES_ITERATION_MESSAGE_LIMIT = PREFIX + "asynch.finalMessages.iterationMessageLimit";

/**
* Configuration of final message processor that does delete messages from the datastore.
* Note: final message processing needs to be enabled in order to do that.
*/
public static final String ASYNCH_FINAL_MESSAGES_DELETE_PROCESSOR_ENABLED = PREFIX + "asynch.finalMessages.deleteProcessor.enabled";

/**
* Prefix for setting duration to keep messages in final states in the datastore.
* After this period expires and there is no other action with the message, it will be processed as final (deleted probably).
* Setting to '0' means, that messages will be processed as soon as possible (next scheduled job).
* Property is set for each state separately, in pattern:
* ASYNCH_FINAL_MESSAGES_PREFIX + <lower-case state name> + ASYNCH_FINAL_MESSAGES_SAVE_TIME_IN_SEC_SUFFIX
* Example:
* ohf.asynch.finalMessages.ok.saveTimeInSec
*/
public static final String ASYNCH_FINAL_MESSAGES_PREFIX = PREFIX + "asynch.finalMessages.";

/**
* Suffix to be used in conjuction with {@link CoreProps.ASYNCH_FINAL_MESSAGES_PREFIX}.
*/
public static final String ASYNCH_FINAL_MESSAGES_SAVE_TIME_IN_SEC_SUFFIX = ".saveTimeInSec";

/**
* Administrator email(s); if more emails, then separated them with semicolon, if empty then email won't be sent.
*/
Expand Down
@@ -0,0 +1,62 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openhubframework.openhub.core.common.asynch.finalmessage;

import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessageProcessor;
import org.openhubframework.openhub.api.entity.Message;
import org.openhubframework.openhub.core.common.dao.MessageDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
* Abstract class for custom implementations of {@link FinalMessageProcessor}.
*
* @author Karel Kovarik
* @since 2.1
*/
public abstract class AbstractFinalMessageProcessor implements FinalMessageProcessor {

@Autowired
protected MessageDao messageDao;

@Transactional(propagation = Propagation.REQUIRED)
@Override
public void processMessage(final Message message) {
// fetch the message to have it in current hibernate session
final Message loaded = messageDao.findMessage(message.getId());

// invoke the processing method.
doProcessMessage(loaded);
}

/**
* Get instance of messageDao repository.
*
* @return the dao bean.
*/
protected MessageDao getMessageDao() {
return messageDao;
}

/**
* Do process message.
*
* @param message the message entity.
*/
protected abstract void doProcessMessage(Message message);
}
@@ -0,0 +1,102 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openhubframework.openhub.core.common.asynch.finalmessage;

import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessageProcessor;
import org.openhubframework.openhub.api.entity.Message;
import org.openhubframework.openhub.api.entity.Request;
import org.openhubframework.openhub.core.common.dao.ExternalCallDao;
import org.openhubframework.openhub.core.common.dao.RequestResponseDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

/**
* FinalMessageProcessor implementation that does delete given message from datastore.
*
* It does delete message and all related entities as well.
*
* @author Karel Kovarik
* @since 2.1
* @see FinalMessageProcessor
*/
@Qualifier(DeleteFinalMessageProcessor.QUALIFIER)
public class DeleteFinalMessageProcessor extends AbstractFinalMessageProcessor {

/**
* Qualifier used for the bean.
*/
public static final String QUALIFIER = "deleteFinalMessageProcessor";

/**
* The order used in FinalMessageProcessor hierarchy, see {@link FinalMessageProcessor#getOrder()}.
*/
public static final int ORDER = -1_000;

@Autowired
protected RequestResponseDao requestResponseDao;

@Autowired
protected ExternalCallDao externalCallDao;

@Override
protected void doProcessMessage(final Message message) {
// delete external calls
deleteExternalCalls(message);

// delete request responses
deleteRequestsResponses(message);

// delete message entity itself
deleteMessageEntity(message);
}

@Override
public int getOrder() {
return ORDER;
}

/**
* Delete all external calls for given Message.
*
* @param message the message to be deleted.
*/
protected void deleteExternalCalls(final Message message) {
message.getExternalCalls()
.forEach(externalCall -> externalCallDao.delete(externalCall));
}

/**
* Delete all request & responses for given Message.
*
* @param message the message to be deleted.
*/
protected void deleteRequestsResponses(final Message message) {
for (Request request : message.getRequests()) {
requestResponseDao.deleteResponse(request.getResponse());
requestResponseDao.deleteRequest(request);
}
}

/**
* Delete message entity itself.
*
* @param message the message to be deleted.
*/
protected void deleteMessageEntity(final Message message) {
getMessageDao().delete(message);
}
}
@@ -0,0 +1,58 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openhubframework.openhub.core.common.asynch.finalmessage;

import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessageProcessor;
import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessagesProcessingService;
import org.openhubframework.openhub.api.configuration.CoreProps;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

/**
* Configuration for final messages processing.
* Does declare all the necessary beans.
*
* For more info about final messages processing, see javadoc of {@link FinalMessagesProcessingService} &
* {@link FinalMessageProcessor}.
*
* @author Karel Kovarik
* @since 2.1
*/
@Configuration
@ConditionalOnProperty(value = CoreProps.ASYNCH_FINAL_MESSAGES_PROCESSING_ENABLED)
public class FinalMessageProcessingConfiguration {

@Bean
public FinalMessagesProcessingJob finalMessagesProcessingJob() {
return new FinalMessagesProcessingJob();
}

@Bean
public FinalMessagesProcessingService finalMessagesProcessingService(PlatformTransactionManager transactionManager) {
return new FinalMessagesProcessingServiceImpl(transactionManager);
}

@Bean
@ConditionalOnProperty(value = CoreProps.ASYNCH_FINAL_MESSAGES_DELETE_PROCESSOR_ENABLED)
public FinalMessageProcessor deleteFinalMessageProcessor() {
return new DeleteFinalMessageProcessor();
}
}

0 comments on commit c233505

Please sign in to comment.