Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Apr 9, 2015
1 parent 5029aeb commit 88e1ff8
Show file tree
Hide file tree
Showing 10 changed files with 253 additions and 86 deletions.
Expand Up @@ -36,7 +36,6 @@
import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.internal.util.worker.*;
import org.apache.ignite.lang.*; import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.jdk.*;
import org.apache.ignite.plugin.security.*; import org.apache.ignite.plugin.security.*;
import org.apache.ignite.plugin.segmentation.*; import org.apache.ignite.plugin.segmentation.*;
import org.apache.ignite.spi.*; import org.apache.ignite.spi.*;
Expand Down Expand Up @@ -172,9 +171,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Map of dynamic cache filters. */ /** Map of dynamic cache filters. */
private Map<String, CachePredicate> registeredCaches = new HashMap<>(); private Map<String, CachePredicate> registeredCaches = new HashMap<>();


/** */
private JdkMarshaller jdkMarsh = new JdkMarshaller();

/** @param ctx Context. */ /** @param ctx Context. */
public GridDiscoveryManager(GridKernalContext ctx) { public GridDiscoveryManager(GridKernalContext ctx) {
super(ctx, ctx.config().getDiscoverySpi()); super(ctx, ctx.config().getDiscoverySpi());
Expand Down Expand Up @@ -421,59 +417,40 @@ else if (evtType != EVT_NODE_METRICS_UPDATED)
}); });


spi.setDataExchange(new DiscoverySpiDataExchange() { spi.setDataExchange(new DiscoverySpiDataExchange() {
@Override public Map<Integer, byte[]> collect(UUID nodeId) { @Override public Map<Integer, Object> collect(UUID nodeId) {
assert nodeId != null; assert nodeId != null;


Map<Integer, byte[]> data = new HashMap<>(); Map<Integer, Object> data = new HashMap<>();


for (GridComponent comp : ctx.components()) { for (GridComponent comp : ctx.components()) {
Object compData = comp.collectDiscoveryData(nodeId); Object compData = comp.collectDiscoveryData(nodeId);


if (compData != null) { if (compData != null) {
assert comp.discoveryDataType() != null; assert comp.discoveryDataType() != null;


try { data.put(comp.discoveryDataType().ordinal(), compData);
byte[] bytes = jdkMarsh.marshal(compData);

data.put(comp.discoveryDataType().ordinal(), bytes);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal discovery data " +
"[comp=" + comp + ", data=" + compData + ']', e);
}
} }
} }


return data; return data;
} }


@Override public void onExchange(UUID joiningNodeId, @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Object> data) {
UUID nodeId, for (Map.Entry<Integer, Object> e : data.entrySet()) {
Map<Integer, byte[]> data,
ClassLoader clsLdr) {
for (Map.Entry<Integer, byte[]> entry : data.entrySet()) {
GridComponent comp = null; GridComponent comp = null;


for (GridComponent c : ctx.components()) { for (GridComponent c : ctx.components()) {
if (c.discoveryDataType() != null && c.discoveryDataType().ordinal() == entry.getKey()) { if (c.discoveryDataType() != null && c.discoveryDataType().ordinal() == e.getKey()) {
comp = c; comp = c;


break; break;
} }
} }


if (comp != null) { if (comp != null)
try { comp.onDiscoveryDataReceived(joiningNodeId, nodeId, e.getValue());
Object compData = jdkMarsh.unmarshal(entry.getValue(), clsLdr);

comp.onDiscoveryDataReceived(joiningNodeId, nodeId, compData);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal discovery data for component: " + comp, e);
}
}
else else
U.warn(log, "Received discovery data for unknown component: " + entry.getKey()); U.warn(log, "Received discovery data for unknown component: " + e.getKey());
} }
} }
}); });
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.typedef.internal.*;
Expand Down Expand Up @@ -1559,7 +1560,7 @@ void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteChec
/** /**
* Discovery data. * Discovery data.
*/ */
private static class DiscoveryData implements Externalizable { private static class DiscoveryData implements Externalizable, GridPeerDeployAware {
/** */ /** */
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


Expand All @@ -1577,6 +1578,16 @@ public DiscoveryData() {
// No-op. // No-op.
} }


@Override
public Class<?> deployClass() {
return U.peerDeployAware0(items).deployClass();
}

@Override
public ClassLoader classLoader() {
return U.peerDeployAware0(items).classLoader();
}

/** /**
* @param nodeId Node ID. * @param nodeId Node ID.
*/ */
Expand Down Expand Up @@ -1616,7 +1627,7 @@ public void addItem(DiscoveryDataItem item) {
/** /**
* Discovery data item. * Discovery data item.
*/ */
private static class DiscoveryDataItem implements Externalizable { private static class DiscoveryDataItem implements Externalizable, GridPeerDeployAware {
/** */ /** */
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


Expand Down Expand Up @@ -1666,6 +1677,16 @@ public DiscoveryDataItem() {
this.interval = interval; this.interval = interval;
} }


@Override
public Class<?> deployClass() {
return prjPred.getClass();
}

@Override
public ClassLoader classLoader() {
return prjPred.getClass().getClassLoader();
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException { @Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeUuid(out, routineId); U.writeUuid(out, routineId);
Expand Down
Expand Up @@ -33,18 +33,16 @@ public interface DiscoverySpiDataExchange {
* @param joiningNodeId ID of new node that joins topology. * @param joiningNodeId ID of new node that joins topology.
* @return Collection of discovery data objects from different components. * @return Collection of discovery data objects from different components.
*/ */
public Map<Integer, byte[]> collect(UUID joiningNodeId); public Map<Integer, Object> collect(UUID joiningNodeId);


/** /**
* Notifies discovery manager about data received from remote node. * Notifies discovery manager about data received from remote node.
* *
* @param joiningNodeId Joining node ID. * @param joiningNodeId Joining node ID.
* @param nodeId Remote node ID for which data is provided. * @param nodeId Remote node ID for which data is provided.
* @param data Collection of marshalled discovery data objects from different components. * @param data Collection of marshalled discovery data objects from different components.
* @param clsLdr Class loader to use for discovery data unmarshalling.
*/ */
public void onExchange(UUID joiningNodeId, public void onExchange(UUID joiningNodeId,
UUID nodeId, UUID nodeId,
Map<Integer, byte[]> data, Map<Integer, Object> data);
ClassLoader clsLdr);
} }
Expand Up @@ -897,10 +897,9 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {


if (dataMap != null) { if (dataMap != null) {
for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) { for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet()) {
exchange.onExchange(newNodeId, // exchange.onExchange(newNodeId,
entry.getKey(), // entry.getKey(),
entry.getValue(), // entry.getValue());
exchangeClassLoader(newNodeId));
} }
} }


Expand All @@ -923,8 +922,8 @@ else if (log.isDebugEnabled())


Map<Integer, byte[]> data = msg.newNodeDiscoveryData(); Map<Integer, byte[]> data = msg.newNodeDiscoveryData();


if (data != null) // if (data != null)
exchange.onExchange(newNodeId, newNodeId, data, exchangeClassLoader(newNodeId)); // exchange.onExchange(newNodeId, newNodeId, data);
} }
} }
} }
Expand Down

0 comments on commit 88e1ff8

Please sign in to comment.