Skip to content

Commit

Permalink
fall back to polling file-system for tailer cycles when queue is conf…
Browse files Browse the repository at this point in the history
…igured in read-only mode
  • Loading branch information
epickrram committed Oct 24, 2017
1 parent f4237f7 commit ea63a1b
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 248 deletions.

This file was deleted.

@@ -1,145 +1,13 @@
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.core.values.LongValue;
import net.openhft.chronicle.queue.impl.TableStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;
import java.util.function.ToIntFunction;

final class DirectoryListing {
private static final Logger LOGGER = LoggerFactory.getLogger(DirectoryListing.class);
private static final long LOCK_ACQUISITION_TIMEOUT_MILLIS =
Long.getLong("chronicle.listing.lock.timeout", TimeUnit.SECONDS.toMillis(20L));
private static final long LOCK_MAX_AGE_MILLIS =
Long.getLong("chronicle.listing.lock.maxAge", TimeUnit.SECONDS.toMillis(10L));

private static final String HIGHEST_CREATED_CYCLE = "listing.highestCycle";
private static final String LOWEST_CREATED_CYCLE = "listing.lowestCycle";
// visible for testing
static final String LOCK = "listing.exclusiveLock";
private final TableStore tableStore;
private final Path queuePath;
private final ToIntFunction<File> fileToCycleFunction;
private final LongValue maxCycleValue;
private final LongValue minCycleValue;
private final LongValue lock;
private final boolean readOnly;

DirectoryListing(
final TableStore tableStore, final Path queuePath,
final ToIntFunction<File> fileToCycleFunction,
final boolean readOnly) {
this.tableStore = tableStore;
this.queuePath = queuePath;
this.fileToCycleFunction = fileToCycleFunction;
maxCycleValue = tableStore.acquireValueFor(HIGHEST_CREATED_CYCLE);
minCycleValue = tableStore.acquireValueFor(LOWEST_CREATED_CYCLE);
lock = tableStore.acquireValueFor(LOCK);
if (lock.getVolatileValue() == Long.MIN_VALUE) {
lock.compareAndSwapValue(Long.MIN_VALUE, 0);
}
this.readOnly = readOnly;
}

void refresh() {
if (readOnly) {
return;
}
tryWithLock(this::refreshIndex);
}

void onFileCreated(final File file, final int cycle) {
if (readOnly) {
LOGGER.warn("DirectoryListing is read-only, not updating listing");
return;
}
tryWithLock(() -> {
maxCycleValue.setMaxValue(cycle);
minCycleValue.setMinValue(cycle);
return 0;
});
}

int getMaxCreatedCycle() {
if (readOnly) {
return getMaxCycleValue();
}
return tryWithLock(this::getMaxCycleValue);
}

int getMinCreatedCycle() {
if (readOnly) {
return getMinCycleValue();
}
return tryWithLock(this::getMinCycleValue);
}

private int getMaxCycleValue() {
return (int) maxCycleValue.getVolatileValue();
}

private int getMinCycleValue() {
return (int) minCycleValue.getVolatileValue();
}

private int tryWithLock(final IntSupplier function) {
final long lockAcquisitionTimeout = System.currentTimeMillis() + LOCK_ACQUISITION_TIMEOUT_MILLIS;
long currentTime;
while ((currentTime = System.currentTimeMillis()) < lockAcquisitionTimeout) {
if (lock.compareAndSwapValue(0, currentTime)) {
try {
return function.getAsInt();
} finally {
if (!lock.compareAndSwapValue(currentTime, 0)) {
throw new IllegalStateException("Unable to reset lock state");
}
}

} else {
final long lastLockTime = lock.getValue();
if (lastLockTime != 0 && lastLockTime < currentTime - LOCK_MAX_AGE_MILLIS) {
// assume that previous lock holder has died
if (lock.compareAndSwapValue(lastLockTime, currentTime)) {
LOGGER.warn("Forcing lock on directory listing as it is {}sec old",
(currentTime - lastLockTime) / 1000);
try {
return function.getAsInt();
} finally {
if (!lock.compareAndSwapValue(currentTime, 0)) {
throw new IllegalStateException("Unable to reset lock state");
}
}
}
}
Thread.yield();
}
}

throw new IllegalStateException("Unable to acquire exclusive lock on directory listing.\n" +
"Consider changing system properties chronicle.listing.lock.timeout/chronicle.listing.lock.maxAge");
}
public interface DirectoryListing {
void refresh();

private int refreshIndex() {
maxCycleValue.setOrderedValue(Integer.MIN_VALUE);
minCycleValue.setOrderedValue(Integer.MAX_VALUE);
final File[] queueFiles = queuePath.toFile().
listFiles((d, f) -> f.endsWith(SingleChronicleQueue.SUFFIX));
if (queueFiles != null) {
for (File queueFile : queueFiles) {
maxCycleValue.setMaxValue(fileToCycleFunction.applyAsInt(queueFile));
minCycleValue.setMinValue(fileToCycleFunction.applyAsInt(queueFile));
void onFileCreated(File file, int cycle);

}
}
return 0;
}
int getMaxCreatedCycle();

void close() {
tableStore.close();
}
}
int getMinCreatedCycle();
}
@@ -0,0 +1,49 @@
package net.openhft.chronicle.queue.impl.single;

