Skip to content

Commit

Permalink
QUEUE-18 - Add functionality to chronicle queue 4 to support text wire
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Apr 21, 2015
1 parent e0cbbac commit 0f09453
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 106 deletions.
Expand Up @@ -15,42 +15,88 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */

package net.openhft.chronicle.queue; package net.openhft.chronicle.queue;


import net.openhft.chronicle.queue.impl.SingleChronicleQueue; import net.openhft.chronicle.queue.impl.SingleChronicleQueue;
import net.openhft.chronicle.wire.BinaryWire; import net.openhft.chronicle.wire.BinaryWire;
import net.openhft.chronicle.wire.TextWire;
import net.openhft.chronicle.wire.Wire; import net.openhft.chronicle.wire.Wire;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;


import java.io.File;
import java.io.IOException; import java.io.IOException;


/** /**
* Created by peter.lawrey on 30/01/15. * Created by peter.lawrey on 30/01/15.
*/ */
public class ChronicleQueueBuilder { public class ChronicleQueueBuilder implements Cloneable {
private String name; private String name;
private long blockSize = 64 << 20; private long blockSize;
private Class<? extends Wire> wireType = BinaryWire.class; private Class<? extends Wire> wireType;

public ChronicleQueueBuilder(File name) {
this(name.getAbsolutePath());
}


public ChronicleQueueBuilder(String name) { public ChronicleQueueBuilder(String name) {
this.name = name; this.name = name;
this.blockSize = 64L << 20;
this.wireType = BinaryWire.class;
}

public String name() {
return this.name;
} }


@NotNull
public ChronicleQueueBuilder blockSize(int blockSize) { public ChronicleQueueBuilder blockSize(int blockSize) {
this.blockSize = blockSize; this.blockSize = blockSize;
return this; return this;
} }


public long blockSize() {
return this.blockSize;
}

public ChronicleQueueBuilder wireType(Class<? extends Wire> wireType) { public ChronicleQueueBuilder wireType(Class<? extends Wire> wireType) {
this.wireType = wireType; this.wireType = wireType;
return this; return this;
} }


public Class<? extends Wire> wireType() {
return this.wireType;
}


@NotNull @NotNull
public SingleChronicleQueue build() throws IOException { public SingleChronicleQueue build() throws IOException {
return new SingleChronicleQueue(name, blockSize, wireType); return new SingleChronicleQueue(this);
}

@NotNull
@SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
@Override
public ChronicleQueueBuilder clone() {
try {
return (ChronicleQueueBuilder) super.clone();
} catch (CloneNotSupportedException e) {
throw new AssertionError(e);
}
}

public static ChronicleQueueBuilder binary(File name) {
return binary(name.getAbsolutePath());
}

public static ChronicleQueueBuilder binary(String name) {
return new ChronicleQueueBuilder(name)
.wireType(BinaryWire.class);
}

public static ChronicleQueueBuilder text(File name) {
return text(name.getAbsolutePath());
}

public static ChronicleQueueBuilder text(String name) {
return new ChronicleQueueBuilder(name)
.wireType(TextWire.class);
} }
} }
@@ -1,3 +1,20 @@
/*
* Copyright 2015 Higher Frequency Trading
*
* http://www.higherfrequencytrading.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.openhft.chronicle.queue.impl; package net.openhft.chronicle.queue.impl;


import net.openhft.chronicle.queue.ChronicleQueue; import net.openhft.chronicle.queue.ChronicleQueue;
Expand All @@ -7,12 +24,8 @@
* Created by Rob Austin * Created by Rob Austin
*/ */
abstract class AbstractChronicle implements ChronicleQueue, DirectChronicleQueue { abstract class AbstractChronicle implements ChronicleQueue, DirectChronicleQueue {

abstract Wire wire(); abstract Wire wire();

abstract Class<? extends Wire> wireType(); abstract Class<? extends Wire> wireType();

abstract long indexToIndex(); abstract long indexToIndex();

abstract long newIndex(); abstract long newIndex();
} }
Expand Up @@ -24,13 +24,12 @@
import net.openhft.chronicle.wire.*; import net.openhft.chronicle.wire.*;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;


import java.io.IOException;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.UUID; import java.util.UUID;


