From 3a24ba832bc82e87006c29a0a16506e3071b7071 Mon Sep 17 00:00:00 2001
From: James Wickham
- * Partial implementation of behaviour common to retry services.
- *
- * Creates a new instance. Defaults to
- * Returns the
- * Sets the
* Service which processes asynchronous acknowledgements for messages stored
- * using {@link StoreMessageForRetryServiceTest}.
+ * using {@link JdbcStoreMessageForRetryService}.
*
* The following metadata keys are required.
* JdbcRetryStore
.
- * RetryStore
to use.
- * RetryStore
to use
- */
- public final RetryStore getRetryStore() {
- return retryStore;
- }
-
- /**
- * RetryStore
to use. May not be null.
- * RetryStore
to use
- */
- public final void setRetryStore(RetryStore r) {
- if (r == null) {
- throw new IllegalArgumentException("null param");
- }
- this.retryStore = r;
- }
-
-
- /**
- * @return the pruneAcknowledged
- */
- public boolean getPruneAcknowledged() {
- return pruneAcknowledged;
- }
-
- /**
- * Specify whether to delete messages from the underlying store if they have
- * already been acknowledged.
- *
- * @param b the pruneAcknowledged to set
- */
- public void setPruneAcknowledged(boolean b) {
- this.pruneAcknowledged = b;
- }
-
- private boolean isPruneAcknowledged() {
- return BooleanUtils.toBooleanDefaultIfNull(getPruneAcknowledged(), false);
- }
-
- @Override
- protected void prepareService() throws CoreException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void startService() throws CoreException {
- // TODO Auto-generated method stub
-
- }
-
-}
diff --git a/interlok-core/src/main/java/com/adaptris/core/jdbc/retry/AcknowledgeService.java b/interlok-core/src/main/java/com/adaptris/core/services/jdbc/retry/JdbcAcknowledgeService.java
similarity index 70%
rename from interlok-core/src/main/java/com/adaptris/core/jdbc/retry/AcknowledgeService.java
rename to interlok-core/src/main/java/com/adaptris/core/services/jdbc/retry/JdbcAcknowledgeService.java
index 88686ce00..277c8ddb4 100644
--- a/interlok-core/src/main/java/com/adaptris/core/jdbc/retry/AcknowledgeService.java
+++ b/interlok-core/src/main/java/com/adaptris/core/services/jdbc/retry/JdbcAcknowledgeService.java
@@ -1,36 +1,37 @@
-package com.adaptris.core.jdbc.retry;
+package com.adaptris.core.services.jdbc.retry;
import com.adaptris.annotation.AdapterComponent;
import com.adaptris.annotation.ComponentProfile;
import com.adaptris.annotation.DisplayOrder;
import com.adaptris.core.AdaptrisMessage;
import com.adaptris.core.ServiceException;
+import com.adaptris.core.jdbc.retry.Constants;
import com.adaptris.interlok.InterlokException;
import com.thoughtworks.xstream.annotations.XStreamAlias;
/**
*
- *
*
- * Service which obtains messages from the retry store that meet the appropriate
+ * Service which obtains messages from the Database that meet the appropriate
* criteria and retries them. This service is intended to be used in conjunction
* with PollingTrigger
.
*
- * JDBC-based implementation of RetryStore
.
+ * JDBC services for storing and retrying messages with acknowledgment support.
*
- * This uses JdbcTemplate
from Spring for database operations. If
- * there is no explicit configuration of the sqlProperties then the SQL
+ * If there is no explicit configuration of the sqlProperties then the SQL
* statements are found in the file retry-store-derby.properties
which
* is suitable for Apache Derby and MySQL. If explicitly configured, the
* property file is expected to be present on the classpath.
*
- * The default property file is listed below. + * The default property file statements are listed below. * *
create.sql = CREATE TABLE retry_store \
@@ -81,7 +79,7 @@ CONSTRAINT idx_acknowledge_id UNIQUE (acknowledge_id))
updated_on=? WHERE message_id=?
retry.sql = SELECT * FROM retry_store WHERE \
- (acknowledged='F' AND (retries_to_date < total_retries OR total_retries = -1))
+ (acknowledged='F' AND (retries_to_date < total_retries OR total_retries = -1))
delete.acknowleged.sql = DELETE FROM retry_store WHERE acknowledged='T'
@@ -89,7 +87,7 @@ CONSTRAINT idx_acknowledge_id UNIQUE (acknowledge_id))
select.expired.sql = SELECT * FROM retry_store \
WHERE (acknowledged='F' AND \
- (retries_to_date >= total_retries AND total_retries != -1))
+ (retries_to_date >= total_retries AND total_retries != -1))
*
*
@@ -97,14 +95,15 @@ CONSTRAINT idx_acknowledge_id UNIQUE (acknowledge_id))
* The create.sql script is always executed upon initialisation,
* any errors are discarded
*
+ * + * Partial implementation of behaviour common to retry services. + *
*/ -@XStreamAlias("retry-store-jdbc") -@ComponentProfile(summary = "Store message for retry in a database using jdbc", since = "4.9.0") -@DisplayOrder(order = { "sqlPropertiesFile" }) -@Slf4j -public class JdbcRetryStore implements RetryStore { +@NoArgsConstructor +public abstract class JdbcRetryServiceImp extends JdbcService { + private static final String RETRY_STORE_PROPERTIES = "retry-store-derby.properties"; private static final String CREATE_SQL = "create.sql"; private static final String INSERT_SQL = "insert.sql"; @@ -114,41 +113,41 @@ public class JdbcRetryStore implements RetryStore { private static final String UPDATE_RETRY_SQL = "update-retry.sql"; private static final String DELETE_ACKNOWLEGED_SQL = "delete.acknowleged.sql"; private static final String ACKNOWLEDGE_SQL = "acknowledge.sql"; - + private transient Properties sqlStatements; private transient MimeEncoder encoder; - private AdaptrisConnection connection; - private Connection sqlConnection; + + protected static AdaptrisMarshaller marshaller; + + static { // only create marshaller once + marshaller = DefaultMarshaller.getDefaultMarshaller(); + } + /** * Set the sql properties file to use, by default it will look for the value * "retry-store-derby.properties" */ + @Getter @Setter @NotBlank @InputFieldDefault(value = RETRY_STORE_PROPERTIES) private String sqlPropertiesFile; + + @InputFieldDefault(value = "true") + private boolean pruneAcknowledged; - /** - *
- * Creates a new instance. Default properties file is
- * retry-store-derby.properties
.
- *
* This class supports both synchronous and asynchronous acknowledgement. This is controlled by the field - * {@linkplain StoreMessageForRetryServiceTest#setAsynchronousAcknowledgment(boolean)}. + * {@linkplain JdbcStoreMessageForRetryService#setAsynchronousAcknowledgment(boolean)}. *
** A message is deemed to be synchronously acknowledged if the wrapped service completes normally. In such cases it is not added to @@ -48,12 +49,12 @@ *
*/ -@XStreamAlias("store-message-for-retry-service") +@XStreamAlias("jdbc-store-message-for-retry-service") @AdapterComponent -@ComponentProfile(summary = "Wraps an interlok service and gives the option to store the message in a retry store in the event of an exception.", - since = "4.9.0", tag = "retry") -@DisplayOrder(order = {"asynchronousAcknowledgment", "asyncAutoRetryOnFail", "retryStore"}) -public class StoreMessageForRetryService extends RetryServiceImp { +@ComponentProfile(summary = "Wraps an interlok service and gives the option to store the message into a Database table in the event of an exception.", + since = "5.0.0", tag = "retry") +@DisplayOrder(order = {"asynchronousAcknowledgment", "asyncAutoRetryOnFail", "jdbc, retry"}) +public class JdbcStoreMessageForRetryService extends JdbcRetryServiceImp { // persistent @NotNull @@ -74,7 +75,7 @@ public class StoreMessageForRetryService extends RetryServiceImp { *- * Acknowledge that the message with the passed ID has now been successfully - * processed and should not be retried again. NB this method does not throw an - * Exception if the acknowledge ID does not exist in the store. - *
- * - * @param acknowledgeId the acknowledge ID of the message to acknowledge - * @throws InterlokException wrapping anyException
which occurs
- */
- void acknowledge(String acknowledgeId) throws InterlokException;
-
- /**
- * Delete any messages that have been successfully Acknowledged.
- *
- * @throws InterlokException wrapping any Exception
which occurs
- */
- void deleteAcknowledged() throws InterlokException;
-
- /**
- *
- * Obtain a list of AdaptrisMessage
s which meet the expiration
- * criteria.
- * In the most abstract sense, expired messages are those that have exceeded
- * their max retry count but not yet been acknowledged.
- *
AdaptrisMessage
s which meet the expiration
- * criteria.
- * @throws InterlokException wrapping any Exception
which occurs
- */
- List
- * Obtain a list of AdaptrisMessage
s which meet the criteria
- * for retrying.
- *
AdaptrisMessage
s which meet the criteria
- * for retrying
- * @throws InterlokException wrapping any Exception
which occurs
- */
- List- * Update the number of retries which have taken place for the message with - * the passed ID. NB this method does not throw an Exception if an attempt is - * made to update the retry count for a message ID which does not exist in the - * store. - *
- * - * @param messageId the ID of the message to update - * @throws InterlokException wrapping anyException
which occurs
- */
- void updateRetryCount(String messageId) throws InterlokException;
-
- /**
- * - * Used for any implementations that have a connected RetryStore - *
- */ - void makeConnection(AdaptrisConnection connection); - } \ No newline at end of file diff --git a/interlok-core/src/test/java/com/adaptris/core/http/jetty/retry/InMemoryRetryStore.java b/interlok-core/src/test/java/com/adaptris/core/http/jetty/retry/InMemoryRetryStore.java index ebe31ee10..30e31cc00 100644 --- a/interlok-core/src/test/java/com/adaptris/core/http/jetty/retry/InMemoryRetryStore.java +++ b/interlok-core/src/test/java/com/adaptris/core/http/jetty/retry/InMemoryRetryStore.java @@ -2,11 +2,9 @@ import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import com.adaptris.core.AdaptrisConnection; import com.adaptris.core.AdaptrisMessage; import com.adaptris.core.AdaptrisMessageFactory; import com.adaptris.interlok.InterlokException; @@ -58,34 +56,4 @@ public Iterable* Note that any read timeout will be overridden by the timeout value passed in via the - * {{@link #request(AdaptrisMessage, long)} method; if it is not the same as + * {{@link com.adaptris.core.RequestReplyProducerImp#request(AdaptrisMessage, long)} method; if it is not the same as * {@value com.adaptris.core.http.HttpConstants#DEFAULT_SOCKET_TIMEOUT} *
*/ From 049d398a352bc78a629d029e6d3b0cfbdb3b2982 Mon Sep 17 00:00:00 2001 From: James Wickham
- * Represents an entry in retry_store
, the sole table in the
- * 'retry store database'.
+ * Represents an entry in retry table
, the sole table in the
+ * 'retry database'.
*
- * JDBC services for storing and retrying messages with acknowledgment support. *
** If there is no explicit configuration of the sqlProperties then the SQL @@ -100,8 +95,6 @@ CONSTRAINT idx_acknowledge_id UNIQUE (acknowledge_id)) *
*/ - -@NoArgsConstructor public abstract class JdbcRetryServiceImp extends JdbcService { private static final String RETRY_STORE_PROPERTIES = "retry-store-derby.properties"; @@ -113,7 +106,7 @@ public abstract class JdbcRetryServiceImp extends JdbcService { private static final String UPDATE_RETRY_SQL = "update-retry.sql"; private static final String DELETE_ACKNOWLEGED_SQL = "delete.acknowleged.sql"; private static final String ACKNOWLEDGE_SQL = "acknowledge.sql"; - + private transient Properties sqlStatements; private transient MimeEncoder encoder; @@ -137,7 +130,12 @@ public abstract class JdbcRetryServiceImp extends JdbcService { @InputFieldDefault(value = "true") private boolean pruneAcknowledged; - + + public JdbcRetryServiceImp() { + setPruneAcknowledged(true); + setSqlPropertiesFile(RETRY_STORE_PROPERTIES); + } + /** @see com.adaptris.core.AdaptrisComponent#init() */ @Override protected void initJdbcService() throws CoreException { @@ -146,8 +144,6 @@ protected void initJdbcService() throws CoreException { if (getConnection() == null) { throw new CoreException("DatabaseConnection is null in service"); } - getConnection().init(); - if (sqlStatements == null) { sqlStatements = new Properties(); @@ -170,40 +166,17 @@ protected void initJdbcService() throws CoreException { } } } - - /** @see com.adaptris.core.ServiceImp#start() */ - @Override - public void start() throws CoreException { - getConnection().start(); - } - + + /** @see com.adaptris.core.AdaptrisComponent#prepare() */ @Override protected void prepareService() throws CoreException { - // TODO Auto-generated method stub - - } - - @Override - protected void startService() throws CoreException { - // TODO Auto-generated method stub - } /** @see com.adaptris.core.AdaptrisComponent#close() */ @Override - protected void closeJdbcService() { - getConnection().close(); - } + protected void closeJdbcService() {} - public final void doService(AdaptrisMessage msg) throws ServiceException { - pruneAcknowledged(); - performService(msg); - } - - protected abstract void performService(AdaptrisMessage msg) - throws ServiceException; - - private void pruneAcknowledged() { + void pruneAcknowledged() { try { if (isPruneAcknowledged()) { log.debug("Pruning Previously Acknowledged Messages"); @@ -237,7 +210,7 @@ private boolean isPruneAcknowledged() { return BooleanUtils.toBooleanDefaultIfNull(getPruneAcknowledged(), true); } - public void write(AdaptrisMessage msg) throws InterlokException { + void write(AdaptrisMessage msg) throws InterlokException { PreparedStatement ps = null; validateMessage(msg); Object[] params = new Object[] { msg.getUniqueId(), msg.getMetadataValue(Constants.ACKNOWLEDGE_ID_KEY), @@ -255,7 +228,7 @@ public void write(AdaptrisMessage msg) throws InterlokException { } } - public boolean delete(String msgId) throws InterlokException { + boolean delete(String msgId) throws InterlokException { PreparedStatement ps = null; Object[] params = new Object[] { msgId }; try { @@ -270,7 +243,7 @@ public boolean delete(String msgId) throws InterlokException { } } - public void acknowledge(String acknowledgeId) throws InterlokException { + void acknowledge(String acknowledgeId) throws InterlokException { PreparedStatement ps = null; Object[] params = { Constants.ACKNOWLEDGED, new Date(), acknowledgeId }; try { @@ -284,7 +257,7 @@ public void acknowledge(String acknowledgeId) throws InterlokException { } } - public void deleteAcknowledged() throws InterlokException { + void deleteAcknowledged() throws InterlokException { PreparedStatement ps = null; try { ps = prepareStatementWithoutParameters(sqlStatements.getProperty(DELETE_ACKNOWLEGED_SQL)); @@ -297,7 +270,7 @@ public void deleteAcknowledged() throws InterlokException { } } - public List- * Service which stores unacknowledged messages for future retry. + * Service which stores unacknowledged messages for future retry, with acknowledgment support. *
*
* This class supports both synchronous and asynchronous acknowledgement. This is controlled by the field
@@ -80,28 +81,27 @@ public JdbcStoreMessageForRetryService() {
setService(new NullService());
setAsyncAutoRetryOnFail(true);
}
-
- /** @see com.adaptris.core.ServiceImp#start() */
+
+ /** @see com.adaptris.core.AdaptrisComponent#start() */
@Override
- public void start() throws CoreException {
- super.start();
- getService().start();
+ protected void startService() throws CoreException {
+ LifecycleHelper.start(getService());
}
-
- /** @see com.adaptris.core.ServiceImp#stop() */
+
+ /** @see com.adaptris.core.AdaptrisComponent#stop() */
@Override
- public void stop() {
- getService().stop();
- super.stop();
+ protected void stopService() {
+ LifecycleHelper.stop(getService());
}
-
/**
- *
- * @see JdbcRetryServiceImp#performService(com.adaptris.core.AdaptrisMessage)
+ * The main service method, which stores a message and details of the wrapped service in the retry database table.
+ *
+ * @see com.adaptris.core.Service#doService(com.adaptris.core.AdaptrisMessage)
*/
@Override
- protected void performService(AdaptrisMessage msg) throws ServiceException {
+ public void doService(AdaptrisMessage msg) throws ServiceException {
+ pruneAcknowledged();
if (isAsynchronousAcknowledgment()) {
handleAsynchronous(msg);
}
@@ -246,10 +246,4 @@ private boolean isAsyncAutoRetryOnFail() {
return BooleanUtils.toBooleanDefaultIfNull(getAsyncAutoRetryOnFail(), true);
}
- @Override
- protected void stopService() {
- // TODO Auto-generated method stub
-
- }
-
}
\ No newline at end of file
From 292344e324c7b07e1e89f4af6fa69b3afa4bef96 Mon Sep 17 00:00:00 2001
From: James Wickham