Skip to content

Commit

Permalink
ignite-4851 : Made partition state change atomic with size check.
Browse files Browse the repository at this point in the history
  • Loading branch information
ilantukh committed Apr 3, 2017
1 parent 07ecddc commit 89c2ebb
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 30 deletions.
Expand Up @@ -563,7 +563,7 @@ public void start() throws IgniteCheckedException {
if (!isLocal()) if (!isLocal())
initSize /= ctx.affinity().partitions(); initSize /= ctx.affinity().partitions();


map = new GridCacheConcurrentMapImpl(ctx, entryFactory(), initSize); map = new GridCacheLocalConcurrentMap(ctx, entryFactory(), initSize);
} }
} }


Expand Down
Expand Up @@ -37,7 +37,7 @@
/** /**
* Implementation of concurrent cache map. * Implementation of concurrent cache map.
*/ */
public class GridCacheConcurrentMapImpl implements GridCacheConcurrentMap { public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentMap {
/** Default load factor. */ /** Default load factor. */
private static final float DFLT_LOAD_FACTOR = 0.75f; private static final float DFLT_LOAD_FACTOR = 0.75f;


Expand All @@ -53,9 +53,6 @@ public class GridCacheConcurrentMapImpl implements GridCacheConcurrentMap {
/** Cache context. */ /** Cache context. */
private final GridCacheContext ctx; private final GridCacheContext ctx;


/** Public size counter. */
private final AtomicInteger pubSize = new AtomicInteger();

/** /**
* Creates a new, empty map with the specified initial * Creates a new, empty map with the specified initial
* capacity. * capacity.
Expand Down Expand Up @@ -211,8 +208,12 @@ public GridCacheConcurrentMapImpl(
topVer); topVer);
} }


if (sizeChange != 0) assert Math.abs(sizeChange) <= 1;
pubSize.addAndGet(sizeChange);
if (sizeChange == -1)
decrementPublicSize(cur);
else if (sizeChange == 1)
incrementPublicSize(cur);


return cur; return cur;
} }
Expand Down Expand Up @@ -241,21 +242,6 @@ public GridCacheConcurrentMapImpl(
return map.size(); return map.size();
} }


/** {@inheritDoc} */
@Override public int publicSize() {
return pubSize.get();
}

/** {@inheritDoc} */
@Override public void incrementPublicSize(GridCacheEntryEx e) {
pubSize.incrementAndGet();
}

/** {@inheritDoc} */
@Override public void decrementPublicSize(GridCacheEntryEx e) {
pubSize.decrementAndGet();
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public Set<KeyCacheObject> keySet(final CacheEntryPredicate... filter) { @Override public Set<KeyCacheObject> keySet(final CacheEntryPredicate... filter) {
final IgnitePredicate<KeyCacheObject> p = new IgnitePredicate<KeyCacheObject>() { final IgnitePredicate<KeyCacheObject> p = new IgnitePredicate<KeyCacheObject>() {
Expand Down
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.ignite.internal.processors.cache;

import java.util.concurrent.atomic.AtomicInteger;

/**
* GridCacheConcurrentMap implementation for local and near caches.
*/
public class GridCacheLocalConcurrentMap extends GridCacheConcurrentMapImpl {
/** */
private final AtomicInteger pubSize = new AtomicInteger();

public GridCacheLocalConcurrentMap(GridCacheContext ctx,
GridCacheMapEntryFactory factory, int initialCapacity) {
super(ctx, factory, initialCapacity);
}

public GridCacheLocalConcurrentMap(GridCacheContext ctx,
GridCacheMapEntryFactory factory, int initialCapacity, float loadFactor, int concurrencyLevel) {
super(ctx, factory, initialCapacity, loadFactor, concurrencyLevel);
}

/** {@inheritDoc} */
@Override public int publicSize() {
return pubSize.get();
}

/** {@inheritDoc} */
@Override public void incrementPublicSize(GridCacheEntryEx e) {
pubSize.incrementAndGet();
}

/** {@inheritDoc} */
@Override public void decrementPublicSize(GridCacheEntryEx e) {
pubSize.decrementAndGet();
}
}
Expand Up @@ -512,7 +512,7 @@ boolean own() {
if (partState == OWNING) if (partState == OWNING)
return true; return true;


assert partState== MOVING || partState == LOST; assert partState == MOVING || partState == LOST;


if (casState(state, OWNING)) { if (casState(state, OWNING)) {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
Expand Down Expand Up @@ -608,7 +608,7 @@ void tryEvictAsync(boolean updateSeq) {


GridDhtPartitionState partState = getPartState(state); GridDhtPartitionState partState = getPartState(state);


if (isEmpty() && !QueryUtils.isEnabled(cctx.config()) && if (isEmpty() && !QueryUtils.isEnabled(cctx.config()) && getSize(state) == 0 &&
partState == RENTING && getReservations(state) == 0 && !groupReserved() && partState == RENTING && getReservations(state) == 0 && !groupReserved() &&
casState(state, EVICTED)) { casState(state, EVICTED)) {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
Expand Down Expand Up @@ -652,7 +652,7 @@ private boolean addEvicting() {
* *
*/ */
private void clearEvicting() { private void clearEvicting() {
boolean free; boolean free;


while (true) { while (true) {
int cnt = evictGuard.get(); int cnt = evictGuard.get();
Expand Down Expand Up @@ -726,7 +726,7 @@ public void tryEvict() throws NodeStoppingException {
// Attempt to evict partition entries from cache. // Attempt to evict partition entries from cache.
clearAll(); clearAll();


if (isEmpty() && casState(state, EVICTED)) { if (isEmpty() && getSize(state) == 0 && casState(state, EVICTED)) {
if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Evicted partition: " + this); log.debug("Evicted partition: " + this);
// finishDestroy() will be initiated by clearEvicting(). // finishDestroy() will be initiated by clearEvicting().
Expand Down Expand Up @@ -808,6 +808,9 @@ public void updateCounter(long val) {
store.updateCounter(val); store.updateCounter(val);
} }


/**
* @param val Initial update index value.
*/
public void initialUpdateCounter(long val) { public void initialUpdateCounter(long val) {
store.updateInitialCounter(val); store.updateInitialCounter(val);
} }
Expand Down Expand Up @@ -970,16 +973,46 @@ private void clearDeferredDeletes() {
"createTime", U.format(createTime)); "createTime", U.format(createTime));
} }


/** {@inheritDoc} */
@Override public int publicSize() {
return getSize(state.get());
}

/** {@inheritDoc} */
@Override public void incrementPublicSize(GridCacheEntryEx e) {
while (true) {
long state = this.state.get();

assert getPartState(state) != EVICTED;

if (this.state.compareAndSet(state, setSize(state, getSize(state) + 1)))
return;
}
}

/** {@inheritDoc} */
@Override public void decrementPublicSize(GridCacheEntryEx e) {
while (true) {
long state = this.state.get();

assert getPartState(state) != EVICTED;
assert getSize(state) > 0;

if (this.state.compareAndSet(state, setSize(state, getSize(state) - 1)))
return;
}
}

private static GridDhtPartitionState getPartState(long state) { private static GridDhtPartitionState getPartState(long state) {
return GridDhtPartitionState.fromOrdinal((int) (state & (0x0000000000000007L))); return GridDhtPartitionState.fromOrdinal((int)(state & (0x0000000000000007L)));
} }


private static long setPartState(long state, GridDhtPartitionState partState) { private static long setPartState(long state, GridDhtPartitionState partState) {
return (state & (~0x0000000000000007L)) | partState.ordinal(); return (state & (~0x0000000000000007L)) | partState.ordinal();
} }


private static int getReservations(long state) { private static int getReservations(long state) {
return (int) ((state & 0x00000000FFFF0000L) >> 16); return (int)((state & 0x00000000FFFF0000L) >> 16);
} }


private static long setReservations(long state, int reservations) { private static long setReservations(long state, int reservations) {
Expand All @@ -991,7 +1024,7 @@ private static int getSize(long state) {
} }


private static long setSize(long state, int size) { private static long setSize(long state, int size) {
return (state & (~0xFFFFFFFF00000000L)) | ((long) size << 32); return (state & (~0xFFFFFFFF00000000L)) | ((long)size << 32);
} }


/** /**
Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheLocalConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCachePreloader; import org.apache.ignite.internal.processors.cache.GridCachePreloader;
Expand Down Expand Up @@ -118,7 +119,7 @@ protected GridNearCacheAdapter(GridCacheContext<K, V> ctx) {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void onReconnected() { @Override public void onReconnected() {
map = new GridCacheConcurrentMapImpl( map = new GridCacheLocalConcurrentMap(
ctx, ctx,
entryFactory(), entryFactory(),
ctx.config().getNearConfiguration().getNearStartSize()); ctx.config().getNearConfiguration().getNearStartSize());
Expand Down

0 comments on commit 89c2ebb

Please sign in to comment.