/** /**
* Data structure to bind to an off heap representation. This is required to support persistence * Data structure to bind to an off heap representation. This is required to
* and sharing of this data structure. * support persistence and sharing of this data structure.
*/ */
class Header implements Marshallable { class Header implements Marshallable {
public static final long PADDED_SIZE = 512; public static final long PADDED_SIZE = 512;
Expand Down Expand Up @@ -85,7 +84,7 @@ enum Field implements WireKey {
@Override @Override
public void writeMarshallable(@NotNull WireOut out) { public void writeMarshallable(@NotNull WireOut out) {
out.write(Field.uuid).uuid(uuid) out.write(Field.uuid).uuid(uuid)
.write(Field.writeByte).int64forBinding(PADDED_SIZE) .write(Field.writeByte).int64forBinding(PADDED_SIZE + 2)
.write(Field.created).zonedDateTime(created) .write(Field.created).zonedDateTime(created)
.write(Field.user).text(user) .write(Field.user).text(user)
.write(Field.host).text(host) .write(Field.host).text(host)
Expand Down Expand Up @@ -125,6 +124,5 @@ public static void main(String... args) {
TextWire tw = new TextWire(NativeBytes.nativeBytes()); TextWire tw = new TextWire(NativeBytes.nativeBytes());
tw.writeDocument(true, w -> w.write(() -> "header").marshallable(h)); tw.writeDocument(true, w -> w.write(() -> "header").marshallable(h));
System.out.println(tw.bytes().flip().toString()); System.out.println(tw.bytes().flip().toString());

} }
} }
Expand Up @@ -19,10 +19,10 @@
package net.openhft.chronicle.queue.impl; package net.openhft.chronicle.queue.impl;


import net.openhft.chronicle.queue.ExcerptTailer; import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.wire.*; import net.openhft.chronicle.wire.ByteableLongArrayValues;
import net.openhft.chronicle.wire.util.WireUtil;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;


import static java.lang.ThreadLocal.withInitial;
import static net.openhft.chronicle.queue.impl.Indexer.IndexOffset.toAddress0; import static net.openhft.chronicle.queue.impl.Indexer.IndexOffset.toAddress0;
import static net.openhft.chronicle.queue.impl.Indexer.IndexOffset.toAddress1; import static net.openhft.chronicle.queue.impl.Indexer.IndexOffset.toAddress1;
import static net.openhft.chronicle.queue.impl.SingleChronicleQueue.UNINITIALISED; import static net.openhft.chronicle.queue.impl.SingleChronicleQueue.UNINITIALISED;
Expand All @@ -41,22 +41,10 @@ public class Indexer {
private ThreadLocal<ByteableLongArrayValues> array; private ThreadLocal<ByteableLongArrayValues> array;


public Indexer(@NotNull final AbstractChronicle chronicle) { public Indexer(@NotNull final AbstractChronicle chronicle) {
this.array = newLongArrayValuesPool(chronicle.wireType()); this.array = WireUtil.newLongArrayValuesPool(chronicle.wireType());
this.chronicle = chronicle; this.chronicle = chronicle;
} }


public static ThreadLocal<ByteableLongArrayValues>
newLongArrayValuesPool(Class<? extends Wire> wireType) {

if (TextWire.class.isAssignableFrom(wireType))
return withInitial(TextLongArrayReference::new);
if (BinaryWire.class.isAssignableFrom(wireType))
return withInitial(BinaryLongArrayReference::new);
else
throw new IllegalStateException("todo, unsupported type=" + wireType);

}

/** /**
* sans through every excerpts and records every 64th address in the index2indexs' * sans through every excerpts and records every 64th address in the index2indexs'
* *
Expand Down
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */

package net.openhft.chronicle.queue.impl; package net.openhft.chronicle.queue.impl;


import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.Bytes;
Expand All @@ -28,23 +27,23 @@
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;


import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function;


/** /**
* Created by peter.lawrey on 30/01/15. * Created by peter.lawrey on 30/01/15.
*/ */
public class SingleAppender implements ExcerptAppender { public class SingleAppender implements ExcerptAppender {


@NotNull @NotNull
private final DirectChronicleQueue chronicle; private final SingleChronicleQueue chronicle;
private final Bytes buffer = NativeBytes.nativeBytes(); private final Bytes buffer;
private final Wire wire; private final Wire wire;


private long lastWrittenIndex = -1; private long lastWrittenIndex = -1;


public SingleAppender(ChronicleQueue chronicle, Function<Bytes, Wire> bytesToWire) { public SingleAppender(@NotNull final SingleChronicleQueue chronicle) {
this.chronicle = (DirectChronicleQueue) chronicle; this.buffer = NativeBytes.nativeBytes();
wire = bytesToWire.apply(buffer); this.chronicle = chronicle;
this.wire = chronicle.createWire(buffer);
} }


@Nullable @Nullable
Expand All @@ -68,10 +67,11 @@ public void writeDocument(@NotNull Consumer<WireOut> writer) {
@Override @Override
public long lastWrittenIndex() { public long lastWrittenIndex() {
if (lastWrittenIndex == -1) { if (lastWrittenIndex == -1) {
String message = "No document has been written using this appender, so the " + throw new IllegalStateException(
"lastWrittenIndex() is not available."; "No document has been written using this appender, so the "
throw new IllegalStateException(message); + "lastWrittenIndex() is not available.");
} }

return lastWrittenIndex; return lastWrittenIndex;
} }


Expand Down

0 comments on commit 0f09453

Please sign in to comment.