From 6a97c46831dfb08d4d91ad28c92df336f1cc4f64 Mon Sep 17 00:00:00 2001 From: fritzprix Date: Tue, 23 Oct 2018 07:12:33 +0900 Subject: [PATCH 1/4] Add support for Primitive type (e.g. int, double, etc.) in serialization & deserialization --- .../doodream/rmovjs/serde/bson/BsonConverter.java | 14 +++++++++----- .../com/doodream/rmovjs/test/ConverterTest.java | 7 +++++++ 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/doodream/rmovjs/serde/bson/BsonConverter.java b/src/main/java/com/doodream/rmovjs/serde/bson/BsonConverter.java index 2002f01..2d86541 100644 --- a/src/main/java/com/doodream/rmovjs/serde/bson/BsonConverter.java +++ b/src/main/java/com/doodream/rmovjs/serde/bson/BsonConverter.java @@ -106,15 +106,19 @@ public T invert(byte[] b, Class cls) { @Override - public Object resolve(final Object unresolved, Type type) throws ClassNotFoundException, InstantiationException, IllegalAccessException { + public Object resolve(final Object unresolved, Type type) throws InstantiationException, IllegalAccessException { if(unresolved == null) { return null; } Class clsz; - if(type instanceof ParameterizedType) { - clsz = Class.forName(((ParameterizedType) type).getRawType().getTypeName()); - } else { - clsz = Class.forName(type.getTypeName()); + try { + if (type instanceof ParameterizedType) { + clsz = Class.forName(((ParameterizedType) type).getRawType().getTypeName()); + } else { + clsz = Class.forName(type.getTypeName()); + } + } catch (ClassNotFoundException e) { + return unresolved; } final Class cls = clsz; final Class unresolvedCls = unresolved.getClass(); diff --git a/src/test/java/com/doodream/rmovjs/test/ConverterTest.java b/src/test/java/com/doodream/rmovjs/test/ConverterTest.java index 6c2b934..a3ee1b9 100644 --- a/src/test/java/com/doodream/rmovjs/test/ConverterTest.java +++ b/src/test/java/com/doodream/rmovjs/test/ConverterTest.java @@ -38,6 +38,8 @@ public void setup() { @Test public void converterSerDeserTest() throws ClassNotFoundException, IOException, InstantiationException, IllegalAccessException { for (Converter converter : converters) { + Assert.assertTrue(testPrimitiveType(converter, 1.3, double.class)); + Assert.assertTrue(testPrimitiveType(converter, 1, int.class)); Assert.assertTrue(testNumericObject(converter, 1.3f)); Assert.assertTrue(testNumericObject(converter, 100L)); Assert.assertTrue(testNumericObject(converter, 100)); @@ -48,6 +50,11 @@ public void converterSerDeserTest() throws ClassNotFoundException, IOException, } } + private boolean testPrimitiveType(Converter converter, T v, Class cls) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException { + Object resolved = converter.resolve(v, cls); + return resolved.equals(v); + } + private boolean testNumericObject(Converter converter, T v) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException { T result = testObjectTransfer(converter, v, v.getClass()); return v.equals(result); From 9b5ee7ee9e518fd3864410e38ebc8c5261dde48a Mon Sep 17 00:00:00 2001 From: fritzprix Date: Wed, 7 Nov 2018 06:47:34 +0900 Subject: [PATCH 2/4] simplify implementation for service discovery and improve genericity Prior to this change, the implementation for service discovery should handle listed below.. 1. retrieve service information from service advertiser 2. convert it to service proxy that was too complicated to be applied widely used service discovery technology (like mdns ..) 1.separate service info converting functionality from the discovery 2.remove ad-hoc tick timer for service information retrival which is not well compatible to another service discovery implementation close #49, #48, #37, #39 --- .../doodream/rmovjs/client/HaRMIClient.java | 5 +- .../doodream/rmovjs/model/RMIServiceInfo.java | 50 +++++++- .../rmovjs/net/tcp/TcpServiceAdapter.java | 3 +- .../rmovjs/sdp/BaseServiceDiscovery.java | 120 ++++++++---------- .../rmovjs/sdp/ServiceDiscoveryListener.java | 4 +- .../rmovjs/sdp/SimpleServiceDiscovery.java | 61 +++++++-- 6 files changed, 154 insertions(+), 89 deletions(-) diff --git a/src/main/java/com/doodream/rmovjs/client/HaRMIClient.java b/src/main/java/com/doodream/rmovjs/client/HaRMIClient.java index b11bb8a..075aa2e 100644 --- a/src/main/java/com/doodream/rmovjs/client/HaRMIClient.java +++ b/src/main/java/com/doodream/rmovjs/client/HaRMIClient.java @@ -234,9 +234,10 @@ private synchronized void registerProxy(RMIServiceProxy serviceProxy) { private static Observable startDiscovery(ServiceDiscovery discovery, Class svc) { return Observable.create(emitter -> discovery.startDiscovery(svc, false, new ServiceDiscoveryListener() { + @Override - public void onDiscovered(RMIServiceProxy proxy) { - emitter.onNext(proxy); + public void onDiscovered(RMIServiceInfo info) { + emitter.onNext(RMIServiceInfo.toServiceProxy(info)); } @Override diff --git a/src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java b/src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java index 026a720..8119624 100644 --- a/src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java +++ b/src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java @@ -2,9 +2,14 @@ import com.doodream.rmovjs.Properties; import com.doodream.rmovjs.annotation.server.Service; +import com.doodream.rmovjs.net.RMIServiceProxy; +import com.doodream.rmovjs.net.ServiceAdapter; +import com.doodream.rmovjs.net.ServiceProxyFactory; import com.doodream.rmovjs.server.RMIController; import com.google.gson.annotations.SerializedName; import io.reactivex.Observable; +import io.reactivex.Single; +import io.reactivex.functions.Action; import io.reactivex.functions.Consumer; import io.reactivex.functions.Function; import io.reactivex.functions.Predicate; @@ -15,6 +20,7 @@ import java.lang.reflect.Field; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeoutException; @Builder @EqualsAndHashCode(exclude = {"proxyFactoryHint"}) @@ -92,11 +98,53 @@ public void accept(List controllerInfos) throws Exception { return builder.build(); } - public static boolean isComplete(RMIServiceInfo info) { + public static boolean isValid(RMIServiceInfo info) { return (info.getProxyFactoryHint() != null) && (info.getControllerInfos() != null); } + public static RMIServiceProxy toServiceProxy(RMIServiceInfo info) { + return Single.just(info) + .map(new Function>() { + @Override + public Class apply(RMIServiceInfo rmiServiceInfo) throws Exception { + return rmiServiceInfo.getAdapter(); + } + }) + .map(new Function, Object>() { + @Override + public Object apply(Class cls) throws Exception { + return cls.newInstance(); + } + }) + .cast(ServiceAdapter.class) + .map(new Function() { + @Override + public ServiceProxyFactory apply(ServiceAdapter serviceAdapter) throws Exception { + return serviceAdapter.getProxyFactory(info); + } + }) + .map(new Function() { + @Override + public RMIServiceProxy apply(ServiceProxyFactory serviceProxyFactory) throws Exception { + return serviceProxyFactory.build(); + } + }) + .onErrorReturn(new Function() { + @Override + public RMIServiceProxy apply(Throwable throwable) throws Exception { + return RMIServiceProxy.NULL_PROXY; + } + }) + .filter(new Predicate() { + @Override + public boolean test(RMIServiceProxy proxy) throws Exception { + return !RMIServiceProxy.NULL_PROXY.equals(proxy); + } + }) + .blockingGet(RMIServiceProxy.NULL_PROXY); + } + public void copyFrom(RMIServiceInfo info) { setProxyFactoryHint(info.getProxyFactoryHint()); setParams(info.getParams()); diff --git a/src/main/java/com/doodream/rmovjs/net/tcp/TcpServiceAdapter.java b/src/main/java/com/doodream/rmovjs/net/tcp/TcpServiceAdapter.java index c2a060b..559b100 100644 --- a/src/main/java/com/doodream/rmovjs/net/tcp/TcpServiceAdapter.java +++ b/src/main/java/com/doodream/rmovjs/net/tcp/TcpServiceAdapter.java @@ -40,7 +40,7 @@ public TcpServiceAdapter() throws UnknownHostException { @Override public ServiceProxyFactory getProxyFactory(final RMIServiceInfo info) { - if(!RMIServiceInfo.isComplete(info)) { + if(!RMIServiceInfo.isValid(info)) { throw new IllegalArgumentException("Incomplete service info"); } final String[] params = info.getParams().toArray(new String[0]); @@ -70,7 +70,6 @@ public void accept(ServiceProxyFactory serviceProxyFactory) throws Exception { @Override protected void onStart() throws IOException { serverSocket = new ServerSocket(); - Log.debug("service address {}", mAddress.getAddress().getHostAddress()); serverSocket.bind(mAddress); Log.debug("service started @ {}", mAddress.getAddress().getHostAddress()); } diff --git a/src/main/java/com/doodream/rmovjs/sdp/BaseServiceDiscovery.java b/src/main/java/com/doodream/rmovjs/sdp/BaseServiceDiscovery.java index bc5772a..dd30e01 100644 --- a/src/main/java/com/doodream/rmovjs/sdp/BaseServiceDiscovery.java +++ b/src/main/java/com/doodream/rmovjs/sdp/BaseServiceDiscovery.java @@ -6,12 +6,13 @@ import com.doodream.rmovjs.net.ServiceProxyFactory; import com.doodream.rmovjs.serde.Converter; import com.google.common.base.Preconditions; -import io.reactivex.Observable; +import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Action; import io.reactivex.functions.Consumer; import io.reactivex.functions.Function; import io.reactivex.functions.Predicate; +import io.reactivex.schedulers.Schedulers; import lombok.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,12 +27,47 @@ public abstract class BaseServiceDiscovery implements ServiceDiscovery { private static final Logger Log = LoggerFactory.getLogger(BaseServiceDiscovery.class); + protected interface DiscoveryEventListener { + void onStart(); + void onDiscovered(RMIServiceInfo info); + void onError(Throwable e); + void onStop(); + } + + + private static class ServiceInfoSource implements ObservableOnSubscribe, DiscoveryEventListener { + private ObservableEmitter emitter; + + @Override + public void onStart() { + } + + @Override + public void onDiscovered(RMIServiceInfo info) { + emitter.onNext(info); + } + + @Override + public void onError(Throwable e) { + emitter.onError(e); + } + + @Override + public void onStop() { + emitter.onComplete(); + } + + @Override + public void subscribe(ObservableEmitter observableEmitter) throws Exception { + this.emitter = observableEmitter; + } + } + + private static final long TIMEOUT_IN_SEC = 5L; - private long tickIntervalInMilliSec; - private HashMap disposableMap; + private final HashMap disposableMap; - BaseServiceDiscovery(long interval, TimeUnit unit) { - tickIntervalInMilliSec = unit.toMillis(interval); + BaseServiceDiscovery() { disposableMap = new HashMap<>(); } @@ -41,10 +77,6 @@ public void startDiscovery(@NonNull Class service, boolean once, @NonNull Servic } - private Observable observeTick() { - return Observable.interval(0L, tickIntervalInMilliSec, TimeUnit.MILLISECONDS); - } - @Override public void startDiscovery(@NonNull final Class service, final boolean once, long timeout, @NonNull TimeUnit unit, @NonNull final ServiceDiscoveryListener listener) throws IllegalAccessException, InstantiationException { if(disposableMap.containsKey(service)) { @@ -56,15 +88,12 @@ public void startDiscovery(@NonNull final Class service, final boolean once, lon Preconditions.checkNotNull(converter, "converter is not declared"); final HashSet discoveryCache = new HashSet<>(); + final ServiceInfoSource serviceInfoSource = new ServiceInfoSource(); + + onStartDiscovery(serviceInfoSource); listener.onDiscoveryStarted(); - Observable serviceInfoObservable = observeTick() - .map(new Function() { - @Override - public RMIServiceInfo apply(Long aLong) throws Exception { - return receiveServiceInfo(converter); - } - }) + disposableMap.put(service, Observable.create(serviceInfoSource) .doOnNext(new Consumer() { @Override public void accept(RMIServiceInfo svcInfo) throws Exception { @@ -99,35 +128,7 @@ public void accept(RMIServiceInfo discovered) throws Exception { info.copyFrom(discovered); } }) - .timeout(timeout, unit); - - - disposableMap.put(service, serviceInfoObservable - .map(new Function>() { - @Override - public Class apply(RMIServiceInfo rmiServiceInfo) throws Exception { - return rmiServiceInfo.getAdapter(); - } - }) - .map(new Function, Object>() { - @Override - public Object apply(Class cls) throws Exception { - return cls.newInstance(); - } - }) - .cast(ServiceAdapter.class) - .map(new Function() { - @Override - public ServiceProxyFactory apply(ServiceAdapter serviceAdapter) throws Exception { - return serviceAdapter.getProxyFactory(info); - } - }) - .map(new Function() { - @Override - public RMIServiceProxy apply(ServiceProxyFactory serviceProxyFactory) throws Exception { - return serviceProxyFactory.build(); - } - }) + .timeout(timeout, unit) .doOnDispose(new Action() { @Override public void run() throws Exception { @@ -136,7 +137,8 @@ public void run() throws Exception { listener.onDiscoveryFinished(); } }) - .doOnError(new Consumer() { + .subscribeOn(Schedulers.io()) + .subscribe(listener::onDiscovered, new Consumer() { @Override public void accept(Throwable throwable) throws Exception { if(throwable instanceof TimeoutException) { @@ -148,37 +150,18 @@ public void accept(Throwable throwable) throws Exception { disposableMap.remove(service); listener.onDiscoveryFinished(); } - }) - .doOnComplete(new Action() { + }, new Action() { @Override public void run() throws Exception { close(); disposableMap.remove(service); listener.onDiscoveryFinished(); } - }) - .onErrorReturn(new Function() { - @Override - public RMIServiceProxy apply(Throwable throwable) throws Exception { - return RMIServiceProxy.NULL_PROXY; - } - }) - .filter(new Predicate() { - @Override - public boolean test(RMIServiceProxy proxy) throws Exception { - return !RMIServiceProxy.NULL_PROXY.equals(proxy); - } - }) - .subscribe(new Consumer() { - @Override - public void accept(RMIServiceProxy rmiServiceProxy) throws Exception { - listener.onDiscovered(rmiServiceProxy); - } })); - } + @Override public void cancelDiscovery(Class service) { Disposable disposable = disposableMap.get(service); @@ -191,7 +174,6 @@ public void cancelDiscovery(Class service) { disposable.dispose(); } - - protected abstract RMIServiceInfo receiveServiceInfo(Converter converter) throws IOException; + protected abstract void onStartDiscovery(DiscoveryEventListener listener); protected abstract void close(); } diff --git a/src/main/java/com/doodream/rmovjs/sdp/ServiceDiscoveryListener.java b/src/main/java/com/doodream/rmovjs/sdp/ServiceDiscoveryListener.java index e085fb9..97dbc8e 100644 --- a/src/main/java/com/doodream/rmovjs/sdp/ServiceDiscoveryListener.java +++ b/src/main/java/com/doodream/rmovjs/sdp/ServiceDiscoveryListener.java @@ -1,9 +1,9 @@ package com.doodream.rmovjs.sdp; -import com.doodream.rmovjs.net.RMIServiceProxy; +import com.doodream.rmovjs.model.RMIServiceInfo; public interface ServiceDiscoveryListener { - void onDiscovered(RMIServiceProxy proxy); + void onDiscovered(RMIServiceInfo info); void onDiscoveryStarted(); void onDiscoveryFinished() throws IllegalAccessException; } diff --git a/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceDiscovery.java b/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceDiscovery.java index ab4da12..7824559 100644 --- a/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceDiscovery.java +++ b/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceDiscovery.java @@ -2,12 +2,19 @@ import com.doodream.rmovjs.model.RMIServiceInfo; import com.doodream.rmovjs.serde.Converter; +import com.doodream.rmovjs.serde.json.JsonConverter; +import io.reactivex.Observable; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.functions.Action; +import io.reactivex.functions.Consumer; +import io.reactivex.functions.Function; import java.io.IOException; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; import java.util.Arrays; +import java.util.HashMap; import java.util.concurrent.TimeUnit; /** @@ -18,31 +25,59 @@ */ public class SimpleServiceDiscovery extends BaseServiceDiscovery { - private MulticastSocket serviceBroadcastSocket; + private DiscoveryEventListener listener; + private final CompositeDisposable disposable; + private final MulticastSocket serviceBroadcastSocket; + private final JsonConverter converter; public SimpleServiceDiscovery() throws IOException { - super(100L, TimeUnit.MILLISECONDS); + super(); + disposable = new CompositeDisposable(); + converter = new JsonConverter(); serviceBroadcastSocket = new MulticastSocket(SimpleServiceAdvertiser.BROADCAST_PORT); serviceBroadcastSocket.joinGroup(InetAddress.getByName(SimpleServiceAdvertiser.MULTICAST_GROUP_IP)); } @Override - protected RMIServiceInfo receiveServiceInfo(Converter converter) throws IOException { - byte[] buffer = new byte[64 * 1024]; - Arrays.fill(buffer, (byte) 0); - DatagramPacket packet = new DatagramPacket(buffer, buffer.length); - serviceBroadcastSocket.receive(packet); - return converter.invert(packet.getData(), RMIServiceInfo.class); + protected void onStartDiscovery(DiscoveryEventListener listener) { + this.listener = listener; + disposable.add(Observable.interval(1000L, TimeUnit.MILLISECONDS) + .map(new Function() { + @Override + public RMIServiceInfo apply(Long aLong) throws Exception { + byte[] buffer = new byte[64 * 1024]; + Arrays.fill(buffer, (byte) 0); + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + serviceBroadcastSocket.receive(packet); + return converter.invert(packet.getData(), RMIServiceInfo.class); + } + }) + .subscribe(new Consumer() { + @Override + public void accept(RMIServiceInfo info) throws Exception { + listener.onDiscovered(info); + } + }, new Consumer() { + @Override + public void accept(Throwable throwable) throws Exception { + listener.onError(throwable); + } + }, new Action() { + @Override + public void run() throws Exception { + listener.onStop(); + } + })); } @Override protected void close() { - if(serviceBroadcastSocket == null) { - return; + if(!disposable.isDisposed()) { + disposable.dispose(); } - if(serviceBroadcastSocket.isClosed()) { - return; + disposable.clear(); + if(!serviceBroadcastSocket.isClosed()) { + serviceBroadcastSocket.close(); } - serviceBroadcastSocket.close(); } } From ac4a905acf86875e2ea7acac8de02d568948af0f Mon Sep 17 00:00:00 2001 From: fritzprix Date: Wed, 7 Nov 2018 07:01:43 +0900 Subject: [PATCH 3/4] change method name for service discovery --- .../doodream/rmovjs/sdp/BaseServiceDiscovery.java | 12 ++++-------- .../doodream/rmovjs/sdp/SimpleServiceDiscovery.java | 2 +- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/doodream/rmovjs/sdp/BaseServiceDiscovery.java b/src/main/java/com/doodream/rmovjs/sdp/BaseServiceDiscovery.java index dd30e01..ee7fca2 100644 --- a/src/main/java/com/doodream/rmovjs/sdp/BaseServiceDiscovery.java +++ b/src/main/java/com/doodream/rmovjs/sdp/BaseServiceDiscovery.java @@ -1,9 +1,6 @@ package com.doodream.rmovjs.sdp; import com.doodream.rmovjs.model.RMIServiceInfo; -import com.doodream.rmovjs.net.RMIServiceProxy; -import com.doodream.rmovjs.net.ServiceAdapter; -import com.doodream.rmovjs.net.ServiceProxyFactory; import com.doodream.rmovjs.serde.Converter; import com.google.common.base.Preconditions; import io.reactivex.*; @@ -17,7 +14,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.concurrent.TimeUnit; @@ -132,7 +128,7 @@ public void accept(RMIServiceInfo discovered) throws Exception { .doOnDispose(new Action() { @Override public void run() throws Exception { - close(); + onStopDiscovery(); disposableMap.remove(service); listener.onDiscoveryFinished(); } @@ -146,14 +142,14 @@ public void accept(Throwable throwable) throws Exception { } else { Log.warn("{}", throwable); } - close(); + onStopDiscovery(); disposableMap.remove(service); listener.onDiscoveryFinished(); } }, new Action() { @Override public void run() throws Exception { - close(); + onStopDiscovery(); disposableMap.remove(service); listener.onDiscoveryFinished(); } @@ -175,5 +171,5 @@ public void cancelDiscovery(Class service) { } protected abstract void onStartDiscovery(DiscoveryEventListener listener); - protected abstract void close(); + protected abstract void onStopDiscovery(); } diff --git a/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceDiscovery.java b/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceDiscovery.java index 7824559..a1a97fd 100644 --- a/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceDiscovery.java +++ b/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceDiscovery.java @@ -71,7 +71,7 @@ public void run() throws Exception { } @Override - protected void close() { + protected void onStopDiscovery() { if(!disposable.isDisposed()) { disposable.dispose(); } From 527f771c8f933842842462af20c632b623b15d71 Mon Sep 17 00:00:00 2001 From: fritzprix Date: Wed, 7 Nov 2018 07:05:15 +0900 Subject: [PATCH 4/4] update README --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 2e4ee8a..1f575db 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ yarmi is yet anotehr RMI based on JSON. it's simple yet powerful when developing com.doodream yarmi-core - 0.0.4 + 0.0.5 ``` @@ -148,8 +148,8 @@ public static class SimpleClient { SimpleServiceDiscovery discovery = new SimpleServiceDiscovery(); discovery.startDiscovery(TestService.class, new ServiceDiscoveryListener() { @Override - public void onDiscovered(RMIServiceProxy proxy) { - discoveredService.add(proxy); + public void onDiscovered(RMIServiceInfo info) { + discoveredService.add(RMIServiceInfo.toServiceProxy(info)); } @Override