Skip to content

Commit

Permalink
GG-17434 Fix memory leak on unstable topology caused by partition res…
Browse files Browse the repository at this point in the history
…ervation
  • Loading branch information
tledkov committed Apr 29, 2019
1 parent afa3492 commit da0599e
Show file tree
Hide file tree
Showing 7 changed files with 464 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
Expand Down Expand Up @@ -261,6 +262,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** Distributed latch manager. */
private ExchangeLatchManager latchMgr;

/** List of exchange aware components. */
private final List<PartitionsExchangeAware> exchangeAwareComps = new ArrayList<>();

/** Discovery listener. */
private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
@Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
Expand Down Expand Up @@ -1136,6 +1140,31 @@ public void scheduleResendPartitions() {
}
}

/**
 * Subscribes a component to partition map exchange callbacks.
 * <p>
 * The component is not stored directly: it is wrapped into a {@code PartitionsExchangeAwareWrapper}
 * and the wrapper is appended to the list of exchange aware components.
 *
 * @param comp Component to be registered.
 */
public void registerExchangeAwareComponent(PartitionsExchangeAware comp) {
    PartitionsExchangeAware guarded = new PartitionsExchangeAwareWrapper(comp);

    exchangeAwareComps.add(guarded);
}

/**
 * Unsubscribes a component from partition map exchange callbacks.
 * <p>
 * The lookup is performed with a temporary {@code PartitionsExchangeAwareWrapper} around the given
 * component, so the match depends on that wrapper's {@code equals} implementation.
 *
 * @param comp Component to be unregistered.
 */
public void unregisterExchangeAwareComponent(PartitionsExchangeAware comp) {
    PartitionsExchangeAware probe = new PartitionsExchangeAwareWrapper(comp);

    exchangeAwareComps.remove(probe);
}

/**
 * Exposes the registered exchange aware components.
 *
 * @return Result of {@code U.sealList} applied to the internal list of registered exchange listeners
 *      (presumably a read-only list; verify against {@code U.sealList}).
 */
public List<PartitionsExchangeAware> exchangeAwareComponents() {
    List<PartitionsExchangeAware> sealed = U.sealList(exchangeAwareComps);

    return sealed;
}

/**
* Partition refresh callback for selected cache groups.
* For coordinator causes {@link GridDhtPartitionsFullMessage FullMessages} send,
Expand Down Expand Up @@ -3579,6 +3608,73 @@ private void printToLog() {
}
}

/**
 * Wrapper that keeps exceptions thrown by a registered component from propagating into the exchange thread:
 * every callback is forwarded to the delegate and any {@link Exception} it throws is logged as a warning.
 * <p>
 * Equality and hash code are defined by the delegate, so a freshly created wrapper around a component
 * is equal to an already stored wrapper around the same component. This is what allows
 * {@code List.remove(new PartitionsExchangeAwareWrapper(comp))} to find the registered entry.
 */
private class PartitionsExchangeAwareWrapper implements PartitionsExchangeAware {
    /** Wrapped component that receives the callbacks. */
    private final PartitionsExchangeAware delegate;

    /**
     * Creates a new wrapper.
     *
     * @param delegate Delegate.
     */
    public PartitionsExchangeAwareWrapper(PartitionsExchangeAware delegate) {
        this.delegate = delegate;
    }

    /** {@inheritDoc} */
    @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
        try {
            delegate.onInitBeforeTopologyLock(fut);
        }
        catch (Exception e) {
            U.warn(log, "Failed to execute exchange callback.", e);
        }
    }

    /** {@inheritDoc} */
    @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
        try {
            delegate.onInitAfterTopologyLock(fut);
        }
        catch (Exception e) {
            U.warn(log, "Failed to execute exchange callback.", e);
        }
    }

    /** {@inheritDoc} */
    @Override public void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
        try {
            delegate.onDoneBeforeTopologyUnlock(fut);
        }
        catch (Exception e) {
            U.warn(log, "Failed to execute exchange callback.", e);
        }
    }

    /** {@inheritDoc} */
    @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
        try {
            delegate.onDoneAfterTopologyUnlock(fut);
        }
        catch (Exception e) {
            U.warn(log, "Failed to execute exchange callback.", e);
        }
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        return delegate.hashCode();
    }

    /**
     * Compares by delegate.
     * <p>
     * If the argument is itself a wrapper, its delegate is compared instead of the wrapper object.
     * Without this unwrapping {@code wrapper(comp).equals(wrapper(comp))} evaluates
     * {@code comp.equals(wrapper)}, which is {@code false} for identity-based {@code equals}, and
     * removal of a registered component by a new wrapper never matches.
     *
     * @param obj Object to compare with: another wrapper or a bare component.
     * @return {@code True} if the delegate is equal to the (unwrapped) argument.
     */
    @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
    @Override public boolean equals(Object obj) {
        if (this == obj)
            return true;

        // The wildcard-qualified name is required: the bare inner class name is not reifiable inside a generic outer class.
        if (obj instanceof GridCachePartitionExchangeManager<?, ?>.PartitionsExchangeAwareWrapper) {
            GridCachePartitionExchangeManager<?, ?>.PartitionsExchangeAwareWrapper other =
                (GridCachePartitionExchangeManager<?, ?>.PartitionsExchangeAwareWrapper)obj;

            return delegate.equals(other.delegate);
        }

        // Preserve the original behavior for comparison against a bare component.
        return delegate.equals(obj);
    }
}

