From 32964a007e94f01e50fc694c4a07cf548c8b7cc3 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Thu, 29 Jun 2017 17:58:54 +0200 Subject: [PATCH 1/3] ARTEMIS-1266 Mapped Journal refactoring The MAPPED journal refactoring include: - simplified lifecycle and logic (eg fixed file size with single mmap memory region) - supports for the TimedBuffer to coalesce msyncs (via Decorator pattern) - TLAB pooling of direct ByteBuffer like the NIO journal - remove of old benchmarks and benchmark dependencies --- .../cli/commands/util/SyncCalculation.java | 11 +- .../artemis/core/io/mapped/BytesUtils.java | 26 +- .../core/io/mapped/MappedByteBufferCache.java | 224 ------------------ .../artemis/core/io/mapped/MappedFile.java | 184 +++++++------- .../core/io/mapped/MappedSequentialFile.java | 74 +++--- .../mapped/MappedSequentialFileFactory.java | 131 ++++++---- .../core/io/mapped/TimedSequentialFile.java | 164 +++++-------- .../artemis/core/io/JournalTptBenchmark.java | 17 +- .../core/io/SequentialFileTptBenchmark.java | 21 +- .../impl/journal/JournalStorageManager.java | 3 +- tests/extra-tests/pom.xml | 28 --- .../journal/JournalImplLatencyBench.java | 153 ------------ .../gcfree/AddJournalRecordEncoder.java | 105 -------- .../journal/gcfree/EncodersBench.java | 114 --------- .../journal/gcfree/GcFreeJournal.java | 80 ------- .../gcfree/GcFreeJournalLatencyBench.java | 131 ---------- .../journal/gcfree/JournalRecordHeader.java | 27 --- .../journal/gcfree/JournalRecordTypes.java | 29 --- .../SequentialFileLatencyBench.java | 128 ---------- .../journal/MappedImportExportTest.java | 20 +- .../journal/MappedJournalCompactTest.java | 20 +- .../journal/MappedJournalImplTest.java | 20 +- .../MappedSequentialFileFactoryTest.java | 4 +- .../ValidateTransactionHealthTest.java | 9 +- 24 files changed, 391 insertions(+), 1332 deletions(-) delete mode 100644 artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedByteBufferCache.java delete mode 100644 tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplLatencyBench.java delete mode 100644 tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/AddJournalRecordEncoder.java delete mode 100644 tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java delete mode 100644 tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/GcFreeJournal.java delete mode 100644 tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/GcFreeJournalLatencyBench.java delete mode 100644 tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordHeader.java delete mode 100644 tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordTypes.java delete mode 100644 tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/sequentialfile/SequentialFileLatencyBench.java diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java index 25c8b270f2a..860bcb68181 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java @@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.io.IOCallback; -import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; @@ -190,13 +189,9 @@ private static SequentialFileFactory newFactory(File datafolder, boolean datasyn ((AIOSequentialFileFactory) factory).disableBufferReuse(); return factory; case MAPPED: - factory = new MappedSequentialFileFactory(datafolder, new IOCriticalErrorListener() { - @Override - public void onIOException(Throwable code, String message, SequentialFile file) { - - } - }, true).chunkBytes(fileSize).overlapBytes(0).setDatasync(datasync); - + factory = MappedSequentialFileFactory.unbuffered(datafolder, fileSize, null) + .setDatasync(datasync) + .disableBufferReuse(); factory.start(); return factory; default: diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java index 3ff723f413d..986b69839a5 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java @@ -29,8 +29,32 @@ public static long align(final long value, final long alignment) { return (value + (alignment - 1)) & ~(alignment - 1); } + /** + * Is a value a positive power of two. + * + * @param value to be checked. + * @return true if the number is a positive power of two otherwise false. + */ + public static boolean isPowOf2(final int value) { + return Integer.bitCount(value) == 1; + } + + /** + * Test if a value is pow2alignment-aligned. + * + * @param value to be tested. + * @param pow2alignment boundary the address is tested against. + * @return true if the address is on the aligned boundary otherwise false. + * @throws IllegalArgumentException if the alignment is not a power of 2 + */ + public static boolean isAligned(final long value, final int pow2alignment) { + if (!isPowOf2(pow2alignment)) { + throw new IllegalArgumentException("Alignment must be a power of 2"); + } + return (value & (pow2alignment - 1)) == 0; + } + public static void zerosDirect(final ByteBuffer buffer) { - //TODO When PlatformDependent will be replaced by VarHandle or Unsafe, replace with safepoint-fixed setMemory //DANGEROUS!! erases bound-checking using directly addresses -> safe only if it use counted loops int remaining = buffer.capacity(); long address = PlatformDependent.directBufferAddress(buffer); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedByteBufferCache.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedByteBufferCache.java deleted file mode 100644 index 73384c8181e..00000000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedByteBufferCache.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.io.mapped; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.lang.ref.WeakReference; -import java.nio.ByteOrder; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.channels.FileLock; -import java.util.ArrayList; - -import io.netty.util.internal.PlatformDependent; - -final class MappedByteBufferCache implements AutoCloseable { - - public static final int PAGE_SIZE = Integer.parseInt(System.getProperty("os_page_size", "4096")); - private static final Object FILE_LOCK = new Object(); - private final RandomAccessFile raf; - private final FileChannel fileChannel; - private final long chunkBytes; - private final long overlapBytes; - private final ArrayList> byteBuffers; - private final File file; - private final long mappedSize; - private boolean closed; - - private MappedByteBufferCache(File file, RandomAccessFile raf, long chunkBytes, long overlapBytes, long alignment) { - this.byteBuffers = new ArrayList<>(); - this.file = file; - this.raf = raf; - this.fileChannel = raf.getChannel(); - this.chunkBytes = BytesUtils.align(chunkBytes, alignment); - this.overlapBytes = BytesUtils.align(overlapBytes, alignment); - this.closed = false; - this.mappedSize = this.chunkBytes + this.overlapBytes; - } - - public static MappedByteBufferCache of(File file, long chunkSize, long overlapSize) throws FileNotFoundException { - final RandomAccessFile raf = new RandomAccessFile(file, "rw"); - return new MappedByteBufferCache(file, raf, chunkSize, overlapSize, PAGE_SIZE); - } - - public static boolean inside(long position, long mappedPosition, long mappedLimit) { - return mappedPosition <= position && position < mappedLimit; - } - - public File file() { - return file; - } - - public long chunkBytes() { - return chunkBytes; - } - - public long overlapBytes() { - return overlapBytes; - } - - public int indexFor(long position) { - final int chunk = (int) (position / chunkBytes); - return chunk; - } - - public long mappedPositionFor(int index) { - return index * chunkBytes; - } - - public long mappedLimitFor(long mappedPosition) { - return mappedPosition + chunkBytes; - } - - public MappedByteBuffer acquireMappedByteBuffer(final int index) throws IOException, IllegalArgumentException, IllegalStateException { - if (closed) - throw new IOException("Closed"); - if (index < 0) - throw new IOException("Attempt to access a negative index: " + index); - while (byteBuffers.size() <= index) { - byteBuffers.add(null); - } - final WeakReference mbbRef = byteBuffers.get(index); - if (mbbRef != null) { - final MappedByteBuffer mbb = mbbRef.get(); - if (mbb != null) { - return mbb; - } - } - return mapAndAcquire(index); - } - - //METHOD BUILT TO SEPARATE THE SLOW PATH TO ENSURE INLINING OF THE MOST OCCURRING CASE - private MappedByteBuffer mapAndAcquire(final int index) throws IOException { - final long chunkStartPosition = mappedPositionFor(index); - final long minSize = chunkStartPosition + mappedSize; - if (fileChannel.size() < minSize) { - try { - synchronized (FILE_LOCK) { - try (FileLock lock = fileChannel.lock()) { - final long size = fileChannel.size(); - if (size < minSize) { - raf.setLength(minSize); - } - } - } - } catch (IOException ioe) { - throw new IOException("Failed to resize to " + minSize, ioe); - } - } - - final MappedByteBuffer mbb = fileChannel.map(FileChannel.MapMode.READ_WRITE, chunkStartPosition, mappedSize); - mbb.order(ByteOrder.nativeOrder()); - byteBuffers.set(index, new WeakReference<>(mbb)); - return mbb; - } - - public long fileSize() throws IOException { - if (closed) - throw new IllegalStateException("Closed"); - return fileChannel.size(); - } - - public void closeAndResize(long length) { - if (!closed) { - //TO_FIX: unmap in this way is not portable BUT required on Windows that can't resize a memmory mapped file! - for (final WeakReference mbbRef : this.byteBuffers) { - if (mbbRef != null) { - final MappedByteBuffer mbb = mbbRef.get(); - if (mbb != null) { - try { - PlatformDependent.freeDirectBuffer(mbb); - } catch (Throwable t) { - //TO_FIX: force releasing of the other buffers - } - } - } - } - this.byteBuffers.clear(); - try { - if (fileChannel.size() != length) { - try { - synchronized (FILE_LOCK) { - try (FileLock lock = fileChannel.lock()) { - final long size = fileChannel.size(); - if (size != length) { - raf.setLength(length); - } - } - } - } catch (IOException ioe) { - throw new IllegalStateException("Failed to resize to " + length, ioe); - } - } - } catch (IOException ex) { - throw new IllegalStateException("Failed to get size", ex); - } finally { - try { - fileChannel.close(); - } catch (IOException e) { - throw new IllegalStateException("Failed to close channel", e); - } finally { - try { - raf.close(); - } catch (IOException e) { - throw new IllegalStateException("Failed to close RandomAccessFile", e); - } - } - closed = true; - } - } - } - - public boolean isClosed() { - return closed; - } - - @Override - public void close() { - if (!closed) { - //TO_FIX: unmap in this way is not portable BUT required on Windows that can't resize a memory mapped file! - for (final WeakReference mbbRef : this.byteBuffers) { - if (mbbRef != null) { - final MappedByteBuffer mbb = mbbRef.get(); - if (mbb != null) { - try { - PlatformDependent.freeDirectBuffer(mbb); - } catch (Throwable t) { - //TO_FIX: force releasing of the other buffers - } - } - } - } - this.byteBuffers.clear(); - try { - fileChannel.close(); - } catch (IOException e) { - throw new IllegalStateException("Failed to close channel", e); - } finally { - try { - raf.close(); - } catch (IOException e) { - throw new IllegalStateException("Failed to close RandomAccessFile", e); - } - } - closed = true; - } - } -} diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java index adfc4fef7b2..eb39320c973 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java @@ -18,79 +18,66 @@ import java.io.File; import java.io.IOException; -import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledUnsafeDirectByteBufWrapper; import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.utils.Env; final class MappedFile implements AutoCloseable { - private final MappedByteBufferCache cache; + private static final int OS_PAGE_SIZE = Env.osPageSize(); + private final MappedByteBuffer buffer; + private final FileChannel channel; + private final long address; private final UnpooledUnsafeDirectByteBufWrapper byteBufWrapper; private final ChannelBufferWrapper channelBufferWrapper; - private MappedByteBuffer lastMapped; - private long lastMappedStart; - private long lastMappedLimit; - private long position; - private long length; + private int position; + private int length; - private MappedFile(MappedByteBufferCache cache) throws IOException { - this.cache = cache; - this.lastMapped = null; - this.lastMappedStart = -1; - this.lastMappedLimit = -1; - this.position = 0; - this.length = this.cache.fileSize(); + private MappedFile(FileChannel channel, MappedByteBuffer byteBuffer, int position, int length) throws IOException { + this.channel = channel; + this.buffer = byteBuffer; + this.position = position; + this.length = length; this.byteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper(); this.channelBufferWrapper = new ChannelBufferWrapper(this.byteBufWrapper, false); + this.address = PlatformDependent.directBufferAddress(buffer); } - public static MappedFile of(File file, long chunckSize, long overlapSize) throws IOException { - return new MappedFile(MappedByteBufferCache.of(file, chunckSize, overlapSize)); + public static MappedFile of(File file, int position, int capacity) throws IOException { + final MappedByteBuffer buffer; + final int length; + final FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ); + length = (int) channel.size(); + if (length != capacity && length != 0) { + channel.close(); + throw new IllegalStateException("the file is not " + capacity + " bytes long!"); + } + buffer = channel.map(FileChannel.MapMode.READ_WRITE, position, capacity); + return new MappedFile(channel, buffer, 0, length); } - public MappedByteBufferCache cache() { - return cache; + public FileChannel channel() { + return channel; } - private int checkOffset(long offset, int bytes) throws BufferUnderflowException, IOException { - if (!MappedByteBufferCache.inside(offset, lastMappedStart, lastMappedLimit)) { - return updateOffset(offset, bytes); - } else { - final int bufferPosition = (int) (offset - lastMappedStart); - return bufferPosition; - } + public MappedByteBuffer mapped() { + return buffer; } - private int updateOffset(long offset, int bytes) throws BufferUnderflowException, IOException { - try { - final int index = cache.indexFor(offset); - final long mappedPosition = cache.mappedPositionFor(index); - final long mappedLimit = cache.mappedLimitFor(mappedPosition); - if (offset + bytes > mappedLimit) { - throw new IOException("mapping overflow!"); - } - lastMapped = cache.acquireMappedByteBuffer(index); - lastMappedStart = mappedPosition; - lastMappedLimit = mappedLimit; - final int bufferPosition = (int) (offset - mappedPosition); - return bufferPosition; - } catch (IllegalStateException e) { - throw new IOException(e); - } catch (IllegalArgumentException e) { - throw new BufferUnderflowException(); - } + public long address() { + return this.address; } public void force() { - if (lastMapped != null) { - lastMapped.force(); - } + this.buffer.force(); } /** @@ -98,9 +85,8 @@ public void force() { *

*

Bytes are read starting at this file's specified position. */ - public int read(long position, ByteBuf dst, int dstStart, int dstLength) throws IOException { - final int bufferPosition = checkOffset(position, dstLength); - final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition; + public int read(int position, ByteBuf dst, int dstStart, int dstLength) throws IOException { + final long srcAddress = this.address + position; if (dst.hasMemoryAddress()) { final long dstAddress = dst.memoryAddress() + dstStart; PlatformDependent.copyMemory(srcAddress, dstAddress, dstLength); @@ -122,9 +108,8 @@ public int read(long position, ByteBuf dst, int dstStart, int dstLength) throws *

*

Bytes are read starting at this file's specified position. */ - public int read(long position, ByteBuffer dst, int dstStart, int dstLength) throws IOException { - final int bufferPosition = checkOffset(position, dstLength); - final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition; + public int read(int position, ByteBuffer dst, int dstStart, int dstLength) throws IOException { + final long srcAddress = this.address + position; if (dst.isDirect()) { final long dstAddress = PlatformDependent.directBufferAddress(dst) + dstStart; PlatformDependent.copyMemory(srcAddress, dstAddress, dstLength); @@ -146,10 +131,9 @@ public int read(long position, ByteBuffer dst, int dstStart, int dstLength) thro * then the position is updated with the number of bytes actually read. */ public int read(ByteBuf dst, int dstStart, int dstLength) throws IOException { - final int remaining = (int) Math.min(this.length - this.position, Integer.MAX_VALUE); + final int remaining = this.length - this.position; final int read = Math.min(remaining, dstLength); - final int bufferPosition = checkOffset(position, read); - final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition; + final long srcAddress = this.address + position; if (dst.hasMemoryAddress()) { final long dstAddress = dst.memoryAddress() + dstStart; PlatformDependent.copyMemory(srcAddress, dstAddress, read); @@ -170,10 +154,9 @@ public int read(ByteBuf dst, int dstStart, int dstLength) throws IOException { * then the position is updated with the number of bytes actually read. */ public int read(ByteBuffer dst, int dstStart, int dstLength) throws IOException { - final int remaining = (int) Math.min(this.length - this.position, Integer.MAX_VALUE); + final int remaining = this.length - this.position; final int read = Math.min(remaining, dstLength); - final int bufferPosition = checkOffset(position, read); - final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition; + final long srcAddress = this.address + position; if (dst.isDirect()) { final long dstAddress = PlatformDependent.directBufferAddress(dst) + dstStart; PlatformDependent.copyMemory(srcAddress, dstAddress, read); @@ -192,8 +175,7 @@ public int read(ByteBuffer dst, int dstStart, int dstLength) throws IOException */ public void write(EncodingSupport encodingSupport) throws IOException { final int encodedSize = encodingSupport.getEncodeSize(); - final int bufferPosition = checkOffset(position, encodedSize); - this.byteBufWrapper.wrap(this.lastMapped, bufferPosition, encodedSize); + this.byteBufWrapper.wrap(this.buffer, this.position, encodedSize); try { encodingSupport.encode(this.channelBufferWrapper); } finally { @@ -211,8 +193,7 @@ public void write(EncodingSupport encodingSupport) throws IOException { *

Bytes are written starting at this file's current position, */ public void write(ByteBuf src, int srcStart, int srcLength) throws IOException { - final int bufferPosition = checkOffset(position, srcLength); - final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition; + final long destAddress = this.address + position; if (src.hasMemoryAddress()) { final long srcAddress = src.memoryAddress() + srcStart; PlatformDependent.copyMemory(srcAddress, destAddress, srcLength); @@ -234,8 +215,7 @@ public void write(ByteBuf src, int srcStart, int srcLength) throws IOException { *

Bytes are written starting at this file's current position, */ public void write(ByteBuffer src, int srcStart, int srcLength) throws IOException { - final int bufferPosition = checkOffset(position, srcLength); - final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition; + final long destAddress = this.address + position; if (src.isDirect()) { final long srcAddress = PlatformDependent.directBufferAddress(src) + srcStart; PlatformDependent.copyMemory(srcAddress, destAddress, srcLength); @@ -254,9 +234,8 @@ public void write(ByteBuffer src, int srcStart, int srcLength) throws IOExceptio *

*

Bytes are written starting at this file's specified position, */ - public void write(long position, ByteBuf src, int srcStart, int srcLength) throws IOException { - final int bufferPosition = checkOffset(position, srcLength); - final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition; + public void write(int position, ByteBuf src, int srcStart, int srcLength) throws IOException { + final long destAddress = this.address + position; if (src.hasMemoryAddress()) { final long srcAddress = src.memoryAddress() + srcStart; PlatformDependent.copyMemory(srcAddress, destAddress, srcLength); @@ -277,9 +256,8 @@ public void write(long position, ByteBuf src, int srcStart, int srcLength) throw *

*

Bytes are written starting at this file's specified position, */ - public void write(long position, ByteBuffer src, int srcStart, int srcLength) throws IOException { - final int bufferPosition = checkOffset(position, srcLength); - final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition; + public void write(int position, ByteBuffer src, int srcStart, int srcLength) throws IOException { + final long destAddress = this.address + position; if (src.isDirect()) { final long srcAddress = PlatformDependent.directBufferAddress(src) + srcStart; PlatformDependent.copyMemory(srcAddress, destAddress, srcLength); @@ -298,32 +276,47 @@ public void write(long position, ByteBuffer src, int srcStart, int srcLength) th *

*

Bytes are written starting at this file's current position, */ - public void zeros(long offset, int count) throws IOException { - while (count > 0) { - //do not need to validate the bytes count - final int bufferPosition = checkOffset(offset, 0); - final int endZerosPosition = (int)Math.min((long)bufferPosition + count, lastMapped.capacity()); - final int zeros = endZerosPosition - bufferPosition; - final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition; - PlatformDependent.setMemory(destAddress, zeros, (byte) 0); - offset += zeros; - count -= zeros; - //TODO need to call force on each write? - //this.force(); + public void zeros(int position, final int count) throws IOException { + //zeroes memory in reverse direction in OS_PAGE_SIZE batches + //to gain sympathy by the page cache LRU policy + final long start = this.address + position; + final long end = start + count; + int toZeros = count; + final long lastGap = (int) (end & (OS_PAGE_SIZE - 1)); + final long lastStartPage = end - lastGap; + long lastZeroed = end; + if (start <= lastStartPage) { + if (lastGap > 0) { + PlatformDependent.setMemory(lastStartPage, lastGap, (byte) 0); + lastZeroed = lastStartPage; + toZeros -= lastGap; + } + } + //any that will enter has lastZeroed OS page aligned + while (toZeros >= OS_PAGE_SIZE) { + assert BytesUtils.isAligned(lastZeroed, OS_PAGE_SIZE);/**/ + final long startPage = lastZeroed - OS_PAGE_SIZE; + PlatformDependent.setMemory(startPage, OS_PAGE_SIZE, (byte) 0); + lastZeroed = startPage; + toZeros -= OS_PAGE_SIZE; } - if (offset > this.length) { - this.length = offset; + //there is anything left in the first OS page? + if (toZeros > 0) { + PlatformDependent.setMemory(start, toZeros, (byte) 0); + } + + position += count; + if (position > this.length) { + this.length = position; } } - public long position() { + public int position() { return position; } - public long position(long newPosition) { - final long oldPosition = this.position; - this.position = newPosition; - return oldPosition; + public void position(int position) { + this.position = position; } public long length() { @@ -332,10 +325,13 @@ public long length() { @Override public void close() { - cache.close(); - } - - public void closeAndResize(long length) { - cache.closeAndResize(length); + try { + channel.close(); + } catch (IOException e) { + throw new IllegalStateException(e); + } finally { + //unmap in a deterministic way: do not rely on GC to do it + PlatformDependent.freeDirectBuffer(this.buffer); + } } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java index 12e359cfc38..0091f3061b1 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java @@ -38,32 +38,37 @@ final class MappedSequentialFile implements SequentialFile { private final File directory; - private final long chunkBytes; - private final long overlapBytes; private final IOCriticalErrorListener criticalErrorListener; private final MappedSequentialFileFactory factory; private File file; private File absoluteFile; private String fileName; private MappedFile mappedFile; + private int capacity; MappedSequentialFile(MappedSequentialFileFactory factory, final File directory, final File file, - final long chunkBytes, - final long overlapBytes, + final int capacity, final IOCriticalErrorListener criticalErrorListener) { this.factory = factory; this.directory = directory; this.file = file; this.absoluteFile = null; this.fileName = null; - this.chunkBytes = chunkBytes; - this.overlapBytes = overlapBytes; + this.capacity = capacity; this.mappedFile = null; this.criticalErrorListener = criticalErrorListener; } + public MappedFile mappedFile() { + return mappedFile; + } + + public int capacity() { + return this.capacity; + } + private void checkIsOpen() { if (!isOpen()) { throw new IllegalStateException("File not opened!"); @@ -95,7 +100,7 @@ public boolean exists() { @Override public void open() throws IOException { if (this.mappedFile == null) { - this.mappedFile = MappedFile.of(file, chunkBytes, overlapBytes); + this.mappedFile = MappedFile.of(this.file, 0, this.capacity); } } @@ -129,7 +134,11 @@ public String getFileName() { @Override public void fill(int size) throws IOException { checkIsOpen(); + //the fill will give a big performance hit when done in parallel of other writings! this.mappedFile.zeros(this.mappedFile.position(), size); + if (factory.isDatasync()) { + this.mappedFile.force(); + } } @Override @@ -214,11 +223,11 @@ public void write(EncodingSupport bytes, boolean sync) throws IOException { @Override public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) { - if (callback == null) { - throw new NullPointerException("callback parameter need to be set"); - } - checkIsOpen(callback); try { + if (callback == null) { + throw new NullPointerException("callback parameter need to be set"); + } + checkIsOpen(callback); final int position = bytes.position(); final int limit = bytes.limit(); final int remaining = limit - position; @@ -237,22 +246,28 @@ public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) { } callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); throw new RuntimeException(e); + } finally { + this.factory.releaseBuffer(bytes); } } @Override public void writeDirect(ByteBuffer bytes, boolean sync) throws IOException { - checkIsOpen(); - final int position = bytes.position(); - final int limit = bytes.limit(); - final int remaining = limit - position; - if (remaining > 0) { - this.mappedFile.write(bytes, position, remaining); - final int newPosition = position + remaining; - bytes.position(newPosition); - if (factory.isDatasync() && sync) { - this.mappedFile.force(); + try { + checkIsOpen(); + final int position = bytes.position(); + final int limit = bytes.limit(); + final int remaining = limit - position; + if (remaining > 0) { + this.mappedFile.write(bytes, position, remaining); + final int newPosition = position + remaining; + bytes.position(newPosition); + if (factory.isDatasync() && sync) { + this.mappedFile.force(); + } } + } finally { + this.factory.releaseBuffer(bytes); } } @@ -304,8 +319,11 @@ public int read(ByteBuffer bytes) throws IOException { @Override public void position(long pos) { + if (pos > Integer.MAX_VALUE) { + throw new IllegalArgumentException("pos must be < " + Integer.MAX_VALUE); + } checkIsOpen(); - this.mappedFile.position(pos); + this.mappedFile.position((int) pos); } @Override @@ -317,7 +335,7 @@ public long position() { @Override public void close() { if (this.mappedFile != null) { - this.mappedFile.closeAndResize(this.mappedFile.length()); + this.mappedFile.close(); this.mappedFile = null; } } @@ -325,7 +343,9 @@ public void close() { @Override public void sync() throws IOException { checkIsOpen(); - this.mappedFile.force(); + if (factory.isDatasync()) { + this.mappedFile.force(); + } } @Override @@ -363,9 +383,9 @@ public void renameTo(String newFileName) throws Exception { } @Override - public SequentialFile cloneFile() { + public MappedSequentialFile cloneFile() { checkIsNotOpen(); - return new MappedSequentialFile(factory, this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener); + return new MappedSequentialFile(this.factory, this.directory, this.file, this.capacity, this.criticalErrorListener); } @Override @@ -404,4 +424,4 @@ public File getJavaFile() { } return this.absoluteFile; } -} +} \ No newline at end of file diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java index c4b7d30c3db..a05d3229e74 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java @@ -29,62 +29,64 @@ import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; +import org.apache.activemq.artemis.utils.Env; public final class MappedSequentialFileFactory implements SequentialFileFactory { - private static long DEFAULT_BLOCK_SIZE = 64L << 20; private final File directory; + private int capacity; private final IOCriticalErrorListener criticalErrorListener; private final TimedBuffer timedBuffer; - private long chunkBytes; - private long overlapBytes; private boolean useDataSync; - private boolean supportCallbacks; - - protected volatile int alignment = -1; - - public MappedSequentialFileFactory(File directory, - IOCriticalErrorListener criticalErrorListener, - boolean supportCallbacks) { + private boolean bufferPooling; + //pools only the biggest one -> optimized for the common case + private final ThreadLocal bytesPool; + + private MappedSequentialFileFactory(File directory, + int capacity, + final boolean buffered, + final int bufferSize, + final int bufferTimeout, + IOCriticalErrorListener criticalErrorListener) { this.directory = directory; + this.capacity = capacity; this.criticalErrorListener = criticalErrorListener; - this.chunkBytes = DEFAULT_BLOCK_SIZE; - this.overlapBytes = DEFAULT_BLOCK_SIZE / 4; this.useDataSync = true; - this.timedBuffer = null; - this.supportCallbacks = supportCallbacks; - } - - public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) { - this(directory, criticalErrorListener, false); - } - - public MappedSequentialFileFactory(File directory) { - this(directory, null); + if (buffered && bufferTimeout > 0 && bufferSize > 0) { + timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, false); + } else { + timedBuffer = null; + } + this.bufferPooling = true; + this.bytesPool = new ThreadLocal<>(); } - - public long chunkBytes() { - return chunkBytes; + public MappedSequentialFileFactory capacity(int capacity) { + this.capacity = capacity; + return this; } - public MappedSequentialFileFactory chunkBytes(long chunkBytes) { - this.chunkBytes = chunkBytes; - return this; + public int capacity() { + return capacity; } - public long overlapBytes() { - return overlapBytes; + public static MappedSequentialFileFactory buffered(File directory, + int capacity, + final int bufferSize, + final int bufferTimeout, + IOCriticalErrorListener criticalErrorListener) { + return new MappedSequentialFileFactory(directory, capacity, true, bufferSize, bufferTimeout, criticalErrorListener); } - public MappedSequentialFileFactory overlapBytes(long overlapBytes) { - this.overlapBytes = overlapBytes; - return this; + public static MappedSequentialFileFactory unbuffered(File directory, + int capacity, + IOCriticalErrorListener criticalErrorListener) { + return new MappedSequentialFileFactory(directory, capacity, false, 0, 0, criticalErrorListener); } @Override public SequentialFile createSequentialFile(String fileName) { - final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener); + final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, directory, new File(directory, fileName), capacity, criticalErrorListener); if (this.timedBuffer == null) { return mappedSequentialFile; } else { @@ -93,7 +95,7 @@ public SequentialFile createSequentialFile(String fileName) { } @Override - public SequentialFileFactory setDatasync(boolean enabled) { + public MappedSequentialFileFactory setDatasync(boolean enabled) { this.useDataSync = enabled; return this; } @@ -120,7 +122,7 @@ public List listFiles(final String extension) throws Exception { @Override public boolean isSupportsCallbacks() { - return this.supportCallbacks; + return timedBuffer != null; } @Override @@ -132,23 +134,65 @@ public void onIOError(Exception exception, String message, SequentialFile file) @Override public ByteBuffer allocateDirectBuffer(final int size) { - return ByteBuffer.allocateDirect(size); + final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize()); + final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity); + byteBuffer.limit(size); + return byteBuffer; } @Override - public void releaseDirectBuffer(final ByteBuffer buffer) { + public void releaseDirectBuffer(ByteBuffer buffer) { PlatformDependent.freeDirectBuffer(buffer); } + public MappedSequentialFileFactory enableBufferReuse() { + this.bufferPooling = true; + return this; + } + + public MappedSequentialFileFactory disableBufferReuse() { + this.bufferPooling = false; + return this; + } + @Override public ByteBuffer newBuffer(final int size) { - return ByteBuffer.allocate(size); + if (!this.bufferPooling) { + return allocateDirectBuffer(size); + } else { + final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize()); + ByteBuffer byteBuffer = bytesPool.get(); + if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) { + //do not free the old one (if any) until the new one will be released into the pool! + byteBuffer = ByteBuffer.allocateDirect(requiredCapacity); + } else { + bytesPool.set(null); + PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0); + byteBuffer.clear(); + } + byteBuffer.limit(size); + return byteBuffer; + } } @Override public void releaseBuffer(ByteBuffer buffer) { - if (buffer.isDirect()) { - PlatformDependent.freeDirectBuffer(buffer); + if (this.bufferPooling) { + if (buffer.isDirect()) { + final ByteBuffer byteBuffer = bytesPool.get(); + if (byteBuffer != buffer) { + //replace with the current pooled only if greater or null + if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) { + if (byteBuffer != null) { + //free the smaller one + PlatformDependent.freeDirectBuffer(byteBuffer); + } + bytesPool.set(buffer); + } else { + PlatformDependent.freeDirectBuffer(buffer); + } + } + } } } @@ -179,9 +223,9 @@ public int getAlignment() { } @Override + @Deprecated public MappedSequentialFileFactory setAlignment(int alignment) { - this.alignment = alignment; - return this; + throw new UnsupportedOperationException("alignment can't be changed!"); } @Override @@ -203,7 +247,6 @@ public void clearBuffer(final ByteBuffer buffer) { //SIMD OPTIMIZATION Arrays.fill(array, (byte) 0); } else { - //TODO VERIFY IF IT COULD HAPPENS final int capacity = buffer.capacity(); for (int i = 0; i < capacity; i++) { buffer.put(i, (byte) 0); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java index d376d7df65f..8436ed501ac 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java @@ -20,14 +20,10 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.core.io.DummyCallback; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; @@ -35,6 +31,7 @@ import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver; import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; final class TimedSequentialFile implements SequentialFile { @@ -42,14 +39,12 @@ final class TimedSequentialFile implements SequentialFile { private final SequentialFileFactory factory; private final SequentialFile sequentialFile; private final LocalBufferObserver observer; - private final ThreadLocal callbackPool; private TimedBuffer timedBuffer; TimedSequentialFile(SequentialFileFactory factory, SequentialFile sequentialFile) { this.sequentialFile = sequentialFile; this.factory = factory; this.observer = new LocalBufferObserver(); - this.callbackPool = ThreadLocal.withInitial(ResettableIOCallback::new); } @Override @@ -114,13 +109,10 @@ public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throw public void write(ActiveMQBuffer bytes, boolean sync) throws Exception { if (sync) { if (this.timedBuffer != null) { - final ResettableIOCallback callback = callbackPool.get(); - try { - this.timedBuffer.addBytes(bytes, true, callback); - callback.waitCompletion(); - } finally { - callback.reset(); - } + //the only way to avoid allocations is by using a lock-free pooled callback -> CyclicBarrier allocates on each new Generation!!! + final SimpleWaitIOCallback callback = new SimpleWaitIOCallback(); + this.timedBuffer.addBytes(bytes, true, callback); + callback.waitCompletion(); } else { this.sequentialFile.write(bytes, true); } @@ -146,13 +138,10 @@ public void write(EncodingSupport bytes, boolean sync, IOCallback callback) thro public void write(EncodingSupport bytes, boolean sync) throws Exception { if (sync) { if (this.timedBuffer != null) { - final ResettableIOCallback callback = callbackPool.get(); - try { - this.timedBuffer.addBytes(bytes, true, callback); - callback.waitCompletion(); - } finally { - callback.reset(); - } + //the only way to avoid allocations is by using a lock-free pooled callback -> CyclicBarrier allocates on each new Generation!!! + final SimpleWaitIOCallback callback = new SimpleWaitIOCallback(); + this.timedBuffer.addBytes(bytes, true, callback); + callback.waitCompletion(); } else { this.sequentialFile.write(bytes, true); } @@ -197,7 +186,11 @@ public long position() { @Override public void close() throws Exception { - this.sequentialFile.close(); + try { + this.sequentialFile.close(); + } finally { + this.timedBuffer = null; + } } @Override @@ -241,128 +234,99 @@ public File getJavaFile() { return this.sequentialFile.getJavaFile(); } - private static final class ResettableIOCallback implements IOCallback { - - private final CyclicBarrier cyclicBarrier; - private int errorCode; - private String errorMessage; - - ResettableIOCallback() { - this.cyclicBarrier = new CyclicBarrier(2); - } - - public void waitCompletion() throws InterruptedException, ActiveMQException, BrokenBarrierException { - this.cyclicBarrier.await(); - if (this.errorMessage != null) { - throw ActiveMQExceptionType.createException(this.errorCode, this.errorMessage); - } - } - - public void reset() { - this.errorCode = 0; - this.errorMessage = null; - } - - @Override - public void done() { + private static void invokeDoneOn(List callbacks) { + final int size = callbacks.size(); + for (int i = 0; i < size; i++) { try { - this.cyclicBarrier.await(); - } catch (BrokenBarrierException | InterruptedException e) { - throw new IllegalStateException(e); + final IOCallback callback = callbacks.get(i); + callback.done(); + } catch (Throwable e) { + ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e); } } + } - @Override - public void onError(int errorCode, String errorMessage) { + private static void invokeOnErrorOn(final int errorCode, + final String errorMessage, + List callbacks) { + final int size = callbacks.size(); + for (int i = 0; i < size; i++) { try { - this.errorCode = errorCode; - this.errorMessage = errorMessage; - this.cyclicBarrier.await(); - } catch (BrokenBarrierException | InterruptedException e) { - throw new IllegalStateException(e); + final IOCallback callback = callbacks.get(i); + callback.onError(errorCode, errorMessage); + } catch (Throwable e) { + ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e); } } } private static final class DelegateCallback implements IOCallback { - final List delegates; + List delegates; private DelegateCallback() { - this.delegates = new ArrayList<>(); - } - - public List delegates() { - return this.delegates; + this.delegates = null; } @Override public void done() { - final int size = delegates.size(); - for (int i = 0; i < size; i++) { - try { - final IOCallback callback = delegates.get(i); - callback.done(); - } catch (Throwable e) { - ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e); - } - } + invokeDoneOn(delegates); } @Override public void onError(final int errorCode, final String errorMessage) { - for (IOCallback callback : delegates) { - try { - callback.onError(errorCode, errorMessage); - } catch (Throwable e) { - ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e); - } - } + invokeOnErrorOn(errorCode, errorMessage, delegates); } } private final class LocalBufferObserver implements TimedBufferObserver { - private final ThreadLocal callbacksPool = ThreadLocal.withInitial(DelegateCallback::new); + private final DelegateCallback delegateCallback = new DelegateCallback(); @Override public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List callbacks) { buffer.flip(); + if (buffer.limit() == 0) { - //if there are no bytes to flush, can release the callbacks - final int size = callbacks.size(); - for (int i = 0; i < size; i++) { - callbacks.get(i).done(); - } - } else { - final DelegateCallback delegateCallback = callbacksPool.get(); - final int size = callbacks.size(); - final List delegates = delegateCallback.delegates(); - for (int i = 0; i < size; i++) { - delegates.add(callbacks.get(i)); - } try { - sequentialFile.writeDirect(buffer, requestedSync, delegateCallback); + invokeDoneOn(callbacks); } finally { - delegates.clear(); + factory.releaseBuffer(buffer); + } + } else { + if (callbacks.isEmpty()) { + try { + sequentialFile.writeDirect(buffer, requestedSync); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } else { + delegateCallback.delegates = callbacks; + try { + sequentialFile.writeDirect(buffer, requestedSync, delegateCallback); + } finally { + delegateCallback.delegates = null; + } } } } @Override public ByteBuffer newBuffer(final int size, final int limit) { - final int alignedSize = factory.calculateBlockSize(size); - final int alignedLimit = factory.calculateBlockSize(limit); - final ByteBuffer buffer = factory.newBuffer(alignedSize); - buffer.limit(alignedLimit); - return buffer; + return factory.newBuffer(limit); } @Override public int getRemainingBytes() { try { - final int remaining = (int) Math.min(sequentialFile.size() - sequentialFile.position(), Integer.MAX_VALUE); - return remaining; + final long position = sequentialFile.position(); + final long size = sequentialFile.size(); + final long remaining = size - position; + if (remaining > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int) remaining; + } } catch (Exception e) { throw new IllegalStateException(e); } @@ -370,7 +334,7 @@ public int getRemainingBytes() { @Override public String toString() { - return "TimedBufferObserver on file (" + getFileName() + ")"; + return "TimedBufferObserver on file (" + sequentialFile.getFileName() + ")"; } } diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java index b426219551e..d0bada84825 100644 --- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java @@ -37,7 +37,6 @@ import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.jlibaio.LibaioContext; /** @@ -47,12 +46,12 @@ public class JournalTptBenchmark { public static void main(String[] args) throws Exception { final boolean useDefaultIoExecutor = true; - final int fileSize = 1024 * 1024; - final boolean dataSync = true; + final int fileSize = 10 * 1024 * 1024; + final boolean dataSync = false; final Type type = Type.Mapped; - final int tests = 5; + final int tests = 10; final int warmup = 20_000; - final int measurements = 20_000; + final int measurements = 100_000; final int msgSize = 100; final byte[] msgContent = new byte[msgSize]; Arrays.fill(msgContent, (byte) 1); @@ -63,8 +62,8 @@ public static void main(String[] args) throws Exception { switch (type) { case Mapped: - final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true); - factory = mappedFactory.chunkBytes(fileSize).overlapBytes(0).setDatasync(dataSync); + factory = MappedSequentialFileFactory.buffered(tmpDirectory, fileSize, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null) + .setDatasync(dataSync); break; case Nio: factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync); @@ -195,9 +194,7 @@ private static long writeMeasurements(long id, private static void write(long id, Journal journal, EncodingSupport encodingSupport) throws Exception { journal.appendAddRecord(id, (byte) 1, encodingSupport, false); - final SimpleWaitIOCallback ioCallback = new SimpleWaitIOCallback(); - journal.appendUpdateRecord(id, (byte) 1, encodingSupport, true, ioCallback); - ioCallback.waitCompletion(); + journal.appendUpdateRecord(id, (byte) 1, encodingSupport, true); } private enum Type { diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java index 7756a064cd6..7f2641abff6 100644 --- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java @@ -21,7 +21,6 @@ import java.util.Arrays; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.LockSupport; import org.apache.activemq.artemis.ArtemisConstants; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -41,12 +40,12 @@ public class SequentialFileTptBenchmark { private static final FastWaitIOCallback CALLBACK = new FastWaitIOCallback(); public static void main(String[] args) throws Exception { - final boolean dataSync = true; + final boolean dataSync = false; final boolean writeSync = true; final Type type = Type.Mapped; final int tests = 10; final int warmup = 20_000; - final int measurements = 20_000; + final int measurements = 100_000; final int msgSize = 100; final byte[] msgContent = new byte[msgSize]; Arrays.fill(msgContent, (byte) 1); @@ -56,10 +55,8 @@ public static void main(String[] args) throws Exception { switch (type) { case Mapped: - final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true); - final int alignedMessageSize = mappedFactory.calculateBlockSize(msgSize); - final int totalFileSize = Math.max(alignedMessageSize * measurements, alignedMessageSize * warmup); - factory = mappedFactory.chunkBytes(totalFileSize).overlapBytes(0).setDatasync(dataSync); + final int fileSize = Math.max(msgSize * measurements, msgSize * warmup); + factory = MappedSequentialFileFactory.buffered(tmpDirectory, fileSize, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync); break; case Nio: factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync); @@ -147,9 +144,9 @@ private static void write(SequentialFile sequentialFile, boolean sync) throws Exception { //this pattern is necessary to ensure that NIO's TimedBuffer fill flush the buffer and know the real size of it if (sequentialFile.fits(encodingSupport.getEncodeSize())) { - final FastWaitIOCallback ioCallback = CALLBACK.reset(); - sequentialFile.write(encodingSupport, sync, ioCallback); - ioCallback.waitCompletion(); + CALLBACK.reset(); + sequentialFile.write(encodingSupport, sync, CALLBACK); + CALLBACK.waitCompletion(); } else { throw new IllegalStateException("can't happen!"); } @@ -189,11 +186,7 @@ public void onError(int errorCode, String errorMessage) { } public void waitCompletion() throws InterruptedException, ActiveMQException { - final Thread currentThread = Thread.currentThread(); while (!done.get()) { - LockSupport.parkNanos(1L); - if (currentThread.isInterrupted()) - throw new InterruptedException(); } if (errorMessage != null) { throw ActiveMQExceptionType.createException(errorCode, errorMessage); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index ba7bb867e87..148c1f05840 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -137,8 +137,7 @@ protected void init(Configuration config, IOCriticalErrorListener criticalErrorL break; case MAPPED: ActiveMQServerLogger.LOGGER.journalUseMAPPED(); - //the mapped version do not need buffering by default - journalFF = new MappedSequentialFileFactory(config.getJournalLocation(), criticalErrorListener, true).chunkBytes(config.getJournalFileSize()).overlapBytes(0); + journalFF = MappedSequentialFileFactory.buffered(config.getJournalLocation(), config.getJournalFileSize(), config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), criticalErrorListener); break; default: throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType()); diff --git a/tests/extra-tests/pom.xml b/tests/extra-tests/pom.xml index 6ba2c553c9a..4a565cf05a8 100644 --- a/tests/extra-tests/pom.xml +++ b/tests/extra-tests/pom.xml @@ -223,34 +223,6 @@ jbossjts-jacorb 4.17.13.Final - - - - - net.openhft - chronicle-core - ${openhft.core.version} - - - - net.openhft - affinity - ${openhft.affinity.version} - - - - - org.openjdk.jmh - jmh-core - ${openjdk.jmh.version} - - - - org.openjdk.jmh - jmh-generator-annprocess - ${openjdk.jmh.version} - - diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplLatencyBench.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplLatencyBench.java deleted file mode 100644 index 3304f1529f8..00000000000 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplLatencyBench.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.extras.benchmarks.journal; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.util.ArrayList; - -import net.openhft.chronicle.core.jlbh.JLBH; -import net.openhft.chronicle.core.jlbh.JLBHOptions; -import net.openhft.chronicle.core.jlbh.JLBHTask; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; -import org.apache.activemq.artemis.core.io.SequentialFileFactory; -import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; -import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; -import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.journal.Journal; -import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.impl.JournalImpl; - -public class JournalImplLatencyBench implements JLBHTask { - - private static final int FILE_SIZE = 1024 * 1024 * 1024; - private static final JournalType JOURNAL_TYPE = JournalType.MAPPED; - private static final int ITERATIONS = 100_000; - private static final int WARMUP_ITERATIONS = 20_000; - private static final int TARGET_THROUGHPUT = 50_000; - private static final int TESTS = 5; - private static int TOTAL_MESSAGES = (ITERATIONS * TESTS + WARMUP_ITERATIONS); - private static int ENCODED_SIZE = 8; - private static int CHUNK_BYTES = FILE_SIZE; - private static int OVERLAP_BYTES = CHUNK_BYTES / 4; - private final SequentialFileFactory sequentialFileFactory; - private Journal journal; - private EncodingSupport encodingSupport; - private JLBH jlbh; - private long id; - - public JournalImplLatencyBench(SequentialFileFactory sequentialFileFactory) { - this.sequentialFileFactory = sequentialFileFactory; - } - - public static void main(String[] args) throws IOException { - final File journalDir = Files.createTempDirectory("seq_files").toFile(); - journalDir.deleteOnExit(); - final boolean buffered = false; - final int bufferSize = 4096; - final int bufferTimeout = 0; - final int maxIO = -1; - final boolean logRates = false; - final IOCriticalErrorListener criticalErrorListener = null; - final SequentialFileFactory sequentialFileFactory; - switch (JOURNAL_TYPE) { - case MAPPED: - sequentialFileFactory = new MappedSequentialFileFactory(journalDir, criticalErrorListener).chunkBytes(CHUNK_BYTES).overlapBytes(OVERLAP_BYTES); - break; - case NIO: - sequentialFileFactory = new NIOSequentialFileFactory(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener); - break; - - default: - throw new AssertionError("!?"); - } - final JLBHOptions lth = new JLBHOptions().warmUpIterations(WARMUP_ITERATIONS).iterations(ITERATIONS).throughput(TARGET_THROUGHPUT).runs(TESTS).recordOSJitter(true).accountForCoordinatedOmmission(true).jlbhTask(new JournalImplLatencyBench(sequentialFileFactory)); - new JLBH(lth).start(); - } - - @Override - public void init(JLBH jlbh) { - id = 0; - this.jlbh = jlbh; - int numFiles = (int) ((TOTAL_MESSAGES * 1024 + 512) / FILE_SIZE * 1.3); - if (numFiles < 2) { - numFiles = 2; - } - this.journal = new JournalImpl(FILE_SIZE, numFiles, numFiles, 0, 0, sequentialFileFactory, "activemq-data", "amq", Integer.MAX_VALUE); - this.encodingSupport = NilEncodingSupport.Instance; - try { - journal.start(); - journal.load(new ArrayList(), null, null); - } catch (Exception e) { - throw new RuntimeException(e); - } - - } - - @Override - public void run(long startTimeNS) { - id++; - try { - journal.appendAddRecord(id, (byte) 0, encodingSupport, false); - } catch (Exception e) { - throw new RuntimeException(e); - } - jlbh.sample(System.nanoTime() - startTimeNS); - } - - @Override - public void complete() { - try { - journal.stop(); - for (File journalFile : sequentialFileFactory.getDirectory().listFiles()) { - journalFile.deleteOnExit(); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private enum JournalType { - MAPPED, - NIO - } - - private enum NilEncodingSupport implements EncodingSupport { - Instance; - - @Override - public int getEncodeSize() { - return ENCODED_SIZE; - } - - @Override - public void encode(ActiveMQBuffer buffer) { - final int writerIndex = buffer.writerIndex(); - for (int i = 0; i < ENCODED_SIZE; i++) { - buffer.writeByte((byte) 0); - } - buffer.writerIndex(writerIndex + ENCODED_SIZE); - } - - @Override - public void decode(ActiveMQBuffer buffer) { - throw new UnsupportedOperationException(); - } - } -} diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/AddJournalRecordEncoder.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/AddJournalRecordEncoder.java deleted file mode 100644 index 1096c638fab..00000000000 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/AddJournalRecordEncoder.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree; - -import java.nio.ByteBuffer; - -import io.netty.util.internal.PlatformDependent; - -/** - * IT IS NOT A FLYWEIGHT BUT AN ENCODER: NEED TO RESPECT THE SEQUENCE OF WRITE: - * FileId - * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.concurrent.TimeUnit; - -import io.netty.buffer.Unpooled; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; -import org.apache.activemq.artemis.core.journal.EncoderPersister; -import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; -import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.profile.GCProfiler; -import org.openjdk.jmh.runner.Runner; -import org.openjdk.jmh.runner.RunnerException; -import org.openjdk.jmh.runner.options.Options; -import org.openjdk.jmh.runner.options.OptionsBuilder; - -@State(Scope.Thread) -@BenchmarkMode(value = {Mode.Throughput, Mode.SampleTime}) -@OutputTimeUnit(TimeUnit.MICROSECONDS) -public class EncodersBench { - - private static final int expectedEncoderSize = JournalRecordHeader.BYTES + AddJournalRecordEncoder.expectedSize(0); - private JournalInternalRecord record; - private ByteBuffer byteBuffer; - private AddJournalRecordEncoder addJournalRecordEncoder; - private ActiveMQBuffer outBuffer; - - public static void main(String[] args) throws RunnerException { - final Options opt = new OptionsBuilder().include(EncodersBench.class.getSimpleName()).addProfiler(GCProfiler.class).warmupIterations(5).measurementIterations(5).forks(1).build(); - new Runner(opt).run(); - } - - @Setup - public void init() { - this.byteBuffer = ByteBuffer.allocateDirect(expectedEncoderSize); - this.byteBuffer.order(ByteOrder.nativeOrder()); - this.addJournalRecordEncoder = new AddJournalRecordEncoder(); - - this.record = new JournalAddRecord(true, 1, (byte) 1, EncoderPersister.getInstance(), ZeroEncodingSupport.Instance); - this.record.setFileID(1); - this.record.setCompactCount((short) 1); - this.outBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(this.record.getEncodeSize(), this.record.getEncodeSize()).order(ByteOrder.nativeOrder())); - } - - @Benchmark - public int encodeAligned() { - //Header - final long header = JournalRecordHeader.makeHeader(JournalRecordTypes.ADD_JOURNAL, expectedEncoderSize); - this.byteBuffer.putLong(0, header); - //FileId - * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -import org.apache.activemq.artemis.core.io.SequentialFile; -import org.apache.activemq.artemis.core.io.SequentialFileFactory; -import org.apache.activemq.artemis.core.journal.impl.JournalFile; -import org.apache.activemq.artemis.core.journal.impl.JournalImpl; - -final class GcFreeJournal extends JournalImpl { - - private final AddJournalRecordEncoder addJournalRecordEncoder = new AddJournalRecordEncoder(); - //TODO replace with thread local pools if not single threaded! - private ByteBuffer journalRecordBytes = null; - - GcFreeJournal(final int fileSize, - final int minFiles, - final int poolSize, - final int compactMinFiles, - final int compactPercentage, - final SequentialFileFactory fileFactory, - final String filePrefix, - final String fileExtension, - final int maxAIO) { - super(fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0); - } - - public static int align(final int value, final int alignment) { - return (value + (alignment - 1)) & ~(alignment - 1); - } - - public void appendAddRecord(final long id, - final int recordType, - final ByteBuffer encodedRecord, - final int offset, - final int length, - final boolean sync) throws Exception { - final int expectedLength = JournalRecordHeader.BYTES + AddJournalRecordEncoder.expectedSize(length); - final int alignedLength = align(expectedLength, 8); - switchFileIfNecessary(alignedLength); - final JournalFile currentFile = getCurrentFile(); - final int fileId = currentFile.getRecordID(); - if (this.journalRecordBytes == null || this.journalRecordBytes.capacity() < alignedLength) { - final int newPooledLength = align(alignedLength, 4096); - //TODO ADD LIMITS OR WARNS IN CASE OF TOO MUCH BIGGER SIZE - this.journalRecordBytes = ByteBuffer.allocateDirect(newPooledLength); - this.journalRecordBytes.order(ByteOrder.nativeOrder()); - } - final long journalRecordHeader = JournalRecordHeader.makeHeader(JournalRecordTypes.ADD_JOURNAL, expectedLength); - this.journalRecordBytes.putLong(0, journalRecordHeader); - //use natural stride while encoding: FileId - * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.file.Files; -import java.util.ArrayList; - -import net.openhft.chronicle.core.jlbh.JLBH; -import net.openhft.chronicle.core.jlbh.JLBHOptions; -import net.openhft.chronicle.core.jlbh.JLBHTask; -import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; -import org.apache.activemq.artemis.core.io.SequentialFileFactory; -import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; -import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; -import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.impl.JournalImpl; - -public class GcFreeJournalLatencyBench implements JLBHTask { - - private static final int FILE_SIZE = JournalImpl.SIZE_HEADER + (1024 * 1024 * 1024); - private static final JournalType JOURNAL_TYPE = JournalType.MAPPED; - private static final int ITERATIONS = 100_000; - private static final int WARMUP_ITERATIONS = 20_000; - private static final int TARGET_THROUGHPUT = 500_000; - private static final int TESTS = 5; - private static int TOTAL_MESSAGES = (ITERATIONS * TESTS + WARMUP_ITERATIONS); - private static int ENCODED_SIZE = 8; - private static int CHUNK_BYTES = FILE_SIZE; - private static int OVERLAP_BYTES = CHUNK_BYTES / 4; - private final SequentialFileFactory sequentialFileFactory; - private GcFreeJournal journal; - private JLBH jlbh; - private long id; - private ByteBuffer encodedRecord; - - public GcFreeJournalLatencyBench(SequentialFileFactory sequentialFileFactory) { - this.sequentialFileFactory = sequentialFileFactory; - } - - public static void main(String[] args) throws IOException { - final File journalDir = Files.createTempDirectory("seq_files").toFile(); - journalDir.deleteOnExit(); - final boolean buffered = false; - final int bufferSize = 4096; - final int bufferTimeout = 0; - final int maxIO = -1; - final boolean logRates = false; - final IOCriticalErrorListener criticalErrorListener = null; - final SequentialFileFactory sequentialFileFactory; - switch (JOURNAL_TYPE) { - case MAPPED: - sequentialFileFactory = new MappedSequentialFileFactory(journalDir, criticalErrorListener).chunkBytes(CHUNK_BYTES).overlapBytes(OVERLAP_BYTES); - break; - case NIO: - sequentialFileFactory = new NIOSequentialFileFactory(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener); - break; - - default: - throw new AssertionError("!?"); - } - final JLBHOptions lth = new JLBHOptions().warmUpIterations(WARMUP_ITERATIONS).iterations(ITERATIONS).throughput(TARGET_THROUGHPUT).runs(TESTS).recordOSJitter(true).accountForCoordinatedOmmission(true).jlbhTask(new GcFreeJournalLatencyBench(sequentialFileFactory)); - new JLBH(lth).start(); - } - - @Override - public void init(JLBH jlbh) { - id = 0; - this.jlbh = jlbh; - final int expectedMaxSize = GcFreeJournal.align(JournalRecordHeader.BYTES + AddJournalRecordEncoder.expectedSize(ENCODED_SIZE), 8); - int numFiles = (int) ((TOTAL_MESSAGES * expectedMaxSize + 512) / FILE_SIZE * 1.3); - if (numFiles < 2) { - numFiles = 2; - } - this.encodedRecord = ByteBuffer.allocateDirect(ENCODED_SIZE); - this.encodedRecord.order(ByteOrder.nativeOrder()); - this.journal = new GcFreeJournal(FILE_SIZE, numFiles, numFiles, 0, 0, sequentialFileFactory, "activemq-data", "amq", Integer.MAX_VALUE); - try { - journal.start(); - journal.load(new ArrayList(), null, null); - } catch (Exception e) { - throw new RuntimeException(e); - } - - } - - @Override - public void run(long startTimeNS) { - id++; - try { - journal.appendAddRecord(id, (byte) 0, encodedRecord, 0, ENCODED_SIZE, false); - } catch (Exception e) { - throw new RuntimeException(e); - } - jlbh.sample(System.nanoTime() - startTimeNS); - } - - @Override - public void complete() { - try { - journal.stop(); - for (File journalFile : sequentialFileFactory.getDirectory().listFiles()) { - journalFile.deleteOnExit(); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private enum JournalType { - MAPPED, - NIO - } -} diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordHeader.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordHeader.java deleted file mode 100644 index 98b205edf4e..00000000000 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordHeader.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree; - -final class JournalRecordHeader { - - public static final int BYTES = 8; - - public static long makeHeader(final int journalRecordTypeId, final int length) { - return ((journalRecordTypeId & 0xFFFF_FFFFL) << 32) | (length & 0xFFFF_FFFFL); - } - -} \ No newline at end of file diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordTypes.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordTypes.java deleted file mode 100644 index ece9b3a4798..00000000000 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordTypes.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree; - -/** - * Created by developer on 18/06/16. - */ -final class JournalRecordTypes { - - public static final int ADD_JOURNAL = 11; - - private JournalRecordTypes() { - - } -} diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/sequentialfile/SequentialFileLatencyBench.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/sequentialfile/SequentialFileLatencyBench.java deleted file mode 100644 index 343d95d10a8..00000000000 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/sequentialfile/SequentialFileLatencyBench.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.tests.extras.benchmarks.sequentialfile; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.file.Files; -import java.nio.file.Paths; - -import net.openhft.chronicle.core.jlbh.JLBH; -import net.openhft.chronicle.core.jlbh.JLBHOptions; -import net.openhft.chronicle.core.jlbh.JLBHTask; -import org.apache.activemq.artemis.core.io.DummyCallback; -import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; -import org.apache.activemq.artemis.core.io.SequentialFile; -import org.apache.activemq.artemis.core.io.SequentialFileFactory; -import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; -import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; - -public final class SequentialFileLatencyBench implements JLBHTask { - - private static final JournalType JOURNAL_TYPE = JournalType.MAPPED; - //NOTE: SUPPORTED ONLY ON *NIX - private static final boolean SHM = false; - private static final int JOURNAL_RECORD_SIZE = 8; - private static final int ITERATIONS = 100_000; - private static final int WARMUP_ITERATIONS = 20_000; - private static final int TARGET_THROUGHPUT = 500_000; - private static final int TESTS = 5; - private static int CHUNK_BYTES = 4096 * 1024 * 16; - private static int OVERLAP_BYTES = CHUNK_BYTES / 4; - private final SequentialFileFactory sequentialFileFactory; - private SequentialFile sequentialFile; - private ByteBuffer message; - private JLBH jlbh; - - public SequentialFileLatencyBench(SequentialFileFactory sequentialFileFactory) { - this.sequentialFileFactory = sequentialFileFactory; - } - - public static void main(String[] args) throws IOException { - final File journalDir; - if (SHM) { - journalDir = Files.createDirectory(Paths.get("/dev/shm/seq_files")).toFile(); - } else { - journalDir = Files.createTempDirectory("seq_files").toFile(); - } - journalDir.deleteOnExit(); - final boolean buffered = false; - final int bufferSize = 4096; - final int bufferTimeout = 0; - final int maxIO = -1; - final boolean logRates = false; - final IOCriticalErrorListener criticalErrorListener = null; - final SequentialFileFactory sequentialFileFactory; - switch (JOURNAL_TYPE) { - case MAPPED: - sequentialFileFactory = new MappedSequentialFileFactory(journalDir).chunkBytes(CHUNK_BYTES).overlapBytes(OVERLAP_BYTES); - break; - case NIO: - sequentialFileFactory = new NIOSequentialFileFactory(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener); - break; - default: - throw new AssertionError("!?"); - } - final JLBHOptions lth = new JLBHOptions().warmUpIterations(WARMUP_ITERATIONS).iterations(ITERATIONS).throughput(TARGET_THROUGHPUT).runs(TESTS).recordOSJitter(true).accountForCoordinatedOmmission(true).jlbhTask(new SequentialFileLatencyBench(sequentialFileFactory)); - new JLBH(lth).start(); - } - - @Override - public void init(JLBH jlbh) { - this.jlbh = jlbh; - this.sequentialFile = this.sequentialFileFactory.createSequentialFile(Long.toString(System.nanoTime())); - try { - this.sequentialFile.open(-1, false); - final File file = this.sequentialFile.getJavaFile(); - file.deleteOnExit(); - System.out.println("sequentialFile: " + file); - } catch (Exception e) { - throw new RuntimeException(e); - } - this.message = this.sequentialFileFactory.allocateDirectBuffer(JOURNAL_RECORD_SIZE).order(ByteOrder.nativeOrder()); - - } - - @Override - public void run(long startTimeNS) { - message.position(0); - try { - sequentialFile.writeDirect(message, false, DummyCallback.getInstance()); - } catch (Exception e) { - throw new RuntimeException(e); - } - jlbh.sample(System.nanoTime() - startTimeNS); - } - - @Override - public void complete() { - sequentialFileFactory.releaseDirectBuffer(message); - try { - sequentialFile.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private enum JournalType { - MAPPED, - NIO - } -} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java index 5d540601c60..3ca3a9054a7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java @@ -22,9 +22,27 @@ public class MappedImportExportTest extends NIOImportExportTest { + @Override + protected void setup(int minFreeFiles, int fileSize, boolean sync) { + super.setup(minFreeFiles, fileSize, sync); + ((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize); + } + + @Override + protected void setup(int minFreeFiles, int fileSize, boolean sync, int maxAIO) { + super.setup(minFreeFiles, fileSize, sync, maxAIO); + ((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize); + } + + @Override + protected void setup(int minFreeFiles, int poolSize, int fileSize, boolean sync, int maxAIO) { + super.setup(minFreeFiles, poolSize, fileSize, sync, maxAIO); + ((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize); + } + @Override protected SequentialFileFactory getFileFactory() throws Exception { - return new MappedSequentialFileFactory(getTestDirfile()); + return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 10 * 4096, null); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java index 32b4b8f96c5..f333f6c4513 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java @@ -24,6 +24,24 @@ public class MappedJournalCompactTest extends NIOJournalCompactTest { + @Override + protected void setup(int minFreeFiles, int fileSize, boolean sync) { + super.setup(minFreeFiles, fileSize, sync); + ((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize); + } + + @Override + protected void setup(int minFreeFiles, int fileSize, boolean sync, int maxAIO) { + super.setup(minFreeFiles, fileSize, sync, maxAIO); + ((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize); + } + + @Override + protected void setup(int minFreeFiles, int poolSize, int fileSize, boolean sync, int maxAIO) { + super.setup(minFreeFiles, poolSize, fileSize, sync, maxAIO); + ((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize); + } + @Override protected SequentialFileFactory getFileFactory() throws Exception { File file = new File(getTestDir()); @@ -32,6 +50,6 @@ protected SequentialFileFactory getFileFactory() throws Exception { file.mkdir(); - return new MappedSequentialFileFactory(getTestDirfile()); + return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 60 * 1024, null); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java index 940c8a69d80..f59ef360cad 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java @@ -24,6 +24,24 @@ public class MappedJournalImplTest extends JournalImplTestUnit { + @Override + protected void setup(int minFreeFiles, int fileSize, boolean sync) { + super.setup(minFreeFiles, fileSize, sync); + ((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize); + } + + @Override + protected void setup(int minFreeFiles, int fileSize, boolean sync, int maxAIO) { + super.setup(minFreeFiles, fileSize, sync, maxAIO); + ((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize); + } + + @Override + protected void setup(int minFreeFiles, int poolSize, int fileSize, boolean sync, int maxAIO) { + super.setup(minFreeFiles, poolSize, fileSize, sync, maxAIO); + ((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize); + } + @Override protected SequentialFileFactory getFileFactory() throws Exception { File file = new File(getTestDir()); @@ -32,7 +50,7 @@ protected SequentialFileFactory getFileFactory() throws Exception { file.mkdir(); - return new MappedSequentialFileFactory(getTestDirfile()); + return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 10 * 1024, null); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java index cf87cdede91..56841473b59 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java @@ -34,7 +34,7 @@ public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBa @Override protected SequentialFileFactory createFactory(String folder) { - return new MappedSequentialFileFactory(new File(folder)); + return MappedSequentialFileFactory.unbuffered(new File(folder), 2048, null); } @Test @@ -58,7 +58,7 @@ public void decode(ActiveMQBuffer buffer) { }; final AtomicInteger calls = new AtomicInteger(0); - final MappedSequentialFileFactory factory = new MappedSequentialFileFactory(new File(getTestDir()), (code, message, file) -> { + final MappedSequentialFileFactory factory = MappedSequentialFileFactory.unbuffered(new File(getTestDir()), fakeEncoding.getEncodeSize(), (code, message, file) -> { new Exception("shutdown").printStackTrace(); calls.incrementAndGet(); }); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java index d2ffd6fac32..4019d8d8ba9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java @@ -22,8 +22,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.ArtemisConstants; -import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; -import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; @@ -354,12 +352,7 @@ public static SequentialFileFactory getFactory(final String factoryType, final S } else if (factoryType.equals("nio2")) { return new NIOSequentialFileFactory(new File(directory), true, 1); } else if (factoryType.equals("mmap")) { - return new MappedSequentialFileFactory(new File(directory), new IOCriticalErrorListener() { - @Override - public void onIOException(Throwable code, String message, SequentialFile file) { - code.printStackTrace(); - } - }, true).chunkBytes(fileSize).overlapBytes(0); + return MappedSequentialFileFactory.unbuffered(new File(directory), fileSize, null); } else { return new NIOSequentialFileFactory(new File(directory), false, 1); } From 4f48dfb543a057db62e53498cf95000e8f895cd0 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 30 Jun 2017 20:19:43 -0400 Subject: [PATCH 2/3] ARTEMIS-1269 replication won't finish synchronization --- .../core/client/ActiveMQClientLogger.java | 2 +- .../core/protocol/core/impl/ChannelImpl.java | 10 +++ .../core/impl/RemotingConnectionImpl.java | 2 +- .../ReplicationResponseMessageV2.java | 3 +- .../core/replication/ReplicationEndpoint.java | 74 +++++++++++++++++-- .../core/replication/ReplicationManager.java | 7 +- .../core/server/impl/ReplicationError.java | 12 ++- .../impl/SharedNothingBackupActivation.java | 34 ++++++--- .../artemis/tests/util/ActiveMQTestBase.java | 6 +- .../failover/LargeMessageFailoverTest.java | 5 +- .../ReplicatedMultipleServerFailoverTest.java | 2 +- .../util/TransportConfigurationUtils.java | 4 +- 12 files changed, 127 insertions(+), 34 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index bdb4bd16312..405ed07a934 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -374,7 +374,7 @@ public interface ActiveMQClientLogger extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) @Message(id = 214013, value = "Failed to decode packet", format = Message.Format.MESSAGE_FORMAT) - void errorDecodingPacket(@Cause Exception e); + void errorDecodingPacket(@Cause Throwable e); @LogMessage(level = Logger.Level.ERROR) @Message(id = 214014, value = "Failed to execute failure listener", format = Message.Format.MESSAGE_FORMAT) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 75c23de8bd1..39bddf5d29c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -462,6 +462,10 @@ public void setCommandConfirmationHandler(final CommandConfirmationHandler handl @Override public void setHandler(final ChannelHandler handler) { + if (logger.isTraceEnabled()) { + logger.trace("Setting handler on " + this + " as " + handler); + } + this.handler = handler; } @@ -521,6 +525,9 @@ public void replayCommands(final int otherLastConfirmedCommandID) { @Override public void lock() { + if (logger.isTraceEnabled()) { + logger.trace("lock channel " + this); + } lock.lock(); reconnectID.incrementAndGet(); @@ -532,6 +539,9 @@ public void lock() { @Override public void unlock() { + if (logger.isTraceEnabled()) { + logger.trace("unlock channel " + this); + } lock.lock(); failingOver = false; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index cc1d6852b1f..e0837e9ef8e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -363,7 +363,7 @@ public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffe doBufferReceived(packet); super.bufferReceived(connectionID, buffer); - } catch (Exception e) { + } catch (Throwable e) { ActiveMQClientLogger.LOGGER.errorDecodingPacket(e); throw new IllegalStateException(e); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java index b26084bb4c5..c01dd4fff77 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java @@ -38,8 +38,9 @@ public boolean isSynchronizationIsFinishedAcknowledgement() { return synchronizationIsFinishedAcknowledgement; } - public void setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) { + public ReplicationResponseMessageV2 setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) { this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement; + return this; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index f879aeb4347..a68c3f9002d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -210,7 +210,16 @@ public void handlePacket(final Packet packet) { ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet); response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e)); } - channel.send(response); + + if (response != null) { + if (logger.isTraceEnabled()) { + logger.trace("Returning " + response); + } + + channel.send(response); + } else { + logger.trace("Response is null, ignoring response"); + } } /** @@ -332,34 +341,68 @@ public void setChannel(final Channel channel) { private void finishSynchronization(String liveID) throws Exception { if (logger.isTraceEnabled()) { - logger.trace("finishSynchronization::" + liveID); + logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID); } for (JournalContent jc : EnumSet.allOf(JournalContent.class)) { Journal journal = journalsHolder.remove(jc); + if (logger.isTraceEnabled()) { + logger.trace("getting lock on " + jc + ", journal = " + journal); + } + registerJournal(jc.typeByte, journal); journal.synchronizationLock(); try { + if (logger.isTraceEnabled()) { + logger.trace("lock acquired on " + jc); + } // files should be already in place. filesReservedForSync.remove(jc); - registerJournal(jc.typeByte, journal); + if (logger.isTraceEnabled()) { + logger.trace("stopping journal for " + jc); + } journal.stop(); + if (logger.isTraceEnabled()) { + logger.trace("starting journal for " + jc); + } journal.start(); + if (logger.isTraceEnabled()) { + logger.trace("loadAndSync " + jc); + } journal.loadSyncOnly(JournalState.SYNCING_UP_TO_DATE); } finally { + if (logger.isTraceEnabled()) { + logger.trace("unlocking " + jc); + } journal.synchronizationUnlock(); } } + + if (logger.isTraceEnabled()) { + logger.trace("Sync on large messages..."); + } ByteBuffer buffer = ByteBuffer.allocate(4 * 1024); for (Entry entry : largeMessages.entrySet()) { ReplicatedLargeMessage lm = entry.getValue(); if (lm instanceof LargeServerMessageInSync) { LargeServerMessageInSync lmSync = (LargeServerMessageInSync) lm; + if (logger.isTraceEnabled()) { + logger.trace("lmSync on " + lmSync.toString()); + } lmSync.joinSyncedData(buffer); } } + if (logger.isTraceEnabled()) { + logger.trace("setRemoteBackupUpToDate and liveIDSet for " + liveID); + } + journalsHolder = null; backupQuorum.liveIDSet(liveID); activation.setRemoteBackupUpToDate(); + + if (logger.isTraceEnabled()) { + logger.trace("Backup is synchronized / BACKUP-SYNC-DONE"); + } + ActiveMQServerLogger.LOGGER.backupServerSynched(server); return; } @@ -428,13 +471,28 @@ private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final if (logger.isTraceEnabled()) { logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet); } - ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2(); - if (!started) - return replicationResponseMessage; if (packet.isSynchronizationFinished()) { - finishSynchronization(packet.getNodeID()); - replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true); + executor.execute(() -> { + try { + // this is a long running process, we cannot block the reading thread from netty + finishSynchronization(packet.getNodeID()); + if (logger.isTraceEnabled()) { + logger.trace("returning completion on synchronization catchup"); + } + channel.send(new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true)); + } catch (Exception e) { + logger.warn(e.getMessage()); + channel.send(new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e))); + } + + }); + // the write will happen through an executor + return null; + } + + ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2(); + if (!started) { return replicationResponseMessage; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 398f4527f56..3b6f9d6253e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -356,15 +356,16 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp } if (enabled) { - pendingTokens.add(repliToken); if (useExecutor) { replicationStream.execute(() -> { if (enabled) { + pendingTokens.add(repliToken); flowControl(packet.expectedEncodeSize()); replicatingChannel.send(packet); } }); } else { + pendingTokens.add(repliToken); flowControl(packet.expectedEncodeSize()); replicatingChannel.send(packet); } @@ -411,9 +412,9 @@ private void replicated() { OperationContext ctx = pendingTokens.poll(); if (ctx == null) { - throw new IllegalStateException("Missing replication token on the queue."); + logger.warn("Missing replication token on queue"); + return; } - ctx.replicationDone(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java index 7c333a5a639..83b49c90ff1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java @@ -22,10 +22,10 @@ import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage; -import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LiveNodeLocator; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.jboss.logging.Logger; /** * Stops the backup in case of an error at the start of Replication. @@ -36,11 +36,11 @@ */ final class ReplicationError implements Interceptor { - private final ActiveMQServer server; + private static final Logger logger = Logger.getLogger(ReplicationError.class); + private LiveNodeLocator nodeLocator; - ReplicationError(ActiveMQServer server, LiveNodeLocator nodeLocator) { - this.server = server; + ReplicationError(LiveNodeLocator nodeLocator) { this.nodeLocator = nodeLocator; } @@ -48,6 +48,10 @@ final class ReplicationError implements Interceptor { public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { if (packet.getType() != PacketImpl.BACKUP_REGISTRATION_FAILED) return true; + + if (logger.isTraceEnabled()) { + logger.trace("Received ReplicationError::" + packet); + } BackupReplicationStartFailedMessage message = (BackupReplicationStartFailedMessage) packet; switch (message.getRegistrationProblem()) { case ALREADY_REPLICATING: diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index d45abe3a804..fcba00c8d96 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -101,6 +101,8 @@ public void init() throws Exception { @Override public void run() { try { + + logger.trace("SharedNothingBackupActivation..start"); synchronized (activeMQServer) { activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED); } @@ -109,16 +111,24 @@ public void run() { activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize()); activeMQServer.getNodeManager().start(); synchronized (this) { - if (closed) + if (closed) { + logger.trace("SharedNothingBackupActivation is closed, ignoring activation!"); return; + } } boolean scalingDown = replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled(); - if (!activeMQServer.initialisePart1(scalingDown)) + if (!activeMQServer.initialisePart1(scalingDown)) { + if (logger.isTraceEnabled()) { + logger.trace("could not initialize part1 " + scalingDown); + } return; + } + logger.trace("Waiting for a synchronize now..."); synchronized (this) { + logger.trace("Entered a synchronized"); if (closed) return; backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize()); @@ -136,16 +146,12 @@ public void run() { ClusterController clusterController = activeMQServer.getClusterManager().getClusterController(); clusterController.addClusterTopologyListenerForReplication(nodeLocator); - if (logger.isTraceEnabled()) { - logger.trace("Waiting on cluster connection"); - } - //todo do we actually need to wait? + logger.trace("Waiting on cluster connection"); clusterController.awaitConnectionToReplicationCluster(); - if (logger.isTraceEnabled()) { - logger.trace("Cluster Connected"); - } - clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer, nodeLocator)); + logger.trace("Cluster Connected"); + + clusterController.addIncomingInterceptorForReplication(new ReplicationError(nodeLocator)); // nodeManager.startBackup(); if (logger.isTraceEnabled()) { @@ -320,13 +326,19 @@ public void run() { return; } ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer); + logger.trace("stop backup"); activeMQServer.getNodeManager().stopBackup(); + logger.trace("start store manager"); activeMQServer.getStorageManager().start(); + logger.trace("activated"); activeMQServer.getBackupManager().activated(); if (scalingDown) { + logger.trace("Scalling down..."); activeMQServer.initialisePart2(true); } else { + logger.trace("Setting up new activation"); activeMQServer.setActivation(new SharedNothingLiveActivation(activeMQServer, replicaPolicy.getReplicatedPolicy())); + logger.trace("initialize part 2"); activeMQServer.initialisePart2(false); if (activeMQServer.getIdentity() != null) { @@ -337,6 +349,8 @@ public void run() { } + logger.trace("completeActivation at the end"); + activeMQServer.completeActivation(); } } catch (Exception e) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 1b35393eabc..a95f77a2562 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -534,7 +534,11 @@ protected static final ClusterConnectionConfiguration basicClusterConnectionConf for (String c : connectors) { connectors0.add(c); } - ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().setName("cluster1").setAddress("jms").setConnectorName(connectorName).setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).setStaticConnectors(connectors0); + ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration(). + setName("cluster1").setAddress("jms").setConnectorName(connectorName). + setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1). + setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT). + setStaticConnectors(connectors0); return clusterConnectionConfiguration; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java index f192506463d..8889ec5f31e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java @@ -18,22 +18,23 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; +import org.junit.Ignore; import org.junit.Test; public class LargeMessageFailoverTest extends FailoverTest { @Override @Test + @Ignore public void testLiveAndBackupLiveComesBackNewFactory() throws Exception { // skip test because it triggers OutOfMemoryError. - Thread.sleep(1000); } @Override @Test + @Ignore public void testLiveAndBackupBackupComesBackNewFactory() throws Exception { // skip test because it triggers OutOfMemoryError. - Thread.sleep(1000); } /** diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java index 38bf424358a..383b371ca21 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java @@ -136,7 +136,7 @@ public int getBackupServerCount() { @Override public boolean isNetty() { - return false; + return true; } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java index 472d32795ec..abd08b82f99 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java @@ -86,7 +86,7 @@ private static TransportConfiguration transportConfiguration(String classname, b private static TransportConfiguration transportConfiguration(String classname, boolean live, int server) { if (classname.contains("netty")) { Map serverParams = new HashMap<>(); - Integer port = live ? 61616 : 5545; + Integer port = live ? 61616 + server : 5545 + server; serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port); return new TransportConfiguration(classname, serverParams); } @@ -102,7 +102,7 @@ private static TransportConfiguration transportConfiguration(String classname, String name) { if (classname.contains("netty")) { Map serverParams = new HashMap<>(); - Integer port = live ? 61616 : 5545; + Integer port = live ? 61616 + server : 5545 + server; serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port); return new TransportConfiguration(classname, serverParams, name); } From e4f47ca46fe40f9f1baca2afccfb4b348ca66b84 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Mon, 19 Dec 2016 16:02:17 +0100 Subject: [PATCH 3/3] ARTEMIS-1273 Bounded OrderedExecutor --- artemis-commons/pom.xml | 4 + .../artemis/utils/BoundedExecutor.java | 46 +++ .../utils/BoundedOrderedExecutorFactory.java | 269 ++++++++++++++++++ .../artemis/utils/OrderedExecutorFactory.java | 7 +- ...ndrolledExecutorFactoryThroughputTest.java | 153 ++++++++++ .../src/main/assembly/dep.xml | 1 + artemis-server-osgi/pom.xml | 1 + pom.xml | 8 + 8 files changed, 488 insertions(+), 1 deletion(-) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BoundedExecutor.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BoundedOrderedExecutorFactory.java create mode 100644 artemis-commons/src/test/java/org/apache/activemq/artemis/utils/HandrolledExecutorFactoryThroughputTest.java diff --git a/artemis-commons/pom.xml b/artemis-commons/pom.xml index 1c9d318be52..8fa41e2a7a9 100644 --- a/artemis-commons/pom.xml +++ b/artemis-commons/pom.xml @@ -52,6 +52,10 @@ io.netty netty-all + + org.jctools + jctools-core + commons-beanutils commons-beanutils diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BoundedExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BoundedExecutor.java new file mode 100644 index 00000000000..6d15afca1ad --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BoundedExecutor.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import java.util.concurrent.Executor; + +public interface BoundedExecutor extends Executor { + + int pendingTasks(); + + int capacity(); + + default int remainingCapacity() { + return capacity() - pendingTasks(); + } + + default boolean isEmpty() { + return pendingTasks() == 0; + } + + /** + * Executes the given command at some time in the future. The command + * may execute in a new thread, in a pooled thread, or in the calling + * thread, at the discretion of the {@code Executor} implementation. + * + * @param command the runnable task + * @return true if the command is submitted to the executor, false otherwise + * @throws NullPointerException if command is null + */ + boolean tryExecute(Runnable command); +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BoundedOrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BoundedOrderedExecutorFactory.java new file mode 100644 index 00000000000..0bf4d7384d0 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BoundedOrderedExecutorFactory.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils; + +import java.util.Queue; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.jboss.logging.Logger; +import org.jctools.queues.MpscArrayQueue; +import org.jctools.queues.MpscChunkedArrayQueue; +import org.jctools.queues.SpscArrayQueue; +import org.jctools.queues.SpscChunkedArrayQueue; +import org.jctools.util.Pow2; + +/** + * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance. + */ +public final class BoundedOrderedExecutorFactory implements ExecutorFactory { + + public enum ProducerType { + Single, Multi + } + + public static final int DEFAULT_INITIAL_CAPACITY = Integer.getInteger("bounded.executor.initial.capacity", 1024); + public static final int DEFAULT_MAX_CAPACITY = Integer.getInteger("bounded.executor.max.capacity", Pow2.MAX_POW2); + public static final int DEFAULT_MAX_BURST_SIZE = Integer.getInteger("bounded.executor.max.burst", Integer.MAX_VALUE); + private static final Logger logger = Logger.getLogger(BoundedOrderedExecutorFactory.class); + + private final ProducerType producerType; + private final Executor parent; + private final int initialCapacity; + private final int maxCapacity; + private final int maxBurstSize; + + public BoundedOrderedExecutorFactory(Executor parent) { + this(ProducerType.Multi, parent, DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY, DEFAULT_MAX_BURST_SIZE); + } + + /** + * Construct a new instance delegating to the given parent executor. + * + * @param parent the parent executor + */ + public BoundedOrderedExecutorFactory(ProducerType producerType, + Executor parent, + int initialCapacity, + int maxCapacity, + int maxBurstSize) { + this.producerType = producerType; + this.parent = parent; + this.initialCapacity = initialCapacity; + this.maxCapacity = maxCapacity; + this.maxBurstSize = maxBurstSize; + } + + private static int drainCommands(final Queue commands, int maxBurstSize) { + for (int i = 0; i < maxBurstSize; i++) { + final Runnable command = commands.poll(); + if (command == null) { + return i; + } + try { + command.run(); + } catch (ActiveMQInterruptedException e) { + // This could happen during shutdowns. Nothing to be concerned about here + logger.debug("Interrupted Thread", e); + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } + } + return maxBurstSize; + } + + /** + * Get an executor that always executes tasks in order. + * + * @return an ordered executor + */ + @Override + public BoundedExecutor getExecutor() { + if (this.initialCapacity == this.maxCapacity) { + return createFixedExecutor(this.producerType, this.maxCapacity, this.maxBurstSize, this.parent); + } else { + return createGrowableExecutor(this.producerType, this.initialCapacity, this.maxCapacity, this.maxBurstSize, this.parent); + } + } + + public static BoundedExecutor createFixedExecutor(ProducerType producerType, + int requiredCapacity, + int maxBurstSize, + Executor parent) { + final Queue queue; + final int capacity; + switch (producerType) { + case Multi: + final MpscArrayQueue mpscChunkedArrayQueue = new MpscArrayQueue<>(requiredCapacity); + queue = mpscChunkedArrayQueue; + capacity = mpscChunkedArrayQueue.capacity(); + break; + case Single: + final SpscArrayQueue spscChunkedArrayQueue = new SpscArrayQueue<>(requiredCapacity); + queue = spscChunkedArrayQueue; + capacity = spscChunkedArrayQueue.capacity(); + break; + default: + throw new AssertionError("producerType not supported"); + } + return new BoundedOrderedExecutor(queue, capacity, maxBurstSize, parent); + } + + public static BoundedExecutor createGrowableExecutor(ProducerType producerType, + int initialCapacity, + int maxCapacity, + int maxBurstSize, + Executor parent) { + final Queue queue; + final int capacity; + switch (producerType) { + case Multi: + final MpscChunkedArrayQueue mpscChunkedArrayQueue = new MpscChunkedArrayQueue<>(initialCapacity, maxCapacity); + queue = mpscChunkedArrayQueue; + capacity = mpscChunkedArrayQueue.capacity(); + break; + case Single: + final SpscChunkedArrayQueue spscChunkedArrayQueue = new SpscChunkedArrayQueue<>(initialCapacity, maxCapacity); + queue = spscChunkedArrayQueue; + //SpscChunkedArrayQueue doesn't support the MessagePassingQueue interface yet! + capacity = Pow2.roundToPowerOfTwo(maxCapacity); + break; + default: + throw new AssertionError("producerType not supported"); + } + return new BoundedOrderedExecutor(queue, capacity, maxBurstSize, parent); + } + + /** + * The padding is employed for false sharing protection: separate the cold fields (load only) from the hot one (the state, store and load) + */ + private abstract static class BoundedOrderedExecutorL0Pad { + + protected long p00, p01, p02, p03, p04, p05, p06; + protected long p10, p11, p12, p13, p14, p15, p16, p17; + } + + private abstract static class OrderedExecutorState extends BoundedOrderedExecutorL0Pad { + + protected static final int RELEASED = 0; + public static final AtomicIntegerFieldUpdater STATE_UPDATER; + + static { + STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(OrderedExecutorState.class, "state"); + } + + private volatile int state = 0; + + } + + private abstract static class BoundedOrderedExecutorL1Pad extends OrderedExecutorState { + + protected long p01, p02, p03, p04, p05, p06, p07; + protected long p10, p11, p12, p13, p14, p15, p16, p17; + } + + private static final class BoundedOrderedExecutor extends BoundedOrderedExecutorL1Pad implements BoundedExecutor { + + private final Queue commands; + private final Runnable executeCommandsTask; + private final Executor delegate; + private final int maxBurstSize; + private final int capacity; + + BoundedOrderedExecutor(Queue commands, int capacity, int maxBurstSize, Executor delegate) { + this.commands = commands; + this.executeCommandsTask = this::executeCommands; + this.delegate = delegate; + this.capacity = capacity; + this.maxBurstSize = maxBurstSize; + } + + private boolean tryAcquire() { + //much cheaper than CAS when highly contended + final long oldState = STATE_UPDATER.getAndIncrement(this); + final boolean isAcquired = oldState == RELEASED; + return isAcquired; + } + + private boolean isReleased() { + return STATE_UPDATER.get(this) == RELEASED; + } + + private void release() { + //StoreStore + LoadStore: much cheaper than a volatile store + STATE_UPDATER.lazySet(this, RELEASED); + } + + private void executeCommands() { + final Queue commands = this.commands; + final int maxBurstSize = this.maxBurstSize; + //let others consumers to try to acquire the lock and drain the tasks + while (!commands.isEmpty() && tryAcquire()) { + try { + drainCommands(commands, maxBurstSize); + } finally { + release(); + } + } + } + + @Override + public int capacity() { + return this.capacity; + } + + @Override + public int pendingTasks() { + return this.commands.size(); + } + + @Override + public boolean isEmpty() { + return this.commands.isEmpty(); + } + + @Override + public boolean tryExecute(Runnable command) { + //no optimisations on recursive offers + if (commands.offer(command)) { + if (isReleased() && !commands.isEmpty()) { + this.delegate.execute(executeCommandsTask); + } + return true; + } else { + return false; + } + } + + @Override + public void execute(Runnable command) { + //no optimisations on recursive offers + if (!commands.offer(command)) { + throw new RejectedExecutionException("can't submit the task: max capacity reached"); + } else if (isReleased() && !commands.isEmpty()) { + this.delegate.execute(executeCommandsTask); + } + } + + @Override + public String toString() { + return "BoundedOrderedExecutor{" + "delegate=" + delegate + ", maxBurstSize=" + maxBurstSize + ", capacity=" + capacity + ", pending=" + pendingTasks() + '}'; + } + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java index 24fa5e79bb1..85c57ff9cce 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java @@ -29,6 +29,7 @@ */ public final class OrderedExecutorFactory implements ExecutorFactory { + private static final boolean FORCE_UNBOUNDED_EXECUTOR = Boolean.getBoolean("force.unbounded.executor"); private static final Logger logger = Logger.getLogger(OrderedExecutorFactory.class); private final Executor parent; @@ -49,7 +50,11 @@ public OrderedExecutorFactory(final Executor parent) { */ @Override public Executor getExecutor() { - return new OrderedExecutor(parent); + if (FORCE_UNBOUNDED_EXECUTOR) { + return new OrderedExecutor(parent); + } else { + return BoundedOrderedExecutorFactory.createGrowableExecutor(BoundedOrderedExecutorFactory.ProducerType.Multi, BoundedOrderedExecutorFactory.DEFAULT_INITIAL_CAPACITY, BoundedOrderedExecutorFactory.DEFAULT_MAX_CAPACITY, BoundedOrderedExecutorFactory.DEFAULT_MAX_BURST_SIZE, this.parent); + } } /** diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/HandrolledExecutorFactoryThroughputTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/HandrolledExecutorFactoryThroughputTest.java new file mode 100644 index 00000000000..340f2f9cd68 --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/HandrolledExecutorFactoryThroughputTest.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; + +import org.jctools.util.Pow2; + +public class HandrolledExecutorFactoryThroughputTest { + + public static void main(String[] args) throws Exception { + final int producers = 1; + final int consumers = 1; + final BoundedOrderedExecutorFactory.ProducerType producerType = BoundedOrderedExecutorFactory.ProducerType.Multi; + final int initialCapacity = 1024; + final int maxCapacity = Pow2.MAX_POW2; + final int maxBurstSize = Integer.MAX_VALUE; + final ExecutorType executorType = ExecutorType.BOUNDED_ORDERED; + final int TESTS = 10; + final int OPERATIONS = 1_000_000; + final Executor executor = Executors.newFixedThreadPool(consumers); + final Executor producerExecutor; + switch (executorType) { + case ORDERED: + System.setProperty("force.unbounded.executor", Boolean.TRUE.toString()); + producerExecutor = new OrderedExecutorFactory(executor).getExecutor(); + break; + case BOUNDED_ORDERED: + producerExecutor = new BoundedOrderedExecutorFactory(producerType, executor, initialCapacity, maxCapacity, maxBurstSize).getExecutor(); + break; + default: + throw new AssertionError("not supported case!"); + } + final CountDownLatch[] producersStart = new CountDownLatch[TESTS]; + final long[][] elapsedProducingPerProducer = new long[producers][TESTS]; + final long[][] elapsedPerProducer = new long[producers][TESTS]; + final CountDownLatch[] producersStop = new CountDownLatch[TESTS]; + for (int i = 0; i < TESTS; i++) { + producersStart[i] = new CountDownLatch(producers); + producersStop[i] = new CountDownLatch(producers); + } + final Thread[] producerThreads = new Thread[producers]; + for (int i = 0; i < producers; i++) { + elapsedPerProducer[i] = new long[TESTS]; + elapsedProducingPerProducer[i] = new long[TESTS]; + final int producerIndex = i; + final Thread producerRunner = new Thread(() -> run(producerExecutor, new ProducerTask(), TESTS, OPERATIONS, elapsedProducingPerProducer[producerIndex], elapsedPerProducer[producerIndex], producersStart, producersStop)); + producerRunner.start(); + producerThreads[i] = producerRunner; + } + + for (int i = 0; i < producers; i++) { + producerThreads[i].join(); + } + if (executor instanceof AutoCloseable) { + ((AutoCloseable) executor).close(); + } else if (executor instanceof ExecutorService) { + ((ExecutorService) executor).shutdown(); + } + for (int t = 0; t < TESTS; t++) { + System.out.println("TEST\t" + (t + 1)); + System.out.println("\tproduction\t\tservice"); + for (int p = 0; p < producers; p++) { + System.out.println(p + ":\t" + (OPERATIONS * 1000_000_000L) / elapsedProducingPerProducer[p][t] + "\t\t" + (OPERATIONS * 1000_000_000L) / elapsedPerProducer[p][t] + "\tops/sec"); + } + } + } + + private static void run(Executor executor, + ProducerTask task, + int tests, + int operations, + long[] elapsedProductionPerRun, + long[] elapsedPerRun, + CountDownLatch[] startedPerRun, + CountDownLatch[] finishedPerRun) { + long count = 0; + + for (int t = 0; t < tests; t++) { + final CountDownLatch started = startedPerRun[t]; + started.countDown(); + try { + started.await(); + } catch (Exception e) { + //no_op + } + final long start = System.nanoTime(); + for (int i = 0; i < operations; i++) { + executeOp(executor, task); + count++; + } + final long elapsedProduction = System.nanoTime() - start; + final AtomicLong lastExecutedCommandId = task.lastExecutedCommandId; + while (lastExecutedCommandId.get() != count) { + LockSupport.parkNanos(1L); + } + final long elapsed = System.nanoTime() - start; + elapsedPerRun[t] = elapsed; + elapsedProductionPerRun[t] = elapsedProduction; + //wait the other producers + final CountDownLatch finishedRun = finishedPerRun[t]; + finishedRun.countDown(); + try { + finishedRun.await(); + } catch (Exception e) { + //no_op + } + } + } + + private static void executeOp(Executor executor, Runnable task) { + executor.execute(task); + } + + private enum ExecutorType { + ORDERED, BOUNDED_ORDERED + } + + private static final class ProducerTask implements Runnable { + + private final AtomicLong lastExecutedCommandId = new AtomicLong(0); + + @Override + public void run() { + //force heavyweight operations to simulate a real consume + final long beforeChange = lastExecutedCommandId.getAndIncrement(); + final long oldValue = lastExecutedCommandId.get(); + if (oldValue != beforeChange + 1) { + throw new IllegalStateException("can't happen"); + } + } + } +} diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml index 6b3430e7d1b..0e4c70f350a 100644 --- a/artemis-distribution/src/main/assembly/dep.xml +++ b/artemis-distribution/src/main/assembly/dep.xml @@ -80,6 +80,7 @@ org.jboss.logging:jboss-logging org.jboss.slf4j:slf4j-jboss-logmanager io.netty:netty-all + org.jctools:jctools-core org.apache.qpid:proton-j org.apache.activemq:activemq-client org.slf4j:slf4j-api diff --git a/artemis-server-osgi/pom.xml b/artemis-server-osgi/pom.xml index 21306cdb63d..225efd18741 100644 --- a/artemis-server-osgi/pom.xml +++ b/artemis-server-osgi/pom.xml @@ -129,6 +129,7 @@ *;scope=compile|runtime;groupId=org.apache.activemq + org.jctools.*;resolution:=optional, org.postgresql*;resolution:=optional, io.netty.buffer;io.netty.*;version="[4.1,5)", * diff --git a/pom.xml b/pom.xml index 840dda16974..71d9efe6f5c 100644 --- a/pom.xml +++ b/pom.xml @@ -85,6 +85,7 @@ 3.6.13.Final 2.4 4.1.9.Final + 2.0.2 0.19.0 3.0.19.Final 1.7.21 @@ -459,6 +460,13 @@ ${netty.version} + + + org.jctools + jctools-core + ${jctools.version} + + org.apache.qpid proton-j