Skip to content

Commit

Permalink
[ARTEMIS-2939]: Artemis should not delete corrupt log files.
Browse files Browse the repository at this point in the history
* Moving corrupted journal files to the attic folder.

Jira: https://issues.apache.org/jira/browse/ARTEMIS-2939
  • Loading branch information
Emmanuel Hugonnet authored and clebertsuconic committed Oct 20, 2020
1 parent 9a95418 commit fdfc581
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ public static String getDefaultHapolicyBackupStrategy() {
// The minimal number of data files before we can start compacting
private static int DEFAULT_JOURNAL_COMPACT_MIN_FILES = 10;

// The maximal number of data files before we can start deleting corrupted files instead of moving them to attic.
private static int DEFAULT_JOURNAL_MAX_ATTIC_FILES = 10;

// Interval to log server specific information (e.g. memory usage etc)
private static long DEFAULT_SERVER_DUMP_INTERVAL = -1;

Expand Down Expand Up @@ -998,6 +1001,13 @@ public static int getDefaultJournalCompactMinFiles() {
return DEFAULT_JOURNAL_COMPACT_MIN_FILES;
}

/**
* how many journal files to be stored in the attic.
*/
public static int getDefaultJournalMaxAtticFiles() {
return DEFAULT_JOURNAL_MAX_ATTIC_FILES;
}

/**
* Interval to log server specific information (e.g. memory usage etc)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.core.journal.impl;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
Expand All @@ -30,6 +34,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.core.io.SequentialFile;
Expand Down Expand Up @@ -83,6 +88,8 @@ public class JournalFilesRepository {

private final int journalFileOpenTimeout;

private final int maxAtticFiles;

private Executor openFilesExecutor;

private final Runnable pushOpenRunnable = new Runnable() {
Expand All @@ -109,7 +116,8 @@ public JournalFilesRepository(final SequentialFileFactory fileFactory,
final int fileSize,
final int minFiles,
final int poolSize,
final int journalFileOpenTimeout) {
final int journalFileOpenTimeout,
final int maxAtticFiles) {
if (filePrefix == null) {
throw new IllegalArgumentException("filePrefix cannot be null");
}
Expand All @@ -129,6 +137,7 @@ public JournalFilesRepository(final SequentialFileFactory fileFactory,
this.userVersion = userVersion;
this.journal = journal;
this.journalFileOpenTimeout = journalFileOpenTimeout;
this.maxAtticFiles = maxAtticFiles;
}

// Public --------------------------------------------------------
Expand Down Expand Up @@ -365,8 +374,7 @@ public synchronized void addFreeFile(final JournalFile file,
throw new IllegalStateException(e.getMessage() + " file: " + file);
}
if (calculatedSize != fileSize) {
ActiveMQJournalLogger.LOGGER.deletingFile(file);
file.getFile().delete();
damagedFile(file);
} else if (!checkDelete || (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size() < poolSize) || (poolSize < 0)) {
// Re-initialise it

Expand Down Expand Up @@ -400,6 +408,30 @@ public synchronized void addFreeFile(final JournalFile file,
}
}

private void damagedFile(JournalFile file) throws Exception {
if (file.getFile().isOpen()) {
file.getFile().close(false);
}
if (file.getFile().exists()) {
final Path journalPath = file.getFile().getJavaFile().toPath();
final Path atticPath = journalPath.getParent().resolve("attic");
Files.createDirectories(atticPath);
if (listFiles(atticPath) < maxAtticFiles) {
ActiveMQJournalLogger.LOGGER.movingFileToAttic(file.getFile().getFileName());
Files.move(journalPath, atticPath.resolve(journalPath.getFileName()), StandardCopyOption.REPLACE_EXISTING);
} else {
ActiveMQJournalLogger.LOGGER.deletingFile(file);
Files.delete(journalPath);
}
}
}

private int listFiles(Path path) throws IOException {
try (Stream<Path> files = Files.list(path)) {
return files.mapToInt(e -> 1).sum();
}
}

public Collection<JournalFile> getFreeFiles() {
return freeFiles;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public JournalImpl(final ExecutorFactory ioExecutors,
final String fileExtension,
final int maxAIO,
final int userVersion) {
this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, journalFileOpenTimeout, fileFactory, filePrefix, fileExtension, maxAIO, userVersion, null);
this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, journalFileOpenTimeout, fileFactory, filePrefix, fileExtension, maxAIO, userVersion, null, 0);
}


Expand All @@ -310,7 +310,8 @@ public JournalImpl(final ExecutorFactory ioExecutors,
final String fileExtension,
final int maxAIO,
final int userVersion,
IOCriticalErrorListener criticalErrorListener) {
IOCriticalErrorListener criticalErrorListener,
final int maxAtticFiles) {

super(fileFactory.isSupportsCallbacks(), fileSize);

Expand Down Expand Up @@ -340,7 +341,7 @@ public JournalImpl(final ExecutorFactory ioExecutors,

this.fileFactory = fileFactory;

filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles, poolSize, journalFileOpenTimeout);
filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles, poolSize, journalFileOpenTimeout, maxAtticFiles);

this.userVersion = userVersion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
void couldNotRemoveFile(JournalFile file);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 142009, value = "Deleting {0} as it does not have the configured size",
@Message(id = 142009, value = "*******************************************************************************************************************************\nThe File Storage Attic is full, as the file {0} does not have the configured size, and the file will be removed\n*******************************************************************************************************************************",
format = Message.Format.MESSAGE_FORMAT)
void deletingFile(JournalFile file);

Expand Down Expand Up @@ -277,4 +277,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
@Message(id = 144007, value = "Ignoring journal file {0}: file is shorter then minimum header size. This file is being removed.", format = Message.Format.MESSAGE_FORMAT)
void ignoringShortFile(String fileName);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 144008, value = "*******************************************************************************************************************************\nFile {0}: was moved under attic, please review it and remove it.\n*******************************************************************************************************************************", format = Message.Format.MESSAGE_FORMAT)
void movingFileToAttic(String fileName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,18 @@ Configuration addDiscoveryGroupConfiguration(String key,
*/
Configuration setJournalBufferSize_NIO(int journalBufferSize);

/**
* Returns the maximal number of data files before we can start deleting corrupted files instead of moving them to attic.
* <br>
* Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_MAX_ATTIC_FILES}.
*/
int getJournalMaxAtticFiles();

/**
* Sets the maximal number of data files before we can start deleting corrupted files instead of moving them to attic.
*/
Configuration setJournalMaxAtticFiles(int maxAtticFiles);

/**
* Returns whether the bindings directory is created on this server startup. <br>
* Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_CREATE_BINDINGS_DIR}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ public class ConfigurationImpl implements Configuration, Serializable {

protected int journalMinFiles = ActiveMQDefaultConfiguration.getDefaultJournalMinFiles();

protected int journalMaxAtticFilesFiles = ActiveMQDefaultConfiguration.getDefaultJournalMaxAtticFiles();

// AIO and NIO need different values for these attributes

protected int journalMaxIO_AIO = ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio();
Expand Down Expand Up @@ -2508,4 +2510,15 @@ public ConfigurationImpl setTemporaryQueueNamespace(final String temporaryQueueN
return this;
}

@Override
public int getJournalMaxAtticFiles() {
return journalMaxAtticFilesFiles;
}

@Override
public Configuration setJournalMaxAtticFiles(int maxAtticFiles) {
this.journalMaxAtticFilesFiles = maxAtticFiles;
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,8 @@ public void parseMainConfig(final Element e, final Configuration config) throws

config.setJournalFileSize(getTextBytesAsIntBytes(e, "journal-file-size", config.getJournalFileSize(), Validators.POSITIVE_INT));

config.setJournalMaxAtticFiles(getInteger(e, "journal-max-attic-files", config.getJournalMaxAtticFiles(), Validators.NO_CHECK));

int journalBufferTimeout = getInteger(e, "journal-buffer-timeout", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, Validators.GE_ZERO);

int journalBufferSize = getTextBytesAsIntBytes(e, "journal-buffer-size", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, Validators.POSITIVE_INT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ protected void init(Configuration config, IOCriticalErrorListener criticalErrorL
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
bindingsFF.setDatasync(config.isJournalDatasync());

Journal localBindings = new JournalImpl(ioExecutorFactory, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0, criticalErrorListener);
Journal localBindings = new JournalImpl(ioExecutorFactory, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0, criticalErrorListener, config.getJournalMaxAtticFiles());

bindingsJournal = localBindings;
originalBindingsJournal = localBindings;
Expand Down Expand Up @@ -210,7 +210,7 @@ protected int fixJournalFileSize(int fileSize, int alignment) {
protected Journal createMessageJournal(Configuration config,
IOCriticalErrorListener criticalErrorListener,
int fileSize) {
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener);
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles());
}

// Life Cycle Handlers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public void testDefaults() {

Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), conf.getJournalFileSize());

Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalMaxAtticFiles(), conf.getJournalMaxAtticFiles());

Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalCompactMinFiles(), conf.getJournalCompactMinFiles());

Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(), conf.getJournalCompactPercentage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected StorageManager createStorageManager() {
protected Journal createMessageJournal(Configuration config,
IOCriticalErrorListener criticalErrorListener,
int fileSize) {
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener) {
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles()) {
@Override
protected void moveNextFile(boolean scheduleReclaim) throws Exception {
super.moveNextFile(scheduleReclaim);
Expand Down

0 comments on commit fdfc581

Please sign in to comment.