Skip to content

Commit

Permalink
CAMEL-4725: Batch support and other options for Producer.
Browse files Browse the repository at this point in the history
  • Loading branch information
snurmine authored and davsclaus committed Jan 14, 2016
1 parent f511606 commit df5944f
Show file tree
Hide file tree
Showing 32 changed files with 2,644 additions and 1,719 deletions.
@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.sql.stored;

import java.sql.CallableStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.camel.component.sql.stored.template.ast.InputParameter;
import org.apache.camel.component.sql.stored.template.ast.Template;
import org.springframework.jdbc.core.CallableStatementCreator;
import org.springframework.jdbc.core.CallableStatementCreatorFactory;
import org.springframework.jdbc.core.SqlParameter;
import org.springframework.jdbc.core.StatementCreatorUtils;


public class BatchCallableStatementCreatorFactory {

final CallableStatementCreatorFactory callableStatementCreatorFactory;

final List<SqlParameter> sqlParameterList;

final Template template;


public BatchCallableStatementCreatorFactory(Template template) {
this.template = template;
this.sqlParameterList = createParams();
this.callableStatementCreatorFactory = new CallableStatementCreatorFactory(formatSql(), createParams());
}

public void addParameter(CallableStatement callableStatement, Map batchRow) throws SQLException {
int i = 1;
for (SqlParameter parameter : getSqlParameterList()) {
StatementCreatorUtils.setParameterValue(callableStatement, i, parameter, batchRow.get(parameter.getName()));
i++;
}
}

private String formatSql() {
return "{call " + this.template.getProcedureName() + "(" + repeatParameter(this.template.getParameterList()
.size()) + ")}";
}

private String repeatParameter(int size) {
StringBuilder ret = new StringBuilder();
for (int i = 0; i < size; i++) {
ret.append('?');
if (i + 1 < size) {
ret.append(',');
}
}
return ret.toString();
}

private List<SqlParameter> createParams() {
List<SqlParameter> params = new ArrayList<>();

for (Object parameter : template.getParameterList()) {
if (parameter instanceof InputParameter) {
InputParameter inputParameter = (InputParameter) parameter;
params.add(new SqlParameter(inputParameter.getName(), inputParameter.getSqlType()));

} else {
throw new UnsupportedOperationException("Only IN parameters supported by batch!");
}
}


return params;
}

public CallableStatementCreator newCallableStatementCreator(Map params) {
return this.callableStatementCreatorFactory.newCallableStatementCreator(params);
}

public List<SqlParameter> getSqlParameterList() {
return sqlParameterList;
}

public Template getTemplate() {
return template;
}
}
@@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.sql.stored;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.component.sql.stored.template.ast.InputParameter;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.CallableStatementCallback;
import org.springframework.jdbc.core.CallableStatementCreator;

public class CallableStatementWrapper implements StamentWrapper {

final CallableStatementWrapperFactory factory;

final String template;

Map result;

List<Map> batchItems;

Integer updateCount;

BatchCallableStatementCreatorFactory batchFactory;

public CallableStatementWrapper(String template, CallableStatementWrapperFactory wrapperFactory) {
this.factory = wrapperFactory;
this.template = template;
}

@Override
public void call(final WrapperExecuteCallback cb) throws Exception {
cb.execute(this);
}


@Override
public int[] executeBatch() throws SQLException {

if (this.batchItems == null) {
throw new IllegalArgumentException("Batch must have at least one item");
}

final Iterator<Map> params = batchItems.iterator();


return factory.getJdbcTemplate().execute(new CallableStatementCreator() {
@Override
public CallableStatement createCallableStatement(Connection connection) throws SQLException {
return batchFactory.newCallableStatementCreator(params.next()).createCallableStatement(connection);

}
}, new CallableStatementCallback<int[]>() {
@Override
public int[] doInCallableStatement(CallableStatement callableStatement) throws SQLException, DataAccessException {
//add first item to batch
callableStatement.addBatch();

while (params.hasNext()) {
batchFactory.addParameter(callableStatement, params.next());
callableStatement.addBatch();
}
return callableStatement.executeBatch();
}
});
}


@Override
public Integer getUpdateCount() throws SQLException {
return this.updateCount;
}


@Override
public Object executeStatement() throws SQLException {
return this.result;
}

@Override
public void populateStatement(Object value, Exchange exchange) throws SQLException {
this.result = this.factory.getTemplateStoredProcedure(this.template).execute(exchange, value);
//Spring sets #update-result-1
this.updateCount = (Integer) this.result.get("#update-count-1");
}

@Override
public void addBatch(Object value, Exchange exchange) {

if (this.batchFactory == null) {
this.batchFactory = factory.getTemplateForBatch(template);
}

Map<String, Object> batchValues = new HashMap<>();
//only IN-parameters supported by template
for (Object param : this.batchFactory.getTemplate().getParameterList()) {
InputParameter inputParameter = (InputParameter) param;
Object paramValue = inputParameter.getValueExtractor().eval(exchange, value);
batchValues.put(inputParameter.getName(), paramValue);
}
if (this.batchItems == null) {
this.batchItems = new ArrayList<>();
}
batchItems.add(batchValues);
}
}
@@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.sql.stored;

import java.sql.SQLException;

