Skip to content

Commit

Permalink
GG-27819 Prevent AssertionError on message unmarshalling, when classL…
Browse files Browse the repository at this point in the history
…oaderId contains id of node that already left
  • Loading branch information
denis-chudov committed Mar 10, 2020
1 parent a4e16b8 commit 5168791
Show file tree
Hide file tree
Showing 10 changed files with 514 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,30 +344,30 @@ private void initFilter(IgnitePredicate<Event> filter, GridKernalContext ctx) th

ClassLoader ldr = null;

if (cache != null) {
GridCacheDeploymentManager depMgr = cache.context().deploy();

GridDeploymentInfo depInfo = wrapper.depInfo;

if (depInfo != null) {
depMgr.p2pContext(
nodeId,
depInfo.classLoaderId(),
depInfo.userVersion(),
depInfo.deployMode(),
depInfo.participants()
);
}
try {
if (cache != null) {
GridCacheDeploymentManager depMgr = cache.context().deploy();

GridDeploymentInfo depInfo = wrapper.depInfo;

if (depInfo != null) {
depMgr.p2pContext(
nodeId,
depInfo.classLoaderId(),
depInfo.userVersion(),
depInfo.deployMode(),
depInfo.participants()
);
}

ldr = depMgr.globalLoader();
}
else {
U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " +
"when peer class loading is enabled: " + wrapper.cacheName + ". Will try to unmarshal " +
"with default class loader.");
}
ldr = depMgr.globalLoader();
}
else {
U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " +
"when peer class loading is enabled: " + wrapper.cacheName + ". Will try to unmarshal " +
"with default class loader.");
}

