Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@

package org.apache.ignite.internal.processors.cache;

import java.util.TreeMap;

/**
* Update counter implementation for MVCC mode.
*/
public class PartitionUpdateCounterMvccImpl extends PartitionUpdateCounterTrackingImpl {

private final CacheGroupContext grp;

/**
* @param grp Group.
*/
public PartitionUpdateCounterMvccImpl(CacheGroupContext grp) {
super(grp);
this.grp = grp;
}

/** {@inheritDoc} */
Expand All @@ -44,17 +46,4 @@ public PartitionUpdateCounterMvccImpl(CacheGroupContext grp) {
@Override protected PartitionUpdateCounterTrackingImpl createInstance() {
return new PartitionUpdateCounterMvccImpl(grp);
}

/** {@inheritDoc} */
@Override public PartitionUpdateCounter copy() {
PartitionUpdateCounterMvccImpl copy = new PartitionUpdateCounterMvccImpl(grp);

copy.cntr.set(cntr.get());
copy.first = first;
copy.queue = new TreeMap<>(queue);
copy.initCntr = initCntr;
copy.reserveCntr.set(reserveCntr.get());

return copy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
Expand Down Expand Up @@ -68,25 +68,25 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
private static final byte VERSION = 1;

/** Queue of applied out of order counter updates. */
protected NavigableMap<Long, Item> queue = new TreeMap<>();
private NavigableMap<Long, Item> queue = new TreeMap<>();

/** LWM. */
protected final AtomicLong cntr = new AtomicLong();
private volatile long cntr;

/** HWM. */
protected final AtomicLong reserveCntr = new AtomicLong();
private volatile long reserveCntr;

/** */
protected boolean first = true;
private boolean first = true;

/** */
protected final CacheGroupContext grp;
private final CacheGroupContext grp;

/**
* Initial counter points to last sequential update after WAL recovery.
* @deprecated TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11794
*/
@Deprecated protected volatile long initCntr;
@Deprecated private volatile long initCntr;

/**
* @param grp Group.
Expand All @@ -96,10 +96,11 @@ public PartitionUpdateCounterTrackingImpl(CacheGroupContext grp) {
}

/** {@inheritDoc} */
@Override public void init(long initUpdCntr, @Nullable byte[] cntrUpdData) {
cntr.set(initUpdCntr);
@Override public synchronized void init(long initUpdCntr, @Nullable byte[] cntrUpdData) {
cntr = initUpdCntr;

reserveCntr.set(initCntr = initUpdCntr);
initCntr = initUpdCntr;
reserveCntr = initCntr;

queue = fromBytes(cntrUpdData);
}
Expand All @@ -111,35 +112,33 @@ public PartitionUpdateCounterTrackingImpl(CacheGroupContext grp) {

/** {@inheritDoc} */
@Override public long get() {
return cntr.get();
return cntr;
}

/** */
protected synchronized long highestAppliedCounter() {
return queue.isEmpty() ? cntr.get() : queue.lastEntry().getValue().absolute();
return queue.isEmpty() ? cntr : queue.lastEntry().getValue().absolute();
}

/**
* @return Next update counter. For tx mode called by {@link DataStreamerImpl} IsolatedUpdater.
*/
@Override public long next() {
long next = cntr.incrementAndGet();

reserveCntr.set(next);
@Override public synchronized long next() {
reserveCntr = ++cntr;

return next;
return reserveCntr;
}

/** {@inheritDoc} */
@Override public synchronized void update(long val) throws IgniteCheckedException {
// Reserved update counter is updated only on exchange.
long cur = get();
long cur = cntr;

// Always set reserved counter equal to max known counter.
long max = Math.max(val, cur);

if (reserveCntr.get() < max)
reserveCntr.set(max);
if (reserveCntr < max)
reserveCntr = max;

// Outdated counter (txs are possible before current topology future is finished if primary is not changed).
if (val < cur)
Expand All @@ -151,7 +150,7 @@ protected synchronized long highestAppliedCounter() {
if (val < highestAppliedCounter())
throw new IgniteCheckedException("Failed to update the counter [newVal=" + val + ", curState=" + this + ']');

cntr.set(val);
cntr = val;

/** If some holes are present at this point, thar means some update were missed on recovery and will be restored
* during rebalance. All gaps are safe to "forget".
Expand All @@ -165,7 +164,7 @@ protected synchronized long highestAppliedCounter() {

/** {@inheritDoc} */
@Override public synchronized boolean update(long start, long delta) {
long cur = cntr.get();
long cur = cntr;

if (cur > start)
return false;
Expand Down Expand Up @@ -208,22 +207,20 @@ else if (prevItem.within(next - 1))
if (nextItem != null)
next += nextItem.delta;

boolean res = cntr.compareAndSet(cur, next);

assert res;
cntr = next;

return true;
}
}

/** {@inheritDoc} */
@Override public void updateInitial(long start, long delta) {
@Override public synchronized void updateInitial(long start, long delta) {
update(start, delta);

initCntr = get();
initCntr = cntr;

if (reserveCntr.get() < initCntr)
reserveCntr.set(initCntr);
if (reserveCntr < initCntr)
reserveCntr = initCntr;
}

/** {@inheritDoc} */
Expand All @@ -236,37 +233,39 @@ else if (prevItem.within(next - 1))
if (gaps == null)
gaps = new GridLongList((queue.size() + 1) * 2);

long start = cntr.get() + 1;
long start = cntr + 1;
long end = item.getValue().start;

gaps.add(start);
gaps.add(end);

// Close pending ranges.
cntr.set(item.getValue().absolute());
cntr = item.getValue().absolute();

item = queue.pollFirstEntry();
}

reserveCntr.set(get());
reserveCntr = cntr;

return gaps;
}

/** {@inheritDoc} */
@Override public synchronized long reserve(long delta) {
long cntr = get();

long reserved = reserveCntr.getAndAdd(delta);
long reserved = reserveCntr;
reserveCntr += delta;

assert reserved >= cntr : "LWM after HWM: lwm=" + cntr + ", hwm=" + reserved;

return reserved;
}

/** {@inheritDoc} */
@Override public long next(long delta) {
return cntr.getAndAdd(delta);
@Override public synchronized long next(long delta) {
long next = cntr;
cntr += delta;
return next;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -339,15 +338,15 @@ else if (prevItem.within(next - 1))
@Override public synchronized void reset() {
initCntr = 0;

cntr.set(0);
cntr = 0;

reserveCntr.set(0);
reserveCntr = 0;

queue.clear();
}

/** {@inheritDoc} */
@Override public void resetInitialCounter() {
@Override public synchronized void resetInitialCounter() {
initCntr = 0;
}

Expand All @@ -356,7 +355,7 @@ else if (prevItem.within(next - 1))
*/
private static class Item {
/** */
private long start;
private final long start;

/** */
private long delta;
Expand All @@ -366,8 +365,8 @@ private static class Item {
* @param delta Delta value.
*/
private Item(long start, long delta) {
this.start = start;
this.delta = delta;
this.start = start;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -421,22 +420,24 @@ public boolean within(long cntr) {
if (o == null || getClass() != o.getClass())
return false;

PartitionUpdateCounterTrackingImpl cntr = (PartitionUpdateCounterTrackingImpl)o;
synchronized (this) {
PartitionUpdateCounterTrackingImpl cntr = (PartitionUpdateCounterTrackingImpl) o;

if (!queue.equals(cntr.queue))
return false;
if (!queue.equals(cntr.queue))
return false;

return this.cntr.get() == cntr.cntr.get();
return this.cntr == cntr.cntr;
}
}

/** {@inheritDoc} */
@Override public long reserved() {
return reserveCntr.get();
return reserveCntr;
}

/** {@inheritDoc} */
@Override public synchronized boolean empty() {
return get() == 0 && sequential();
return cntr == 0 && sequential();
}

/** {@inheritDoc} */
Expand All @@ -445,9 +446,9 @@ public boolean within(long cntr) {
}

/** {@inheritDoc} */
@Override public String toString() {
return "Counter [lwm=" + get() + ", holes=" + queue +
", maxApplied=" + highestAppliedCounter() + ", hwm=" + reserveCntr.get() + ']';
@Override public synchronized String toString() {
return "Counter [lwm=" + cntr + ", holes=" + queue +
", maxApplied=" + highestAppliedCounter() + ", hwm=" + reserveCntr + ']';
}

/** {@inheritDoc} */
Expand All @@ -456,14 +457,14 @@ public boolean within(long cntr) {
}

/** {@inheritDoc} */
@Override public PartitionUpdateCounter copy() {
@Override public synchronized PartitionUpdateCounter copy() {
PartitionUpdateCounterTrackingImpl copy = createInstance();

copy.cntr.set(cntr.get());
copy.cntr = cntr;
copy.first = first;
copy.queue = new TreeMap<>(queue);
copy.initCntr = initCntr;
copy.reserveCntr.set(reserveCntr.get());
copy.reserveCntr = reserveCntr;

return copy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.ignite.internal.processors.cache.persistence;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Objects;

import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
Expand All @@ -34,7 +36,6 @@
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounterTrackingImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

Expand Down Expand Up @@ -148,9 +149,10 @@ public void testNoSpuriousRebalancing() throws Exception {

PartitionUpdateCounterTrackingImpl cntr0 = (PartitionUpdateCounterTrackingImpl)part.dataStore().partUpdateCounter();

AtomicLong cntr = U.field(cntr0, "cntr");
Field[] declaredFields = cntr0.getClass().getDeclaredFields();

cntr.set(cntr.get() - 1);
Arrays.stream(declaredFields).filter(field -> "cntr".equals(field.getName())).findFirst()
.ifPresent(field -> updateValue(field, cntr0));

TestRecordingCommunicationSpi.spi(crd).record((node, msg) -> msg instanceof GridDhtPartitionDemandMessage);

Expand All @@ -167,4 +169,20 @@ public void testNoSpuriousRebalancing() throws Exception {
stopAllGrids();
}
}

private void updateValue(Field field, Object cntr0) {

Objects.requireNonNull(cntr0);

try {
field.setAccessible(true);

long o = (long) field.get(cntr0);
o -= 1;

field.set(cntr0, o);
} catch (IllegalAccessException ex) {
log.error("Reflective access exception while changing internal value of " + cntr0, ex);
}
}
}