Skip to content

Commit

Permalink
CAMEL-5976: camel-sql consumer can now do onConsume to delete row aft…
Browse files Browse the repository at this point in the history
…er processing etc.

git-svn-id: https://svn.apache.org/repos/asf/camel/trunk@1434662 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
davsclaus committed Jan 17, 2013
1 parent 9c95e3d commit f4ac386
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 87 deletions.
Expand Up @@ -19,7 +19,6 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.Map;

import org.apache.camel.Exchange;
import org.slf4j.Logger;
Expand All @@ -35,75 +34,30 @@ public class DefaultSqlProcessingStrategy implements SqlProcessingStrategy {
private static final Logger LOG = LoggerFactory.getLogger(DefaultSqlProcessingStrategy.class);

@Override
public void commit(SqlEndpoint endpoint, final Exchange exchange, Object data, JdbcTemplate jdbcTemplate, final String query) throws Exception {
jdbcTemplate.execute(query, new PreparedStatementCallback<Map<?, ?>>() {
public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException {
public int commit(final SqlEndpoint endpoint, final Exchange exchange, final Object data, final JdbcTemplate jdbcTemplate, final String query) throws Exception {

final String preparedQuery = endpoint.getPrepareStatementStrategy().prepareQuery(query, endpoint.isAllowNamedParameters());

return jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() {
public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException {
int expected = ps.getParameterMetaData().getParameterCount();

Iterator<?> iterator = createIterator(exchange, query, expected);
Iterator<?> iterator = endpoint.getPrepareStatementStrategy().createPopulateIterator(query, preparedQuery, expected, exchange, data);
if (iterator != null) {
populateStatement(ps, iterator, expected);
endpoint.getPrepareStatementStrategy().populateStatement(ps, iterator, expected);
LOG.trace("Execute query {}", query);
ps.execute();
}

return null;
};
});
}

private Iterator<?> createIterator(Exchange exchange, final String query, final int expectedParams) {
Object body = exchange.getIn().getBody();
if (body == null) {
return null;
}

// TODO: support named parameters
/*
if (body instanceof Map) {
final Map map = (Map) body;
return new Iterator() {

private int current;
@Override
public boolean hasNext() {
return current < expectedParams;
}
@Override
public Object next() {
current++;
// TODO: Fix me
return map.get("ID");
int updateCount = ps.getUpdateCount();
if (LOG.isTraceEnabled()) {
LOG.trace("Update count {}", updateCount);
}
return updateCount;
}

@Override
public void remove() {
// noop
}
return 0;
};
}*/

// else force as iterator based
Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
return iterator;
}

private void populateStatement(PreparedStatement ps, Iterator<?> iterator, int expectedParams) throws SQLException {
int argNumber = 1;
if (expectedParams > 0) {
while (iterator != null && iterator.hasNext()) {
Object value = iterator.next();
LOG.trace("Setting parameter #{} with value: {}", argNumber, value);
ps.setObject(argNumber, value);
argNumber++;
}
}

if (argNumber - 1 != expectedParams) {
throw new SQLException("Number of parameters mismatch. Expected: " + expectedParams + ", was:" + (argNumber - 1));
}
});
}

}
Expand Down
Expand Up @@ -51,7 +51,18 @@ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Obje
IntrospectionSupport.setProperties(jdbcTemplate, parameters, "template.");

String query = remaining.replaceAll(parameterPlaceholderSubstitute, "?");
return new SqlEndpoint(uri, this, jdbcTemplate, query);

String onConsume = getAndRemoveParameter(parameters, "consumer.onConsume", String.class);
if (onConsume == null) {
onConsume = getAndRemoveParameter(parameters, "onConsume", String.class);
}
if (onConsume != null) {
onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute, "?");
}

SqlEndpoint endpoint = new SqlEndpoint(uri, this, jdbcTemplate, query);
endpoint.setOnConsume(onConsume);
return endpoint;
}