/**
* Class to limit action count for unique objects.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,9 @@ else if (msg instanceof WalStateAbstractMessage)

cctx.cache().registrateProxyRestart(resolveCacheRequests(exchActions), afterLsnrCompleteFut);

for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
comp.onInitBeforeTopologyLock(this);

updateTopologies(crdNode);

timeBag.finishGlobalStage("Determine exchange type");
Expand Down Expand Up @@ -876,13 +879,19 @@ else if (msg instanceof WalStateAbstractMessage)
}
}

for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
comp.onInitAfterTopologyLock(this);

if (exchLog.isInfoEnabled())
exchLog.info("Finished exchange init [topVer=" + topVer + ", crd=" + crdNode + ']');
}
catch (IgniteInterruptedCheckedException e) {
assert cctx.kernalContext().isStopping();
assert cctx.kernalContext().isStopping() || cctx.kernalContext().clientDisconnected();

onDone(new IgniteCheckedException("Node stopped"));
if (cctx.kernalContext().clientDisconnected())
onDone(new IgniteCheckedException("Client disconnected"));
else
onDone(new IgniteCheckedException("Node stopped"));

throw e;
}
Expand Down Expand Up @@ -2159,6 +2168,9 @@ private String exchangeTimingsLogMessage(String header, List<String> timings) {
if (err == null)
cctx.coordinators().onExchangeDone(events().discoveryCache());

for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
comp.onDoneBeforeTopologyUnlock(this);

// Create and destroy caches and cache proxies.
cctx.cache().onExchangeDone(initialVersion(), exchActions, err);

Expand Down Expand Up @@ -2252,6 +2264,9 @@ private String exchangeTimingsLogMessage(String header, List<String> timings) {
}
}

for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
comp.onDoneAfterTopologyUnlock(this);

if (firstDiscoEvt instanceof DiscoveryCustomEvent)
((DiscoveryCustomEvent)firstDiscoEvt).customMessage(null);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;

import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;

/**
 * Listener contract for components that need to observe partition map exchange events. A component is subscribed
 * via {@link GridCachePartitionExchangeManager#registerExchangeAwareComponent(PartitionsExchangeAware)}.
 * <p>
 * Callbacks are invoked synchronously from the exchange thread, so implementations must not perform heavy
 * computations in them.
 * <p>
 * Implementations used for non-system-critical activities should never throw from these callbacks: the exchange
 * thread is critical, and an exception escaping a listener method may trigger the failure handler.
 * <p>
 * All methods have empty default implementations, so an implementation overrides only the stages it needs.
 */
public interface PartitionsExchangeAware {
    /**
     * Invoked during exchange initialization, before the topology is locked.
     *
     * @param fut Partition map exchange future.
     */
    default void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
        // No-op.
    }

    /**
     * Invoked during exchange initialization, after the topology is locked.
     * Guarantees that no more data updates will be performed on local node until exchange process is completed.
     *
     * @param fut Partition map exchange future.
     */
    default void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
        // No-op.
    }

    /**
     * Invoked on exchange completion, before the topology is unlocked.
     * Guarantees that no updates were performed on local node since exchange process started.
     *
     * @param fut Partition map exchange future.
     */
    default void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
        // No-op.
    }

    /**
     * Invoked on exchange completion, after the topology is unlocked.
     *
     * @param fut Partition map exchange future.
     */
    default void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
        // No-op.
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;

/**
* Partition reservation key.
Expand Down Expand Up @@ -48,6 +49,13 @@ public String cacheName() {
return cacheName;
}

/**
 * Gets the topology version stored in this reservation key.
 *
 * @return Topology version of reservation.
 */
