Skip to content

Commit

Permalink
#389 Appender should write EOF to last file before creating new one
Browse files Browse the repository at this point in the history
  • Loading branch information
dpisklov committed Oct 23, 2017
1 parent 205e1e7 commit ca62b9a
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 135 deletions.
151 changes: 151 additions & 0 deletions src/main/java/net/openhft/chronicle/queue/impl/single/QueueFiles.java
@@ -0,0 +1,151 @@
/*
* Copyright 2014-2017 Higher Frequency Trading
*
* http://www.higherfrequencytrading.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.MappedBytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireType;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
import java.util.stream.Stream;

import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.SUFFIX;

public enum QueueFiles {
;
private static final Logger LOGGER = LoggerFactory.getLogger(RollCycleRetriever.class);

static <T> Optional<T> processLastQueueFile(Path queuePath, WireType wireType, long blockSize, boolean readOnly,
BiFunction<Wire, SingleChronicleQueueStore, T> processor) {
if (Files.exists(queuePath) && hasQueueFiles(queuePath)) {
final MappedBytes mappedBytes = mappedBytes(getLastQueueFile(queuePath), blockSize, readOnly);
mappedBytes.reserve();
try {
final Wire wire = wireType.apply(mappedBytes);
// move past header

final Bytes<?> bytes = wire.bytes();
bytes.readLimit(bytes.capacity());

if (bytes.readLimit() < 4) {
return Optional.empty();
}
final File file = mappedBytes.mappedFile().file();
for (int i = 0; i < 500 && file.length() == 0; i++) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
}
if (file.length() < 4) {
LOGGER.warn("Queue file exists, but is truncated, cannot determine existing roll cycle");
return Optional.empty();
}

final int firstInt = bytes.peekVolatileInt();
if (!Wires.isReady(firstInt)) {
return Optional.empty();
}
bytes.readSkip(4);
try (final SingleChronicleQueueStore queueStore = SingleChronicleQueueBuilder.loadStore(wire)) {
if (queueStore == null) {
return Optional.empty();
}
processor.apply(wire, queueStore);


} catch (final Throwable e) {
LOGGER.warn("Unable to load queue store from file {}", queuePath, e);
}

} finally {
mappedBytes.release();
}
}
return Optional.empty();
}

static void writeEOFIfNeeded(@NotNull Path queuePath, @NotNull WireType wireType, long blockSize, long timeoutMS) {
processLastQueueFile(queuePath, wireType, blockSize, false, (w, qs) -> {
long l = qs.writePosition();
Bytes<?> bytes = w.bytes();
long len = Wires.lengthOf(bytes.readVolatileInt(l));
long eofOffset = l + len + 4L;
if (0 == bytes.readVolatileInt(eofOffset)) {
// no EOF found - write EOF
try {
bytes.writePosition(eofOffset);
w.writeEndOfWire(timeoutMS, TimeUnit.MILLISECONDS, eofOffset);
} catch (TimeoutException e) {
Jvm.warn().on(RollCycleRetriever.class, "Timeout writing EOF for last file in " + queuePath);
}
}
return null;
});
}

private static Path getLastQueueFile(final Path queuePath) {
try {
try (final Stream<Path> children = Files.list(queuePath)) {
return children.filter(p -> p.toString().endsWith(SUFFIX)).
sorted(Comparator.reverseOrder()).findFirst().orElseThrow(() ->
new UncheckedIOException(new IOException(
String.format("Expected at least one %s file in directory %s",
SUFFIX, queuePath))));
}
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Failed to list contents of known directory %s", queuePath), e);
}
}

private static MappedBytes mappedBytes(@NotNull Path queueFile, long blockSize, boolean readOnly) {
long chunkSize = OS.pageAlign(blockSize);
long overlapSize = OS.pageAlign(blockSize / 4);
try {
return MappedBytes.mappedBytes(queueFile.toFile(), chunkSize, overlapSize, readOnly);
} catch (FileNotFoundException e) {
throw new UncheckedIOException(String.format("Failed to open existing file %s", queueFile), e);
}
}

private static boolean hasQueueFiles(final Path queuePath) {
try {
try (final Stream<Path> children = Files.list(queuePath)) {
return children.anyMatch(p -> p.toString().endsWith(SUFFIX));
}
} catch (IOException e) {
return false;
}
}
}
@@ -1,130 +1,35 @@
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.MappedBytes;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireType;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;

import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.SUFFIX;

public enum RollCycleRetriever {
;

private static final Logger LOGGER = LoggerFactory.getLogger(RollCycleRetriever.class);
private static final RollCycles[] ROLL_CYCLES = RollCycles.values();

public static Optional<RollCycle> getRollCycle(final Path queuePath, final WireType wireType, final long blockSize) {
if (Files.exists(queuePath) && hasQueueFiles(queuePath)) {
final MappedBytes mappedBytes = mappedBytes(getLastQueueFile(queuePath), blockSize);
mappedBytes.reserve();
try {
final Wire wire = wireType.apply(mappedBytes);
// move past header

final Bytes<?> bytes = wire.bytes();
bytes.readLimit(bytes.capacity());

if (bytes.readLimit() < 4) {
return Optional.empty();
}
final File file = mappedBytes.mappedFile().file();
for (int i = 0; i < 500 && file.length() == 0; i++) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
}
if (file.length() < 4) {
LOGGER.warn("Queue file exists, but is truncated, cannot determine existing roll cycle");
return Optional.empty();
}

final int firstInt = bytes.peekVolatileInt();
if (!Wires.isReady(firstInt)) {
return Optional.empty();
}
bytes.readSkip(4);
try (final SingleChronicleQueueStore queueStore = SingleChronicleQueueBuilder.loadStore(wire)) {
if (queueStore == null) {
return Optional.empty();
}

final int rollCycleLength = queueStore.rollCycleLength();
final int rollCycleIndexCount = queueStore.rollIndexCount();
final int rollCycleIndexSpacing = queueStore.rollIndexSpacing();

for (final RollCycle cycle : ROLL_CYCLES) {
if (rollCycleMatches(cycle, rollCycleLength, rollCycleIndexCount, rollCycleIndexSpacing)) {
return Optional.of(cycle);
}
}

} catch (final Throwable e) {
LOGGER.warn("Unable to load queue store from file {}", queuePath, e);
return QueueFiles.processLastQueueFile(queuePath, wireType, blockSize, true, (w, qs) -> {
final int rollCycleLength = qs.rollCycleLength();
final int rollCycleIndexCount = qs.rollIndexCount();
final int rollCycleIndexSpacing = qs.rollIndexSpacing();

for (final RollCycle cycle : ROLL_CYCLES) {
if (rollCycleMatches(cycle, rollCycleLength, rollCycleIndexCount, rollCycleIndexSpacing)) {
return cycle;
}

} finally {
mappedBytes.release();
}
}
return Optional.empty();
return null;
});
}

private static boolean rollCycleMatches(final RollCycle cycle, final int rollCycleLength,
final int rollCycleIndexCount, final int rollCycleIndexSpacing) {
return cycle.length() == rollCycleLength && cycle.defaultIndexCount() == rollCycleIndexCount &&
cycle.defaultIndexSpacing() == rollCycleIndexSpacing;
}

private static Path getLastQueueFile(final Path queuePath) {
try {
try (final Stream<Path> children = Files.list(queuePath)) {
return children.filter(p -> p.toString().endsWith(SUFFIX)).
sorted(Comparator.reverseOrder()).findFirst().orElseThrow(() ->
new UncheckedIOException(new IOException(
String.format("Expected at least one %s file in directory %s",
SUFFIX, queuePath))));
}
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Failed to list contents of known directory %s", queuePath), e);
}
}

private static MappedBytes mappedBytes(@NotNull final Path queueFile,
final long blockSize) {
long chunkSize = OS.pageAlign(blockSize);
long overlapSize = OS.pageAlign(blockSize / 4);
try {
return MappedBytes.mappedBytes(queueFile.toFile(), chunkSize, overlapSize, true);
} catch (FileNotFoundException e) {
throw new UncheckedIOException(String.format("Failed to open existing file %s", queueFile), e);
}
}

private static boolean hasQueueFiles(final Path queuePath) {
try {
try (final Stream<Path> children = Files.list(queuePath)) {
return children.anyMatch(p -> p.toString().endsWith(SUFFIX));
}
} catch (IOException e) {
return false;
}
}
}
Expand Up @@ -24,48 +24,23 @@
import net.openhft.chronicle.core.threads.ThreadLocalHelper;
import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.core.util.StringUtils;
import net.openhft.chronicle.queue.CycleCalculator;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.TailerDirection;
import net.openhft.chronicle.queue.impl.CommonStore;
import net.openhft.chronicle.queue.impl.RollingChronicleQueue;
import net.openhft.chronicle.queue.impl.RollingResourcesCache;
import net.openhft.chronicle.queue.impl.WireStore;
import net.openhft.chronicle.queue.impl.WireStorePool;
import net.openhft.chronicle.queue.impl.WireStoreSupplier;
import net.openhft.chronicle.queue.*;
import net.openhft.chronicle.queue.impl.*;
import net.openhft.chronicle.queue.impl.table.SingleTableBuilder;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.AbstractWire;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.TextWire;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireType;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StreamCorruptedException;
import java.io.Writer;
import java.io.*;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.ParseException;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeMap;
import java.util.WeakHashMap;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -717,6 +692,8 @@ public WireStore acquire(int cycle, boolean createIfAbsent) {

if (createIfAbsent && !path.exists()) {
directoryListing.onFileCreated(path, cycle);
// before we create a new file, we need to ensure previous file has got EOF mark
QueueFiles.writeEOFIfNeeded(that.path.toPath(), wireType(), blockSize(), timeoutMS);
}

final MappedBytes mappedBytes = mappedBytes(path);
Expand Down

0 comments on commit ca62b9a

Please sign in to comment.