try {
wrapper.p2pUnmarshal(ctx.config().getMarshaller(), U.resolveClassLoader(ldr, ctx.config()));
}
catch (IgniteCheckedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,13 @@ void undeployTask(UUID nodeId, String taskName) {
// Check for nested execution. In that case, if task
// is available locally by name, then we should ignore
// class loader ID.
GridDeployment dep = locStore.getDeployment(meta);
GridDeployment dep = checkDeployment(locStore.getDeployment(meta), "local");

if (dep == null) {
dep = ldrStore.getDeployment(ldr.classLoaderId());
dep = checkDeployment(ldrStore.getDeployment(ldr.classLoaderId()), "perLoader");

if (dep == null)
dep = verStore.getDeployment(ldr.classLoaderId());
dep = checkDeployment(verStore.getDeployment(ldr.classLoaderId()), "perVersion");
}

return dep;
Expand All @@ -311,6 +311,25 @@ else if (locDep != null) {
return locStore.explicitDeploy(cls, clsLdr);
}

/**
* Checks and logs possibly incorrect deployments.
*
* @param deployment Deployment.
* @param store Store name.
* @return Deployment.
*/
private GridDeployment checkDeployment(GridDeployment deployment, String store) {
if (deployment != null
&& deployment.participants() == null
&& !ctx.discovery().localNode().id().equals(deployment.classLoaderId().globalId())) {
log.warning("Possibly incorrect deployment detected in '" + store + "', can't be used to deserialize " +
"message on target node, participants=null, localNodeId=" + ctx.discovery().localNode().id() +
", deployment=" + deployment);
}

return deployment;
}

/**
* Gets any deployment by loader ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* Deployment metadata.
*/
class GridDeploymentMetadata {
public class GridDeploymentMetadata {
/** Deployment mode. */
private DeploymentMode depMode;

Expand Down Expand Up @@ -202,7 +202,7 @@ IgniteUuid classLoaderId() {
*
* @param clsLdrId Property clsLdrId.
*/
void classLoaderId(IgniteUuid clsLdrId) {
public void classLoaderId(IgniteUuid clsLdrId) {
this.clsLdrId = clsLdrId;
}

Expand Down Expand Up @@ -288,4 +288,4 @@ public IgnitePredicate<ClusterNode> nodeFilter() {
@Override public String toString() {
return S.toString(GridDeploymentMetadata.class, this, "seqNum", clsLdrId != null ? clsLdrId.localId() : "n/a");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ public void p2pContext(
String userVer,
DeploymentMode mode,
Map<UUID, IgniteUuid> participants
) {
) throws IgnitePeerToPeerClassLoadingException {
localLdrId.set(ldrId);

assert depEnabled;
Expand Down Expand Up @@ -426,6 +426,12 @@ public void p2pContext(
depInfo = deps.get(ldrId);

if (depInfo == null) {
if (!sndId.equals(ldrId.globalId()) && participants == null) {
throw new IgnitePeerToPeerClassLoadingException("Unable to load class using class loader with id=" +
ldrId + ", senderId=" + sndId + ", participants=null (loader id doesn't match sender id " +
"and there are no more participants)");
}

depInfo = new CachedDeploymentInfo<>(sndId, ldrId, userVer, mode, participants);

CachedDeploymentInfo<K, V> old = deps.putIfAbsent(ldrId, depInfo);
Expand Down Expand Up @@ -668,7 +674,7 @@ else if (locDep.compareAndSet(null, newDep))
*
* @param deployable Deployable object.
*/
public void prepare(GridCacheDeployable deployable) {
public void prepare(GridCacheDeployable deployable) throws IgnitePeerToPeerClassLoadingException {
assert depEnabled;

// Only set deployment info if it was not set automatically.
Expand All @@ -684,14 +690,35 @@ public void prepare(GridCacheDeployable deployable) {
}
}

if (dep != null)
if (dep != null) {
checkDeploymentIsCorrect(dep, deployable);

deployable.prepare(dep);
}

if (log.isDebugEnabled())
log.debug("Prepared grid cache deployable [dep=" + dep + ", deployable=" + deployable + ']');
}
}

/**
* Checks if given deployment is correct to prepare a message.
*
* @param deployment Deployment.
* @param deployable Deployable message.
* @throws IgnitePeerToPeerClassLoadingException If deployment is incorrect.
*/
private void checkDeploymentIsCorrect(GridDeploymentInfoBean deployment, GridCacheDeployable deployable)
throws IgnitePeerToPeerClassLoadingException {
if (deployment.participants() == null
&& !cctx.localNode().id().equals(deployment.classLoaderId().globalId())) {
throw new IgnitePeerToPeerClassLoadingException("Could not use deployment to prepare deployable, " +
"because local node id does not correspond with class loader id, and there are no more participants, " +
"localNodeId=" + cctx.localNode().id() + ", deployment=" + deployment + ", deployable=" + deployable +
", locDep=" + locDep.get());
}
}

/**
* @return First global deployment.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1605,23 +1605,23 @@ private void unmarshall(UUID nodeId, GridCacheMessage cacheMsg) {

GridDeploymentInfo bean = cacheMsg.deployInfo();

if (bean != null) {
assert depEnabled : "Received deployment info while peer class loading is disabled [nodeId=" + nodeId +
", msg=" + cacheMsg + ']';

cctx.deploy().p2pContext(
nodeId,
bean.classLoaderId(),
bean.userVersion(),
bean.deployMode(),
bean.participants()
);
try {
if (bean != null) {
assert depEnabled : "Received deployment info while peer class loading is disabled [nodeId=" + nodeId +
", msg=" + cacheMsg + ']';

if (log.isDebugEnabled())
log.debug("Set P2P context [senderId=" + nodeId + ", msg=" + cacheMsg + ']');
}
cctx.deploy().p2pContext(
nodeId,
bean.classLoaderId(),
bean.userVersion(),
bean.deployMode(),
bean.participants()
);

if (log.isDebugEnabled())
log.debug("Set P2P context [senderId=" + nodeId + ", msg=" + cacheMsg + ']');
}

try {
cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader());
}
catch (IgniteCheckedException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2019 GridGain Systems, Inc. and Contributors.
*
* Licensed under the GridGain Community Edition License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.IgniteCheckedException;

/**
* This exception is thrown in communication-related cases when it is impossible to load class from any peer.
*/
public class IgnitePeerToPeerClassLoadingException extends IgniteCheckedException {
/** */
private static final long serialVersionUID = 0L;

/** */
public IgnitePeerToPeerClassLoadingException(String s) {
super(s);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -913,21 +913,21 @@ private void notifyCallback0(UUID nodeId,

ClassLoader ldr = depMgr.globalLoader();

if (ctx.config().isPeerClassLoadingEnabled()) {
GridDeploymentInfo depInfo = e.deployInfo();

if (depInfo != null) {
depMgr.p2pContext(
nodeId,
depInfo.classLoaderId(),
depInfo.userVersion(),
depInfo.deployMode(),
depInfo.participants()
);
try {
if (ctx.config().isPeerClassLoadingEnabled()) {
GridDeploymentInfo depInfo = e.deployInfo();

if (depInfo != null) {
depMgr.p2pContext(
nodeId,
depInfo.classLoaderId(),
depInfo.userVersion(),
depInfo.deployMode(),
depInfo.participants()
);
}
}
}

try {
e.unmarshal(cctx, ldr);

Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, e);
Expand Down

0 comments on commit 5168791

Please sign in to comment.