From bd509bec4e18ebf694b57c47defe1d501998c748 Mon Sep 17 00:00:00 2001 From: Pavel Tupitsyn Date: Thu, 31 Mar 2016 14:19:38 +0300 Subject: [PATCH] IGNITE-2686: .NET: Implemented ability to invoke foreign services from .NET. This closes #513. --- .../platform/services/PlatformServices.java | 249 +++++++++++- .../platform/utils/PlatformUtils.java | 128 +++++++ .../processors/service/GridServiceProxy.java | 324 ++++++++-------- .../platform/PlatformDeployServiceTask.java | 360 ++++++++++++++++++ modules/platforms/cpp/common/src/java.cpp | 2 +- .../Services/ServiceProxyTest.cs | 6 +- .../Services/ServicesTest.cs | 249 +++++++++++- .../Apache.Ignite.Core.csproj | 1 + .../Impl/Binary/BinarySystemHandlers.cs | 21 +- .../Impl/Common/Platform.cs | 35 ++ .../Apache.Ignite.Core/Impl/ExceptionUtils.cs | 2 +- .../Impl/Services/ServiceDescriptor.cs | 7 +- .../Impl/Services/ServiceProxyInvoker.cs | 25 +- .../Impl/Services/ServiceProxySerializer.cs | 63 ++- .../Impl/Services/Services.cs | 13 +- 15 files changed, 1278 insertions(+), 207 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java create mode 100644 modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Platform.cs diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java index 963c72e6ca44d..f35574126a190 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.platform.services; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteServices; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; @@ -29,13 +30,19 @@ import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformWriterBiClosure; import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure; +import org.apache.ignite.internal.processors.service.GridServiceProxy; import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceConfiguration; import org.apache.ignite.services.ServiceDescriptor; +import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -54,11 +61,21 @@ public class PlatformServices extends PlatformAbstractTarget { private static final int OP_DOTNET_SERVICES = 3; /** */ - private static final int OP_DOTNET_INVOKE = 4; + private static final int OP_INVOKE = 4; /** */ private static final int OP_DESCRIPTORS = 5; + /** */ + private static final byte PLATFORM_JAVA = 0; + + /** */ + private static final byte PLATFORM_DOTNET = 1; + + /** */ + private static final CopyOnWriteConcurrentMap, Method> SVC_METHODS + = new CopyOnWriteConcurrentMap<>(); + /** */ private final IgniteServices services; @@ -125,8 +142,31 @@ public void cancelAll() { * @param sticky Whether or not Ignite should always contact the same remote service. * @return Either proxy over remote service or local service if it is deployed locally. */ - public Object dotNetServiceProxy(String name, boolean sticky) { - return services.serviceProxy(name, PlatformDotNetService.class, sticky); + public Object serviceProxy(String name, boolean sticky) { + ServiceDescriptor d = findDescriptor(name); + + if (d == null) + throw new IgniteException("Failed to find deployed service: " + name); + + Object proxy = PlatformService.class.isAssignableFrom(d.serviceClass()) + ? services.serviceProxy(name, PlatformService.class, sticky) + : new GridServiceProxy<>(services.clusterGroup(), name, Service.class, sticky, platformCtx.kernalContext()); + + return new ServiceProxyHolder(proxy, d.serviceClass()); + } + + /** + * Finds a service descriptor by name. + * + * @param name Service name. + * @return Descriptor or null. + */ + private ServiceDescriptor findDescriptor(String name) { + for (ServiceDescriptor d : services.serviceDescriptors()) + if (d.name().equals(name)) + return d; + + return null; } /** {@inheritDoc} */ @@ -202,9 +242,9 @@ public Object dotNetServiceProxy(String name, boolean sticky) { @Override protected void processInObjectStreamOutStream(int type, Object arg, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { - case OP_DOTNET_INVOKE: { + case OP_INVOKE: { assert arg != null; - assert arg instanceof PlatformDotNetService; + assert arg instanceof ServiceProxyHolder; String mthdName = reader.readString(); @@ -220,7 +260,7 @@ public Object dotNetServiceProxy(String name, boolean sticky) { args = null; try { - Object result = ((PlatformDotNetService)arg).invokeMethod(mthdName, srvKeepBinary, args); + Object result = ((ServiceProxyHolder)arg).invoke(mthdName, srvKeepBinary, args); PlatformUtils.writeInvocationResult(writer, result, null); } @@ -251,6 +291,11 @@ public Object dotNetServiceProxy(String name, boolean sticky) { writer.writeUuid(d.originNodeId()); writer.writeObject(d.affinityKey()); + // Write platform. There are only 2 platforms now. + byte platform = d.serviceClass().equals(PlatformDotNetServiceImpl.class) + ? PLATFORM_DOTNET : PLATFORM_JAVA; + writer.writeByte(platform); + Map top = d.topologySnapshot(); PlatformUtils.writeMap(writer, top, new PlatformWriterBiClosure() { @@ -274,4 +319,196 @@ public Object dotNetServiceProxy(String name, boolean sticky) { @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { return ((IgniteFutureImpl)services.future()).internalFuture(); } + + /** + * Proxy holder. + */ + @SuppressWarnings("unchecked") + private static class ServiceProxyHolder { + /** */ + private final Object proxy; + + /** */ + private final Class serviceClass; + + /** */ + private static final Map, Class> PRIMITIVES_TO_WRAPPERS = new HashMap<>(); + + /** + * Class initializer. + */ + static { + PRIMITIVES_TO_WRAPPERS.put(boolean.class, Boolean.class); + PRIMITIVES_TO_WRAPPERS.put(byte.class, Byte.class); + PRIMITIVES_TO_WRAPPERS.put(char.class, Character.class); + PRIMITIVES_TO_WRAPPERS.put(double.class, Double.class); + PRIMITIVES_TO_WRAPPERS.put(float.class, Float.class); + PRIMITIVES_TO_WRAPPERS.put(int.class, Integer.class); + PRIMITIVES_TO_WRAPPERS.put(long.class, Long.class); + PRIMITIVES_TO_WRAPPERS.put(short.class, Short.class); + } + + /** + * Ctor. + * + * @param proxy Proxy object. + * @param clazz Proxy class. + */ + private ServiceProxyHolder(Object proxy, Class clazz) { + assert proxy != null; + assert clazz != null; + + this.proxy = proxy; + serviceClass = clazz; + } + + /** + * Invokes the proxy. + * @param mthdName Method name. + * @param srvKeepBinary Binary flag. + * @param args Args. + * @return Invocation result. + * @throws IgniteCheckedException + * @throws NoSuchMethodException + */ + public Object invoke(String mthdName, boolean srvKeepBinary, Object[] args) + throws IgniteCheckedException, NoSuchMethodException { + if (proxy instanceof PlatformService) { + return ((PlatformService)proxy).invokeMethod(mthdName, srvKeepBinary, args); + } + else { + assert proxy instanceof GridServiceProxy; + + // Deserialize arguments for Java service when not in binary mode + if (!srvKeepBinary) + args = PlatformUtils.unwrapBinariesInArray(args); + + Method mtd = getMethod(serviceClass, mthdName, args); + + return ((GridServiceProxy)proxy).invokeMethod(mtd, args); + } + } + + /** + * Finds a suitable method in a class + * + * @param clazz Class. + * @param mthdName Name. + * @param args Args. + * @return Method. + */ + private static Method getMethod(Class clazz, String mthdName, Object[] args) throws NoSuchMethodException { + assert clazz != null; + assert mthdName != null; + assert args != null; + + T3 cacheKey = new T3<>(clazz, mthdName, args.length); + Method res = SVC_METHODS.get(cacheKey); + + if (res != null) + return res; + + Method[] allMethods = clazz.getMethods(); + + List methods = new ArrayList<>(allMethods.length); + + // Filter by name and param count + for (Method m : allMethods) + if (m.getName().equals(mthdName) && m.getParameterTypes().length == args.length) + methods.add(m); + + if (methods.size() == 1) { + res = methods.get(0); + + // Update cache only when there is a single method with a given name and arg count. + SVC_METHODS.put(cacheKey, res); + + return res; + } + + if (methods.isEmpty()) + throw new NoSuchMethodException("Could not find proxy method '" + mthdName + "' in class " + clazz); + + // Filter by param types + for (int i = 0; i < methods.size(); i++) + if (!areMethodArgsCompatible(methods.get(i).getParameterTypes(), args)) + methods.remove(i--); + + if (methods.size() == 1) + return methods.get(0); + + if (methods.isEmpty()) + throw new NoSuchMethodException("Could not find proxy method '" + mthdName + "' in class " + clazz); + + throw new NoSuchMethodException("Ambiguous proxy method '" + mthdName + "' in class " + clazz); + } + + /** + * Determines whether specified method arguments are compatible with given method parameter definitions. + * + * @param argTypes Method arg types. + * @param args Method args. + * @return Whether specified args are compatible with argTypes. + */ + private static boolean areMethodArgsCompatible(Class[] argTypes, Object[] args) { + for (int i = 0; i < args.length; i++){ + // arg is always of a primitive wrapper class, and argTypes can have actual primitive + Object arg = args[i]; + Class argType = wrap(argTypes[i]); + + if (arg != null && !argType.isAssignableFrom(arg.getClass())) + return false; + } + + return true; + } + + /** + * Gets corresponding wrapper for a primitive type. + * @param c Class to convert. + * + * @return Primitive wrapper, or the same class. + */ + @SuppressWarnings("unchecked") + private static Class wrap(Class c) { + return c.isPrimitive() ? PRIMITIVES_TO_WRAPPERS.get(c) : c; + } + } + + /** + * Concurrent map. + */ + private static class CopyOnWriteConcurrentMap { + /** */ + private volatile Map map = new HashMap<>(); + + /** + * Gets a value. + * + * @param key Key. + * @return Value. + */ + public V get(K key) { + return map.get(key); + } + + /** + * Puts a value. + * + * @param key Key. + * @param val Value. + */ + public void put(K key, V val) { + synchronized (this){ + if (map.containsKey(key)) + return; + + Map map0 = new HashMap<>(map); + + map0.put(key, val); + + map = map0; + } + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java index d09b2c76e8b1c..04b1a17a6d070 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java @@ -21,6 +21,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.GridKernalContext; @@ -31,6 +32,7 @@ import org.apache.ignite.internal.binary.BinaryNoopMetadataHandler; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.binary.GridBinaryMarshaller; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformExtendedException; @@ -40,6 +42,7 @@ import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgnitePredicate; @@ -50,11 +53,18 @@ import javax.cache.CacheException; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryListenerException; +import java.math.BigDecimal; +import java.security.Timestamp; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PREFIX; @@ -791,6 +801,124 @@ public static GridBinaryMarshaller marshaller() { } } + /** + * @param o Object to unwrap. + * @return Unwrapped object. + */ + private static Object unwrapBinary(Object o) { + if (o == null) + return null; + + if (knownArray(o)) + return o; + + if (o instanceof Map.Entry) { + Map.Entry entry = (Map.Entry)o; + + Object key = entry.getKey(); + + Object uKey = unwrapBinary(key); + + Object val = entry.getValue(); + + Object uVal = unwrapBinary(val); + + return (key != uKey || val != uVal) ? F.t(uKey, uVal) : o; + } + else if (BinaryUtils.knownCollection(o)) + return unwrapKnownCollection((Collection)o); + else if (BinaryUtils.knownMap(o)) + return unwrapBinariesIfNeeded((Map)o); + else if (o instanceof Object[]) + return unwrapBinariesInArray((Object[])o); + else if (o instanceof BinaryObject) + return ((BinaryObject)o).deserialize(); + + return o; + } + + /** + * @param obj Obj. + * @return True is obj is a known simple type array. + */ + private static boolean knownArray(Object obj) { + return obj instanceof String[] || + obj instanceof boolean[] || + obj instanceof byte[] || + obj instanceof char[] || + obj instanceof int[] || + obj instanceof long[] || + obj instanceof short[] || + obj instanceof Timestamp[] || + obj instanceof double[] || + obj instanceof float[] || + obj instanceof UUID[] || + obj instanceof BigDecimal[]; + } + + /** + * @param o Object to test. + * @return True if collection should be recursively unwrapped. + */ + private static boolean knownCollection(Object o) { + Class cls = o == null ? null : o.getClass(); + + return cls == ArrayList.class || cls == LinkedList.class || cls == HashSet.class; + } + + /** + * @param o Object to test. + * @return True if map should be recursively unwrapped. + */ + private static boolean knownMap(Object o) { + Class cls = o == null ? null : o.getClass(); + + return cls == HashMap.class || cls == LinkedHashMap.class; + } + + /** + * @param col Collection to unwrap. + * @return Unwrapped collection. + */ + @SuppressWarnings("TypeMayBeWeakened") + private static Collection unwrapKnownCollection(Collection col) { + Collection col0 = BinaryUtils.newKnownCollection(col); + + for (Object obj : col) + col0.add(unwrapBinary(obj)); + + return col0; + } + + /** + * Unwrap array of binaries if needed. + * + * @param arr Array. + * @return Result. + */ + public static Object[] unwrapBinariesInArray(Object[] arr) { + Object[] res = new Object[arr.length]; + + for (int i = 0; i < arr.length; i++) + res[i] = unwrapBinary(arr[i]); + + return res; + } + + /** + * Unwraps map. + * + * @param map Map to unwrap. + * @return Unwrapped collection. + */ + private static Map unwrapBinariesIfNeeded(Map map) { + Map map0 = BinaryUtils.newMap(map); + + for (Map.Entry e : map.entrySet()) + map0.put(unwrapBinary(e.getKey()), unwrapBinary(e.getValue())); + + return map0; + } /** * Private constructor. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java index f29d239d00924..7528550944ac5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java @@ -54,7 +54,7 @@ /** * Wrapper for making {@link org.apache.ignite.services.Service} class proxies. */ -class GridServiceProxy implements Serializable { +public class GridServiceProxy implements Serializable { /** */ private static final long serialVersionUID = 0L; @@ -78,6 +78,12 @@ class GridServiceProxy implements Serializable { /** {@code True} if projection includes local node. */ private boolean hasLocNode; + /** Service name. */ + private final String name; + + /** Whether multi-node request should be done. */ + private final boolean sticky; + /** * @param prj Grid projection. * @param name Service name. @@ -86,7 +92,7 @@ class GridServiceProxy implements Serializable { * @param ctx Context. */ @SuppressWarnings("unchecked") - GridServiceProxy(ClusterGroup prj, + public GridServiceProxy(ClusterGroup prj, String name, Class svc, boolean sticky, @@ -94,6 +100,8 @@ class GridServiceProxy implements Serializable { { this.prj = prj; this.ctx = ctx; + this.name = name; + this.sticky = sticky; hasLocNode = hasLocalNode(prj); log = ctx.log(getClass()); @@ -101,7 +109,7 @@ class GridServiceProxy implements Serializable { proxy = (T)Proxy.newProxyInstance( svc.getClassLoader(), new Class[] {svc}, - new ProxyInvocationHandler(name, sticky) + new ProxyInvocationHandler() ); } @@ -118,207 +126,199 @@ private boolean hasLocalNode(ClusterGroup prj) { return false; } - /** - * @return Proxy object for a given instance. - */ - T proxy() { - return proxy; - } + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + public Object invokeMethod(final Method mtd, final Object[] args) { + if (U.isHashCodeMethod(mtd)) + return System.identityHashCode(proxy); + else if (U.isEqualsMethod(mtd)) + return proxy == args[0]; + else if (U.isToStringMethod(mtd)) + return GridServiceProxy.class.getSimpleName() + " [name=" + name + ", sticky=" + sticky + ']'; - /** - * Invocation handler for service proxy. - */ - private class ProxyInvocationHandler implements InvocationHandler { - /** Service name. */ - private final String name; + ctx.gateway().readLock(); - /** Whether multi-node request should be done. */ - private final boolean sticky; + try { + while (true) { + ClusterNode node = null; - /** - * @param name Name. - * @param sticky Sticky. - */ - private ProxyInvocationHandler(String name, boolean sticky) { - this.name = name; - this.sticky = sticky; - } + try { + node = nodeForService(name, sticky); - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override public Object invoke(Object proxy, final Method mtd, final Object[] args) { - if (U.isHashCodeMethod(mtd)) - return System.identityHashCode(proxy); - else if (U.isEqualsMethod(mtd)) - return proxy == args[0]; - else if (U.isToStringMethod(mtd)) - return GridServiceProxy.class.getSimpleName() + " [name=" + name + ", sticky=" + sticky + ']'; - - ctx.gateway().readLock(); - - try { - while (true) { - ClusterNode node = null; - - try { - node = nodeForService(name, sticky); - - if (node == null) - throw new IgniteException("Failed to find deployed service: " + name); - - // If service is deployed locally, then execute locally. - if (node.isLocal()) { - ServiceContextImpl svcCtx = ctx.service().serviceContext(name); - - if (svcCtx != null) - return mtd.invoke(svcCtx.service(), args); - } - else { - // Execute service remotely. - return ctx.closure().callAsyncNoFailover( - BALANCE, - new ServiceProxyCallable(mtd.getName(), name, mtd.getParameterTypes(), args), - Collections.singleton(node), - false - ).get(); - } - } - catch (GridServiceNotFoundException | ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Service was not found or topology changed (will retry): " + e.getMessage()); - } - catch (RuntimeException | Error e) { - throw e; - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + if (node == null) + throw new IgniteException("Failed to find deployed service: " + name); + + // If service is deployed locally, then execute locally. + if (node.isLocal()) { + ServiceContextImpl svcCtx = ctx.service().serviceContext(name); + + if (svcCtx != null) + return mtd.invoke(svcCtx.service(), args); } - catch (Exception e) { - throw new IgniteException(e); + else { + // Execute service remotely. + return ctx.closure().callAsyncNoFailover( + BALANCE, + new ServiceProxyCallable(mtd.getName(), name, mtd.getParameterTypes(), args), + Collections.singleton(node), + false + ).get(); } + } + catch (GridServiceNotFoundException | ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Service was not found or topology changed (will retry): " + e.getMessage()); + } + catch (RuntimeException | Error e) { + throw e; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + catch (Exception e) { + throw new IgniteException(e); + } - // If we are here, that means that service was not found - // or topology was changed. In this case, we erase the - // previous sticky node and try again. - rmtNode.compareAndSet(node, null); + // If we are here, that means that service was not found + // or topology was changed. In this case, we erase the + // previous sticky node and try again. + rmtNode.compareAndSet(node, null); - // Add sleep between retries to avoid busy-wait loops. - try { - Thread.sleep(10); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); + // Add sleep between retries to avoid busy-wait loops. + try { + Thread.sleep(10); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); - throw new IgniteException(e); - } + throw new IgniteException(e); } } - finally { - ctx.gateway().readUnlock(); - } } + finally { + ctx.gateway().readUnlock(); + } + } - /** - * @param sticky Whether multi-node request should be done. - * @param name Service name. - * @return Node with deployed service or {@code null} if there is no such node. - */ - private ClusterNode nodeForService(String name, boolean sticky) throws IgniteCheckedException { - do { // Repeat if reference to remote node was changed. - if (sticky) { - ClusterNode curNode = rmtNode.get(); + /** + * @param sticky Whether multi-node request should be done. + * @param name Service name. + * @return Node with deployed service or {@code null} if there is no such node. + */ + private ClusterNode nodeForService(String name, boolean sticky) throws IgniteCheckedException { + do { // Repeat if reference to remote node was changed. + if (sticky) { + ClusterNode curNode = rmtNode.get(); - if (curNode != null) - return curNode; + if (curNode != null) + return curNode; - curNode = randomNodeForService(name); + curNode = randomNodeForService(name); - if (curNode == null) - return null; + if (curNode == null) + return null; - if (rmtNode.compareAndSet(null, curNode)) - return curNode; - } - else - return randomNodeForService(name); + if (rmtNode.compareAndSet(null, curNode)) + return curNode; } - while (true); + else + return randomNodeForService(name); } + while (true); + } - /** - * @param name Service name. - * @return Local node if it has a given service deployed or randomly chosen remote node, - * otherwise ({@code null} if given service is not deployed on any node. - */ - private ClusterNode randomNodeForService(String name) throws IgniteCheckedException { - if (hasLocNode && ctx.service().service(name) != null) - return ctx.discovery().localNode(); - - Map snapshot = ctx.service().serviceTopology(name); - - if (snapshot == null || snapshot.isEmpty()) - return null; + /** + * @param name Service name. + * @return Local node if it has a given service deployed or randomly chosen remote node, + * otherwise ({@code null} if given service is not deployed on any node. + */ + private ClusterNode randomNodeForService(String name) throws IgniteCheckedException { + if (hasLocNode && ctx.service().service(name) != null) + return ctx.discovery().localNode(); - // Optimization for cluster singletons. - if (snapshot.size() == 1) { - UUID nodeId = snapshot.keySet().iterator().next(); + Map snapshot = ctx.service().serviceTopology(name); - return prj.node(nodeId); - } + if (snapshot == null || snapshot.isEmpty()) + return null; - Collection nodes = prj.nodes(); + // Optimization for cluster singletons. + if (snapshot.size() == 1) { + UUID nodeId = snapshot.keySet().iterator().next(); - // Optimization for 1 node in projection. - if (nodes.size() == 1) { - ClusterNode n = nodes.iterator().next(); + return prj.node(nodeId); + } - return snapshot.containsKey(n.id()) ? n : null; - } + Collection nodes = prj.nodes(); - // Optimization if projection is the whole grid. - if (prj.predicate() == F.alwaysTrue()) { - int idx = ThreadLocalRandom8.current().nextInt(snapshot.size()); + // Optimization for 1 node in projection. + if (nodes.size() == 1) { + ClusterNode n = nodes.iterator().next(); - int i = 0; + return snapshot.containsKey(n.id()) ? n : null; + } - // Get random node. - for (Map.Entry e : snapshot.entrySet()) { - if (i++ >= idx) { - if (e.getValue() > 0) - return ctx.discovery().node(e.getKey()); - } - } + // Optimization if projection is the whole grid. + if (prj.predicate() == F.alwaysTrue()) { + int idx = ThreadLocalRandom8.current().nextInt(snapshot.size()); - i = 0; + int i = 0; - // Circle back. - for (Map.Entry e : snapshot.entrySet()) { + // Get random node. + for (Map.Entry e : snapshot.entrySet()) { + if (i++ >= idx) { if (e.getValue() > 0) return ctx.discovery().node(e.getKey()); - - if (i++ == idx) - return null; } } - else { - List nodeList = new ArrayList<>(nodes.size()); - for (ClusterNode n : nodes) { - Integer cnt = snapshot.get(n.id()); + i = 0; - if (cnt != null && cnt > 0) - nodeList.add(n); - } + // Circle back. + for (Map.Entry e : snapshot.entrySet()) { + if (e.getValue() > 0) + return ctx.discovery().node(e.getKey()); - if (nodeList.isEmpty()) + if (i++ == idx) return null; + } + } + else { + List nodeList = new ArrayList<>(nodes.size()); - int idx = ThreadLocalRandom8.current().nextInt(nodeList.size()); + for (ClusterNode n : nodes) { + Integer cnt = snapshot.get(n.id()); - return nodeList.get(idx); + if (cnt != null && cnt > 0) + nodeList.add(n); } - return null; + if (nodeList.isEmpty()) + return null; + + int idx = ThreadLocalRandom8.current().nextInt(nodeList.size()); + + return nodeList.get(idx); + } + + return null; + } + + /** + * @return Proxy object for a given instance. + */ + T proxy() { + return proxy; + } + + /** + * Invocation handler for service proxy. + */ + private class ProxyInvocationHandler implements InvocationHandler { + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override public Object invoke(Object proxy, final Method mtd, final Object[] args) { + return invokeMethod(mtd, args); } } diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java new file mode 100644 index 0000000000000..05b7bd0316d15 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.platform; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeJobAdapter; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.compute.ComputeTaskAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.services.Service; +import org.apache.ignite.services.ServiceContext; +import org.jetbrains.annotations.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Task that deploys a Java service. + */ +public class PlatformDeployServiceTask extends ComputeTaskAdapter { + /** {@inheritDoc} */ + @Nullable @Override public Map map(List subgrid, + @Nullable String serviceName) throws IgniteException { + return Collections.singletonMap(new PlatformDeployServiceJob(serviceName), F.first(subgrid)); + } + + /** {@inheritDoc} */ + @Nullable @Override public Object reduce(List results) throws IgniteException { + return results.get(0).getData(); + } + + /** + * Job. + */ + private static class PlatformDeployServiceJob extends ComputeJobAdapter { + /** Service name. */ + private final String serviceName; + + /** Ignite. */ + @SuppressWarnings("UnusedDeclaration") + @IgniteInstanceResource + private Ignite ignite; + + /** + * Ctor. + * + * @param serviceName Service name. + */ + private PlatformDeployServiceJob(String serviceName) { + assert serviceName != null; + this.serviceName = serviceName; + } + + /** {@inheritDoc} */ + @Override public Object execute() throws IgniteException { + ignite.services().deployNodeSingleton(serviceName, new PlatformTestService()); + + return null; + } + } + + /** + * Test service. + */ + @SuppressWarnings("UnusedDeclaration") + public static class PlatformTestService implements Service { + /** */ + private boolean isCancelled; + + /** */ + private boolean isInitialized; + + /** */ + private boolean isExecuted; + + /** {@inheritDoc} */ + @Override public void cancel(ServiceContext ctx) { + isCancelled = true; + } + + /** {@inheritDoc} */ + @Override public void init(ServiceContext ctx) throws Exception { + isInitialized = true; + } + + /** {@inheritDoc} */ + @Override public void execute(ServiceContext ctx) throws Exception { + isExecuted = true; + } + + /** + * Returns a value indicating whether this service is cancelled. + */ + public boolean isCancelled() { + return isCancelled; + } + + /** + * Returns a value indicating whether this service is initialized. + */ + public boolean isInitialized() { + return isInitialized; + } + + /** + * Returns a value indicating whether this service is executed. + */ + public boolean isExecuted() { + return isExecuted; + } + + /** */ + public byte test(byte arg) { + return (byte) (arg + 1); + } + + /** */ + public short test(short arg) { + return (short) (arg + 1); + } + + /** */ + public int test(int arg) { + return arg + 1; + } + + /** */ + public long test(long arg) { + return arg + 1; + } + + /** */ + public float test(float arg) { + return arg + 1.5f; + } + + /** */ + public double test(double arg) { + return arg + 2.5; + } + + /** */ + public boolean test(boolean arg) { + return !arg; + } + + /** */ + public char test(char arg) { + return (char) (arg + 1); + } + + /** */ + public String test(String arg) { + return arg == null ? null : arg + "!"; + } + + /** */ + public Byte testWrapper(Byte arg) { + return arg == null ? null : (byte) (arg + 1); + } + + /** */ + public Short testWrapper(Short arg) { + return arg == null ? null : (short) (arg + 1); + } + + /** */ + public Integer testWrapper(Integer arg) { + return arg == null ? null : arg + 1; + } + + /** */ + public Long testWrapper(Long arg) { + return arg == null ? null : arg + 1; + } + + /** */ + public Float testWrapper(Float arg) { + return arg == null ? null : arg + 1.5f; + } + + /** */ + public Double testWrapper(Double arg) { + return arg == null ? null : arg + 2.5; + } + + /** */ + public Boolean testWrapper(Boolean arg) { + return arg == null ? null : !arg; + } + + /** */ + public Character testWrapper(Character arg) { + return arg == null ? null : (char) (arg + 1); + } + + /** */ + public byte[] testArray(byte[] arg) { + if (arg != null) + for (int i = 0; i < arg.length; i++) + arg[i] += 1; + + return arg; + } + + /** */ + public short[] testArray(short[] arg) { + if (arg != null) + for (int i = 0; i < arg.length; i++) + arg[i] += 1; + + return arg; + } + + /** */ + public int[] testArray(int[] arg) { + if (arg != null) + for (int i = 0; i < arg.length; i++) + arg[i] += 1; + + return arg; + } + + /** */ + public long[] testArray(long[] arg) { + if (arg != null) + for (int i = 0; i < arg.length; i++) + arg[i] += 1; + + return arg; + } + + /** */ + public double[] testArray(double[] arg) { + if (arg != null) + for (int i = 0; i < arg.length; i++) + arg[i] += 1; + + return arg; + } + + /** */ + public float[] testArray(float[] arg) { + if (arg != null) + for (int i = 0; i < arg.length; i++) + arg[i] += 1; + + return arg; + } + + /** */ + public String[] testArray(String[] arg) { + if (arg != null) + for (int i = 0; i < arg.length; i++) + arg[i] += 1; + + return arg; + } + + /** */ + public char[] testArray(char[] arg) { + if (arg != null) + for (int i = 0; i < arg.length; i++) + arg[i] += 1; + + return arg; + } + + /** */ + public boolean[] testArray(boolean[] arg) { + if (arg != null) + for (int i = 0; i < arg.length; i++) + arg[i] = !arg[i]; + + return arg; + } + + /** */ + public Integer testNull(Integer arg) { + return arg == null ? null : arg + 1; + } + + /** */ + public int testParams(Object... args) { + return args.length; + } + + /** */ + public int test(int x, String y) { + return x + 1; + } + + /** */ + public int test(String x, int y) { + return y + 1; + } + + /** */ + public PlatformComputeBinarizable testBinarizable(PlatformComputeBinarizable arg) { + return arg == null ? null : new PlatformComputeBinarizable(arg.field + 1); + } + + /** */ + public Object[] testBinarizableArray(Object[] arg) { + if (arg == null) + return null; + + for (int i = 0; i < arg.length; i++) + arg[i] = arg[i] == null + ? null + : new PlatformComputeBinarizable(((PlatformComputeBinarizable)arg[i]).field + 1); + + return arg; + } + + /** */ + public Collection testBinarizableCollection(Collection arg) { + if (arg == null) + return null; + + Collection res = new ArrayList<>(arg.size()); + + for(Object x : arg) + res.add(new PlatformComputeBinarizable(((PlatformComputeBinarizable)x).field + 1)); + + return res; + } + + /** */ + public BinaryObject testBinaryObject(BinaryObject o) { + if (o == null) + return null; + + return o.toBuilder().setField("field", 15).build(); + } + } +} diff --git a/modules/platforms/cpp/common/src/java.cpp b/modules/platforms/cpp/common/src/java.cpp index ae06c3005c266..c0290384155bf 100644 --- a/modules/platforms/cpp/common/src/java.cpp +++ b/modules/platforms/cpp/common/src/java.cpp @@ -386,7 +386,7 @@ namespace ignite JniMethod M_PLATFORM_SERVICES_WITH_SERVER_KEEP_PORTABLE = JniMethod("withServerKeepBinary", "()Lorg/apache/ignite/internal/processors/platform/services/PlatformServices;", false); JniMethod M_PLATFORM_SERVICES_CANCEL = JniMethod("cancel", "(Ljava/lang/String;)V", false); JniMethod M_PLATFORM_SERVICES_CANCEL_ALL = JniMethod("cancelAll", "()V", false); - JniMethod M_PLATFORM_SERVICES_SERVICE_PROXY = JniMethod("dotNetServiceProxy", "(Ljava/lang/String;Z)Ljava/lang/Object;", false); + JniMethod M_PLATFORM_SERVICES_SERVICE_PROXY = JniMethod("serviceProxy", "(Ljava/lang/String;Z)Ljava/lang/Object;", false); const char* C_PLATFORM_ATOMIC_LONG = "org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong"; JniMethod M_PLATFORM_ATOMIC_LONG_GET = JniMethod("get", "()J", false); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServiceProxyTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServiceProxyTest.cs index 973381c11092e..5110b2973e859 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServiceProxyTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServiceProxyTest.cs @@ -24,6 +24,7 @@ namespace Apache.Ignite.Core.Tests.Services using System.Reflection; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Memory; using Apache.Ignite.Core.Impl.Services; using Apache.Ignite.Core.Services; @@ -165,7 +166,8 @@ public void TestMissingMethods() var ex = Assert.Throws(() => prx.MissingMethod()); Assert.AreEqual("Failed to invoke proxy: there is no method 'MissingMethod'" + - " in type 'Apache.Ignite.Core.Tests.Services.ServiceProxyTest+TestIgniteService'", ex.Message); + " in type 'Apache.Ignite.Core.Tests.Services.ServiceProxyTest+TestIgniteService'" + + " with 0 arguments", ex.Message); } /// @@ -268,7 +270,7 @@ private object InvokeProxyMethod(MethodBase method, object[] args) // 1) Write to a stream inStream.WriteBool(SrvKeepBinary); // WriteProxyMethod does not do this, but Java does - ServiceProxySerializer.WriteProxyMethod(_marsh.StartMarshal(inStream), method, args); + ServiceProxySerializer.WriteProxyMethod(_marsh.StartMarshal(inStream), method, args, Platform.DotNet); inStream.SynchronizeOutput(); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs index c563a61874a78..b06d6dd90ef84 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs @@ -19,7 +19,8 @@ namespace Apache.Ignite.Core.Tests.Services { using System; - using System.Collections.Generic; + using System.Collections; + using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Threading; using Apache.Ignite.Core.Binary; @@ -242,11 +243,7 @@ public void TestCancel() public void TestGetServiceProxy([Values(true, false)] bool binarizable) { // Test proxy without a service - var prx = Services.GetServiceProxy(SvcName); - - Assert.IsTrue(prx != null); - - var ex = Assert.Throws(() => Assert.IsTrue(prx.Initialized)).InnerException; + var ex = Assert.Throws(()=> Services.GetServiceProxy(SvcName)); Assert.AreEqual("Failed to find deployed service: " + SvcName, ex.Message); // Deploy to grid2 & grid3 @@ -262,7 +259,7 @@ public void TestGetServiceProxy([Values(true, false)] bool binarizable) Assert.IsNull(Services.GetService(SvcName)); // Get proxy - prx = Services.GetServiceProxy(SvcName); + var prx = Services.GetServiceProxy(SvcName); // Check proxy properties Assert.IsNotNull(prx); @@ -509,6 +506,96 @@ public void TestMarshalExceptionOnWrite() Assert.IsNull(svc0); } + [Test] + public void TestCallJavaService() + { + const string javaSvcName = "javaService"; + + // Deploy Java service + Grid1.GetCompute() + .ExecuteJavaTask("org.apache.ignite.platform.PlatformDeployServiceTask", javaSvcName); + + // Verify decriptor + var descriptor = Services.GetServiceDescriptors().Single(x => x.Name == javaSvcName); + Assert.AreEqual(javaSvcName, descriptor.Name); + Assert.Throws(() => + { + // ReSharper disable once UnusedVariable + var type = descriptor.Type; + }); + + var svc = Services.GetServiceProxy(javaSvcName, false); + var binSvc = Services.WithKeepBinary().WithServerKeepBinary() + .GetServiceProxy(javaSvcName, false); + + // Basics + Assert.IsTrue(svc.isInitialized()); + Assert.IsTrue(svc.isExecuted()); + Assert.IsFalse(svc.isCancelled()); + + // Primitives + Assert.AreEqual(4, svc.test((byte) 3)); + Assert.AreEqual(5, svc.test((short) 4)); + Assert.AreEqual(6, svc.test(5)); + Assert.AreEqual(6, svc.test((long) 5)); + Assert.AreEqual(3.8f, svc.test(2.3f)); + Assert.AreEqual(5.8, svc.test(3.3)); + Assert.IsFalse(svc.test(true)); + Assert.AreEqual('b', svc.test('a')); + Assert.AreEqual("Foo!", svc.test("Foo")); + + // Nullables (Java wrapper types) + Assert.AreEqual(4, svc.testWrapper(3)); + Assert.AreEqual(5, svc.testWrapper((short?) 4)); + Assert.AreEqual(6, svc.testWrapper((int?)5)); + Assert.AreEqual(6, svc.testWrapper((long?) 5)); + Assert.AreEqual(3.8f, svc.testWrapper(2.3f)); + Assert.AreEqual(5.8, svc.testWrapper(3.3)); + Assert.AreEqual(false, svc.testWrapper(true)); + Assert.AreEqual('b', svc.testWrapper('a')); + + // Arrays + Assert.AreEqual(new byte[] {2, 3, 4}, svc.testArray(new byte[] {1, 2, 3})); + Assert.AreEqual(new short[] {2, 3, 4}, svc.testArray(new short[] {1, 2, 3})); + Assert.AreEqual(new[] {2, 3, 4}, svc.testArray(new[] {1, 2, 3})); + Assert.AreEqual(new long[] {2, 3, 4}, svc.testArray(new long[] {1, 2, 3})); + Assert.AreEqual(new float[] {2, 3, 4}, svc.testArray(new float[] {1, 2, 3})); + Assert.AreEqual(new double[] {2, 3, 4}, svc.testArray(new double[] {1, 2, 3})); + Assert.AreEqual(new[] {"a1", "b1"}, svc.testArray(new [] {"a", "b"})); + Assert.AreEqual(new[] {'c', 'd'}, svc.testArray(new[] {'b', 'c'})); + Assert.AreEqual(new[] {false, true, false}, svc.testArray(new[] {true, false, true})); + + // Nulls + Assert.AreEqual(9, svc.testNull(8)); + Assert.IsNull(svc.testNull(null)); + + // params / varargs + Assert.AreEqual(5, svc.testParams(1, 2, 3, 4, "5")); + Assert.AreEqual(0, svc.testParams()); + + // Overloads + Assert.AreEqual(3, svc.test(2, "1")); + Assert.AreEqual(3, svc.test("1", 2)); + + // Binary + Assert.AreEqual(7, svc.testBinarizable(new PlatformComputeBinarizable {Field = 6}).Field); + + // Binary collections + var arr = new [] {10, 11, 12}.Select(x => new PlatformComputeBinarizable {Field = x}).ToArray(); + Assert.AreEqual(new[] {11, 12, 13}, svc.testBinarizableCollection(arr) + .OfType().Select(x => x.Field).ToArray()); + Assert.AreEqual(new[] {11, 12, 13}, + svc.testBinarizableArray(arr).OfType().Select(x => x.Field).ToArray()); + + // Binary object + Assert.AreEqual(15, + binSvc.testBinaryObject( + Grid1.GetBinary().ToBinary(new PlatformComputeBinarizable {Field = 6})) + .GetField("Field")); + + Services.Cancel(javaSvcName); + } + /// /// Tests the footer setting. /// @@ -530,9 +617,9 @@ private void StartGrids() if (Grid1 != null) return; - Grid1 = Ignition.Start(Configuration("config\\compute\\compute-grid1.xml")); - Grid2 = Ignition.Start(Configuration("config\\compute\\compute-grid2.xml")); - Grid3 = Ignition.Start(Configuration("config\\compute\\compute-grid3.xml")); + Grid1 = Ignition.Start(GetConfiguration("config\\compute\\compute-grid1.xml")); + Grid2 = Ignition.Start(GetConfiguration("config\\compute\\compute-grid2.xml")); + Grid3 = Ignition.Start(GetConfiguration("config\\compute\\compute-grid3.xml")); Grids = new[] { Grid1, Grid2, Grid3 }; } @@ -574,7 +661,7 @@ private static void CheckServiceStarted(IIgnite grid, int count = 1) /// /// Gets the Ignite configuration. /// - private IgniteConfiguration Configuration(string springConfigUrl) + private IgniteConfiguration GetConfiguration(string springConfigUrl) { if (!CompactFooter) springConfigUrl = ComputeApiTestFullFooter.ReplaceFooterSetting(springConfigUrl); @@ -584,15 +671,11 @@ private IgniteConfiguration Configuration(string springConfigUrl) SpringConfigUrl = springConfigUrl, JvmClasspath = TestUtils.CreateTestClasspath(), JvmOptions = TestUtils.TestJavaOptions(), - BinaryConfiguration = new BinaryConfiguration - { - TypeConfigurations = new List - { - new BinaryTypeConfiguration(typeof(TestIgniteServiceBinarizable)), - new BinaryTypeConfiguration(typeof(TestIgniteServiceBinarizableErr)), - new BinaryTypeConfiguration(typeof(BinarizableObject)) - } - } + BinaryConfiguration = new BinaryConfiguration( + typeof (TestIgniteServiceBinarizable), + typeof (TestIgniteServiceBinarizableErr), + typeof (PlatformComputeBinarizable), + typeof (BinarizableObject)) }; } @@ -854,5 +937,131 @@ private class BinarizableObject { public int Val { get; set; } } + + /// + /// Java service proxy interface. + /// + [SuppressMessage("ReSharper", "InconsistentNaming")] + private interface IJavaService + { + /** */ + bool isCancelled(); + + /** */ + bool isInitialized(); + + /** */ + bool isExecuted(); + + /** */ + byte test(byte x); + + /** */ + short test(short x); + + /** */ + int test(int x); + + /** */ + long test(long x); + + /** */ + float test(float x); + + /** */ + double test(double x); + + /** */ + char test(char x); + + /** */ + string test(string x); + + /** */ + bool test(bool x); + + /** */ + byte? testWrapper(byte? x); + + /** */ + short? testWrapper(short? x); + + /** */ + int? testWrapper(int? x); + + /** */ + long? testWrapper(long? x); + + /** */ + float? testWrapper(float? x); + + /** */ + double? testWrapper(double? x); + + /** */ + char? testWrapper(char? x); + + /** */ + bool? testWrapper(bool? x); + + /** */ + byte[] testArray(byte[] x); + + /** */ + short[] testArray(short[] x); + + /** */ + int[] testArray(int[] x); + + /** */ + long[] testArray(long[] x); + + /** */ + float[] testArray(float[] x); + + /** */ + double[] testArray(double[] x); + + /** */ + char[] testArray(char[] x); + + /** */ + string[] testArray(string[] x); + + /** */ + bool[] testArray(bool[] x); + + /** */ + int test(int x, string y); + /** */ + int test(string x, int y); + + /** */ + int? testNull(int? x); + + /** */ + int testParams(params object[] args); + + /** */ + PlatformComputeBinarizable testBinarizable(PlatformComputeBinarizable x); + + /** */ + object[] testBinarizableArray(object[] x); + + /** */ + ICollection testBinarizableCollection(ICollection x); + + /** */ + IBinaryObject testBinaryObject(IBinaryObject x); + } + + /// + /// Interop class. + /// + private class PlatformComputeBinarizable + { + /** */ + public int Field { get; set; } + } } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index 1be10b421e847..9a5211c9c29b4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -143,6 +143,7 @@ + diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinarySystemHandlers.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinarySystemHandlers.cs index 89925ddb469ac..ccb2d1b006c16 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinarySystemHandlers.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinarySystemHandlers.cs @@ -169,7 +169,8 @@ public static BinarySystemWriteHandler GetWriteHandler(Type type) var handler = FindWriteHandler(t, out supportsHandles); - return handler == null ? null : new BinarySystemWriteHandler(handler, supportsHandles); + return handler == null ? null : new BinarySystemWriteHandler(handler, supportsHandles, + handler == WriteSerializable); }); } @@ -791,17 +792,23 @@ internal class BinarySystemWriteHandler /** */ private readonly bool _supportsHandles; + /** */ + private readonly bool _isSerializable; + /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// The write action. /// Handles flag. - public BinarySystemWriteHandler(Action writeAction, bool supportsHandles = false) + /// Determines whether this handler writes objects as serializable. + public BinarySystemWriteHandler(Action writeAction, bool supportsHandles, + bool isSerializable) { Debug.Assert(writeAction != null); _writeAction = writeAction; _supportsHandles = supportsHandles; + _isSerializable = isSerializable; } /// @@ -821,5 +828,13 @@ public bool SupportsHandles { get { return _supportsHandles; } } + + /// + /// Gets or sets a value indicating whether this handler writes objects as serializable + /// + public bool IsSerializable + { + get { return _isSerializable; } + } } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Platform.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Platform.cs new file mode 100644 index 0000000000000..e72e8e81bd9e0 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Platform.cs @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Common +{ + /// + /// Represents an Ignite platform. + /// + public enum Platform + { + /// + /// Java platform. + /// + Java = 0, + + /// + /// .NET platform. + /// + DotNet = 1 + } +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs index 695f156d05f14..c7c36e0f3f72c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs @@ -121,7 +121,7 @@ public static Exception GetException(IIgnite ignite, string clsName, string msg, if (Exs.TryGetValue(clsName, out ctor)) { - var match = InnerClassRegex.Match(msg); + var match = InnerClassRegex.Match(msg ?? string.Empty); ExceptionFactoryDelegate innerCtor; diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceDescriptor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceDescriptor.cs index f2806ffeb4bd7..2e5de39a17c72 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceDescriptor.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceDescriptor.cs @@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Impl.Services using System.Diagnostics; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Collections; + using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Services; /// @@ -55,6 +56,7 @@ public ServiceDescriptor(string name, BinaryReader reader, IServices services) TotalCount = reader.ReadInt(); OriginNodeId = reader.ReadGuid() ?? Guid.Empty; AffinityKey = reader.ReadObject(); + Platform = (Platform) reader.ReadByte(); var mapSize = reader.ReadInt(); var snap = new Dictionary(mapSize); @@ -80,11 +82,14 @@ public Type Type catch (Exception ex) { throw new ServiceInvocationException( - "Failed to retrieve service type. It has either been cancelled, or is not a .Net service", ex); + "Failed to retrieve service type. It has either been cancelled, or is not a .NET service", ex); } } } + /** */ + public Platform Platform { get; private set; } + /** */ public int TotalCount { get; private set; } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxyInvoker.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxyInvoker.cs index dc61a3474f5ba..295dd56e95324 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxyInvoker.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxyInvoker.cs @@ -24,12 +24,17 @@ namespace Apache.Ignite.Core.Impl.Services using System.Globalization; using System.Linq; using System.Reflection; + using Apache.Ignite.Core.Impl.Common; /// /// Invokes service proxy methods. /// - internal static class ServiceProxyInvoker + internal static class ServiceProxyInvoker { + /** Cached method info. */ + private static readonly CopyOnWriteConcurrentDictionary, MethodInfo> Methods = + new CopyOnWriteConcurrentDictionary, MethodInfo>(); + /// /// Invokes the service method according to data from a stream, /// and writes invocation result to the output stream. @@ -69,17 +74,31 @@ private static MethodBase GetMethodOrThrow(Type svcType, string methodName, obje Debug.Assert(svcType != null); Debug.Assert(!string.IsNullOrWhiteSpace(methodName)); + // 0) Check cached methods + var cacheKey = Tuple.Create(svcType, methodName, arguments.Length); + MethodInfo res; + + if (Methods.TryGetValue(cacheKey, out res)) + return res; + // 1) Find methods by name var methods = svcType.GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) - .Where(m => CleanupMethodName(m) == methodName).ToArray(); + .Where(m => CleanupMethodName(m) == methodName && m.GetParameters().Length == arguments.Length) + .ToArray(); if (methods.Length == 1) + { + // Update cache only when there is a single method with a given name and arg count. + Methods.GetOrAdd(cacheKey, x => methods[0]); + return methods[0]; + } if (methods.Length == 0) throw new InvalidOperationException( string.Format(CultureInfo.InvariantCulture, - "Failed to invoke proxy: there is no method '{0}' in type '{1}'", methodName, svcType)); + "Failed to invoke proxy: there is no method '{0}' in type '{1}' with {2} arguments", + methodName, svcType, arguments.Length)); // 2) There is more than 1 method with specified name - resolve with argument types. methods = methods.Where(m => AreMethodArgsCompatible(arguments, m.GetParameters())).ToArray(); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs index e49fbf1c7286a..8e44360ff7b65 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs @@ -18,11 +18,13 @@ namespace Apache.Ignite.Core.Impl.Services { using System; + using System.Collections; using System.Diagnostics; using System.Reflection; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Binary.IO; + using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Services; /// @@ -36,7 +38,9 @@ internal static class ServiceProxySerializer /// Writer. /// Method. /// Arguments. - public static void WriteProxyMethod(BinaryWriter writer, MethodBase method, object[] arguments) + /// The platform. + public static void WriteProxyMethod(BinaryWriter writer, MethodBase method, object[] arguments, + Platform platform) { Debug.Assert(writer != null); Debug.Assert(method != null); @@ -48,8 +52,21 @@ public static void WriteProxyMethod(BinaryWriter writer, MethodBase method, obje writer.WriteBoolean(true); writer.WriteInt(arguments.Length); - foreach (var arg in arguments) - writer.WriteObject(arg); + if (platform == Platform.DotNet) + { + // Write as is + foreach (var arg in arguments) + writer.WriteObject(arg); + } + else + { + // Other platforms do not support Serializable, need to convert arrays and collections + var methodArgs = method.GetParameters(); + Debug.Assert(methodArgs.Length == arguments.Length); + + for (int i = 0; i < arguments.Length; i++) + WriteArgForPlatforms(writer, methodArgs[i], arguments[i]); + } } else writer.WriteBoolean(false); @@ -136,5 +153,45 @@ public static object ReadInvocationResult(IBinaryStream stream, Marshaller marsh : new ServiceInvocationException("Proxy method invocation failed with an exception. " + "Examine InnerException for details.", (Exception) err); } + + /// + /// Writes the argument in platform-compatible format. + /// + private static void WriteArgForPlatforms(BinaryWriter writer, ParameterInfo param, object arg) + { + var hnd = GetPlatformArgWriter(param, arg); + + if (hnd != null) + hnd(writer, arg); + else + writer.WriteObject(arg); + } + + /// + /// Gets arg writer for platform-compatible service calls. + /// + private static Action GetPlatformArgWriter(ParameterInfo param, object arg) + { + var type = param.ParameterType; + + // Unwrap nullable + type = Nullable.GetUnderlyingType(type) ?? type; + + if (arg == null || type.IsPrimitive) + return null; + + var handler = BinarySystemHandlers.GetWriteHandler(type); + + if (handler != null && !handler.IsSerializable) + return null; + + if (type.IsArray) + return (writer, o) => writer.WriteArrayInternal((Array) o); + + if (arg is ICollection) + return (writer, o) => writer.WriteCollection((ICollection) o); + + return null; + } } } \ No newline at end of file diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs index 2360558cb9955..3d55f0632038c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs @@ -320,7 +320,7 @@ public ICollection GetServices(string name) var res = new List(count); for (var i = 0; i < count; i++) - res.Add((T)Marshaller.Ignite.HandleRegistry.Get(r.ReadLong())); + res.Add(Marshaller.Ignite.HandleRegistry.Get(r.ReadLong())); return res; } @@ -348,9 +348,10 @@ public ICollection GetServices(string name) return locInst; var javaProxy = UU.ServicesGetServiceProxy(Target, name, sticky); + var platform = GetServiceDescriptors().Cast().Single(x => x.Name == name).Platform; - return new ServiceProxy((method, args) => InvokeProxyMethod(javaProxy, method, args)) - .GetTransparentProxy(); + return new ServiceProxy((method, args) => + InvokeProxyMethod(javaProxy, method, args, platform)).GetTransparentProxy(); } /// @@ -359,13 +360,15 @@ public ICollection GetServices(string name) /// Unmanaged proxy. /// Method to invoke. /// Arguments. + /// The platform. /// /// Invocation result. /// - private unsafe object InvokeProxyMethod(IUnmanagedTarget proxy, MethodBase method, object[] args) + private unsafe object InvokeProxyMethod(IUnmanagedTarget proxy, MethodBase method, object[] args, + Platform platform) { return DoOutInOp(OpInvokeMethod, - writer => ServiceProxySerializer.WriteProxyMethod(writer, method, args), + writer => ServiceProxySerializer.WriteProxyMethod(writer, method, args, platform), stream => ServiceProxySerializer.ReadInvocationResult(stream, Marshaller, _keepBinary), proxy.Target); } }