From b5ea6e1b5b072995f8e97a19183ebebfbae6cf20 Mon Sep 17 00:00:00 2001 From: ipavlukhin Date: Thu, 30 Aug 2018 19:00:08 +0300 Subject: [PATCH 01/10] IGNITE-8149 Cache.size is updated on MVCC transaction commit --- .../communication/GridIoMessageFactory.java | 4 +- .../processors/cache/GridCacheMapEntry.java | 6 + .../cache/IgniteCacheOffheapManager.java | 8 + .../cache/IgniteCacheOffheapManagerImpl.java | 54 ++-- .../GridDistributedTxRemoteAdapter.java | 48 ++- .../dht/GridDhtTxAbstractEnlistFuture.java | 2 +- .../dht/GridDhtTxFinishFuture.java | 6 +- .../dht/GridDhtTxFinishRequest.java | 14 +- .../dht/GridDhtTxLocalAdapter.java | 47 ++- .../distributed/dht/GridDhtTxRemote.java | 12 - ...sMap.java => PartitionUpdateCounters.java} | 10 +- .../persistence/GridCacheOffheapManager.java | 13 + .../cache/transactions/IgniteInternalTx.java | 7 +- .../cache/transactions/IgniteTxAdapter.java | 48 ++- .../cache/transactions/IgniteTxHandler.java | 2 +- .../transactions/IgniteTxLocalAdapter.java | 108 +++---- .../cache/transactions/IgniteTxLocalEx.java | 8 + .../transactions/IgniteTxLocalState.java | 16 + .../IgniteTxLocalStateAdapter.java | 23 ++ .../cache/transactions/IgniteTxRemoteEx.java | 12 - .../cache/transactions/TxCounters.java | 82 +++++ .../cache/mvcc/CacheMvccSizeTest.java | 283 ++++++++++++++++++ .../IgniteCacheMvccSqlTestSuite.java | 2 + 23 files changed, 644 insertions(+), 171 deletions(-) rename modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/{GridDhtPartitionsUpdateCountersMap.java => PartitionUpdateCounters.java} (91%) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java create mode 100644 modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 8dddd8ba2ece5..536e7b8a106e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -68,7 +68,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsUpdateCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest; @@ -78,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryFirstEnlistRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCounters; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest; @@ -1055,7 +1055,7 @@ public GridIoMessageFactory(MessageFactory[] ext) { break; case 157: - msg = new GridDhtPartitionsUpdateCountersMap(); + msg = new PartitionUpdateCounters(); break; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 249fbb3a14ec7..0da41dab9fe37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1122,6 +1122,9 @@ else if (op == CREATE && tx.local() && (res.resultType() == ResultType.PREV_NOT_ if (tx.local()) updateCntr = nextMvccPartitionCounter(); + if (res.resultType() == ResultType.PREV_NULL) + tx.txCounters().accumulateSizeDelta(cctx.cacheId(), partition(), 1); + if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry( cctx.cacheId(), @@ -1217,6 +1220,9 @@ else if (res.resultType() == ResultType.LOCKED) { if (tx.local()) updateCntr = nextMvccPartitionCounter(); + if (res.resultType() == ResultType.PREV_NOT_NULL) + tx.txCounters().accumulateSizeDelta(cctx.cacheId(), partition(), -1); + if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) logPtr = logTxUpdate(tx, null, 0, updateCntr); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index a0213940de37e..fdf42febde2a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -619,6 +619,14 @@ interface CacheDataStore { */ long fullSize(); + /** + * Updates size metric for particular cache. + * + * @param cacheId Cache ID. + * @param delta Size delta. + */ + void updateSize(int cacheId, long delta); + /** * @return Update counter. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 2fe097c3c5865..e9f9ee5232ecb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1449,36 +1449,14 @@ public CacheDataStoreImpl( * @param cacheId Cache ID. */ void incrementSize(int cacheId) { - storageSize.incrementAndGet(); - - if (grp.sharedGroup()) { - AtomicLong size = cacheSizes.get(cacheId); - - if (size == null) { - AtomicLong old = cacheSizes.putIfAbsent(cacheId, size = new AtomicLong()); - - if (old != null) - size = old; - } - - size.incrementAndGet(); - } + updateSize(cacheId, 1); } /** * @param cacheId Cache ID. */ void decrementSize(int cacheId) { - storageSize.decrementAndGet(); - - if (grp.sharedGroup()) { - AtomicLong size = cacheSizes.get(cacheId); - - if (size == null) - return; - - size.decrementAndGet(); - } + updateSize(cacheId, -1); } /** {@inheritDoc} */ @@ -1515,6 +1493,24 @@ void decrementSize(int cacheId) { return storageSize.get(); } + /** {@inheritDoc} */ + @Override public void updateSize(int cacheId, long delta) { + storageSize.addAndGet(delta); + + if (grp.sharedGroup()) { + AtomicLong size = cacheSizes.get(cacheId); + + if (size == null) { + AtomicLong old = cacheSizes.putIfAbsent(cacheId, size = new AtomicLong()); + + if (old != null) + size = old; + } + + size.addAndGet(delta); + } + } + /** {@inheritDoc} */ @Override public long nextUpdateCounter() { return cntr.incrementAndGet(); @@ -1936,8 +1932,6 @@ else if (res == ResultType.PREV_NULL && noCreate) { assert !old; - incrementSize(cctx.cacheId()); - GridCacheQueryManager qryMgr = cctx.queries(); if (qryMgr.enabled()) @@ -2267,11 +2261,13 @@ else if (res == ResultType.PREV_NOT_NULL) { rowStore.removeRow(row.link()); - decrementSize(cctx.cacheId()); - if (first) first = false; } + + // first == true means there were no row versions + if (!first) + decrementSize(cctx.cacheId()); } /** {@inheritDoc} */ @@ -2301,8 +2297,6 @@ else if (res == ResultType.PREV_NOT_NULL) { rowStore.removeRow(cleanupRow.link()); - decrementSize(cctx.cacheId()); - res++; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index dd6ea48e1b33c..8ae0c81982dec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -51,7 +51,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsUpdateCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCounters; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -807,7 +808,7 @@ else if (!isNodeStopping) { // Skip fair uncommit in case of node stopping or in } } - updateLocalCounters(); + applyTxCounters(); if (!near() && !F.isEmpty(dataEntries) && cctx.wal() != null) { // Set new update counters for data entries received from persisted tx entries. @@ -845,29 +846,32 @@ else if (!isNodeStopping) { // Skip fair uncommit in case of node stopping or in } } - /** - * Applies update counters to the local partitions. - */ - private void updateLocalCounters() { - Map updCntrsMap = updateCountersMap(); + /** {@inheritDoc} */ + @Override protected void applyTxCounters() { + super.applyTxCounters(); + + Map updCntrs = txCounters().updateCounters(); - if (F.isEmpty(updCntrsMap)) + if (F.isEmpty(updCntrs)) return; - for (Map.Entry entry : updCntrsMap.entrySet()) { - GridCacheContext cacheCtx = cctx.cacheContext(entry.getKey()); + for (Map.Entry entry : updCntrs.entrySet()) { + int cacheId = entry.getKey(); - GridDhtPartitionsUpdateCountersMap cacheUpdCntrs = entry.getValue(); + GridDhtPartitionTopology top = cctx.cacheContext(cacheId).topology(); - assert cacheUpdCntrs != null && !F.isEmpty(cacheUpdCntrs.updateCounters()); + Map cacheUpdCntrs = entry.getValue().updateCounters(); - for (Map.Entry e : cacheUpdCntrs.updateCounters().entrySet()) { - Long updCntr = e.getValue(); - GridDhtLocalPartition part = cacheCtx.topology().localPartition(e.getKey()); + assert cacheUpdCntrs != null; - assert part != null && updCntr != null && updCntr > 0; + for (Map.Entry e : cacheUpdCntrs.entrySet()) { + long updCntr = e.getValue(); - part.updateCounter(updCntr); + GridDhtLocalPartition dhtPart = top.localPartition(e.getKey()); + + assert dhtPart != null && updCntr > 0; + + dhtPart.updateCounter(updCntr); } } } @@ -1022,16 +1026,6 @@ protected void addExplicit(IgniteTxEntry e) { } } - /** {@inheritDoc} */ - @Override public void updateCountersMap(Map updCntrsMap) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public Map updateCountersMap() { - return null; - } - /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(GridDistributedTxRemoteAdapter.class, this, "super", super.toString()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java index bb11df5590c62..a3471c7e3d13a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java @@ -593,7 +593,7 @@ private void addToBatch(KeyCacheObject key, CacheObject val, List updCntrsMap = null; + Map updCntrsForNode = null; if (dhtMapping.queryUpdate() && commit) - updCntrsMap = tx.updateCountersForNode(n); + updCntrsForNode = tx.filterUpdateCountersForBackupNode(n); GridDhtTxFinishRequest req = new GridDhtTxFinishRequest( tx.nearNodeId(), @@ -489,7 +489,7 @@ private boolean finish(boolean commit, false, false, mvccSnapshot, - updCntrsMap); + updCntrsForNode); req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index 2696c0e0621d4..6d2b9f94cb166 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -73,9 +73,10 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** */ private MvccSnapshot mvccSnapshot; - /** Map of update counters made by this tx. Mapping: cacheId -> partId -> updCntr. */ - @GridDirectMap(keyType = Integer.class, valueType = GridDhtPartitionsUpdateCountersMap.class) - private Map updCntrs; + /** */ + @GridDirectMap(keyType = Integer.class, valueType = PartitionUpdateCounters.class) + private Map updCntrs; + /** * Empty constructor required for {@link Externalizable}. */ @@ -220,7 +221,7 @@ public GridDhtTxFinishRequest( boolean retVal, boolean waitRemoteTxs, MvccSnapshot mvccSnapshot, - Map updCntrs + Map updCntrs ) { this(nearNodeId, futId, @@ -362,10 +363,11 @@ public boolean needReturnValue() { public void needReturnValue(boolean retVal) { setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK); } + /** - * @return Partition update counters map. + * @return Partition counters update deferred until transaction commit. */ - public Map updateCountersMap() { + public Map updateCounters() { return updCntrs; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 11b46a0f2fab8..79e860dd344ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -20,6 +20,7 @@ import java.io.Externalizable; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -31,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -907,7 +909,7 @@ public GridFutureAdapter finishFuture(GridFutureAdapter f, Throwable e * * @return Current lock future or null if it's safe to roll back. */ - public @Nullable IgniteInternalFuture tryRollbackAsync() { + @Nullable public IgniteInternalFuture tryRollbackAsync() { while (true) { final IgniteInternalFuture fut = lockFut; @@ -937,6 +939,49 @@ protected final IgniteInternalFuture chainOnePhasePre return prepFut; } + /** + * @return Partition counters map for the given backup node. + */ + public Map filterUpdateCountersForBackupNode(ClusterNode node) { + Map updCntrs = txCounters().updateCounters(); + + if (F.isEmpty(updCntrs)) + return null; + + Map res = new HashMap<>(); + + AffinityTopologyVersion top = topologyVersionSnapshot(); + + for (Map.Entry entry : updCntrs.entrySet()) { + Integer cacheId = entry.getKey(); + + Map partsCntrs = entry.getValue().updateCounters(); + + assert !F.isEmpty(partsCntrs); + + GridCacheAffinityManager affinity = cctx.cacheContext(cacheId).affinity(); + + PartitionUpdateCounters resBackupUpdates = new PartitionUpdateCounters(); + + for (Map.Entry e : partsCntrs.entrySet()) { + Integer p = e.getKey(); + + Long cntr = e.getValue(); + + if (affinity.backupByPartition(node, p, top)) { + assert cntr != null && cntr > 0 : cntr; + + resBackupUpdates.updateCounters().put(p, cntr); + } + } + + if (!resBackupUpdates.updateCounters().isEmpty()) + res.put(cacheId, resBackupUpdates); + } + + return res; + } + /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 08ecf28e3a3b0..9a1763be4051f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -79,8 +79,6 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { /** Store write through flag. */ private boolean storeWriteThrough; - /** Map of update counters made by this tx. Mapping: cacheId -> partId -> updCntr. */ - private Map updCntrs; /** * Empty constructor required for {@link Externalizable}. */ @@ -502,16 +500,6 @@ public void mvccEnlistBatch(GridCacheContext ctx, EnlistOperation op, List updCntrsMap) { - this.updCntrs = updCntrsMap; - } - - /** {@inheritDoc} */ - @Override public Map updateCountersMap() { - return updCntrs; - } - /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(GridDhtTxRemote.class, this, "super", super.toString()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsUpdateCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java similarity index 91% rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsUpdateCountersMap.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java index 7a345d13dfbba..772bec3463949 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsUpdateCountersMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java @@ -27,11 +27,11 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** - * Partitions update counters message. + * Partition update counters message. */ -public class GridDhtPartitionsUpdateCountersMap implements Message { +public class PartitionUpdateCounters implements Message { /** */ - private static final long serialVersionUID = -4599730112233297219L; + private static final long serialVersionUID = 193442457510062844L; /** Map of update counters made by this tx. Mapping: partId -> updCntr. */ @GridDirectMap(keyType = Integer.class, valueType = Long.class) @@ -40,7 +40,7 @@ public class GridDhtPartitionsUpdateCountersMap implements Message { /** * */ - public GridDhtPartitionsUpdateCountersMap() { + public PartitionUpdateCounters() { updCntrs = new HashMap<>(); } @@ -99,7 +99,7 @@ public void updateCounters(Map updCntrs) { } - return reader.afterMessageRead(GridDhtPartitionsUpdateCountersMap.class); + return reader.afterMessageRead(PartitionUpdateCounters.class); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 4c45352ce5585..3676d3b78987b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -1552,6 +1552,19 @@ private Metas getOrAllocatePartitionMetas() throws IgniteCheckedException { } } + /** {@inheritDoc} */ + @Override public void updateSize(int cacheId, long delta) { + try { + CacheDataStore delegate0 = init0(false); + + if (delegate0 != null) + delegate0.updateSize(cacheId, delta); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + /** {@inheritDoc} */ @Override public long updateCounter() { try { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 4acf078bc8038..be625c83518f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -650,4 +650,9 @@ public void completedVersions(GridCacheVersion base, * @return Mvcc snapshot. */ public MvccSnapshot mvccSnapshot(); -} \ No newline at end of file + + /** + * @return Transaction counters. + */ + public TxCounters txCounters(); +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index ee5a58ee49129..1711652b75eb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.expiry.ExpiryPolicy; @@ -62,10 +63,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -275,6 +278,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement @GridToStringExclude private volatile IgniteInternalFuture rollbackFut; + /** */ + private TxCounters txCounters = new TxCounters(); + /** * Empty constructor required for {@link Externalizable}. */ @@ -2014,6 +2020,41 @@ protected Object readResolve() throws ObjectStreamException { */ public abstract void addActiveCache(GridCacheContext cacheCtx, boolean recovery) throws IgniteCheckedException; + /** {@inheritDoc} */ + @Override public TxCounters txCounters() { + return txCounters; + } + + /** + * Make counters accumulated during transaction visible outside of transaciton. + */ + protected void applyTxCounters() { + Map> sizeDeltas = txCounters.sizeDeltas(); + + if (F.isEmpty(sizeDeltas)) + return; + + for (Map.Entry> entry : sizeDeltas.entrySet()) { + Integer cacheId = entry.getKey(); + Map partDeltas = entry.getValue(); + + assert !F.isEmpty(partDeltas); + + GridDhtPartitionTopology top = cctx.cacheContext(cacheId).topology(); + + for (Map.Entry e : partDeltas.entrySet()) { + Integer p = e.getKey(); + long delta = e.getValue().get(); + + GridDhtLocalPartition dhtPart = top.localPartition(p); + + assert dhtPart != null; + + dhtPart.dataStore().updateSize(cacheId, delta); + } + } + } + /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(IgniteTxAdapter.class, this, @@ -2286,6 +2327,11 @@ private static class TxShadow implements IgniteInternalTx { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } + /** {@inheritDoc} */ + @Override public TxCounters txCounters() { + return null; + } + /** {@inheritDoc} */ @Override public IgniteTxState txState() { return null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 7541b433ec478..03198bf5e52c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -1391,7 +1391,7 @@ else if (log.isDebugEnabled()) tx.setPartitionUpdateCounters( req.partUpdateCounters() != null ? req.partUpdateCounters().array() : null); - tx.updateCountersMap(req.updateCountersMap()); + tx.txCounters().updateCounters(req.updateCounters()); tx.commitRemoteTx(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index b86273f9f1a6e..abe3c7e7c889e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.expiry.Duration; @@ -32,7 +31,6 @@ import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.IgniteInternalFuture; @@ -59,7 +57,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsUpdateCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCounters; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.persistence.StorageException; @@ -162,9 +160,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /** */ private GridLongList mvccWaitTxs; - /** Update counters map */ - private Map> updCntrs; - /** */ private volatile boolean qryEnlisted; @@ -918,7 +913,7 @@ else if (!isNodeStopping) { // Skip fair uncommit in case of node stopping or in } } - updateLocalPartitionCounters(); + applyTxCounters(); if (ptr != null && !cctx.tm().logTxRecords()) cctx.wal().flush(ptr, false); @@ -1562,6 +1557,11 @@ public Map> partsMap() { return null; } + /** {@inheritDoc} */ + @Override public void touchPartition(int cacheId, int partId) { + txState.touchPartition(cacheId, partId); + } + /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(IgniteTxLocalAdapter.class, this, "super", super.toString(), @@ -1644,74 +1644,28 @@ public long entryExpireTime(IgniteTxKey key) { } /** - * @return Partition counters map for the given backup node. + * Merges mvcc update counters to the partition update counters. For mvcc transactions we update partitions + * counters only on commit phase. */ - public Map updateCountersForNode(ClusterNode node) { - if (F.isEmpty(updCntrs)) + private Map applyAndCollectLocalUpdateCounters() { + if (F.isEmpty(txState.touchedPartitions())) return null; - Map res = new HashMap<>(); - - for (Map.Entry> entry : updCntrs.entrySet()) { - Map partsCntrs = entry.getValue(); - - assert !F.isEmpty(partsCntrs); - - GridCacheContext ctx0 = cctx.cacheContext(entry.getKey()); - - GridDhtPartitionsUpdateCountersMap resBackupCntrs = new GridDhtPartitionsUpdateCountersMap(); + HashMap updCntrs = new HashMap<>(); - for (Map.Entry e : partsCntrs.entrySet()) { - Long cntr = partsCntrs.get(e.getKey()); + for (Map.Entry> entry : txState.touchedPartitions().entrySet()) { + Integer cacheId = entry.getKey(); - if (ctx0.affinity().backupByPartition(node, e.getKey(), topologyVersionSnapshot())) { - assert cntr != null && cntr > 0 : cntr; + Set parts = entry.getValue(); - resBackupCntrs.updateCounters().put(e.getKey(), cntr); - } - } - - if (!resBackupCntrs.updateCounters().isEmpty()) - res.put(entry.getKey(), resBackupCntrs); - } - - return res; - } + assert !F.isEmpty(parts); - /** - * @param cacheId Cache id. - * @param part Partition id. - */ - @SuppressWarnings("Java8MapApi") - public void addPartitionCountersMapping(Integer cacheId, Integer part) { - if (updCntrs == null) - updCntrs = new ConcurrentHashMap<>(); + GridCacheContext ctx0 = cctx.cacheContext(cacheId); - Map partUpdCntrs = updCntrs.get(cacheId); + HashMap partCntrs = new HashMap<>(); - if (partUpdCntrs == null) - updCntrs.put(cacheId, partUpdCntrs = new ConcurrentHashMap<>()); - - partUpdCntrs.put(part, 0L); - } - - /** - * Merges mvcc update counters to the partition update counters. For mvcc transactions we update partitions - * counters only on commit phase. - */ - private void updateLocalPartitionCounters() { - if (F.isEmpty(updCntrs)) - return; - - for (Map.Entry> entry : updCntrs.entrySet()) { - Map partsCntrs = entry.getValue(); - - assert !F.isEmpty(partsCntrs); - - GridCacheContext ctx0 = cctx.cacheContext(entry.getKey()); - - for (Map.Entry e : partsCntrs.entrySet()) { - GridDhtLocalPartition dhtPart = ctx0.topology().localPartition(e.getKey()); + for (int p : parts) { + GridDhtLocalPartition dhtPart = ctx0.topology().localPartition(p); assert dhtPart != null; @@ -1719,11 +1673,17 @@ private void updateLocalPartitionCounters() { dhtPart.updateCounter(cntr); - Long prev = partsCntrs.put(e.getKey(), cntr); - - assert prev == 0L : prev; + partCntrs.put(p, cntr); } + + PartitionUpdateCounters pc = new PartitionUpdateCounters(); + + pc.updateCounters(partCntrs); + + updCntrs.put(cacheId, pc); } + + return updCntrs; } /** @@ -1747,6 +1707,16 @@ public void markQueryEnlisted(MvccSnapshot ver) { } } + /** {@inheritDoc} */ + @Override protected void applyTxCounters() { + super.applyTxCounters(); + + Map updCntrs = applyAndCollectLocalUpdateCounters(); + + // remember counters for subsequent sending to backups + txCounters().updateCounters(updCntrs); + } + /** * Post-lock closure alias. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index b61b1a9a629b5..651be603d37a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -56,4 +56,12 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { * @throws IgniteCheckedException If finish failed. */ public boolean localFinish(boolean commit, boolean clearThreadMap) throws IgniteCheckedException; + + /** + * Remembers that particular cache partition was touched by current tx. + * + * @param cacheId Cache id. + * @param partId Partition id. + */ + public void touchPartition(int cacheId, int partId); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java index 123d3964705b8..01eb4f4417877 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.processors.cache.transactions; +import java.util.Map; +import java.util.Set; + /** * */ @@ -41,4 +44,17 @@ public interface IgniteTxLocalState extends IgniteTxState { * */ public void seal(); + + /** + * @return Cache partitions touched by current tx. + */ + public Map> touchedPartitions(); + + /** + * Remembers that particular cache partition was touched by current tx. + * + * @param cacheId Cache id. + * @param partId Partition id. + */ + public void touchPartition(int cacheId, int partId); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java index 4943aac863b24..588c0fb875ad0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java @@ -17,6 +17,10 @@ package org.apache.ignite.internal.processors.cache.transactions; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.typedef.internal.U; @@ -25,6 +29,9 @@ * */ public abstract class IgniteTxLocalStateAdapter implements IgniteTxLocalState { + /** */ + private volatile Map> touchedParts; + /** * @param cacheCtx Cache context. * @param tx Transaction. @@ -40,4 +47,20 @@ protected final void onTxEnd(GridCacheContext cacheCtx, IgniteInternalTx tx, boo cacheCtx.cache().metrics0().onTxRollback(durationNanos); } } + + /** {@inheritDoc} */ + @Override public Map> touchedPartitions() { + Map> parts = touchedParts; + + return parts != null ? Collections.unmodifiableMap(parts) : null; + } + + /** {@inheritDoc} */ + @Override public void touchPartition(int cacheId, int partId) { + if (touchedParts == null) + touchedParts = new ConcurrentHashMap<>(); + + touchedParts.computeIfAbsent(cacheId, k -> Collections.newSetFromMap(new ConcurrentHashMap<>())) + .add(partId); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java index 1e0645ffb3ab5..87cc7ccea3635 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java @@ -18,9 +18,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Collection; -import java.util.Map; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsUpdateCountersMap; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; /** @@ -52,14 +50,4 @@ public void doneRemote(GridCacheVersion baseVer, * @param cntrs Partition update indexes. */ public void setPartitionUpdateCounters(long[] cntrs); - - /** - * @param updCntrsMap Partition update counters map: cacheId -> partId -> updateCntr. - */ - public void updateCountersMap(Map updCntrsMap); - - /** - * @return Partition update counters map: cacheId -> partId -> updateCntr. - */ - public Map updateCountersMap(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java new file mode 100644 index 0000000000000..fbfca4dbb89c2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCounters; + +/** + * Values which should be tracked during transaction execution and applied on commit. + */ +public class TxCounters { + /** Size changes for cache partitions made by transaction */ + private final ConcurrentMap> sizeDeltas = new ConcurrentHashMap<>(); + /** Update counters for cache partitions in the end of transaction */ + private Map updCntrs; + + /** + * Accumulates size change for cache partition. + * + * @param cacheId Cache id. + * @param part Partition id. + * @param delta Size delta. + */ + public void accumulateSizeDelta(int cacheId, int part, long delta) { + ConcurrentMap partDeltas = sizeDeltas.get(cacheId); + + if (partDeltas == null) { + ConcurrentMap partDeltas0 = + sizeDeltas.putIfAbsent(cacheId, partDeltas = new ConcurrentHashMap<>()); + + if (partDeltas0 != null) + partDeltas = partDeltas0; + } + + AtomicLong accDelta = partDeltas.get(part); + + if (accDelta == null) { + AtomicLong accDelta0 = partDeltas.putIfAbsent(part, accDelta = new AtomicLong()); + + if (accDelta0 != null) + accDelta = accDelta0; + } + + // here AtomicLong is used more as a container, + // every instance is assumed to be accessed in thread-confined manner + accDelta.set(accDelta.get() + delta); + } + + /** */ + public void updateCounters(Map updCntrs) { + this.updCntrs = updCntrs; + } + + /** */ + public Map updateCounters() { + return updCntrs != null ? Collections.unmodifiableMap(updCntrs) : null; + } + + /** */ + public Map> sizeDeltas() { + return Collections.unmodifiableMap(sizeDeltas); + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java new file mode 100644 index 0000000000000..353184bf19716 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.stream.IntStream; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteEx; + +import static org.apache.ignite.cache.CachePeekMode.BACKUP; + +/** + * + */ +public class CacheMvccSizeTest extends CacheMvccAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** + * @throws Exception If failed. + */ + public void testInsert() throws Exception { + IgniteEx ignite = startGrid(0); + IgniteCache personTbl = createTable(ignite); + + personTbl.query(q("begin")); + personTbl.query(q("insert into person values(1, 'a')")); + + assertEquals(0, personTbl.size()); + personTbl.query(q("commit")); + + assertEquals(1, personTbl.size()); + } + + /** + * @throws Exception If failed. + */ + public void testDelete() throws Exception { + IgniteEx ignite = startGrid(0); + IgniteCache personTbl = createTable(ignite); + personTbl.query(q("insert into person values(1, 'a')")); + + personTbl.query(q("begin")); + personTbl.query(q("delete from person")); + + assertEquals(1, personTbl.size()); + personTbl.query(q("commit")); + + assertEquals(0, personTbl.size()); + } + + /** + * @throws Exception If failed. + */ + public void testInsertMultipleKeys() throws Exception { + IgniteEx ignite = startGrid(0); + IgniteCache personTbl = createTable(ignite); + + personTbl.query(q("begin")); + personTbl.query(q("insert into person values(1, 'a')")); + personTbl.query(q("insert into person values(%d, 'b')", keyInSamePartition(ignite, "person", 1))); + personTbl.query(q("insert into person values(%d, 'c')", keyInDifferentPartition(ignite, "person", 1))); + + assertEquals(0, personTbl.size()); + personTbl.query(q("commit")); + + assertEquals(3, personTbl.size()); + } + + /** + * @throws Exception If failed. + */ + public void testInsertWithBackups() throws Exception { + startGridsMultiThreaded(2); + IgniteEx ignite = grid(0); + IgniteCache personTbl = createTable(ignite); + + personTbl.query(q("begin")); + personTbl.query(q("insert into person values(1, 'a')")); + + assertEquals(0, personTbl.size(BACKUP)); + personTbl.query(q("commit")); + + assertEquals(1, personTbl.size(BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testDeleteWithBackups() throws Exception { + startGridsMultiThreaded(2); + IgniteEx ignite = grid(0); + IgniteCache personTbl = createTable(ignite); + personTbl.query(q("insert into person values(1, 'a')")); + + personTbl.query(q("begin")); + personTbl.query(q("delete from person where id = 1")); + + assertEquals(1, personTbl.size(BACKUP)); + personTbl.query(q("commit")); + + assertEquals(0, personTbl.size(BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testInsertAndDeleteWithBackups() throws Exception { + startGridsMultiThreaded(2); + IgniteEx ignite = grid(0); + IgniteCache personTbl = createTable(ignite); + personTbl.query(q("insert into person values(1, 'a')")); + + personTbl.query(q("begin")); + personTbl.query(q("insert into person values(2, 'b')")); + personTbl.query(q("delete from person where id = 1")); + + assertEquals(1, personTbl.size()); + assertEquals(1, personTbl.size(BACKUP)); + personTbl.query(q("commit")); + + assertEquals(1, personTbl.size()); + assertEquals(1, personTbl.size(BACKUP)); + } + + /** + * @throws Exception if failed. + */ + public void testSizeIsNotChangedByReader() throws Exception { + IgniteEx ignite = startGrid(0); + + IgniteCache tbl = createTable(ignite); + + tbl.query(q("insert into person values(1, 'a')")); + + assertEquals(1, tbl.size()); + + tbl.query(q("select * from person")).getAll(); + + assertEquals(1, tbl.size()); + } + + /** + * @throws Exception if failed. + */ + public void testDataStreamerModifiesReplicatedCacheSize() throws Exception { + startGridsMultiThreaded(2); + + IgniteEx ignite = grid(0); + + ignite.createCache( + new CacheConfiguration<>("test") + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setCacheMode(CacheMode.REPLICATED) + ); + + try (IgniteDataStreamer streamer = ignite.dataStreamer("test")) { + streamer.addData(1, "a"); + + streamer.addData(keyInDifferentPartition(ignite, "test", 1), "b"); + } + + assertEquals(2, ignite.cache("test").size()); + + assertEquals(1, grid(0).cache("test").localSize()); + assertEquals(1, grid(0).cache("test").localSize(BACKUP)); + assertEquals(1, grid(1).cache("test").localSize()); + assertEquals(1, grid(1).cache("test").localSize(BACKUP)); + } + + /** + * @throws Exception if failed. + */ + public void testSizeIsConsistentAfterRebalance() throws Exception { + IgniteEx ignite = startGrid(0); + + IgniteCache tbl = createTable(ignite); + + for (int i = 0; i < 100; i++) + tbl.query(q("insert into person values(?, ?)").setArgs(i, i)); + + startGrid(1); + + awaitPartitionMapExchange(); + + IgniteCache tbl0 = grid(0).cache("person"); + IgniteCache tbl1 = grid(1).cache("person"); + + assert tbl0.localSize() != 0 && tbl1.localSize() != 0; + + assertEquals(100, tbl1.size()); + assertEquals(100, tbl0.localSize() + tbl1.localSize()); + } + + /** + * @throws Exception If failed. + */ + public void testSizeIsConsistentAfterRebalanceDuringInsert() throws Exception { + IgniteEx ignite = startGrid(0); + + IgniteCache tbl = createTable(ignite); + + Future f = null; + + for (int i = 0; i < 100; i++) { + if (i == 50) + f = ForkJoinPool.commonPool().submit(() -> startGrid(1)); + + tbl.query(q("insert into person values(?, ?)").setArgs(i, i)); + } + + f.get(); + + awaitPartitionMapExchange(); + + IgniteCache tbl0 = grid(0).cache("person"); + IgniteCache tbl1 = grid(1).cache("person"); + + assert tbl0.localSize() != 0 && tbl1.localSize() != 0; + + assertEquals(100, tbl1.size()); + assertEquals(100, tbl0.localSize() + tbl1.localSize()); + } + + /** */ + private static IgniteCache createTable(IgniteEx ignite) { + IgniteCache sqlNexus = ignite.getOrCreateCache(new CacheConfiguration<>("sqlNexus").setSqlSchema("PUBLIC")); + sqlNexus.query(q("" + + "create table person(" + + " id int primary key," + + " name varchar" + + ") with \"atomicity=transactional,template=replicated,cache_name=person\"")); + assert ignite.cachex("person").configuration().getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL; + assert ignite.cachex("person").configuration().getCacheMode() == CacheMode.REPLICATED; + return ignite.cache("person"); + } + + /** */ + private static SqlFieldsQuery q(String fSql, Object... args) { + return new SqlFieldsQuery(String.format(fSql, args)); + } + + /** */ + private static int keyInSamePartition(Ignite ignite, String cacheName, int key) { + Affinity affinity = ignite.affinity(cacheName); + return IntStream.iterate(key + 1, i -> i + 1) + .filter(i -> affinity.partition(i) == affinity.partition(key)) + .findFirst().getAsInt(); + } + + /** */ + private static int keyInDifferentPartition(Ignite ignite, String cacheName, int key) { + Affinity affinity = ignite.affinity(cacheName); + return IntStream.iterate(key + 1, i -> i + 1) + .filter(i -> affinity.partition(i) != affinity.partition(key)) + .findFirst().getAsInt(); + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java index a8087da077850..5e4c635b05590 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedSqlTxQueriesTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedSqlTxQueriesWithReducerTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccScanQueryWithConcurrentJdbcTransactionTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeWithConcurrentJdbcTransactionTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest; @@ -67,6 +68,7 @@ public static TestSuite suite() { suite.addTestSuite(CacheMvccBulkLoadTest.class); suite.addTestSuite(CacheMvccStreamingInsertTest.class); suite.addTestSuite(CacheMvccDmlSimpleTest.class); + suite.addTestSuite(CacheMvccSizeTest.class); return suite; } From 55c579ef9525569a611e877975b56026ddd5e660 Mon Sep 17 00:00:00 2001 From: Igor Seliverstov Date: Fri, 31 Aug 2018 12:39:20 +0300 Subject: [PATCH 02/10] minors --- .../cache/distributed/dht/GridDhtTxLocalAdapter.java | 9 +++++---- .../distributed/dht/PartitionUpdateCounters.java | 12 ++++++++---- .../cache/transactions/IgniteInternalTx.java | 2 +- .../cache/transactions/IgniteTxLocalAdapter.java | 10 +++------- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 79e860dd344ae..f94f2f6bc70fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -940,6 +940,7 @@ protected final IgniteInternalFuture chainOnePhasePre } /** + * @param node Backup node. * @return Partition counters map for the given backup node. */ public Map filterUpdateCountersForBackupNode(ClusterNode node) { @@ -961,7 +962,7 @@ public Map filterUpdateCountersForBackupNode(C GridCacheAffinityManager affinity = cctx.cacheContext(cacheId).affinity(); - PartitionUpdateCounters resBackupUpdates = new PartitionUpdateCounters(); + Map resCntrs = new HashMap<>(partsCntrs.size()); for (Map.Entry e : partsCntrs.entrySet()) { Integer p = e.getKey(); @@ -971,12 +972,12 @@ public Map filterUpdateCountersForBackupNode(C if (affinity.backupByPartition(node, p, top)) { assert cntr != null && cntr > 0 : cntr; - resBackupUpdates.updateCounters().put(p, cntr); + resCntrs.put(p, cntr); } } - if (!resBackupUpdates.updateCounters().isEmpty()) - res.put(cacheId, resBackupUpdates); + if (!resCntrs.isEmpty()) + res.put(cacheId, new PartitionUpdateCounters(resCntrs)); } return res; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java index 772bec3463949..54be2d682b945 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.Map; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.plugin.extensions.communication.Message; @@ -37,11 +36,16 @@ public class PartitionUpdateCounters implements Message { @GridDirectMap(keyType = Integer.class, valueType = Long.class) private Map updCntrs; + /** */ + public PartitionUpdateCounters() { + // No-op. + } + /** - * + * @param updCntrs Update counters map. */ - public PartitionUpdateCounters() { - updCntrs = new HashMap<>(); + public PartitionUpdateCounters(Map updCntrs) { + this.updCntrs = updCntrs; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index be625c83518f5..c5f19ec89ad24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -655,4 +655,4 @@ public void completedVersions(GridCacheVersion base, * @return Transaction counters. */ public TxCounters txCounters(); -} +} \ No newline at end of file diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index abe3c7e7c889e..5e5776c5a2f99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1662,9 +1662,9 @@ private Map applyAndCollectLocalUpdateCounters GridCacheContext ctx0 = cctx.cacheContext(cacheId); - HashMap partCntrs = new HashMap<>(); + Map partCntrs = new HashMap<>(parts.size()); - for (int p : parts) { + for (Integer p : parts) { GridDhtLocalPartition dhtPart = ctx0.topology().localPartition(p); assert dhtPart != null; @@ -1676,11 +1676,7 @@ private Map applyAndCollectLocalUpdateCounters partCntrs.put(p, cntr); } - PartitionUpdateCounters pc = new PartitionUpdateCounters(); - - pc.updateCounters(partCntrs); - - updCntrs.put(cacheId, pc); + updCntrs.put(cacheId, new PartitionUpdateCounters(partCntrs)); } return updCntrs; From 379584f6b7248867d5569bdab93aad785b49e049 Mon Sep 17 00:00:00 2001 From: ipavlukhin Date: Fri, 31 Aug 2018 13:16:46 +0300 Subject: [PATCH 03/10] remove extra space --- .../cache/distributed/dht/PartitionUpdateCounters.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java index 54be2d682b945..5b1eccdce033c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java @@ -34,7 +34,7 @@ public class PartitionUpdateCounters implements Message { /** Map of update counters made by this tx. Mapping: partId -> updCntr. */ @GridDirectMap(keyType = Integer.class, valueType = Long.class) - private Map updCntrs; + private Map updCntrs; /** */ public PartitionUpdateCounters() { From 951a54aff4a232854885961e83c8a41079cbf243 Mon Sep 17 00:00:00 2001 From: ipavlukhin Date: Mon, 3 Sep 2018 13:16:30 +0300 Subject: [PATCH 04/10] improve tests: reduce grid restart number, test rollbacks, multi statement transactions, failing statements --- .../cache/mvcc/CacheMvccSizeTest.java | 276 ++++++++++++------ 1 file changed, 185 insertions(+), 91 deletions(-) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java index 353184bf19716..968eed68cfa03 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.processors.cache.mvcc; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; +import java.util.function.Consumer; import java.util.stream.IntStream; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; @@ -29,140 +31,224 @@ import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.query.IgniteSQLException; import static org.apache.ignite.cache.CachePeekMode.BACKUP; /** * */ +// TODO: Concurrent tests +// TODO: Tests for cache API: failing tests or separate ticket public class CacheMvccSizeTest extends CacheMvccAbstractTest { /** {@inheritDoc} */ @Override protected CacheMode cacheMode() { return CacheMode.PARTITIONED; } - /** - * @throws Exception If failed. - */ - public void testInsert() throws Exception { - IgniteEx ignite = startGrid(0); - IgniteCache personTbl = createTable(ignite); - - personTbl.query(q("begin")); - personTbl.query(q("insert into person values(1, 'a')")); + /** */ + private void checkSizeModificationByOperation(String sql, boolean commit, int expSizeDelta) throws Exception { + checkSizeModificationByOperation(c -> {}, cache -> cache.query(q(sql)).getAll(), commit, expSizeDelta); + } - assertEquals(0, personTbl.size()); - personTbl.query(q("commit")); + /** */ + private void checkSizeModificationByOperation(String initSql, String sql, boolean commit, + int expSizeDelta) throws Exception { + checkSizeModificationByOperation( + cache -> cache.query(q(initSql)).getAll(), + cache -> cache.query(q(sql)).getAll(), + commit, + expSizeDelta); + } - assertEquals(1, personTbl.size()); + /** */ + private void checkSizeModificationByOperation(Consumer> inTx, boolean commit, + int expSizeDelta) throws Exception { + checkSizeModificationByOperation(c -> {}, inTx, commit, expSizeDelta); } - /** - * @throws Exception If failed. - */ - public void testDelete() throws Exception { - IgniteEx ignite = startGrid(0); - IgniteCache personTbl = createTable(ignite); - personTbl.query(q("insert into person values(1, 'a')")); + /** */ + private void checkSizeModificationByOperation(Consumer> beforeTx, + Consumer> inTx, boolean commit, int expSizeDelta) throws Exception { + IgniteCache tbl0 = grid(0).cache("person"); - personTbl.query(q("begin")); - personTbl.query(q("delete from person")); + tbl0.query(q("delete from person")); - assertEquals(1, personTbl.size()); - personTbl.query(q("commit")); + beforeTx.accept(tbl0); - assertEquals(0, personTbl.size()); - } + int initSize = tbl0.size(); - /** - * @throws Exception If failed. - */ - public void testInsertMultipleKeys() throws Exception { - IgniteEx ignite = startGrid(0); - IgniteCache personTbl = createTable(ignite); + tbl0.query(q("begin")); + + inTx.accept(tbl0); + + // size is not changed before commit + assertEquals(0, tbl0.size() - initSize); - personTbl.query(q("begin")); - personTbl.query(q("insert into person values(1, 'a')")); - personTbl.query(q("insert into person values(%d, 'b')", keyInSamePartition(ignite, "person", 1))); - personTbl.query(q("insert into person values(%d, 'c')", keyInDifferentPartition(ignite, "person", 1))); + if (commit) + tbl0.query(q("commit")); + else + tbl0.query(q("rollback")); - assertEquals(0, personTbl.size()); - personTbl.query(q("commit")); + assertEquals(expSizeDelta, tbl0.size() - initSize); + assertEquals(tbl0.size(), table(grid(1)).size()); - assertEquals(3, personTbl.size()); + assertEquals(tbl0.size(), tbl0.size(BACKUP)); + assertEquals(tbl0.size(), table(grid(1)).size(BACKUP)); } /** - * @throws Exception If failed. + * @throws Exception if failed. */ - public void testInsertWithBackups() throws Exception { + public void testSql() throws Exception { startGridsMultiThreaded(2); - IgniteEx ignite = grid(0); - IgniteCache personTbl = createTable(ignite); - - personTbl.query(q("begin")); - personTbl.query(q("insert into person values(1, 'a')")); - assertEquals(0, personTbl.size(BACKUP)); - personTbl.query(q("commit")); - - assertEquals(1, personTbl.size(BACKUP)); + createTable(grid(0)); + + checkSizeModificationByOperation("insert into person values(1, 'a')", true, 1); + + checkSizeModificationByOperation("insert into person values(1, 'a')", false, 0); + + checkSizeModificationByOperation( + personTbl -> personTbl.query(q("insert into person values(1, 'a')")), + personTbl -> { + try { + personTbl.query(q("insert into person values(1, 'a')")); + } + catch (Exception e) { + if (e.getCause() instanceof IgniteSQLException) { + assertEquals(IgniteQueryErrorCode.DUPLICATE_KEY, + ((IgniteSQLException)e.getCause()).statusCode()); + } + else { + e.printStackTrace(); + + fail("Unexpected exceptions"); + } + } + }, + true, 0); + + checkSizeModificationByOperation("merge into person(id, name) values(1, 'a')", true, 1); + + checkSizeModificationByOperation("merge into person(id, name) values(1, 'a')", false, 0); + + checkSizeModificationByOperation( + "insert into person values(1, 'a')", "merge into person(id, name) values(1, 'b')", true, 0); + + checkSizeModificationByOperation("update person set name = 'b' where id = 1", true, 0); + + checkSizeModificationByOperation( + "insert into person values(1, 'a')", "update person set name = 'b' where id = 1", true, 0); + + checkSizeModificationByOperation( + "insert into person values(1, 'a')", "delete from person where id = 1", true, -1); + + checkSizeModificationByOperation( + "insert into person values(1, 'a')", "delete from person where id = 1", false, 0); + + checkSizeModificationByOperation("delete from person where id = 1", true, 0); + + checkSizeModificationByOperation( + "insert into person values(1, 'a')", "select * from person", true, 0); + + checkSizeModificationByOperation("select * from person", true, 0); + + checkSizeModificationByOperation(personTbl -> { + personTbl.query(q("insert into person values(1, 'a')")); + personTbl.query(q("insert into person values(%d, 'b')", keyInSamePartition(grid(0), "person", 1))); + personTbl.query(q("insert into person values(%d, 'c')", keyInDifferentPartition(grid(0), "person", 1))); + }, true, 3); + + checkSizeModificationByOperation(personTbl -> { + personTbl.query(q("insert into person values(1, 'a')")); + personTbl.query(q("delete from person where id = 1")); + }, true, 0); + + checkSizeModificationByOperation(personTbl -> { + personTbl.query(q("insert into person values(1, 'a')")); + personTbl.query(q("delete from person where id = 1")); + personTbl.query(q("insert into person values(1, 'a')")); + }, true, 1); + + checkSizeModificationByOperation( + personTbl -> personTbl.query(q("insert into person values(1, 'a')")), + personTbl -> { + personTbl.query(q("delete from person where id = 1")); + personTbl.query(q("insert into person values(1, 'a')")); + }, true, 0); + + checkSizeModificationByOperation(personTbl -> { + personTbl.query(q("merge into person(id, name) values(1, 'a')")); + personTbl.query(q("delete from person where id = 1")); + }, true, 0); + + checkSizeModificationByOperation(personTbl -> { + personTbl.query(q("merge into person(id, name) values(1, 'a')")); + personTbl.query(q("delete from person where id = 1")); + personTbl.query(q("merge into person(id, name) values(1, 'a')")); + }, true, 1); + + checkSizeModificationByOperation( + personTbl -> personTbl.query(q("merge into person(id, name) values(1, 'a')")), + personTbl -> { + personTbl.query(q("delete from person where id = 1")); + personTbl.query(q("merge into person(id, name) values(1, 'a')")); + }, true, 0); } /** - * @throws Exception If failed. + * @throws Exception if failed. */ - public void testDeleteWithBackups() throws Exception { + public void testWriteConflictDoesNotChangeSize() throws Exception { startGridsMultiThreaded(2); - IgniteEx ignite = grid(0); - IgniteCache personTbl = createTable(ignite); - personTbl.query(q("insert into person values(1, 'a')")); - - personTbl.query(q("begin")); - personTbl.query(q("delete from person where id = 1")); - assertEquals(1, personTbl.size(BACKUP)); - personTbl.query(q("commit")); + IgniteCache tbl0 = createTable(grid(0)); - assertEquals(0, personTbl.size(BACKUP)); - } + tbl0.query(q("insert into person values(1, 'a')")); - /** - * @throws Exception If failed. - */ - public void testInsertAndDeleteWithBackups() throws Exception { - startGridsMultiThreaded(2); - IgniteEx ignite = grid(0); - IgniteCache personTbl = createTable(ignite); - personTbl.query(q("insert into person values(1, 'a')")); + tbl0.query(q("begin")); - personTbl.query(q("begin")); - personTbl.query(q("insert into person values(2, 'b')")); - personTbl.query(q("delete from person where id = 1")); + tbl0.query(q("delete from person where id = 1")); - assertEquals(1, personTbl.size()); - assertEquals(1, personTbl.size(BACKUP)); - personTbl.query(q("commit")); + CompletableFuture conflictingStarted = new CompletableFuture<>(); - assertEquals(1, personTbl.size()); - assertEquals(1, personTbl.size(BACKUP)); - } + CompletableFuture fut = CompletableFuture.runAsync(() -> { + tbl0.query(q("begin")); - /** - * @throws Exception if failed. - */ - public void testSizeIsNotChangedByReader() throws Exception { - IgniteEx ignite = startGrid(0); + try { + tbl0.query(q("select * from person")).getAll(); + conflictingStarted.complete(null); - IgniteCache tbl = createTable(ignite); + tbl0.query(q("merge into person(id, name) values(1, 'b')")); + } + finally { + tbl0.query(q("commit")); + } + }); - tbl.query(q("insert into person values(1, 'a')")); + conflictingStarted.join(); + tbl0.query(q("commit")); - assertEquals(1, tbl.size()); + try { + fut.join(); + } + catch (Exception e) { + if (e.getCause().getCause() instanceof IgniteSQLException) + assertTrue(e.getMessage().toLowerCase().contains("version mismatch")); + else { + e.printStackTrace(); + + fail("Unexpected exception"); + } + } - tbl.query(q("select * from person")).getAll(); + assertEquals(0, tbl0.size()); + assertEquals(0, table(grid(1)).size()); - assertEquals(1, tbl.size()); + assertEquals(0, tbl0.size(BACKUP)); + assertEquals(0, table(grid(1)).size(BACKUP)); } /** @@ -247,17 +333,25 @@ public void testSizeIsConsistentAfterRebalanceDuringInsert() throws Exception { assertEquals(100, tbl0.localSize() + tbl1.localSize()); } + /** */ + private static IgniteCache table(IgniteEx ignite) { + assert ignite.cachex("person").configuration().getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL; + assert ignite.cachex("person").configuration().getCacheMode() == CacheMode.REPLICATED; + + return ignite.cache("person"); + } + /** */ private static IgniteCache createTable(IgniteEx ignite) { IgniteCache sqlNexus = ignite.getOrCreateCache(new CacheConfiguration<>("sqlNexus").setSqlSchema("PUBLIC")); + sqlNexus.query(q("" + "create table person(" + " id int primary key," + " name varchar" + ") with \"atomicity=transactional,template=replicated,cache_name=person\"")); - assert ignite.cachex("person").configuration().getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL; - assert ignite.cachex("person").configuration().getCacheMode() == CacheMode.REPLICATED; - return ignite.cache("person"); + + return table(ignite); } /** */ From ce899a14c661ad7158cdefe8d270453b8f962b47 Mon Sep 17 00:00:00 2001 From: ipavlukhin Date: Mon, 3 Sep 2018 15:10:40 +0300 Subject: [PATCH 05/10] add insert-delete concurrent test, fix delete counting after waking up on released lock --- .../processors/cache/GridCacheMapEntry.java | 6 +++ .../cache/mvcc/CacheMvccSizeTest.java | 53 ++++++++++++++++++- 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 0da41dab9fe37..e5194cbc86642 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -4996,6 +4996,9 @@ else if (res.resultType() == ResultType.LOCKED) { if (updateCntr != null && updateCntr != 0) updateCntr0 = updateCntr; + if (res.resultType() == ResultType.PREV_NOT_NULL) + tx.txCounters().accumulateSizeDelta(cctx.cacheId(), entry.partition(), -1); + if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry( cctx.cacheId(), @@ -5283,6 +5286,9 @@ else if (op == CREATE && tx.local() && (res.resultType() == ResultType.PREV_NOT_ updateCntr0 = tx.local() ? entry.nextMvccPartitionCounter() : updateCntr; + if (res.resultType() == ResultType.PREV_NULL) + tx.txCounters().accumulateSizeDelta(cctx.cacheId(), entry.partition(), 1); + if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry( cctx.cacheId(), diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java index 968eed68cfa03..b2fcf32b1abe9 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java @@ -20,6 +20,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.IntStream; import org.apache.ignite.Ignite; @@ -32,6 +34,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.query.IgniteSQLException; import static org.apache.ignite.cache.CachePeekMode.BACKUP; @@ -39,8 +42,6 @@ /** * */ -// TODO: Concurrent tests -// TODO: Tests for cache API: failing tests or separate ticket public class CacheMvccSizeTest extends CacheMvccAbstractTest { /** {@inheritDoc} */ @Override protected CacheMode cacheMode() { @@ -198,6 +199,54 @@ public void testSql() throws Exception { }, true, 0); } + /** + * @throws Exception if failed. + */ + public void testInsertDeleteConcurrent() throws Exception { + startGridsMultiThreaded(2); + + IgniteCache tbl0 = createTable(grid(0)); + + SqlFieldsQuery insert = new SqlFieldsQuery("insert into person(id, name) values(?, 'a')"); + + SqlFieldsQuery delete = new SqlFieldsQuery("delete from person where id = ?"); + + CompletableFuture insertFut = CompletableFuture.supplyAsync(() -> { + int cnt = 0; + + for (int i = 0; i < 1000; i++) + cnt += update(insert.setArgs(ThreadLocalRandom.current().nextInt(10)), tbl0); + + return cnt; + }); + + CompletableFuture deleteFut = CompletableFuture.supplyAsync(() -> { + int cnt = 0; + + for (int i = 0; i < 1000; i++) + cnt += update(delete.setArgs(ThreadLocalRandom.current().nextInt(10)), tbl0); + + return cnt; + }); + + int expSize = insertFut.join() - deleteFut.join(); + + assertEquals(expSize, tbl0.size()); + assertEquals(expSize, table(grid(1)).size()); + + assertEquals(expSize, tbl0.size(BACKUP)); + assertEquals(expSize, table(grid(1)).size(BACKUP)); + } + + /** */ + private int update(SqlFieldsQuery qry, IgniteCache cache) { + try { + return Integer.parseInt(cache.query(qry).getAll().get(0).get(0).toString()); + } catch (Exception e) { + return 0; + } + } + /** * @throws Exception if failed. */ From 8768b49a401ef2a17f33908b6842e25fb795cf7a Mon Sep 17 00:00:00 2001 From: ipavlukhin Date: Mon, 3 Sep 2018 16:46:56 +0300 Subject: [PATCH 06/10] make touched partitions in IgniteTxLocalStateAdapter non-concurrent --- .../transactions/IgniteTxLocalStateAdapter.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java index 588c0fb875ad0..9c6ef8f3a91e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java @@ -18,10 +18,12 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.typedef.internal.U; @@ -30,7 +32,9 @@ */ public abstract class IgniteTxLocalStateAdapter implements IgniteTxLocalState { /** */ - private volatile Map> touchedParts; + private static final Function> CREATE_INT_SET = k -> new HashSet<>(); + /** */ + private Map> touchedParts; /** * @param cacheCtx Cache context. @@ -58,9 +62,8 @@ protected final void onTxEnd(GridCacheContext cacheCtx, IgniteInternalTx tx, boo /** {@inheritDoc} */ @Override public void touchPartition(int cacheId, int partId) { if (touchedParts == null) - touchedParts = new ConcurrentHashMap<>(); + touchedParts = new HashMap<>(); - touchedParts.computeIfAbsent(cacheId, k -> Collections.newSetFromMap(new ConcurrentHashMap<>())) - .add(partId); + touchedParts.computeIfAbsent(cacheId, CREATE_INT_SET).add(partId); } } From 15c255a6e6b27a1a8331f365490145fdea372cc8 Mon Sep 17 00:00:00 2001 From: ipavlukhin Date: Mon, 3 Sep 2018 17:20:31 +0300 Subject: [PATCH 07/10] initialize TxCounters laziy in IgniteTxAdapter --- .../processors/cache/GridCacheMapEntry.java | 8 ++++---- .../GridDistributedTxRemoteAdapter.java | 7 +++++-- .../dht/GridDhtTxLocalAdapter.java | 7 +++++-- .../cache/transactions/IgniteInternalTx.java | 5 +++-- .../cache/transactions/IgniteTxAdapter.java | 19 ++++++++++++++----- .../cache/transactions/IgniteTxHandler.java | 2 +- .../transactions/IgniteTxLocalAdapter.java | 2 +- .../cache/transactions/TxCounters.java | 2 +- 8 files changed, 34 insertions(+), 18 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index e5194cbc86642..39c003a074c96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1123,7 +1123,7 @@ else if (op == CREATE && tx.local() && (res.resultType() == ResultType.PREV_NOT_ updateCntr = nextMvccPartitionCounter(); if (res.resultType() == ResultType.PREV_NULL) - tx.txCounters().accumulateSizeDelta(cctx.cacheId(), partition(), 1); + tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), partition(), 1); if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry( @@ -1221,7 +1221,7 @@ else if (res.resultType() == ResultType.LOCKED) { updateCntr = nextMvccPartitionCounter(); if (res.resultType() == ResultType.PREV_NOT_NULL) - tx.txCounters().accumulateSizeDelta(cctx.cacheId(), partition(), -1); + tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), partition(), -1); if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) logPtr = logTxUpdate(tx, null, 0, updateCntr); @@ -4997,7 +4997,7 @@ else if (res.resultType() == ResultType.LOCKED) { updateCntr0 = updateCntr; if (res.resultType() == ResultType.PREV_NOT_NULL) - tx.txCounters().accumulateSizeDelta(cctx.cacheId(), entry.partition(), -1); + tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), entry.partition(), -1); if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry( @@ -5287,7 +5287,7 @@ else if (op == CREATE && tx.local() && (res.resultType() == ResultType.PREV_NOT_ updateCntr0 = tx.local() ? entry.nextMvccPartitionCounter() : updateCntr; if (res.resultType() == ResultType.PREV_NULL) - tx.txCounters().accumulateSizeDelta(cctx.cacheId(), entry.partition(), 1); + tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), entry.partition(), 1); if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry( diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 8ae0c81982dec..8e96ae2dcee54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteEx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteState; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState; +import org.apache.ignite.internal.processors.cache.transactions.TxCounters; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; @@ -850,11 +851,13 @@ else if (!isNodeStopping) { // Skip fair uncommit in case of node stopping or in @Override protected void applyTxCounters() { super.applyTxCounters(); - Map updCntrs = txCounters().updateCounters(); + TxCounters txCntrs = txCounters(false); - if (F.isEmpty(updCntrs)) + if (txCntrs == null) return; + Map updCntrs = txCntrs.updateCounters(); + for (Map.Entry entry : updCntrs.entrySet()) { int cacheId = entry.getKey(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index f94f2f6bc70fb..613f1600f03d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; +import org.apache.ignite.internal.processors.cache.transactions.TxCounters; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanMap; @@ -944,11 +945,13 @@ protected final IgniteInternalFuture chainOnePhasePre * @return Partition counters map for the given backup node. */ public Map filterUpdateCountersForBackupNode(ClusterNode node) { - Map updCntrs = txCounters().updateCounters(); + TxCounters txCntrs = txCounters(false); - if (F.isEmpty(updCntrs)) + if (txCntrs == null) return null; + Map updCntrs = txCntrs.updateCounters(); + Map res = new HashMap<>(); AffinityTopologyVersion top = topologyVersionSnapshot(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index c5f19ec89ad24..05ebe5d31496b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -653,6 +653,7 @@ public void completedVersions(GridCacheVersion base, /** * @return Transaction counters. + * @param createIfAbsent {@code True} if non-null instance is needed. */ - public TxCounters txCounters(); -} \ No newline at end of file + @Nullable public TxCounters txCounters(boolean createIfAbsent); +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 1711652b75eb8..c6d19919f868a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -139,6 +139,10 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement private static final AtomicReferenceFieldUpdater FINALIZING_UPD = AtomicReferenceFieldUpdater.newUpdater(IgniteTxAdapter.class, FinalizationStatus.class, "finalizing"); + /** */ + private static final AtomicReferenceFieldUpdater TX_COUNTERS_UPD = + AtomicReferenceFieldUpdater.newUpdater(IgniteTxAdapter.class, TxCounters.class, "txCounters"); + /** Logger. */ protected static IgniteLogger log; @@ -279,7 +283,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement private volatile IgniteInternalFuture rollbackFut; /** */ - private TxCounters txCounters = new TxCounters(); + private volatile TxCounters txCounters = new TxCounters(); /** * Empty constructor required for {@link Externalizable}. @@ -2021,7 +2025,10 @@ protected Object readResolve() throws ObjectStreamException { public abstract void addActiveCache(GridCacheContext cacheCtx, boolean recovery) throws IgniteCheckedException; /** {@inheritDoc} */ - @Override public TxCounters txCounters() { + @Nullable @Override public TxCounters txCounters(boolean createIfAbsent) { + if (createIfAbsent && txCounters == null) + TX_COUNTERS_UPD.compareAndSet(this, null, new TxCounters()); + return txCounters; } @@ -2029,11 +2036,13 @@ protected Object readResolve() throws ObjectStreamException { * Make counters accumulated during transaction visible outside of transaciton. */ protected void applyTxCounters() { - Map> sizeDeltas = txCounters.sizeDeltas(); + TxCounters txCntrs = txCounters(false); - if (F.isEmpty(sizeDeltas)) + if (txCntrs == null) return; + Map> sizeDeltas = txCntrs.sizeDeltas(); + for (Map.Entry> entry : sizeDeltas.entrySet()) { Integer cacheId = entry.getKey(); Map partDeltas = entry.getValue(); @@ -2328,7 +2337,7 @@ private static class TxShadow implements IgniteInternalTx { } /** {@inheritDoc} */ - @Override public TxCounters txCounters() { + @Nullable @Override public TxCounters txCounters(boolean createIfAbsent) { return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 03198bf5e52c2..32f4dd43352f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -1391,7 +1391,7 @@ else if (log.isDebugEnabled()) tx.setPartitionUpdateCounters( req.partUpdateCounters() != null ? req.partUpdateCounters().array() : null); - tx.txCounters().updateCounters(req.updateCounters()); + tx.txCounters(true).updateCounters(req.updateCounters()); tx.commitRemoteTx(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 5e5776c5a2f99..bfe67ee0fdeda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1710,7 +1710,7 @@ public void markQueryEnlisted(MvccSnapshot ver) { Map updCntrs = applyAndCollectLocalUpdateCounters(); // remember counters for subsequent sending to backups - txCounters().updateCounters(updCntrs); + txCounters(true).updateCounters(updCntrs); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java index fbfca4dbb89c2..2ad4f9410d2f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java @@ -72,7 +72,7 @@ public void updateCounters(Map updCntrs) { /** */ public Map updateCounters() { - return updCntrs != null ? Collections.unmodifiableMap(updCntrs) : null; + return updCntrs != null ? Collections.unmodifiableMap(updCntrs) : Collections.emptyMap(); } /** */ From d3049d2c1cd28b604d74afab82f53895eddbd78f Mon Sep 17 00:00:00 2001 From: ipavlukhin Date: Mon, 3 Sep 2018 17:29:12 +0300 Subject: [PATCH 08/10] formatting --- .../processors/cache/mvcc/CacheMvccSizeTest.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java index b2fcf32b1abe9..bd783547e176c 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java @@ -158,18 +158,23 @@ public void testSql() throws Exception { checkSizeModificationByOperation(personTbl -> { personTbl.query(q("insert into person values(1, 'a')")); + personTbl.query(q("insert into person values(%d, 'b')", keyInSamePartition(grid(0), "person", 1))); + personTbl.query(q("insert into person values(%d, 'c')", keyInDifferentPartition(grid(0), "person", 1))); }, true, 3); checkSizeModificationByOperation(personTbl -> { personTbl.query(q("insert into person values(1, 'a')")); + personTbl.query(q("delete from person where id = 1")); }, true, 0); checkSizeModificationByOperation(personTbl -> { personTbl.query(q("insert into person values(1, 'a')")); + personTbl.query(q("delete from person where id = 1")); + personTbl.query(q("insert into person values(1, 'a')")); }, true, 1); @@ -177,17 +182,21 @@ public void testSql() throws Exception { personTbl -> personTbl.query(q("insert into person values(1, 'a')")), personTbl -> { personTbl.query(q("delete from person where id = 1")); + personTbl.query(q("insert into person values(1, 'a')")); }, true, 0); checkSizeModificationByOperation(personTbl -> { personTbl.query(q("merge into person(id, name) values(1, 'a')")); + personTbl.query(q("delete from person where id = 1")); }, true, 0); checkSizeModificationByOperation(personTbl -> { personTbl.query(q("merge into person(id, name) values(1, 'a')")); + personTbl.query(q("delete from person where id = 1")); + personTbl.query(q("merge into person(id, name) values(1, 'a')")); }, true, 1); @@ -195,6 +204,7 @@ public void testSql() throws Exception { personTbl -> personTbl.query(q("merge into person(id, name) values(1, 'a')")), personTbl -> { personTbl.query(q("delete from person where id = 1")); + personTbl.query(q("merge into person(id, name) values(1, 'a')")); }, true, 0); } @@ -324,6 +334,7 @@ public void testDataStreamerModifiesReplicatedCacheSize() throws Exception { assertEquals(1, grid(0).cache("test").localSize()); assertEquals(1, grid(0).cache("test").localSize(BACKUP)); + assertEquals(1, grid(1).cache("test").localSize()); assertEquals(1, grid(1).cache("test").localSize(BACKUP)); } @@ -411,6 +422,7 @@ private static SqlFieldsQuery q(String fSql, Object... args) { /** */ private static int keyInSamePartition(Ignite ignite, String cacheName, int key) { Affinity affinity = ignite.affinity(cacheName); + return IntStream.iterate(key + 1, i -> i + 1) .filter(i -> affinity.partition(i) == affinity.partition(key)) .findFirst().getAsInt(); @@ -419,6 +431,7 @@ private static int keyInSamePartition(Ignite ignite, String cacheName, int key) /** */ private static int keyInDifferentPartition(Ignite ignite, String cacheName, int key) { Affinity affinity = ignite.affinity(cacheName); + return IntStream.iterate(key + 1, i -> i + 1) .filter(i -> affinity.partition(i) != affinity.partition(key)) .findFirst().getAsInt(); From 1bee658d9124b59332dd324dbee4349c924c15d7 Mon Sep 17 00:00:00 2001 From: ipavlukhin Date: Tue, 4 Sep 2018 12:31:29 +0300 Subject: [PATCH 09/10] add test checking that delete after unlock decrements cache size --- .../cache/mvcc/CacheMvccSizeTest.java | 53 ++++++++++++++++++- 1 file changed, 51 insertions(+), 2 deletions(-) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java index bd783547e176c..32709ffbb496d 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java @@ -21,7 +21,6 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.IntStream; import org.apache.ignite.Ignite; @@ -34,7 +33,6 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.query.IgniteSQLException; import static org.apache.ignite.cache.CachePeekMode.BACKUP; @@ -156,6 +154,11 @@ public void testSql() throws Exception { checkSizeModificationByOperation("select * from person", true, 0); + checkSizeModificationByOperation( + "insert into person values(1, 'a')", "select * from person where id = 1 for update", true, 0); + + checkSizeModificationByOperation("select * from person where id = 1 for update", true, 0); + checkSizeModificationByOperation(personTbl -> { personTbl.query(q("insert into person values(1, 'a')")); @@ -310,6 +313,52 @@ public void testWriteConflictDoesNotChangeSize() throws Exception { assertEquals(0, table(grid(1)).size(BACKUP)); } + /** + * @throws Exception if failed. + */ + public void testDeleteChangesSizeAfterUnlock() throws Exception { + startGridsMultiThreaded(2); + + IgniteCache tbl0 = createTable(grid(0)); + + tbl0.query(q("insert into person values(1, 'a')")); + + tbl0.query(q("begin")); + + tbl0.query(q("select * from person where id = 1 for update")).getAll(); + + CompletableFuture asyncThread = new CompletableFuture<>(); + + CompletableFuture fut = CompletableFuture.runAsync(() -> { + tbl0.query(q("begin")); + + try { + tbl0.query(q("select * from person")).getAll(); + + asyncThread.complete(Thread.currentThread()); + tbl0.query(q("delete from person where id = 1")); + } + finally { + tbl0.query(q("commit")); + } + }); + + Thread concThread = asyncThread.join(); + + // wait until concurrent thread blocks awaiting entry mvcc lock release + while (concThread.getState() == Thread.State.RUNNABLE && !Thread.currentThread().isInterrupted()); + + tbl0.query(q("commit")); + + fut.join(); + + assertEquals(0, tbl0.size()); + assertEquals(0, table(grid(1)).size()); + + assertEquals(0, tbl0.size(BACKUP)); + assertEquals(0, table(grid(1)).size(BACKUP)); + } + /** * @throws Exception if failed. */ From 63c8c4f3481e561a07189ae6d838a37fd9c7c029 Mon Sep 17 00:00:00 2001 From: ipavlukhin Date: Wed, 5 Sep 2018 19:06:49 +0300 Subject: [PATCH 10/10] resolve IgniteCacheMvccSqlTestSuite merge conflict --- .../ignite/testsuites/IgniteCacheMvccSqlTestSuite.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java index b0a4243fd73a7..c8f7643a42fce 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java @@ -51,9 +51,10 @@ public class IgniteCacheMvccSqlTestSuite extends TestSuite { public static TestSuite suite() { TestSuite suite = new TestSuite("IgniteCache SQL MVCC Test Suite"); - // Simle tests. + // Simple tests. suite.addTestSuite(CacheMvccDmlSimpleTest.class); suite.addTestSuite(SqlTransactionsCommandsWithMvccEnabledSelfTest.class); + suite.addTestSuite(CacheMvccSizeTest.class); suite.addTestSuite(GridIndexRebuildWithMvccEnabledSelfTest.class); @@ -83,10 +84,6 @@ public static TestSuite suite() { suite.addTestSuite(CacheMvccPartitionedSqlCoordinatorFailoverTest.class); suite.addTestSuite(CacheMvccReplicatedSqlCoordinatorFailoverTest.class); - suite.addTestSuite(CacheMvccBulkLoadTest.class); - suite.addTestSuite(CacheMvccStreamingInsertTest.class); - suite.addTestSuite(CacheMvccDmlSimpleTest.class); - suite.addTestSuite(CacheMvccSizeTest.class); return suite; }