Correct some typos.
peter-lawrey committed Aug 7, 2019
1 parent 52e2f38 commit dfa5e19
Showing 16 changed files with 44 additions and 81 deletions.
2 changes: 1 addition & 1 deletion MigratingToV4.adoc
@@ -101,7 +101,7 @@ try (DocumentContext dc = appender.writingDocument()) {

=== Changes to the Bytes API.

-Chronicle Bytes disguishes between the read and write position, remaining and limit.
+Chronicle Bytes distinguishes between the read and write position, remaining and limit.

When reading you want to use

2 changes: 1 addition & 1 deletion README.adoc
@@ -1177,7 +1177,7 @@ try (DocumentContext dc = appender.writingDocument()) {
==== Ensure long Running tasks are not performed with a writingDocument()

The `writingDocument()` should be performed as quickly as possible because a write lock is held until the `DocumentContext` is closed by the try-with-resources. This blocks other appenders and tailers. More dangerously,
-if something keeps the thread busy long enough(more than recovery timeout, which is 20 seconds by defult) between call to `appender.writingDocument()` and code that actually writes something into bytes,
+if something keeps the thread busy long enough(more than recovery timeout, which is 20 seconds by default) between call to `appender.writingDocument()` and code that actually writes something into bytes,
it can cause recovery to kick in from other appenders (potentially in other process), which will rewrite message header, and if your thread subsequently continues writing its own message it will corrupt queue file.

[source, Java]
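The advice above can be sketched as follows. This is an illustrative sketch, not the README's own (elided) example: the queue path and field name are assumptions, and it presumes the `chronicle-queue` artifact is on the classpath. The point is that expensive work happens before `writingDocument()`, so the write lock is held only for the copy into the queue.

```java
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.wire.DocumentContext;

public class ShortWriteSketch {
    public static void main(String[] args) {
        try (ChronicleQueue queue = ChronicleQueue.single("queue-dir")) {
            ExcerptAppender appender = queue.acquireAppender();

            // Do the slow work BEFORE taking the write lock...
            String payload = buildPayload();

            // ...so the lock is held only while copying bytes into the queue.
            try (DocumentContext dc = appender.writingDocument()) {
                dc.wire().write("data").text(payload);
            }
        }
    }

    private static String buildPayload() {
        return "precomputed-message"; // stand-in for slow serialization work
    }
}
```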
4 changes: 2 additions & 2 deletions docs/FAQ.adoc
@@ -268,7 +268,7 @@ You can have any number of writers. However, you may get higher throughput if yo

=== Does Chronicle Queue support replication?

-Replication features are availabler in Chronicle Queue Enterprise. Replication supports:
+Replication features are available in Chronicle Queue Enterprise. Replication supports:

- replication of a single master to multiple slave nodes.
- writers can wait for replication to be acknowledged.
@@ -281,7 +281,7 @@ No. Chronicle Queue is designed to be both reliable and deterministic. UDP is n

=== How do I know the consumer is up to date?

-For the tailer, either replicated or notreplicated, you can assume you are up-to-date when either `isPresent()` is `false`, or your read method returns `false`
+For the tailer, either replicated or not replicated, you can assume you are up-to-date when either `isPresent()` is `false`, or your read method returns `false`

=== What are the differences between the `files directory-listing.cq4t` and `metadata.cq4t` ?

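The up-to-date check in the FAQ answer above can be sketched like this. It is an illustrative sketch, not code from the repository: the queue path and the `"data"` field name are assumptions, and it presumes messages were written as simple text fields.

```java
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.wire.DocumentContext;

public class CatchUpSketch {
    public static void main(String[] args) {
        try (ChronicleQueue queue = ChronicleQueue.single("queue-dir")) {
            ExcerptTailer tailer = queue.createTailer();
            for (;;) {
                try (DocumentContext dc = tailer.readingDocument()) {
                    if (!dc.isPresent()) {
                        // Nothing left to read: this tailer is up to date.
                        break;
                    }
                    System.out.println(dc.wire().read("data").text());
                }
            }
        }
    }
}
```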
2 changes: 1 addition & 1 deletion docs/How_it_works.adoc
@@ -61,7 +61,7 @@ System.out.println(tailer.readText());
----

-Create a tmp directoyr for the Chronicle queue giving the name starting `Java_temp_directory/SimpleChronicle` as the base folder.
+Create a tmp directory for the Chronicle queue giving the name starting `Java_temp_directory/SimpleChronicle` as the base folder.

[source,java]
----
4 changes: 2 additions & 2 deletions docs/encryption.adoc
@@ -30,13 +30,13 @@ public SingleChronicleQueueBuilder aesEncryption(@Nullable byte[] keyBytes) {
....

== Customer specified encryption
-You can supply a bespoke encryption method to encrpypt your messages using, perhaps, a more complex encryption method.
+You can supply a bespoke encryption method to encrypt your messages using, perhaps, a more complex encryption method.

For example, you could perhaps combine encryption with salting, and/or compression.

Another example could be to write simple custom code that will encrypt the more important messages, while saving on overhead by not encrypting unimportant messages.

-To enable this form of queue encrpyption, specify `codingSuppliers` at queue build time and supply the bespoke encryption method.
+To enable this form of queue encryption, specify `codingSuppliers` at queue build time and supply the bespoke encryption method.

For example:

8 changes: 4 additions & 4 deletions docs/queue-replication-message-protocol-overview.adoc
@@ -19,7 +19,7 @@ In order to make the messages human-readable in this document, the messages are

=== Who should read this document

-Anyone who wants to get more information into the data that is transfered between the remote hosts, used by Chronicle
+Anyone who wants to get more information into the data that is transferred between the remote hosts, used by Chronicle
Queue Enterprise, to do TCP/IP replication. This document is not a queue functionality overview document.

== UberHandler
@@ -87,7 +87,7 @@ Heartbeat handlers are set up on both the source, and the sink machines. Periodi

=== Exchanging data between Source and Sink

-Chronicle queues are repliated using the `SourceReplicationHandler`, which sends its messages to the `SinkReplicationHandler`. Both the `SourceReplicationHandler` and `SinkReplicationHandler` are SubHandlers.
+Chronicle queues are replicated using the `SourceReplicationHandler`, which sends its messages to the `SinkReplicationHandler`. Both the `SourceReplicationHandler` and `SinkReplicationHandler` are SubHandlers.

However, handshaking messages are sent back from the `SinkReplicationHandler` to the `SourceReplicationHandler`. One such message is the acknowledgement message which the sink sends the source. By inspecting the `idx: 0x451600000000` (see example message below) this allows you to add code on your source machine to ensure that the message was received by the remote machine, and therefore successfully replicated.

@@ -99,7 +99,7 @@ idx: 0x451600000000
ns: 10849029994071
```

-The Sink and Source handers are as follows:
+The Sink and Source handlers are as follows:

[%autowidth]
|===
@@ -167,7 +167,7 @@ eos: !!null "" # END_OF_STREAM

The sink replication handler then receives new messages from the SourceReplicationHandler. When it receives these new messages it uses a chronicle queue appender to write them to a chronicle queue.

-NOTE: When the messages are written to the chronicle queue by the `SinkReplicationHandler`, we write the message and take account of the source index, to guarantee that the order of messages from the source exactly matches the order of messages on the sink. In addition, using the index also ensures that the message has always been written to the correct queue file. This is even if events such has roll-over have occured. This would typically cause a normal appender to write the message to the next queue file. This is not what we want, because we create an exact copy of messages, from the source to the sink.
+NOTE: When the messages are written to the chronicle queue by the `SinkReplicationHandler`, we write the message and take account of the source index, to guarantee that the order of messages from the source exactly matches the order of messages on the sink. In addition, using the index also ensures that the message has always been written to the correct queue file. This is even if events such has roll-over have occurred. This would typically cause a normal appender to write the message to the next queue file. This is not what we want, because we create an exact copy of messages, from the source to the sink.

== Sending data with the SourceReplicationHandler

31 changes: 14 additions & 17 deletions src/main/java/net/openhft/chronicle/queue/BenchmarkMain.java
@@ -64,28 +64,25 @@ static void benchmark(int messageSize) {

Thread reader = new Thread(() -> {
// try (ChronicleQueue queue2 = createQueue(path))
-            ChronicleQueue queue2 = queue;
-            {
-                ExcerptTailer tailer = queue2.createTailer().toEnd();
-                long endLoop = System.nanoTime();
-                while (running) {
-                    loopTime.sample(System.nanoTime() - endLoop);
-                    Jvm.safepoint();
+            ExcerptTailer tailer = queue.createTailer().toEnd();
+            long endLoop = System.nanoTime();
+            while (running) {
+                loopTime.sample(System.nanoTime() - endLoop);
+                Jvm.safepoint();

 //                readerLoopTime = System.nanoTime();
 //                if (readerLoopTime - readerEndLoopTime > 1000)
 //                    System.out.println("r " + (readerLoopTime - readerEndLoopTime));
-                    try {
-                        runInner(transportTime, readTime, tailer);
-                        runInner(transportTime, readTime, tailer);
-                        runInner(transportTime, readTime, tailer);
-                        runInner(transportTime, readTime, tailer);
-                    } finally {
-//                        readerEndLoopTime = System.nanoTime();
-                    }
-                    Jvm.safepoint();
-                    endLoop = System.nanoTime();
-                }
-            }
+//            try {
+                runInner(transportTime, readTime, tailer);
+                runInner(transportTime, readTime, tailer);
+                runInner(transportTime, readTime, tailer);
+                runInner(transportTime, readTime, tailer);
+//            } finally {
+//                readerEndLoopTime = System.nanoTime();
+//            }
+                Jvm.safepoint();
+                endLoop = System.nanoTime();
+            }
});
reader.start();

This file was deleted.

@@ -48,7 +48,7 @@ public static void scheduleShrinking(File queueFile, long writePos) {
raf.setLength(writePos);
raf.close();
} catch (IOException ex) {
-                // on macrosux windows, keep retrying until the file is unmapped
+                // on microsoft windows, keep retrying until the file is unmapped
if (ex.getMessage().contains("The requested operation cannot be performed on a file with a user-mapped section open"))
continue;
LOG.warn("Failed to shrink file " + queueFile, ex);
@@ -27,7 +27,7 @@ public ReferenceCountedCache(Function<T, V> transformer, ThrowingFunction<K, T,
@NotNull
V get(@NotNull final K key) throws E {

-        // remove all which have been dereferenced. Garbagy but rare
+        // remove all which have been dereferenced. Garbagey but rare
cache.entrySet().removeIf(entry -> entry.getValue().refCount() == 0);

@Nullable T value = cache.get(key);
@@ -97,7 +97,7 @@ public <T extends Metadata> void overrideFrom(T metadata) {
}

if (!(other.sourceId == 0 || sourceId == 0 || other.sourceId == sourceId)) {
-            Jvm.warn().on(getClass(), "inconsistency with of source ids, existing sourceid=" + other.sourceId + ", requested sourceid=" + sourceId);
+            Jvm.warn().on(getClass(), "inconsistency with of source ids, existing sourceId=" + other.sourceId + ", requested sourceId=" + sourceId);
}

}
@@ -559,7 +559,7 @@ public long countExcerpts(long fromIndex, long toIndex) throws IllegalStateExcep
if (cycles.size() == 2)
return result;

-        final Long[] array = cycles.toArray(new Long[cycles.size()]);
+        final long[] array = cycles.stream().mapToLong(i -> i).toArray();
for (int i = 1; i < array.length - 1; i++) {
long x = exceptsPerCycle(Math.toIntExact(array[i]));
result += x;
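The change in the hunk above swaps a boxed `Long[]` for a primitive `long[]` built with a stream, so later array accesses avoid unboxing. A minimal, self-contained illustration of the same conversion (the set contents here are made up, not from the Chronicle source):

```java
import java.util.NavigableSet;
import java.util.TreeSet;

public class UnboxSketch {
    public static void main(String[] args) {
        NavigableSet<Long> cycles = new TreeSet<>();
        cycles.add(3L);
        cycles.add(1L);
        cycles.add(2L);

        // Boxed form: allocates a Long[] and keeps the wrapper objects alive.
        Long[] boxed = cycles.toArray(new Long[0]);

        // Primitive form: one pass, unboxing each element exactly once.
        long[] primitive = cycles.stream().mapToLong(Long::longValue).toArray();

        System.out.println(primitive.length); // prints 3
        System.out.println(primitive[0]);     // prints 1 (TreeSet iterates in sorted order)
    }
}
```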
@@ -69,7 +69,7 @@

public class SingleChronicleQueueBuilder implements Cloneable, Marshallable {
public static final String DEFAULT_ROLL_CYCLE_PROPERTY = "net.openhft.queue.builder.defaultRollCycle";
-    private static final Constructor ENTERPISE_QUEUE_CONSTRUCTOR;
+    private static final Constructor ENTERPRISE_QUEUE_CONSTRUCTOR;
private static final String DEFAULT_EPOCH_PROPERTY = "net.openhft.queue.builder.defaultEpoch";
private static final Logger LOGGER = LoggerFactory.getLogger(SingleChronicleQueueBuilder.class);

@@ -88,7 +88,7 @@ public class SingleChronicleQueueBuilder implements Cloneable, Marshallable {
} catch (Exception e) {
co = null;
}
-        ENTERPISE_QUEUE_CONSTRUCTOR = co;
+        ENTERPRISE_QUEUE_CONSTRUCTOR = co;
}

}
@@ -273,7 +273,7 @@ static SingleChronicleQueueStore loadStore(@NotNull Wire wire) {
}

private static boolean isQueueReplicationAvailable() {
-        return ENTERPISE_QUEUE_CONSTRUCTOR != null;
+        return ENTERPRISE_QUEUE_CONSTRUCTOR != null;
}

private static RollCycle loadDefaultRollCycle() {
@@ -290,6 +290,7 @@ private static RollCycle loadDefaultRollCycle() {
if (rollCyclePropertyParts.length < 2) {
LOGGER.warn("Default roll cycle configured as enum, but enum value not specified: " + rollCycleProperty);
} else {
+                @SuppressWarnings("unchecked")
Class<Enum> eClass = (Class<Enum>) rollCycleClass;
Object instance = ObjectUtils.valueOf(eClass, rollCyclePropertyParts[1]);
if (instance instanceof RollCycle) {
@@ -351,18 +352,18 @@ private boolean checkEnterpriseFeaturesRequested() {
}

private boolean onlyAvailableInEnterprise(final String feature) {
-        if (ENTERPISE_QUEUE_CONSTRUCTOR == null)
+        if (ENTERPRISE_QUEUE_CONSTRUCTOR == null)
LOGGER.warn(feature + " is only supported in Chronicle Queue Enterprise. If you would like to use this feature, please contact sales@chronicle.software for more information.");
return true;
}

@NotNull
private SingleChronicleQueue buildEnterprise() {
-        if (ENTERPISE_QUEUE_CONSTRUCTOR == null)
+        if (ENTERPRISE_QUEUE_CONSTRUCTOR == null)
throw new IllegalStateException("Enterprise features requested but Chronicle Queue Enterprise is not in the class path!");

try {
-            return (SingleChronicleQueue) ENTERPISE_QUEUE_CONSTRUCTOR.newInstance(this);
+            return (SingleChronicleQueue) ENTERPRISE_QUEUE_CONSTRUCTOR.newInstance(this);
} catch (Exception e) {
throw new IllegalStateException("Couldn't create an instance of Enterprise queue", e);
}
@@ -435,7 +436,7 @@ protected void initializeMetadata() {
overrideRollCycleForFileNameLength(newMeta.roll().format().length());
}

-        // if it was overriden - reset
+        // if it was overridden - reset
rollTime = newMeta.roll().rollTime();
rollTimeZone = newMeta.roll().rollTimeZone();
epoch = newMeta.roll().epoch();
@@ -48,15 +48,10 @@ static void pingPong(int size) {

long wakeTime = System.nanoTime();
while (running) {
-                int count = 0;
                 try (DocumentContext dc = tailer.readingDocument(true)) {
-                    if (!dc.isPresent()) {
-                        count++;
+                    if (!dc.isPresent())
                         continue;
-                    }
                 }
-                if (count > 0)
-                    System.out.println(count);
                 break;
             }
long delay = wakeTime - writeTime;
@@ -21,7 +21,7 @@

public class ChronicleHistoryReader {

-    private static final int SUMMART_OUTPUT_UNSET = -999;
+    private static final int SUMMARY_OUTPUT_UNSET = -999;
protected Path basePath;
protected Consumer<String> messageSink;
protected boolean progress = false;
@@ -33,7 +33,7 @@ public class ChronicleHistoryReader {
protected long measurementWindowNanos = 0;
protected long firstTimeStampNanos = 0;
protected long lastWindowCount = 0;
-    protected int summaryOutputOffset = SUMMART_OUTPUT_UNSET;
+    protected int summaryOutputOffset = SUMMARY_OUTPUT_UNSET;
protected int lastHistosSize = 0;

public ChronicleHistoryReader withMessageSink(final Consumer<String> messageSink) {
@@ -112,7 +112,7 @@ public Map<String, Histogram> readChronicle() {
}

public void outputData() {
-        if (summaryOutputOffset != SUMMART_OUTPUT_UNSET)
+        if (summaryOutputOffset != SUMMARY_OUTPUT_UNSET)
printSummary();
else
printPercentilesSummary();
@@ -148,7 +148,7 @@ private void printSummary() {
}
long tsSinceStart = (lastWindowCount * measurementWindowNanos) - firstTimeStampNanos;
messageSink.accept(
-                Long.toString(timeUnit.convert(tsSinceStart, TimeUnit.NANOSECONDS)) + "," +
+                timeUnit.convert(tsSinceStart, TimeUnit.NANOSECONDS) + "," +
histos.values().stream().
map(h -> Long.toString(timeUnit.convert((long) offset(h.getPercentiles(), summaryOutputOffset), TimeUnit.NANOSECONDS))).
collect(Collectors.joining(",")));
@@ -225,7 +225,7 @@ protected void processMessage(CharSequence methodName, MessageHistory history) {
Histogram histo1 = histos.computeIfAbsent("startTo" + histoId, s -> histogram());
histo1.sample(receivedByThisComponent - history.timing(0));
} else if (lastTime != 0) {
-            Histogram histo1 = histos.computeIfAbsent(Integer.toString(history.sourceId(sourceIndex - 1)) + "to" + histoId, s -> histogram());
+            Histogram histo1 = histos.computeIfAbsent(history.sourceId(sourceIndex - 1) + "to" + histoId, s -> histogram());
// here we are comparing System.nanoTime across processes. YMMV
histo1.sample(receivedByThisComponent - lastTime);
}
@@ -213,9 +213,7 @@ private void moveToSpecifiedPosition(final ChronicleQueue ic, final ExcerptTaile
}

messageSink.accept("Waiting for startIndex " + startIndex);
-            for (; ; ) {
-                if (tailer.moveToIndex(startIndex))
-                    break;
+            while (!tailer.moveToIndex(startIndex)) {
Jvm.pause(100);
}
}
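The hunk above collapses an infinite `for (; ; )` with an internal `break` into a `while` whose condition does the work. The same refactoring in a self-contained form; `tryMove` is a made-up stand-in for `tailer.moveToIndex(startIndex)` that succeeds on the third call:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RetryLoopSketch {
    // Stand-in for tailer.moveToIndex(startIndex): succeeds on the third call.
    static boolean tryMove(AtomicInteger attempts) {
        return attempts.incrementAndGet() >= 3;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger attempts = new AtomicInteger();

        // Before: for (; ; ) { if (tryMove(attempts)) break; Thread.sleep(1); }
        // After: the loop condition carries the retry.
        while (!tryMove(attempts)) {
            Thread.sleep(1); // Jvm.pause(100) in the original
        }

        System.out.println(attempts.get()); // prints 3
    }
}
```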
