From 5a52536e409eae5dfae69ddb03e2dfa31112f90c Mon Sep 17 00:00:00 2001 From: pgf Date: Sat, 9 Aug 2014 16:27:40 +0200 Subject: [PATCH] added a check for autocommit before calling commit when dropping and creating tables + test case. --- .../jdbc/adapter/DefaultJDBCAdapter.java | 12 +- .../store/jdbc/JDBCStoreAutoCommitTest.java | 522 ++++++++++++++++++ 2 files changed, 532 insertions(+), 2 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreAutoCommitTest.java diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 7a85de3b35b..0087ac99099 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -120,7 +120,12 @@ public void doCreateTables(TransactionContext c) throws SQLException, IOExceptio } } } - c.getConnection().commit(); + + // if autoCommit used do not call commit + if(!c.getConnection().getAutoCommit()){ + c.getConnection().commit(); + } + } finally { cleanupExclusiveLock.writeLock().unlock(); try { @@ -149,7 +154,10 @@ public void doDropTables(TransactionContext c) throws SQLException, IOException JDBCPersistenceAdapter.log("Failure details: ", e); } } - c.getConnection().commit(); + // if autoCommit used do not call commit + if(!c.getConnection().getAutoCommit()){ + c.getConnection().commit(); + } } finally { cleanupExclusiveLock.writeLock().unlock(); try { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreAutoCommitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreAutoCommitTest.java new file mode 100644 index 00000000000..ec150877a5f --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreAutoCommitTest.java @@ -0,0 +1,522 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.jdbc; + + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.derby.jdbc.EmbeddedDataSource; +import org.junit.Test; + +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.sql.*; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; +import java.util.logging.Logger; + +import static org.junit.Assert.assertEquals; + + +/** + * to be compliant with JDBC spec; officially commit is not supposed to be called on + * a connection that uses autocommit.The oracle v12 driver does a check for + * autocommitSpecCompliance and it causes issues + *

