Skip to content

Commit

Permalink
Merge changes setting limit before position.
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Mar 8, 2016
1 parent 5b14a14 commit f37c0fc
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -48,7 +48,7 @@
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-bom</artifactId>
<version>1.11.17-SNAPSHOT</version>
<version>1.11.18-SNAPSHOT</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
Expand Up @@ -216,7 +216,7 @@ public int lastCycle() {
@Nullable final String[] files = path.list();

if (files == null)
return Integer.MAX_VALUE;
return Integer.MIN_VALUE;

for (String file : files) {
try {
Expand Down
Expand Up @@ -374,9 +374,61 @@ long acquireIndex2Index(Wire wire) {
* @param wire the context that we are referring to
* @param address the address of the Excerpts which we are going to record
*/
public long storeIndexLocation(@NotNull Wire wire, final long address) {
// for now, only index on read attempts.
public long storeIndexLocation(@NotNull Wire wire,
final long address) {

return -1;
/*
lastSequenceNumber(sequenceNumber);
long writePosition = wire.bytes().writePosition();
try {
if (sequenceNumber % 64 != 0)
return;
final LongArrayValues array = this.longArray.get();
final long indexToIndex0 = indexToIndex(wire);
long secondaryAddress;
try (DocumentContext context = wire.readingDocument(indexToIndex0)) {
if (!context.isPresent())
throw new IllegalStateException("document not found");
if (!context.isMetaData()) {
System.out.println("===\n"+Wires.fromSizePrefixedBlobs(wire.bytes(), 0, 2048)+"\n===");
// System.out.println("=== 495 +++\n"+Wires.fromSizePrefixedBlobs(wire.bytes(), 495, 2048)+"\n<<< 495 +++");
throw new IllegalStateException("sequenceNumber not found");
}
@NotNull final LongArrayValues primaryIndex = array(wire, array);
final long primaryOffset = toAddress0(sequenceNumber);
// TODO fix a race condition here.
secondaryAddress = primaryIndex.getValueAt(primaryOffset);
if (secondaryAddress == Wires.NOT_INITIALIZED) {
secondaryAddress = newIndex(wire);
writePosition = Math.max(writePosition, wire.bytes().writePosition());
primaryIndex.setValueAt(primaryOffset, secondaryAddress);
}
}
@NotNull final Bytes<?> bytes = wire.bytes();
bytes.readLimit(bytes.capacity());
try (DocumentContext context = wire.readingDocument(secondaryAddress)) {
@NotNull final LongArrayValues array1 = array(wire, array);
if (!context.isPresent())
throw new IllegalStateException("document not found");
if (!context.isMetaData())
throw new IllegalStateException("sequenceNumber not found");
array1.setValueAt(toAddress1(sequenceNumber), address);
}
} finally {
wire.bytes().writePosition(writePosition);
}
*/
}

@NotNull
Expand Down
@@ -1,58 +1,82 @@
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.core.io.IOTools;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.wire.*;
import net.openhft.chronicle.wire.WireType;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static junit.framework.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* Created by daniel on 07/03/2016.
*/
public class ToEndTest {
@Test
public void toEndTest(){
System.out.println(OS.TARGET + "/test");
IOTools.shallowDeleteDirWithFiles(OS.TARGET + "/test");
public void toEndTest() {
String baseDir = OS.TARGET + "/toEndTest";
System.out.println(baseDir);
IOTools.shallowDeleteDirWithFiles(baseDir);
List<Integer> results = new ArrayList<>();
ChronicleQueue queue = new SingleChronicleQueueBuilder(OS.TARGET + "/test").
bufferCapacity(4 << 20)
ChronicleQueue queue = new SingleChronicleQueueBuilder(baseDir).
bufferCapacity(4 << 20)
.buffered(false)
.wireType(WireType.BINARY)
.blockSize(64 << 20)
.build();

checkOneFile(baseDir);
ExcerptAppender appender = queue.createAppender();
checkOneFile(baseDir);


for(int i=0; i<10; i++) {
final int j=i;
for (int i = 0; i < 10; i++) {
final int j = i;
appender.writeDocument(wire -> wire.write(() -> "msg").int32(j));
}

checkOneFile(baseDir);

ExcerptTailer tailer = queue.createTailer();
checkOneFile(baseDir);

tailer.toEnd();
checkOneFile(baseDir);
fillResults(tailer, results);
checkOneFile(baseDir);
assertEquals(0, results.size());

tailer.toStart();
checkOneFile(baseDir);
fillResults(tailer, results);
assertEquals(10, results.size());
checkOneFile(baseDir);
}

private void checkOneFile(String baseDir) {
String[] files = new File(baseDir).list();

if (files == null || files.length == 0)
return;

if (files.length == 1)
assertTrue(files[0], files[0].startsWith("2"));
else
fail("Too many files " + Arrays.toString(files));
}

@NotNull
private List<Integer> fillResults(ExcerptTailer tailer,List<Integer> results) {
for(int i=0; i<10; i++) {
private List<Integer> fillResults(ExcerptTailer tailer, List<Integer> results) {
for (int i = 0; i < 10; i++) {
tailer.readDocument(wire -> results.add(wire.read(() -> "msg").int32()));
}
return results;
Expand Down

0 comments on commit f37c0fc

Please sign in to comment.