From 2ee40192766cbb9a3c8ff6a4d486138db0602080 Mon Sep 17 00:00:00 2001 From: ymolochkov Date: Thu, 14 Jan 2021 13:57:12 +0300 Subject: [PATCH 1/2] IGNITE-13949 improved concurrent behaviour in PartitionUpdateCounterTrackingImpl and its subclasses --- .../cache/PartitionUpdateCounterMvccImpl.java | 19 +--- .../PartitionUpdateCounterTrackingImpl.java | 107 +++++++++--------- 2 files changed, 58 insertions(+), 68 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java index 2e3066a2a04922..d520ce491c878c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java @@ -17,17 +17,19 @@ package org.apache.ignite.internal.processors.cache; -import java.util.TreeMap; - /** * Update counter implementation for MVCC mode. */ public class PartitionUpdateCounterMvccImpl extends PartitionUpdateCounterTrackingImpl { + + private final CacheGroupContext grp; + /** * @param grp Group. */ public PartitionUpdateCounterMvccImpl(CacheGroupContext grp) { super(grp); + this.grp = grp; } /** {@inheritDoc} */ @@ -44,17 +46,4 @@ public PartitionUpdateCounterMvccImpl(CacheGroupContext grp) { @Override protected PartitionUpdateCounterTrackingImpl createInstance() { return new PartitionUpdateCounterMvccImpl(grp); } - - /** {@inheritDoc} */ - @Override public PartitionUpdateCounter copy() { - PartitionUpdateCounterMvccImpl copy = new PartitionUpdateCounterMvccImpl(grp); - - copy.cntr.set(cntr.get()); - copy.first = first; - copy.queue = new TreeMap<>(queue); - copy.initCntr = initCntr; - copy.reserveCntr.set(reserveCntr.get()); - - return copy; - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java index 0f9d72c5b84bfd..4215fcebfccd4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java @@ -26,7 +26,7 @@ import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord; @@ -68,25 +68,25 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte private static final byte VERSION = 1; /** Queue of applied out of order counter updates. */ - protected NavigableMap queue = new TreeMap<>(); + private NavigableMap queue = new TreeMap<>(); /** LWM. */ - protected final AtomicLong cntr = new AtomicLong(); + private volatile long cntr; /** HWM. */ - protected final AtomicLong reserveCntr = new AtomicLong(); + private volatile long reserveCntr; /** */ - protected boolean first = true; + private boolean first = true; /** */ - protected final CacheGroupContext grp; + private final CacheGroupContext grp; /** * Initial counter points to last sequential update after WAL recovery. * @deprecated TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11794 */ - @Deprecated protected volatile long initCntr; + @Deprecated private volatile long initCntr; /** * @param grp Group. @@ -96,10 +96,11 @@ public PartitionUpdateCounterTrackingImpl(CacheGroupContext grp) { } /** {@inheritDoc} */ - @Override public void init(long initUpdCntr, @Nullable byte[] cntrUpdData) { - cntr.set(initUpdCntr); + @Override public synchronized void init(long initUpdCntr, @Nullable byte[] cntrUpdData) { + cntr = initUpdCntr; - reserveCntr.set(initCntr = initUpdCntr); + initCntr = initUpdCntr; + reserveCntr = initCntr; queue = fromBytes(cntrUpdData); } @@ -111,35 +112,33 @@ public PartitionUpdateCounterTrackingImpl(CacheGroupContext grp) { /** {@inheritDoc} */ @Override public long get() { - return cntr.get(); + return cntr; } /** */ protected synchronized long highestAppliedCounter() { - return queue.isEmpty() ? cntr.get() : queue.lastEntry().getValue().absolute(); + return queue.isEmpty() ? cntr : queue.lastEntry().getValue().absolute(); } /** * @return Next update counter. For tx mode called by {@link DataStreamerImpl} IsolatedUpdater. */ - @Override public long next() { - long next = cntr.incrementAndGet(); - - reserveCntr.set(next); + @Override public synchronized long next() { + reserveCntr = ++cntr; - return next; + return reserveCntr; } /** {@inheritDoc} */ @Override public synchronized void update(long val) throws IgniteCheckedException { // Reserved update counter is updated only on exchange. - long cur = get(); + long cur = cntr; // Always set reserved counter equal to max known counter. long max = Math.max(val, cur); - if (reserveCntr.get() < max) - reserveCntr.set(max); + if (reserveCntr < max) + reserveCntr = max; // Outdated counter (txs are possible before current topology future is finished if primary is not changed). if (val < cur) @@ -151,7 +150,7 @@ protected synchronized long highestAppliedCounter() { if (val < highestAppliedCounter()) throw new IgniteCheckedException("Failed to update the counter [newVal=" + val + ", curState=" + this + ']'); - cntr.set(val); + cntr = val; /** If some holes are present at this point, thar means some update were missed on recovery and will be restored * during rebalance. All gaps are safe to "forget". @@ -165,7 +164,7 @@ protected synchronized long highestAppliedCounter() { /** {@inheritDoc} */ @Override public synchronized boolean update(long start, long delta) { - long cur = cntr.get(); + long cur = cntr; if (cur > start) return false; @@ -208,22 +207,20 @@ else if (prevItem.within(next - 1)) if (nextItem != null) next += nextItem.delta; - boolean res = cntr.compareAndSet(cur, next); - - assert res; + cntr = next; return true; } } /** {@inheritDoc} */ - @Override public void updateInitial(long start, long delta) { + @Override public synchronized void updateInitial(long start, long delta) { update(start, delta); - initCntr = get(); + initCntr = cntr; - if (reserveCntr.get() < initCntr) - reserveCntr.set(initCntr); + if (reserveCntr < initCntr) + reserveCntr = initCntr; } /** {@inheritDoc} */ @@ -236,28 +233,28 @@ else if (prevItem.within(next - 1)) if (gaps == null) gaps = new GridLongList((queue.size() + 1) * 2); - long start = cntr.get() + 1; + long start = cntr + 1; long end = item.getValue().start; gaps.add(start); gaps.add(end); // Close pending ranges. - cntr.set(item.getValue().absolute()); + cntr = item.getValue().absolute(); item = queue.pollFirstEntry(); } - reserveCntr.set(get()); + reserveCntr = cntr; return gaps; } /** {@inheritDoc} */ @Override public synchronized long reserve(long delta) { - long cntr = get(); - long reserved = reserveCntr.getAndAdd(delta); + long reserved = reserveCntr; + reserveCntr += delta; assert reserved >= cntr : "LWM after HWM: lwm=" + cntr + ", hwm=" + reserved; @@ -265,8 +262,10 @@ else if (prevItem.within(next - 1)) } /** {@inheritDoc} */ - @Override public long next(long delta) { - return cntr.getAndAdd(delta); + @Override public synchronized long next(long delta) { + long next = cntr; + cntr += delta; + return next; } /** {@inheritDoc} */ @@ -339,15 +338,15 @@ else if (prevItem.within(next - 1)) @Override public synchronized void reset() { initCntr = 0; - cntr.set(0); + cntr = 0; - reserveCntr.set(0); + reserveCntr = 0; queue.clear(); } /** {@inheritDoc} */ - @Override public void resetInitialCounter() { + @Override public synchronized void resetInitialCounter() { initCntr = 0; } @@ -356,7 +355,7 @@ else if (prevItem.within(next - 1)) */ private static class Item { /** */ - private long start; + private final long start; /** */ private long delta; @@ -366,8 +365,8 @@ private static class Item { * @param delta Delta value. */ private Item(long start, long delta) { - this.start = start; this.delta = delta; + this.start = start; } /** {@inheritDoc} */ @@ -421,22 +420,24 @@ public boolean within(long cntr) { if (o == null || getClass() != o.getClass()) return false; - PartitionUpdateCounterTrackingImpl cntr = (PartitionUpdateCounterTrackingImpl)o; + synchronized (this) { + PartitionUpdateCounterTrackingImpl cntr = (PartitionUpdateCounterTrackingImpl) o; - if (!queue.equals(cntr.queue)) - return false; + if (!queue.equals(cntr.queue)) + return false; - return this.cntr.get() == cntr.cntr.get(); + return this.cntr == cntr.cntr; + } } /** {@inheritDoc} */ @Override public long reserved() { - return reserveCntr.get(); + return reserveCntr; } /** {@inheritDoc} */ @Override public synchronized boolean empty() { - return get() == 0 && sequential(); + return cntr == 0 && sequential(); } /** {@inheritDoc} */ @@ -445,9 +446,9 @@ public boolean within(long cntr) { } /** {@inheritDoc} */ - @Override public String toString() { - return "Counter [lwm=" + get() + ", holes=" + queue + - ", maxApplied=" + highestAppliedCounter() + ", hwm=" + reserveCntr.get() + ']'; + @Override public synchronized String toString() { + return "Counter [lwm=" + cntr + ", holes=" + queue + + ", maxApplied=" + highestAppliedCounter() + ", hwm=" + reserveCntr + ']'; } /** {@inheritDoc} */ @@ -456,14 +457,14 @@ public boolean within(long cntr) { } /** {@inheritDoc} */ - @Override public PartitionUpdateCounter copy() { + @Override public synchronized PartitionUpdateCounter copy() { PartitionUpdateCounterTrackingImpl copy = createInstance(); - copy.cntr.set(cntr.get()); + copy.cntr = cntr; copy.first = first; copy.queue = new TreeMap<>(queue); copy.initCntr = initCntr; - copy.reserveCntr.set(reserveCntr.get()); + copy.reserveCntr = reserveCntr; return copy; } From 58e8848059085b0bac4a3b9f6e28c9902e6f0c86 Mon Sep 17 00:00:00 2001 From: ymolochkov Date: Thu, 14 Jan 2021 18:29:24 +0300 Subject: [PATCH 2/2] IGNITE-13949 improved concurrent behaviour in PartitionUpdateCounterTrackingImpl and its subclasses --- ...ePdsSpuriousRebalancingOnNodeJoinTest.java | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java index 564a72878d20d3..1b9e0b5ccf6ddf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.processors.cache.persistence; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.List; -import java.util.concurrent.atomic.AtomicLong; +import java.util.Objects; + import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -34,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterTrackingImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -148,9 +149,10 @@ public void testNoSpuriousRebalancing() throws Exception { PartitionUpdateCounterTrackingImpl cntr0 = (PartitionUpdateCounterTrackingImpl)part.dataStore().partUpdateCounter(); - AtomicLong cntr = U.field(cntr0, "cntr"); + Field[] declaredFields = cntr0.getClass().getDeclaredFields(); - cntr.set(cntr.get() - 1); + Arrays.stream(declaredFields).filter(field -> "cntr".equals(field.getName())).findFirst() + .ifPresent(field -> updateValue(field, cntr0)); TestRecordingCommunicationSpi.spi(crd).record((node, msg) -> msg instanceof GridDhtPartitionDemandMessage); @@ -167,4 +169,20 @@ public void testNoSpuriousRebalancing() throws Exception { stopAllGrids(); } } + + private void updateValue(Field field, Object cntr0) { + + Objects.requireNonNull(cntr0); + + try { + field.setAccessible(true); + + long o = (long) field.get(cntr0); + o -= 1; + + field.set(cntr0, o); + } catch (IllegalAccessException ex) { + log.error("Reflective access exception while changing internal value of " + cntr0, ex); + } + } }