Skip to content

Commit

Permalink
Merge pull request #95 from lburgazzoli/HFT-CHRON-75
Browse files Browse the repository at this point in the history
CHRON-75 : VolatileExcerpt crashes after some iterations
  • Loading branch information
lburgazzoli committed Oct 21, 2014
2 parents edbdc78 + f3376b4 commit 9eef325
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 112 deletions.
4 changes: 2 additions & 2 deletions chronicle-demo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
<parent>
<groupId>net.openhft</groupId>
<artifactId>java-parent-pom</artifactId>
<version>1.1.1</version>
<version>1.1.2</version>
<relativePath/>
</parent>

<artifactId>chronicle-demo</artifactId>

<name>OpenHFT/Chronicle-Queue/chronicle-demo</name>
<version>3.2.3-SNAPSHOT</version>
<version>3.2.5-SNAPSHOT</version>
<description>Demo Processing Engine</description>

<dependencyManagement>
Expand Down
6 changes: 3 additions & 3 deletions chronicle-sandbox/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
<parent>
<groupId>net.openhft</groupId>
<artifactId>java-parent-pom</artifactId>
<version>1.1.1</version>
<version>1.1.2</version>
<relativePath/>
</parent>

<version>3.2.3-SNAPSHOT</version>
<version>3.2.5-SNAPSHOT</version>
<modelVersion>4.0.0</modelVersion>
<name>OpenHFT/Chronicle-Queue/chronicle-sandbox</name>
<artifactId>chronicle-sandbox</artifactId>
Expand All @@ -40,7 +40,7 @@
<groupId>net.openhft</groupId>
<artifactId>third-party-bom</artifactId>
<type>pom</type>
<version>3.4.6</version>
<version>3.4.8</version>
<scope>import</scope>
</dependency>

Expand Down
6 changes: 3 additions & 3 deletions chronicle-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

<modelVersion>4.0.0</modelVersion>
<artifactId>chronicle-test</artifactId>
<version>3.2.3-SNAPSHOT</version>
<version>3.2.5-SNAPSHOT</version>
<packaging>bundle</packaging>

<name>OpenHFT/Chronicle-Queue/chronicle-test</name>
Expand All @@ -42,7 +42,7 @@
<groupId>net.openhft</groupId>
<artifactId>third-party-bom</artifactId>
<type>pom</type>
<version>3.4.6</version>
<version>3.4.8</version>
<scope>import</scope>
</dependency>
<dependency>
Expand All @@ -53,7 +53,7 @@
<dependency>
<groupId>net.openhft</groupId>
<artifactId>affinity</artifactId>
<version>2.1.1</version>
<version>2.1.2</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down
16 changes: 14 additions & 2 deletions chronicle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<groupId>net.openhft</groupId>
<artifactId>third-party-bom</artifactId>
<type>pom</type>
<version>3.4.6</version>
<version>3.4.8</version>
<scope>import</scope>
</dependency>
<dependency>
Expand All @@ -51,7 +51,12 @@
<dependency>
<groupId>net.openhft</groupId>
<artifactId>affinity</artifactId>
<version>2.1.1</version>
<version>2.1.2</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.5</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down Expand Up @@ -117,6 +122,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>test</scope>
</dependency>