+ * To test; wrap the datasource used by the broker and check for autocommit before delegating + * to real datasource. If commit is called on connection with autocommit, wrapper throws a + * SQLException. + */ + +public class JDBCStoreAutoCommitTest { + + public static final String BROKER_NAME = "AutoCommitTest"; + private static final String TEST_DEST = "commitCheck"; + private static final String MSG_TEXT = "JDBCStoreAutoCommitTest TEST"; + + /** + * verify dropping and recreating tables + * + * @throws Exception + */ + + @Test + public void testDeleteAllMessages() throws Exception { + + BrokerService broker = createBrokerService(); + broker.getPersistenceAdapter().deleteAllMessages(); + broker.start(); + broker.waitUntilStarted(); + + broker.stop(); + broker.waitUntilStopped(); + + } + + /** + * Send message and consume message, JMS session is not transacted + * + * @throws Exception + */ + @Test + public void testSendConsume() throws Exception { + this.doSendConsume(false); + } + + /** + * send message and consume message, JMS session is transacted + * + * @throws Exception + */ + @Test + public void testSendConsumeTransacted() throws Exception { + this.doSendConsume(true); + } + + + + + private void doSendConsume(boolean transacted) throws Exception { + + BrokerService broker = createBrokerService(); + broker.start(); + broker.waitUntilStarted(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI("vm:" + BROKER_NAME)); + ActiveMQConnection c1 = (ActiveMQConnection) cf.createConnection(); + c1.start(); + + try { + //message send + Session session1 = c1.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + MessageProducer messageProducer = session1.createProducer(session1.createQueue(TEST_DEST)); + TextMessage textMessage = session1.createTextMessage(MSG_TEXT); + messageProducer.send(textMessage); + + + if(transacted){ + session1.commit(); + } + + + //consume + Session session2 = c1.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + MessageConsumer messageConsumer = session2.createConsumer(session2.createQueue(TEST_DEST)); + TextMessage messageReceived = (TextMessage) messageConsumer.receive(1000); + + assertEquals("check message received", MSG_TEXT, messageReceived.getText()); + + } finally { + c1.close(); + broker.stop(); + broker.waitUntilStopped(); + + } + + + } + + + private BrokerService createBrokerService() throws IOException { + BrokerService broker = new BrokerService(); + broker.setBrokerName(BROKER_NAME); + JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); + EmbeddedDataSource embeddedDataSource = new EmbeddedDataSource(); + embeddedDataSource.setDatabaseName("derbyDb"); + embeddedDataSource.setCreateDatabase("create"); + + javax.sql.DataSource wrappedDataSource = + new TestDataSource(embeddedDataSource); + + jdbc.setDataSource(wrappedDataSource); + + + broker.setPersistenceAdapter(jdbc); + return broker; + } + + + /* + =============== + Need some wrapping classes to throw exception if commit called when autocommit is enabled + ================ + */ + + + private class TestDataSource implements javax.sql.DataSource { + + private final javax.sql.DataSource realDataSource; + + public TestDataSource(javax.sql.DataSource dataSource) { + realDataSource = dataSource; + } + + @Override + public Connection getConnection() throws SQLException { + + Connection autoCommitCheckConnection = + new AutoCommitCheckConnection(realDataSource.getConnection()); + + return autoCommitCheckConnection; + } + + @Override + public Connection getConnection(String username, String password) throws SQLException { + + Connection autoCommitCheckConnection = + new AutoCommitCheckConnection(realDataSource.getConnection(username, password)); + + return autoCommitCheckConnection; + } + + @Override + public PrintWriter getLogWriter() throws SQLException { + return realDataSource.getLogWriter(); + } + + @Override + public void setLogWriter(PrintWriter out) throws SQLException { + realDataSource.setLogWriter(out); + } + + @Override + public void setLoginTimeout(int seconds) throws SQLException { + realDataSource.setLoginTimeout(seconds); + } + + @Override + public int getLoginTimeout() throws SQLException { + return realDataSource.getLoginTimeout(); + } + + @Override + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + return realDataSource.getParentLogger(); + } + + @Override + public T unwrap(Class iface) throws SQLException { + return realDataSource.unwrap(iface); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return realDataSource.isWrapperFor(iface); + } + } + + + private class AutoCommitCheckConnection implements Connection { + + private Connection realConnection; + + public AutoCommitCheckConnection(Connection connection) { + this.realConnection = connection; + } + + + /* + verify commit is not called on an autocommit connection + */ + @Override + public void commit() throws SQLException { + + if (getAutoCommit() == true) { + throw new SQLException("AutoCommitCheckConnection: Called commit on autoCommit Connection"); + } + realConnection.commit(); + } + + + // Just plumping for wrapper. Might have been better to do a Dynamic Proxy here. + + + @Override + public Statement createStatement() throws SQLException { + return realConnection.createStatement(); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + return realConnection.prepareStatement(sql); + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return realConnection.prepareCall(sql); + } + + @Override + public String nativeSQL(String sql) throws SQLException { + return realConnection.nativeSQL(sql); + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + realConnection.setAutoCommit(autoCommit); + } + + @Override + public boolean getAutoCommit() throws SQLException { + return realConnection.getAutoCommit(); + } + + + @Override + public void rollback() throws SQLException { + realConnection.rollback(); + + } + + @Override + public void close() throws SQLException { + realConnection.close(); + } + + @Override + public boolean isClosed() throws SQLException { + return realConnection.isClosed(); + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return realConnection.getMetaData(); + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + realConnection.setReadOnly(readOnly); + } + + @Override + public boolean isReadOnly() throws SQLException { + return realConnection.isReadOnly(); + } + + @Override + public void setCatalog(String catalog) throws SQLException { + realConnection.setCatalog(catalog); + } + + @Override + public String getCatalog() throws SQLException { + return realConnection.getCatalog(); + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + realConnection.setTransactionIsolation(level); + } + + @Override + public int getTransactionIsolation() throws SQLException { + return realConnection.getTransactionIsolation(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return realConnection.getWarnings(); + } + + @Override + public void clearWarnings() throws SQLException { + realConnection.clearWarnings(); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return realConnection.createStatement(resultSetType, resultSetConcurrency); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return realConnection.prepareStatement(sql, resultSetType, resultSetConcurrency); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return realConnection.prepareCall(sql, resultSetType, resultSetConcurrency); + } + + @Override + public Map> getTypeMap() throws SQLException { + return realConnection.getTypeMap(); + } + + @Override + public void setTypeMap(Map> map) throws SQLException { + realConnection.setTypeMap(map); + } + + @Override + public void setHoldability(int holdability) throws SQLException { + realConnection.setHoldability(holdability); + } + + @Override + public int getHoldability() throws SQLException { + return realConnection.getHoldability(); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return realConnection.setSavepoint(); + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + return realConnection.setSavepoint(name); + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + realConnection.rollback(); + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + realConnection.releaseSavepoint(savepoint); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return realConnection.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return realConnection.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return realConnection.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return realConnection.prepareStatement(sql, autoGeneratedKeys); + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return realConnection.prepareStatement(sql, columnIndexes); + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return realConnection.prepareStatement(sql, columnNames); + } + + @Override + public Clob createClob() throws SQLException { + return realConnection.createClob(); + } + + @Override + public Blob createBlob() throws SQLException { + return realConnection.createBlob(); + } + + @Override + public NClob createNClob() throws SQLException { + return realConnection.createNClob(); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return realConnection.createSQLXML(); + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return realConnection.isValid(timeout); + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + realConnection.setClientInfo(name, value); + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + realConnection.setClientInfo(properties); + } + + @Override + public String getClientInfo(String name) throws SQLException { + return realConnection.getClientInfo(name); + } + + @Override + public Properties getClientInfo() throws SQLException { + return realConnection.getClientInfo(); + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return realConnection.createArrayOf(typeName, elements); + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return realConnection.createStruct(typeName, attributes); + } + + @Override + public void setSchema(String schema) throws SQLException { + realConnection.setSchema(schema); + } + + @Override + public String getSchema() throws SQLException { + return realConnection.getSchema(); + } + + @Override + public void abort(Executor executor) throws SQLException { + realConnection.abort(executor); + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + realConnection.setNetworkTimeout(executor, milliseconds); + } + + @Override + public int getNetworkTimeout() throws SQLException { + return realConnection.getNetworkTimeout(); + } + + @Override + public T unwrap(Class iface) throws SQLException { + return realConnection.unwrap(iface); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return realConnection.isWrapperFor(iface); + } + } +}