Skip to content

Commit

Permalink
GG-19412 Fail Continuous Query registration and do not fail node if r…
Browse files Browse the repository at this point in the history
…emote filter class is missing
  • Loading branch information
pavlukhin committed Jul 11, 2019
1 parent b23d76e commit a28138f
Show file tree
Hide file tree
Showing 8 changed files with 392 additions and 73 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2019 GridGain Systems, Inc. and Contributors.
*
* Licensed under the GridGain Community Edition License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.managers.discovery;

import org.jetbrains.annotations.NotNull;

/**
* Exception which can be used to access a message which failed to be deserialized completely using Java serialization.
* Throwed from deserialization methods it can be caught by a caller.
* <p>
* Should be {@link RuntimeException} because of limitations of Java serialization mechanisms.
* <p>
* Catching {@link ClassNotFoundException} inside deserialization methods cannot do the same trick because
* Java deserialization remembers such exception internally and will rethrow it anyway upon returing to a user.
*/
public class IncompleteDeserializationException extends RuntimeException {
/** */
private static final long serialVersionUID = 0L;

/** */
private final DiscoveryCustomMessage m;

/**
* @param m Message.
*/
public IncompleteDeserializationException(@NotNull DiscoveryCustomMessage m) {
super(null, null, false, false);

this.m = m;
}

/**
* @return Message.
*/
@NotNull public DiscoveryCustomMessage message() {
return m;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1073,37 +1073,37 @@ public IgniteInternalFuture<?> stopRoutine(UUID routineId) {
doStop = true;
}

if (doStop) {
boolean stop = false;
if (doStop) {
boolean stop = false;

// Unregister routine locally.
LocalRoutineInfo routine = locInfos.remove(routineId);
// Unregister routine locally.
LocalRoutineInfo routine = locInfos.remove(routineId);

if (routine != null) {
stop = true;
if (routine != null) {
stop = true;

// Unregister handler locally.
unregisterHandler(routineId, routine.hnd, true);
}
// Unregister handler locally.
unregisterHandler(routineId, routine.hnd, true);
}

if (!stop && discoProtoVer == 2)
stop = routinesInfo.routineExists(routineId);
if (!stop && discoProtoVer == 2)
stop = routinesInfo.routineExists(routineId);

// Finish if routine is not found (wrong ID is provided).
if (!stop) {
stopFuts.remove(routineId);
// Finish if routine is not found (wrong ID is provided).
if (!stop) {
stopFuts.remove(routineId);

fut.onDone();

return fut;
}

try {
ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
}
catch (IgniteCheckedException e) {
fut.onDone(e);
}
try {
ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
}
catch (IgniteCheckedException e) {
fut.onDone(e);
}

if (ctx.isStopping())
fut.onDone();
Expand Down Expand Up @@ -1362,10 +1362,21 @@ private void processStartAckRequest(AffinityTopologyVersion topVer,
* @param req Start request.
*/
private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage req) {
UUID routineId = req.routineId();
if (node.id().equals(ctx.localNodeId()))
return;

UUID routineId = req.routineId();

if (req.deserializationException() != null && checkNodeFilter(req)) {
IgniteCheckedException err = new IgniteCheckedException(req.deserializationException());

req.addError(node.id(), err);

U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', err);

return;
}

StartRequestData data = req.startRequestData();

GridContinuousHandler hnd = data.handler();
Expand Down Expand Up @@ -1461,6 +1472,15 @@ private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage
req.addError(ctx.localNodeId(), err);
}

/** */
private boolean checkNodeFilter(StartRoutineDiscoveryMessage req) {
StartRequestData reqData = req.startRequestData();
IgnitePredicate<ClusterNode> prjPred;

return reqData == null || (prjPred = reqData.projectionPredicate()) == null
|| prjPred.apply(ctx.discovery().localNode());
}

/**
* @param sndId Sender node ID.
* @param msg Message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@

package org.apache.ignite.internal.processors.continuous;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;

/**
*
* Discovery message used for Continuous Query registration.
*/
public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
/** */
Expand All @@ -35,7 +40,8 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
private final StartRequestData startReqData;

/** */
private final Map<UUID, IgniteCheckedException> errs = new HashMap<>();
// Initilized here as well to preserve compatibility with previous versions
private Map<UUID, IgniteCheckedException> errs = new HashMap<>();

/** */
private Map<Integer, T2<Long, Long>> updateCntrs;
Expand All @@ -46,6 +52,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
/** Keep binary flag. */
private boolean keepBinary;

/** */
private transient ClassNotFoundException deserEx;

/**
* @param routineId Routine id.
* @param startReqData Start request data.
Expand All @@ -69,6 +78,9 @@ public StartRequestData startRequestData() {
* @param e Exception.
*/
public void addError(UUID nodeId, IgniteCheckedException e) {
if (errs == null)
errs = new HashMap<>();

errs.put(nodeId, e);
}

Expand Down Expand Up @@ -107,7 +119,7 @@ public void addUpdateCounters(UUID nodeId, Map<Integer, T2<Long, Long>> cntrs) {
* @return Errs.
*/
public Map<UUID, IgniteCheckedException> errs() {
return errs;
return errs != null ? errs : Collections.emptyMap();
}

/**
Expand All @@ -124,7 +136,28 @@ public boolean keepBinary() {

/** {@inheritDoc} */
@Override public DiscoveryCustomMessage ackMessage() {
return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs, updateCntrsPerNode);
return new StartRoutineAckDiscoveryMessage(routineId, errs(), updateCntrs, updateCntrsPerNode);
}

/** */
private void readObject(ObjectInputStream in) throws IOException {
// Override default serialization in order to tolerate missing classes exceptions (e.g. remote filter class).
// We need this means because CQ registration process assumes that an "ack message" will be sent.
try {
in.defaultReadObject();
}
catch (ClassNotFoundException e) {
deserEx = e;

throw new IncompleteDeserializationException(this);
}
}

/**
* @return Exception occurred during deserialization.
*/
@Nullable public ClassNotFoundException deserializationException() {
return deserEx;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.apache.ignite.spi.discovery.tcp.messages;

import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand Down Expand Up @@ -95,7 +98,16 @@ public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull byte[] msg
*/
@Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr) throws Throwable {
if (msg == null) {
msg = U.unmarshal(marsh, msgBytes, ldr);
try {
msg = U.unmarshal(marsh, msgBytes, ldr);
}
catch (IgniteCheckedException e) {
// Try to resurrect a message in a case of deserialization failure
if (e.getCause() instanceof IncompleteDeserializationException)
return new CustomMessageWrapper(((IncompleteDeserializationException)e.getCause()).message());

throw e;
}

assert msg != null;
}
Expand Down

0 comments on commit a28138f

Please sign in to comment.