Skip to content

Commit

Permalink
Merge pull request #94 from OpenWiseSolutions/feature/OHFJIRA-6-final…
Browse files Browse the repository at this point in the history
…-messages

[OHFJIRA-6] : messages in final state
  • Loading branch information
kkovarik committed Nov 21, 2018
2 parents caf8368 + 2aa287b commit a3144ee
Show file tree
Hide file tree
Showing 21 changed files with 1,224 additions and 15 deletions.
@@ -0,0 +1,58 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openhubframework.openhub.api.asynch.finalmessage;

import org.openhubframework.openhub.api.entity.Message;
import org.springframework.core.Ordered;

/**
* Contract for final messages processor.
* Messages that are no-longer considered relevant for openhub async workflow are expected
* to be processed by this processor.
*
* There can be multiple processors, that are invoked in order defined in getOrder method.
*
* All processor are expected to be executed in transaction, if any should throw
* an exception, transaction will be rolled back.
*
* @author Karel Kovarik
* @since 2.1
*/
@FunctionalInterface
public interface FinalMessageProcessor extends Ordered {

/**
* Default order, if {@link FinalMessageProcessor#getOrder()} method is not implemented.
*/
int DEFAULT_ORDER = 0;

/**
* Process message in OpenHub datastore.
*
* @param message the message to be processed.
*/
void processMessage(Message message);

/**
* Get the order for processor.
*
* @return the order of processor, lower value is expected to be invoked first.
*/
default int getOrder() {
return DEFAULT_ORDER;
}
}
@@ -0,0 +1,33 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openhubframework.openhub.api.asynch.finalmessage;

/**
* Service to find final messages that are relevant for processing and
* invoke all {@link FinalMessageProcessor} beans on them.
*
* @author Karel Kovarik
* @since 2.1
*/
@FunctionalInterface
public interface FinalMessagesProcessingService {

/**
* Do trigger messages processing.
*/
void processMessages();
}
@@ -0,0 +1,4 @@
/**
* Final messages in async workflow handling.
*/
package org.openhubframework.openhub.api.asynch.finalmessage;
Expand Up @@ -82,6 +82,44 @@ public class CoreProps {
*/
public static final String ASYNCH_POSTPONED_INTERVAL_WHEN_FAILED_SEC = PREFIX + "asynch.postponedIntervalWhenFailedSec";

/**
* Final messages processing enabled or disabled. Note: cannot be defined in database, needs to be in properties.
*/
public static final String ASYNCH_FINAL_MESSAGES_PROCESSING_ENABLED = PREFIX + "asynch.finalMessages.processingEnabled";

/**
* Final messages processing job interval, in seconds.
*/
public static final String ASYNCH_FINAL_MESSAGES_PROCESSING_INTERVAL_SEC = PREFIX + "asynch.finalMessages.processingIntervalSec";

/**
* Maximum number of messages processed per job.
*/
public static final String ASYNCH_FINAL_MESSAGES_ITERATION_MESSAGE_LIMIT = PREFIX + "asynch.finalMessages.iterationMessageLimit";

/**
* Configuration of final message processor that does delete messages from the datastore.
* Note: final message processing needs to be enabled in order to do that.
*/
public static final String ASYNCH_FINAL_MESSAGES_DELETE_PROCESSOR_ENABLED = PREFIX + "asynch.finalMessages.deleteProcessor.enabled";

/**
* Prefix for setting duration to keep messages in final states in the datastore.
* After this period expires and there is no other action with the message, it will be processed as final (deleted probably).
* Setting to '0' means, that messages will be processed as soon as possible (next scheduled job).
* Setting to '-1' means, that messages in given state will NOT be processed.
* Property is set for each state separately, in pattern:
* ASYNCH_FINAL_MESSAGES_PREFIX + <lower-case state name> + ASYNCH_FINAL_MESSAGES_SAVE_TIME_IN_SEC_SUFFIX
* Example:
* ohf.asynch.finalMessages.ok.saveTimeInSec
*/
public static final String ASYNCH_FINAL_MESSAGES_PREFIX = PREFIX + "asynch.finalMessages.";

/**
* Suffix to be used in conjuction with {@link CoreProps.ASYNCH_FINAL_MESSAGES_PREFIX}.
*/
public static final String ASYNCH_FINAL_MESSAGES_SAVE_TIME_IN_SEC_SUFFIX = ".saveTimeInSec";

/**
* Administrator email(s); if more emails, then separated them with semicolon, if empty then email won't be sent.
*/
Expand Down
Expand Up @@ -72,4 +72,21 @@ public static void setLogContextParams(Message message, @Nullable String request
LogContext.setContextValue(LogContextFilter.CTX_REQUEST_ID, new GUID().toString());
}
}

