Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

CAMEL-5976: camel-sql now has batch consumer. Work in progress.

git-svn-id: https://svn.apache.org/repos/asf/camel/trunk@1434584 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
commit 669d5e04be4f7d866030f7e193b426ea772dfaa5 1 parent 821490f
@davsclaus davsclaus authored
View
110 components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.PreparedStatementCallback;
+
+/**
+ *
+ */
+public class DefaultSqlProcessingStrategy implements SqlProcessingStrategy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultSqlProcessingStrategy.class);
+
+ @Override
+ public void commit(SqlEndpoint endpoint, final Exchange exchange, Object data, JdbcTemplate jdbcTemplate, final String query) throws Exception {
+ jdbcTemplate.execute(query, new PreparedStatementCallback<Map<?, ?>>() {
+ public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException {
+ int expected = ps.getParameterMetaData().getParameterCount();
+
+ Iterator<?> iterator = createIterator(exchange, query, expected);
+ if (iterator != null) {
+ populateStatement(ps, iterator, expected);
+ LOG.trace("Execute query {}", query);
+ ps.execute();
+ }
+
+ return null;
+ };
+ });
+ }
+
+ private Iterator<?> createIterator(Exchange exchange, final String query, final int expectedParams) {
+ Object body = exchange.getIn().getBody();
+ if (body == null) {
+ return null;
+ }
+
+ // TODO: support named parameters
+/*
+ if (body instanceof Map) {
+ final Map map = (Map) body;
+ return new Iterator() {
+
+ private int current;
+
+ @Override
+ public boolean hasNext() {
+ return current < expectedParams;
+ }
+
+ @Override
+ public Object next() {
+ current++;
+ // TODO: Fix me
+ return map.get("ID");
+ }
+
+ @Override
+ public void remove() {
+ // noop
+ }
+ };
+ }*/
+
+ // else force as iterator based
+ Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
+ return iterator;
+ }
+
+ private void populateStatement(PreparedStatement ps, Iterator<?> iterator, int expectedParams) throws SQLException {
+ int argNumber = 1;
+ if (expectedParams > 0) {
+ while (iterator != null && iterator.hasNext()) {
+ Object value = iterator.next();
+ LOG.trace("Setting parameter #{} with value: {}", argNumber, value);
+ ps.setObject(argNumber, value);
+ argNumber++;
+ }
+ }
+
+ if (argNumber - 1 != expectedParams) {
+ throw new SQLException("Number of parameters mismatch. Expected: " + expectedParams + ", was:" + (argNumber - 1));
+ }
+ }
+
+}
+
View
229 components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.ColumnMapRowMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.PreparedStatementCallback;
+import org.springframework.jdbc.core.RowMapperResultSetExtractor;
+
+/**
+ *
+ */
+public class SqlConsumer extends ScheduledBatchPollingConsumer {
+
+ private final String query;
+ private final JdbcTemplate jdbcTemplate;
+
+ /**
+ * Statement to run after data has been processed in the route
+ */
+ private String onConsume;
+
+ /**
+ * Process resultset individually or as a list
+ */
+ private boolean useIterator = true;
+
+ /**
+ * Whether allow empty resultset to be routed to the next hop
+ */
+ private boolean routeEmptyResultSet;
+
+ private static final class DataHolder {
+ private Exchange exchange;
+ private Object data;
+
+ private DataHolder() {
+ }
+ }
+
+ public SqlConsumer(Endpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query) {
+ super(endpoint, processor);
+ this.jdbcTemplate = jdbcTemplate;
+ this.query = query;
+ }
+
+ @Override
+ protected int poll() throws Exception {
+ // must reset for each poll
+ shutdownRunningTask = null;
+ pendingExchanges = 0;
+
+ Integer messagePolled = jdbcTemplate.execute(query, new PreparedStatementCallback<Integer>() {
+ @Override
+ public Integer doInPreparedStatement(PreparedStatement preparedStatement) throws SQLException, DataAccessException {
+ Queue<DataHolder> answer = new LinkedList<DataHolder>();
+
+ ResultSet rs = preparedStatement.executeQuery();
+ try {
+ log.trace("Got result list from query {}", rs);
+
+ RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(new ColumnMapRowMapper());
+ List<Map<String, Object>> data = mapper.extractData(rs);
+
+ // create a list of exchange objects with the data
+ if (useIterator) {
+ for (Map<String, Object> item : data) {
+ Exchange exchange = createExchange(item);
+ DataHolder holder = new DataHolder();
+ holder.exchange = exchange;
+ holder.data = item;
+ answer.add(holder);
+ }
+ } else {
+ if (!data.isEmpty() || routeEmptyResultSet) {
+ Exchange exchange = createExchange(data);
+ DataHolder holder = new DataHolder();
+ holder.exchange = exchange;
+ holder.data = data;
+ answer.add(holder);
+ }
+ }
+ } finally {
+ rs.close();
+ }
+
+ // process all the exchanges in this batch
+ try {
+ int rows = processBatch(CastUtils.cast(answer));
+ return Integer.valueOf(rows);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+ });
+
+ return messagePolled;
+ }
+
+ protected Exchange createExchange(Object data) {
+ final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly);
+ Message msg = exchange.getIn();
+ msg.setBody(data);
+ return exchange;
+ }
+
+ @Override
+ public int processBatch(Queue<Object> exchanges) throws Exception {
+ int total = exchanges.size();
+
+ // limit if needed
+ if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
+ log.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was " + total + " messages in this poll.");
+ total = maxMessagesPerPoll;
+ }
+
+ for (int index = 0; index < total && isBatchAllowed(); index++) {
+ // only loop if we are started (allowed to run)
+ DataHolder holder = ObjectHelper.cast(DataHolder.class, exchanges.poll());
+ Exchange exchange = holder.exchange;
+ Object data = holder.data;
+
+ // add current index and total as properties
+ exchange.setProperty(Exchange.BATCH_INDEX, index);
+ exchange.setProperty(Exchange.BATCH_SIZE, total);
+ exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+ // update pending number of exchanges
+ pendingExchanges = total - index - 1;
+
+ // process the current exchange
+ log.debug("Processing exchange: {} with properties: {}", exchange, exchange.getProperties());
+ getProcessor().process(exchange);
+
+ // TODO: support when with CAMEL-5977
+ /*
+ try {
+ if (onConsume != null) {
+ SqlEndpoint endpoint = (SqlEndpoint) getEndpoint();
+ endpoint.getProcessingStrategy().commit(endpoint, exchange, data, jdbcTemplate, onConsume);
+ }
+ } catch (Exception e) {
+ handleException(e);
+ }*/
+ }
+
+ return total;
+ }
+
+ /**
+ * Gets the statement(s) to run after successful processing.
+ * Use comma to separate multiple statements.
+ */
+ public String getOnConsume() {
+ return onConsume;
+ }
+
+ /**
+ * Sets the statement to run after successful processing.
+ * Use comma to separate multiple statements.
+ */
+ public void setOnConsume(String onConsume) {
+ this.onConsume = onConsume;
+ }
+
+ /**
+ * Indicates how resultset should be delivered to the route
+ */
+ public boolean isUseIterator() {
+ return useIterator;
+ }
+
+ /**
+ * Sets how resultset should be delivered to route.
+ * Indicates delivery as either a list or individual object.
+ * defaults to true.
+ */
+ public void setUseIterator(boolean useIterator) {
+ this.useIterator = useIterator;
+ }
+
+ /**
+ * Indicates whether empty resultset should be allowed to be sent to the next hop or not
+ */
+ public boolean isRouteEmptyResultSet() {
+ return routeEmptyResultSet;
+ }
+
+ /**
+ * Sets whether empty resultset should be allowed to be sent to the next hop.
+ * defaults to false. So the empty resultset will be filtered out.
+ */
+ public void setRouteEmptyResultSet(boolean routeEmptyResultSet) {
+ this.routeEmptyResultSet = routeEmptyResultSet;
+ }
+
+}
+
View
41 components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
@@ -20,7 +20,7 @@
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultPollingEndpoint;
import org.apache.camel.util.UnsafeUriCharactersEncoder;
import org.springframework.jdbc.core.JdbcTemplate;
@@ -29,10 +29,15 @@
* question marks (that are parameter placeholders), sharp signs should be used.
* This is because in camel question mark has other meaning.
*/
-public class SqlEndpoint extends DefaultEndpoint {
+public class SqlEndpoint extends DefaultPollingEndpoint {
private JdbcTemplate jdbcTemplate;
private String query;
private boolean batch;
+ private int maxMessagesPerPoll;
+ private SqlProcessingStrategy processingStrategy = new DefaultSqlProcessingStrategy();
+ private String onConsume;
+
+ // TODO: onConsumeBatchDone to execute a query when batch done
public SqlEndpoint() {
}
@@ -42,9 +47,13 @@ public SqlEndpoint(String uri, Component component, JdbcTemplate jdbcTemplate, S
this.jdbcTemplate = jdbcTemplate;
this.query = query;
}
-
+
public Consumer createConsumer(Processor processor) throws Exception {
- throw new UnsupportedOperationException("Not implemented");
+ SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query);
+ consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+ consumer.setOnConsume(getOnConsume());
+ configureConsumer(consumer);
+ return consumer;
}
public Producer createProducer() throws Exception {
@@ -79,6 +88,30 @@ public void setBatch(boolean batch) {
this.batch = batch;
}
+ public int getMaxMessagesPerPoll() {
+ return maxMessagesPerPoll;
+ }
+
+ public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+ this.maxMessagesPerPoll = maxMessagesPerPoll;
+ }
+
+ public SqlProcessingStrategy getProcessingStrategy() {
+ return processingStrategy;
+ }
+
+ public void setProcessingStrategy(SqlProcessingStrategy processingStrategy) {
+ this.processingStrategy = processingStrategy;
+ }
+
+ public String getOnConsume() {
+ return onConsume;
+ }
+
+ public void setOnConsume(String onConsume) {
+ this.onConsume = onConsume;
+ }
+
@Override
protected String createEndpointUri() {
// Make sure it's properly encoded
View
38 components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.Exchange;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+/**
+ * Processing strategy for dealing with SQL when consuming.
+ */
+public interface SqlProcessingStrategy {
+
+ /**
+ * Commit callback if there are a query to be run after processing.
+ *
+ * @param endpoint the endpoint
+ * @param exchange The exchange after it has been processed
+ * @param data The original data delivered to the route
+ * @param jdbcTemplate The JDBC template
+ * @param query The SQL query to execute
+ * @throws Exception can be thrown in case of error
+ */
+ void commit(SqlEndpoint endpoint, Exchange exchange, Object data, JdbcTemplate jdbcTemplate, String query) throws Exception;
+}
View
87 components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+/**
+ *
+ */
+@Ignore
+public class SqlConsumerDeleteTest extends CamelTestSupport {
+
+ private EmbeddedDatabase db;
+
+ @Before
+ public void setUp() throws Exception {
+ db = new EmbeddedDatabaseBuilder()
+ .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+ super.setUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ db.shutdown();
+ }
+
+ @Test
+ public void testConsume() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(3);
+
+ assertMockEndpointsSatisfied();
+
+ List<Exchange> exchanges = mock.getReceivedExchanges();
+ assertEquals(3, exchanges.size());
+
+ assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
+ assertEquals("Camel", exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
+ assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
+ assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
+ assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
+ assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+ from("sql:select * from projects order by id?consumer.onConsume=delete from projects where id = #")
+ .to("mock:result");
+ }
+ };
+ }
+}
View
85 components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+/**
+ *
+ */
+public class SqlConsumerTest extends CamelTestSupport {
+
+ private EmbeddedDatabase db;
+
+ @Before
+ public void setUp() throws Exception {
+ db = new EmbeddedDatabaseBuilder()
+ .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+ super.setUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ db.shutdown();
+ }
+
+ @Test
+ public void testConsume() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMinimumMessageCount(3);
+
+ assertMockEndpointsSatisfied();
+
+ List<Exchange> exchanges = mock.getReceivedExchanges();
+ assertTrue(exchanges.size() >= 3);
+
+ assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
+ assertEquals("Camel", exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
+ assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
+ assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
+ assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
+ assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+ from("sql:select * from projects order by id")
+ .to("mock:result");
+ }
+ };
+ }
+}
View
5 components/camel-sql/src/test/resources/log4j.properties
@@ -20,8 +20,9 @@
#
log4j.rootLogger=INFO, file
-#log4j.logger.org.apache.activemq=DEBUG
-#log4j.logger.org.apache.camel=DEBUG
+#log4j.logger.org.apache.camel.component.sql=DEBUG
+#log4j.logger.org.apache.camel.processor.aggregate.sql=DEBUG
+#log4j.logger.org.apache.camel.processor.idempotent.sql=DEBUG
# CONSOLE appender not used by default
log4j.appender.out=org.apache.log4j.ConsoleAppender
Please sign in to comment.
Something went wrong with that request. Please try again.