Permalink
Browse files

CAMEL-5976: camel-sql consumer can now do onConsume to delete row aft…

…er processing etc.

git-svn-id: https://svn.apache.org/repos/asf/camel/trunk@1435118 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent 02cab21 commit 2ee46da991eca5f870f47f21de7dec24e420046e @davsclaus davsclaus committed Jan 18, 2013
@@ -31,6 +31,7 @@
*/
public class SqlComponent extends DefaultComponent {
private DataSource dataSource;
+ private boolean usePlaceholder = true;
public SqlComponent() {
}
@@ -56,19 +57,27 @@ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Obje
if (onConsume == null) {
onConsume = getAndRemoveParameter(parameters, "onConsume", String.class);
}
- if (onConsume != null) {
+ if (onConsume != null && usePlaceholder) {
onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute, "?");
}
+ String onConsumeFailed = getAndRemoveParameter(parameters, "consumer.onConsumeFailed", String.class);
+ if (onConsumeFailed == null) {
+ onConsumeFailed = getAndRemoveParameter(parameters, "onConsumeFailed", String.class);
+ }
+ if (onConsumeFailed != null && usePlaceholder) {
+ onConsumeFailed = onConsumeFailed.replaceAll(parameterPlaceholderSubstitute, "?");
+ }
String onConsumeBatchComplete = getAndRemoveParameter(parameters, "consumer.onConsumeBatchComplete", String.class);
if (onConsumeBatchComplete == null) {
onConsumeBatchComplete = getAndRemoveParameter(parameters, "onConsumeBatchComplete", String.class);
}
- if (onConsumeBatchComplete != null) {
+ if (onConsumeBatchComplete != null && usePlaceholder) {
onConsumeBatchComplete = onConsumeBatchComplete.replaceAll(parameterPlaceholderSubstitute, "?");
}
SqlEndpoint endpoint = new SqlEndpoint(uri, this, jdbcTemplate, query);
endpoint.setOnConsume(onConsume);
+ endpoint.setOnConsumeFailed(onConsumeFailed);
endpoint.setOnConsumeBatchComplete(onConsumeBatchComplete);
return endpoint;
}
@@ -80,4 +89,17 @@ public void setDataSource(DataSource dataSource) {
public DataSource getDataSource() {
return dataSource;
}
+
+ public boolean isUsePlaceholder() {
+ return usePlaceholder;
+ }
+
+ /**
+ * Sets whether to use placeholder and replace all placeholder characters with ? sign in the SQL queries.
+ * <p/>
+ * This option is default <tt>true</tt>
+ */
+ public void setUsePlaceholder(boolean usePlaceholder) {
+ this.usePlaceholder = usePlaceholder;
+ }
}
@@ -46,6 +46,7 @@
private final JdbcTemplate jdbcTemplate;
private String onConsume;
+ private String onConsumeFailed;
private String onConsumeBatchComplete;
private boolean useIterator = true;
private boolean routeEmptyResultSet;
@@ -159,22 +160,28 @@ public int processBatch(Queue<Object> exchanges) throws Exception {
pendingExchanges = total - index - 1;
// process the current exchange
- getProcessor().process(exchange);
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ // pick the on consume to use
+ String sql = exchange.isFailed() ? onConsumeFailed : onConsume;
try {
// we can only run on consume if there was data
- if (onConsume != null && data != null) {
- int updateCount = getEndpoint().getProcessingStrategy().commit(getEndpoint(), exchange, data, jdbcTemplate, onConsume);
+ if (data != null && sql != null) {
+ int updateCount = getEndpoint().getProcessingStrategy().commit(getEndpoint(), exchange, data, jdbcTemplate, sql);
if (expectedUpdateCount > -1 && updateCount != expectedUpdateCount) {
- String msg = "Expected update count " + expectedUpdateCount + " but was " + updateCount + " executing query: " + onConsume;
+ String msg = "Expected update count " + expectedUpdateCount + " but was " + updateCount + " executing query: " + sql;
throw new SQLException(msg);
}
}
} catch (Exception e) {
if (breakBatchOnConsumeFail) {
throw e;
} else {
- handleException("Error executing onConsume query " + onConsume, e);
+ handleException("Error executing onConsume/onConsumeFailed query " + sql, e);
}
}
}
@@ -195,22 +202,28 @@ public int processBatch(Queue<Object> exchanges) throws Exception {
return total;
}
- /**
- * Gets the statement(s) to run after successful processing.
- * Use comma to separate multiple statements.
- */
public String getOnConsume() {
return onConsume;
}
/**
- * Sets the statement to run after successful processing.
- * Use comma to separate multiple statements.
+ * Sets a SQL to execute when the row has been successfully processed.
*/
public void setOnConsume(String onConsume) {
this.onConsume = onConsume;
}
+ public String getOnConsumeFailed() {
+ return onConsumeFailed;
+ }
+
+ /**
+ * Sets a SQL to execute when the row failed being processed.
+ */
+ public void setOnConsumeFailed(String onConsumeFailed) {
+ this.onConsumeFailed = onConsumeFailed;
+ }
+
public String getOnConsumeBatchComplete() {
return onConsumeBatchComplete;
}
@@ -37,6 +37,7 @@
private SqlProcessingStrategy processingStrategy = new DefaultSqlProcessingStrategy();
private SqlPrepareStatementStrategy prepareStatementStrategy = new DefaultSqlPrepareStatementStrategy();
private String onConsume;
+ private String onConsumeFailed;
private String onConsumeBatchComplete;
private boolean allowNamedParameters = true;
@@ -53,6 +54,7 @@ public Consumer createConsumer(Processor processor) throws Exception {
SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query);
consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
consumer.setOnConsume(getOnConsume());
+ consumer.setOnConsumeFailed(getOnConsumeFailed());
consumer.setOnConsumeBatchComplete(getOnConsumeBatchComplete());
configureConsumer(consumer);
return consumer;
@@ -122,6 +124,14 @@ public void setOnConsume(String onConsume) {
this.onConsume = onConsume;
}
+ public String getOnConsumeFailed() {
+ return onConsumeFailed;
+ }
+
+ public void setOnConsumeFailed(String onConsumeFailed) {
+ this.onConsumeFailed = onConsumeFailed;
+ }
+
public String getOnConsumeBatchComplete() {
return onConsumeBatchComplete;
}
@@ -60,7 +60,7 @@ public void testConsume() throws Exception {
assertMockEndpointsSatisfied();
// give it a little tine to delete
- Thread.sleep(500);
+ Thread.sleep(1000);
assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
}
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+/**
+ *
+ */
+public class SqlConsumerDeleteFailedTest extends CamelTestSupport {
+
+ private EmbeddedDatabase db;
+ private JdbcTemplate jdbcTemplate;
+
+ @Before
+ public void setUp() throws Exception {
+ db = new EmbeddedDatabaseBuilder()
+ .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+ jdbcTemplate = new JdbcTemplate(db);
+
+ super.setUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ db.shutdown();
+ }
+
+ @Test
+ public void testConsume() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(2);
+
+ assertMockEndpointsSatisfied();
+
+ List<Exchange> exchanges = mock.getReceivedExchanges();
+ assertEquals(2, exchanges.size());
+
+ assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
+ assertEquals("Camel", exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
+ assertEquals(3, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
+ assertEquals("Linux", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
+
+ // give it a little tine to delete
+ Thread.sleep(1000);
+
+ assertEquals("Should have deleted 2 rows", 1, jdbcTemplate.queryForInt("select count(*) from projects"));
+ assertEquals("Should be AMQ project that is BAD", "AMQ", jdbcTemplate.queryForObject("select PROJECT from projects where license = 'BAD'", String.class));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+ from("sql:select * from projects where license <> 'BAD' order by id"
+ + "?consumer.onConsume=delete from projects where id = :#id"
+ + "&consumer.onConsumeFailed=update projects set license = 'BAD' where id = :#id")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ Object project = exchange.getIn().getBody(Map.class).get("PROJECT");
+ if ("AMQ".equals(project)) {
+ throw new IllegalArgumentException("Cannot handled AMQ");
+ }
+ }
+ })
+ .to("mock:result");
+ }
+ };
+ }
+}
@@ -75,7 +75,7 @@ public void testConsume() throws Exception {
assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
// give it a little tine to delete
- Thread.sleep(500);
+ Thread.sleep(1000);
assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
}
@@ -60,7 +60,7 @@ public void testConsume() throws Exception {
assertMockEndpointsSatisfied();
// give it a little tine to delete
- Thread.sleep(500);
+ Thread.sleep(1000);
assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
}

0 comments on commit 2ee46da

Please sign in to comment.