/**
* Remove context params, that are set in the {@link LogContextHelper#setLogContextParams(Message, String)}.
*/
public static void removeLogContextParams() {
// source system
LogContext.removeContextValue(LogContextFilter.CTX_SOURCE_SYSTEM);

// correlation ID
LogContext.removeContextValue(LogContextFilter.CTX_CORRELATION_ID);

// process ID
LogContext.removeContextValue(LogContextFilter.CTX_PROCESS_ID);

// request ID
LogContext.removeContextValue(LogContextFilter.CTX_REQUEST_ID);
}
}
@@ -0,0 +1,91 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openhubframework.openhub.core.common.asynch.finalmessage;

import javax.annotation.PostConstruct;

import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessageProcessor;
import org.openhubframework.openhub.api.entity.Message;
import org.openhubframework.openhub.core.common.dao.MessageDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionTemplate;

/**
* Abstract class for custom implementations of {@link FinalMessageProcessor}.
*
* @author Karel Kovarik
* @since 2.1
*/
public abstract class AbstractFinalMessageProcessor implements FinalMessageProcessor {

private static final Logger LOG = LoggerFactory.getLogger(AbstractFinalMessageProcessor.class);

@Autowired
protected MessageDao messageDao;

// transactionTemplate
@Autowired
protected TransactionTemplate transactionTemplate;

@Autowired
protected PlatformTransactionManager transactionManager;

@PostConstruct
void initializeTransactionManager() {
this.transactionTemplate = new TransactionTemplate(transactionManager);
this.transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
}

@Override
public final void processMessage(final Message message) {
LOG.trace("Will process message with id [{}].", message.getId());

// execute in REQUIRED transaction
transactionTemplate.execute((TransactionStatus status) -> {
// fetch the message to have it in current hibernate session
final Message loaded = messageDao.findMessage(message.getId());

LOG.trace("Message fetched successfully");
// invoke the processing method.
doProcessMessage(loaded);
return null; // callback without result
});

LOG.trace("Message processing finished.");
}

/**
* Get instance of messageDao repository.
*
* @return the dao bean.
*/
protected MessageDao getMessageDao() {
return messageDao;
}

/**
* Do process message.
*
* @param message the message entity.
*/
protected abstract void doProcessMessage(Message message);
}
@@ -0,0 +1,112 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openhubframework.openhub.core.common.asynch.finalmessage;

import org.openhubframework.openhub.api.asynch.finalmessage.FinalMessageProcessor;
import org.openhubframework.openhub.api.entity.Message;
import org.openhubframework.openhub.api.entity.Request;
import org.openhubframework.openhub.core.common.dao.ExternalCallDao;
import org.openhubframework.openhub.core.common.dao.RequestResponseDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

/**
* FinalMessageProcessor implementation that does delete given message from datastore.
*
* It does delete message and all related entities as well.
*
* @author Karel Kovarik
* @since 2.1
* @see FinalMessageProcessor
*/
@Qualifier(DeleteFinalMessageProcessor.QUALIFIER)
public class DeleteFinalMessageProcessor extends AbstractFinalMessageProcessor {

private static final Logger LOG = LoggerFactory.getLogger(DeleteFinalMessageProcessor.class);

/**
* Qualifier used for the bean.
*/
public static final String QUALIFIER = "deleteFinalMessageProcessor";

/**
* The order used in FinalMessageProcessor hierarchy, see {@link FinalMessageProcessor#getOrder()}.
*/
public static final int ORDER = -1_000;

@Autowired
protected RequestResponseDao requestResponseDao;

@Autowired
protected ExternalCallDao externalCallDao;

@Override
protected void doProcessMessage(final Message message) {
// delete external calls
deleteExternalCalls(message);

// delete request responses
deleteRequestsResponses(message);

// delete message entity itself
deleteMessageEntity(message);

LOG.debug("Message [{}] deleted.", message.getId());
}

@Override
public int getOrder() {
return ORDER;
}

/**
* Delete all external calls for given Message.
*
* @param message the message to be deleted.
*/
protected void deleteExternalCalls(final Message message) {
message.getExternalCalls().stream()
.peek(externalCall -> LOG.debug("Will delete external call [{}].", externalCall.getId()))
.forEach(externalCall -> externalCallDao.delete(externalCall));
}

/**
* Delete all request & responses for given Message.
*
* @param message the message to be deleted.
*/
protected void deleteRequestsResponses(final Message message) {
for (Request request : message.getRequests()) {
LOG.debug("Will delete request [{}] and response [{}].",
request.getId(),
request.getResponse() != null ? request.getResponse().getId() : null);
requestResponseDao.deleteResponse(request.getResponse());
requestResponseDao.deleteRequest(request);
}
}

/**
* Delete message entity itself.
*
* @param message the message to be deleted.
*/
protected void deleteMessageEntity(final Message message) {
getMessageDao().delete(message);
}
}

0 comments on commit a3144ee

Please sign in to comment.