Skip to content

Commit

Permalink
ARTEMIS-2102 delete paging directory or table if address is removed
Browse files Browse the repository at this point in the history
(cherry picked from commit 8f4bd7c)
  • Loading branch information
wy96f authored and clebertsuconic committed Oct 11, 2018
1 parent a108ad3 commit 6b50d76
Show file tree
Hide file tree
Showing 13 changed files with 199 additions and 13 deletions.
Expand Up @@ -155,12 +155,12 @@ public void delete() throws IOException, InterruptedException, ActiveMQException
}
}

private synchronized int internalWrite(byte[] data, IOCallback callback) {
private synchronized int internalWrite(byte[] data, IOCallback callback, boolean append) {
try {
open();
synchronized (writeLock) {
int noBytes = dbDriver.writeToFile(this, data);
seek(noBytes);
int noBytes = dbDriver.writeToFile(this, data, append);
seek(append ? writePosition + noBytes : noBytes);
if (logger.isTraceEnabled()) {
logger.trace("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size());
}
Expand All @@ -177,18 +177,22 @@ private synchronized int internalWrite(byte[] data, IOCallback callback) {
}

public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) {
return internalWrite(buffer, callback, true);
}

public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback, boolean append) {
byte[] data = new byte[buffer.readableBytes()];
buffer.readBytes(data);
return internalWrite(data, callback);
return internalWrite(data, callback, append);
}

private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) {
return internalWrite(buffer.array(), callback);
return internalWrite(buffer.array(), callback, true);
}

private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) {
private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback, boolean append) {
executor.execute(() -> {
internalWrite(bytes, callback);
internalWrite(bytes, callback, append);
});
}

Expand All @@ -199,13 +203,17 @@ private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
}

synchronized void seek(long noBytes) {
writePosition += noBytes;
writePosition = noBytes;
}

public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback, boolean append) throws Exception {
// We ignore sync since we schedule writes straight away.
scheduleWrite(bytes, callback, append);
}

@Override
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
// We ignore sync since we schedule writes straight away.
scheduleWrite(bytes, callback);
write(bytes, sync, callback, true);
}

@Override
Expand All @@ -217,7 +225,7 @@ public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception {
ActiveMQBuffer data = ActiveMQBuffers.fixedBuffer(bytes.getEncodeSize());
bytes.encode(data);
scheduleWrite(data, callback);
write(data, sync, callback, true);
}

@Override
Expand Down
Expand Up @@ -246,6 +246,10 @@ public void deleteFile(JDBCSequentialFile file) throws SQLException {
}
}

public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
return writeToFile(file, data, true);
}

