From 659cd31cce85aebdc097c20f807d5b1cd4ef476b Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Fri, 14 Dec 2018 12:01:19 +0300 Subject: [PATCH] IGNITE-10680 Add the ability to use existing kernel context in standalone WAL iterator --- .../internal/MarshallerContextImpl.java | 8 +-- .../wal/reader/IgniteWalIteratorFactory.java | 26 ++++++++- .../reader/StandaloneWalRecordsIterator.java | 14 ++--- .../db/wal/reader/IgniteWalReaderTest.java | 56 +++++++++++++++++++ 4 files changed, 91 insertions(+), 13 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index 9bad1eacb772e..7d5bbda4100ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -576,10 +576,10 @@ public Iterator>> currentMappings() { } /** - * @return custom marshaller mapping files directory. Used for standalone WAL iteration + * @return {@code True} if marshaller context is initialized. */ - @Nullable public File getMarshallerMappingFileStoreDir() { - return marshallerMappingFileStoreDir; + public boolean initialized() { + return fileStore != null; } /** @@ -656,4 +656,4 @@ static final class CombinedMap extends AbstractMap return userMap.containsKey(key) || sysMap.containsKey(key); } } -} \ No newline at end of file +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java index 697ddeda4d052..6e5759dd88f5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -174,7 +174,8 @@ public WALIterator iterator( iteratorParametersBuilder.validate(); return new StandaloneWalRecordsIterator(log, - prepareSharedCtx(iteratorParametersBuilder), + iteratorParametersBuilder.sharedCtx == null ? prepareSharedCtx(iteratorParametersBuilder) : + iteratorParametersBuilder.sharedCtx, iteratorParametersBuilder.ioFactory, resolveWalFiles(iteratorParametersBuilder), iteratorParametersBuilder.filter, @@ -410,6 +411,14 @@ public static class IteratorParametersBuilder { */ @Nullable private File marshallerMappingFileStoreDir; + /** + * Cache shared context. In case context is specified binary objects converting and unmarshalling will be + * performed using processors of this shared context. + *
This field can't be specified together with {@link #binaryMetadataFileStoreDir} or + * {@link #marshallerMappingFileStoreDir} fields. + * */ + @Nullable private GridCacheSharedContext sharedCtx; + /** */ @Nullable private IgniteBiPredicate filter; @@ -508,6 +517,16 @@ public IteratorParametersBuilder marshallerMappingFileStoreDir(File marshallerMa return this; } + /** + * @param sharedCtx Cache shared context. + * @return IteratorParametersBuilder Self reference. + */ + public IteratorParametersBuilder sharedContext(GridCacheSharedContext sharedCtx) { + this.sharedCtx = sharedCtx; + + return this; + } + /** * @param filter Record filter for skip records during iteration. * @return IteratorParametersBuilder Self reference. @@ -562,6 +581,7 @@ public IteratorParametersBuilder copy() { .ioFactory(ioFactory) .binaryMetadataFileStoreDir(binaryMetadataFileStoreDir) .marshallerMappingFileStoreDir(marshallerMappingFileStoreDir) + .sharedContext(sharedCtx) .from(lowBound) .to(highBound) .filter(filter) @@ -576,6 +596,10 @@ public void validate() throws IllegalArgumentException { A.ensure(U.isPow2(pageSize), "Page size must be a power of 2."); A.ensure(bufferSize >= pageSize * 2, "Buffer to small."); + + A.ensure(sharedCtx == null || (binaryMetadataFileStoreDir == null && + marshallerMappingFileStoreDir == null), "GridCacheSharedContext and binaryMetadataFileStoreDir/" + + "marshallerMappingFileStoreDir can't be specified in the same time"); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index 21275d8cd3d8d..00495e135e5a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.reader; -import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -440,8 +439,7 @@ private boolean checkBounds(long idx) { final KeyCacheObject key; final CacheObject val; - final File marshallerMappingFileStoreDir = - fakeCacheObjCtx.kernalContext().marshallerContext().getMarshallerMappingFileStoreDir(); + boolean keepBinary = this.keepBinary || !fakeCacheObjCtx.kernalContext().marshallerContext().initialized(); if (dataEntry instanceof LazyDataEntry) { final LazyDataEntry lazyDataEntry = (LazyDataEntry)dataEntry; @@ -462,7 +460,7 @@ private boolean checkBounds(long idx) { val = dataEntry.value(); } - return unwrapDataEntry(fakeCacheObjCtx, dataEntry, key, val, marshallerMappingFileStoreDir); + return unwrapDataEntry(fakeCacheObjCtx, dataEntry, key, val, keepBinary); } /** @@ -471,11 +469,11 @@ private boolean checkBounds(long idx) { * @param dataEntry Data entry. * @param key Entry key. * @param val Entry value. - * @param marshallerMappingFileStoreDir Marshaller directory. + * @param keepBinary Don't convert non primitive types. * @return Unwrapped entry. */ private @NotNull DataEntry unwrapDataEntry(CacheObjectContext coCtx, DataEntry dataEntry, - KeyCacheObject key, CacheObject val, File marshallerMappingFileStoreDir) { + KeyCacheObject key, CacheObject val, boolean keepBinary) { if (dataEntry instanceof MvccDataEntry) return new UnwrapMvccDataEntry( dataEntry.cacheId(), @@ -489,7 +487,7 @@ private boolean checkBounds(long idx) { dataEntry.partitionCounter(), ((MvccDataEntry)dataEntry).mvccVer(), coCtx, - keepBinary || marshallerMappingFileStoreDir == null); + keepBinary); else return new UnwrapDataEntry( dataEntry.cacheId(), @@ -502,7 +500,7 @@ private boolean checkBounds(long idx) { dataEntry.partitionId(), dataEntry.partitionCounter(), coCtx, - keepBinary || marshallerMappingFileStoreDir == null); + keepBinary); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java index 9dfd23288834d..d60a8bade776b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.cache.Cache; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; @@ -55,6 +56,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.events.WalSegmentArchivedEvent; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; @@ -821,6 +823,60 @@ public void testReadEmptyWal() throws Exception { ); } + /** + * Tests WAL iterator which uses shared cache context of currently started Ignite node. + */ + @Test + public void testIteratorWithCurrentKernelContext() throws Exception { + IgniteEx ignite = startGrid(0); + + ignite.cluster().active(true); + + int cntEntries = 100; + + putDummyRecords(ignite, cntEntries); + + String workDir = U.defaultWorkDirectory(); + + IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log); + + IteratorParametersBuilder iterParametersBuilder = + createIteratorParametersBuilder(workDir, genDbSubfolderName(ignite, 0)) + .filesOrDirs(workDir) + .binaryMetadataFileStoreDir(null) + .marshallerMappingFileStoreDir(null) + .sharedContext(ignite.context().cache().context()); + + AtomicInteger cnt = new AtomicInteger(); + + IgniteBiInClosure objConsumer = (key, val) -> { + if (val instanceof IndexedObject) { + assertEquals(key, ((IndexedObject)val).iVal); + assertEquals(key, cnt.getAndIncrement()); + } + }; + + iterateAndCountDataRecord(factory.iterator(iterParametersBuilder.copy()), objConsumer, null); + + assertEquals(cntEntries, cnt.get()); + + // Test without converting non primary types. + iterParametersBuilder.keepBinary(true); + + cnt.set(0); + + IgniteBiInClosure binObjConsumer = (key, val) -> { + if (val instanceof BinaryObject) { + assertEquals(key, ((BinaryObject)val).field("iVal")); + assertEquals(key, cnt.getAndIncrement()); + } + }; + + iterateAndCountDataRecord(factory.iterator(iterParametersBuilder.copy()), binObjConsumer, null); + + assertEquals(cntEntries, cnt.get()); + } + /** * Creates and fills cache with data. *