Permalink
Browse files

CAMEL-5976: camel-sql consumer can now do onConsume to delete row aft…

…er processing etc.

git-svn-id: https://svn.apache.org/repos/asf/camel/trunk@1434669 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent f4ac386 commit 00a094c8925774a129d35d7bdc276224e08f3dfd @davsclaus davsclaus committed Jan 17, 2013
@@ -60,5 +60,27 @@ public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException {
});
}
+ @Override
+ public int commitBatchComplete(final SqlEndpoint endpoint, final JdbcTemplate jdbcTemplate, final String query) throws Exception {
+ final String preparedQuery = endpoint.getPrepareStatementStrategy().prepareQuery(query, endpoint.isAllowNamedParameters());
+
+ return jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() {
+ public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException {
+ int expected = ps.getParameterMetaData().getParameterCount();
+ if (expected != 0) {
+ throw new IllegalArgumentException("Query onConsumeBatchComplete " + query + " cannot have parameters, was " + expected);
+ }
+
+ LOG.trace("Execute query {}", query);
+ ps.execute();
+
+ int updateCount = ps.getUpdateCount();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Update count {}", updateCount);
+ }
+ return updateCount;
+ };
+ });
+ }
}
@@ -59,9 +59,17 @@ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Obje
if (onConsume != null) {
onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute, "?");
}
+ String onConsumeBatchComplete = getAndRemoveParameter(parameters, "consumer.onConsumeBatchComplete", String.class);
+ if (onConsumeBatchComplete == null) {
+ onConsumeBatchComplete = getAndRemoveParameter(parameters, "onConsumeBatchComplete", String.class);
+ }
+ if (onConsumeBatchComplete != null) {
+ onConsumeBatchComplete = onConsumeBatchComplete.replaceAll(parameterPlaceholderSubstitute, "?");
+ }
SqlEndpoint endpoint = new SqlEndpoint(uri, this, jdbcTemplate, query);
endpoint.setOnConsume(onConsume);
+ endpoint.setOnConsumeBatchComplete(onConsumeBatchComplete);
return endpoint;
}
@@ -46,6 +46,7 @@
private final JdbcTemplate jdbcTemplate;
private String onConsume;
+ private String onConsumeBatchComplete;
private boolean useIterator = true;
private boolean routeEmptyResultSet;
private int expectedUpdateCount = -1;
@@ -178,6 +179,19 @@ public int processBatch(Queue<Object> exchanges) throws Exception {
}
}
+ try {
+ if (onConsumeBatchComplete != null) {
+ int updateCount = getEndpoint().getProcessingStrategy().commitBatchComplete(getEndpoint(), jdbcTemplate, onConsumeBatchComplete);
+ log.debug("onConsumeBatchComplete update count {}", updateCount);
+ }
+ } catch (Exception e) {
+ if (breakBatchOnConsumeFail) {
+ throw e;
+ } else {
+ handleException("Error executing onConsumeBatchComplete query " + onConsumeBatchComplete, e);
+ }
+ }
+
return total;
}
@@ -197,6 +211,14 @@ public void setOnConsume(String onConsume) {
this.onConsume = onConsume;
}
+ public String getOnConsumeBatchComplete() {
+ return onConsumeBatchComplete;
+ }
+
+ public void setOnConsumeBatchComplete(String onConsumeBatchComplete) {
+ this.onConsumeBatchComplete = onConsumeBatchComplete;
+ }
+
/**
* Indicates how resultset should be delivered to the route
*/
@@ -37,6 +37,7 @@
private SqlProcessingStrategy processingStrategy = new DefaultSqlProcessingStrategy();
private SqlPrepareStatementStrategy prepareStatementStrategy = new DefaultSqlPrepareStatementStrategy();
private String onConsume;
+ private String onConsumeBatchComplete;
private boolean allowNamedParameters = true;
// TODO: onConsumeBatchDone to execute a query when batch done
@@ -54,6 +55,7 @@ public Consumer createConsumer(Processor processor) throws Exception {
SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query);
consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
consumer.setOnConsume(getOnConsume());
+ consumer.setOnConsumeBatchComplete(getOnConsumeBatchComplete());
configureConsumer(consumer);
return consumer;
}
@@ -122,6 +124,14 @@ public void setOnConsume(String onConsume) {
this.onConsume = onConsume;
}
+ public String getOnConsumeBatchComplete() {
+ return onConsumeBatchComplete;
+ }
+
+ public void setOnConsumeBatchComplete(String onConsumeBatchComplete) {
+ this.onConsumeBatchComplete = onConsumeBatchComplete;
+ }
+
public boolean isAllowNamedParameters() {
return allowNamedParameters;
}
@@ -36,4 +36,16 @@
* @throws Exception can be thrown in case of error
*/
int commit(SqlEndpoint endpoint, Exchange exchange, Object data, JdbcTemplate jdbcTemplate, String query) throws Exception;
+
+ /**
+ * Commit callback when the batch is complete. This allows you to do one extra query after all rows has been processed in the batch.
+ *
+ * @param endpoint the endpoint
+ * @param jdbcTemplate The JDBC template
+ * @param query The SQL query to execute
+ * @return the update count if the query returned an update count
+ * @throws Exception can be thrown in case of error
+ */
+ int commitBatchComplete(SqlEndpoint endpoint, JdbcTemplate jdbcTemplate, String query) throws Exception;
+
}
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+/**
+ *
+ */
+public class SqlConsumerDeleteBatchCompleteTest extends CamelTestSupport {
+
+ private EmbeddedDatabase db;
+ private JdbcTemplate jdbcTemplate;
+
+ @Before
+ public void setUp() throws Exception {
+ db = new EmbeddedDatabaseBuilder()
+ .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+ jdbcTemplate = new JdbcTemplate(db);
+
+ super.setUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ db.shutdown();
+ }
+
+ @Test
+ public void testConsume() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
+ assertMockEndpointsSatisfied();
+
+ // give it a little tine to delete
+ Thread.sleep(500);
+
+ assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+ from("sql:select * from projects order by id?consumer.onConsumeBatchComplete=delete from projects")
+ .to("mock:result");
+ }
+ };
+ }
+}
@@ -77,7 +77,6 @@ public void testConsume() throws Exception {
// give it a little tine to delete
Thread.sleep(500);
- // there should only be 1 row in the table
assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
}
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+/**
+ *
+ */
+public class SqlConsumerDeleteTransformTest extends CamelTestSupport {
+
+ private EmbeddedDatabase db;
+ private JdbcTemplate jdbcTemplate;
+
+ @Before
+ public void setUp() throws Exception {
+ db = new EmbeddedDatabaseBuilder()
+ .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+ jdbcTemplate = new JdbcTemplate(db);
+
+ super.setUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ db.shutdown();
+ }
+
+ @Test
+ public void testConsume() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("The project is Camel", "The project is AMQ", "The project is Linux");
+
+ assertMockEndpointsSatisfied();
+
+ // give it a little tine to delete
+ Thread.sleep(500);
+
+ assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+ // even if we transform the exchange we can still do onConsume as we have the original data at
+ // the point when onConsume is executed
+ from("sql:select * from projects order by id?consumer.onConsume=delete from projects where id = :#id")
+ .transform().simple("The project is ${body[project]}")
+ .to("mock:result");
+ }
+ };
+ }
+}

0 comments on commit 00a094c

Please sign in to comment.