From 66d46ecacd0a6babce6e0a722c580d6646eeb9cb Mon Sep 17 00:00:00 2001 From: vozerov-gridgain Date: Tue, 1 Sep 2015 11:23:50 +0300 Subject: [PATCH] Platforms: WIP on better interfaces. --- .../processors/platform/PlatformContext.java | 13 +++++---- .../cluster/PlatformClusterNodeFilter.java | 28 +++++++++++++++++++ .../datastreamer/PlatformStreamReceiver.java | 27 ++++++++++++++++++ ...ava => PlatformClusterNodeFilterImpl.java} | 7 ++--- ...r.java => PlatformStreamReceiverImpl.java} | 27 +++++++++--------- 5 files changed, 78 insertions(+), 24 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java rename modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/{PlatformClusterNodeFilter.java => PlatformClusterNodeFilterImpl.java} (90%) rename modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/{PlatformStreamReceiver.java => PlatformStreamReceiverImpl.java} (84%) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java index bff0fc812e7b5..4c703604b8b29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.processors.platform; -import java.util.Collection; -import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cluster.ClusterMetrics; @@ -33,15 +31,18 @@ import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery; import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway; +import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterNodeFilter; import org.apache.ignite.internal.processors.platform.compute.PlatformJob; +import org.apache.ignite.internal.processors.platform.datastreamer.PlatformStreamReceiver; import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.stream.StreamReceiver; import org.jetbrains.annotations.Nullable; +import java.util.Collection; +import java.util.UUID; + /** * Platform context. Acts as an entry point for platform operations. */ @@ -266,7 +267,7 @@ public interface PlatformContext { * @param keepPortable Keep portable flag. * @return Stream receiver. */ - public StreamReceiver createStreamReceiver(Object rcv, long ptr, boolean keepPortable); + public PlatformStreamReceiver createStreamReceiver(Object rcv, long ptr, boolean keepPortable); /** * Create cluster node filter. @@ -274,5 +275,5 @@ public interface PlatformContext { * @param filter Native filter. * @return Cluster node filter. */ - public IgnitePredicate createClusterNodeFilter(Object filter); + public PlatformClusterNodeFilter createClusterNodeFilter(Object filter); } \ No newline at end of file diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java new file mode 100644 index 0000000000000..fd550fb0a9ad5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cluster; + +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.lang.IgnitePredicate; + +/** + * Platform cluster node filter marker interface. + */ +public interface PlatformClusterNodeFilter extends IgnitePredicate { + // No-op. +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java new file mode 100644 index 0000000000000..910892077bf40 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.datastreamer; + +import org.apache.ignite.stream.StreamReceiver; + +/** + * Platform data streamer filter marker interface. + */ +public interface PlatformStreamReceiver extends StreamReceiver { + // No-op. +} diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java similarity index 90% rename from modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java rename to modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java index eb203cdaeb32a..5ba9a854b5d75 100644 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java @@ -25,20 +25,19 @@ import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; -import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.resources.IgniteInstanceResource; /** * Interop cluster node filter. */ -public class PlatformClusterNodeFilter extends PlatformAbstractPredicate implements IgnitePredicate { +public class PlatformClusterNodeFilterImpl extends PlatformAbstractPredicate implements PlatformClusterNodeFilter { /** */ private static final long serialVersionUID = 0L; /** * {@link java.io.Externalizable} support. */ - public PlatformClusterNodeFilter() { + public PlatformClusterNodeFilterImpl() { // No-op. } @@ -48,7 +47,7 @@ public PlatformClusterNodeFilter() { * @param pred .Net portable predicate. * @param ctx Kernal context. */ - public PlatformClusterNodeFilter(Object pred, PlatformContext ctx) { + public PlatformClusterNodeFilterImpl(Object pred, PlatformContext ctx) { super(pred, 0, ctx); } diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java similarity index 84% rename from modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java rename to modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java index 851216a91e568..92250c0193f59 100644 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiver.java +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java @@ -17,11 +17,6 @@ package org.apache.ignite.internal.processors.platform.datastreamer; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Collection; -import java.util.Map; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; @@ -33,12 +28,17 @@ import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.stream.StreamReceiver; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Collection; +import java.util.Map; /** * Interop receiver. */ -public class PlatformStreamReceiver extends PlatformAbstractPredicate implements StreamReceiver { +public class PlatformStreamReceiverImpl extends PlatformAbstractPredicate implements PlatformStreamReceiver { /** */ private static final long serialVersionUID = 0L; @@ -48,7 +48,7 @@ public class PlatformStreamReceiver extends PlatformAbstractPredicate impl /** * Constructor. */ - public PlatformStreamReceiver() + public PlatformStreamReceiverImpl() { super(); } @@ -60,7 +60,7 @@ public PlatformStreamReceiver() * @param ptr Pointer to receiver in the native platform. * @param ctx Kernal context. */ - public PlatformStreamReceiver(Object pred, long ptr, boolean keepPortable, PlatformContext ctx) { + public PlatformStreamReceiverImpl(Object pred, long ptr, boolean keepPortable, PlatformContext ctx) { super(pred, ptr, ctx); assert pred != null; @@ -69,7 +69,7 @@ public PlatformStreamReceiver(Object pred, long ptr, boolean keepPortable, Platf } /** {@inheritDoc} */ - @Override public void receive(IgniteCache cache, Collection> collection) + @Override public void receive(IgniteCache cache, Collection> collection) throws IgniteException { assert ctx != null; @@ -82,15 +82,15 @@ public PlatformStreamReceiver(Object pred, long ptr, boolean keepPortable, Platf writer.writeInt(collection.size()); - for (Map.Entry e : collection) { + for (Map.Entry e : collection) { writer.writeObject(e.getKey()); writer.writeObject(e.getValue()); } out.synchronize(); - ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, - new PlatformCache(ctx, cache, keepPortable), mem.pointer(), keepPortable); + ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, new PlatformCache(ctx, cache, keepPortable), + mem.pointer(), keepPortable); } } @@ -116,5 +116,4 @@ public void setIgniteInstance(Ignite ignite) { keepPortable = in.readBoolean(); } - } \ No newline at end of file