From 59e6fec0b92c353ee5e128b9343a59f4b99bd468 Mon Sep 17 00:00:00 2001 From: Pavel Tupitsyn Date: Thu, 8 Dec 2016 14:53:16 +0300 Subject: [PATCH] IGNITE-4027 Extract PlatformTargetProxy interface This closes #1188 --- .../platform/PlatformAbstractTarget.java | 268 +++--------------- .../platform/PlatformAsyncTarget.java | 44 +++ .../platform/PlatformNoopProcessor.java | 41 +-- .../platform/PlatformProcessor.java | 42 +-- .../platform/PlatformProcessorImpl.java | 87 +++--- .../processors/platform/PlatformTarget.java | 103 ++++--- .../platform/PlatformTargetProxy.java | 126 ++++++++ .../platform/PlatformTargetProxyImpl.java | 222 +++++++++++++++ .../binary/PlatformBinaryProcessor.java | 6 +- .../platform/cache/PlatformCache.java | 15 +- .../platform/cache/PlatformCacheIterator.java | 2 +- .../cache/affinity/PlatformAffinity.java | 4 +- .../affinity/PlatformAffinityFunction.java | 7 +- .../PlatformAffinityFunctionTarget.java | 4 +- .../query/PlatformAbstractQueryCursor.java | 4 +- .../query/PlatformContinuousQueryProxy.java | 3 +- .../callback/PlatformCallbackGateway.java | 6 +- .../callback/PlatformCallbackUtils.java | 6 +- .../cluster/PlatformClusterGroup.java | 18 +- .../platform/compute/PlatformCompute.java | 15 +- .../datastreamer/PlatformDataStreamer.java | 4 +- .../PlatformStreamReceiverImpl.java | 8 +- .../datastructures/PlatformAtomicLong.java | 4 +- .../PlatformAtomicReference.java | 8 +- .../PlatformAtomicSequence.java | 2 +- .../platform/events/PlatformEvents.java | 15 +- .../platform/messaging/PlatformMessaging.java | 9 +- .../platform/services/PlatformServices.java | 27 +- .../transactions/PlatformTransactions.java | 8 +- .../platform/utils/PlatformFutureUtils.java | 14 +- .../utils/PlatformListenableTarget.java | 62 ++++ .../cpp/jni/include/ignite/jni/exports.h | 3 - .../cpp/jni/include/ignite/jni/java.h | 7 - .../platforms/cpp/jni/project/vs/module.def | 2 - modules/platforms/cpp/jni/src/exports.cpp | 8 - modules/platforms/cpp/jni/src/java.cpp | 76 ++--- .../Apache.Ignite.Core.csproj | 1 + .../Apache.Ignite.Core/Impl/Common/Future.cs | 13 +- .../Impl/Common/Listenable.cs | 49 ++++ .../Impl/Compute/ComputeImpl.cs | 4 +- .../Apache.Ignite.Core/Impl/PlatformTarget.cs | 2 +- .../Impl/Unmanaged/IgniteJniNativeMethods.cs | 8 - .../Impl/Unmanaged/UnmanagedUtils.cs | 5 - 43 files changed, 817 insertions(+), 545 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java create mode 100644 modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java index 2df86acee9ce8..506470b5b24af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java @@ -24,16 +24,16 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; -import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformListenable; +import org.apache.ignite.internal.processors.platform.utils.PlatformListenableTarget; import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; /** * Abstract interop target. */ -public abstract class PlatformAbstractTarget implements PlatformTarget { +public abstract class PlatformAbstractTarget implements PlatformTarget, PlatformAsyncTarget { /** Constant: TRUE.*/ protected static final int TRUE = 1; @@ -60,144 +60,6 @@ protected PlatformAbstractTarget(PlatformContext platformCtx) { log = platformCtx.kernalContext().log(PlatformAbstractTarget.class); } - /** {@inheritDoc} */ - @Override public long inLongOutLong(int type, long val) throws Exception { - try { - return processInLongOutLong(type, val); - } - catch (Exception e) { - throw convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public long inStreamOutLong(int type, long memPtr) throws Exception { - try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { - BinaryRawReaderEx reader = platformCtx.reader(mem); - - return processInStreamOutLong(type, reader, mem); - } - catch (Exception e) { - throw convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public Object inStreamOutObject(int type, long memPtr) throws Exception { - try (PlatformMemory mem = memPtr != 0 ? platformCtx.memory().get(memPtr) : null) { - BinaryRawReaderEx reader = mem != null ? platformCtx.reader(mem) : null; - - return processInStreamOutObject(type, reader); - } - catch (Exception e) { - throw convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public void outStream(int type, long memPtr) throws Exception { - try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { - PlatformOutputStream out = mem.output(); - - BinaryRawWriterEx writer = platformCtx.writer(out); - - processOutStream(type, writer); - - out.synchronize(); - } - catch (Exception e) { - throw convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public Object outObject(int type) throws Exception { - try { - return processOutObject(type); - } - catch (Exception e) { - throw convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception { - try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) { - BinaryRawReaderEx reader = platformCtx.reader(inMem); - - try (PlatformMemory outMem = platformCtx.memory().get(outMemPtr)) { - PlatformOutputStream out = outMem.output(); - - BinaryRawWriterEx writer = platformCtx.writer(out); - - processInStreamOutStream(type, reader, writer); - - out.synchronize(); - } - } - catch (Exception e) { - throw convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public Object inObjectStreamOutObjectStream(int type, Object arg, long inMemPtr, long outMemPtr) - throws Exception { - PlatformMemory inMem = null; - PlatformMemory outMem = null; - - try { - BinaryRawReaderEx reader = null; - - if (inMemPtr != 0) { - inMem = platformCtx.memory().get(inMemPtr); - - reader = platformCtx.reader(inMem); - } - - PlatformOutputStream out = null; - BinaryRawWriterEx writer = null; - - if (outMemPtr != 0) { - outMem = platformCtx.memory().get(outMemPtr); - - out = outMem.output(); - - writer = platformCtx.writer(out); - } - - Object res = processInObjectStreamOutObjectStream(type, arg, reader, writer); - - if (out != null) - out.synchronize(); - - return res; - } - catch (Exception e) { - throw convertException(e); - } - finally { - try { - if (inMem != null) - inMem.close(); - } - finally { - if (outMem != null) - outMem.close(); - } - } - } - - /** - * Convert caught exception. - * - * @param e Exception to convert. - * @return Converted exception. - */ - public Exception convertException(Exception e) { - return e; - } - /** * @return Context. */ @@ -206,128 +68,60 @@ public PlatformContext platformContext() { } /** {@inheritDoc} */ - @Override public void listenFuture(final long futId, int typ) throws Exception { - PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, this); + @Override public Exception convertException(Exception e) { + return e; } /** {@inheritDoc} */ - @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception { - PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), this); - } - - /** - * When overridden in a derived class, gets future for the current operation. - * - * @return current future. - * @throws IgniteCheckedException If failed. - */ - protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { throw new IgniteCheckedException("Future listening is not supported in " + getClass()); } - /** - * When overridden in a derived class, gets a custom future writer. - * - * @param opId Operation id. - * @return A custom writer for given op id. - */ - @Nullable protected PlatformFutureUtils.Writer futureWriter(int opId){ + /** {@inheritDoc} */ + @Override @Nullable public PlatformFutureUtils.Writer futureWriter(int opId){ return null; } - /** - * Process IN operation. - * - * @param type Type. - * @param val Value. - * @return Result. - * @throws IgniteCheckedException In case of exception. - */ - protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { return throwUnsupported(type); } - /** - * Process IN operation. - * - * @param type Type. - * @param reader Binary reader. - * @return Result. - * @throws IgniteCheckedException In case of exception. - */ - protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { return throwUnsupported(type); } - /** - * Process IN operation. - * - * @param type Type. - * @param reader Binary reader. - * @return Result. - * @throws IgniteCheckedException In case of exception. - */ - protected long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException { return processInStreamOutLong(type, reader); } - /** - * Process IN-OUT operation. - * - * @param type Type. - * @param reader Binary reader. - * @param writer Binary writer. - * @throws IgniteCheckedException In case of exception. - */ - protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + /** {@inheritDoc} */ + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { throwUnsupported(type); } - /** - * Process IN operation with managed object as result. - * - * @param type Type. - * @param reader Binary reader. - * @return Result. - * @throws IgniteCheckedException In case of exception. - */ - protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader) + throws IgniteCheckedException { return throwUnsupported(type); } - /** - * Process IN-OUT operation. - * - * @param type Type. - * @param arg Argument. - * @param reader Binary reader. - * @param writer Binary writer. - * @throws IgniteCheckedException In case of exception. - */ - protected Object processInObjectStreamOutObjectStream(int type, @Nullable Object arg, BinaryRawReaderEx reader, - BinaryRawWriterEx writer) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public PlatformTarget processInObjectStreamOutObjectStream(int type, @Nullable PlatformTarget arg, + BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { return throwUnsupported(type); } - /** - * Process OUT operation. - * - * @param type Type. - * @param writer Binary writer. - * @throws IgniteCheckedException In case of exception. - */ - protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { throwUnsupported(type); } - /** - * Process OUT operation. - * - * @param type Type. - * @throws IgniteCheckedException In case of exception. - */ - protected Object processOutObject(int type) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { return throwUnsupported(type); } @@ -338,7 +132,7 @@ protected Object processOutObject(int type) throws IgniteCheckedException { * @return Dummy value which is never returned. * @throws IgniteCheckedException Exception to be thrown. */ - protected T throwUnsupported(int type) throws IgniteCheckedException { + private T throwUnsupported(int type) throws IgniteCheckedException { throw new IgniteCheckedException("Unsupported operation type: " + type); } @@ -411,4 +205,14 @@ protected long readAndListenFuture(BinaryRawReader reader) throws IgniteCheckedE return TRUE; } + + /** + * Wraps a listenable to be returned to platform. + * + * @param listenable Listenable. + * @return Target. + */ + protected PlatformTarget wrapListenable(PlatformListenable listenable) { + return new PlatformListenableTarget(listenable, platformCtx); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java new file mode 100644 index 0000000000000..a4d35c9bd7a39 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; +import org.jetbrains.annotations.Nullable; + +/** + * Async target. + */ +public interface PlatformAsyncTarget { + /** + * Gets future for the current operation. + * + * @return current future. + * @throws IgniteCheckedException If failed. + */ + IgniteInternalFuture currentFuture() throws IgniteCheckedException; + + /** + * Gets a custom future writer. + * + * @param opId Operation id. + * @return A custom writer for given op id. + */ + @Nullable PlatformFutureUtils.Writer futureWriter(int opId); +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java index fd357ec77f67c..29114180e0c37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java @@ -61,27 +61,27 @@ public PlatformNoopProcessor(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException { + @Override public PlatformTargetProxy cache(@Nullable String name) throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget createCache(@Nullable String name) throws IgniteCheckedException { + @Override public PlatformTargetProxy createCache(@Nullable String name) throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException { + @Override public PlatformTargetProxy getOrCreateCache(@Nullable String name) throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget createCacheFromConfig(long memPtr) throws IgniteCheckedException { + @Override public PlatformTargetProxy createCacheFromConfig(long memPtr) throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException { + @Override public PlatformTargetProxy getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException { return null; } @@ -91,47 +91,48 @@ public PlatformNoopProcessor(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException { + @Override public PlatformTargetProxy affinity(@Nullable String name) throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget dataStreamer(@Nullable String cacheName, boolean keepBinary) throws IgniteCheckedException { + @Override public PlatformTargetProxy dataStreamer(@Nullable String cacheName, boolean keepBinary) + throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget transactions() { + @Override public PlatformTargetProxy transactions() { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget projection() throws IgniteCheckedException { + @Override public PlatformTargetProxy projection() throws IgniteCheckedException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget compute(PlatformTarget grp) { + @Override public PlatformTargetProxy compute(PlatformTargetProxy grp) { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget message(PlatformTarget grp) { + @Override public PlatformTargetProxy message(PlatformTargetProxy grp) { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget events(PlatformTarget grp) { + @Override public PlatformTargetProxy events(PlatformTargetProxy grp) { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget services(PlatformTarget grp) { + @Override public PlatformTargetProxy services(PlatformTargetProxy grp) { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget extensions() { + @Override public PlatformTargetProxy extensions() { return null; } @@ -142,7 +143,7 @@ public PlatformNoopProcessor(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public PlatformTarget atomicLong(String name, long initVal, boolean create) throws IgniteException { + @Override public PlatformTargetProxy atomicLong(String name, long initVal, boolean create) throws IgniteException { return null; } @@ -157,22 +158,22 @@ public PlatformNoopProcessor(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public PlatformTarget atomicSequence(String name, long initVal, boolean create) throws IgniteException { + @Override public PlatformTargetProxy atomicSequence(String name, long initVal, boolean create) throws IgniteException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget atomicReference(String name, long memPtr, boolean create) throws IgniteException { + @Override public PlatformTargetProxy atomicReference(String name, long memPtr, boolean create) throws IgniteException { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget createNearCache(@Nullable String cacheName, long memPtr) { + @Override public PlatformTargetProxy createNearCache(@Nullable String cacheName, long memPtr) { return null; } /** {@inheritDoc} */ - @Override public PlatformTarget getOrCreateNearCache(@Nullable String cacheName, long memPtr) { + @Override public PlatformTargetProxy getOrCreateNearCache(@Nullable String cacheName, long memPtr) { return null; } @@ -187,7 +188,7 @@ public PlatformNoopProcessor(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public PlatformTarget binaryProcessor() { + @Override public PlatformTargetProxy binaryProcessor() { return null; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java index f01175eaf9dd6..e0d94d1017ac1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java @@ -26,7 +26,7 @@ /** * Platform processor. */ -@SuppressWarnings("UnusedDeclaration") +@SuppressWarnings({"UnusedDeclaration", "UnnecessaryInterfaceModifier"}) public interface PlatformProcessor extends GridProcessor { /** * Gets owning Ignite instance. @@ -68,7 +68,7 @@ public interface PlatformProcessor extends GridProcessor { * @return Cache. * @throws IgniteCheckedException If failed. */ - public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException; + public PlatformTargetProxy cache(@Nullable String name) throws IgniteCheckedException; /** * Create cache. @@ -77,7 +77,7 @@ public interface PlatformProcessor extends GridProcessor { * @return Cache. * @throws IgniteCheckedException If failed. */ - public PlatformTarget createCache(@Nullable String name) throws IgniteCheckedException; + public PlatformTargetProxy createCache(@Nullable String name) throws IgniteCheckedException; /** * Get or create cache. @@ -86,7 +86,7 @@ public interface PlatformProcessor extends GridProcessor { * @return Cache. * @throws IgniteCheckedException If failed. */ - public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException; + public PlatformTargetProxy getOrCreateCache(@Nullable String name) throws IgniteCheckedException; /** * Create cache. @@ -95,7 +95,7 @@ public interface PlatformProcessor extends GridProcessor { * @return Cache. * @throws IgniteCheckedException If failed. */ - public PlatformTarget createCacheFromConfig(long memPtr) throws IgniteCheckedException; + public PlatformTargetProxy createCacheFromConfig(long memPtr) throws IgniteCheckedException; /** * Get or create cache. @@ -104,7 +104,7 @@ public interface PlatformProcessor extends GridProcessor { * @return Cache. * @throws IgniteCheckedException If failed. */ - public PlatformTarget getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException; + public PlatformTargetProxy getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException; /** * Destroy dynamically created cache. @@ -121,7 +121,7 @@ public interface PlatformProcessor extends GridProcessor { * @return Affinity. * @throws IgniteCheckedException If failed. */ - public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException; + public PlatformTargetProxy affinity(@Nullable String name) throws IgniteCheckedException; /** * Get data streamer. @@ -131,14 +131,14 @@ public interface PlatformProcessor extends GridProcessor { * @return Data streamer. * @throws IgniteCheckedException If failed. */ - public PlatformTarget dataStreamer(@Nullable String cacheName, boolean keepBinary) throws IgniteCheckedException; + public PlatformTargetProxy dataStreamer(@Nullable String cacheName, boolean keepBinary) throws IgniteCheckedException; /** * Get transactions. * * @return Transactions. */ - public PlatformTarget transactions(); + public PlatformTargetProxy transactions(); /** * Get projection. @@ -146,7 +146,7 @@ public interface PlatformProcessor extends GridProcessor { * @return Projection. * @throws IgniteCheckedException If failed. */ - public PlatformTarget projection() throws IgniteCheckedException; + public PlatformTargetProxy projection() throws IgniteCheckedException; /** * Create interop compute. @@ -154,7 +154,7 @@ public interface PlatformProcessor extends GridProcessor { * @param grp Cluster group. * @return Compute instance. */ - public PlatformTarget compute(PlatformTarget grp); + public PlatformTargetProxy compute(PlatformTargetProxy grp); /** * Create interop messaging. @@ -162,7 +162,7 @@ public interface PlatformProcessor extends GridProcessor { * @param grp Cluster group. * @return Messaging instance. */ - public PlatformTarget message(PlatformTarget grp); + public PlatformTargetProxy message(PlatformTargetProxy grp); /** * Create interop events. @@ -170,7 +170,7 @@ public interface PlatformProcessor extends GridProcessor { * @param grp Cluster group. * @return Events instance. */ - public PlatformTarget events(PlatformTarget grp); + public PlatformTargetProxy events(PlatformTargetProxy grp); /** * Create interop services. @@ -178,14 +178,14 @@ public interface PlatformProcessor extends GridProcessor { * @param grp Cluster group. * @return Services instance. */ - public PlatformTarget services(PlatformTarget grp); + public PlatformTargetProxy services(PlatformTargetProxy grp); /** * Get platform extensions. Override this method to provide any additional targets and operations you need. * * @return Platform extensions. */ - public PlatformTarget extensions(); + public PlatformTargetProxy extensions(); /** * Register cache store. @@ -203,7 +203,7 @@ public interface PlatformProcessor extends GridProcessor { * @param create Create flag. * @return Platform atomic long. */ - public PlatformTarget atomicLong(String name, long initVal, boolean create); + public PlatformTargetProxy atomicLong(String name, long initVal, boolean create); /** * Get or create AtomicSequence. @@ -212,7 +212,7 @@ public interface PlatformProcessor extends GridProcessor { * @param create Create flag. * @return Platform atomic long. */ - public PlatformTarget atomicSequence(String name, long initVal, boolean create); + public PlatformTargetProxy atomicSequence(String name, long initVal, boolean create); /** * Get or create AtomicReference. @@ -221,7 +221,7 @@ public interface PlatformProcessor extends GridProcessor { * @param create Create flag. * @return Platform atomic long. */ - public PlatformTarget atomicReference(String name, long memPtr, boolean create); + public PlatformTargetProxy atomicReference(String name, long memPtr, boolean create); /** * Gets the configuration of the current Ignite instance. @@ -244,7 +244,7 @@ public interface PlatformProcessor extends GridProcessor { * @param memPtr Pointer to a stream with near cache config. 0 for default config. * @return Cache. */ - public PlatformTarget createNearCache(@Nullable String cacheName, long memPtr); + public PlatformTargetProxy createNearCache(@Nullable String cacheName, long memPtr); /** * Gets existing near cache with the given name or creates a new one. @@ -253,7 +253,7 @@ public interface PlatformProcessor extends GridProcessor { * @param memPtr Pointer to a stream with near cache config. 0 for default config. * @return Cache. */ - public PlatformTarget getOrCreateNearCache(@Nullable String cacheName, long memPtr); + public PlatformTargetProxy getOrCreateNearCache(@Nullable String cacheName, long memPtr); /** * Gets a value indicating whether Ignite logger has specified level enabled. @@ -277,5 +277,5 @@ public interface PlatformProcessor extends GridProcessor { * * @return Binary processor. */ - public PlatformTarget binaryProcessor(); + public PlatformTargetProxy binaryProcessor(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java index f775987024135..8c81ebb177a85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java @@ -220,7 +220,7 @@ public PlatformProcessorImpl(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException { + @Override public PlatformTargetProxy cache(@Nullable String name) throws IgniteCheckedException { IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().cache(name); if (cache == null) @@ -230,7 +230,7 @@ public PlatformProcessorImpl(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public PlatformTarget createCache(@Nullable String name) throws IgniteCheckedException { + @Override public PlatformTargetProxy createCache(@Nullable String name) throws IgniteCheckedException { IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().createCache(name); assert cache != null; @@ -239,7 +239,7 @@ public PlatformProcessorImpl(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException { + @Override public PlatformTargetProxy getOrCreateCache(@Nullable String name) throws IgniteCheckedException { IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().getOrCreateCache(name); assert cache != null; @@ -248,7 +248,7 @@ public PlatformProcessorImpl(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public PlatformTarget createCacheFromConfig(long memPtr) throws IgniteCheckedException { + @Override public PlatformTargetProxy createCacheFromConfig(long memPtr) throws IgniteCheckedException { BinaryRawReaderEx reader = platformCtx.reader(platformCtx.memory().get(memPtr)); CacheConfiguration cfg = PlatformConfigurationUtils.readCacheConfiguration(reader); @@ -260,7 +260,7 @@ public PlatformProcessorImpl(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public PlatformTarget getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException { + @Override public PlatformTargetProxy getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException { BinaryRawReaderEx reader = platformCtx.reader(platformCtx.memory().get(memPtr)); CacheConfiguration cfg = PlatformConfigurationUtils.readCacheConfiguration(reader); @@ -278,60 +278,60 @@ public PlatformProcessorImpl(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException { - return new PlatformAffinity(platformCtx, ctx, name); + @Override public PlatformTargetProxy affinity(@Nullable String name) throws IgniteCheckedException { + return proxy(new PlatformAffinity(platformCtx, ctx, name)); } /** {@inheritDoc} */ - @Override public PlatformTarget dataStreamer(@Nullable String cacheName, boolean keepBinary) + @Override public PlatformTargetProxy dataStreamer(@Nullable String cacheName, boolean keepBinary) throws IgniteCheckedException { IgniteDataStreamer ldr = ctx.dataStream().dataStreamer(cacheName); ldr.keepBinary(true); - return new PlatformDataStreamer(platformCtx, cacheName, (DataStreamerImpl)ldr, keepBinary); + return proxy(new PlatformDataStreamer(platformCtx, cacheName, (DataStreamerImpl)ldr, keepBinary)); } /** {@inheritDoc} */ - @Override public PlatformTarget transactions() { - return new PlatformTransactions(platformCtx); + @Override public PlatformTargetProxy transactions() { + return proxy(new PlatformTransactions(platformCtx)); } /** {@inheritDoc} */ - @Override public PlatformTarget projection() throws IgniteCheckedException { - return new PlatformClusterGroup(platformCtx, ctx.grid().cluster()); + @Override public PlatformTargetProxy projection() throws IgniteCheckedException { + return proxy(new PlatformClusterGroup(platformCtx, ctx.grid().cluster())); } /** {@inheritDoc} */ - @Override public PlatformTarget compute(PlatformTarget grp) { - PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; + @Override public PlatformTargetProxy compute(PlatformTargetProxy grp) { + PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap(); - return new PlatformCompute(platformCtx, grp0.projection(), PlatformUtils.ATTR_PLATFORM); + return proxy(new PlatformCompute(platformCtx, grp0.projection(), PlatformUtils.ATTR_PLATFORM)); } /** {@inheritDoc} */ - @Override public PlatformTarget message(PlatformTarget grp) { - PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; + @Override public PlatformTargetProxy message(PlatformTargetProxy grp) { + PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap(); - return new PlatformMessaging(platformCtx, grp0.projection().ignite().message(grp0.projection())); + return proxy(new PlatformMessaging(platformCtx, grp0.projection().ignite().message(grp0.projection()))); } /** {@inheritDoc} */ - @Override public PlatformTarget events(PlatformTarget grp) { - PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; + @Override public PlatformTargetProxy events(PlatformTargetProxy grp) { + PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap(); - return new PlatformEvents(platformCtx, grp0.projection().ignite().events(grp0.projection())); + return proxy(new PlatformEvents(platformCtx, grp0.projection().ignite().events(grp0.projection()))); } /** {@inheritDoc} */ - @Override public PlatformTarget services(PlatformTarget grp) { - PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; + @Override public PlatformTargetProxy services(PlatformTargetProxy grp) { + PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap(); - return new PlatformServices(platformCtx, grp0.projection().ignite().services(grp0.projection()), false); + return proxy(new PlatformServices(platformCtx, grp0.projection().ignite().services(grp0.projection()), false)); } /** {@inheritDoc} */ - @Override public PlatformTarget extensions() { + @Override public PlatformTargetProxy extensions() { return null; } @@ -356,28 +356,32 @@ public PlatformProcessorImpl(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public PlatformTarget atomicLong(String name, long initVal, boolean create) throws IgniteException { + @Override public PlatformTargetProxy atomicLong(String name, long initVal, boolean create) throws IgniteException { GridCacheAtomicLongImpl atomicLong = (GridCacheAtomicLongImpl)ignite().atomicLong(name, initVal, create); if (atomicLong == null) return null; - return new PlatformAtomicLong(platformCtx, atomicLong); + return proxy(new PlatformAtomicLong(platformCtx, atomicLong)); } /** {@inheritDoc} */ - @Override public PlatformTarget atomicSequence(String name, long initVal, boolean create) throws IgniteException { + @Override public PlatformTargetProxy atomicSequence(String name, long initVal, boolean create) + throws IgniteException { IgniteAtomicSequence atomicSeq = ignite().atomicSequence(name, initVal, create); if (atomicSeq == null) return null; - return new PlatformAtomicSequence(platformCtx, atomicSeq); + return proxy(new PlatformAtomicSequence(platformCtx, atomicSeq)); } /** {@inheritDoc} */ - @Override public PlatformTarget atomicReference(String name, long memPtr, boolean create) throws IgniteException { - return PlatformAtomicReference.createInstance(platformCtx, name, memPtr, create); + @Override public PlatformTargetProxy atomicReference(String name, long memPtr, boolean create) + throws IgniteException { + PlatformAtomicReference ref = PlatformAtomicReference.createInstance(platformCtx, name, memPtr, create); + + return ref != null ? proxy(ref) : null; } /** {@inheritDoc} */ @@ -427,7 +431,7 @@ public PlatformProcessorImpl(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public PlatformTarget createNearCache(@Nullable String cacheName, long memPtr) { + @Override public PlatformTargetProxy createNearCache(@Nullable String cacheName, long memPtr) { NearCacheConfiguration cfg = getNearCacheConfiguration(memPtr); IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().createNearCache(cacheName, cfg); @@ -436,7 +440,7 @@ public PlatformProcessorImpl(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public PlatformTarget getOrCreateNearCache(@Nullable String cacheName, long memPtr) { + @Override public PlatformTargetProxy getOrCreateNearCache(@Nullable String cacheName, long memPtr) { NearCacheConfiguration cfg = getNearCacheConfiguration(memPtr); IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().getOrCreateNearCache(cacheName, cfg); @@ -447,8 +451,8 @@ public PlatformProcessorImpl(GridKernalContext ctx) { /** * Creates new platform cache. */ - private PlatformTarget createPlatformCache(IgniteCacheProxy cache) { - return new PlatformCache(platformCtx, cache, false, cacheExts); + private PlatformTargetProxy createPlatformCache(IgniteCacheProxy cache) { + return proxy(new PlatformCache(platformCtx, cache, false, cacheExts)); } /** {@inheritDoc} */ @@ -504,8 +508,8 @@ private PlatformTarget createPlatformCache(IgniteCacheProxy cache) { } /** {@inheritDoc} */ - @Override public PlatformTarget binaryProcessor() { - return new PlatformBinaryProcessor(platformCtx); + @Override public PlatformTargetProxy binaryProcessor() { + return proxy(new PlatformBinaryProcessor(platformCtx)); } /** @@ -579,6 +583,13 @@ private static PlatformCacheExtension[] prepareCacheExtensions(Collection */ - @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { return ((IgniteFutureImpl) cacheAsync.future()).internalFuture(); } /** */ - @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) { + @Nullable @Override public PlatformFutureUtils.Writer futureWriter(int opId) { if (opId == OP_GET_ALL) return WRITER_GET_ALL; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java index 292caeaade056..4c11cc01cace4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java @@ -47,7 +47,7 @@ public PlatformCacheIterator(PlatformContext platformCtx, Iterator } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_NEXT: if (iter.hasNext()) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java index 12df18890889e..e24345c272946 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java @@ -117,7 +117,7 @@ public PlatformAffinity(PlatformContext platformCtx, GridKernalContext igniteCtx } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_PARTITION: return aff.partition(reader.readObjectDetached()); @@ -168,7 +168,7 @@ public PlatformAffinity(PlatformContext platformCtx, GridKernalContext igniteCtx /** {@inheritDoc} */ @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"}) - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_PRIMARY_PARTITIONS: { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java index 8076a19b56f2c..2d3cada42d268 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTargetProxyImpl; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; @@ -279,7 +280,11 @@ public byte getOverrideFlags() { ? new PlatformAffinityFunctionTarget(ctx, baseFunc) : null; - ptr = ctx.gateway().affinityFunctionInit(mem.pointer(), baseTarget); + PlatformTargetProxyImpl baseTargetProxy = baseTarget != null + ? new PlatformTargetProxyImpl(baseTarget, ctx) + : null; + + ptr = ctx.gateway().affinityFunctionInit(mem.pointer(), baseTargetProxy); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java index 8a07b335a1add..342e726f51e31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java @@ -71,7 +71,7 @@ protected PlatformAffinityFunctionTarget(PlatformContext platformCtx, AffinityFu } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { if (type == OP_PARTITION) return baseFunc.partition(reader.readObjectDetached()); else if (type == OP_REMOVE_NODE) { @@ -84,7 +84,7 @@ else if (type == OP_REMOVE_NODE) { } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { if (type == OP_ASSIGN_PARTITIONS) { AffinityFunctionContext affCtx = currentAffCtx.get(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java index 6a259caa63e83..f2014252b561c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java @@ -71,7 +71,7 @@ public PlatformAbstractQueryCursor(PlatformContext platformCtx, QueryCursorEx } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, final BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, final BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_GET_BATCH: { assert iter != null : "iterator() has not been called"; @@ -136,7 +136,7 @@ public PlatformAbstractQueryCursor(PlatformContext platformCtx, QueryCursorEx } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_ITERATOR: iter = cursor.iterator(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java index 04f17ff2804c1..27d784a94b998 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java @@ -20,6 +20,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTarget; /** * Proxy that implements PlatformTarget. @@ -41,7 +42,7 @@ public PlatformContinuousQueryProxy(PlatformContext platformCtx, PlatformContinu } /** {@inheritDoc} */ - @Override public Object outObject(int type) throws Exception { + @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { return qry.getInitialQueryCursor(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java index f21861e3e2f1c..c77f5018a9ac5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java @@ -19,7 +19,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget; +import org.apache.ignite.internal.processors.platform.PlatformTargetProxy; import org.apache.ignite.internal.util.GridStripedSpinBusyLock; /** @@ -429,7 +429,7 @@ public void dataStreamerTopologyUpdate(long ptr, long topVer, int topSize) { * @param memPtr Stream pointer. * @param keepBinary Binary flag. */ - public void dataStreamerStreamReceiverInvoke(long ptr, Object cache, long memPtr, boolean keepBinary) { + public void dataStreamerStreamReceiverInvoke(long ptr, PlatformTargetProxy cache, long memPtr, boolean keepBinary) { enter(); try { @@ -995,7 +995,7 @@ public void onStop() { * @param baseFunc Optional func for base calls. * @return Affinity function pointer. */ - public long affinityFunctionInit(long memPtr, PlatformAffinityFunctionTarget baseFunc) { + public long affinityFunctionInit(long memPtr, PlatformTargetProxy baseFunc) { enter(); try { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java index 50c4c28a85f2b..9d60ec0d333d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.processors.platform.callback; -import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget; +import org.apache.ignite.internal.processors.platform.PlatformTargetProxy; /** * Platform callback utility methods. Implemented in target platform. All methods in this class must be @@ -226,7 +226,7 @@ public class PlatformCallbackUtils { * @param memPtr Stream pointer. * @param keepBinary Binary flag. */ - static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, Object cache, long memPtr, + static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, PlatformTargetProxy cache, long memPtr, boolean keepBinary); /** @@ -504,7 +504,7 @@ static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, Objec * @param baseFunc Optional func for base calls. * @return Affinity function pointer. */ - static native long affinityFunctionInit(long envPtr, long memPtr, PlatformAffinityFunctionTarget baseFunc); + static native long affinityFunctionInit(long envPtr, long memPtr, PlatformTargetProxy baseFunc); /** * Gets the partition from affinity function. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java index dc73468b5e5c9..f49f477b78e1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.cache.PlatformCache; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -120,7 +121,7 @@ public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) { /** {@inheritDoc} */ @SuppressWarnings("deprecation") - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_METRICS: platformCtx.writeClusterMetrics(writer, prj.metrics()); @@ -134,7 +135,7 @@ public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) { /** {@inheritDoc} */ @SuppressWarnings({"ConstantConditions", "deprecation"}) - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_METRICS_FILTERED: { @@ -217,7 +218,7 @@ public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_PING_NODE: return pingNode(reader.readUuid()) ? TRUE : FALSE; @@ -228,7 +229,8 @@ public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) { } /** {@inheritDoc} */ - @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader) + throws IgniteCheckedException { switch (type) { case OP_FOR_NODE_IDS: { Collection ids = PlatformUtils.readCollection(reader); @@ -272,8 +274,8 @@ public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) { } /** {@inheritDoc} */ - @Override protected Object processInObjectStreamOutObjectStream( - int type, @Nullable Object arg, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + @Override public PlatformTarget processInObjectStreamOutObjectStream( + int type, @Nullable PlatformTarget arg, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_FOR_OTHERS: { @@ -289,7 +291,7 @@ public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) { } /** {@inheritDoc} */ - @Override protected Object processOutObject(int type) throws IgniteCheckedException { + @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { switch (type) { case OP_FOR_REMOTES: return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRemotes()); @@ -314,7 +316,7 @@ public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) { } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_RESET_METRICS: { assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java index 8ff15d5513555..fd1c2d4938d44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformListenable; import org.apache.ignite.internal.util.future.IgniteFutureImpl; @@ -98,7 +99,7 @@ public PlatformCompute(PlatformContext platformCtx, ClusterGroup grp, String pla } /** {@inheritDoc} */ - @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) + @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_UNICAST: @@ -120,7 +121,7 @@ public PlatformCompute(PlatformContext platformCtx, ClusterGroup grp, String pla } case OP_EXEC_ASYNC: - return executeJavaTask(reader, true); + return wrapListenable((PlatformListenable) executeJavaTask(reader, true)); default: return super.processInStreamOutObject(type, reader); @@ -128,7 +129,7 @@ public PlatformCompute(PlatformContext platformCtx, ClusterGroup grp, String pla } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_WITH_TIMEOUT: { compute.withTimeout(val); @@ -154,7 +155,7 @@ public PlatformCompute(PlatformContext platformCtx, ClusterGroup grp, String pla * @param reader Reader. * @param broadcast broadcast flag. */ - private PlatformListenable processClosures(long taskPtr, BinaryRawReaderEx reader, boolean broadcast, + private PlatformTarget processClosures(long taskPtr, BinaryRawReaderEx reader, boolean broadcast, boolean affinity) { PlatformAbstractTask task; @@ -221,7 +222,7 @@ private PlatformJob nextClosureJob(PlatformAbstractTask task, BinaryRawReaderEx } /** {@inheritDoc} */ - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_EXEC: @@ -239,7 +240,7 @@ private PlatformJob nextClosureJob(PlatformAbstractTask task, BinaryRawReaderEx * * @param task Task. */ - private PlatformListenable executeNative0(final PlatformAbstractTask task) { + private PlatformTarget executeNative0(final PlatformAbstractTask task) { IgniteInternalFuture fut = computeForPlatform.executeAsync(task, null); fut.listen(new IgniteInClosure() { @@ -257,7 +258,7 @@ private PlatformListenable executeNative0(final PlatformAbstractTask task) { } }); - return PlatformFutureUtils.getListenable(fut); + return wrapListenable(PlatformFutureUtils.getListenable(fut)); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java index cd5fba059c218..7d71a9e6c08e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java @@ -114,7 +114,7 @@ public PlatformDataStreamer(PlatformContext platformCtx, String cacheName, DataS } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_UPDATE: int plc = reader.readInt(); @@ -169,7 +169,7 @@ else if (plc == PLC_FLUSH) } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, final long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, final long val) throws IgniteCheckedException { switch (type) { case OP_SET_ALLOW_OVERWRITE: ldr.allowOverwrite(val == TRUE); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java index add11ed2ea6e3..d0992fc54ef47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java @@ -23,6 +23,8 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTargetProxy; +import org.apache.ignite.internal.processors.platform.PlatformTargetProxyImpl; import org.apache.ignite.internal.processors.platform.cache.PlatformCache; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; @@ -89,8 +91,10 @@ public PlatformStreamReceiverImpl(Object pred, long ptr, boolean keepBinary, Pla out.synchronize(); - ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, new PlatformCache(ctx, cache, keepBinary), - mem.pointer(), keepBinary); + PlatformCache cache0 = new PlatformCache(ctx, cache, keepBinary); + PlatformTargetProxy cacheProxy = new PlatformTargetProxyImpl(cache0, ctx); + + ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, cacheProxy, mem.pointer(), keepBinary); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java index 811e38bcadd79..b57b1409cfd3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java @@ -80,7 +80,7 @@ public PlatformAtomicLong(PlatformContext ctx, GridCacheAtomicLongImpl atomicLon } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_COMPARE_AND_SET: long cmp = reader.readLong(); @@ -99,7 +99,7 @@ public PlatformAtomicLong(PlatformContext ctx, GridCacheAtomicLongImpl atomicLon } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_ADD_AND_GET: return atomicLong.addAndGet(val); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java index 63b5b86ec6b69..a6442590d7b46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java @@ -94,7 +94,7 @@ private PlatformAtomicReference(PlatformContext ctx, GridCacheAtomicReferenceImp } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { if (type == OP_GET) writer.writeObject(atomicRef.get()); else @@ -102,7 +102,7 @@ private PlatformAtomicReference(PlatformContext ctx, GridCacheAtomicReferenceImp } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { if (type == OP_SET) { atomicRef.set(reader.readObjectDetached()); @@ -114,7 +114,7 @@ private PlatformAtomicReference(PlatformContext ctx, GridCacheAtomicReferenceImp } /** {@inheritDoc} */ - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { if (type == OP_COMPARE_AND_SET_AND_GET) { Object val = reader.readObjectDetached(); @@ -134,7 +134,7 @@ private PlatformAtomicReference(PlatformContext ctx, GridCacheAtomicReferenceImp } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_CLOSE: atomicRef.close(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java index c35273189f13f..6d17a72fcad70 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java @@ -71,7 +71,7 @@ public PlatformAtomicSequence(PlatformContext ctx, IgniteAtomicSequence atomicSe /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_ADD_AND_GET: return atomicSeq.addAndGet(val); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java index 383e7ab655991..9ddcc374fb5aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; +import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.F; @@ -117,7 +118,7 @@ public PlatformEvents(PlatformContext platformCtx, IgniteEvents events) { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_RECORD_LOCAL: @@ -168,7 +169,7 @@ public PlatformEvents(PlatformContext platformCtx, IgniteEvents events) { /** {@inheritDoc} */ @SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"}) - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_LOCAL_QUERY: { @@ -271,7 +272,7 @@ private Collection startRemoteQuery(BinaryRawReaderEx reader, IgniteEvent } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_GET_ENABLED_EVENTS: writeEventTypes(events.enabledEvents(), writer); @@ -284,7 +285,7 @@ private Collection startRemoteQuery(BinaryRawReaderEx reader, IgniteEvent } /** {@inheritDoc} */ - @Override protected Object processOutObject(int type) throws IgniteCheckedException { + @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { switch (type) { case OP_WITH_ASYNC: if (events.isAsync()) @@ -297,7 +298,7 @@ private Collection startRemoteQuery(BinaryRawReaderEx reader, IgniteEvent } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_IS_ENABLED: return events.isEnabled((int)val) ? TRUE : FALSE; @@ -310,12 +311,12 @@ private Collection startRemoteQuery(BinaryRawReaderEx reader, IgniteEvent } /** {@inheritDoc} */ - @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { return ((IgniteFutureImpl)eventsAsync.future()).internalFuture(); } /** {@inheritDoc} */ - @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) { + @Nullable @Override public PlatformFutureUtils.Writer futureWriter(int opId) { switch (opId) { case OP_WAIT_FOR_LOCAL: return eventResWriter; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java index 216427a38439f..6fe109e11b2a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.util.future.IgniteFutureImpl; @@ -86,7 +87,7 @@ public PlatformMessaging(PlatformContext platformCtx, IgniteMessaging messaging) } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_SEND: @@ -149,7 +150,7 @@ public PlatformMessaging(PlatformContext platformCtx, IgniteMessaging messaging) /** {@inheritDoc} */ @SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"}) - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_REMOTE_LISTEN:{ @@ -181,12 +182,12 @@ private UUID startRemoteListen(BinaryRawReaderEx reader, IgniteMessaging messagi } /** {@inheritDoc} */ - @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { return ((IgniteFutureImpl)messagingAsync.future()).internalFuture(); } /** {@inheritDoc} */ - @Override protected Object processOutObject(int type) throws IgniteCheckedException { + @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { switch (type) { case OP_WITH_ASYNC: if (messaging.isAsync()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java index 962a4c0af00c3..22a7fa2df145f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetService; import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServiceImpl; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; @@ -144,7 +145,7 @@ private ServiceDescriptor findDescriptor(String name) { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_DOTNET_DEPLOY: { @@ -195,7 +196,7 @@ private ServiceDescriptor findDescriptor(String name) { } /** {@inheritDoc} */ - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_DOTNET_SERVICES: { @@ -223,8 +224,8 @@ private ServiceDescriptor findDescriptor(String name) { } /** {@inheritDoc} */ - @Override protected Object processInObjectStreamOutObjectStream(int type, Object arg, BinaryRawReaderEx reader, - BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public PlatformTarget processInObjectStreamOutObjectStream(int type, PlatformTarget arg, + BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_INVOKE: { assert arg != null; @@ -260,7 +261,7 @@ private ServiceDescriptor findDescriptor(String name) { } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_DESCRIPTORS: { Collection descs = services.serviceDescriptors(); @@ -299,7 +300,7 @@ private ServiceDescriptor findDescriptor(String name) { } /** {@inheritDoc} */ - @Override protected Object processOutObject(int type) throws IgniteCheckedException { + @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { switch (type) { case OP_WITH_ASYNC: if (services.isAsync()) @@ -315,7 +316,7 @@ private ServiceDescriptor findDescriptor(String name) { } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_CANCEL_ALL: services.cancelAll(); @@ -327,7 +328,7 @@ private ServiceDescriptor findDescriptor(String name) { } /** {@inheritDoc} */ - @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_SERVICE_PROXY: { String name = reader.readString(); @@ -343,14 +344,14 @@ private ServiceDescriptor findDescriptor(String name) { : new GridServiceProxy<>(services.clusterGroup(), name, Service.class, sticky, platformCtx.kernalContext()); - return new ServiceProxyHolder(proxy, d.serviceClass()); + return new ServiceProxyHolder(proxy, d.serviceClass(), platformContext()); } } return super.processInStreamOutObject(type, reader); } /** {@inheritDoc} */ - @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { return ((IgniteFutureImpl)servicesAsync.future()).internalFuture(); } @@ -392,7 +393,7 @@ private void dotnetDeploy(BinaryRawReaderEx reader, IgniteServices services) { * Proxy holder. */ @SuppressWarnings("unchecked") - private static class ServiceProxyHolder { + private static class ServiceProxyHolder extends PlatformAbstractTarget { /** */ private final Object proxy; @@ -422,7 +423,9 @@ private static class ServiceProxyHolder { * @param proxy Proxy object. * @param clazz Proxy class. */ - private ServiceProxyHolder(Object proxy, Class clazz) { + private ServiceProxyHolder(Object proxy, Class clazz, PlatformContext ctx) { + super(ctx); + assert proxy != null; assert clazz != null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java index 1b41712e71793..3cee2b1388190 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java @@ -153,7 +153,7 @@ private Transaction tx(long id) { } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_COMMIT: tx(val).commit(); @@ -184,7 +184,7 @@ private Transaction tx(long id) { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { long txId = reader.readLong(); final Transaction asyncTx = (Transaction)tx(txId).withAsync(); @@ -220,7 +220,7 @@ private Transaction tx(long id) { } /** {@inheritDoc} */ - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_START: { TransactionConcurrency txConcurrency = TransactionConcurrency.fromOrdinal(reader.readInt()); @@ -245,7 +245,7 @@ private Transaction tx(long id) { } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_CACHE_CONFIG_PARAMETERS: TransactionConfiguration txCfg = platformCtx.kernalContext().config().getTransactionConfiguration(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java index 5985d22ae54ce..e81f4c6b9ce80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java @@ -20,8 +20,8 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawWriterEx; -import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; @@ -71,7 +71,7 @@ public class PlatformFutureUtils { * @return Resulting listenable. */ public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, - final int typ, PlatformAbstractTarget target) { + final int typ, PlatformTarget target) { PlatformListenable listenable = getListenable(fut); listen(ctx, listenable, futPtr, typ, null, target); @@ -88,7 +88,7 @@ public static PlatformListenable listen(final PlatformContext ctx, IgniteInterna * @return Resulting listenable. */ public static PlatformListenable listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, - final int typ, PlatformAbstractTarget target) { + final int typ, PlatformTarget target) { PlatformListenable listenable = getListenable(fut); listen(ctx, listenable, futPtr, typ, null, target); @@ -107,7 +107,7 @@ public static PlatformListenable listen(final PlatformContext ctx, IgniteFuture * @return Resulting listenable. */ public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, - final int typ, Writer writer, PlatformAbstractTarget target) { + final int typ, Writer writer, PlatformTarget target) { PlatformListenable listenable = getListenable(fut); listen(ctx, listenable, futPtr, typ, writer, target); @@ -126,7 +126,7 @@ public static PlatformListenable listen(final PlatformContext ctx, IgniteInterna * @return Resulting listenable. */ public static PlatformListenable listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, - final int typ, Writer writer, PlatformAbstractTarget target) { + final int typ, Writer writer, PlatformTarget target) { PlatformListenable listenable = getListenable(fut); listen(ctx, listenable, futPtr, typ, writer, target); @@ -144,7 +144,7 @@ public static PlatformListenable listen(final PlatformContext ctx, IgniteFuture * @return Resulting listenable. */ public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, - Writer writer, PlatformAbstractTarget target) { + Writer writer, PlatformTarget target) { PlatformListenable listenable = getListenable(fut); listen(ctx, listenable, futPtr, TYP_OBJ, writer, target); @@ -183,7 +183,7 @@ public static PlatformListenable getListenable(IgniteFuture fut) { */ @SuppressWarnings("unchecked") public static void listen(final PlatformContext ctx, PlatformListenable listenable, final long futPtr, final - int typ, @Nullable final Writer writer, final PlatformAbstractTarget target) { + int typ, @Nullable final Writer writer, final PlatformTarget target) { final PlatformCallbackGateway gate = ctx.gateway(); listenable.listen(new IgniteBiInClosure() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java new file mode 100644 index 0000000000000..7d659135f7a6e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.utils; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformContext; + +/** + * Wraps listenable in a platform target. + */ +public class PlatformListenableTarget extends PlatformAbstractTarget { + /** */ + private static final int OP_CANCEL = 1; + + /** */ + private static final int OP_IS_CANCELLED = 2; + + /** Wrapped listenable */ + private final PlatformListenable listenable; + + /** + * Constructor. + * + * @param platformCtx Context. + */ + public PlatformListenableTarget(PlatformListenable listenable, PlatformContext platformCtx) { + super(platformCtx); + + assert listenable != null; + + this.listenable = listenable; + } + + /** {@inheritDoc} */ + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { + switch (type) { + case OP_CANCEL: + return listenable.cancel() ? TRUE : FALSE; + + case OP_IS_CANCELLED: + return listenable.isCancelled() ? TRUE : FALSE; + } + + return super.processInLongOutLong(type, val); + } +} diff --git a/modules/platforms/cpp/jni/include/ignite/jni/exports.h b/modules/platforms/cpp/jni/include/ignite/jni/exports.h index 3052435298ea5..a2e5cbb09edd9 100644 --- a/modules/platforms/cpp/jni/include/ignite/jni/exports.h +++ b/modules/platforms/cpp/jni/include/ignite/jni/exports.h @@ -80,9 +80,6 @@ extern "C" { void IGNITE_CALL IgniteDestroyJvm(gcj::JniContext* ctx); - bool IGNITE_CALL IgniteListenableCancel(gcj::JniContext* ctx, void* obj); - bool IGNITE_CALL IgniteListenableIsCancelled(gcj::JniContext* ctx, void* obj); - void IGNITE_CALL IgniteSetConsoleHandler(gcj::ConsoleWriteHandler consoleHandler); void IGNITE_CALL IgniteRemoveConsoleHandler(gcj::ConsoleWriteHandler consoleHandler); } diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h index 07df001c43b83..97e4412a18545 100644 --- a/modules/platforms/cpp/jni/include/ignite/jni/java.h +++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h @@ -286,10 +286,6 @@ namespace ignite jmethodID m_PlatformUtils_reallocate; jmethodID m_PlatformUtils_errData; - jclass c_PlatformListenable; - jmethodID m_PlatformListenable_cancel; - jmethodID m_PlatformListenable_isCancelled; - /** * Constructor. */ @@ -465,9 +461,6 @@ namespace ignite jobject CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL); jobject CacheOutOpContinuousQuery(jobject obj, int type, long long memPtr); - bool ListenableCancel(jobject obj); - bool ListenableIsCancelled(jobject obj); - jobject Acquire(jobject obj); void DestroyJvm(); diff --git a/modules/platforms/cpp/jni/project/vs/module.def b/modules/platforms/cpp/jni/project/vs/module.def index e58ac3ba0dbe4..fb56dca31ca4f 100644 --- a/modules/platforms/cpp/jni/project/vs/module.def +++ b/modules/platforms/cpp/jni/project/vs/module.def @@ -36,8 +36,6 @@ IgniteDestroyJvm @86 IgniteTargetOutObject @91 IgniteProcessorExtensions @97 IgniteProcessorAtomicLong @98 -IgniteListenableCancel @110 -IgniteListenableIsCancelled @111 IgniteProcessorCreateCacheFromConfig @114 IgniteProcessorGetOrCreateCacheFromConfig @115 IgniteProcessorGetIgniteConfiguration @116 diff --git a/modules/platforms/cpp/jni/src/exports.cpp b/modules/platforms/cpp/jni/src/exports.cpp index dde98fbb6af91..b842c03b20f40 100644 --- a/modules/platforms/cpp/jni/src/exports.cpp +++ b/modules/platforms/cpp/jni/src/exports.cpp @@ -214,14 +214,6 @@ extern "C" { ctx->DestroyJvm(); } - bool IGNITE_CALL IgniteListenableCancel(gcj::JniContext* ctx, void* obj) { - return ctx->ListenableCancel(static_cast(obj)); - } - - bool IGNITE_CALL IgniteListenableIsCancelled(gcj::JniContext* ctx, void* obj) { - return ctx->ListenableIsCancelled(static_cast(obj)); - } - void IGNITE_CALL IgniteSetConsoleHandler(gcj::ConsoleWriteHandler consoleHandler) { gcj::JniContext::SetConsoleHandler(consoleHandler); } diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp index 2d3cf729467f1..9626fbb564b27 100644 --- a/modules/platforms/cpp/jni/src/java.cpp +++ b/modules/platforms/cpp/jni/src/java.cpp @@ -194,33 +194,33 @@ namespace ignite const char* C_PLATFORM_PROCESSOR = "org/apache/ignite/internal/processors/platform/PlatformProcessor"; JniMethod M_PLATFORM_PROCESSOR_RELEASE_START = JniMethod("releaseStart", "()V", false); - JniMethod M_PLATFORM_PROCESSOR_PROJECTION = JniMethod("projection", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_CACHE = JniMethod("cache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE = JniMethod("createCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE = JniMethod("getOrCreateCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE_FROM_CONFIG = JniMethod("createCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE_FROM_CONFIG = JniMethod("getOrCreateCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_CREATE_NEAR_CACHE = JniMethod("createNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_NEAR_CACHE = JniMethod("getOrCreateNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); + JniMethod M_PLATFORM_PROCESSOR_PROJECTION = JniMethod("projection", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_CACHE = JniMethod("cache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE = JniMethod("createCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE = JniMethod("getOrCreateCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE_FROM_CONFIG = JniMethod("createCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE_FROM_CONFIG = JniMethod("getOrCreateCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_CREATE_NEAR_CACHE = JniMethod("createNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_NEAR_CACHE = JniMethod("getOrCreateNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); JniMethod M_PLATFORM_PROCESSOR_DESTROY_CACHE = JniMethod("destroyCache", "(Ljava/lang/String;)V", false); - JniMethod M_PLATFORM_PROCESSOR_AFFINITY = JniMethod("affinity", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_DATA_STREAMER = JniMethod("dataStreamer", "(Ljava/lang/String;Z)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_TRANSACTIONS = JniMethod("transactions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_COMPUTE = JniMethod("compute", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_MESSAGE = JniMethod("message", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_EVENTS = JniMethod("events", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_SERVICES = JniMethod("services", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_EXTENSIONS = JniMethod("extensions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_ATOMIC_LONG = JniMethod("atomicLong", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_ATOMIC_SEQUENCE = JniMethod("atomicSequence", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_ATOMIC_REFERENCE = JniMethod("atomicReference", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); + JniMethod M_PLATFORM_PROCESSOR_AFFINITY = JniMethod("affinity", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_DATA_STREAMER = JniMethod("dataStreamer", "(Ljava/lang/String;Z)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_TRANSACTIONS = JniMethod("transactions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_COMPUTE = JniMethod("compute", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_MESSAGE = JniMethod("message", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_EVENTS = JniMethod("events", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_SERVICES = JniMethod("services", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_EXTENSIONS = JniMethod("extensions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_ATOMIC_LONG = JniMethod("atomicLong", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_ATOMIC_SEQUENCE = JniMethod("atomicSequence", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_ATOMIC_REFERENCE = JniMethod("atomicReference", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); JniMethod M_PLATFORM_PROCESSOR_GET_IGNITE_CONFIGURATION = JniMethod("getIgniteConfiguration", "(J)V", false); JniMethod M_PLATFORM_PROCESSOR_GET_CACHE_NAMES = JniMethod("getCacheNames", "(J)V", false); JniMethod M_PLATFORM_PROCESSOR_LOGGER_IS_LEVEL_ENABLED = JniMethod("loggerIsLevelEnabled", "(I)Z", false); JniMethod M_PLATFORM_PROCESSOR_LOGGER_LOG = JniMethod("loggerLog", "(ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;)V", false); - JniMethod M_PLATFORM_PROCESSOR_BINARY_PROCESSOR = JniMethod("binaryProcessor", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); + JniMethod M_PLATFORM_PROCESSOR_BINARY_PROCESSOR = JniMethod("binaryProcessor", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); - const char* C_PLATFORM_TARGET = "org/apache/ignite/internal/processors/platform/PlatformTarget"; + const char* C_PLATFORM_TARGET = "org/apache/ignite/internal/processors/platform/PlatformTargetProxy"; JniMethod M_PLATFORM_TARGET_IN_LONG_OUT_LONG = JniMethod("inLongOutLong", "(IJ)J", false); JniMethod M_PLATFORM_TARGET_IN_STREAM_OUT_LONG = JniMethod("inStreamOutLong", "(IJ)J", false); JniMethod M_PLATFORM_TARGET_IN_STREAM_OUT_OBJECT = JniMethod("inStreamOutObject", "(IJ)Ljava/lang/Object;", false); @@ -260,7 +260,7 @@ namespace ignite JniMethod M_PLATFORM_CALLBACK_UTILS_CONTINUOUS_QUERY_FILTER_RELEASE = JniMethod("continuousQueryFilterRelease", "(JJ)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_DATA_STREAMER_TOPOLOGY_UPDATE = JniMethod("dataStreamerTopologyUpdate", "(JJJI)V", true); - JniMethod M_PLATFORM_CALLBACK_UTILS_DATA_STREAMER_STREAM_RECEIVER_INVOKE = JniMethod("dataStreamerStreamReceiverInvoke", "(JJLjava/lang/Object;JZ)V", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_DATA_STREAMER_STREAM_RECEIVER_INVOKE = JniMethod("dataStreamerStreamReceiverInvoke", "(JJLorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;JZ)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_FUTURE_BYTE_RES = JniMethod("futureByteResult", "(JJI)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_FUTURE_BOOL_RES = JniMethod("futureBoolResult", "(JJI)V", true); @@ -307,7 +307,7 @@ namespace ignite JniMethod M_PLATFORM_CALLBACK_UTILS_LOGGER_LOG = JniMethod("loggerLog", "(JILjava/lang/String;Ljava/lang/String;Ljava/lang/String;J)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_LOGGER_IS_LEVEL_ENABLED = JniMethod("loggerIsLevelEnabled", "(JI)Z", true); - JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJLorg/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget;)J", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJLorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)J", true); JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION = JniMethod("affinityFunctionPartition", "(JJJ)I", true); JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS = JniMethod("affinityFunctionAssignPartitions", "(JJJJ)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE = JniMethod("affinityFunctionRemoveNode", "(JJJ)V", true); @@ -327,10 +327,6 @@ namespace ignite JniMethod M_PLATFORM_IGNITION_STOP = JniMethod("stop", "(Ljava/lang/String;Z)Z", true); JniMethod M_PLATFORM_IGNITION_STOP_ALL = JniMethod("stopAll", "(Z)V", true); - const char* C_PLATFORM_LISTENABLE = "org/apache/ignite/internal/processors/platform/utils/PlatformListenable"; - JniMethod M_PLATFORM_LISTENABLE_CANCEL = JniMethod("cancel", "()Z", false); - JniMethod M_PLATFORM_LISTENABLE_IS_CANCELED = JniMethod("isCancelled", "()Z", false); - /* STATIC STATE. */ gcc::CriticalSection JVM_LOCK; gcc::CriticalSection CONSOLE_LOCK; @@ -552,10 +548,6 @@ namespace ignite m_PlatformUtils_reallocate = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_REALLOC); m_PlatformUtils_errData = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_ERR_DATA); - c_PlatformListenable = FindClass(env, C_PLATFORM_LISTENABLE); - m_PlatformListenable_cancel = FindMethod(env, c_PlatformListenable, M_PLATFORM_LISTENABLE_CANCEL); - m_PlatformListenable_isCancelled = FindMethod(env, c_PlatformListenable, M_PLATFORM_LISTENABLE_IS_CANCELED); - // Find utility classes which are not used from context, but are still required in other places. CheckClass(env, C_PLATFORM_NO_CALLBACK_EXCEPTION); } @@ -1447,28 +1439,6 @@ namespace ignite return LocalToGlobal(env, res); } - bool JniContext::ListenableCancel(jobject obj) - { - JNIEnv* env = Attach(); - - jboolean res = env->CallBooleanMethod(obj, jvm->GetMembers().m_PlatformListenable_cancel); - - ExceptionCheck(env); - - return res != 0;; - } - - bool JniContext::ListenableIsCancelled(jobject obj) - { - JNIEnv* env = Attach(); - - jboolean res = env->CallBooleanMethod(obj, jvm->GetMembers().m_PlatformListenable_isCancelled); - - ExceptionCheck(env); - - return res != 0;; - } - jobject JniContext::Acquire(jobject obj) { if (obj) { diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index f945efea27d3c..6421b8cbb94e7 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -181,6 +181,7 @@ + diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs index 68bd9d423acc6..50102a7cf0423 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs @@ -24,7 +24,6 @@ namespace Apache.Ignite.Core.Impl.Common using System.Threading.Tasks; using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Impl.Binary.IO; - using Apache.Ignite.Core.Impl.Unmanaged; /// /// Grid future implementation. @@ -40,7 +39,7 @@ public sealed class Future : IFutureInternal private readonly TaskCompletionSource _taskCompletionSource = new TaskCompletionSource(); /** */ - private volatile IUnmanagedTarget _unmanagedTarget; + private volatile Listenable _listenable; /// /// Constructor. @@ -84,7 +83,7 @@ public Task Task /// The cancellation token. public Task GetTask(CancellationToken cancellationToken) { - Debug.Assert(_unmanagedTarget != null); + Debug.Assert(_listenable != null); // OnTokenCancel will fire even if cancellationToken is already cancelled. cancellationToken.Register(OnTokenCancel); @@ -169,11 +168,11 @@ public void OnDone(T res, Exception err) /// /// Sets unmanaged future target for cancellation. /// - internal void SetTarget(IUnmanagedTarget target) + internal void SetTarget(Listenable target) { Debug.Assert(target != null); - _unmanagedTarget = target; + _listenable = target; } /// @@ -181,8 +180,8 @@ internal void SetTarget(IUnmanagedTarget target) /// private void OnTokenCancel() { - if (_unmanagedTarget != null) - UnmanagedUtils.ListenableCancel(_unmanagedTarget); + if (_listenable != null) + _listenable.Cancel(); } } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs new file mode 100644 index 0000000000000..6da98ab74ae0c --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Common +{ + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Unmanaged; + + /// + /// Platform listenable. + /// + internal class Listenable : PlatformTarget + { + /** */ + private const int OpCancel = 1; + + /// + /// Initializes a new instance of the class. + /// + /// Target. + /// Marshaller. + public Listenable(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh) + { + // No-op. + } + + /// + /// Cancels the listenable. + /// + public void Cancel() + { + DoOutInOp(OpCancel); + } + } +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs index bc7c7d9e6d05d..d36caf354ae83 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs @@ -195,7 +195,7 @@ public Future ExecuteJavaTaskAsync(string taskName, obje var future = holder.Future; - future.SetTarget(futTarget); + future.SetTarget(new Listenable(futTarget, Marshaller)); return future; } @@ -550,7 +550,7 @@ protected override T Unmarshal(IBinaryStream stream) writeAction(writer); }); - holder.Future.SetTarget(futTarget); + holder.Future.SetTarget(new Listenable(futTarget, Marshaller)); } catch (Exception e) { diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs index f4a07f6daf51f..9cf2a6c7aa2ba 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs @@ -869,7 +869,7 @@ protected virtual T Unmarshal(IBinaryStream stream) throw; } - fut.SetTarget(futTarget); + fut.SetTarget(new Listenable(futTarget, _marsh)); return fut; } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs index c746866da100d..c4f3e19e92b7f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs @@ -167,14 +167,6 @@ internal static unsafe class IgniteJniNativeMethods [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteDestroyJvm")] public static extern void DestroyJvm(void* ctx); - [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableCancel")] - [return: MarshalAs(UnmanagedType.U1)] - public static extern bool ListenableCancel(void* ctx, void* target); - - [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableIsCancelled")] - [return: MarshalAs(UnmanagedType.U1)] - public static extern bool ListenableIsCancelled(void* ctx, void* target); - [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteSetConsoleHandler")] public static extern void SetConsoleHandler(void* consoleHandler); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs index f36c35ff0c913..0a2a1f0e0ca52 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs @@ -516,11 +516,6 @@ internal static void DestroyJvm(void* ctx) JNI.DestroyJvm(ctx); } - internal static bool ListenableCancel(IUnmanagedTarget target) - { - return JNI.ListenableCancel(target.Context, target.Target); - } - #endregion } }