public void setDataSource(DataSource dataSource) {
Expand Down
Expand Up @@ -45,20 +45,11 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
private final String query;
private final JdbcTemplate jdbcTemplate;

/**
* Statement to run after data has been processed in the route
*/
private String onConsume;

/**
* Process resultset individually or as a list
*/
private boolean useIterator = true;

/**
* Whether allow empty resultset to be routed to the next hop
*/
private boolean routeEmptyResultSet;
private int expectedUpdateCount = -1;
private boolean breakBatchOnConsumeFail;

private static final class DataHolder {
private Exchange exchange;
Expand Down Expand Up @@ -92,9 +83,10 @@ protected int poll() throws Exception {
public Integer doInPreparedStatement(PreparedStatement preparedStatement) throws SQLException, DataAccessException {
Queue<DataHolder> answer = new LinkedList<DataHolder>();

log.debug("Executing query: {}", preparedQuery);
ResultSet rs = preparedStatement.executeQuery();
try {
log.trace("Got result list from query {}", rs);
log.trace("Got result list from query: {}", rs);

RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(new ColumnMapRowMapper());
List<Map<String, Object>> data = mapper.extractData(rs);
Expand Down Expand Up @@ -166,19 +158,24 @@ public int processBatch(Queue<Object> exchanges) throws Exception {
pendingExchanges = total - index - 1;

// process the current exchange
log.debug("Processing exchange: {} with properties: {}", exchange, exchange.getProperties());
getProcessor().process(exchange);

// TODO: support when with CAMEL-5977
/*
try {
if (onConsume != null) {
SqlEndpoint endpoint = (SqlEndpoint) getEndpoint();
endpoint.getProcessingStrategy().commit(endpoint, exchange, data, jdbcTemplate, onConsume);
// we can only run on consume if there was data
if (onConsume != null && data != null) {
int updateCount = getEndpoint().getProcessingStrategy().commit(getEndpoint(), exchange, data, jdbcTemplate, onConsume);
if (expectedUpdateCount > -1 && updateCount != expectedUpdateCount) {
String msg = "Expected update count " + expectedUpdateCount + " but was " + updateCount + " executing query: " + onConsume;
throw new SQLException(msg);
}
}
} catch (Exception e) {
handleException(e);
}*/
if (breakBatchOnConsumeFail) {
throw e;
} else {
handleException("Error executing onConsume query " + onConsume, e);
}
}
}

return total;
Expand Down Expand Up @@ -231,5 +228,28 @@ public void setRouteEmptyResultSet(boolean routeEmptyResultSet) {
this.routeEmptyResultSet = routeEmptyResultSet;
}

public int getExpectedUpdateCount() {
return expectedUpdateCount;
}

/**
* Sets an expected update count to validate when using onConsume.
*
* @param expectedUpdateCount typically set this value to <tt>1</tt> to expect 1 row updated.
*/
public void setExpectedUpdateCount(int expectedUpdateCount) {
this.expectedUpdateCount = expectedUpdateCount;
}

public boolean isBreakBatchOnConsumeFail() {
return breakBatchOnConsumeFail;
}

/**
* Sets whether to break batch if onConsume failed.
*/
public void setBreakBatchOnConsumeFail(boolean breakBatchOnConsumeFail) {
this.breakBatchOnConsumeFail = breakBatchOnConsumeFail;
}
}

Expand Up @@ -32,7 +32,8 @@ public interface SqlProcessingStrategy {
* @param data The original data delivered to the route
* @param jdbcTemplate The JDBC template
* @param query The SQL query to execute
* @return the update count if the query returned an update count
* @throws Exception can be thrown in case of error
*/
void commit(SqlEndpoint endpoint, Exchange exchange, Object data, JdbcTemplate jdbcTemplate, String query) throws Exception;
int commit(SqlEndpoint endpoint, Exchange exchange, Object data, JdbcTemplate jdbcTemplate, String query) throws Exception;
}
Expand Up @@ -20,30 +20,33 @@
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.builder.NotifyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

/**
*
*/
@Ignore
public class SqlConsumerDeleteTest extends CamelTestSupport {

private EmbeddedDatabase db;
private JdbcTemplate jdbcTemplate;

@Before
public void setUp() throws Exception {
db = new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();

jdbcTemplate = new JdbcTemplate(db);

super.setUp();
}

Expand All @@ -70,6 +73,12 @@ public void testConsume() throws Exception {
assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));

// give it a little tine to delete
Thread.sleep(500);

// there should only be 1 row in the table
assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
}

@Override
Expand All @@ -79,7 +88,7 @@ protected RouteBuilder createRouteBuilder() throws Exception {
public void configure() throws Exception {
getContext().getComponent("sql", SqlComponent.class).setDataSource(db);

from("sql:select * from projects order by id?consumer.onConsume=delete from projects where id = #")
from("sql:select * from projects order by id?consumer.onConsume=delete from projects where id = :#id")
.to("mock:result");
}
};
Expand Down
2 changes: 1 addition & 1 deletion components/camel-sql/src/test/resources/log4j.properties
Expand Up @@ -16,7 +16,7 @@
## ------------------------------------------------------------------------

#
# The logging properties used for eclipse testing, We want to see debug output on the console.
# The logging properties used for testing
#
log4j.rootLogger=INFO, file

Expand Down

0 comments on commit f4ac386

Please sign in to comment.