diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 93daeda2a3f07..9a6ff11ca6dc1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -563,7 +563,7 @@ public void start() throws IgniteCheckedException { if (!isLocal()) initSize /= ctx.affinity().partitions(); - map = new GridCacheConcurrentMapImpl(ctx, entryFactory(), initSize); + map = new GridCacheLocalConcurrentMap(ctx, entryFactory(), initSize); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java index 10f5ca3a5b832..c1dbd0cb440ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java @@ -37,7 +37,7 @@ /** * Implementation of concurrent cache map. */ -public class GridCacheConcurrentMapImpl implements GridCacheConcurrentMap { +public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentMap { /** Default load factor. */ private static final float DFLT_LOAD_FACTOR = 0.75f; @@ -53,9 +53,6 @@ public class GridCacheConcurrentMapImpl implements GridCacheConcurrentMap { /** Cache context. */ private final GridCacheContext ctx; - /** Public size counter. */ - private final AtomicInteger pubSize = new AtomicInteger(); - /** * Creates a new, empty map with the specified initial * capacity. @@ -211,8 +208,12 @@ public GridCacheConcurrentMapImpl( topVer); } - if (sizeChange != 0) - pubSize.addAndGet(sizeChange); + assert Math.abs(sizeChange) <= 1; + + if (sizeChange == -1) + decrementPublicSize(cur); + else if (sizeChange == 1) + incrementPublicSize(cur); return cur; } @@ -241,21 +242,6 @@ public GridCacheConcurrentMapImpl( return map.size(); } - /** {@inheritDoc} */ - @Override public int publicSize() { - return pubSize.get(); - } - - /** {@inheritDoc} */ - @Override public void incrementPublicSize(GridCacheEntryEx e) { - pubSize.incrementAndGet(); - } - - /** {@inheritDoc} */ - @Override public void decrementPublicSize(GridCacheEntryEx e) { - pubSize.decrementAndGet(); - } - /** {@inheritDoc} */ @Override public Set keySet(final CacheEntryPredicate... filter) { final IgnitePredicate p = new IgnitePredicate() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java new file mode 100644 index 0000000000000..db99272ef685d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * GridCacheConcurrentMap implementation for local and near caches. + */ +public class GridCacheLocalConcurrentMap extends GridCacheConcurrentMapImpl { + /** */ + private final AtomicInteger pubSize = new AtomicInteger(); + + public GridCacheLocalConcurrentMap(GridCacheContext ctx, + GridCacheMapEntryFactory factory, int initialCapacity) { + super(ctx, factory, initialCapacity); + } + + public GridCacheLocalConcurrentMap(GridCacheContext ctx, + GridCacheMapEntryFactory factory, int initialCapacity, float loadFactor, int concurrencyLevel) { + super(ctx, factory, initialCapacity, loadFactor, concurrencyLevel); + } + + /** {@inheritDoc} */ + @Override public int publicSize() { + return pubSize.get(); + } + + /** {@inheritDoc} */ + @Override public void incrementPublicSize(GridCacheEntryEx e) { + pubSize.incrementAndGet(); + } + + /** {@inheritDoc} */ + @Override public void decrementPublicSize(GridCacheEntryEx e) { + pubSize.decrementAndGet(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 1c7db68b8bcad..d3ec2afddaaa4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -512,7 +512,7 @@ boolean own() { if (partState == OWNING) return true; - assert partState== MOVING || partState == LOST; + assert partState == MOVING || partState == LOST; if (casState(state, OWNING)) { if (log.isDebugEnabled()) @@ -608,7 +608,7 @@ void tryEvictAsync(boolean updateSeq) { GridDhtPartitionState partState = getPartState(state); - if (isEmpty() && !QueryUtils.isEnabled(cctx.config()) && + if (isEmpty() && !QueryUtils.isEnabled(cctx.config()) && getSize(state) == 0 && partState == RENTING && getReservations(state) == 0 && !groupReserved() && casState(state, EVICTED)) { if (log.isDebugEnabled()) @@ -652,7 +652,7 @@ private boolean addEvicting() { * */ private void clearEvicting() { - boolean free; + boolean free; while (true) { int cnt = evictGuard.get(); @@ -726,7 +726,7 @@ public void tryEvict() throws NodeStoppingException { // Attempt to evict partition entries from cache. clearAll(); - if (isEmpty() && casState(state, EVICTED)) { + if (isEmpty() && getSize(state) == 0 && casState(state, EVICTED)) { if (log.isDebugEnabled()) log.debug("Evicted partition: " + this); // finishDestroy() will be initiated by clearEvicting(). @@ -808,6 +808,9 @@ public void updateCounter(long val) { store.updateCounter(val); } + /** + * @param val Initial update index value. + */ public void initialUpdateCounter(long val) { store.updateInitialCounter(val); } @@ -970,8 +973,38 @@ private void clearDeferredDeletes() { "createTime", U.format(createTime)); } + /** {@inheritDoc} */ + @Override public int publicSize() { + return getSize(state.get()); + } + + /** {@inheritDoc} */ + @Override public void incrementPublicSize(GridCacheEntryEx e) { + while (true) { + long state = this.state.get(); + + assert getPartState(state) != EVICTED; + + if (this.state.compareAndSet(state, setSize(state, getSize(state) + 1))) + return; + } + } + + /** {@inheritDoc} */ + @Override public void decrementPublicSize(GridCacheEntryEx e) { + while (true) { + long state = this.state.get(); + + assert getPartState(state) != EVICTED; + assert getSize(state) > 0; + + if (this.state.compareAndSet(state, setSize(state, getSize(state) - 1))) + return; + } + } + private static GridDhtPartitionState getPartState(long state) { - return GridDhtPartitionState.fromOrdinal((int) (state & (0x0000000000000007L))); + return GridDhtPartitionState.fromOrdinal((int)(state & (0x0000000000000007L))); } private static long setPartState(long state, GridDhtPartitionState partState) { @@ -979,7 +1012,7 @@ private static long setPartState(long state, GridDhtPartitionState partState) { } private static int getReservations(long state) { - return (int) ((state & 0x00000000FFFF0000L) >> 16); + return (int)((state & 0x00000000FFFF0000L) >> 16); } private static long setReservations(long state, int reservations) { @@ -991,7 +1024,7 @@ private static int getSize(long state) { } private static long setSize(long state, int size) { - return (state & (~0xFFFFFFFF00000000L)) | ((long) size << 32); + return (state & (~0xFFFFFFFF00000000L)) | ((long)size << 32); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 7c1c38b81948f..0d62985297cc2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheLocalConcurrentMap; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCachePreloader; @@ -118,7 +119,7 @@ protected GridNearCacheAdapter(GridCacheContext ctx) { /** {@inheritDoc} */ @Override public void onReconnected() { - map = new GridCacheConcurrentMapImpl( + map = new GridCacheLocalConcurrentMap( ctx, entryFactory(), ctx.config().getNearConfiguration().getNearStartSize());