Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1204,8 +1204,6 @@ private void onChannelOpened0(UUID rmtNodeId, GridIoMessage initMsg, Channel cha

if (topicOrd >= 0)
initMsg.topic(GridTopic.fromOrdinal(topicOrd));
else
initMsg.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config()));
}

byte plc = initMsg.policy();
Expand Down Expand Up @@ -1253,8 +1251,6 @@ private void onMessage0(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) {

if (topicOrd >= 0)
msg.topic(GridTopic.fromOrdinal(topicOrd));
else
msg.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config()));
}

if (!started) {
Expand Down Expand Up @@ -1980,9 +1976,6 @@ private IgniteInternalFuture<Channel> openChannel(
);

try {
if (topicOrd < 0)
ioMsg.prepareMarshal(marsh);

return ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).openChannel(node, ioMsg);
}
catch (IgniteSpiException e) {
Expand Down Expand Up @@ -2054,9 +2047,6 @@ else if (async)
ackC.apply(null);
}
else {
if (topicOrd < 0)
ioMsg.prepareMarshal(marsh);

try {
if ((CommunicationSpi<?>)getSpi() instanceof TcpCommunicationSpi)
getTcpCommunicationSpi().sendMessage(node, ioMsg, ackC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

/**
* Wrapper for all grid messages.
*/
public class GridIoMessage implements Message, SpanTransport {
public class GridIoMessage implements MarshallableMessage, SpanTransport {
/** */
public static final Integer STRIPE_DISABLED_PART = Integer.MIN_VALUE;

Expand Down Expand Up @@ -216,19 +217,14 @@ public int partition() {
return null;
}

/**
* @param marsh Marshaller.
*/
public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
/** {@inheritDoc} */
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
if (topic != null && topicBytes == null)
topicBytes = U.marshal(marsh, topic);
}

/**
* @param marsh Marshaller.
* @param ldr Class loader.
*/
public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException {
/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException {
if (topicBytes != null && topic == null) {
topic = U.unmarshal(marsh, topicBytes, ldr);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBeanSerializer;
import org.apache.ignite.internal.managers.deployment.GridDeploymentRequest;
import org.apache.ignite.internal.managers.deployment.GridDeploymentRequestSerializer;
import org.apache.ignite.internal.managers.deployment.GridDeploymentRequestMarshallableSerializer;
import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse;
import org.apache.ignite.internal.managers.deployment.GridDeploymentResponseSerializer;
import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest;
import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequestSerializer;
import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse;
import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponseSerializer;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessageSerializer;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessageMarshallableSerializer;
import org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessage;
import org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessageSerializer;
import org.apache.ignite.internal.processors.authentication.UserAuthenticateResponseMessage;
Expand Down Expand Up @@ -403,12 +403,15 @@ public GridIoMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarsh
factory.register((short)5, GridTaskCancelRequest::new, new GridTaskCancelRequestSerializer());
factory.register((short)6, GridTaskSessionRequest::new, new GridTaskSessionRequestSerializer());
factory.register((short)7, GridCheckpointRequest::new, new GridCheckpointRequestSerializer());
factory.register((short)8, GridIoMessage::new, new GridIoMessageSerializer());
factory.register((short)8, GridIoMessage::new,
new GridIoMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
factory.register((short)9, GridIoUserMessage::new, new GridIoUserMessageSerializer());
factory.register((short)10, GridDeploymentInfoBean::new, new GridDeploymentInfoBeanSerializer());
factory.register((short)11, GridDeploymentRequest::new, new GridDeploymentRequestSerializer());
factory.register((short)11, GridDeploymentRequest::new,
new GridDeploymentRequestMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
factory.register((short)12, GridDeploymentResponse::new, new GridDeploymentResponseSerializer());
factory.register((short)13, GridEventStorageMessage::new, new GridEventStorageMessageSerializer());
factory.register((short)13, GridEventStorageMessage::new,
new GridEventStorageMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
factory.register((short)16, GridCacheTxRecoveryRequest::new, new GridCacheTxRecoveryRequestSerializer());
factory.register((short)17, GridCacheTxRecoveryResponse::new, new GridCacheTxRecoveryResponseSerializer());
factory.register((short)18, IndexQueryResultMeta::new, new IndexQueryResultMetaSerializer());
Expand Down Expand Up @@ -515,7 +518,8 @@ public GridIoMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarsh
factory.register((short)169, ServiceSingleNodeDeploymentResult::new, new ServiceSingleNodeDeploymentResultSerializer());
factory.register(GridQueryKillRequest.TYPE_CODE, GridQueryKillRequest::new, new GridQueryKillRequestSerializer());
factory.register(GridQueryKillResponse.TYPE_CODE, GridQueryKillResponse::new, new GridQueryKillResponseSerializer());
factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new, new GridIoSecurityAwareMessageSerializer());
factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new,
new GridIoSecurityAwareMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new, new SessionChannelMessageSerializer());
factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
factory.register((short)177, TcpInverseConnectionResponseMessage::new, new TcpInverseConnectionResponseMessageSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteNotPeerDeployable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;

import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
Expand Down Expand Up @@ -72,9 +71,6 @@ class GridDeploymentCommunication {
/** */
private final GridBusyLock busyLock = new GridBusyLock();

/** */
private final Marshaller marsh;

/**
* Creates new instance of deployment communication.
*
Expand All @@ -92,8 +88,6 @@ class GridDeploymentCommunication {
processDeploymentRequest(nodeId, msg);
}
};

marsh = ctx.marshaller();
}

/**
Expand Down Expand Up @@ -186,18 +180,6 @@ private void processResourceRequest(UUID nodeId, GridDeploymentRequest req) {
if (log.isDebugEnabled())
log.debug("Received peer class/resource loading request [originatingNodeId=" + nodeId + ", req=" + req + ']');

if (req.responseTopic() == null) {
try {
req.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config()));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to process deployment request (will ignore) [" +
"originatingNodeId=" + nodeId + ", req=" + req + ']', e);

return;
}
}

GridDeploymentResponse res = new GridDeploymentResponse();

GridDeployment dep = ctx.deploy().getDeployment(req.classLoaderId());
Expand Down Expand Up @@ -417,9 +399,6 @@ GridDeploymentResponse sendResourceRequest(final String rsrcName, IgniteUuid cls

long start = U.currentTimeMillis();

if (req.responseTopic() != null && !ctx.localNodeId().equals(dstNode.id()))
req.prepareMarshal(marsh);

ctx.io().sendToGridTopic(dstNode, TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL);

if (log.isDebugEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;

/**
* Deployment request.
*/
public class GridDeploymentRequest implements Message {
public class GridDeploymentRequest implements MarshallableMessage {
/** Response topic. Response should be sent back to this topic. */
private Object resTopic;

Expand Down Expand Up @@ -134,19 +134,14 @@ public void nodeIds(Collection<UUID> nodeIds) {
this.nodeIds = nodeIds;
}

/**
* @param marsh Marshaller.
*/
public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
/** {@inheritDoc} */
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
if (resTopic != null && resTopicBytes == null)
resTopicBytes = U.marshal(marsh, resTopic);
}

/**
* @param marsh Marshaller.
* @param ldr Class loader.
*/
public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException {
/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException {
if (resTopicBytes != null && resTopic == null) {
resTopic = U.unmarshal(marsh, resTopicBytes, ldr);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,15 +1025,6 @@ private <T extends Event> List<T> query(IgnitePredicate<T> p, Collection<? exten

GridEventStorageMessage res = (GridEventStorageMessage)msg;

try {
res.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config()), null);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal events query response: " + msg, e);

return;
}

synchronized (qryMux) {
if (uids.remove(nodeId)) {
if (res.events() != null)
Expand Down Expand Up @@ -1146,11 +1137,8 @@ private void sendMessage(Collection<? extends ClusterNode> nodes, GridTopic topi
if (locNode != null)
ctx.io().sendToGridTopic(locNode, topic, msg, plc);

if (!rmtNodes.isEmpty()) {
msg.prepareMarshal(marsh);

if (!rmtNodes.isEmpty())
ctx.io().sendToGridTopic(rmtNodes, topic, msg, plc);
}
}

/**
Expand Down Expand Up @@ -1216,7 +1204,7 @@ private class RequestListener implements GridMessageListener {
throw new IgniteDeploymentCheckedException("Failed to obtain deployment for event filter " +
"(is peer class loading turned on?): " + req);

req.finishUnmarshal(marsh, U.resolveClassLoader(ctx.config()), U.resolveClassLoader(dep.classLoader(), ctx.config()));
req.finishUnmarshalFilters(marsh, U.resolveClassLoader(dep.classLoader(), ctx.config()));

filter = (IgnitePredicate<Event>)req.filter();

Expand Down Expand Up @@ -1252,9 +1240,6 @@ private class RequestListener implements GridMessageListener {
if (log.isDebugEnabled())
log.debug("Sending event query response to node [nodeId=" + nodeId + "res=" + res + ']');

if (!ctx.localNodeId().equals(nodeId))
res.prepareMarshal(marsh);

ctx.io().sendToCustomTopic(node, req.responseTopic(), res, PUBLIC_POOL);
}
catch (ClusterTopologyCheckedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.jetbrains.annotations.Nullable;

/**
* Event storage message.
*/
public class GridEventStorageMessage implements Message {
public class GridEventStorageMessage implements MarshallableMessage {
/** */
private Object resTopic;

Expand Down Expand Up @@ -205,10 +205,8 @@ public void loaderParticipants(@Nullable Map<UUID, IgniteUuid> ldrParties) {
return ErrorMessage.error(errMsg);
}

/**
* @param marsh Marshaller.
*/
public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
/** {@inheritDoc} */
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
if (resTopic != null && resTopicBytes == null)
resTopicBytes = U.marshal(marsh, resTopic);

Expand All @@ -219,31 +217,33 @@ public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
evtsBytes = U.marshal(marsh, evts);
}

/**
* @param marsh Marshaller.
* @param ldr Class loader.
* @param filterClsLdr Class loader for filter.
*/
public void finishUnmarshal(Marshaller marsh, ClassLoader ldr, ClassLoader filterClsLdr) throws IgniteCheckedException {
/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException {
if (resTopicBytes != null && resTopic == null) {
resTopic = U.unmarshal(marsh, resTopicBytes, ldr);

resTopicBytes = null;
}

if (filterBytes != null && filter == null && filterClsLdr != null) {
filter = U.unmarshal(marsh, filterBytes, filterClsLdr);

filterBytes = null;
}

if (evtsBytes != null && evts == null) {
evts = U.unmarshal(marsh, evtsBytes, ldr);

evtsBytes = null;
}
}

/**
* @param marsh Marshaller.
* @param filterClsLdr Class loader for filter.
*/
public void finishUnmarshalFilters(Marshaller marsh, ClassLoader filterClsLdr) throws IgniteCheckedException {
if (filterBytes != null && filter == null) {
filter = U.unmarshal(marsh, filterBytes, filterClsLdr);

filterBytes = null;
}
}

/** {@inheritDoc} */
@Override public short directType() {
return 13;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,6 @@ public void testResponseReceivingOnDeploymentRequestOfUnknownClass() throws Exce
GridDeploymentRequest req = new GridDeploymentRequest(TEST_TOPIC_NAME, locDep.classLoaderId(),
UNKNOWN_CLASS_NAME, false);

req.prepareMarshal(locNode.context().marshaller());

locNode.context().io().sendToGridTopic(remNode.localNode(), TOPIC_CLASSLOAD, req, GridIoPolicy.P2P_POOL);

// Сhecks that the expected response has been received.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,6 @@ private void processTestClassLoaderHotRedeployment(DeploymentMode depMode) throw
assert taskCls1.getClassLoader() != taskCls2.getClassLoader();
assert taskCls1 != taskCls2;

// final AtomicBoolean undeployed = new AtomicBoolean(false);
//
// grid2.events().localListen(new GridLocalEventListener() {
// @Override public void onEvent(GridEvent evt) {
// if (evt.type() == EVT_TASK_UNDEPLOYED) {
// assert ((GridDeploymentEvent)evt).alias().equals(TASK_NAME);
//
// undeployed.set(true);
// }
// }
// }, EVT_TASK_UNDEPLOYED);

ignite2.compute().localDeployTask(taskCls1, taskCls1.getClassLoader());

Integer res1 = ignite1.compute().execute(taskCls1, Collections.singletonList(ignite2.cluster().localNode().id()));
Expand All @@ -149,10 +137,6 @@ private void processTestClassLoaderHotRedeployment(DeploymentMode depMode) throw
info("Result2: " + res2);

assert !res1.equals(res2);

// Thread.sleep(P2P_TIMEOUT * 2);
//
// assert undeployed.get();
}
finally {
stopGrid(2);
Expand Down