</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ public boolean read(int size) throws IOException {

public boolean read(int threshod, int size) throws IOException {
if(!closed) {
if (buffer.remaining() < threshod) {
int rem = buffer.remaining();
if (rem < threshod) {
if (buffer.remaining() == 0) {
buffer.clear();
} else {
Expand Down Expand Up @@ -832,24 +833,23 @@ public boolean nextIndex() {
switch (excerptSize) {
case ChronicleTcp.IN_SYNC_LEN:
case ChronicleTcp.PADDED_LEN:
return false;
case ChronicleTcp.SYNC_IDX_LEN:
return false;//nextIndex();
return false;
}

if (excerptSize > 128 << 20 || excerptSize < 0) {
throw new StreamCorruptedException("Size was " + excerptSize);
}

if(buffer.remaining() < excerptSize) {
if(!connector.read(buffer.remaining() - excerptSize)) {
if(!connector.read(excerptSize)) {
return false;
}
}

index = receivedIndex;
positionAddr = startAddr + buffer.position();
limitAddr = startAddr + buffer.limit();
limitAddr = positionAddr + excerptSize;
lastSize = excerptSize;
finished = false;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,33 @@


import net.openhft.chronicle.*;
import net.openhft.lang.io.Bytes;
import net.openhft.lang.io.serialization.BytesMarshallable;
import net.openhft.lang.model.constraints.NotNull;
import net.openhft.chronicle.tools.ChronicleTools;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import java.util.Date;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class VolatileChronicleTestBase {
protected static final Logger LOGGER = LoggerFactory.getLogger("VolarileChronicleTest");
protected static final Logger LOGGER = LoggerFactory.getLogger("VolatileChronicleTestBase");
protected static final String TMP_DIR = System.getProperty("java.io.tmpdir");
protected static final String PREFIX = "ch-volatile-";
protected static final int BASE_PORT = 12000;
Expand Down Expand Up @@ -103,4 +119,210 @@ protected ChronicleSource vanillaChronicleSource(String basePath, int port) thro
protected ChronicleSource vanillaChronicleSource(String basePath, int port, VanillaChronicleConfig config) throws IOException {
return new ChronicleSource(new VanillaChronicle(basePath, config), port);
}

// *************************************************************************
//
// *************************************************************************

protected void testJiraChron74(final int port, final Chronicle source) throws Exception {
Chronicle sink = null;
ExcerptTailer tailer = null;

try {
sink = volatileChronicleSink("localhost", port);
tailer = sink.createTailer();
assertFalse(tailer.nextIndex());
tailer.close();

sink.close();
sink.clear();
sink = null;

final ExcerptAppender appender = source.createAppender();
appender.startExcerpt(8);
appender.writeLong(1);
appender.finish();
appender.startExcerpt(8);
appender.writeLong(2);
appender.finish();

sink = volatileChronicleSink("localhost", port);
tailer = sink.createTailer().toStart();
assertTrue("nextIndex should return true", tailer.nextIndex());
assertEquals(1L, tailer.readLong());
tailer.finish();
assertTrue("nextIndex should return true", tailer.nextIndex());
assertEquals(2L, tailer.readLong());
tailer.finish();
tailer.close();
tailer = null;

sink.close();
sink.clear();
sink = null;

sink = volatileChronicleSink("localhost", port);
tailer = sink.createTailer().toEnd();
assertFalse("nextIndex should return false", tailer.nextIndex());

sink.close();
sink.clear();
sink = null;

appender.close();
} finally {
source.close();
source.clear();
}
}

protected void testJiraChron75(final int port, final Chronicle source) throws Exception {
final int items = 1000000;
final int clients = 3;
final int warmup = 100;

final ExecutorService executor = Executors.newFixedThreadPool(clients);
final CountDownLatch latch = new CountDownLatch(warmup);

try {
for(int i=0;i<clients;i++) {
executor.submit(new Runnable() {
@Override
public void run() {
int cnt = 0;
ExcerptTailer tailer = null;
Chronicle sink = null;

try {
final long threadId = Thread.currentThread().getId();

sink = new ChronicleSink("localhost", port);
tailer = sink.createTailer().toStart();

latch.await();

LOGGER.info("Start ChronicleSink on thread {}", threadId);
int lastK = 0;
for(cnt=0; cnt<items;) {
if(tailer.nextIndex()) {
Jira75Quote quote = tailer.readObject(Jira75Quote.class);
tailer.finish();

assertEquals(cnt, quote.getQuantity(), 0);
assertEquals(cnt, quote.getPrice(), 0);
assertEquals("instr-" + cnt, quote.getInstrument());
assertEquals('f' , quote.getEntryType());

cnt++;
}
}

tailer.close();
sink.close();
} catch(Exception e) {
LOGGER.warn("Exception {}", cnt, e);
}
}
});
}

LOGGER.info("Write {} elements to the source", items);
final ExcerptAppender appender = source.createAppender();
for(int i=0;i<items;i++) {
appender.startExcerpt(1000);
appender.writeObject(new Jira75Quote(i,i,DateTime.now(),"instr-" + i,'f'));
appender.finish();

if(i < warmup) {
latch.countDown();
}
}

appender.close();

executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
} catch(Exception e) {
LOGGER.warn("Exception", e);
} finally {
source.close();
source.clear();
}
}

// *************************************************************************
//
// *************************************************************************

protected static final class Jira75Quote implements BytesMarshallable {
double price;
double quantity;
DateTime dateTime;
String instrument;
char entryType;

public Jira75Quote() {
this.price = 0;
this.quantity = 0;
this.dateTime = null;
this.instrument = "";
this.entryType = ' ';
}

public Jira75Quote(double price, double quantity, DateTime dateTime, String instrument, char entryType) {
this.price = price;
this.quantity = quantity;
this.dateTime = dateTime;
this.instrument = instrument;
this.entryType = entryType;
}

public double getPrice() { return price; }
public void setPrice(double price) { this.price = price; }

public double getQuantity() { return quantity; }
public void setQuantity(double quantity) { this.quantity = quantity; }

public DateTime getDateTime() { return dateTime; }
public void setDateTime(DateTime dateTime) { this.dateTime = dateTime; }

public String getInstrument() { return instrument; }
public void setInstrument(String instrument) { this.instrument = instrument; }

public char getEntryType() { return entryType; }
public void setEntryType(char entryType) { this.entryType = entryType; }

public String toString() {
return "Jira75Quote ["
+ "price=" + price
+ ", quantity=" + quantity
+ ", dateTime=" + dateTime
+ ", instrument=" + instrument
+ ", entryType=" + entryType
+ "]";
}

@Override
public void readMarshallable(@NotNull Bytes in) throws IllegalStateException {
boolean readDateTime = in.readBoolean();
price = in.readDouble();
quantity = in.readDouble();
instrument = in.readUTFΔ();
entryType = in.readChar();
dateTime = readDateTime ? new DateTime(new Date(in.readLong())).withZone(DateTimeZone.UTC) : null;
}

@Override
public void writeMarshallable(@NotNull Bytes out) {
boolean writeDateTime = getDateTime() != null;
out.writeBoolean(writeDateTime);
out.writeDouble(price);
out.writeDouble(quantity);
out.writeUTFΔ(instrument);
out.writeChar(entryType);
if(writeDateTime) {
out.writeLong(dateTime.toDate().getTime());
}
}
}
}
Loading

0 comments on commit 9eef325

Please sign in to comment.