Skip to content

Commit

Permalink
IGNITE-2686: .NET: Implemented ability to invoke foreign services fro…
Browse files Browse the repository at this point in the history
…m .NET. This closes #513.
  • Loading branch information
Pavel Tupitsyn authored and vozerov-gridgain committed Mar 31, 2016
1 parent f199553 commit bd509be
Show file tree
Hide file tree
Showing 15 changed files with 1,278 additions and 207 deletions.
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.platform.services; package org.apache.ignite.internal.processors.platform.services;


import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteServices; import org.apache.ignite.IgniteServices;
import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawReaderEx;
Expand All @@ -29,13 +30,19 @@
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformWriterBiClosure; import org.apache.ignite.internal.processors.platform.utils.PlatformWriterBiClosure;
import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure; import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure;
import org.apache.ignite.internal.processors.service.GridServiceProxy;
import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.services.Service; import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration; import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceDescriptor; import org.apache.ignite.services.ServiceDescriptor;


import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;


Expand All @@ -54,11 +61,21 @@ public class PlatformServices extends PlatformAbstractTarget {
private static final int OP_DOTNET_SERVICES = 3; private static final int OP_DOTNET_SERVICES = 3;


/** */ /** */
private static final int OP_DOTNET_INVOKE = 4; private static final int OP_INVOKE = 4;


/** */ /** */
private static final int OP_DESCRIPTORS = 5; private static final int OP_DESCRIPTORS = 5;


/** */
private static final byte PLATFORM_JAVA = 0;

/** */
private static final byte PLATFORM_DOTNET = 1;

/** */
private static final CopyOnWriteConcurrentMap<T3<Class, String, Integer>, Method> SVC_METHODS
= new CopyOnWriteConcurrentMap<>();

/** */ /** */
private final IgniteServices services; private final IgniteServices services;


Expand Down Expand Up @@ -125,8 +142,31 @@ public void cancelAll() {
* @param sticky Whether or not Ignite should always contact the same remote service. * @param sticky Whether or not Ignite should always contact the same remote service.
* @return Either proxy over remote service or local service if it is deployed locally. * @return Either proxy over remote service or local service if it is deployed locally.
*/ */
public Object dotNetServiceProxy(String name, boolean sticky) { public Object serviceProxy(String name, boolean sticky) {
return services.serviceProxy(name, PlatformDotNetService.class, sticky); ServiceDescriptor d = findDescriptor(name);

if (d == null)
throw new IgniteException("Failed to find deployed service: " + name);

Object proxy = PlatformService.class.isAssignableFrom(d.serviceClass())
? services.serviceProxy(name, PlatformService.class, sticky)
: new GridServiceProxy<>(services.clusterGroup(), name, Service.class, sticky, platformCtx.kernalContext());

return new ServiceProxyHolder(proxy, d.serviceClass());
}

/**
* Finds a service descriptor by name.
*
* @param name Service name.
* @return Descriptor or null.
*/
private ServiceDescriptor findDescriptor(String name) {
for (ServiceDescriptor d : services.serviceDescriptors())
if (d.name().equals(name))
return d;

return null;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -202,9 +242,9 @@ public Object dotNetServiceProxy(String name, boolean sticky) {
@Override protected void processInObjectStreamOutStream(int type, Object arg, BinaryRawReaderEx reader, @Override protected void processInObjectStreamOutStream(int type, Object arg, BinaryRawReaderEx reader,
BinaryRawWriterEx writer) throws IgniteCheckedException { BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) { switch (type) {
case OP_DOTNET_INVOKE: { case OP_INVOKE: {
assert arg != null; assert arg != null;
assert arg instanceof PlatformDotNetService; assert arg instanceof ServiceProxyHolder;


String mthdName = reader.readString(); String mthdName = reader.readString();


Expand All @@ -220,7 +260,7 @@ public Object dotNetServiceProxy(String name, boolean sticky) {
args = null; args = null;


try { try {
Object result = ((PlatformDotNetService)arg).invokeMethod(mthdName, srvKeepBinary, args); Object result = ((ServiceProxyHolder)arg).invoke(mthdName, srvKeepBinary, args);


PlatformUtils.writeInvocationResult(writer, result, null); PlatformUtils.writeInvocationResult(writer, result, null);
} }
Expand Down Expand Up @@ -251,6 +291,11 @@ public Object dotNetServiceProxy(String name, boolean sticky) {
writer.writeUuid(d.originNodeId()); writer.writeUuid(d.originNodeId());
writer.writeObject(d.affinityKey()); writer.writeObject(d.affinityKey());


// Write platform. There are only 2 platforms now.
byte platform = d.serviceClass().equals(PlatformDotNetServiceImpl.class)
? PLATFORM_DOTNET : PLATFORM_JAVA;
writer.writeByte(platform);

Map<UUID, Integer> top = d.topologySnapshot(); Map<UUID, Integer> top = d.topologySnapshot();


PlatformUtils.writeMap(writer, top, new PlatformWriterBiClosure<UUID, Integer>() { PlatformUtils.writeMap(writer, top, new PlatformWriterBiClosure<UUID, Integer>() {
Expand All @@ -274,4 +319,196 @@ public Object dotNetServiceProxy(String name, boolean sticky) {
@Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
return ((IgniteFutureImpl)services.future()).internalFuture(); return ((IgniteFutureImpl)services.future()).internalFuture();
} }

/**
* Proxy holder.
*/
@SuppressWarnings("unchecked")
private static class ServiceProxyHolder {
/** */
private final Object proxy;

/** */
private final Class serviceClass;

/** */
private static final Map<Class<?>, Class<?>> PRIMITIVES_TO_WRAPPERS = new HashMap<>();

/**
* Class initializer.
*/
static {
PRIMITIVES_TO_WRAPPERS.put(boolean.class, Boolean.class);
PRIMITIVES_TO_WRAPPERS.put(byte.class, Byte.class);
PRIMITIVES_TO_WRAPPERS.put(char.class, Character.class);
PRIMITIVES_TO_WRAPPERS.put(double.class, Double.class);
PRIMITIVES_TO_WRAPPERS.put(float.class, Float.class);
PRIMITIVES_TO_WRAPPERS.put(int.class, Integer.class);
PRIMITIVES_TO_WRAPPERS.put(long.class, Long.class);
PRIMITIVES_TO_WRAPPERS.put(short.class, Short.class);
}

/**
* Ctor.
*
* @param proxy Proxy object.
* @param clazz Proxy class.
*/
private ServiceProxyHolder(Object proxy, Class clazz) {
assert proxy != null;
assert clazz != null;

this.proxy = proxy;
serviceClass = clazz;
}

/**
* Invokes the proxy.
* @param mthdName Method name.
* @param srvKeepBinary Binary flag.
* @param args Args.
* @return Invocation result.
* @throws IgniteCheckedException
* @throws NoSuchMethodException
*/
public Object invoke(String mthdName, boolean srvKeepBinary, Object[] args)
throws IgniteCheckedException, NoSuchMethodException {
if (proxy instanceof PlatformService) {
return ((PlatformService)proxy).invokeMethod(mthdName, srvKeepBinary, args);
}
else {
assert proxy instanceof GridServiceProxy;

// Deserialize arguments for Java service when not in binary mode
if (!srvKeepBinary)
args = PlatformUtils.unwrapBinariesInArray(args);

Method mtd = getMethod(serviceClass, mthdName, args);

return ((GridServiceProxy)proxy).invokeMethod(mtd, args);
}
}

/**
* Finds a suitable method in a class
*
* @param clazz Class.
* @param mthdName Name.
* @param args Args.
* @return Method.
*/
private static Method getMethod(Class clazz, String mthdName, Object[] args) throws NoSuchMethodException {
assert clazz != null;
assert mthdName != null;
assert args != null;

T3<Class, String, Integer> cacheKey = new T3<>(clazz, mthdName, args.length);
Method res = SVC_METHODS.get(cacheKey);

if (res != null)
return res;

Method[] allMethods = clazz.getMethods();

List<Method> methods = new ArrayList<>(allMethods.length);

// Filter by name and param count
for (Method m : allMethods)
if (m.getName().equals(mthdName) && m.getParameterTypes().length == args.length)
methods.add(m);

if (methods.size() == 1) {
res = methods.get(0);

// Update cache only when there is a single method with a given name and arg count.
SVC_METHODS.put(cacheKey, res);

return res;
}

if (methods.isEmpty())
throw new NoSuchMethodException("Could not find proxy method '" + mthdName + "' in class " + clazz);

// Filter by param types
for (int i = 0; i < methods.size(); i++)
if (!areMethodArgsCompatible(methods.get(i).getParameterTypes(), args))
methods.remove(i--);

if (methods.size() == 1)
return methods.get(0);

if (methods.isEmpty())
throw new NoSuchMethodException("Could not find proxy method '" + mthdName + "' in class " + clazz);

throw new NoSuchMethodException("Ambiguous proxy method '" + mthdName + "' in class " + clazz);
}

/**
* Determines whether specified method arguments are compatible with given method parameter definitions.
*
* @param argTypes Method arg types.
* @param args Method args.
* @return Whether specified args are compatible with argTypes.
*/
private static boolean areMethodArgsCompatible(Class[] argTypes, Object[] args) {
for (int i = 0; i < args.length; i++){
// arg is always of a primitive wrapper class, and argTypes can have actual primitive
Object arg = args[i];
Class argType = wrap(argTypes[i]);

if (arg != null && !argType.isAssignableFrom(arg.getClass()))
return false;
}

return true;
}

/**
* Gets corresponding wrapper for a primitive type.
* @param c Class to convert.
*
* @return Primitive wrapper, or the same class.
*/
@SuppressWarnings("unchecked")
private static Class wrap(Class c) {
return c.isPrimitive() ? PRIMITIVES_TO_WRAPPERS.get(c) : c;
}
}

/**
* Concurrent map.
*/
private static class CopyOnWriteConcurrentMap<K, V> {
/** */
private volatile Map<K, V> map = new HashMap<>();

/**
* Gets a value.
*
* @param key Key.
* @return Value.
*/
public V get(K key) {
return map.get(key);
}

/**
* Puts a value.
*
* @param key Key.
* @param val Value.
*/
public void put(K key, V val) {
synchronized (this){
if (map.containsKey(key))
return;

Map<K, V> map0 = new HashMap<>(map);

map0.put(key, val);

map = map0;
}
}
}
} }

0 comments on commit bd509be

Please sign in to comment.