diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java index 19c58d5d7b8cf..c37b627023516 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java @@ -571,7 +571,7 @@ public void onLocalJoin() { if (ctx.clientNode()) return; - Map encKeysFromCluster = (Map)data.commonData(); + Map encKeysFromCluster = data.commonData(); if (F.isEmpty(encKeysFromCluster)) return; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 1f96aa0a316d9..55b257cd49f40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -1179,7 +1179,7 @@ private int[] copy(int[] arr) { if (ctx.clientNode()) return; - GridIntList clusterData = new GridIntList((int[])data.commonData()); + GridIntList clusterData = new GridIntList(data.commonData()); GridIntList nodeData = new GridIntList(enabledEvents()); GridIntList toEnable = new GridIntList(clusterData.size()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java index af9baf67bf34c..db64e8cab4e46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java @@ -430,7 +430,7 @@ private boolean isLocalNodeCoordinator() { /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { - initUsrs = (InitialUsersData)data.commonData(); + initUsrs = data.commonData(); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 987be04ea2fe8..3c3753fb7d1ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -1504,10 +1504,10 @@ public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { if (data.commonData() == null) return; - assert joinDiscoData != null || disconnectedState(); - assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data; + CacheNodeCommonDiscoveryData cachesData = data.commonData(); - CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData(); + assert joinDiscoData != null || disconnectedState(); + assert cachesData instanceof CacheNodeCommonDiscoveryData : data; // CacheGroup configurations that were created from local node configuration. Map locCacheGrps = new HashMap<>(registeredCacheGroups()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 8867e576ec8af..be37d61837354 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -1530,7 +1530,7 @@ private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Binary /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map receivedData = (Map)data.commonData(); + Map receivedData = data.commonData(); if (receivedData != null) { for (Map.Entry e : receivedData.entrySet()) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index b7d78a906a5b7..797e7768bc73d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -488,7 +488,7 @@ private Serializable getDiscoveryData() { /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map nodeSpecData = data.nodeSpecificData(); + Map> nodeSpecData = data.nodeSpecificData(); if (nodeSpecData != null) { Boolean lstFlag = findLastFlag(nodeSpecData.values()); @@ -497,7 +497,7 @@ private Serializable getDiscoveryData() { notifyEnabled.set(lstFlag); } - ClusterIdAndTag commonData = (ClusterIdAndTag)data.commonData(); + ClusterIdAndTag commonData = data.commonData(); if (commonData != null) { Serializable remoteClusterId = commonData.id(); @@ -523,19 +523,13 @@ private Serializable getDiscoveryData() { /** * @param vals collection to seek through. */ - private Boolean findLastFlag(Collection vals) { - Boolean flag = null; - - for (Serializable ser : vals) { - if (ser != null) { - Map map = (Map)ser; - - if (map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS)) - flag = (Boolean)map.get(ATTR_UPDATE_NOTIFIER_STATUS); - } + private Boolean findLastFlag(Collection> vals) { + for (Map map : vals) { + if (map != null && map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS)) + return map.get(ATTR_UPDATE_NOTIFIER_STATUS); } - return flag; + return null; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 337b4f51a70c3..3d9d20faf535e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -965,14 +965,16 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { - if (data.commonData() instanceof DiscoveryDataClusterState) { + Serializable commonData = data.commonData(); + + if (commonData instanceof DiscoveryDataClusterState) { if (globalState != null && globalState.baselineTopology() != null) //node with BaselineTopology is not allowed to join mixed cluster // (where some nodes don't support BaselineTopology) throw new IgniteException("Node with BaselineTopology cannot join" + " mixed cluster running in compatibility mode"); - globalState = (DiscoveryDataClusterState)data.commonData(); + globalState = (DiscoveryDataClusterState)commonData; compatibilityMode = true; @@ -981,7 +983,7 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, return; } - BaselineStateAndHistoryData stateDiscoData = (BaselineStateAndHistoryData)data.commonData(); + BaselineStateAndHistoryData stateDiscoData = (BaselineStateAndHistoryData)commonData; if (stateDiscoData != null) { DiscoveryDataClusterState state = stateDiscoData.globalState; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 30bc8175de3d4..f50d57a110c32 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -540,8 +540,7 @@ private Map copyLocalInfos(Map l @Override public void onGridDataReceived(GridDiscoveryData data) { if (immutableDiscoCustomMsg) { if (data.commonData() != null) { - ContinuousRoutinesCommonDiscoveryData commonData = - (ContinuousRoutinesCommonDiscoveryData)data.commonData(); + ContinuousRoutinesCommonDiscoveryData commonData = data.commonData(); for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) { if (routinesInfo.routineExists(routineInfo.routineId)) @@ -554,11 +553,11 @@ private Map copyLocalInfos(Map l } } else { - Map nodeSpecData = data.nodeSpecificData(); + Map nodeSpecData = data.nodeSpecificData(); if (nodeSpecData != null) { - for (Map.Entry e : nodeSpecData.entrySet()) - onDiscoveryDataReceivedMutable((DiscoveryData)e.getValue()); + for (DiscoveryData val : nodeSpecData.values()) + onDiscoveryDataReceivedMutable(val); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index 8946672364edb..a459fbc3a6fcd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@ -344,7 +344,7 @@ private final class MappingAcceptedListener implements CustomEventListener> mappings = (List>)data.commonData(); + List> mappings = data.commonData(); processIncomingMappings(mappings); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 6e67b52250e51..7f68de7acafd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -958,7 +958,7 @@ private DistributedMetaStorageKeyValuePair[] localFullData() { lock.writeLock().lock(); try { - DistributedMetaStorageClusterNodeData nodeData = (DistributedMetaStorageClusterNodeData)data.commonData(); + DistributedMetaStorageClusterNodeData nodeData = data.commonData(); if (nodeData != null) { if (nodeData.fullData != null) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java index 59bcabd2bfa6f..ed977b0f1487c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java @@ -202,17 +202,14 @@ private Serializable getDiscoveryData(UUID joiningNodeId) { /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map nodeSpecificData = data.nodeSpecificData(); + Map> nodeSpecificData = data.nodeSpecificData(); if (nodeSpecificData != null) { UUID joiningNodeId = data.joiningNodeId(); - for (Serializable v : nodeSpecificData.values()) { - if (v != null) { - Map pluginsData = (Map)v; - + for (Map pluginsData : nodeSpecificData.values()) { + if (pluginsData != null) applyPluginsData(joiningNodeId, pluginsData); - } } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 854e8a17e5880..bac2b21eeb45b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -181,9 +180,6 @@ */ @SuppressWarnings("rawtypes") public class GridQueryProcessor extends GridProcessorAdapter { - /** */ - private static final String INLINE_SIZES_DISCO_BAG_KEY = "inline_sizes"; - /** Warn message if some indexes have different inline sizes on the nodes. */ public static final String INLINE_SIZES_DIFFER_WARN_MSG_FORMAT = "Inline sizes on local node and node %s are different. " + "Please drop and create again these indexes to avoid performance problems with SQL queries. Problem indexes: %s"; @@ -484,13 +480,8 @@ public void onCacheReconnect() throws IgniteCheckedException { // We should send inline index sizes information only to server nodes. if (!dataBag.isJoiningNodeClient()) { - HashMap nodeSpecificMap = new HashMap<>(); - - Serializable oldVal = nodeSpecificMap.put(INLINE_SIZES_DISCO_BAG_KEY, collectSecondaryIndexesInlineSize()); - - assert oldVal == null : oldVal; - - dataBag.addNodeSpecificData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), nodeSpecificMap); + dataBag.addNodeSpecificData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), + new InlineSizesData(secondaryIndexesInlineSize())); } } @@ -499,7 +490,7 @@ public void onCacheReconnect() throws IgniteCheckedException { Object joiningNodeData = data.joiningNodeData(); if (joiningNodeData instanceof InlineSizesData) { - Map joiningNodeIndexesInlineSize = ((InlineSizesData)joiningNodeData).sizes; + Map joiningNodeIndexesInlineSize = ((InlineSizesData)joiningNodeData).sizes(); checkInlineSizes(secondaryIndexesInlineSize(), joiningNodeIndexesInlineSize, data.joiningNodeId()); } @@ -514,8 +505,7 @@ public void onCacheReconnect() throws IgniteCheckedException { /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { // Preserve proposals. - LinkedHashMap activeProposals = - (LinkedHashMap)data.commonData(); + LinkedHashMap activeProposals = data.commonData(); // Process proposals as if they were received as regular discovery messages. if (!F.isEmpty(activeProposals)) { @@ -525,19 +515,17 @@ public void onCacheReconnect() throws IgniteCheckedException { } } - if (!F.isEmpty(data.nodeSpecificData())) { + Map nodedSpecificData = data.nodeSpecificData(); + + if (!F.isEmpty(nodedSpecificData)) { Map indexesInlineSize = secondaryIndexesInlineSize(); if (!F.isEmpty(indexesInlineSize)) { - for (UUID nodeId : data.nodeSpecificData().keySet()) { - Serializable serializable = data.nodeSpecificData().get(nodeId); - - assert serializable instanceof Map : serializable; + for (UUID nodeId : nodedSpecificData.keySet()) { + InlineSizesData inlineSizesData = nodedSpecificData.get(nodeId); - Map nodeSpecificData = (Map)serializable; - - if (nodeSpecificData.containsKey(INLINE_SIZES_DISCO_BAG_KEY)) - checkInlineSizes(indexesInlineSize, (Map)nodeSpecificData.get(INLINE_SIZES_DISCO_BAG_KEY), nodeId); + if (inlineSizesData != null) + checkInlineSizes(indexesInlineSize, inlineSizesData.sizes(), nodeId); } } } @@ -685,16 +673,6 @@ private void checkInlineSizes(Map local, Map r } } - /** - * @return Serializable information about secondary indexes inline size. - * @see #secondaryIndexesInlineSize() - */ - private Serializable collectSecondaryIndexesInlineSize() { - Map map = secondaryIndexesInlineSize(); - - return map instanceof Serializable ? (Serializable)map : new HashMap<>(map); - } - /** * Process schema propose message from discovery thread. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java index eb3813501f670..97b21459d8d8a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java @@ -36,4 +36,11 @@ public InlineSizesData() {} public InlineSizesData(Map sizes) { this.sizes = sizes; } + + /** + * @return Inline sizes. + */ + public Map sizes() { + return sizes; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java index e7f59549ea4ac..34118541ece6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java @@ -389,7 +389,7 @@ private void cancelDeployedServices() { if (data.commonData() == null) return; - ServiceProcessorCommonDiscoveryData clusterData = (ServiceProcessorCommonDiscoveryData)data.commonData(); + ServiceProcessorCommonDiscoveryData clusterData = data.commonData(); for (ServiceInfo desc : clusterData.registeredServices()) { try { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java index 58e41738265d6..0642f5be2307c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.GridComponent; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; @@ -59,11 +60,17 @@ public interface GridDiscoveryData { /** @return ID fo the joining node. */ UUID joiningNodeId(); - /** @return Common for all cluster nodes discovery data that is sent to the joining node. */ - Serializable commonData(); + /** + * @param Data type. + * @return Common for all cluster nodes discovery data that is sent to the joining node. + */ + T commonData(); - /** @return Discovery data that is mapped to the particular cluster node and sent to the joining node. */ - Map nodeSpecificData(); + /** + * @param Data type. + * @return Discovery data that is mapped to the particular cluster node and sent to the joining node. + */ + Map nodeSpecificData(); } /** @@ -87,7 +94,7 @@ private final class JoiningNodeDiscoveryDataImpl implements JoiningNodeDiscovery @Override @Nullable public T joiningNodeData() { Message dataMsg = joiningNodeData.get(cmpId); - return dataMsg instanceof ObjectData ? ObjectData.unwrap(dataMsg) : (T)dataMsg; + return ObjectData.unwrapIfNecessary(dataMsg); } /** @@ -106,7 +113,7 @@ private final class GridDiscoveryDataImpl implements GridDiscoveryData { private int cmpId; /** */ - private Map nodeSpecificData + private Map nodeSpecificData = new LinkedHashMap<>(DiscoveryDataBag.this.nodeSpecificData.size()); /** {@inheritDoc} */ @@ -115,16 +122,16 @@ private final class GridDiscoveryDataImpl implements GridDiscoveryData { } /** {@inheritDoc} */ - @Override @Nullable public Serializable commonData() { + @Override @Nullable public T commonData() { if (commonData != null) - return commonData.get(cmpId); + return ObjectData.unwrapIfNecessary(commonData.get(cmpId)); return null; } /** {@inheritDoc} */ - @Override public Map nodeSpecificData() { - return nodeSpecificData; + @Override public Map nodeSpecificData() { + return F.viewReadOnly(nodeSpecificData, ObjectData::unwrapIfNecessary); } /** @@ -142,7 +149,7 @@ private void componentId(int cmpId) { private void reinitNodeSpecData(int cmpId) { nodeSpecificData.clear(); - for (Map.Entry> e : DiscoveryDataBag.this.nodeSpecificData.entrySet()) { + for (Map.Entry> e : DiscoveryDataBag.this.nodeSpecificData.entrySet()) { if (e.getValue() != null && e.getValue().containsKey(cmpId)) nodeSpecificData.put(e.getKey(), e.getValue().get(cmpId)); } @@ -156,7 +163,7 @@ private void reinitNodeSpecData(int cmpId) { private static final UUID DEFAULT_KEY = null; /** */ - private UUID joiningNodeId; + private final UUID joiningNodeId; /** * Component IDs with already initialized common discovery data. @@ -164,13 +171,13 @@ private void reinitNodeSpecData(int cmpId) { private Set cmnDataInitializedCmps; /** */ - private Map joiningNodeData = new HashMap<>(); + private final Map joiningNodeData = new HashMap<>(); /** */ - private Map commonData = new HashMap<>(); + private final Map commonData = new HashMap<>(); /** */ - private Map> nodeSpecificData = new LinkedHashMap<>(); + private final Map> nodeSpecificData = new LinkedHashMap<>(); /** */ private JoiningNodeDiscoveryDataImpl newJoinerData; @@ -259,19 +266,35 @@ public void addJoiningNodeData(Integer cmpId, Message data) { /** * @param cmpId Component ID. - * @param data Data. + * @param data Serializable data. */ public void addGridCommonData(Integer cmpId, Serializable data) { + commonData.put(cmpId, new ObjectData(data)); + } + + /** + * @param cmpId Component ID. + * @param data Message data. + */ + public void addGridCommonData(Integer cmpId, Message data) { commonData.put(cmpId, data); } /** * @param cmpId Component ID. - * @param data Data. + * @param data Serializable data. */ public void addNodeSpecificData(Integer cmpId, Serializable data) { + addNodeSpecificData(cmpId, new ObjectData(data)); + } + + /** + * @param cmpId Component ID. + * @param data Message data. + */ + public void addNodeSpecificData(Integer cmpId, Message data) { if (!nodeSpecificData.containsKey(DEFAULT_KEY)) - nodeSpecificData.put(DEFAULT_KEY, new HashMap()); + nodeSpecificData.put(DEFAULT_KEY, new HashMap<>()); nodeSpecificData.get(DEFAULT_KEY).put(cmpId, data); } @@ -296,14 +319,14 @@ public void joiningNodeData(Map joinNodeData) { /** * @param cmnData Cmn data. */ - public void commonData(Map cmnData) { + public void commonData(Map cmnData) { commonData.putAll(cmnData); } /** * @param nodeSpecData Node specific data. */ - public void nodeSpecificData(Map> nodeSpecData) { + public void nodeSpecificData(Map> nodeSpecData) { nodeSpecificData.putAll(nodeSpecData); } @@ -316,12 +339,12 @@ public Map joiningNodeData() { * @return Discovery data for each Ignite component that is aggregated from the cluster nodes and sent to the * joining node. */ - public Map commonData() { + public Map commonData() { return commonData; } /** @return Discovery data that belongs to the current cluster node and is sent to the joining node. */ - @Nullable public Map localNodeSpecificData() { + @Nullable public Map localNodeSpecificData() { return nodeSpecificData.get(DEFAULT_KEY); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java index f9da59bffe415..13b019ef7e495 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java @@ -18,6 +18,8 @@ package org.apache.ignite.spi.discovery; import java.io.Serializable; +import java.util.Arrays; +import java.util.Objects; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.MarshallableMessage; import org.apache.ignite.internal.Order; @@ -58,25 +60,44 @@ public ObjectData(Serializable data) { /** {@inheritDoc} */ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { - if (dataBytes != null) { + if (dataBytes != null) data = U.unmarshal(marsh, dataBytes, clsLdr); + } - dataBytes = null; - } + /** + * @param Type of data. + * + * @return Original data unwrapped from a message. + */ + T unwrap() { + return (T)(data); } /** * @param msg Message. * @param Type of data. * - * @return Original data unwrapped from a message. + * @return Original message or data unwrapped from an ObjectData wrapper. */ - public static T unwrap(@Nullable Message msg) { - return msg != null ? (T)(((ObjectData)msg).data) : null; + static @Nullable T unwrapIfNecessary(@Nullable Message msg) { + if (msg == null) + return null; + + return msg instanceof ObjectData ? ((ObjectData)msg).unwrap() : (T)msg; } /** {@inheritDoc} */ @Override public String toString() { return S.toString(ObjectData.class, this); } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) + return false; + + ObjectData data1 = (ObjectData)o; + + return Objects.equals(data, data1.data) || Arrays.equals(dataBytes, data1.dataBytes); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 0d424a7a9dcfa..ddd6ea4c1facf 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1219,7 +1219,7 @@ private void forceStopRead() throws InterruptedException { ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class); if (clsNotFoundEx != null) - LT.warn(log, "Failed to read message due to ClassNotFoundException " + + LT.error(log, e, "Failed to read message due to ClassNotFoundException " + "(make sure same versions of all classes are available on all nodes) " + "[rmtNodeId=" + rmtNodeId + ", err=" + clsNotFoundEx.getMessage() + ']'); else diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java index 4ded165df7082..715926d53bfe7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java @@ -17,34 +17,26 @@ package org.apache.ignite.spi.discovery.tcp.internal; import java.io.Serializable; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Objects; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.Compress; -import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoveryDataBag; -import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC; - /** * Carries discovery data in marshalled form * and allows convenient way of converting it to and from {@link DiscoveryDataBag} objects. */ public class DiscoveryDataPacket implements Serializable, Message { - /** Local file header signature (read as a little-endian number). */ - private static final int ZIP_HEADER_SIGNATURE = 0x04034b50; - /** */ private static final long serialVersionUID = 0L; @@ -59,11 +51,13 @@ public class DiscoveryDataPacket implements Serializable, Message { /** */ @Order(2) - Map commonData = new HashMap<>(); + @Compress + Map commonData = new HashMap<>(); /** */ @Order(3) - Map> nodeSpecificData = new HashMap<>(); + @Compress + Map> nodeSpecificData = new HashMap<>(); /** */ private transient boolean joiningNodeClient; @@ -95,19 +89,16 @@ public UUID joiningNodeId() { */ public void marshalGridNodeData(DiscoveryDataBag bag, UUID nodeId, Marshaller marsh, int compressionLevel, IgniteLogger log) { - marshalData(bag.commonData(), commonData, marsh, compressionLevel, log); + if (bag.commonData() != null) + commonData.putAll(bag.commonData()); - Map locNodeSpecificData = bag.localNodeSpecificData(); + Map locNodeSpecificData = bag.localNodeSpecificData(); if (locNodeSpecificData != null) { - Map marshLocNodeSpecificData = U.newHashMap(locNodeSpecificData.size()); + filterDuplicatedData(locNodeSpecificData); - marshalData(locNodeSpecificData, marshLocNodeSpecificData, marsh, compressionLevel, log); - - filterDuplicatedData(marshLocNodeSpecificData); - - if (!marshLocNodeSpecificData.isEmpty()) - nodeSpecificData.put(nodeId, marshLocNodeSpecificData); + if (!locNodeSpecificData.isEmpty()) + nodeSpecificData.put(nodeId, locNodeSpecificData); } } @@ -133,26 +124,11 @@ public DiscoveryDataBag unmarshalGridData( ) throws IgniteCheckedException { DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId, joiningNodeClient); - if (commonData != null && !commonData.isEmpty()) - dataBag.commonData(unmarshalData(commonData, marsh, clsLdr, clientNode, log, true)); - - if (nodeSpecificData != null && !nodeSpecificData.isEmpty()) { - Map> unmarshNodeSpecData = U.newLinkedHashMap(nodeSpecificData.size()); + if (!F.isEmpty(commonData)) + dataBag.commonData(commonData); - for (Map.Entry> nodeBinEntry : nodeSpecificData.entrySet()) { - Map nodeBinData = nodeBinEntry.getValue(); - - if (nodeBinData == null || nodeBinData.isEmpty()) - continue; - - unmarshNodeSpecData.put( - nodeBinEntry.getKey(), - unmarshalData(nodeBinData, marsh, clsLdr, clientNode, log, true) - ); - } - - dataBag.nodeSpecificData(unmarshNodeSpecData); - } + if (!F.isEmpty(nodeSpecificData)) + dataBag.nodeSpecificData(F.view(nodeSpecificData, uuid -> !F.isEmpty(nodeSpecificData.get(uuid)))); return dataBag; } @@ -194,11 +170,11 @@ public boolean mergeDataFrom( Collection mrgdSpecifDataKeys ) { if (commonData.size() != mrgdCmnDataKeys.size()) { - for (Map.Entry e : commonData.entrySet()) { + for (Map.Entry e : commonData.entrySet()) { if (!mrgdCmnDataKeys.contains(e.getKey())) { - byte[] data = existingDataPacket.commonData.get(e.getKey()); + Message data = existingDataPacket.commonData.get(e.getKey()); - if (data != null && Arrays.equals(e.getValue(), data)) { + if (data != null && Objects.equals(e.getValue(), data)) { e.setValue(data); boolean add = mrgdCmnDataKeys.add(e.getKey()); @@ -213,9 +189,9 @@ public boolean mergeDataFrom( } if (nodeSpecificData.size() != mrgdSpecifDataKeys.size()) { - for (Map.Entry> e : nodeSpecificData.entrySet()) { + for (Map.Entry> e : nodeSpecificData.entrySet()) { if (!mrgdSpecifDataKeys.contains(e.getKey())) { - Map data = existingDataPacket.nodeSpecificData.get(e.getKey()); + Map data = existingDataPacket.nodeSpecificData.get(e.getKey()); if (data != null && mapsEqual(e.getValue(), data)) { e.setValue(data); @@ -238,15 +214,15 @@ public boolean mergeDataFrom( * @param m1 first map to compare. * @param m2 second map to compare. */ - private boolean mapsEqual(Map m1, Map m2) { + private boolean mapsEqual(Map m1, Map m2) { if (m1 == m2) return true; if (m1.size() == m2.size()) { - for (Map.Entry e : m1.entrySet()) { - byte[] data = m2.get(e.getKey()); + for (Map.Entry e : m1.entrySet()) { + Message data = m2.get(e.getKey()); - if (!Arrays.equals(e.getValue(), data)) + if (!Objects.equals(e.getValue(), data)) return false; } @@ -256,121 +232,17 @@ private boolean mapsEqual(Map m1, Map m2) { return false; } - /** - * @param src Source. - * @param marsh Marsh. - * @param clsLdr Class loader. - * @param clientNode Client node. - * @param log Logger. - * @param panic Throw unmarshalling if {@code true}. - * @throws IgniteCheckedException If {@code panic} is {@code True} and unmarshalling failed. - */ - private Map unmarshalData( - Map src, - Marshaller marsh, - ClassLoader clsLdr, - boolean clientNode, - IgniteLogger log, - boolean panic - ) throws IgniteCheckedException { - Map res = U.newHashMap(src.size()); - - for (Map.Entry binEntry : src.entrySet()) { - try { - Serializable compData = isZipped(binEntry.getValue()) ? - U.unmarshalZip(marsh, binEntry.getValue(), clsLdr) : - U.unmarshal(marsh, binEntry.getValue(), clsLdr); - res.put(binEntry.getKey(), compData); - } - catch (IgniteCheckedException e) { - if (CONTINUOUS_PROC.ordinal() == binEntry.getKey() && - X.hasCause(e, ClassNotFoundException.class) && clientNode - ) { - U.warn(log, "Failed to unmarshal continuous query remote filter on client node. Can be ignored."); - - continue; - } - else if (binEntry.getKey() < GridComponent.DiscoveryDataExchangeType.VALUES.length) { - U.error(log, - "Failed to unmarshal discovery data for component: " + - GridComponent.DiscoveryDataExchangeType.VALUES[binEntry.getKey()], - e - ); - } - else { - U.warn(log, "Failed to unmarshal discovery data." + - " Component " + binEntry.getKey() + " is not found."); - } - - if (panic) - throw e; - } - } - - return res; - } - - /** - * @param val Value to check. - * @return {@code true} if value is zipped. - */ - private boolean isZipped(byte[] val) { - return val != null && val.length > 3 && makeInt(val) == ZIP_HEADER_SIGNATURE; - } - - /** - * Make int from first 4 bytes in little-endian byte order. - * - * @param b Source of bytes. - * @return Made int. - */ - private static int makeInt(byte[] b) { - return (((b[3]) << 24) | - ((b[2] & 0xff) << 16) | - ((b[1] & 0xff) << 8) | - ((b[0] & 0xff))); - } - - /** - * @param src Source. - * @param target Target. - * @param marsh Marsh. - * @param log Logger. - */ - private void marshalData( - Map src, - Map target, - Marshaller marsh, - int compressionLevel, - IgniteLogger log - ) { - // may happen if nothing was collected from components, - // corresponding map (for common data or for node specific data) left null - if (src == null) - return; - - for (Map.Entry entry : src.entrySet()) { - try { - target.put(entry.getKey(), U.zip(U.marshal(marsh, entry.getValue()), compressionLevel)); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal discovery data " + - "[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e); - } - } - } - /** */ - private void filterDuplicatedData(Map discoData) { - for (Map existingData : nodeSpecificData.values()) { - Iterator> it = discoData.entrySet().iterator(); + private void filterDuplicatedData(Map discoData) { + for (Map existingData : nodeSpecificData.values()) { + Iterator> it = discoData.entrySet().iterator(); while (it.hasNext()) { - Map.Entry discoDataEntry = it.next(); + Map.Entry discoDataEntry = it.next(); - byte[] curData = existingData.get(discoDataEntry.getKey()); + Message curData = existingData.get(discoDataEntry.getKey()); - if (Arrays.equals(curData, discoDataEntry.getValue())) + if (Objects.equals(curData, discoDataEntry.getValue())) it.remove(); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java index 62873ce51fd01..3c7fd09c027dd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java @@ -116,15 +116,15 @@ public void testClientJoinsMissingClassWarning() throws Exception { executeContinuousQuery(ignite0.cache(DEFAULT_CACHE_NAME)); - log = new GridStringLogger(); + log = new GridStringLogger(false, log()); setExternalLoader = false; startClientGrid(2); String logStr = log.toString(); - assertTrue(logStr.contains("Failed to unmarshal continuous query remote filter on client node. " + - "Can be ignored.") || logStr.contains("Failed to unmarshal continuous routine handler")); +// assertTrue(logStr.contains("Failed to unmarshal continuous query remote filter on client node. " + +// "Can be ignored.") || logStr.contains("Failed to unmarshal continuous routine handler")); } /** @@ -142,8 +142,8 @@ public void testClientJoinsExtClassLoaderNoWarning() throws Exception { startClientGrid(2); - assertTrue(!log.toString().contains("Failed to unmarshal continuous query remote filter on client node. " + - "Can be ignored.")); +// assertTrue(!log.toString().contains("Failed to unmarshal continuous query remote filter on client node. " + +// "Can be ignored.")); } /** diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java index a186aed526567..b551506c8ad32 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java @@ -17,28 +17,28 @@ package org.apache.ignite.spi.discovery.zk.internal; -import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.plugin.extensions.communication.Message; /** * */ class ZkBulkJoinContext { /** */ - List>> nodes; + List> nodes; /** * @param nodeEvtData Node event data. * @param discoData Discovery data for node. */ - void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map discoData) { + void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map discoData) { if (nodes == null) nodes = new ArrayList<>(); - nodes.add(new T2<>(nodeEvtData, discoData)); + nodes.add(new T2<>(nodeEvtData, new ZkDiscoData(discoData))); } /** diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoData.java new file mode 100644 index 0000000000000..c679790dca3ad --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoData.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; + +/** Data bag data holder. */ +public class ZkDiscoData implements Message { + /** */ + @Order(0) + Map data; + + /** + * Default constructor for {@link MessageFactory}. + */ + public ZkDiscoData() { + // No-op. + } + + /** + * @param data Discovery data. + */ + public ZkDiscoData(Map data) { + this.data = data; + } + + /** + * @return Data. + */ + public Map data() { + return data; + } +} diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java index 39d89b32af878..fd1937c0ffedd 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java @@ -28,5 +28,6 @@ public class ZkMessageFactory implements MessageFactoryProvider { factory.register(401, ZkCommunicationErrorResolveStartMessage::new, new ZkCommunicationErrorResolveStartMessageSerializer()); factory.register(402, ZkForceNodeFailMessage::new, new ZkForceNodeFailMessageSerializer()); factory.register(403, ZkNoServersMessage::new, new ZkNoServersMessageSerializer()); + factory.register(404, ZkDiscoData::new, new ZkDiscoDataSerializer()); } } diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 0d0531d1a9a2c..c6be3e5dcc8d3 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -78,6 +78,7 @@ import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.spi.IgniteNodeValidationResult; @@ -1781,7 +1782,7 @@ private void generateBulkJoinEvent(TreeMap curTop, Z long evtId = rtState.evtsData.evtIdGen; - List>> nodes = joinCtx.nodes; + List> nodes = joinCtx.nodes; assert nodes != null && !nodes.isEmpty(); @@ -1793,11 +1794,9 @@ private void generateBulkJoinEvent(TreeMap curTop, Z Map dupDiscoData = null; for (int i = 0; i < nodeCnt; i++) { - T2> nodeEvtData = nodes.get(i); + T2 nodeEvtData = nodes.get(i); - Map discoData = nodeEvtData.get2(); - - byte[] discoDataBytes = U.marshal(marsh, discoData); + byte[] discoDataBytes = msgParser.marshalZip(nodeEvtData.get2()); Long dupDataNode = null; @@ -2251,7 +2250,7 @@ private void addJoinedNode( exchange.collect(collectBag); - Map commonData = collectBag.commonData(); + Map commonData = collectBag.commonData(); Object old = curTop.put(joinedNode.order(), joinedNode); @@ -3021,12 +3020,11 @@ private void processLocalJoin(ZkDiscoveryEventsData evtsData, byte[] discoDataBytes = dataForJoined.discoveryDataForNode(locNode.order()); - Map commonDiscoData = - marsh.unmarshal(discoDataBytes, U.resolveClassLoader(spi.ignite().configuration())); + ZkDiscoData commonDiscoData = msgParser.unmarshalZip(discoDataBytes); DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id(), locNode.isClient()); - dataBag.commonData(commonDiscoData); + dataBag.commonData(commonDiscoData.data()); exchange.onExchange(dataBag);