Skip to content

Commit

Permalink
CHRON-96 added a mapping function, still requires more testing.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Austin committed Jan 13, 2015
1 parent d955b70 commit dfd79dd
Show file tree
Hide file tree
Showing 10 changed files with 599 additions and 319 deletions.
Expand Up @@ -20,6 +20,7 @@
import net.openhft.chronicle.tcp.*;
import net.openhft.lang.Jvm;
import net.openhft.lang.model.constraints.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -95,18 +96,14 @@ public static class IndexedChronicleQueueBuilder extends ChronicleQueueBuilder i
private int indexBlockSize;

/**
* On 64 bit JVMs it has the following params:
* <ul>
* <li>data block size <b>128M</b></li>
* On 64 bit JVMs it has the following params: <ul> <li>data block size <b>128M</b></li>
* </ul>
*
* On 32 bit JVMs it has the following params:
* <ul>
* <li>data block size <b>16M</b></li>
* On 32 bit JVMs it has the following params: <ul> <li>data block size <b>16M</b></li>
* </ul>
*/
private IndexedChronicleQueueBuilder(final File path) {
this.path = path;
this.path = path;
this.synchronous = false;
this.useCheckedExcerpt = false;
this.cacheLineSize = 64;
Expand All @@ -120,11 +117,10 @@ protected File path() {
}

/**
* Sets the synchronous mode to be used. Enabling synchronous mode means that
* {@link ExcerptCommon#finish()} will force a persistence every time.
* Sets the synchronous mode to be used. Enabling synchronous mode means that {@link
* ExcerptCommon#finish()} will force a persistence every time.
*
* @param synchronous If synchronous mode should be used or not.
*
* @return this builder object back
*/
public IndexedChronicleQueueBuilder synchronous(boolean synchronous) {
Expand All @@ -142,9 +138,7 @@ public boolean synchronous() {
}

/**
*
* @param useCheckedExcerpt
*
* @return this builder object back
*/
public IndexedChronicleQueueBuilder useCheckedExcerpt(boolean useCheckedExcerpt) {
Expand All @@ -153,22 +147,20 @@ public IndexedChronicleQueueBuilder useCheckedExcerpt(boolean useCheckedExcerpt)
}

/**
*
* @return true if useCheckedExcerpt mode is enabled, false otherwise.
*/
public boolean useCheckedExcerpt() {
return this.useCheckedExcerpt;
}

/**
* Sets the size of the index cache lines. Index caches (files) consist
* of fixed size lines, each line having ultiple index entries on it. This
* param specifies the size of such a multi entry line.
* Sets the size of the index cache lines. Index caches (files) consist of fixed size lines,
* each line having ultiple index entries on it. This param specifies the size of such a
* multi entry line.
*
* Default value is <b>64</b>.
*
* @param cacheLineSize the size of the cache lines making up index files
*
* @return this builder object back
*/
public IndexedChronicleQueueBuilder cacheLineSize(int cacheLineSize) {
Expand All @@ -177,8 +169,8 @@ public IndexedChronicleQueueBuilder cacheLineSize(int cacheLineSize) {
}

/**
* The size of the index cache lines (index caches are made up of multiple
* fixed length lines, each line contains multiple index entries).
* The size of the index cache lines (index caches are made up of multiple fixed length
* lines, each line contains multiple index entries).
*
* Default value is <b>64</b>.
*
Expand All @@ -189,13 +181,11 @@ public int cacheLineSize() {
}

/**
* Sets the size to be used for data blocks. The method also has a side
* effect. If the data block size specified is smaller than twice the
* message capacity, then the message capacity will be adjusted to equal
* half of the new data block size.
* Sets the size to be used for data blocks. The method also has a side effect. If the data
* block size specified is smaller than twice the message capacity, then the message
* capacity will be adjusted to equal half of the new data block size.
*
* @param dataBlockSize the size of the data blocks
*
* @return this builder object back
*/
public IndexedChronicleQueueBuilder dataBlockSize(int dataBlockSize) {
Expand All @@ -217,11 +207,10 @@ public int dataBlockSize() {
}

/**
* Sets the size to be used for index blocks. Is capped to the bigger
* value among <b>4096</b> and a <b>quarter of the <tt>data block size</tt></b>.
* Sets the size to be used for index blocks. Is capped to the bigger value among
* <b>4096</b> and a <b>quarter of the <tt>data block size</tt></b>.
*
* @param indexBlockSize the size of the index blocks
*
* @return this builder object back
*/
public IndexedChronicleQueueBuilder indexBlockSize(int indexBlockSize) {
Expand All @@ -239,12 +228,11 @@ public int indexBlockSize() {
}

/**
* The maximum size a message stored in a {@link net.openhft.chronicle.Chronicle}
* instance can have. Defaults to <b>128K</b>. Is limited by the <tt>data block size</tt>,
* can't be bigger than half of the data block size.
* The maximum size a message stored in a {@link net.openhft.chronicle.Chronicle} instance
* can have. Defaults to <b>128K</b>. Is limited by the <tt>data block size</tt>, can't be
* bigger than half of the data block size.
*
* @param messageCapacity the maximum message size that can be stored
*
* @return tthis builder object back
*/
public IndexedChronicleQueueBuilder messageCapacity(int messageCapacity) {
Expand All @@ -253,9 +241,9 @@ public IndexedChronicleQueueBuilder messageCapacity(int messageCapacity) {
}

/**
* The maximum size of the message that can be stored in the {@link net.openhft.chronicle.Chronicle}
* instance to be configured. Defaults to <b>128K</b>. Can't be bigger
* than half of the <tt>data block size</tt>.
* The maximum size of the message that can be stored in the {@link
* net.openhft.chronicle.Chronicle} instance to be configured. Defaults to <b>128K</b>.
* Can't be bigger than half of the <tt>data block size</tt>.
*
* @return the maximum message size that can be stored
*/
Expand All @@ -264,51 +252,43 @@ public int messageCapacity() {
}

/**
* A pre-defined ChronicleBuilder for small {@link net.openhft.chronicle.Chronicle} instances.
* A pre-defined ChronicleBuilder for small {@link net.openhft.chronicle.Chronicle}
* instances.
*
* It has the following params:
* <ul>
* <li>data block size <b>16M</b></li>
* </ul>
* It has the following params: <ul> <li>data block size <b>16M</b></li> </ul>
*/
public IndexedChronicleQueueBuilder small() {
dataBlockSize(16 * 1024 * 1024);
return this;
}

/**
* A pre-defined ChronicleBuilder for medium {@link net.openhft.chronicle.Chronicle} instances.
* A pre-defined ChronicleBuilder for medium {@link net.openhft.chronicle.Chronicle}
* instances.
*
* It has the following params:
* <ul>
* <li>data block size <b>128M</b></li>
* </ul>
* It has the following params: <ul> <li>data block size <b>128M</b></li> </ul>
*/
public IndexedChronicleQueueBuilder medium() {
dataBlockSize(128 * 1024 * 1024);
return this;
}

/**
* A pre-defined ChronicleBuilder for large {@link net.openhft.chronicle.Chronicle} instances.
* A pre-defined ChronicleBuilder for large {@link net.openhft.chronicle.Chronicle}
* instances.
*
* It has the following params:
* <ul>
* <li>data block size <b>512M</b></li>
* </ul>
* It has the following params: <ul> <li>data block size <b>512M</b></li> </ul>
*/
public IndexedChronicleQueueBuilder large() {
dataBlockSize(512 * 1024 * 1024);
return this;
}

/**
* A pre-defined ChronicleBuilder for test {@link net.openhft.chronicle.Chronicle} instances.
* A pre-defined ChronicleBuilder for test {@link net.openhft.chronicle.Chronicle}
* instances.
*
* It has the following params:
* <ul>
* <li>data block size <b>8k</b></li>
* </ul>
* It has the following params: <ul> <li>data block size <b>8k</b></li> </ul>
*/
public IndexedChronicleQueueBuilder test() {
dataBlockSize(8 * 1024);
Expand Down Expand Up @@ -362,7 +342,7 @@ public static class VanillaChronicleQueueBuilder extends ChronicleQueueBuilder {
private boolean cleanupOnClose;

private VanillaChronicleQueueBuilder(File path) {
this.path = path;
this.path = path;
this.synchronous = false;
this.useCheckedExcerpt = false;
this.cycleFormat = "yyyyMMdd";
Expand All @@ -381,11 +361,10 @@ protected File path() {
}

/**
* Sets the synchronous mode to be used. Enabling synchronous mode means that
* {@link ExcerptCommon#finish()} will force a persistence every time.
* Sets the synchronous mode to be used. Enabling synchronous mode means that {@link
* ExcerptCommon#finish()} will force a persistence every time.
*
* @param synchronous If synchronous mode should be used or not.
*
* @return this builder object back
*/
public VanillaChronicleQueueBuilder synchronous(boolean synchronous) {
Expand All @@ -403,9 +382,7 @@ public boolean synchronous() {
}

/**
*
* @param useCheckedExcerpt
*
* @return this builder object back
*/
public VanillaChronicleQueueBuilder useCheckedExcerpt(boolean useCheckedExcerpt) {
Expand All @@ -414,7 +391,6 @@ public VanillaChronicleQueueBuilder useCheckedExcerpt(boolean useCheckedExcerpt)
}

/**
*
* @return true if useCheckedExcerpt mode is enabled, false otherwise.
*/
public boolean useCheckedExcerpt() {
Expand All @@ -437,7 +413,7 @@ public VanillaChronicleQueueBuilder cycleLength(int cycleLength) {
public VanillaChronicleQueueBuilder cycleLength(int cycleLength, boolean check) {
if (check && cycleLength < VanillaChronicle.MIN_CYCLE_LENGTH) {
throw new IllegalArgumentException(
"Cycle length can't be less than " + VanillaChronicle.MIN_CYCLE_LENGTH + " ms!");
"Cycle length can't be less than " + VanillaChronicle.MIN_CYCLE_LENGTH + " ms!");
}

this.cycleLength = cycleLength;
Expand Down Expand Up @@ -467,15 +443,15 @@ public VanillaChronicleQueueBuilder dataBlockSize(long dataBlockSize) {
}

public VanillaChronicleQueueBuilder entriesPerCycle(long entriesPerCycle) {
if(entriesPerCycle < 256) {
if (entriesPerCycle < 256) {
throw new IllegalArgumentException("EntriesPerCycle must be at least 256");
}

if(entriesPerCycle > 1L << 48) {
if (entriesPerCycle > 1L << 48) {
throw new IllegalArgumentException("EntriesPerCycle must not exceed 1L << 48 (" + (1L << 48) + ")");
}

if(!((entriesPerCycle & -entriesPerCycle) == entriesPerCycle)) {
if (!((entriesPerCycle & -entriesPerCycle) == entriesPerCycle)) {
throw new IllegalArgumentException("EntriesPerCycle must be a power of 2");
}

Expand Down Expand Up @@ -557,7 +533,8 @@ public VanillaChronicleQueueBuilder clone() {
//
// *************************************************************************

public static abstract class ReplicaChronicleQueueBuilder extends ChronicleQueueBuilder {
public static abstract class ReplicaChronicleQueueBuilder extends ChronicleQueueBuilder
implements MappingProvider<ReplicaChronicleQueueBuilder> {
public static final TcpConnectionListener CONNECTION_LISTENER = new TcpConnectionHandler();

private final ChronicleQueueBuilder builder;
Expand All @@ -584,6 +561,9 @@ public static abstract class ReplicaChronicleQueueBuilder extends ChronicleQueue
private TimeUnit acceptorThreadPoolkeepAliveTimeUnit;
private TcpConnectionListener connectionListener;

@Nullable
private MappingFunction mapping;

private ReplicaChronicleQueueBuilder(Chronicle chronicle, ChronicleQueueBuilder builder) {
this.builder = builder;
this.chronicle = chronicle;
Expand Down Expand Up @@ -802,13 +782,29 @@ public Chronicle chronicle() {

@Override
public Chronicle build() throws IOException {
if(this.builder != null) {
if (this.builder != null) {
this.chronicle = this.builder.build();
}

return doBuild();
}

/**
* the {@code mapping} is send from the sink to the source ( usually via TCP/IP ) this
* mapping is then applied to the the source before the data is sent to the sink
*
* @param mapping a mapping function which is sent to the soure
* @return this object
*/
public ReplicaChronicleQueueBuilder withMapping(@Nullable MappingFunction mapping) {
this.mapping = mapping;
return this;
}

public MappingFunction withMapping() {
return this.mapping;
}

protected abstract Chronicle doBuild() throws IOException;

/**
Expand All @@ -819,7 +815,7 @@ public Chronicle build() throws IOException {
@NotNull
@SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
@Override
public ReplicaChronicleQueueBuilder clone(){
public ReplicaChronicleQueueBuilder clone() {
try {
return (ReplicaChronicleQueueBuilder) super.clone();
} catch (CloneNotSupportedException e) {
Expand All @@ -846,9 +842,9 @@ private SinkChronicleQueueBuilder(@NotNull Chronicle chronicle) {
public Chronicle doBuild() throws IOException {
SinkTcp cnx;

if(bindAddress() != null && connectAddress() == null) {
if (bindAddress() != null && connectAddress() == null) {
cnx = new SinkTcpAcceptor(this);
} else if(connectAddress() != null) {
} else if (connectAddress() != null) {
cnx = new SinkTcpInitiator(this);
} else {
throw new IllegalArgumentException("BindAddress and ConnectAddress are not set");
Expand Down Expand Up @@ -884,9 +880,9 @@ private SourceChronicleQueueBuilder(@NotNull Chronicle chronicle) {
public Chronicle doBuild() throws IOException {
SourceTcp cnx;

if(bindAddress() != null && connectAddress() == null) {
if (bindAddress() != null && connectAddress() == null) {
cnx = new SourceTcpAcceptor(this);
} else if(connectAddress() != null) {
} else if (connectAddress() != null) {
cnx = new SourceTcpInitiator(this);
} else {
throw new IllegalArgumentException("BindAddress and ConnectAddress are not set");
Expand Down

0 comments on commit dfd79dd

Please sign in to comment.