Permalink
Browse files

CAMEL-5643: Allow to store message body and headers as text in JDBC a…

…gg repo. Thanks to Alan Foster for the patch.

git-svn-id: https://svn.apache.org/repos/asf/camel/trunk@1434700 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent dfa5d63 commit 43c3cfbde1f6ade356bbd0051215f0ffa4b054d5 @davsclaus davsclaus committed Jan 17, 2013
@@ -24,7 +24,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-
import javax.sql.DataSource;
import org.apache.camel.CamelContext;
@@ -34,7 +33,6 @@
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
@@ -56,6 +54,7 @@
private static final transient Logger LOG = LoggerFactory.getLogger(JdbcAggregationRepository.class);
private static final String ID = "id";
private static final String EXCHANGE = "exchange";
+ private static final String BODY = "body";
private PlatformTransactionManager transactionManager;
private DataSource dataSource;
private TransactionTemplate transactionTemplate;
@@ -69,6 +68,8 @@
private boolean useRecovery = true;
private int maximumRedeliveries;
private String deadLetterUri;
+ private List<String> headersToStoreAsText;
+ private boolean storeBodyAsText;
/**
* Creates an aggregation repository
@@ -94,7 +95,7 @@ public final void setRepositoryName(String repositoryName) {
public final void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
-
+
transactionTemplate = new TransactionTemplate(transactionManager);
transactionTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
@@ -109,50 +110,140 @@ public final void setDataSource(DataSource dataSource) {
jdbcTemplate = new JdbcTemplate(dataSource);
}
+ @Override
public Exchange add(final CamelContext camelContext, final String correlationId, final Exchange exchange) {
return transactionTemplate.execute(new TransactionCallback<Exchange>() {
public Exchange doInTransaction(TransactionStatus status) {
- String sql;
Exchange result = null;
final String key = correlationId;
try {
- final byte[] data = codec.marshallExchange(camelContext, exchange);
-
LOG.debug("Adding exchange with key: [{}]", key);
- String insert = "INSERT INTO " + getRepositoryName() + " (" + EXCHANGE + ", " + ID + ") VALUES (?, ?)";
- String update = "UPDATE " + getRepositoryName() + " SET " + EXCHANGE + " = ? WHERE " + ID + " = ?";
-
boolean present = jdbcTemplate.queryForInt(
"SELECT COUNT(*) FROM " + getRepositoryName() + " WHERE " + ID + " = ?", key) != 0;
- sql = present ? update : insert;
// Recover existing exchange with that ID
if (isReturnOldExchange() && present) {
result = get(key, getRepositoryName(), camelContext);
}
- jdbcTemplate.execute(sql,
- new AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
- @Override
- protected void setValues(PreparedStatement ps, LobCreator lobCreator) throws SQLException {
- lobCreator.setBlobAsBytes(ps, 1, data);
- ps.setString(2, key);
- }
- });
+ if (present) {
+ update(camelContext, correlationId, exchange, getRepositoryName());
+ } else {
+ insert(camelContext, correlationId, exchange, getRepositoryName());
+ }
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException("Error adding to repository " + repositoryName + " with key " + key, e);
}
return result;
}
});
+ }
+ /**
+ * Updates the current exchange details in the given repository table
+ *
+ * @param camelContext the current CamelContext
+ * @param key the correlation key
+ * @param exchange the aggregated exchange
+ * @param repositoryName The name of the table
+ * @throws Exception
+ */
+ protected void update(final CamelContext camelContext, final String key, final Exchange exchange, String repositoryName) throws Exception {
+ StringBuilder queryBuilder = new StringBuilder()
+ .append("UPDATE ").append(repositoryName)
+ .append(" SET ")
+ .append(EXCHANGE).append(" = ?");
+ if (storeBodyAsText) {
+ queryBuilder.append(", ").append(BODY).append(" = ?");
+ }
+
+ if (hasHeadersToStoreAsText()) {
+ for (String headerName : headersToStoreAsText) {
+ queryBuilder.append(", ").append(headerName).append(" = ?");
+ }
+ }
+
+ queryBuilder.append(" WHERE ").append(ID).append(" = ?");
+
+ String sql = queryBuilder.toString();
+ insertAndUpdateHelper(camelContext, key, exchange, sql, false);
+ }
+
+ /**
+ * Inserts a new record into the given repository table
+ *
+ * @param camelContext the current CamelContext
+ * @param correlationId the correlation key
+ * @param exchange the aggregated exchange
+ * @param repositoryName The name of the table
+ * @throws Exception
+ */
+ protected void insert(final CamelContext camelContext, final String correlationId, final Exchange exchange, String repositoryName) throws Exception {
+ // The default totalParameterIndex is 2 for ID and Exchange. Depending on logic this will be increased
+ int totalParameterIndex = 2;
+ StringBuilder queryBuilder = new StringBuilder()
+ .append("INSERT INTO ").append(repositoryName)
+ .append('(')
+ .append(EXCHANGE).append(", ")
+ .append(ID);
+
+ if (storeBodyAsText) {
+ queryBuilder.append(", ").append(BODY);
+ totalParameterIndex++;
+ }
+
+ if (hasHeadersToStoreAsText()) {
+ for (String headerName : headersToStoreAsText) {
+ queryBuilder.append(", ").append(headerName);
+ totalParameterIndex++;
+ }
+ }
+
+ queryBuilder.append(") VALUES (");
+
+ for (int i = 0; i < totalParameterIndex - 1; i++) {
+ queryBuilder.append("?, ");
+ }
+ queryBuilder.append("?)");
+
+ String sql = queryBuilder.toString();
+
+ insertAndUpdateHelper(camelContext, correlationId, exchange, sql, true);
+ }
+
+ protected void insertAndUpdateHelper(final CamelContext camelContext, final String key, final Exchange exchange, String sql, final boolean idComesFirst) throws Exception {
+ final byte[] data = codec.marshallExchange(camelContext, exchange);
+ jdbcTemplate.execute(sql,
+ new AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
+ @Override
+ protected void setValues(PreparedStatement ps, LobCreator lobCreator) throws SQLException {
+ int totalParameterIndex = 0;
+ lobCreator.setBlobAsBytes(ps, ++totalParameterIndex, data);
+ if (idComesFirst) {
+ ps.setString(++totalParameterIndex, key);
+ }
+ if (storeBodyAsText) {
+ ps.setString(++totalParameterIndex, exchange.getIn().getBody(String.class));
+ }
+ if (hasHeadersToStoreAsText()) {
+ for (String headerName : headersToStoreAsText) {
+ String headerValue = exchange.getIn().getHeader(headerName, String.class);
+ ps.setString(++totalParameterIndex, headerValue);
+ }
+ }
+ if (!idComesFirst) {
+ ps.setString(++totalParameterIndex, key);
+ }
+ }
+ });
}
+ @Override
public Exchange get(final CamelContext camelContext, final String correlationId) {
final String key = correlationId;
Exchange result = get(key, getRepositoryName(), camelContext);
@@ -183,34 +274,27 @@ public Exchange doInTransaction(TransactionStatus status) {
});
}
+ @Override
public void remove(final CamelContext camelContext, final String correlationId, final Exchange exchange) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
protected void doInTransactionWithoutResult(TransactionStatus status) {
final String key = correlationId;
final String confirmKey = exchange.getExchangeId();
try {
- final byte[] data = codec.marshallExchange(camelContext, exchange);
-
LOG.debug("Removing key [{}]", key);
- jdbcTemplate.update("DELETE FROM " + getRepositoryName() + " WHERE " + ID + " = ?",
- new Object[]{key});
-
- jdbcTemplate.execute("INSERT INTO " + getRepositoryNameCompleted() + " (" + EXCHANGE + ", " + ID + ") VALUES (?, ?)",
- new AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
- @Override
- protected void setValues(PreparedStatement ps, LobCreator lobCreator) throws SQLException {
- lobCreator.setBlobAsBytes(ps, 1, data);
- ps.setString(2, confirmKey);
- }
- });
- } catch (IOException e) {
+ jdbcTemplate.update("DELETE FROM " + getRepositoryName() + " WHERE " + ID + " = ?", key);
+
+ insert(camelContext, confirmKey, exchange, getRepositoryNameCompleted());
+
+ } catch (Exception e) {
throw new RuntimeException("Error removing key " + key + " from repository " + repositoryName, e);
}
}
});
}
+ @Override
public void confirm(final CamelContext camelContext, final String exchangeId) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
protected void doInTransactionWithoutResult(TransactionStatus status) {
@@ -224,26 +308,26 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
});
}
+ @Override
public Set<String> getKeys() {
- return transactionTemplateReadOnly.execute(new TransactionCallback<LinkedHashSet<String>>() {
- public LinkedHashSet<String> doInTransaction(TransactionStatus status) {
- List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM " + getRepositoryName(),
- new RowMapper<String>() {
- public String mapRow(ResultSet rs, int rowNum) throws SQLException {
- String id = rs.getString(ID);
- LOG.trace("getKey [{}]", id);
- return id;
- }
- });
- return new LinkedHashSet<String>(keys);
- }
- });
+ return getKeys(getRepositoryName());
}
+ @Override
public Set<String> scan(CamelContext camelContext) {
+ return getKeys(getRepositoryNameCompleted());
+ }
+
+ /**
+ * Returns the keys in the given repository
+ *
+ * @param repositoryName The name of the table
+ * @return Set of keys in the given repository name
+ */
+ protected Set<String> getKeys(final String repositoryName) {
return transactionTemplateReadOnly.execute(new TransactionCallback<LinkedHashSet<String>>() {
public LinkedHashSet<String> doInTransaction(TransactionStatus status) {
- List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM " + getRepositoryNameCompleted(),
+ List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM " + repositoryName,
new RowMapper<String>() {
public String mapRow(ResultSet rs, int rowNum) throws SQLException {
String id = rs.getString(ID);
@@ -256,6 +340,7 @@ public String mapRow(ResultSet rs, int rowNum) throws SQLException {
});
}
+ @Override
public Exchange recover(CamelContext camelContext, String exchangeId) {
final String key = exchangeId;
Exchange answer = get(key, getRepositoryNameCompleted(), camelContext);
@@ -309,6 +394,22 @@ public void setReturnOldExchange(boolean returnOldExchange) {
this.returnOldExchange = returnOldExchange;
}
+ public void setJdbcCamelCodec(JdbcCamelCodec codec) {
+ this.codec = codec;
+ }
+
+ public boolean hasHeadersToStoreAsText() {
+ return this.headersToStoreAsText != null && !this.headersToStoreAsText.isEmpty();
+ }
+
+ public void setHeadersToStoreAsText(List<String> headersToStoreAsText) {
+ this.headersToStoreAsText = headersToStoreAsText;
+ }
+
+ public void setStoreBodyAsText(boolean storeBodyAsText) {
+ this.storeBodyAsText = storeBodyAsText;
+ }
+
/**
* @return the lobHandler
*/
@@ -31,7 +31,7 @@
/**
* Adapted from HawtDBCamelCodec
*/
-public final class JdbcCamelCodec {
+public class JdbcCamelCodec {
public byte[] marshallExchange(CamelContext camelContext, Exchange exchange) throws IOException {
// use DefaultExchangeHolder to marshal to a serialized object
Oops, something went wrong.

0 comments on commit 43c3cfb

Please sign in to comment.