import java.io.File;
import java.util.function.ToIntFunction;

public final class FileSystemDirectoryListing implements DirectoryListing {
private final File queueDir;
private final ToIntFunction<File> fileToCycleFunction;

public FileSystemDirectoryListing(final File queueDir,
final ToIntFunction<File> fileToCycleFunction) {
this.queueDir = queueDir;
this.fileToCycleFunction = fileToCycleFunction;
}

@Override
public void refresh() {
// no-op
}

@Override
public void onFileCreated(final File file, final int cycle) {
// no-op
}

@Override
public int getMaxCreatedCycle() {
int maxCycle = Integer.MIN_VALUE;
final File[] files = queueDir.listFiles((d, n) -> n.endsWith(SingleChronicleQueue.SUFFIX));
if (files != null) {
for (File file : files) {
maxCycle = Math.max(maxCycle, fileToCycleFunction.applyAsInt(file));
}
}
return maxCycle;
}

@Override
public int getMinCreatedCycle() {
int minCycle = Integer.MAX_VALUE;
final File[] files = queueDir.listFiles((d, n) -> n.endsWith(SingleChronicleQueue.SUFFIX));
if (files != null) {
for (File file : files) {
minCycle = Math.min(minCycle, fileToCycleFunction.applyAsInt(file));
}
}
return minCycle;
}
}
Expand Up @@ -74,6 +74,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;

import static net.openhft.chronicle.queue.TailerDirection.NONE;
import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts.StoreAppender;
Expand Down Expand Up @@ -133,6 +134,7 @@ public class SingleChronicleQueue implements RollingChronicleQueue {
private boolean persistedRollCycleCheckPerformed = false;

protected SingleChronicleQueue(@NotNull final SingleChronicleQueueBuilder builder) {
readOnly = builder.readOnly();
rollCycle = builder.rollCycle();
cycleCalculator = builder.cycleCalculator();
epoch = builder.epoch();
Expand All @@ -155,12 +157,15 @@ protected SingleChronicleQueue(@NotNull final SingleChronicleQueueBuilder builde
// add a 10% random element to make it less likely threads will timeout at the same time.
timeoutMS = (long) (builder.timeoutMS() * (1 + 0.2 * ThreadLocalRandom.current().nextFloat()));
storeFactory = builder.storeFactory();
final File listingPath = createDirectoryListingFile();
this.directoryListing = new DirectoryListing(SingleTableBuilder.
binary(listingPath).readOnly(builder.readOnly()).build(), path.toPath(), f -> {
final String name = f.getName();
return dateCache.parseCount(name.substring(0, name.length() - SUFFIX.length()));
}, builder.readOnly());
if (readOnly) {
this.directoryListing = new FileSystemDirectoryListing(path, fileToCycleFunction());
} else {
final File listingPath = createDirectoryListingFile();
this.directoryListing = new TableDirectoryListing(SingleTableBuilder.
binary(listingPath).readOnly(builder.readOnly()).build(),
path.toPath(), fileToCycleFunction(), builder.readOnly());
}

this.directoryListing.refresh();

if (builder.getClass().getName().equals("software.chronicle.enterprise.queue.EnterpriseChronicleQueueBuilder")) {
Expand All @@ -176,7 +181,6 @@ protected SingleChronicleQueue(@NotNull final SingleChronicleQueueBuilder builde

sourceId = builder.sourceId();
recoverySupplier = builder.recoverySupplier();
readOnly = builder.readOnly();
}

@Nullable
Expand Down Expand Up @@ -569,7 +573,7 @@ private File createDirectoryListingFile() {
listingPath.getParentFile().mkdirs();
}
try {
if (listingPath.createNewFile()) {
if (!readOnly && listingPath.createNewFile()) {
if (!listingPath.canWrite()) {
throw new IllegalStateException("Cannot write to cycle file");
}
Expand Down Expand Up @@ -688,6 +692,14 @@ private void assignRollCycleDependentFields() {
fileToText());
}

@NotNull
private ToIntFunction<File> fileToCycleFunction() {
return f -> {
final String name = f.getName();
return dateCache.parseCount(name.substring(0, name.length() - SUFFIX.length()));
};
}

void removeCloseListener(final StoreTailer storeTailer) {
synchronized (closers) {
closers.remove(storeTailer);
Expand Down Expand Up @@ -763,8 +775,9 @@ private void checkDiskSpace(@NotNull final File filePath) throws IOException {
if (unallocatedBytes < totalSpace * .05)
LOG.warn("your disk is more than 95% full, warning: chronicle-queue may crash if " +
"it runs out of space.");
else if (unallocatedBytes < (100 << 20)) // if less than 10 Megabytes
LOG.warn("your disk is almost full, warning: chronicle-queue may crash if it runs out of space.");
else
if (unallocatedBytes < (100 << 20)) // if less than 10 Megabytes
LOG.warn("your disk is almost full, warning: chronicle-queue may crash if it runs out of space.");
}
}

Expand Down

0 comments on commit ea63a1b

Please sign in to comment.