Skip to content

Commit

Permalink
Add support for a smarter SimpleStoreRecovery and throw Unrecoverable…
Browse files Browse the repository at this point in the history
…TimeoutException instead of TimeoutException
  • Loading branch information
peter-lawrey committed May 22, 2016
1 parent e032dc4 commit c2e7b7f
Show file tree
Hide file tree
Showing 18 changed files with 854 additions and 835 deletions.
10 changes: 3 additions & 7 deletions src/main/java/net/openhft/chronicle/queue/ChronicleReader.java
Expand Up @@ -27,7 +27,6 @@

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* Display records in a Chronicle in a text form.
Expand Down Expand Up @@ -55,12 +54,9 @@ public static void tailFileFrom(String basePath, String regex, long index, boole
if (index > 0) {
System.out.println("Waiting for index " + index);
for (; ; ) {
try {
if (tailer.moveToIndex(index))
break;
} catch (TimeoutException e) {
Jvm.pause(500);
}
if (tailer.moveToIndex(index))
break;
Jvm.pause(100);
}
}

Expand Down
Expand Up @@ -17,6 +17,7 @@

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.wire.MarshallableOut;
import net.openhft.chronicle.wire.UnrecoverableTimeoutException;
import org.jetbrains.annotations.NotNull;

import java.io.StreamCorruptedException;
Expand All @@ -31,7 +32,7 @@ public interface ExcerptAppender extends ExcerptCommon<ExcerptAppender>, Marshal
/**
* @param bytes to write to excerpt.
*/
void writeBytes(@NotNull Bytes<?> bytes);
void writeBytes(@NotNull Bytes<?> bytes) throws UnrecoverableTimeoutException;

/**
* Write an entry at a given index. This can use used for rebuilding a queue, or replication.
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/net/openhft/chronicle/queue/ExcerptTailer.java
Expand Up @@ -22,8 +22,6 @@
import net.openhft.chronicle.wire.SourceContext;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.TimeoutException;

/**
* The component that facilitates sequentially reading data from a {@link ChronicleQueue}.
*
Expand Down Expand Up @@ -64,7 +62,7 @@ default DocumentContext readingDocument() {
* with this cycle
* @return true if this is a valid entries.
*/
boolean moveToIndex(long index) throws TimeoutException;
boolean moveToIndex(long index);

/**
* Replay from the first entry in the first cycle.
Expand Down
58 changes: 0 additions & 58 deletions src/main/java/net/openhft/chronicle/queue/QueueInternal.java

This file was deleted.

16 changes: 9 additions & 7 deletions src/main/java/net/openhft/chronicle/queue/impl/WireStore.java
Expand Up @@ -18,13 +18,11 @@
import net.openhft.chronicle.bytes.MappedBytes;
import net.openhft.chronicle.core.ReferenceCounted;
import net.openhft.chronicle.queue.impl.single.ScanResult;
import net.openhft.chronicle.wire.Demarshallable;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WriteMarshallable;
import net.openhft.chronicle.wire.*;
import org.jetbrains.annotations.NotNull;

import java.io.EOFException;
import java.util.concurrent.TimeoutException;
import java.io.StreamCorruptedException;

public interface WireStore extends ReferenceCounted, Demarshallable, WriteMarshallable {
WireStore writePosition(long position);
Expand All @@ -40,7 +38,7 @@ public interface WireStore extends ReferenceCounted, Demarshallable, WriteMarsha
*/
long writePosition();

ScanResult moveToIndex(@NotNull Wire wire, long index, long timeoutMS) throws TimeoutException;
ScanResult moveToIndexForRead(@NotNull Wire wire, long index, long timeoutMS);

@NotNull
MappedBytes bytes();
Expand All @@ -52,13 +50,17 @@ public interface WireStore extends ReferenceCounted, Demarshallable, WriteMarsha
* @param timeoutMS
* @return index in this store.
*/
long indexForPosition(Wire wire, long position, long timeoutMS) throws EOFException, TimeoutException;
long indexForPosition(Wire wire, long position, long timeoutMS) throws EOFException, UnrecoverableTimeoutException, StreamCorruptedException;

String dump();

void lastAcknowledgedIndexReplicated(long lastAcknowledgedIndexReplicated);

long lastAcknowledgedIndexReplicated();

void setPositionForIndex(Wire wire, long index, long position, long timeoutMS);
void setPositionForIndex(Wire wire, long index, long position, long timeoutMS) throws UnrecoverableTimeoutException, StreamCorruptedException;

long writeHeader(Wire wire, int length, long timeoutMS) throws EOFException, UnrecoverableTimeoutException;

void writeEOF(AbstractWire wire, long timeoutMS) throws UnrecoverableTimeoutException;
}

0 comments on commit c2e7b7f

Please sign in to comment.