Skip to content

Commit

Permalink
improve last indexfile lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Apr 8, 2015
1 parent bbbd3ed commit 031d4ea
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 20 deletions.
Expand Up @@ -563,6 +563,7 @@ public VanillaChronicleQueueBuilder clone() {
public static abstract class ReplicaChronicleQueueBuilder extends ChronicleQueueBuilder public static abstract class ReplicaChronicleQueueBuilder extends ChronicleQueueBuilder
implements MappingProvider<ReplicaChronicleQueueBuilder> { implements MappingProvider<ReplicaChronicleQueueBuilder> {


public static final int DEFAULT_SOCKET_BUFFER_SIZE = 256 * 1024;
public static final TcpConnectionListener CONNECTION_LISTENER = new TcpConnectionHandler(); public static final TcpConnectionListener CONNECTION_LISTENER = new TcpConnectionHandler();


private final ChronicleQueueBuilder builder; private final ChronicleQueueBuilder builder;
Expand Down Expand Up @@ -616,9 +617,9 @@ private ReplicaChronicleQueueBuilder(Chronicle chronicle, ChronicleQueueBuilder
this.selectTimeoutUnit = TimeUnit.MILLISECONDS; this.selectTimeoutUnit = TimeUnit.MILLISECONDS;
this.heartbeatInterval = 2500; this.heartbeatInterval = 2500;
this.heartbeatIntervalUnit = TimeUnit.MILLISECONDS; this.heartbeatIntervalUnit = TimeUnit.MILLISECONDS;
this.receiveBufferSize = 256 * 1024; this.receiveBufferSize = DEFAULT_SOCKET_BUFFER_SIZE;
this.sendBufferSize = -1; this.sendBufferSize = DEFAULT_SOCKET_BUFFER_SIZE;
this.minBufferSize = this.receiveBufferSize; this.minBufferSize = DEFAULT_SOCKET_BUFFER_SIZE;
this.sharedChronicle = false; this.sharedChronicle = false;
this.acceptorMaxBacklog = 50; this.acceptorMaxBacklog = 50;
this.acceptorDefaultThreads = 0; this.acceptorDefaultThreads = 0;
Expand Down
25 changes: 11 additions & 14 deletions chronicle/src/main/java/net/openhft/chronicle/VanillaChronicle.java
Expand Up @@ -516,7 +516,7 @@ private class VanillaAppenderImpl extends AbstractVanillaExcerpt implements Vani
private boolean nextSynchronous; private boolean nextSynchronous;
private long lastWrittenIndex; private long lastWrittenIndex;
private long[] positionArr = {0L}; private long[] positionArr = {0L};
private int dataCount; private int lastDataIndex;


public VanillaAppenderImpl() { public VanillaAppenderImpl() {
this.lastCycle = Integer.MIN_VALUE; this.lastCycle = Integer.MIN_VALUE;
Expand All @@ -525,7 +525,7 @@ public VanillaAppenderImpl() {
this.appenderCycle = -1; this.appenderCycle = -1;
this.appenderThreadId = -1; this.appenderThreadId = -1;
this.nextSynchronous = builder.synchronous(); this.nextSynchronous = builder.synchronous();
this.dataCount = 0; this.lastDataIndex = 0;
} }


@Override @Override
Expand Down Expand Up @@ -560,16 +560,14 @@ public void startExcerpt(long capacity, int cycle) {
lastThreadId = appenderThreadId; lastThreadId = appenderThreadId;
} }


if (dataBytes == null || indexBytes == null) { if (dataBytes == null) {
dataCount = dataCache.findNextDataCount(appenderCycle, appenderThreadId); lastDataIndex = dataCache.findNextDataCount(appenderCycle, appenderThreadId);
dataBytes = dataCache.dataFor(appenderCycle, appenderThreadId, dataCount, true); dataBytes = dataCache.dataFor(appenderCycle, appenderThreadId, lastDataIndex, true);
} }


if (dataBytes.remaining() < capacity + 4) { if (dataBytes.remaining() < capacity + 4) {
dataBytes.release(); dataBytes.release();
dataBytes = null; dataBytes = dataCache.dataFor(appenderCycle, appenderThreadId, ++lastDataIndex, true);
dataCount++;
dataBytes = dataCache.dataFor(appenderCycle, appenderThreadId, dataCount, true);
} }


startAddr = positionAddr = dataBytes.positionAddr() + 4; startAddr = positionAddr = dataBytes.positionAddr() + 4;
Expand Down Expand Up @@ -598,8 +596,10 @@ public void nextSynchronous(boolean nextSynchronous) {
} }
@Override @Override
public void finish() { public void finish() {
if (finished) if (finished) {
throw new IllegalStateException("Not started"); throw new IllegalStateException("Not started");
}

super.finish(); super.finish();
if (dataBytes == null) { if (dataBytes == null) {
return; return;
Expand All @@ -615,20 +615,17 @@ public void finish() {


try { try {
long position = VanillaIndexCache.append(indexBytes, indexValue, nextSynchronous); long position = VanillaIndexCache.append(indexBytes, indexValue, nextSynchronous);
long lvindex = -1;
if (position < 0) { if (position < 0) {
if (indexBytes != null) { if (indexBytes != null) {
indexBytes.release(); indexBytes.release();
indexBytes = null; indexBytes = null;
} }


indexBytes = indexCache.append(appenderCycle, indexValue, nextSynchronous, positionArr); indexBytes = indexCache.append(appenderCycle, indexValue, nextSynchronous, positionArr);
lvindex = indexFrom(appenderCycle, indexBytes.index(), positionArr[0]); setLastWrittenIndex(indexFrom(appenderCycle, indexBytes.index(), positionArr[0]));
} else { } else {
lvindex = indexFrom(appenderCycle, indexBytes.index(), position); setLastWrittenIndex(indexFrom(appenderCycle, indexBytes.index(), position));
} }

setLastWrittenIndex(lvindex);
} catch (IOException e) { } catch (IOException e) {
throw new AssertionError(e); throw new AssertionError(e);
} }
Expand Down
Expand Up @@ -138,8 +138,9 @@ public int findNextDataCount(int cycle, int threadId) throws IOException {
for (File file : files) { for (File file : files) {
if (file.getName().startsWith(dataPrefix)) { if (file.getName().startsWith(dataPrefix)) {
final int count = Integer.parseInt(file.getName().substring(dataPrefix.length())); final int count = Integer.parseInt(file.getName().substring(dataPrefix.length()));
if (maxCount < count) if (maxCount < count) {
maxCount = count; maxCount = count;
}
} }
} }
} }
Expand Down
Expand Up @@ -140,6 +140,11 @@ public synchronized void close() {
this.cache.close(); this.cache.close();
} }


int lastIndexFile() {
int lastCycle = (int)lastCycle();
return lastIndexFile(lastCycle);
}

int lastIndexFile(int cycle) { int lastIndexFile(int cycle) {
return lastIndexFile(cycle, 0); return lastIndexFile(cycle, 0);
} }
Expand Down Expand Up @@ -212,15 +217,17 @@ public synchronized VanillaMappedBytes append(


public long firstCycle() { public long firstCycle() {
File[] files = baseFile.listFiles(); File[] files = baseFile.listFiles();
if (files == null) if (files == null) {
return -1; return -1;
}


long firstDate = Long.MAX_VALUE; long firstDate = Long.MAX_VALUE;
for (File file : files) { for (File file : files) {
try { try {
long date = dateCache.parseCount(file.getName()); long date = dateCache.parseCount(file.getName());
if (firstDate > date) if (firstDate > date) {
firstDate = date; firstDate = date;
}
} catch (ParseException ignored) { } catch (ParseException ignored) {
// ignored // ignored
} }
Expand Down

0 comments on commit 031d4ea

Please sign in to comment.