import org.apache.camel.component.sql.stored.template.TemplateParser;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.LRUCache;
import org.springframework.jdbc.core.JdbcTemplate;

/**
* Statefull class that cached template functions.
*/
public class CallableStatementWrapperFactory extends ServiceSupport {

public static final int TEMPLATE_CACHE_DEFAULT_SIZE = 200;

public static final int BATCH_TEMPLATE_CACHE_DEFAULT_SIZE = 200;

final JdbcTemplate jdbcTemplate;

final TemplateParser templateParser;

private final LRUCache<String, TemplateStoredProcedure> templateCache = new LRUCache<>(TEMPLATE_CACHE_DEFAULT_SIZE);

private final LRUCache<String, BatchCallableStatementCreatorFactory> batchTemplateCache = new LRUCache<>(BATCH_TEMPLATE_CACHE_DEFAULT_SIZE);

public CallableStatementWrapperFactory(JdbcTemplate jdbcTemplate, TemplateParser
templateParser) {
this.jdbcTemplate = jdbcTemplate;
this.templateParser = templateParser;
}

public StamentWrapper create(String sql) throws SQLException {
return new CallableStatementWrapper(sql, this);
}

public BatchCallableStatementCreatorFactory getTemplateForBatch(String sql) {
BatchCallableStatementCreatorFactory template = this.batchTemplateCache.get(sql);
if (template != null) {
return template;
}

template = new BatchCallableStatementCreatorFactory(templateParser.parseTemplate(sql));
this.batchTemplateCache.put(sql, template);

return template;
}

public TemplateStoredProcedure getTemplateStoredProcedure(String sql) {
TemplateStoredProcedure templateStoredProcedure = this.templateCache.get(sql);
if (templateStoredProcedure != null) {
return templateStoredProcedure;
}

templateStoredProcedure = new TemplateStoredProcedure(jdbcTemplate, templateParser.parseTemplate(sql));

this.templateCache.put(sql, templateStoredProcedure);

return templateStoredProcedure;
}

public JdbcTemplate getJdbcTemplate() {
return jdbcTemplate;
}

@Override
protected void doStart() throws Exception {
}

@Override
protected void doStop() throws Exception {
try {
// clear cache when we are stopping
templateCache.clear();
} catch (Exception ex) {
//noop
}
try {
// clear cache when we are stopping
batchTemplateCache.clear();
} catch (Exception ex) {
//noop
}
}
}
Expand Up @@ -21,19 +21,24 @@


import org.apache.camel.CamelContext; import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint; import org.apache.camel.Endpoint;
import org.apache.camel.component.sql.stored.template.TemplateStoredProcedureFactory;
import org.apache.camel.impl.UriEndpointComponent; import org.apache.camel.impl.UriEndpointComponent;
import org.apache.camel.util.CamelContextHelper;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;


public class SqlStoredComponent extends UriEndpointComponent { public class SqlStoredComponent extends UriEndpointComponent {

private DataSource dataSource; private DataSource dataSource;


public SqlStoredComponent() { public SqlStoredComponent() {
super(SqlStoredEndpoint.class); super(SqlStoredEndpoint.class);
} }


public SqlStoredComponent(Class<? extends Endpoint> endpointClass) {
super(endpointClass);
}

public SqlStoredComponent(CamelContext context) {
super(context, SqlStoredEndpoint.class);
}

public SqlStoredComponent(CamelContext context, Class<? extends Endpoint> endpointClass) { public SqlStoredComponent(CamelContext context, Class<? extends Endpoint> endpointClass) {
super(context, endpointClass); super(context, endpointClass);
} }
Expand All @@ -47,10 +52,6 @@ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Obje
if (ds != null) { if (ds != null) {
target = ds; target = ds;
} }
String dataSourceRef = getAndRemoveParameter(parameters, "dataSourceRef", String.class);
if (target == null && dataSourceRef != null) {
target = CamelContextHelper.mandatoryLookup(getCamelContext(), dataSourceRef, DataSource.class);
}
if (target == null) { if (target == null) {
// fallback and use component // fallback and use component
target = dataSource; target = dataSource;
Expand All @@ -59,14 +60,12 @@ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Obje
throw new IllegalArgumentException("DataSource must be configured"); throw new IllegalArgumentException("DataSource must be configured");
} }


JdbcTemplate template = new JdbcTemplate(target); JdbcTemplate jdbcTemplate = new JdbcTemplate(target);
TemplateStoredProcedureFactory factory = new TemplateStoredProcedureFactory(template); String template = remaining;


SqlStoredEndpoint answer = new SqlStoredEndpoint(uri, this); SqlStoredEndpoint endpoint = new SqlStoredEndpoint(uri, this, jdbcTemplate);
answer.setJdbcTemplate(template); endpoint.setTemplate(template);
answer.setTemplate(remaining); return endpoint;
answer.setTemplateStoredProcedureFactory(factory);
return answer;
} }


public DataSource getDataSource() { public DataSource getDataSource() {
Expand All @@ -79,5 +78,4 @@ public DataSource getDataSource() {
public void setDataSource(DataSource dataSource) { public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource; this.dataSource = dataSource;
} }

} }

0 comments on commit df5944f

Please sign in to comment.