From c9b953433e011ded66ba87bdb2b2edcf0b87f67c Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Tue, 3 May 2016 14:36:10 +0100 Subject: [PATCH 1/3] ARTEMIS-513 Add JDBC Sequential File Factory Impl --- .../artemis/jdbc/store/JDBCUtils.java | 13 + .../jdbc/store/file/JDBCSequentialFile.java | 419 ++++++++++++++++++ .../store/file/JDBCSequentialFileFactory.java | 229 ++++++++++ .../jdbc/store/file/sql/DerbySQLProvider.java | 52 +++ .../store/file/sql/GenericSQLProvider.java | 143 ++++++ .../jdbc/store/file/sql/SQLProvider.java | 46 ++ .../jdbc/store/journal/JDBCJournalImpl.java | 7 +- .../file/JDBCSequentialFileFactoryTest.java | 185 ++++++++ .../core/io/nio/NIOSequentialFileFactory.java | 28 +- 9 files changed, 1107 insertions(+), 15 deletions(-) create mode 100644 artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java create mode 100644 artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java create mode 100644 artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java create mode 100644 artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java create mode 100644 artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java create mode 100644 artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java index b44f22501b9..bc04ab9316c 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java @@ -23,6 +23,10 @@ import java.sql.SQLException; import java.sql.Statement; +import org.apache.activemq.artemis.jdbc.store.file.sql.DerbySQLProvider; +import org.apache.activemq.artemis.jdbc.store.file.sql.GenericSQLProvider; +import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider; + public class JDBCUtils { public static Driver getDriver(String className) throws Exception { @@ -60,4 +64,13 @@ public static void createTableIfNotExists(Connection connection, String tableNam statement.executeUpdate(sql); } } + + public static SQLProvider getSQLProvider(String driverClass, String tableName) { + if (driverClass.contains("derby")) { + return new DerbySQLProvider(tableName); + } + else { + return new GenericSQLProvider(tableName); + } + } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java new file mode 100644 index 00000000000..73bec723144 --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.file; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.sql.Blob; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.Executor; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; +import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.jboss.logging.Logger; + +public class JDBCSequentialFile implements SequentialFile { + + private static final Logger logger = Logger.getLogger(JDBCSequentialFile.class); + + private final String filename; + + private final String extension; + + private boolean isOpen = false; + + private boolean isCreated = false; + + private int id = -1; + + private final PreparedStatement appendToFile; + + private final PreparedStatement deleteFile; + + private final PreparedStatement readFile; + + private final PreparedStatement createFile; + + private final PreparedStatement selectFileByFileName; + + private final PreparedStatement copyFileRecord; + + private final PreparedStatement renameFile; + + private long readPosition = 0; + + private long writePosition = 0; + + private Executor executor; + + private JDBCSequentialFileFactory fileFactory; + + private int maxSize; + + private SQLProvider sqlProvider; + + private final Object writeLock; + + public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory, + final String filename, + final SQLProvider sqlProvider, + final Executor executor, + final Object writeLock) throws SQLException { + this.fileFactory = fileFactory; + this.filename = filename; + this.extension = filename.contains(".") ? filename.substring(filename.lastIndexOf(".") + 1, filename.length()) : ""; + this.executor = executor; + this.maxSize = sqlProvider.getMaxBlobSize(); + this.sqlProvider = sqlProvider; + this.writeLock = writeLock; + + Connection connection = fileFactory.getConnection(); + this.appendToFile = connection.prepareStatement(sqlProvider.getAppendToFileSQL()); + this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL()); + this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS); + this.readFile = connection.prepareStatement(sqlProvider.getReadFileSQL()); + this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName()); + this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL()); + this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL()); + } + + @Override + public boolean isOpen() { + return isOpen; + } + + @Override + public boolean exists() { + return isCreated; + } + + @Override + public synchronized void open() throws Exception { + if (!isOpen) { + try { + synchronized (writeLock) { + selectFileByFileName.setString(1, filename); + + try (ResultSet rs = selectFileByFileName.executeQuery()) { + if (!rs.next()) { + createFile.setString(1, filename); + createFile.setString(2, extension); + createFile.setBytes(3, new byte[0]); + createFile.executeUpdate(); + try (ResultSet keys = createFile.getGeneratedKeys()) { + keys.next(); + this.id = keys.getInt(1); + } + } + else { + this.id = rs.getInt(1); + this.writePosition = rs.getBlob(4).length(); + } + } + } + } + catch (SQLException e) { + ActiveMQJournalLogger.LOGGER.error("Error retreiving file record", e); + isOpen = false; + } + + isCreated = true; + isOpen = true; + } + } + + @Override + public void open(int maxIO, boolean useExecutor) throws Exception { + open(); + } + + @Override + public boolean fits(int size) { + return writePosition + size <= maxSize; + } + + @Override + public int getAlignment() throws Exception { + return 0; + } + + @Override + public int calculateBlockStart(int position) throws Exception { + return 0; + } + + @Override + public String getFileName() { + return filename; + } + + @Override + public void fill(int size) throws Exception { + // Do nothing + } + + @Override + public void delete() throws IOException, InterruptedException, ActiveMQException { + try { + if (isCreated) { + deleteFile.setInt(1, id); + deleteFile.executeUpdate(); + } + } + catch (SQLException e) { + throw new IOException(e); + } + } + + private synchronized int internalWrite(byte[] data, IOCallback callback) { + try { + synchronized (writeLock) { + int noBytes = data.length; + appendToFile.setBytes(1, data); + appendToFile.setInt(2, id); + int result = appendToFile.executeUpdate(); + if (result < 1) + throw new ActiveMQException("No record found for file id: " + id); + seek(noBytes); + if (callback != null) + callback.done(); + return noBytes; + } + } + catch (Exception e) { + e.printStackTrace(); + if (callback != null) + callback.onError(-1, e.getMessage()); + } + return -1; + } + + public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) { + byte[] data = new byte[buffer.readableBytes()]; + buffer.readBytes(data); + return internalWrite(data, callback); + } + + private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) { + return internalWrite(buffer.array(), callback); + } + + public void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) { + executor.execute(new Runnable() { + @Override + public void run() { + internalWrite(bytes, callback); + } + }); + } + + public void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) { + executor.execute(new Runnable() { + @Override + public void run() { + internalWrite(bytes, callback); + } + }); + } + + synchronized void seek(long noBytes) { + writePosition += noBytes; + } + + @Override + public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception { + // We ignore sync since we schedule writes straight away. + scheduleWrite(bytes, callback); + } + + @Override + public void write(ActiveMQBuffer bytes, boolean sync) throws Exception { + write(bytes, sync, null); + } + + @Override + public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception { + ActiveMQBuffer data = ActiveMQBuffers.fixedBuffer(bytes.getEncodeSize()); + bytes.encode(data); + scheduleWrite(data, callback); + } + + @Override + public void write(EncodingSupport bytes, boolean sync) throws Exception { + write(bytes, sync, null); + } + + @Override + public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) { + if (callback == null) { + SimpleWaitIOCallback waitIOCallback = new SimpleWaitIOCallback(); + try { + scheduleWrite(bytes, waitIOCallback); + waitIOCallback.waitCompletion(); + } + catch (Exception e) { + waitIOCallback.onError(-1, e.getMessage()); + } + } + else { + scheduleWrite(bytes, callback); + } + + } + + @Override + public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception { + writeDirect(bytes, sync, null); + // Are we meant to block here? + } + + @Override + public synchronized int read(ByteBuffer bytes, IOCallback callback) throws SQLException { + synchronized (writeLock) { + readFile.setInt(1, id); + try (ResultSet rs = readFile.executeQuery()) { + if (rs.next()) { + Blob blob = rs.getBlob(1); + + long bytesRemaining = blob.length() - readPosition; + byte[] data; + if (bytesRemaining > bytes.remaining()) { + // First index into blob is 1 (not 0) + data = blob.getBytes(readPosition + 1, bytes.remaining()); + } + else { + // First index into blob is 1 (not 0) + data = blob.getBytes(readPosition + 1, (int) bytesRemaining); + } + + bytes.put(data); + readPosition += data.length; + if (callback != null) + callback.done(); + + return data.length; + } + return 0; + } + catch (Exception e) { + if (callback != null) + callback.onError(-1, e.getMessage()); + return 0; + } + } + } + + @Override + public int read(ByteBuffer bytes) throws Exception { + return read(bytes, null); + } + + @Override + public void position(long pos) throws IOException { + readPosition = pos; + } + + @Override + public long position() { + return readPosition; + } + + @Override + public synchronized void close() throws Exception { + isOpen = false; + } + + @Override + public void sync() throws IOException { + // (mtaylor) We always write straight away, so we don't need to do anything here. + // (mtaylor) Is this meant to be blocking? + } + + @Override + public long size() throws Exception { + return writePosition; + } + + @Override + public void renameTo(String newFileName) throws Exception { + renameFile.setString(1, newFileName); + renameFile.setInt(2, id); + renameFile.executeUpdate(); + } + + @Override + public SequentialFile cloneFile() { + try { + JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, sqlProvider, executor, writeLock); + return clone; + } + catch (Exception e) { + logger.error("Error cloning file: " + filename, e); + return null; + } + } + + @Override + public void copyTo(SequentialFile cloneFile) throws Exception { + JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile; + clone.open(); + + copyFileRecord.setInt(1, id); + copyFileRecord.setInt(2, clone.getId()); + copyFileRecord.executeUpdate(); + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getFilename() { + return filename; + } + + public String getExtension() { + return extension; + } + + // Only Used by Journal, no need to implement. + @Override + public void setTimedBuffer(TimedBuffer buffer) { + } + + // Only Used by replication, no need to implement. + @Override + public File getJavaFile() { + return null; + } +} diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java new file mode 100644 index 00000000000..4231907797d --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.file; + +import java.io.File; +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.jdbc.store.JDBCUtils; +import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; + +public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent { + + private Connection connection; + + private String connectionUrl; + + private final Driver driver; + + private boolean started; + + private final String tableName; + + private List files; + + private PreparedStatement selectFileNamesByExtension; + + private Executor executor; + + private SQLProvider sqlProvider; + + private Map fileLocks = new HashMap<>(); + + public JDBCSequentialFileFactory(final String connectionUrl, + final String tableName, + final String className, + Executor executor) throws Exception { + this.connectionUrl = connectionUrl; + this.executor = executor; + this.tableName = tableName.toUpperCase(); + + files = new ArrayList<>(); + sqlProvider = JDBCUtils.getSQLProvider(JDBCUtils.getDriver(className).getClass().getCanonicalName(), tableName); + driver = JDBCUtils.getDriver(className); + } + + public Connection getConnection() { + return connection; + } + + @Override + public SequentialFile createSequentialFile(String fileName) { + try { + fileLocks.putIfAbsent(fileName, new Object()); + JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, sqlProvider, executor, fileLocks.get(fileName)); + files.add(file); + return file; + } + catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error("Could not create file", e); + } + return null; + } + + @Override + public int getMaxIO() { + return 1; + } + + @Override + public List listFiles(String extension) throws Exception { + List fileNames = new ArrayList<>(); + + selectFileNamesByExtension.setString(1, extension); + try (ResultSet rs = selectFileNamesByExtension.executeQuery()) { + while (rs.next()) { + fileNames.add(rs.getString(1)); + } + } + return fileNames; + } + + @Override + public boolean isSupportsCallbacks() { + return true; + } + + @Override + public void onIOError(Exception exception, String message, SequentialFile file) { + } + + @Override + public ByteBuffer allocateDirectBuffer(final int size) { + return NIOSequentialFileFactory.allocateDirectByteBuffer(size); + } + + @Override + public void releaseDirectBuffer(ByteBuffer buffer) { + // nothing we can do on this case. we can just have good faith on GC + } + + @Override + public ByteBuffer newBuffer(final int size) { + return ByteBuffer.allocate(size); + } + + @Override + public void clearBuffer(final ByteBuffer buffer) { + final int limit = buffer.limit(); + buffer.rewind(); + + for (int i = 0; i < limit; i++) { + buffer.put((byte) 0); + } + + buffer.rewind(); + } + + @Override + public ByteBuffer wrapBuffer(final byte[] bytes) { + return ByteBuffer.wrap(bytes); + } + + @Override + public int getAlignment() { + return 1; + } + + @Override + public int calculateBlockSize(final int bytes) { + return bytes; + } + + @Override + public void deactivateBuffer() { + } + + @Override + public void releaseBuffer(final ByteBuffer buffer) { + } + + @Override + public void activateBuffer(SequentialFile file) { + + } + + @Override + public File getDirectory() { + return null; + } + + @Override + public synchronized void start() { + try { + if (!started) { + connection = driver.connect(connectionUrl, new Properties()); + JDBCUtils.createTableIfNotExists(connection, tableName, sqlProvider.getCreateFileTableSQL()); + selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL()); + started = true; + } + } + catch (SQLException e) { + ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database"); + started = false; + } + } + + @Override + public synchronized void stop() { + try { + if (false) + connection.close(); + } + catch (SQLException e) { + ActiveMQJournalLogger.LOGGER.error("Error stopping file factory, unable to close db connection"); + } + started = false; + } + + @Override + public boolean isStarted() { + return started; + } + + @Override + public void createDirs() throws Exception { + } + + @Override + public void flush() { + + } + + public synchronized void destroy() throws SQLException { + Statement statement = connection.createStatement(); + statement.executeUpdate(sqlProvider.getDropFileTableSQL()); + stop(); + } +} diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java new file mode 100644 index 00000000000..c14036ebbce --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.file.sql; + +public class DerbySQLProvider extends GenericSQLProvider { + + // Derby max blob size = 2G + private static final int MAX_BLOB_SIZE = 2147483647; + + private final String createFileTableSQL; + + private final String appendToFileSQL; + + public DerbySQLProvider(String tableName) { + super(tableName); + + createFileTableSQL = "CREATE TABLE " + tableName + + "(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," + + "FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))"; + + appendToFileSQL = "UPDATE " + tableName + " SET DATA = DATA || ? WHERE ID=?"; + } + + @Override + public int getMaxBlobSize() { + return MAX_BLOB_SIZE; + } + + @Override + public String getCreateFileTableSQL() { + return createFileTableSQL; + } + + @Override + public String getAppendToFileSQL() { + return appendToFileSQL; + } +} diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java new file mode 100644 index 00000000000..c95edb3d53a --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.file.sql; + +public class GenericSQLProvider implements SQLProvider { + + // Default to lowest (MYSQL = 64k) + private static final int MAX_BLOB_SIZE = 64512; + + private final String tableName; + + private final String createFileTableSQL; + + private final String insertFileSQL; + + private final String selectFileNamesByExtensionSQL; + + private final String selectIdByFileNameSQL; + + private final String appendToFileSQL; + + private final String readFileSQL; + + private final String deleteFileSQL; + + private final String updateFileNameByIdSQL; + + private final String copyFileRecordByIdSQL; + + private final String cloneFileRecordSQL; + + private final String dropFileTableSQL; + + public GenericSQLProvider(String tableName) { + this.tableName = tableName; + + createFileTableSQL = "CREATE TABLE " + tableName + + "(ID INT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))"; + + insertFileSQL = "INSERT INTO " + tableName + + " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)"; + + selectFileNamesByExtensionSQL = "SELECT FILENAME, ID FROM " + tableName + " WHERE EXTENSION=?"; + + selectIdByFileNameSQL = "SELECT ID, FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE fileName=?"; + + appendToFileSQL = "UPDATE " + tableName + " SET DATA = CONCAT(DATA, ?) WHERE ID=?"; + + readFileSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?"; + + deleteFileSQL = "DELETE FROM " + tableName + " WHERE ID=?"; + + updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? WHERE ID=?"; + + cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " + + "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)"; + + copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?"; + + dropFileTableSQL = "DROP TABLE " + tableName; + } + + @Override + public int getMaxBlobSize() { + return MAX_BLOB_SIZE; + } + + @Override + public String getTableName() { + return tableName; + } + + @Override + public String getCreateFileTableSQL() { + return createFileTableSQL; + } + + @Override + public String getInsertFileSQL() { + return insertFileSQL; + } + + @Override + public String getSelectFileByFileName() { + return selectIdByFileNameSQL; + } + + @Override + public String getSelectFileNamesByExtensionSQL() { + return selectFileNamesByExtensionSQL; + } + + @Override + public String getAppendToFileSQL() { + return appendToFileSQL; + } + + @Override + public String getReadFileSQL() { + return readFileSQL; + } + + @Override + public String getDeleteFileSQL() { + return deleteFileSQL; + } + + @Override + public String getUpdateFileNameByIdSQL() { + return updateFileNameByIdSQL; + } + + @Override + public String getCopyFileRecordByIdSQL() { + return copyFileRecordByIdSQL; + } + + @Override + public String getCloneFileRecordByIdSQL() { + return cloneFileRecordSQL; + } + + @Override + public String getDropFileTableSQL() { + return dropFileTableSQL; + } + + +} diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java new file mode 100644 index 00000000000..e9fe36c1b73 --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.file.sql; + +public interface SQLProvider { + + int getMaxBlobSize(); + + String getTableName(); + + String getCreateFileTableSQL(); + + String getInsertFileSQL(); + + String getSelectFileNamesByExtensionSQL(); + + String getSelectFileByFileName(); + + String getAppendToFileSQL(); + + String getReadFileSQL(); + + String getDeleteFileSQL(); + + String getUpdateFileNameByIdSQL(); + + String getCopyFileRecordByIdSQL(); + + String getDropFileTableSQL(); + + String getCloneFileRecordByIdSQL(); +} diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 73a8602a02e..f25316780cd 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -229,12 +229,13 @@ public synchronized int sync() { /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, we remove the Tx Records (i.e. PREPARE, COMMIT). */ - private void cleanupTxRecords(List deletedRecords, List committedTx) throws SQLException { + private synchronized void cleanupTxRecords(List deletedRecords, List committedTx) throws SQLException { List iterableCopy; List iterableCopyTx = new ArrayList<>(); iterableCopyTx.addAll(transactions.values()); + for (Long txId : committedTx) { transactions.get(txId).committed = true; } @@ -319,7 +320,7 @@ private void appendRecord(JDBCJournalRecord record) throws Exception { if (callback != null) callback.waitCompletion(); } - private void addTxRecord(JDBCJournalRecord record) { + private synchronized void addTxRecord(JDBCJournalRecord record) { TransactionHolder txHolder = transactions.get(record.getTxId()); if (txHolder == null) { txHolder = new TransactionHolder(record.getTxId()); @@ -341,7 +342,7 @@ private void addTxRecord(JDBCJournalRecord record) { } } - private void removeTxRecord(JDBCJournalRecord record) { + private synchronized void removeTxRecord(JDBCJournalRecord record) { TransactionHolder txHolder = transactions.get(record.getTxId()); // We actually only need the record ID in this instance. diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java new file mode 100644 index 00000000000..554f36b832b --- /dev/null +++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.file; + +import java.nio.ByteBuffer; +import java.sql.SQLException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; +import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; +import org.apache.derby.jdbc.EmbeddedDriver; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class JDBCSequentialFileFactoryTest { + + private static String connectionUrl = "jdbc:derby:target/data;create=true"; + + private static String tableName = "FILES"; + + private static String className = EmbeddedDriver.class.getCanonicalName(); + + private JDBCSequentialFileFactory factory; + + @Before + public void setup() throws Exception { + Executor executor = Executors.newSingleThreadExecutor(); + + factory = new JDBCSequentialFileFactory(connectionUrl, tableName, className, executor); + factory.start(); + } + + @After + public void tearDown() throws SQLException { + factory.destroy(); + } + + @Test + public void testJDBCFileFactoryStarted() throws Exception { + assertTrue(factory.isStarted()); + } + + @Test + public void testCreateFiles() throws Exception { + int noFiles = 100; + Set fileNames = new HashSet(); + for (int i = 0; i < noFiles; i++) { + String fileName = UUID.randomUUID().toString() + ".txt"; + fileNames.add(fileName); + SequentialFile file = factory.createSequentialFile(fileName); + // We create files on Open + file.open(); + } + + List queryFileNames = factory.listFiles("txt"); + assertTrue(queryFileNames.containsAll(fileNames)); + } + + @Test + public void testAsyncAppendToFile() throws Exception { + + JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt"); + file.open(); + + // Create buffer and fill with test data + int bufferSize = 1024; + ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize); + for (int i = 0; i < bufferSize; i++) { + src.writeByte((byte) 1); + } + + IOCallbackCountdown callback = new IOCallbackCountdown(1); + file.internalWrite(src, callback); + + callback.assertEmpty(5); + checkData(file, src); + } + + @Test + public void testCopyFile() throws Exception { + JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt"); + file.open(); + + // Create buffer and fill with test data + int bufferSize = 1024; + ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize); + for (int i = 0; i < bufferSize; i++) { + src.writeByte((byte) 5); + } + + IOCallbackCountdown callback = new IOCallbackCountdown(1); + file.internalWrite(src, callback); + + JDBCSequentialFile copy = (JDBCSequentialFile) factory.createSequentialFile("copy.txt"); + file.copyTo(copy); + + checkData(copy, src); + checkData(file, src); + } + + @Test + public void testCloneFile() throws Exception { + JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt"); + file.open(); + + // Create buffer and fill with test data + int bufferSize = 1024; + ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize); + for (int i = 0; i < bufferSize; i++) { + src.writeByte((byte) 5); + } + + IOCallbackCountdown callback = new IOCallbackCountdown(1); + file.internalWrite(src, callback); + + JDBCSequentialFile copy = (JDBCSequentialFile) file.cloneFile(); + } + + private void checkData(JDBCSequentialFile file, ActiveMQBuffer expectedData) throws SQLException { + expectedData.resetReaderIndex(); + + byte[] resultingBytes = new byte[expectedData.readableBytes()]; + ByteBuffer byteBuffer = ByteBuffer.allocate(expectedData.readableBytes()); + + file.read(byteBuffer, null); + expectedData.getBytes(0, resultingBytes); + + assertArrayEquals(resultingBytes, byteBuffer.array()); + } + + private class IOCallbackCountdown implements IOCallback { + + private final CountDownLatch countDownLatch; + + public IOCallbackCountdown(int size) { + this.countDownLatch = new CountDownLatch(size); + } + + @Override + public void done() { + countDownLatch.countDown(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + fail(errorMessage); + } + + public void assertEmpty(int timeout) throws InterruptedException { + countDownLatch.await(timeout, TimeUnit.SECONDS); + assertEquals(countDownLatch.getCount(), 0); + } + } +} diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java index 67c30384fb5..a5884b9afb0 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java @@ -65,18 +65,7 @@ public NIOSequentialFileFactory(final File journalDir, super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener); } - @Override - public SequentialFile createSequentialFile(final String fileName) { - return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor); - } - - @Override - public boolean isSupportsCallbacks() { - return timedBuffer != null; - } - - @Override - public ByteBuffer allocateDirectBuffer(final int size) { + public static ByteBuffer allocateDirectByteBuffer(final int size) { // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467 ByteBuffer buffer2 = null; try { @@ -104,6 +93,21 @@ public ByteBuffer allocateDirectBuffer(final int size) { return buffer2; } + @Override + public SequentialFile createSequentialFile(final String fileName) { + return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor); + } + + @Override + public boolean isSupportsCallbacks() { + return timedBuffer != null; + } + + @Override + public ByteBuffer allocateDirectBuffer(final int size) { + return NIOSequentialFileFactory.allocateDirectByteBuffer(size); + } + @Override public void releaseDirectBuffer(ByteBuffer buffer) { // nothing we can do on this case. we can just have good faith on GC From d19a7ddd6ddc25db5d1839b15c4bc3164cb25731 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Tue, 3 May 2016 14:37:37 +0100 Subject: [PATCH 2/3] ARTEMIS-514 Add support for LargeMEssages backed by Database --- .../config/ActiveMQDefaultConfiguration.java | 9 +- .../storage/DatabaseStorageConfiguration.java | 10 + .../impl/FileConfigurationParser.java | 3 +- .../journal/JDBCJournalStorageManager.java | 38 +++- .../impl/journal/JournalStorageManager.java | 4 +- .../impl/journal/OperationContextImpl.java | 1 + .../schema/artemis-configuration.xsd | 7 + .../artemis/tests/util/ActiveMQTestBase.java | 43 +++-- .../test/resources/database-store-config.xml | 1 + docs/user-manual/en/persistence.md | 9 +- .../client/InterruptedLargeMessageTest.java | 5 + .../LargeMessageAvoidLargeMessagesTest.java | 4 +- .../client/LargeMessageCompressTest.java | 4 +- .../integration/client/LargeMessageTest.java | 181 ++++++++++-------- .../jdbc/store/journal/JDBCJournalTest.java | 3 +- .../largemessage/LargeMessageTestBase.java | 54 +++++- .../persistence/StorageManagerTestBase.java | 2 +- .../stress/chunk/LargeMessageStressTest.java | 5 + 18 files changed, 259 insertions(+), 124 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index efaa7807c52..8fe6e4024ef 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -23,7 +23,7 @@ * Default values of ActiveMQ Artemis configuration parameters. */ public final class ActiveMQDefaultConfiguration { - /* + /* *

In order to avoid compile time in-lining of constants, all access is done through methods * and all fields are PRIVATE STATIC but not FINAL. This is done following the recommendation at * 13.4.9. @@ -414,6 +414,9 @@ public static String getDefaultHapolicyBackupStrategy() { // Default bindings table name, used with Database storage type private static String DEFAULT_BINDINGS_TABLE_NAME = "BINDINGS"; + // Default large messages table name, used with Database storage type + private static final String DEFAULT_LARGE_MESSAGES_TABLE_NAME = "LARGE_MESSAGES"; + /** * If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers. */ @@ -1103,4 +1106,8 @@ public static String getDefaultBindingsTableName() { public static String getDefaultDriverClassName() { return DEFAULT_JDBC_DRIVER_CLASS_NAME; } + + public static String getDefaultLargeMessagesTableName() { + return DEFAULT_LARGE_MESSAGES_TABLE_NAME; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java index 8ff62eeee66..49a2251065a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java @@ -25,6 +25,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { private String bindingsTableName = ActiveMQDefaultConfiguration.getDefaultBindingsTableName(); + private String largeMessagesTableName = ActiveMQDefaultConfiguration.getDefaultLargeMessagesTableName(); + private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl(); private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName(); @@ -49,6 +51,14 @@ public void setBindingsTableName(String bindingsTableName) { this.bindingsTableName = bindingsTableName; } + public String getLargeMessageTableName() { + return largeMessagesTableName; + } + + public void setLargeMessageTableName(String largeMessagesTableName) { + this.largeMessagesTableName = largeMessagesTableName; + } + public void setJdbcConnectionUrl(String jdbcConnectionUrl) { this.jdbcConnectionUrl = jdbcConnectionUrl; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 12ac5e19ed1..0a6391b3cd6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1144,11 +1144,10 @@ private ScaleDownConfiguration parseScaleDownConfig(Element policyNode) { } private DatabaseStorageConfiguration createDatabaseStoreConfig(Element storeNode) { - NodeList databaseStoreNode = storeNode.getElementsByTagName("database-store"); - DatabaseStorageConfiguration conf = new DatabaseStorageConfiguration(); conf.setBindingsTableName(getString(storeNode, "bindings-table-name", conf.getBindingsTableName(), Validators.NO_CHECK)); conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK)); + conf.setLargeMessageTableName(getString(storeNode, "large-message-table-name", conf.getJdbcConnectionUrl(), Validators.NO_CHECK)); conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK)); conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK)); return conf; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java index f23b58bfb92..25b7cd1bc29 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java @@ -16,13 +16,16 @@ */ package org.apache.activemq.artemis.core.persistence.impl.journal; +import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.Journal; +import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; import org.apache.activemq.artemis.utils.ExecutorFactory; @@ -39,14 +42,25 @@ public JDBCJournalStorageManager(final Configuration config, } @Override - protected void init(Configuration config, IOCriticalErrorListener criticalErrorListener) { - DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration(); + protected synchronized void init(Configuration config, IOCriticalErrorListener criticalErrorListener) { + try { + DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration(); - Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName()); - bindingsJournal = localBindings; + Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName()); + bindingsJournal = localBindings; - Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName()); - messageJournal = localMessage; + Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName()); + messageJournal = localMessage; + + bindingsJournal.start(); + messageJournal.start(); + + largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getLargeMessageTableName(), dbConf.getJdbcDriverClassName(), executor); + largeMessagesFactory.start(); + } + catch (Exception e) { + criticalErrorListener.onIOException(e, e.getMessage(), null); + } } @Override @@ -76,7 +90,9 @@ public void run() { ((JDBCJournalImpl) bindingsJournal).stop(false); - messageJournal.stop(); + ((JDBCJournalImpl) messageJournal).stop(false); + + largeMessagesFactory.stop(); singleThreadExecutor.shutdown(); @@ -85,4 +101,12 @@ public void run() { started = false; } + @Override + public ByteBuffer allocateDirectBuffer(int size) { + return NIOSequentialFileFactory.allocateDirectByteBuffer(size); + } + + @Override + public void freeDirectBuffer(ByteBuffer buffer) { + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 7deebcb6e4d..acdf57bc66a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -66,13 +66,13 @@ public class JournalStorageManager extends AbstractJournalStorageManager { private SequentialFileFactory journalFF; - private SequentialFileFactory largeMessagesFactory; + SequentialFileFactory largeMessagesFactory; private Journal originalMessageJournal; private Journal originalBindingsJournal; - private String largeMessagesDirectory; + protected String largeMessagesDirectory; private ReplicationManager replicator; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java index a34a0ad5109..acd75b1d5f1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java @@ -143,6 +143,7 @@ public void executeOnCompletion(final IOCallback completion) { } // On this case, we can just execute the context directly + if (replicationLineUp.intValue() == replicated && storeLineUp.intValue() == stored && pageLineUp.intValue() == paged) { // We want to avoid the executor if everything is complete... diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index ef6f1063e8f..38961f0a75e 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1544,6 +1544,13 @@ + + + + The table name used to large message files + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 92fdd72b71b..cf9ee846e09 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -78,6 +78,7 @@ import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; @@ -396,20 +397,13 @@ protected Configuration createDefaultConfig(final boolean netty) throws Exceptio return createDefaultConfig(0, netty); } - protected Configuration createDefaultJDBCConfig() throws Exception { - Configuration configuration = createDefaultConfig(true); - - DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration(); - dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl()); - dbStorageConfiguration.setBindingsTableName("BINDINGS"); - dbStorageConfiguration.setMessageTableName("MESSAGES"); - dbStorageConfiguration.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver"); - - configuration.setStoreConfiguration(dbStorageConfiguration); - + protected Configuration createDefaultJDBCConfig(boolean isNetty) throws Exception { + Configuration configuration = createDefaultConfig(isNetty); + setDBStoreType(configuration); return configuration; } + protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception { ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID))); @@ -448,6 +442,16 @@ protected ConfigurationImpl createBasicConfig(final int serverID) { return configuration; } + private void setDBStoreType(Configuration configuration) { + DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration(); + dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl()); + dbStorageConfiguration.setBindingsTableName("BINDINGS"); + dbStorageConfiguration.setMessageTableName("MESSAGES"); + dbStorageConfiguration.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver"); + + configuration.setStoreConfiguration(dbStorageConfiguration); + } + protected Map generateInVMParams(final int node) { Map params = new HashMap<>(); @@ -1388,6 +1392,18 @@ protected final ActiveMQServer createServer(final boolean realFiles, return server; } + protected final ActiveMQServer createServer(final boolean realFiles, + final Configuration configuration, + final long pageSize, + final long maxAddressSize, + final Map settings, + StoreConfiguration.StoreType storeType) { + if (storeType == StoreConfiguration.StoreType.DATABASE) { + setDBStoreType(configuration); + } + return createServer(realFiles, configuration, pageSize, maxAddressSize, settings); + } + protected final ActiveMQServer createServer(final boolean realFiles) throws Exception { return createServer(realFiles, false); } @@ -1404,6 +1420,11 @@ protected final ActiveMQServer createServer(final Configuration configuration) { return createServer(configuration.isPersistenceEnabled(), configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap()); } + protected ActiveMQServer createServer(final boolean realFiles, boolean isNetty, StoreConfiguration.StoreType storeType) throws Exception { + Configuration configuration = storeType == StoreConfiguration.StoreType.DATABASE ? createDefaultJDBCConfig(isNetty) : createDefaultConfig(isNetty); + return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap()); + } + protected ActiveMQServer createInVMFailoverServer(final boolean realFiles, final Configuration configuration, final NodeManager nodeManager, diff --git a/artemis-server/src/test/resources/database-store-config.xml b/artemis-server/src/test/resources/database-store-config.xml index 7d08ad49127..c3be405ac97 100644 --- a/artemis-server/src/test/resources/database-store-config.xml +++ b/artemis-server/src/test/resources/database-store-config.xml @@ -24,6 +24,7 @@ jdbc:derby:target/derby/database-store;create=true BINDINGS_TABLE MESSAGE_TABLE + LARGE_MESSAGE_TABLE org.apache.derby.jdbc.EmbeddedDriver diff --git a/docs/user-manual/en/persistence.md b/docs/user-manual/en/persistence.md index 83180b7ccd6..2d50c42aa91 100644 --- a/docs/user-manual/en/persistence.md +++ b/docs/user-manual/en/persistence.md @@ -376,6 +376,7 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a jdbc:derby:target/derby/database-store;create=true BINDINGS_TABLE MESSAGE_TABLE + LARGE_MESSAGES_TABLE org.apache.derby.jdbc.EmbeddedDriver @@ -384,13 +385,17 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a - `jdbc-connection-url` The full JDBC connection URL for your database server. The connection url should include all configuration parameters and database name. - + - `bindings-table-name` The name of the table in which bindings data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference. - + - `message-table-name` + The name of the table in which bindings data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference. + +- `large-message-table-name` + The name of the table in which messages and related data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference. - `jdbc-driver-class-name` diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java index 509429d4d8c..b9bdc3f5518 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -72,6 +73,10 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { protected ServerLocator locator; + public InterruptedLargeMessageTest(StoreConfiguration.StoreType storeType) { + super(storeType); + } + @Override @Before public void setUp() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java index 2d92e4243c5..6cf150a05e9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -37,7 +38,8 @@ */ public class LargeMessageAvoidLargeMessagesTest extends LargeMessageTest { - public LargeMessageAvoidLargeMessagesTest() { + public LargeMessageAvoidLargeMessagesTest(StoreConfiguration.StoreType storeType) { + super(storeType); isCompressedTest = true; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java index ae5221f8c86..70a9cb4fc7e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; @@ -45,7 +46,8 @@ public class LargeMessageCompressTest extends LargeMessageTest { // Constructors -------------------------------------------------- - public LargeMessageCompressTest() { + public LargeMessageCompressTest(StoreConfiguration.StoreType storeType) { + super(storeType); isCompressedTest = true; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java index bc94ab32c39..abcf195c39b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java @@ -29,17 +29,17 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; -import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; @@ -48,33 +48,35 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; -import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.RandomUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; public class LargeMessageTest extends LargeMessageTestBase { - // Constants ----------------------------------------------------- static final int RECEIVE_WAIT_TIME = 10000; - private final int LARGE_MESSAGE_SIZE = 20 * 1024; - private final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; protected ServerLocator locator; protected boolean isCompressedTest = false; - // Constructors -------------------------------------------------- - - // Public -------------------------------------------------------- + private int largeMessageSize; protected boolean isNetty() { return false; } + public LargeMessageTest(StoreConfiguration.StoreType storeType) { + super(storeType); + // The JDBC Large Message store is pretty slow, to speed tests up we only test 5MB large messages + largeMessageSize = (storeType == StoreConfiguration.StoreType.DATABASE) ? 5 * 1024 : 100 * 1024; + } + @Test public void testRollbackPartiallyConsumedBuffer() throws Exception { for (int i = 0; i < 1; i++) { @@ -82,9 +84,7 @@ public void testRollbackPartiallyConsumedBuffer() throws Exception { internalTestRollbackPartiallyConsumedBuffer(false); tearDown(); setUp(); - } - } @Test @@ -93,11 +93,12 @@ public void testRollbackPartiallyConsumedBufferWithRedeliveryDelay() throws Exce } private void internalTestRollbackPartiallyConsumedBuffer(final boolean redeliveryDelay) throws Exception { - final int messageSize = 100 * 1024; + final int messageSize = largeMessageSize; final ClientSession session; - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); + AddressSettings settings = new AddressSettings(); if (redeliveryDelay) { @@ -184,7 +185,7 @@ public void testCloseConsumer() throws Exception { ClientSession session = null; - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -229,7 +230,7 @@ public void testCloseConsumer() throws Exception { public void testDeleteOnNoBinding() throws Exception { final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -277,7 +278,7 @@ private void fillAddress() throws Exception { Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); - ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap()); + ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap(), storeType); server.start(); @@ -338,7 +339,8 @@ public void doTestLargeBuffer(boolean transacted) throws Exception { ClientSession session = null; - Configuration config = createDefaultConfig(isNetty()).setJournalFileSize(journalsize).setJournalBufferSize_AIO(10 * 1024).setJournalBufferSize_NIO(10 * 1024); + Configuration config = storeType == StoreConfiguration.StoreType.DATABASE ? createDefaultJDBCConfig(isNetty()) : createDefaultConfig(isNetty()); + config.setJournalFileSize(journalsize).setJournalBufferSize_AIO(10 * 1024).setJournalBufferSize_NIO(10 * 1024); ActiveMQServer server = createServer(true, config); @@ -396,7 +398,7 @@ public void testDLALargeMessage() throws Exception { ClientSession session = null; - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -445,7 +447,7 @@ public void testDLALargeMessage() throws Exception { session.close(); server.stop(); - server = createServer(true, isNetty()); + server = createServer(true, isNetty(), storeType); server.start(); @@ -469,7 +471,9 @@ public void testDLALargeMessage() throws Exception { session.commit(); - validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), isCompressedTest ? 0 : 1); + if (storeType != StoreConfiguration.StoreType.DATABASE) { + validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), isCompressedTest ? 0 : 1); + } consumer = session.createConsumer(ADDRESS.concat("-2")); @@ -487,7 +491,9 @@ public void testDLALargeMessage() throws Exception { session.close(); - validateNoFilesOnLargeDir(); + if (storeType != StoreConfiguration.StoreType.DATABASE) { + validateNoFilesOnLargeDir(); + } } @Test @@ -496,7 +502,7 @@ public void testDeliveryCount() throws Exception { ClientSession session = null; - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -557,7 +563,7 @@ public void testDLAOnExpiryNonDurableMessage() throws Exception { ClientSession session = null; - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -665,7 +671,7 @@ public void testDLAOnExpiry() throws Exception { ClientSession session = null; - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -734,7 +740,7 @@ public void testDLAOnExpiry() throws Exception { session.close(); server.stop(); - server = createServer(true, isNetty()); + server = createServer(true, isNetty(), storeType); server.start(); @@ -772,7 +778,7 @@ public void testExpiryLargeMessage() throws Exception { ClientSession session = null; try { - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -819,7 +825,7 @@ public void testExpiryLargeMessage() throws Exception { session.close(); server.stop(); - server = createServer(true, isNetty()); + server = createServer(true, isNetty(), storeType); server.start(); @@ -872,7 +878,7 @@ private void internalTestSentWithDuplicateID(final boolean isSimulateBridge) thr ClientSession session = null; try { - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -944,7 +950,7 @@ public void internalTestResendMessage(final long messageSize) throws Exception { ClientSession session = null; try { - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -1019,7 +1025,7 @@ public void internalTestCachedResendMessage(final long messageSize) throws Excep ClientSession session = null; try { - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -1090,12 +1096,12 @@ private void compareString(final long messageSize, ClientMessage msg) { @Test public void testFilePersistenceOneHugeMessage() throws Exception { - testChunks(false, false, false, true, true, false, false, false, false, 1, 100 * 1024L * 1024L, LargeMessageTest.RECEIVE_WAIT_TIME, 0, 10 * 1024 * 1024, 1024 * 1024); + testChunks(false, false, false, true, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0, 10 * 1024 * 1024, 1024 * 1024); } @Test public void testFilePersistenceOneMessageStreaming() throws Exception { - testChunks(false, false, false, true, true, false, false, false, false, 1, 100 * 1024L * 1024L, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(false, false, false, true, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test @@ -1105,22 +1111,22 @@ public void testFilePersistenceSmallMessageStreaming() throws Exception { @Test public void testFilePersistenceOneHugeMessageConsumer() throws Exception { - testChunks(false, false, false, true, true, false, false, false, true, 1, 100 * 1024 * 1024, 120000, 0, 10 * 1024 * 1024, 1024 * 1024); + testChunks(false, false, false, true, true, false, false, false, true, 1, largeMessageSize, 120000, 0, 10 * 1024 * 1024, 1024 * 1024); } @Test public void testFilePersistence() throws Exception { - testChunks(false, false, true, false, true, false, false, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(false, false, true, false, true, false, false, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testFilePersistenceConsumer() throws Exception { - testChunks(false, false, true, false, true, false, false, true, true, 2, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(false, false, true, false, true, false, false, true, true, 2, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testFilePersistenceXA() throws Exception { - testChunks(true, false, true, false, true, false, false, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(true, false, true, false, true, false, false, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test @@ -1135,122 +1141,122 @@ public void testFilePersistenceXAStreamRestart() throws Exception { @Test public void testFilePersistenceXAConsumer() throws Exception { - testChunks(true, false, true, false, true, false, false, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(true, false, true, false, true, false, false, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testFilePersistenceXAConsumerRestart() throws Exception { - testChunks(true, true, true, false, true, false, false, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(true, true, true, false, true, false, false, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testFilePersistenceBlocked() throws Exception { - testChunks(false, false, true, false, true, false, true, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(false, false, true, false, true, false, true, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testFilePersistenceBlockedConsumer() throws Exception { - testChunks(false, false, true, false, true, false, true, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(false, false, true, false, true, false, true, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testFilePersistenceBlockedXA() throws Exception { - testChunks(true, false, true, false, true, false, true, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(true, false, true, false, true, false, true, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testFilePersistenceBlockedXAConsumer() throws Exception { - testChunks(true, false, true, false, true, false, true, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(true, false, true, false, true, false, true, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testFilePersistenceBlockedPreACK() throws Exception { - testChunks(false, false, true, false, true, true, true, true, false, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(false, false, true, false, true, true, true, true, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testFilePersistenceBlockedPreACKConsumer() throws Exception { - testChunks(false, false, true, false, true, true, true, true, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(false, false, true, false, true, true, true, true, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testFilePersistenceBlockedPreACKXA() throws Exception { - testChunks(true, false, true, false, true, true, true, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(true, false, true, false, true, true, true, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testFilePersistenceBlockedPreACKXARestart() throws Exception { - testChunks(true, true, true, false, true, true, true, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(true, true, true, false, true, true, true, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testFilePersistenceBlockedPreACKXAConsumer() throws Exception { - testChunks(true, false, true, false, true, true, true, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(true, false, true, false, true, true, true, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testFilePersistenceBlockedPreACKXAConsumerRestart() throws Exception { - testChunks(true, true, true, false, true, true, true, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(true, true, true, false, true, true, true, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testFilePersistenceDelayed() throws Exception { - testChunks(false, false, true, false, true, false, false, false, false, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); + testChunks(false, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); } @Test public void testFilePersistenceDelayedConsumer() throws Exception { - testChunks(false, false, true, false, true, false, false, false, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); + testChunks(false, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); } @Test public void testFilePersistenceDelayedXA() throws Exception { - testChunks(true, false, true, false, true, false, false, false, false, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); + testChunks(true, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); } @Test public void testFilePersistenceDelayedXAConsumer() throws Exception { - testChunks(true, false, true, false, true, false, false, false, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); + testChunks(true, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000); } @Test public void testNullPersistence() throws Exception { - testChunks(false, false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(false, false, true, false, false, false, false, true, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testNullPersistenceConsumer() throws Exception { - testChunks(false, false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(false, false, true, false, false, false, false, true, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testNullPersistenceXA() throws Exception { - testChunks(true, false, true, false, false, false, false, true, false, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(true, false, true, false, false, false, false, true, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testNullPersistenceXAConsumer() throws Exception { - testChunks(true, false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0); + testChunks(true, false, true, false, false, false, false, true, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0); } @Test public void testNullPersistenceDelayed() throws Exception { - testChunks(false, false, true, false, false, false, false, false, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 100); + testChunks(false, false, true, false, false, false, false, false, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 100); } @Test public void testNullPersistenceDelayedConsumer() throws Exception { - testChunks(false, false, true, false, false, false, false, false, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 100); + testChunks(false, false, true, false, false, false, false, false, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 100); } @Test public void testNullPersistenceDelayedXA() throws Exception { - testChunks(true, false, true, false, false, false, false, false, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 100); + testChunks(true, false, true, false, false, false, false, false, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 100); } @Test public void testNullPersistenceDelayedXAConsumer() throws Exception { - testChunks(true, false, true, false, false, false, false, false, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 100); + testChunks(true, false, true, false, false, false, false, false, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 100); } @Test @@ -1343,7 +1349,7 @@ public void testTwoBindingsTwoStartedConsumers() throws Exception { // there are two bindings.. one is ACKed, the other is not, the server is restarted // The other binding is acked... The file must be deleted - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -1410,7 +1416,7 @@ public void testTwoBindingsNoRestart() throws Exception { public void testTwoBindings(final boolean restart) throws Exception { // there are two bindings.. one is ACKed, the other is not, the server is restarted // The other binding is acked... The file must be deleted - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -1439,7 +1445,7 @@ public void testTwoBindings(final boolean restart) throws Exception { server.stop(); - server = createServer(true, isNetty()); + server = createServer(true, isNetty(), storeType); server.start(); @@ -1478,7 +1484,7 @@ public void testSendRollbackNonDurable() throws Exception { private void internalTestSendRollback(final boolean isXA, final boolean durable) throws Exception { ClientSession session = null; - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -1536,7 +1542,7 @@ public void testSimpleRollbackXA() throws Exception { public void simpleRollbackInternalTest(final boolean isXA) throws Exception { // there are two bindings.. one is ACKed, the other is not, the server is restarted // The other binding is acked... The file must be deleted - ActiveMQServer server = createServer(true, isNetty()); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -1644,7 +1650,7 @@ public void testBufferMultipleLargeMessages() throws Exception { final int NUMBER_OF_MESSAGES = 30; try { - server = createServer(true, isNetty()); + server = createServer(true, isNetty(), storeType); server.start(); @@ -1730,7 +1736,7 @@ public void testReceiveMultipleMessages() throws Exception { final int NUMBER_OF_MESSAGES = 1000; try { - server = createServer(true, isNetty()); + server = createServer(true, isNetty(), storeType); server.start(); @@ -1821,7 +1827,7 @@ public void testPageOnLargeMessageMultipleQueues() throws Exception { AddressSettings value = new AddressSettings(); map.put(ADDRESS.toString(), value); - ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map); + ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map, storeType); server.start(); final int numberOfBytes = 1024; @@ -1871,7 +1877,7 @@ public void testPageOnLargeMessageMultipleQueues() throws Exception { server.stop(); - server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map); + server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map, storeType); server.start(); sf = createSessionFactory(locator); @@ -1948,7 +1954,7 @@ public void testPageOnLargeMessageMultipleQueues2() throws Exception { AddressSettings value = new AddressSettings(); map.put(ADDRESS.toString(), value); - ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map); + ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map, storeType); server.start(); final int numberOfBytes = 1024; @@ -2048,11 +2054,11 @@ public void testSendStreamingSingleMessage() throws Exception { final int SIZE = 10 * 1024 * 1024; try { - server = createServer(true, isNetty()); + server = createServer(true, isNetty(), storeType); server.start(); - locator.setMinLargeMessageSize(100 * 1024); + locator.setMinLargeMessageSize(largeMessageSize * 1024); ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); @@ -2081,7 +2087,7 @@ public void testSendStreamingSingleMessage() throws Exception { msg2.acknowledge(); msg2.setOutputStream(createFakeOutputStream()); - Assert.assertTrue(msg2.waitOutputStreamCompletion(60000)); + Assert.assertTrue(msg2.waitOutputStreamCompletion(0)); session.commit(); @@ -2089,6 +2095,10 @@ public void testSendStreamingSingleMessage() throws Exception { Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()))); } + catch (Throwable t) { + t.printStackTrace(); + throw t; + } finally { try { session.close(); @@ -2115,11 +2125,11 @@ public void testSendStreamingSingleEmptyMessage() throws Exception { final int SIZE = 0; try { - server = createServer(true, isNetty()); + server = createServer(true, isNetty(), storeType); server.start(); - locator.setMinLargeMessageSize(100 * 1024); + locator.setMinLargeMessageSize(largeMessageSize * 1024); ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); @@ -2184,11 +2194,11 @@ public void testSendStreamingEmptyMessagesWithRestart() throws Exception { final int SIZE = 0; try { - server = createServer(true, isNetty()); + server = createServer(true, isNetty(), storeType); server.start(); - locator.setMinLargeMessageSize(100 * 1024); + locator.setMinLargeMessageSize(largeMessageSize * 1024); ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); @@ -2265,7 +2275,7 @@ public void testIgnoreStreaming() throws Exception { final int SIZE = 10 * 1024; final int NUMBER_OF_MESSAGES = 1; - server = createServer(true, isNetty()); + server = createServer(true, isNetty(), storeType); server.start(); @@ -2320,7 +2330,7 @@ public void testIgnoreStreaming() throws Exception { // The ClientConsumer should be able to also send ServerLargeMessages as that's done by the CoreBridge @Test public void testSendServerMessage() throws Exception { - ActiveMQServer server = createServer(true); + ActiveMQServer server = createServer(true, isNetty(), storeType); server.start(); @@ -2332,12 +2342,12 @@ public void testSendServerMessage() throws Exception { fileMessage.setMessageID(1005); - for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) { + for (int i = 0; i < largeMessageSize; i++) { fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)}); } // The server would be doing this - fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, LARGE_MESSAGE_SIZE); + fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize); fileMessage.releaseResources(); @@ -2359,9 +2369,9 @@ public void testSendServerMessage() throws Exception { Assert.assertNotNull(msg); - Assert.assertEquals(msg.getBodySize(), LARGE_MESSAGE_SIZE); + Assert.assertEquals(msg.getBodySize(), largeMessageSize); - for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) { + for (int i = 0; i < largeMessageSize; i++) { Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i), msg.getBodyBuffer().readByte()); } @@ -2379,6 +2389,7 @@ public void testSendServerMessage() throws Exception { public void setUp() throws Exception { super.setUp(); locator = createFactory(isNetty()); + locator.setCallTimeout(100000000); } protected void testPageOnLargeMessage(final boolean realFiles, final boolean sendBlocking) throws Exception { @@ -2392,7 +2403,7 @@ protected void testPageOnLargeMessage(final boolean realFiles, final boolean sen AddressSettings value = new AddressSettings(); map.put(ADDRESS.toString(), value); - ActiveMQServer server = createServer(realFiles, config, PAGE_SIZE, PAGE_MAX, map); + ActiveMQServer server = createServer(realFiles, config, PAGE_SIZE, PAGE_MAX, map, storeType); server.start(); final int numberOfBytes = 1024; @@ -2435,7 +2446,7 @@ protected void testPageOnLargeMessage(final boolean realFiles, final boolean sen if (realFiles) { server.stop(); - server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map); + server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map, storeType); server.start(); sf = createSessionFactory(locator); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java index ac2188de39d..7eb4b9a2fa6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule; import org.junit.After; import org.junit.Before; @@ -33,7 +34,7 @@ import static org.junit.Assert.assertEquals; -public class JDBCJournalTest { +public class JDBCJournalTest extends ActiveMQTestBase { @Rule public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java index 0d51d9c9a8c..d7d5b1c84a8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java @@ -28,6 +28,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; @@ -35,18 +37,23 @@ import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DeflaterReader; import org.junit.Assert; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collection; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +@RunWith(Parameterized.class) public abstract class LargeMessageTestBase extends ActiveMQTestBase { // Constants ----------------------------------------------------- @@ -66,6 +73,20 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase { // Protected ----------------------------------------------------- + protected StoreConfiguration.StoreType storeType; + + public LargeMessageTestBase(StoreConfiguration.StoreType storeType) { + this.storeType = storeType; + } + + @Parameterized.Parameters(name = "storeType={0}") + public static Collection data() { +// Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}}; + Object[][] params = new Object[][] {{StoreConfiguration.StoreType.DATABASE}}; + //Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}}; + return Arrays.asList(params); + } + protected void testChunks(final boolean isXA, final boolean restartOnXA, final boolean rollbackFirstSend, @@ -99,7 +120,15 @@ protected void testChunks(final boolean isXA, final int minSize) throws Exception { clearDataRecreateServerDirs(); - ActiveMQServer server = createServer(realFiles); + Configuration configuration; + if (storeType == StoreConfiguration.StoreType.DATABASE) { + configuration = createDefaultJDBCConfig(true); + } + else { + configuration = createDefaultConfig(false); + } + + ActiveMQServer server = createServer(realFiles, configuration); server.start(); ServerLocator locator = createInVMNonHALocator(); @@ -200,7 +229,7 @@ protected void testChunks(final boolean isXA, if (realFiles) { server.stop(); - server = createServer(realFiles); + server = createServer(realFiles, configuration); server.start(); sf = locator.createSessionFactory(); @@ -352,13 +381,14 @@ public void write(final int b) throws IOException { @Override public void write(final byte[] b) throws IOException { - if (b[0] == ActiveMQTestBase.getSamplebyte(bytesRead.get())) { - bytesRead.addAndGet(b.length); - } - else { - LargeMessageTestBase.log.warn("Received invalid packet at position " + bytesRead.get()); + if (b.length > 0) { + if (b[0] == ActiveMQTestBase.getSamplebyte(bytesRead.get())) { + bytesRead.addAndGet(b.length); + } + else { + LargeMessageTestBase.log.warn("Received invalid packet at position " + bytesRead.get()); + } } - } @Override @@ -426,12 +456,17 @@ public void write(final int b) throws IOException { validateNoFilesOnLargeDir(); } + catch (Throwable e) { + e.printStackTrace(); + throw e; + } finally { locator.close(); try { server.stop(); } catch (Throwable ignored) { + ignored.printStackTrace(); } } } @@ -442,7 +477,7 @@ public void write(final int b) throws IOException { * @param delayDelivery * @param session * @param producer - * @throws FileNotFoundException + * @throws Exception * @throws IOException * @throws ActiveMQException */ @@ -523,7 +558,6 @@ protected ClientMessage createLargeClientMessageStreaming(final ClientSession se * @param queueToRead * @param numberOfBytes * @throws ActiveMQException - * @throws FileNotFoundException * @throws IOException */ protected void readMessage(final ClientSession session, diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java index e0b746a50d5..75e5eb28b7a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java @@ -125,7 +125,7 @@ public void tearDown() throws Exception { protected void createStorage() throws Exception { if (storeType == StoreConfiguration.StoreType.DATABASE) { - journal = createJDBCJournalStorageManager(createDefaultJDBCConfig()); + journal = createJDBCJournalStorageManager(createDefaultJDBCConfig(true)); } else { journal = createJournalStorageManager(createDefaultInVMConfig()); diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/chunk/LargeMessageStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/chunk/LargeMessageStressTest.java index ff8dc3be9e2..9cae53dd848 100644 --- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/chunk/LargeMessageStressTest.java +++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/chunk/LargeMessageStressTest.java @@ -16,11 +16,16 @@ */ package org.apache.activemq.artemis.tests.stress.chunk; +import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase; import org.junit.Test; public class LargeMessageStressTest extends LargeMessageTestBase { + public LargeMessageStressTest(StoreConfiguration.StoreType storeType) { + super(storeType); + } + // Constants ----------------------------------------------------- // Attributes ---------------------------------------------------- From 152e572dd1ce7bdbf191c37d4461cf95b783c9b0 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Tue, 3 May 2016 14:37:57 +0100 Subject: [PATCH 3/3] ARTEMIS-515 Enable XA Tests for JDBC Journal --- .../artemis/tests/integration/xa/BasicXaRecoveryTest.java | 2 +- .../activemq/artemis/tests/integration/xa/BasicXaTest.java | 2 +- .../activemq/artemis/tests/integration/xa/XaTimeoutTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java index 59cba5826b8..53945657166 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java @@ -101,7 +101,7 @@ public void setUp() throws Exception { addressSettings.clear(); if (storeType == StoreConfiguration.StoreType.DATABASE) { - configuration = createDefaultJDBCConfig().setJMXManagementEnabled(true); + configuration = createDefaultJDBCConfig(true).setJMXManagementEnabled(true); } else { configuration = createDefaultInVMConfig().setJMXManagementEnabled(true); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java index f7f5d65e79c..9eef7199108 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java @@ -89,7 +89,7 @@ public void setUp() throws Exception { addressSettings.clear(); if (storeType == StoreConfiguration.StoreType.DATABASE) { - configuration = createDefaultJDBCConfig(); + configuration = createDefaultJDBCConfig(true); } else { configuration = createDefaultNettyConfig(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/XaTimeoutTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/XaTimeoutTest.java index e3149e32982..6d02cc98458 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/XaTimeoutTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/XaTimeoutTest.java @@ -99,7 +99,7 @@ public void setUp() throws Exception { addressSettings.clear(); if (storeType == StoreConfiguration.StoreType.DATABASE) { - configuration = createDefaultJDBCConfig(); + configuration = createDefaultJDBCConfig(true); } else { configuration = createBasicConfig();