/**
* Persists data to this files associated database mapping.
*
Expand All @@ -254,7 +258,7 @@ public void deleteFile(JDBCSequentialFile file) throws SQLException {
* @return
* @throws SQLException
*/
public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
public int writeToFile(JDBCSequentialFile file, byte[] data, boolean append) throws SQLException {
synchronized (connection) {
connection.setAutoCommit(false);
appendToLargeObject.setLong(1, file.getId());
Expand All @@ -266,7 +270,12 @@ public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException
if (blob == null) {
blob = connection.createBlob();
}
bytesWritten = blob.setBytes(blob.length() + 1, data);
if (append) {
bytesWritten = blob.setBytes(blob.length() + 1, data);
} else {
blob.truncate(0);
bytesWritten = blob.setBytes(1, data);
}
rs.updateBlob(1, blob);
rs.updateRow();
}
Expand Down
Expand Up @@ -201,6 +201,32 @@ public void testAsyncAppendToFile() throws Exception {
checkData(file, src);
}

@Test
public void testWriteToFile() throws Exception {
JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");
file.open();

ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(1);
src.writeByte((byte)7);

file.internalWrite(src, null);
checkData(file, src);
assertEquals(1, file.size());
file.close();

file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");
file.open();

int bufferSize = 1024;
src = ActiveMQBuffers.fixedBuffer(bufferSize);
for (int i = 0; i < bufferSize; i++) {
src.writeByte((byte)i);
}
file.internalWrite(src, null, false);
checkData(file, src);
assertEquals(bufferSize, file.size());
}

@Test
public void testCopyFile() throws Exception {
JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");
Expand Down
Expand Up @@ -184,4 +184,6 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
* This method will re-enable cleanup of pages. Notice that it will also start cleanup threads.
*/
void enableCleanup();

void destroy() throws Exception;
}
Expand Up @@ -49,6 +49,8 @@ PageCursorProvider newCursorProvider(PagingStore store,

SequentialFileFactory newFileFactory(SimpleString address) throws Exception;

void removeFileFactory(SequentialFileFactory fileFactory) throws Exception;

void injectMonitor(FileStoreMonitor monitor) throws Exception;

default ScheduledExecutorService getScheduledExecutor() {
Expand Down
Expand Up @@ -317,6 +317,7 @@ public void deletePageStore(final SimpleString storeName) throws Exception {
PagingStore store = stores.remove(storeName);
if (store != null) {
store.stop();
store.destroy();
}
} finally {
syncLock.readLock().unlock();
Expand Down
Expand Up @@ -18,7 +18,9 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

Expand Down Expand Up @@ -94,6 +96,8 @@ public Executor newExecutor() {

private final IOCriticalErrorListener criticalErrorListener;

private final Map<SequentialFileFactory, String> factoryToTableName;

public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
final StorageManager storageManager,
final long syncTimeout,
Expand All @@ -108,6 +112,7 @@ public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
this.syncTimeout = syncTimeout;
this.dbConf = dbConf;
this.criticalErrorListener = critialErrorListener;
this.factoryToTableName = new HashMap<>();
start();
}

Expand Down Expand Up @@ -180,6 +185,32 @@ public synchronized SequentialFileFactory newFileFactory(final SimpleString addr
return factory;
}

@Override
public synchronized void removeFileFactory(SequentialFileFactory fileFactory) throws Exception {
((JDBCSequentialFileFactory)fileFactory).destroy();
String tableName = factoryToTableName.remove(fileFactory);
if (tableName != null) {
SimpleString removeTableName = SimpleString.toSimpleString(tableName);
JDBCSequentialFile directoryList = (JDBCSequentialFile) pagingFactoryFileFactory.createSequentialFile(DIRECTORY_NAME);
directoryList.open();

int size = ((Long) directoryList.size()).intValue();
ActiveMQBuffer buffer = readActiveMQBuffer(directoryList, size);

ActiveMQBuffer writeBuffer = ActiveMQBuffers.fixedBuffer(size);

while (buffer.readableBytes() > 0) {
SimpleString table = buffer.readSimpleString();
if (!removeTableName.equals(table)) {
writeBuffer.writeSimpleString(table);
}
}

directoryList.write(writeBuffer, true, null, false);
directoryList.close();
}
}

@Override
public void setPagingManager(final PagingManager pagingManager) {
this.pagingManager = pagingManager;
Expand Down Expand Up @@ -249,6 +280,7 @@ private synchronized SequentialFileFactory newFileFactory(final String directory
if (jdbcNetworkTimeout >= 0) {
fileFactory.setNetworkTimeout(this.executorFactory.getExecutor(), jdbcNetworkTimeout);
}
factoryToTableName.put(fileFactory, directoryName);
return fileFactory;
}

Expand Down
Expand Up @@ -45,6 +45,7 @@
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;

Expand Down Expand Up @@ -173,6 +174,14 @@ public synchronized SequentialFileFactory newFileFactory(final SimpleString addr
return factory;
}

@Override
public synchronized void removeFileFactory(SequentialFileFactory fileFactory) throws Exception {
File directory = fileFactory.getDirectory();
if (directory.exists()) {
FileUtil.deleteDirectory(directory);
}
}

@Override
public void setPagingManager(final PagingManager pagingManager) {
this.pagingManager = pagingManager;
Expand Down
Expand Up @@ -967,6 +967,13 @@ private void installPageTransaction(final Transaction tx, final RouteContextList
return;
}

@Override
public void destroy() throws Exception {
if (fileFactory != null) {
storeFactory.removeFileFactory(fileFactory);
}
}

private static class FinishPageMessageOperation implements TransactionOperation {

private final PageTransactionInfo pageTransaction;
Expand Down
Expand Up @@ -2392,6 +2392,8 @@ synchronized void initialisePart2(boolean scalingDown) throws Exception {

JournalLoadInformation[] journalInfo = loadJournals();

removeExtraAddressStores();

final ServerInfo dumper = new ServerInfo(this, pagingManager);

long dumpInfoInterval = configuration.getServerDumpInterval();
Expand Down Expand Up @@ -3163,6 +3165,17 @@ public void removeClientConnection(String clientId) {
}
}

private void removeExtraAddressStores() throws Exception {
SimpleString[] storeNames = pagingManager.getStoreNames();
if (storeNames != null && storeNames.length > 0) {
for (SimpleString storeName : storeNames) {
if (getAddressInfo(storeName) == null) {
pagingManager.deletePageStore(storeName);
}
}
}
}

private final class ActivationThread extends Thread {

final Runnable runnable;
Expand Down
Expand Up @@ -6220,6 +6220,75 @@ public synchronized PagingStore newStore(SimpleString address, AddressSettings s
server.stop();
}

@Test
public void testPagingStoreDestroyed() throws Exception {
clearDataRecreateServerDirs();

Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);

server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);

server.start();

final int numberOfMessages = 5000;

locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);

sf = createSessionFactory(locator);

ClientSession session = sf.createSession(false, false, false);

session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);

ClientProducer producer = session.createProducer(PagingTest.ADDRESS);

ClientMessage message = null;

byte[] body = new byte[MESSAGE_SIZE];

ByteBuffer bb = ByteBuffer.wrap(body);

for (int j = 1; j <= MESSAGE_SIZE; j++) {
bb.put(getSamplebyte(j));
}

for (int i = 0; i < numberOfMessages; i++) {
message = session.createMessage(true);

ActiveMQBuffer bodyLocal = message.getBodyBuffer();

bodyLocal.writeBytes(body);

producer.send(message);
if (i % 1000 == 0) {
session.commit();
}
}
session.commit();
producer.close();
assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
assertTrue(server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging());

session.deleteQueue(PagingTest.ADDRESS);
session.close();
sf.close();
locator.close();
locator = null;
sf = null;
assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
// Ensure pagingStore is physically deleted
server.getPagingManager().reloadStores();
assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
server.stop();

server.start();
assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
// Ensure pagingStore is physically deleted
server.getPagingManager().reloadStores();
assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
server.stop();
}

@Override
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
Configuration configuration = super.createDefaultConfig(serverID, netty);
Expand Down
Expand Up @@ -469,5 +469,9 @@ public boolean isStarted() {
public boolean checkReleasedMemory() {
return true;
}

@Override
public void destroy() throws Exception {
}
}
}
Expand Up @@ -802,6 +802,10 @@ public SequentialFileFactory newFileFactory(final SimpleString destinationName)
return factory;
}

@Override
public void removeFileFactory(SequentialFileFactory fileFactory) throws Exception {
}

@Override
public PagingStore newStore(final SimpleString destinationName, final AddressSettings addressSettings) {
return null;
Expand Down

0 comments on commit 6b50d76

Please sign in to comment.