Skip to content

Commit

Permalink
QUEUE-24 remove unused classes, rename format to store
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Sep 18, 2015
1 parent 56caef2 commit 2cbd919
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 146 deletions.
Expand Up @@ -16,9 +16,7 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.WireIn;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;

Expand All @@ -42,21 +40,21 @@ public interface ExcerptTailer extends ExcerptCommon {
* @param l index to look up
* @return true if this is a valid entries and not padding.
*/
boolean index(long l);
boolean index(long l) throws IOException;;

/**
* Replay from the start.
*
* @return this Excerpt
*/
@NotNull
ExcerptTailer toStart();
ExcerptTailer toStart() throws IOException;;

/**
* Wind to the end.
*
* @return this Excerpt
*/
@NotNull
ExcerptTailer toEnd();
ExcerptTailer toEnd() throws IOException;;
}

This file was deleted.

This file was deleted.

Expand Up @@ -29,7 +29,7 @@ class SingleChronicleQueue extends AbstractChronicleQueue {

private final SingleChronicleQueueBuilder builder;
private final RollDateCache dateCache;
private final Map<Integer, SingleChronicleQueueFormat> formatCache;
private final Map<Integer, SingleChronicleQueueStore> stores;

protected SingleChronicleQueue(final SingleChronicleQueueBuilder builder) throws IOException {
this.dateCache = new RollDateCache(
Expand All @@ -38,7 +38,7 @@ protected SingleChronicleQueue(final SingleChronicleQueueBuilder builder) throws
builder.rollCycleZoneId());

this.builder = builder;
this.formatCache = new HashMap<>();
this.stores = new HashMap<>();
}

@Override
Expand All @@ -56,12 +56,12 @@ SingleChronicleQueueBuilder builder() {
}

//TODO: maybe use kolobroke ?
synchronized SingleChronicleQueueFormat formatForCycle(int cycle) throws IOException {
SingleChronicleQueueFormat format = formatCache.get(cycle);
synchronized SingleChronicleQueueStore storeForCycle(int cycle) throws IOException {
SingleChronicleQueueStore format = stores.get(cycle);
if(null == format) {
formatCache.put(
stores.put(
cycle,
format = new SingleChronicleQueueFormat(
format = new SingleChronicleQueueStore(
builder,
cycle,
this.dateCache.formatFor(cycle)).buildHeader()
Expand All @@ -74,4 +74,14 @@ synchronized SingleChronicleQueueFormat formatForCycle(int cycle) throws IOExcep
int cycle() {
return (int) (System.currentTimeMillis() / builder.rollCycleLength());
}

//TODO: scan data folder
int firstCyle() {
return cycle();
}

//TODO: scan data folder
int lastCycle() {
return cycle();
}
}
Expand Up @@ -35,24 +35,24 @@ static class Appender implements ExcerptAppender {

private int cycle;
private long index;
private SingleChronicleQueueFormat format;
private SingleChronicleQueueStore store;

Appender(SingleChronicleQueue queue) {
this.queue = queue;

this.cycle = 0;
this.index = 0;
this.format = null;
this.store = null;
}

@Override
public long writeDocument(WriteMarshallable writer) throws IOException {
if(this.cycle != queue.cycle()) {
this.cycle = queue.cycle();
this.format = queue.formatForCycle(this.cycle);
this.store = queue.storeForCycle(this.cycle);
}

index = format.append(writer);
index = store.append(writer);

return index;
}
Expand All @@ -77,25 +77,25 @@ static class Tailer implements ExcerptTailer {

private int cycle;
private long position;
private SingleChronicleQueueFormat format;
private SingleChronicleQueueStore store;

Tailer(SingleChronicleQueue queue) {
this.queue = queue;

this.cycle = 0;
this.position = 0;
this.format = null;
this.store = null;
}

@Override
public boolean readDocument(ReadMarshallable reader) throws IOException {
if(this.cycle != queue.cycle()) {
this.cycle = queue.cycle();
this.format = queue.formatForCycle(this.cycle);
this.position = format.dataPosition();
this.store = queue.storeForCycle(this.cycle);
this.position = store.dataPosition();
}

long result = format.read(this.position, reader);
long result = store.read(this.position, reader);
if(WireUtil.NO_DATA != result) {
this.position = result;
return true;
Expand All @@ -105,19 +105,25 @@ public boolean readDocument(ReadMarshallable reader) throws IOException {
}

@Override
public boolean index(long l) {
public boolean index(long l) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public ExcerptTailer toStart() {
this.position = format.dataPosition();
public ExcerptTailer toStart() throws IOException {
this.cycle = queue.firstCyle();
this.store = queue.storeForCycle(this.cycle);
this.position = store.dataPosition();

return this;
}

@Override
public ExcerptTailer toEnd() {
this.position = format.writePosition();
public ExcerptTailer toEnd() throws IOException {
this.cycle = queue.lastCycle();
this.store = queue.storeForCycle(this.cycle);
this.position = store.dataPosition();

return this;
}

Expand Down

0 comments on commit 2cbd919

Please sign in to comment.