Skip to content

Commit

Permalink
CHRON-24 Start implementing ChronicleQueueFormat
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Sep 10, 2015
1 parent ed072fe commit 6c2b86f
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 28 deletions.
Expand Up @@ -23,11 +23,11 @@

import java.io.IOException;

public abstract class AbstractChronicleQueue implements ChronicleQueue {
public abstract class AbstractChronicleQueue<T extends ChronicleQueueFormat> implements ChronicleQueue {

private final ChronicleQueueFormat format;
private final T format;

protected AbstractChronicleQueue(final ChronicleQueueFormat format) {
protected AbstractChronicleQueue(final T format) {
this.format = format;
}

Expand Down Expand Up @@ -78,4 +78,12 @@ public long lastWrittenIndex() {
public void close() throws IOException {
throw new UnsupportedOperationException("Not implemented");
}

// *************************************************************************
//
// *************************************************************************

protected T format() {
return this.format;
}
}
Expand Up @@ -17,31 +17,26 @@

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.values.LongValue;
import net.openhft.chronicle.queue.util.ChronicleQueueUtil;
import net.openhft.chronicle.wire.Marshallable;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.WireType;
import net.openhft.chronicle.wire.WireUtil;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.time.ZonedDateTime;
import java.util.UUID;
import java.util.function.Function;

public abstract class AbstractChronicleQueueFormat implements ChronicleQueueFormat {

protected final File root;
protected final long blocks;
protected final Function<Bytes, Wire> wireSupplier;
protected final Header header;

protected AbstractChronicleQueueFormat(File root, long blocks, WireType wireType) {
this.root = root;
this.blocks = blocks;
this.wireSupplier = ChronicleQueueUtil.wireSuplierFor(wireType);
protected AbstractChronicleQueueFormat(WireType wireType) {
this.wireSupplier = WireUtil.wireSupplierFor(wireType);
this.header = new Header();
}

Expand Down
Expand Up @@ -17,10 +17,26 @@
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.queue.impl.AbstractChronicleQueue;
import net.openhft.chronicle.wire.WireUtil;

class SingleChronicleQueue extends AbstractChronicleQueue {
class SingleChronicleQueue extends AbstractChronicleQueue<SingleChronicleQueueFormat> {
private final SingleChronicleQueueBuilder builder;

protected SingleChronicleQueue(final SingleChronicleQueueBuilder builder) {
super(new SingleChronicleQueueFormat(builder));

this.builder = builder;

init();
}

// *************************************************************************
//
// *************************************************************************

protected void init() {
WireUtil.writeDataOnce(null, true, format());
//Wires.writeDataOnce(wire, true, this::buildHeader);
//readHeader();
}
}
Expand Up @@ -63,7 +63,7 @@ public WireType wireType() {

@NotNull
public ChronicleQueue build() throws IOException {
return new SingleChronicleQueue(this);
return new SingleChronicleQueue(this.clone());
}

@NotNull
Expand Down
Expand Up @@ -16,16 +16,10 @@
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.queue.impl.AbstractChronicleQueueFormat;
import net.openhft.chronicle.wire.WireType;

import java.io.File;

class SingleChronicleQueueFormat extends AbstractChronicleQueueFormat {
private final SingleChronicleQueueBuilder builder;

SingleChronicleQueueFormat(final SingleChronicleQueueBuilder builder) {
super(null, builder.blockSize(), builder.wireType());

this.builder = builder.clone();
super(builder.wireType());
}
}
Expand Up @@ -14,20 +14,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package net.openhft.chronicle.queue.util;
package net.openhft.chronicle.wire;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.wire.BinaryWire;
import net.openhft.chronicle.wire.RawWire;
import net.openhft.chronicle.wire.TextWire;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireType;
import org.jetbrains.annotations.NotNull;

import java.util.function.Function;

public class ChronicleQueueUtil {
//TODO: workaround for protected access to WireInternal
public class WireUtil {

public static final Function<Bytes,Wire> wireSuplierFor(WireType type) {
public static final Function<Bytes,Wire> wireSupplierFor(WireType type) {
switch (type) {
case BINARY:
return BinaryWire::new;
Expand All @@ -39,4 +36,12 @@ public static final Function<Bytes,Wire> wireSuplierFor(WireType type) {

throw new IllegalArgumentException("Unknown WireType (" + type + ")");
}

public static void writeDataOnce(
@NotNull WireOut wireOut,
boolean metaData,
@NotNull WriteMarshallable writer) {

WireInternal.writeDataOnce(wireOut, metaData, writer);
}
}

0 comments on commit 6c2b86f

Please sign in to comment.