From a5ffd4eb18e6e9eab30c176a7bb4008a51b3d59d Mon Sep 17 00:00:00 2001 From: Andrey Gura Date: Wed, 13 Sep 2017 15:36:26 +0300 Subject: [PATCH] ignite-6339 WAL write operations are optimized and file IO operations are non-interruptible from user thread now --- .../org/apache/ignite/DataStorageMetrics.java | 16 +- .../apache/ignite/IgniteSystemProperties.java | 6 + .../DataStorageConfiguration.java | 31 +- .../PersistentStoreConfiguration.java | 42 +- .../pagemem/wal/record/WALRecord.java | 19 +- .../persistence/DataStorageMetricsImpl.java | 28 +- .../DataStorageMetricsSnapshot.java | 9 + .../cache/persistence/file/AsyncFileIO.java | 43 +- .../cache/persistence/file/FileIO.java | 35 +- .../persistence/file/FileIODecorator.java | 30 +- .../persistence/file/RandomAccessFileIO.java | 30 +- .../wal/AbstractWalRecordsIterator.java | 7 +- .../cache/persistence/wal/FileWALPointer.java | 48 +- .../wal/FileWriteAheadLogManager.java | 1234 +++++++++-------- .../wal/SegmentedRingByteBuffer.java | 593 ++++++++ .../persistence/wal/record/HeaderRecord.java | 4 +- .../serializer/RecordDataV1Serializer.java | 107 +- .../serializer/RecordDataV2Serializer.java | 9 +- .../wal/serializer/RecordV1Serializer.java | 46 +- .../wal/serializer/RecordV2Serializer.java | 27 +- .../utils/PlatformConfigurationUtils.java | 105 +- .../ignite/internal/util/GridUnsafe.java | 9 + .../ignite/internal/util/IgniteUtils.java | 6 +- .../VisorPersistentStoreConfiguration.java | 2 +- .../mxbean/DataStorageMetricsMXBean.java | 4 + ...ActivateDeactivateTestWithPersistence.java | 4 + .../db/IgnitePdsTransactionsHangTest.java | 6 +- .../db/wal/IgnitePdsWalTlbTest.java | 126 -- .../db/wal/IgniteWalFlushFailoverTest.java | 40 +- ...lushMultiNodeFailoverAbstractSelfTest.java | 25 +- .../db/wal/reader/IgniteWalReaderTest.java | 36 +- .../db/wal/reader/MockWalIteratorFactory.java | 11 +- .../pagemem/PagesWriteThrottleSmokeTest.java | 7 + .../wal/SegmentedRingByteBufferTest.java | 744 ++++++++++ .../IgnitePdsOutOfMemoryTestSuite.java | 38 - .../ignite/testsuites/IgnitePdsTestSuite.java | 3 + 36 files changed, 2423 insertions(+), 1107 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBuffer.java delete mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsWalTlbTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java delete mode 100644 modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsOutOfMemoryTestSuite.java diff --git a/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java b/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java index e26bb1fe5b225..680caba62b8f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java @@ -26,7 +26,7 @@ public interface DataStorageMetrics { * Gets the average number of WAL records per second written during the last time interval. *

* The length of time interval is configured via {@link DataStorageConfiguration#setMetricsRateTimeInterval(long)} - * configurartion property. + * configuration property. * The number of subintervals is configured via {@link DataStorageConfiguration#setMetricsSubIntervalCount(int)} * configuration property. */ @@ -35,7 +35,7 @@ public interface DataStorageMetrics { /** * Gets the average number of bytes per second written during the last time interval. * The length of time interval is configured via {@link DataStorageConfiguration#setMetricsRateTimeInterval(long)} - * configurartion property. + * configuration property. * The number of subintervals is configured via {@link DataStorageConfiguration#setMetricsSubIntervalCount(int)} * configuration property. */ @@ -50,12 +50,22 @@ public interface DataStorageMetrics { * Gets the average WAL fsync duration in microseconds over the last time interval. *

* The length of time interval is configured via {@link DataStorageConfiguration#setMetricsRateTimeInterval(long)} - * configurartion property. + * configuration property. * The number of subintervals is configured via {@link DataStorageConfiguration#setMetricsSubIntervalCount(int)} * configuration property. */ public float getWalFsyncTimeAverage(); + /** + * Returns WAL buffer poll spins number over the last time interval. + *

+ * The length of time interval is configured via {@link DataStorageConfiguration#setMetricsRateTimeInterval(long)} + * configuration property. + * The number of subintervals is configured via {@link DataStorageConfiguration#setMetricsSubIntervalCount(int)} + * configuration property. + */ + public long getWalBuffPollSpinsRate(); + /** * Gets the duration of the last checkpoint in milliseconds. * diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 8e2298fc67d1f..97329055d3787 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -750,6 +750,12 @@ public final class IgniteSystemProperties { */ public static final String IGNITE_WAL_SERIALIZER_VERSION = "IGNITE_WAL_SERIALIZER_VERSION"; + /** + * Property that indicates should be mapped byte buffer used or not. + * Possible values: {@code true} and {@code false}. + */ + public static final String IGNITE_WAL_MMAP = "IGNITE_WAL_MMAP"; + /** * When set to {@code true}, Data store folders are generated only by consistent id, and no consistent ID will be * set based on existing data store folders. This option also enables compatible folder generation mode as it was diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java index 2c903984446db..381c3152287b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java @@ -124,6 +124,9 @@ public class DataStorageConfiguration implements Serializable { /** Default thread local buffer size. */ public static final int DFLT_TLB_SIZE = 128 * 1024; + /** Default thread local buffer size. */ + public static final int DFLT_WAL_BUFF_SIZE = DFLT_WAL_SEGMENT_SIZE / 4; + /** Default Wal flush frequency. */ public static final int DFLT_WAL_FLUSH_FREQ = 2000; @@ -205,6 +208,9 @@ public class DataStorageConfiguration implements Serializable { /** WAl thread local buffer size. */ private int walTlbSize = DFLT_TLB_SIZE; + /** WAl buffer size. */ + private int walBuffSize/* = DFLT_WAL_BUFF_SIZE*/; + /** Wal flush frequency in milliseconds. */ private long walFlushFreq = DFLT_WAL_FLUSH_FREQ; @@ -230,7 +236,7 @@ public class DataStorageConfiguration implements Serializable { * rate-based metrics when next sub-interval has to be recycled but introduces bigger * calculation overhead. */ - private int metricsSubIntervalCount = DFLT_SUB_INTERVALS; + private int metricsSubIntervalCnt = DFLT_SUB_INTERVALS; /** Time interval (in milliseconds) for rate-based metrics. */ private long metricsRateTimeInterval = DFLT_RATE_TIME_INTERVAL_MILLIS; @@ -648,7 +654,7 @@ public DataStorageConfiguration setMetricsRateTimeInterval(long metricsRateTimeI * @return The number of sub-intervals for history tracking. */ public int getMetricsSubIntervalCount() { - return metricsSubIntervalCount; + return metricsSubIntervalCnt; } /** @@ -657,7 +663,7 @@ public int getMetricsSubIntervalCount() { * @param metricsSubIntervalCnt The number of sub-intervals for history tracking. */ public DataStorageConfiguration setMetricsSubIntervalCount(int metricsSubIntervalCnt) { - this.metricsSubIntervalCount = metricsSubIntervalCnt; + this.metricsSubIntervalCnt = metricsSubIntervalCnt; return this; } @@ -706,6 +712,25 @@ public DataStorageConfiguration setWalThreadLocalBufferSize(int walTlbSize) { return this; } + /** + * Property defines size of WAL buffer. + * Each WAL record will be serialized to this buffer before write in WAL file. + * + * @return WAL buffer size. + */ + public int getWalBufferSize() { + return walBuffSize <= 0 ? getWalSegmentSize() / 4 : walBuffSize; + } + + /** + * @param walBuffSize WAL buffer size. + */ + public DataStorageConfiguration setWalBufferSize(int walBuffSize) { + this.walBuffSize = walBuffSize; + + return this; + } + /** * This property define how often WAL will be fsync-ed in {@code BACKGROUND} mode. Ignored for * all other WAL modes. diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java index c41721a5f9450..d59d19b67b2d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java @@ -65,9 +65,6 @@ public class PersistentStoreConfiguration implements Serializable { /** Default wal mode. */ public static final WALMode DFLT_WAL_MODE = WALMode.DEFAULT; - /** Default thread local buffer size. */ - public static final int DFLT_TLB_SIZE = 128 * 1024; - /** Default Wal flush frequency. */ public static final int DFLT_WAL_FLUSH_FREQ = 2000; @@ -128,8 +125,8 @@ public class PersistentStoreConfiguration implements Serializable { /** Wal mode. */ private WALMode walMode = DFLT_WAL_MODE; - /** WAl thread local buffer size. */ - private int tlbSize = DFLT_TLB_SIZE; + /** WAl buffer size. */ + private int walBuffSize/* = DFLT_WAL_BUFF_SIZE*/; /** Wal flush frequency in milliseconds. */ private long walFlushFreq = DFLT_WAL_FLUSH_FREQ; @@ -492,20 +489,43 @@ public PersistentStoreConfiguration setWalMode(WALMode walMode) { } /** - * Property define size thread local buffer. - * Each thread which write to wal have thread local buffer for serialize recode before write in wal. + * Property defines size of WAL buffer. + * Each WAL record will be serialized to this buffer before write in WAL file. * - * @return Thread local buffer size. + * @return WAL buffer size. + * @deprecated Instead {@link #getWalBufferSize()} should be used. */ + @Deprecated public int getTlbSize() { - return tlbSize <= 0 ? DFLT_TLB_SIZE : tlbSize; + return getWalBufferSize(); } /** - * @param tlbSize Tlb size. + * @param tlbSize WAL buffer size. + * @deprecated Instead {@link #setWalBufferSize(int walBuffSize)} should be used. */ + @Deprecated public PersistentStoreConfiguration setTlbSize(int tlbSize) { - this.tlbSize = tlbSize; + return setWalBufferSize(tlbSize); + } + + /** + * Property defines size of WAL buffer. + * Each WAL record will be serialized to this buffer before write in WAL file. + * + * @return WAL buffer size. + */ + @Deprecated + public int getWalBufferSize() { + return walBuffSize <= 0 ? getWalSegmentSize() / 4 : walBuffSize; + } + + /** + * @param walBuffSize WAL buffer size. + */ + @Deprecated + public PersistentStoreConfiguration setWalBufferSize(int walBuffSize) { + this.walBuffSize = walBuffSize; return this; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index 08bba1b174fb2..744691695314e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -43,7 +43,7 @@ public enum RecordType { /** Checkpoint (begin) record */ CHECKPOINT_RECORD, - /** */ + /** WAL segment header record. */ HEADER_RECORD, // Delta records. @@ -187,9 +187,6 @@ public static RecordType fromOrdinal(int ord) { /** */ private int size; - /** */ - private int chainSize; - /** */ @GridToStringExclude private WALRecord prev; @@ -197,20 +194,6 @@ public static RecordType fromOrdinal(int ord) { /** */ private WALPointer pos; - /** - * @param chainSize Chain size in bytes. - */ - public void chainSize(int chainSize) { - this.chainSize = chainSize; - } - - /** - * @return Get chain size in bytes. - */ - public int chainSize() { - return chainSize; - } - /** * @return Previous record in chain. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java index e871597c593ec..4f32c25321e6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java @@ -34,7 +34,10 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean { private volatile HitRateMetrics walFsyncTimeDuration; /** */ - private volatile HitRateMetrics walFsyncTimeNumber; + private volatile HitRateMetrics walFsyncTimeNum; + + /** */ + private volatile HitRateMetrics walBuffPollSpinsNum; /** */ private volatile long lastCpLockWaitDuration; @@ -118,7 +121,7 @@ public DataStorageMetricsImpl( if (!metricsEnabled) return 0; - long numRate = walFsyncTimeNumber.getRate(); + long numRate = walFsyncTimeNum.getRate(); if (numRate == 0) return 0; @@ -126,6 +129,15 @@ public DataStorageMetricsImpl( return (float)walFsyncTimeDuration.getRate() / numRate; } + /** {@inheritDoc} */ + @Override public long getWalBuffPollSpinsRate() { + if (!metricsEnabled) + return 0; + + return walBuffPollSpinsNum.getRate(); + } + + /** {@inheritDoc} */ @Override public long getLastCheckpointDuration() { if (!metricsEnabled) @@ -281,7 +293,14 @@ public void onFsync(long nanoTime) { long microseconds = nanoTime / 1_000; walFsyncTimeDuration.onHits(microseconds); - walFsyncTimeNumber.onHit(); + walFsyncTimeNum.onHit(); + } + + /** + * @param num Number. + */ + public void onBuffPollSpin(int num) { + walBuffPollSpinsNum.onHits(num); } /** @@ -290,8 +309,9 @@ public void onFsync(long nanoTime) { private void resetRates() { walLoggingRate = new HitRateMetrics((int)rateTimeInterval, subInts); walWritingRate = new HitRateMetrics((int)rateTimeInterval, subInts); + walBuffPollSpinsNum = new HitRateMetrics((int)rateTimeInterval, subInts); walFsyncTimeDuration = new HitRateMetrics((int)rateTimeInterval, subInts); - walFsyncTimeNumber = new HitRateMetrics((int)rateTimeInterval, subInts); + walFsyncTimeNum = new HitRateMetrics((int)rateTimeInterval, subInts); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java index 5bbb0e1086c4f..c67eeeb1f2bb7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java @@ -35,6 +35,9 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics { /** */ private float walFsyncTimeAvg; + /** */ + private long walBuffPollSpinsNum; + /** */ private long lastCpDuration; @@ -67,6 +70,7 @@ public DataStorageMetricsSnapshot(DataStorageMetrics metrics) { walWritingRate = metrics.getWalWritingRate(); walArchiveSegments = metrics.getWalArchiveSegments(); walFsyncTimeAvg = metrics.getWalFsyncTimeAverage(); + walBuffPollSpinsNum = metrics.getWalBuffPollSpinsRate(); lastCpDuration = metrics.getLastCheckpointDuration(); lastCpLockWaitDuration = metrics.getLastCheckpointLockWaitDuration(); lastCpMmarkDuration = metrics.getLastCheckpointMarkDuration(); @@ -97,6 +101,11 @@ public DataStorageMetricsSnapshot(DataStorageMetrics metrics) { return walFsyncTimeAvg; } + /** {@inheritDoc} */ + @Override public long getWalBuffPollSpinsRate() { + return walBuffPollSpinsNum; + } + /** {@inheritDoc} */ @Override public long getLastCheckpointDuration() { return lastCpDuration; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java index 8fad7a5109d01..b1db79d706a2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.OpenOption; @@ -69,11 +70,11 @@ public AsyncFileIO(File file, ThreadLocal holder, OpenOption... } /** {@inheritDoc} */ - @Override public int read(ByteBuffer destinationBuffer) throws IOException { + @Override public int read(ByteBuffer destBuf) throws IOException { ChannelOpFuture fut = holder.get(); fut.reset(); - ch.read(destinationBuffer, position, this, fut); + ch.read(destBuf, position, this, fut); try { return fut.getUninterruptibly(); @@ -84,11 +85,11 @@ public AsyncFileIO(File file, ThreadLocal holder, OpenOption... } /** {@inheritDoc} */ - @Override public int read(ByteBuffer destinationBuffer, long position) throws IOException { + @Override public int read(ByteBuffer destBuf, long position) throws IOException { ChannelOpFuture fut = holder.get(); fut.reset(); - ch.read(destinationBuffer, position, null, fut); + ch.read(destBuf, position, null, fut); try { return fut.getUninterruptibly(); @@ -102,11 +103,12 @@ public AsyncFileIO(File file, ThreadLocal holder, OpenOption... } /** {@inheritDoc} */ - @Override public int read(byte[] buffer, int offset, int length) throws IOException { + @Override public int read(byte[] buf, int off, int + length) throws IOException { ChannelOpFuture fut = holder.get(); fut.reset(); - ch.read(ByteBuffer.wrap(buffer, offset, length), position, this, fut); + ch.read(ByteBuffer.wrap(buf, off, length), position, this, fut); try { return fut.getUninterruptibly(); @@ -117,11 +119,11 @@ public AsyncFileIO(File file, ThreadLocal holder, OpenOption... } /** {@inheritDoc} */ - @Override public int write(ByteBuffer sourceBuffer) throws IOException { + @Override public int write(ByteBuffer srcBuf) throws IOException { ChannelOpFuture fut = holder.get(); fut.reset(); - ch.write(sourceBuffer, position, this, fut); + ch.write(srcBuf, position, this, fut); try { return fut.getUninterruptibly(); @@ -132,13 +134,13 @@ public AsyncFileIO(File file, ThreadLocal holder, OpenOption... } /** {@inheritDoc} */ - @Override public int write(ByteBuffer sourceBuffer, long position) throws IOException { + @Override public int write(ByteBuffer srcBuf, long position) throws IOException { ChannelOpFuture fut = holder.get(); fut.reset(); asyncFuts.add(fut); - ch.write(sourceBuffer, position, null, fut); + ch.write(srcBuf, position, null, fut); try { return fut.getUninterruptibly(); @@ -152,11 +154,11 @@ public AsyncFileIO(File file, ThreadLocal holder, OpenOption... } /** {@inheritDoc} */ - @Override public void write(byte[] buffer, int offset, int length) throws IOException { + @Override public void write(byte[] buf, int off, int len) throws IOException { ChannelOpFuture fut = holder.get(); fut.reset(); - ch.write(ByteBuffer.wrap(buffer, offset, length), position, this, fut); + ch.write(ByteBuffer.wrap(buf, off, len), position, this, fut); try { fut.getUninterruptibly(); @@ -166,6 +168,11 @@ public AsyncFileIO(File file, ThreadLocal holder, OpenOption... } } + /** {@inheritDoc} */ + @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { + throw new UnsupportedOperationException("AsynchronousFileChannel doesn't support mmap."); + } + /** {@inheritDoc} */ @Override public void force() throws IOException { ch.force(false); @@ -200,18 +207,18 @@ public AsyncFileIO(File file, ThreadLocal holder, OpenOption... /** */ static class ChannelOpFuture extends GridFutureAdapter implements CompletionHandler { /** {@inheritDoc} */ - @Override public void completed(Integer result, AsyncFileIO attachment) { - if (attachment != null) { - if (result != -1) - attachment.position += result; + @Override public void completed(Integer res, AsyncFileIO attach) { + if (attach != null) { + if (res != -1) + attach.position += res; } // Release waiter and allow next operation to begin. - super.onDone(result, null); + super.onDone(res, null); } /** {@inheritDoc} */ - @Override public void failed(Throwable exc, AsyncFileIO attachment) { + @Override public void failed(Throwable exc, AsyncFileIO attach) { super.onDone(exc); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java index 1e81150658596..849f03a51453a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; /** * Interface to perform file I/O operations. @@ -49,76 +50,78 @@ public interface FileIO extends AutoCloseable { /** * Reads a sequence of bytes from this file into the {@code destinationBuffer}. * - * @param destinationBuffer Destination byte buffer. + * @param destBuf Destination byte buffer. * * @return Number of read bytes. * * @throws IOException If some I/O error occurs. */ - public int read(ByteBuffer destinationBuffer) throws IOException; + public int read(ByteBuffer destBuf) throws IOException; /** * Reads a sequence of bytes from this file into the {@code destinationBuffer} * starting from specified file {@code position}. * - * @param destinationBuffer Destination byte buffer. + * @param destBuf Destination byte buffer. * @param position Starting position of file. * * @return Number of read bytes. * * @throws IOException If some I/O error occurs. */ - public int read(ByteBuffer destinationBuffer, long position) throws IOException; + public int read(ByteBuffer destBuf, long position) throws IOException; /** * Reads a up to {@code length} bytes from this file into the {@code buffer}. * - * @param buffer Destination byte array. - * @param offset The start offset in array {@code b} + * @param buf Destination byte array. + * @param off The start offset in array {@code b} * at which the data is written. - * @param length Maximum number of bytes read. + * @param len Maximum number of bytes read. * * @return Number of read bytes. * * @throws IOException If some I/O error occurs. */ - public int read(byte[] buffer, int offset, int length) throws IOException; + public int read(byte[] buf, int off, int len) throws IOException; /** * Writes a sequence of bytes to this file from the {@code sourceBuffer}. * - * @param sourceBuffer Source buffer. + * @param srcBuf Source buffer. * * @return Number of written bytes. * * @throws IOException If some I/O error occurs. */ - public int write(ByteBuffer sourceBuffer) throws IOException; + public int write(ByteBuffer srcBuf) throws IOException; /** * Writes a sequence of bytes to this file from the {@code sourceBuffer} * starting from specified file {@code position} * - * @param sourceBuffer Source buffer. + * @param srcBuf Source buffer. * @param position Starting file position. * * @return Number of written bytes. * * @throws IOException If some I/O error occurs. */ - public int write(ByteBuffer sourceBuffer, long position) throws IOException; + public int write(ByteBuffer srcBuf, long position) throws IOException; /** * Writes {@code length} bytes from the {@code buffer} * starting at offset {@code off} to this file. * - * @param buffer Source byte array. - * @param offset Start offset in the {@code buffer}. - * @param length Number of bytes to write. + * @param buf Source byte array. + * @param off Start offset in the {@code buffer}. + * @param len Number of bytes to write. * * @throws IOException If some I/O error occurs. */ - public void write(byte[] buffer, int offset, int length) throws IOException; + public void write(byte[] buf, int off, int len) throws IOException; + + public MappedByteBuffer map(int maxWalSegmentSize) throws IOException; /** * Forces any updates of this file to be written to the storage diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java index 53115908a714e..dd563f2d68197 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; /** * Decorator class for File I/O @@ -46,33 +47,38 @@ public FileIODecorator(FileIO delegate) { } /** {@inheritDoc} */ - @Override public int read(ByteBuffer destinationBuffer) throws IOException { - return delegate.read(destinationBuffer); + @Override public int read(ByteBuffer destBuf) throws IOException { + return delegate.read(destBuf); } /** {@inheritDoc} */ - @Override public int read(ByteBuffer destinationBuffer, long position) throws IOException { - return delegate.read(destinationBuffer, position); + @Override public int read(ByteBuffer destBuf, long position) throws IOException { + return delegate.read(destBuf, position); } /** {@inheritDoc} */ - @Override public int read(byte[] buffer, int offset, int length) throws IOException { - return delegate.read(buffer, offset, length); + @Override public int read(byte[] buf, int off, int len) throws IOException { + return delegate.read(buf, off, len); } /** {@inheritDoc} */ - @Override public int write(ByteBuffer sourceBuffer) throws IOException { - return delegate.write(sourceBuffer); + @Override public int write(ByteBuffer srcBuf) throws IOException { + return delegate.write(srcBuf); } /** {@inheritDoc} */ - @Override public int write(ByteBuffer sourceBuffer, long position) throws IOException { - return delegate.write(sourceBuffer, position); + @Override public int write(ByteBuffer srcBuf, long position) throws IOException { + return delegate.write(srcBuf, position); } /** {@inheritDoc} */ - @Override public void write(byte[] buffer, int offset, int length) throws IOException { - delegate.write(buffer, offset, length); + @Override public void write(byte[] buf, int off, int len) throws IOException { + delegate.write(buf, off, len); + } + + /** {@inheritDoc} */ + @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { + return delegate.map(maxWalSegmentSize); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java index 55849fe0cf0b5..23d6ebfeead76 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.OpenOption; @@ -53,33 +54,33 @@ public RandomAccessFileIO(File file, OpenOption... modes) throws IOException { } /** {@inheritDoc} */ - @Override public int read(ByteBuffer destinationBuffer) throws IOException { - return ch.read(destinationBuffer); + @Override public int read(ByteBuffer destBuf) throws IOException { + return ch.read(destBuf); } /** {@inheritDoc} */ - @Override public int read(ByteBuffer destinationBuffer, long position) throws IOException { - return ch.read(destinationBuffer, position); + @Override public int read(ByteBuffer destBuf, long position) throws IOException { + return ch.read(destBuf, position); } /** {@inheritDoc} */ - @Override public int read(byte[] buffer, int offset, int length) throws IOException { - return ch.read(ByteBuffer.wrap(buffer, offset, length)); + @Override public int read(byte[] buf, int off, int len) throws IOException { + return ch.read(ByteBuffer.wrap(buf, off, len)); } /** {@inheritDoc} */ - @Override public int write(ByteBuffer sourceBuffer) throws IOException { - return ch.write(sourceBuffer); + @Override public int write(ByteBuffer srcBuf) throws IOException { + return ch.write(srcBuf); } /** {@inheritDoc} */ - @Override public int write(ByteBuffer sourceBuffer, long position) throws IOException { - return ch.write(sourceBuffer, position); + @Override public int write(ByteBuffer srcBuf, long position) throws IOException { + return ch.write(srcBuf, position); } /** {@inheritDoc} */ - @Override public void write(byte[] buffer, int offset, int length) throws IOException { - ch.write(ByteBuffer.wrap(buffer, offset, length)); + @Override public void write(byte[] buf, int off, int len) throws IOException { + ch.write(ByteBuffer.wrap(buf, off, len)); } /** {@inheritDoc} */ @@ -101,4 +102,9 @@ public RandomAccessFileIO(File file, OpenOption... modes) throws IOException { @Override public void close() throws IOException { ch.close(); } + + /** {@inheritDoc} */ + @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { + return ch.map(FileChannel.MapMode.READ_WRITE, 0, maxWalSegmentSize); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java index 7415db321ac7a..c2c07f6b3a6a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -219,13 +219,10 @@ private IgniteBiTuple advanceRecord( if (hnd == null) return null; - final FileWALPointer ptr = new FileWALPointer( - hnd.idx, - (int)hnd.in.position(), - 0); + FileWALPointer ptr = new FileWALPointer(hnd.idx, (int)hnd.in.position(),0); try { - final WALRecord rec = hnd.ser.readRecord(hnd.in, ptr); + WALRecord rec = hnd.ser.readRecord(hnd.in, ptr); ptr.length(rec.size()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java index 4998700024a63..0e095fa24f17f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java @@ -19,6 +19,7 @@ import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.NotNull; /** * File WAL pointer. @@ -31,34 +32,20 @@ public class FileWALPointer implements WALPointer, Comparable { private final long idx; /** */ - private final int fileOffset; + private final int fileOff; /** Written record length */ private int len; - /** Force flush flag. Used in BACKGROUND WAL mode. */ - private boolean forceFlush; - - /** - * @param idx Absolute WAL segment file index (incremental counter) - * @param fileOffset Offset in file, from the beginning. - * @param len Record length. - */ - public FileWALPointer(long idx, int fileOffset, int len) { - this(idx, fileOffset, len, false); - } - /** - * @param idx Absolute WAL segment file index . - * @param fileOffset Offset in file, from the beginning. + * @param idx Absolute WAL segment file index (incremental counter). + * @param fileOff Offset in file, from the beginning. * @param len Record length. - * @param forceFlush Force flush flag. */ - public FileWALPointer(long idx, int fileOffset, int len, boolean forceFlush) { + public FileWALPointer(long idx, int fileOff, int len) { this.idx = idx; - this.fileOffset = fileOffset; + this.fileOff = fileOff; this.len = len; - this.forceFlush = forceFlush; } /** @@ -72,7 +59,7 @@ public long index() { * @return File offset. */ public int fileOffset() { - return fileOffset; + return fileOff; } /** @@ -96,14 +83,7 @@ public void length(int len) { "(this pointer is a terminal): " + this); // Return a terminal pointer. - return new FileWALPointer(idx, fileOffset + len, 0); - } - - /** - * @return Force flush flag. - */ - public boolean forceFlush() { - return forceFlush; + return new FileWALPointer(idx, fileOff + len, 0); } /** {@inheritDoc} */ @@ -116,23 +96,23 @@ public boolean forceFlush() { FileWALPointer that = (FileWALPointer)o; - return idx == that.idx && fileOffset == that.fileOffset; + return idx == that.idx && fileOff == that.fileOff; } /** {@inheritDoc} */ @Override public int hashCode() { - int result = (int)(idx ^ (idx >>> 32)); + int res = (int)(idx ^ (idx >>> 32)); - result = 31 * result + fileOffset; + res = 31 * res + fileOff; - return result; + return res; } /** {@inheritDoc} */ - @Override public int compareTo(FileWALPointer o) { + @Override public int compareTo(@NotNull FileWALPointer o) { int res = Long.compare(idx, o.idx); - return res == 0 ? Integer.compare(fileOffset, o.fileOffset) : res; + return res == 0 ? Integer.compare(fileOff, o.fileOff) : res; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 5cb6b2f7f7b14..4a7a20b8ac61d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -26,12 +26,17 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.MappedByteBuffer; import java.nio.file.Files; import java.sql.Time; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; @@ -40,10 +45,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import java.util.zip.ZipEntry; @@ -90,7 +95,6 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; @@ -98,21 +102,48 @@ import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.READ; import static java.nio.file.StandardOpenOption.WRITE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP; +import static org.apache.ignite.configuration.WALMode.LOG_ONLY; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD; +import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT; +import static org.apache.ignite.internal.util.IgniteUtils.findField; +import static org.apache.ignite.internal.util.IgniteUtils.findNonPublicMethod; /** * File WAL manager. */ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter implements IgniteWriteAheadLogManager { - /** */ - public static final FileDescriptor[] EMPTY_DESCRIPTORS = new FileDescriptor[0]; + /** {@link MappedByteBuffer#force0(java.io.FileDescriptor, long, long)}. */ + private static final Method force0 = findNonPublicMethod( + MappedByteBuffer.class, "force0", + java.io.FileDescriptor.class, long.class, long.class + ); + + /** {@link MappedByteBuffer#mappingOffset()}. */ + private static final Method mappingOffset = findNonPublicMethod(MappedByteBuffer.class, "mappingOffset"); + + /** {@link MappedByteBuffer#mappingAddress(long)}. */ + private static final Method mappingAddress = findNonPublicMethod( + MappedByteBuffer.class, "mappingAddress", long.class + ); + + /** {@link MappedByteBuffer#fd} */ + private static final Field fd = findField(MappedByteBuffer.class, "fd"); + + /** Page size. */ + private static final int PAGE_SIZE = GridUnsafe.pageSize(); /** */ - public static final String WAL_SEGMENT_FILE_EXT = ".wal"; + private static final FileDescriptor[] EMPTY_DESCRIPTORS = new FileDescriptor[0]; + + /** WAL segment file extension. */ + private static final String WAL_SEGMENT_FILE_EXT = ".wal"; /** */ private static final byte[] FILL_BUF = new byte[1024 * 1024]; @@ -168,6 +199,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Latest serializer version to use. */ private static final int LATEST_SERIALIZER_VERSION = 2; + /** Buffer size. */ + private static final int BUF_SIZE = 1024 * 1024; + + /** Use mapped byte buffer. */ + private static boolean mmap = IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, true); + /** */ private final boolean alwaysWriteFullPages; @@ -177,9 +214,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private final WALMode mode; - /** Thread local byte buffer size, see {@link #tlb} */ - private final int tlbSize; - /** WAL flush frequency. Makes sense only for {@link WALMode#BACKGROUND} log WALMode. */ private final long flushFreq; @@ -208,7 +242,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private RecordSerializer serializer; /** Serializer latest version to use. */ - private final int serializerVersion = + private final int serializerVer = IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION); /** Latest segment cleared by {@link #truncate(WALPointer)}. */ @@ -217,24 +251,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Factory to provide I/O interfaces for read/write operations with files */ private final FileIOFactory ioFactory; - /** Updater for {@link #currentHnd}, used for verify there are no concurrent update for current log segment handle */ - private static final AtomicReferenceFieldUpdater currentHndUpd = - AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currentHnd"); - - /** - * Thread local byte buffer for saving serialized WAL records chain, see {@link FileWriteHandle#head}. - * Introduced to decrease number of buffers allocation. - * Used only for record itself is shorter than {@link #tlbSize}. - */ - private final ThreadLocal tlb = new ThreadLocal() { - @Override protected ByteBuffer initialValue() { - ByteBuffer buf = ByteBuffer.allocateDirect(tlbSize); - - buf.order(GridUnsafe.NATIVE_BYTE_ORDER); - - return buf; - } - }; + /** Updater for {@link #currHnd}, used for verify there are no concurrent update for current log segment handle */ + private static final AtomicReferenceFieldUpdater CURR_HND_UPD = + AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currHnd"); /** */ private volatile FileArchiver archiver; @@ -249,7 +268,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private final ThreadLocal lastWALPtr = new ThreadLocal<>(); /** Current log segment handle */ - private volatile FileWriteHandle currentHnd; + private volatile FileWriteHandle currHnd; /** Environment failure. */ private volatile Throwable envFailed; @@ -261,25 +280,26 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private final long walAutoArchiveAfterInactivity; /** - * Container with last WAL record logged timestamp.
- * Zero value means there was no records logged to current segment, skip possible archiving for this case
- * Value is filled only for case {@link #walAutoArchiveAfterInactivity} > 0
+ * Container with last WAL record logged timestamp.
Zero value means there was no records logged to current + * segment, skip possible archiving for this case
Value is filled only for case {@link + * #walAutoArchiveAfterInactivity} > 0
*/ private AtomicLong lastRecordLoggedMs = new AtomicLong(); /** - * Cancellable task for {@link WALMode#BACKGROUND}, should be cancelled at shutdown - * Null for non background modes + * Cancellable task for {@link WALMode#BACKGROUND}, should be cancelled at shutdown Null for non background modes */ @Nullable private volatile GridTimeoutProcessor.CancelableTask backgroundFlushSchedule; /** - * Reference to the last added next archive timeout check object. - * Null if mode is not enabled. - * Should be cancelled at shutdown + * Reference to the last added next archive timeout check object. Null if mode is not enabled. Should be cancelled + * at shutdown */ @Nullable private volatile GridTimeoutObject nextAutoArchiveTimeoutObj; + /** WAL writer worker. */ + private WALWriter walWriter; + /** * @param ctx Kernal context. */ @@ -294,7 +314,6 @@ public FileWriteAheadLogManager(@NotNull final GridKernalContext ctx) { maxWalSegmentSize = dsCfg.getWalSegmentSize(); mode = dsCfg.getWalMode(); - tlbSize = dsCfg.getWalThreadLocalBufferSize(); flushFreq = dsCfg.getWalFlushFrequency(); fsyncDelay = dsCfg.getWalFsyncDelayNanos(); alwaysWriteFullPages = dsCfg.isAlwaysWriteFullPages(); @@ -324,7 +343,7 @@ public FileWriteAheadLogManager(@NotNull final GridKernalContext ctx) { "write ahead log archive directory" ); - serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVersion); + serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVer); GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database(); @@ -384,12 +403,15 @@ private void checkWalConfiguration() throws IgniteCheckedException { try { if (mode == WALMode.BACKGROUND) { if (currHnd != null) - currHnd.flush((FileWALPointer)null, true); + currHnd.flush(null); } if (currHnd != null) currHnd.close(false); + if (walWriter != null) + walWriter.shutdown(); + if (archiver != null) archiver.shutdown(); @@ -400,7 +422,7 @@ private void checkWalConfiguration() throws IgniteCheckedException { decompressor.shutdown(); } catch (Exception e) { - U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.fileIO, e); + U.error(log, "Failed to gracefully close WAL segment: " + this.currHnd.fileIO, e); } } @@ -432,7 +454,7 @@ private void checkWalConfiguration() throws IgniteCheckedException { stop0(true); - currentHnd = null; + currHnd = null; } /** {@inheritDoc} */ @@ -448,22 +470,33 @@ private void checkWalConfiguration() throws IgniteCheckedException { /** {@inheritDoc} */ @Override public void resumeLogging(WALPointer lastPtr) throws IgniteCheckedException { try { - assert currentHnd == null; + assert currHnd == null; assert lastPtr == null || lastPtr instanceof FileWALPointer; FileWALPointer filePtr = (FileWALPointer)lastPtr; - currentHnd = restoreWriteHandle(filePtr); + walWriter = new WALWriter(); + + if (!mmap) + walWriter.start(); + + currHnd = restoreWriteHandle(filePtr); + + // For new handle write serializer version to it. + if (filePtr == null) + currHnd.writeHeader(); - if (currentHnd.serializer.version() != serializer.version()) { + if (currHnd.serializer.version() != serializer.version()) { if (log.isInfoEnabled()) log.info("Record serializer version change detected, will start logging with a new WAL record " + - "serializer to a new WAL segment [curFile=" + currentHnd + ", newVer=" + serializer.version() + - ", oldVer=" + currentHnd.serializer.version() + ']'); + "serializer to a new WAL segment [curFile=" + currHnd + ", newVer=" + serializer.version() + + ", oldVer=" + currHnd.serializer.version() + ']'); - rollOver(currentHnd); + rollOver(currHnd); } + currHnd.resume = false; + if (mode == WALMode.BACKGROUND) { backgroundFlushSchedule = cctx.time().schedule(new Runnable() { @Override public void run() { @@ -481,8 +514,8 @@ private void checkWalConfiguration() throws IgniteCheckedException { } /** - * Schedules next check of inactivity period expired. Based on current record update timestamp. - * At timeout method does check of inactivity period and schedules new launch. + * Schedules next check of inactivity period expired. Based on current record update timestamp. At timeout method + * does check of inactivity period and schedules new launch. */ private void scheduleNextInactivityPeriodElapsedCheck() { final long lastRecMs = lastRecordLoggedMs.get(); @@ -518,13 +551,12 @@ private void scheduleNextInactivityPeriodElapsedCheck() { * @return Latest serializer version. */ public int serializerVersion() { - return serializerVersion; + return serializerVer; } /** - * Checks if there was elapsed significant period of inactivity. - * If WAL auto-archive is enabled using {@link #walAutoArchiveAfterInactivity} > 0 this method will activate - * roll over by timeout
+ * Checks if there was elapsed significant period of inactivity. If WAL auto-archive is enabled using + * {@link #walAutoArchiveAfterInactivity} > 0 this method will activate roll over by timeout.
*/ private void checkWalRolloverRequiredDuringInactivityPeriod() { if (walAutoArchiveAfterInactivity <= 0) @@ -546,6 +578,8 @@ private void checkWalRolloverRequiredDuringInactivityPeriod() { final FileWriteHandle handle = currentHandle(); try { + handle.buf.close(); + rollOver(handle); } catch (IgniteCheckedException e) { @@ -556,7 +590,7 @@ private void checkWalRolloverRequiredDuringInactivityPeriod() { /** {@inheritDoc} */ @SuppressWarnings("TooBroadScope") - @Override public WALPointer log(WALRecord record) throws IgniteCheckedException, StorageException { + @Override public WALPointer log(WALRecord rec) throws IgniteCheckedException, StorageException { if (serializer == null || mode == WALMode.NONE) return null; @@ -567,10 +601,10 @@ private void checkWalRolloverRequiredDuringInactivityPeriod() { return null; // Need to calculate record size first. - record.size(serializer.size(record)); + rec.size(serializer.size(rec)); for (; ; currWrHandle = rollOver(currWrHandle)) { - WALPointer ptr = currWrHandle.addRecord(record); + WALPointer ptr = currWrHandle.addRecord(rec); if (ptr != null) { metrics.onWalRecordLogged(); @@ -603,13 +637,11 @@ private void checkWalRolloverRequiredDuringInactivityPeriod() { FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr); - boolean forceFlush = filePtr != null && filePtr.forceFlush(); - - if (mode == WALMode.BACKGROUND && !forceFlush) + if (mode == WALMode.BACKGROUND) return; - if (mode == WALMode.LOG_ONLY || forceFlush) { - cur.flushOrWait(filePtr, false); + if (mode == LOG_ONLY) { + cur.flushOrWait(filePtr); return; } @@ -618,12 +650,11 @@ private void checkWalRolloverRequiredDuringInactivityPeriod() { if (filePtr != null && !cur.needFsync(filePtr)) return; - cur.fsync(filePtr, false); + cur.fsync(filePtr); } /** {@inheritDoc} */ - @Override public WALIterator replay(WALPointer start) - throws IgniteCheckedException, StorageException { + @Override public WALIterator replay(WALPointer start) throws IgniteCheckedException, StorageException { assert start == null || start instanceof FileWALPointer : "Invalid start pointer: " + start; FileWriteHandle hnd = currentHandle(); @@ -704,7 +735,7 @@ private boolean hasIndex(long absIdx) { if (absIdx <= lastArchivedIndex()) return false; - FileWriteHandle cur = currentHnd; + FileWriteHandle cur = currHnd; return cur != null && cur.idx >= absIdx; } @@ -869,7 +900,7 @@ private File initDirectory(String cfg, String defDir, String consId, String msg) * @return Current log segment handle. */ private FileWriteHandle currentHandle() { - return currentHnd; + return currHnd; } /** @@ -883,9 +914,11 @@ private FileWriteHandle rollOver(FileWriteHandle cur) throws StorageException, I return hnd; if (hnd.close(true)) { - FileWriteHandle next = initNextWriteHandle(cur.idx); + FileWriteHandle next = initNextWriteHandle(cur); + + next.writeHeader(); - boolean swapped = currentHndUpd.compareAndSet(this, hnd, next); + boolean swapped = CURR_HND_UPD.compareAndSet(this, hnd, next); assert swapped : "Concurrent updates on rollover are not allowed"; @@ -913,14 +946,14 @@ private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws Ig File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo)); - int offset = lastReadPtr == null ? 0 : lastReadPtr.fileOffset(); + int off = lastReadPtr == null ? 0 : lastReadPtr.fileOffset(); int len = lastReadPtr == null ? 0 : lastReadPtr.length(); try { FileIO fileIO = ioFactory.create(curFile); try { - int serVer = serializerVersion; + int serVer = serializerVer; // If we have existing segment, try to read version from it. if (lastReadPtr != null) { @@ -928,7 +961,7 @@ private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws Ig serVer = readSerializerVersionAndCompactedFlag(fileIO).get1(); } catch (SegmentEofException | EOFException ignore) { - serVer = serializerVersion; + serVer = serializerVer; } } @@ -936,19 +969,34 @@ private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws Ig if (log.isInfoEnabled()) log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath() + - ", offset=" + offset + ", ver=" + serVer + ']'); + ", offset=" + off + ", ver=" + serVer + ']'); + + SegmentedRingByteBuffer rbuf; + + if (mmap) { + try { + MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize); + + rbuf = new SegmentedRingByteBuffer(buf, metrics); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + else + rbuf = new SegmentedRingByteBuffer(dsCfg.getWalBufferSize(), maxWalSegmentSize, DIRECT, metrics); + + if (lastReadPtr != null) + rbuf.init(lastReadPtr.fileOffset() + lastReadPtr.length()); FileWriteHandle hnd = new FileWriteHandle( fileIO, absIdx, cctx.igniteInstanceName(), - offset + len, - maxWalSegmentSize, - ser); - - // For new handle write serializer version to it. - if (lastReadPtr == null) - hnd.writeSerializerVersion(); + off + len, + true, + ser, + rbuf); archiver.currentWalIndex(absIdx); @@ -966,33 +1014,46 @@ private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws Ig } /** - * Fills the file header for a new segment. - * Calling this method signals we are done with the segment and it can be archived. - * If we don't have prepared file yet and achiever is busy this method blocks + * Fills the file header for a new segment. Calling this method signals we are done with the segment and it can be + * archived. If we don't have prepared file yet and achiever is busy this method blocks * - * @param curIdx current absolute segment released by WAL writer + * @param cur Current file write handle released by WAL writer * @return Initialized file handle. * @throws StorageException If IO exception occurred. * @throws IgniteCheckedException If failed. */ - private FileWriteHandle initNextWriteHandle(long curIdx) throws StorageException, IgniteCheckedException { + private FileWriteHandle initNextWriteHandle(FileWriteHandle cur) throws StorageException, IgniteCheckedException { try { - File nextFile = pollNextFile(curIdx); + File nextFile = pollNextFile(cur.idx); if (log.isDebugEnabled()) log.debug("Switching to a new WAL segment: " + nextFile.getAbsolutePath()); FileIO fileIO = ioFactory.create(nextFile); + SegmentedRingByteBuffer rbuf; + + if (mmap) { + try { + MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize); + + rbuf = new SegmentedRingByteBuffer(buf, metrics); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + else + rbuf = cur.buf.reset(); + FileWriteHandle hnd = new FileWriteHandle( fileIO, - curIdx + 1, + cur.idx + 1, cctx.igniteInstanceName(), 0, - maxWalSegmentSize, - serializer); - - hnd.writeSerializerVersion(); + false, + serializer, + rbuf); return hnd; } @@ -1094,8 +1155,7 @@ private void createFile(File file) throws IgniteCheckedException { } /** - * Retrieves next available file to write WAL data, waiting - * if necessary for a segment to become available. + * Retrieves next available file to write WAL data, waiting if necessary for a segment to become available. * * @param curIdx Current absolute WAL segment index. * @return File ready for use as new WAL segment. @@ -1137,32 +1197,28 @@ public static FileDescriptor[] scan(File[] allFiles) { private void checkEnvironment() throws StorageException { if (envFailed != null) throw new StorageException("Failed to flush WAL buffer (environment was invalidated by a " + - "previous error)", envFailed); + "previous error)", envFailed); } /** - * File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate - * the work WAL segment: S(N) = N % dsCfg.walSegments. - * When a work segment is finished, it is given to the archiver. If the absolute index of last archived segment - * is denoted by A and the absolute index of next segment we want to write is denoted by W, then we can allow - * write to S(W) if W - A <= walSegments.
+ * File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate the + * work WAL segment: S(N) = N % dsCfg.walSegments. When a work segment is finished, it is given to the archiver. If + * the absolute index of last archived segment is denoted by A and the absolute index of next segment we want to + * write is denoted by W, then we can allow write to S(W) if W - A <= walSegments.
* - * Monitor of current object is used for notify on: - *