public AffinityTopologyVersion topologyVersion() {
    return this.topVer;
}

/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
Expand All @@ -70,4 +78,9 @@ public String cacheName() {

return res;
}

/** {@inheritDoc} */
@Override public String toString() {
    // String form is produced by the project's S.toString helper for this class.
    String str = S.toString(PartitionReservationKey.class, this);

    return str;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation;
Expand All @@ -45,31 +49,40 @@
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;

/**
* Class responsible for partition reservation for queries executed on local node.
* Prevents partitions from being eveicted from node during query execution.
* Class responsible for partition reservation for queries executed on local node. Prevents partitions from being
* evicted from node during query execution.
*/
public class PartitionReservationManager {
public class PartitionReservationManager implements PartitionsExchangeAware {
/** Special instance of reservable object for REPLICATED caches. */
private static final ReplicatedReservable REPLICATED_RESERVABLE = new ReplicatedReservable();

/** Kernal context. */
private final GridKernalContext ctx;

/** Reservations. */
/**
* Group reservations cache. While the affinity version is unchanged and all primary partitions must be reserved,
* the group reservation is taken from this map instead of creating a new reservation group.
*/
private final ConcurrentMap<PartitionReservationKey, GridReservable> reservations = new ConcurrentHashMap<>();

/** Logger. */
private final IgniteLogger log;

/**
 * Creates the manager, obtains its logger from the kernal context and subscribes the new instance
 * to partition map exchange events.
 *
 * @param ctx Context.
 */
public PartitionReservationManager(GridKernalContext ctx) {
    log = ctx.log(PartitionReservationManager.class);

    this.ctx = ctx;

    // Registration is kept as the last step: it hands 'this' to the exchange manager.
    ctx.cache().context().exchange().registerExchangeAwareComponent(this);
}

/**
* @param cacheIds Cache IDs.
* @param topVer Topology version.
* @param reqTopVer Topology version from request.
* @param explicitParts Explicit partitions list.
* @param nodeId Node ID.
* @param reqId Request ID.
Expand All @@ -78,12 +91,14 @@ public PartitionReservationManager(GridKernalContext ctx) {
*/
public PartitionReservation reservePartitions(
@Nullable List<Integer> cacheIds,
AffinityTopologyVersion topVer,
AffinityTopologyVersion reqTopVer,
final int[] explicitParts,
UUID nodeId,
long reqId
) throws IgniteCheckedException {
assert topVer != null;
assert reqTopVer != null;

AffinityTopologyVersion topVer = ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer);

if (F.isEmpty(cacheIds))
return new PartitionReservation(Collections.emptyList());
Expand Down Expand Up @@ -307,6 +322,31 @@ private static GridDhtLocalPartition partition(GridCacheContext<?, ?> cctx, int
return cctx.topology().localPartition(p, NONE, false);
}

/**
 * Cleans up the group reservations cache once an exchange is done and the topology is unlocked.
 * <p>
 * The work is submitted to the management pool. There, the last affinity-changed topology version is resolved
 * for the exchange future's topology version, and every cached reservation whose key carries a different
 * topology version is invalidated. The replicated-cache marker reservation is never touched.
 * Any failure while submitting the task is logged and not rethrown.
 *
 * @param fut Partition map exchange future.
 */
@Override public void onDoneAfterTopologyUnlock(final GridDhtPartitionsExchangeFuture fut) {
    try {
        // Must not do anything at the exchange thread. Dispatch to the management thread pool.
        ctx.closure().runLocal(
            () -> {
                AffinityTopologyVersion actualTopVer = ctx.cache().context().exchange()
                    .lastAffinityChangedTopologyVersion(fut.topologyVersion());

                reservations.forEach((resKey, reservable) -> {
                    // Keep the replicated marker and reservations made for the actual affinity version.
                    if (reservable == REPLICATED_RESERVABLE || F.eq(resKey.topologyVersion(), actualTopVer))
                        return;

                    assert reservable instanceof GridDhtPartitionsReservation;

                    ((GridDhtPartitionsReservation)reservable).invalidate();
                });
            },
            GridIoPolicy.MANAGEMENT_POOL
        );
    }
    catch (Throwable e) {
        log.error("Unexpected exception on start reservations cleanup", e);
    }
}

/**
* Mapper fake reservation object for replicated caches.
*/
Expand Down

0 comments on commit da0599e

Please sign in to comment.