diff --git a/README.md b/README.md
index 2e4ee8a..1f575db 100644
--- a/README.md
+++ b/README.md
@@ -42,7 +42,7 @@ yarmi is yet anotehr RMI based on JSON. it's simple yet powerful when developing
com.doodream
yarmi-core
- 0.0.4
+ 0.0.5
```
@@ -148,8 +148,8 @@ public static class SimpleClient {
SimpleServiceDiscovery discovery = new SimpleServiceDiscovery();
discovery.startDiscovery(TestService.class, new ServiceDiscoveryListener() {
@Override
- public void onDiscovered(RMIServiceProxy proxy) {
- discoveredService.add(proxy);
+ public void onDiscovered(RMIServiceInfo info) {
+ discoveredService.add(RMIServiceInfo.toServiceProxy(info));
}
@Override
diff --git a/src/main/java/com/doodream/rmovjs/client/HaRMIClient.java b/src/main/java/com/doodream/rmovjs/client/HaRMIClient.java
index b11bb8a..075aa2e 100644
--- a/src/main/java/com/doodream/rmovjs/client/HaRMIClient.java
+++ b/src/main/java/com/doodream/rmovjs/client/HaRMIClient.java
@@ -234,9 +234,10 @@ private synchronized void registerProxy(RMIServiceProxy serviceProxy) {
private static Observable startDiscovery(ServiceDiscovery discovery, Class svc) {
return Observable.create(emitter -> discovery.startDiscovery(svc, false, new ServiceDiscoveryListener() {
+
@Override
- public void onDiscovered(RMIServiceProxy proxy) {
- emitter.onNext(proxy);
+ public void onDiscovered(RMIServiceInfo info) {
+ emitter.onNext(RMIServiceInfo.toServiceProxy(info));
}
@Override
diff --git a/src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java b/src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java
index 026a720..8119624 100644
--- a/src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java
+++ b/src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java
@@ -2,9 +2,14 @@
import com.doodream.rmovjs.Properties;
import com.doodream.rmovjs.annotation.server.Service;
+import com.doodream.rmovjs.net.RMIServiceProxy;
+import com.doodream.rmovjs.net.ServiceAdapter;
+import com.doodream.rmovjs.net.ServiceProxyFactory;
import com.doodream.rmovjs.server.RMIController;
import com.google.gson.annotations.SerializedName;
import io.reactivex.Observable;
+import io.reactivex.Single;
+import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
@@ -15,6 +20,7 @@
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.TimeoutException;
@Builder
@EqualsAndHashCode(exclude = {"proxyFactoryHint"})
@@ -92,11 +98,53 @@ public void accept(List controllerInfos) throws Exception {
return builder.build();
}
- public static boolean isComplete(RMIServiceInfo info) {
+ public static boolean isValid(RMIServiceInfo info) {
return (info.getProxyFactoryHint() != null) &&
(info.getControllerInfos() != null);
}
+ public static RMIServiceProxy toServiceProxy(RMIServiceInfo info) {
+ return Single.just(info)
+ .map(new Function>() {
+ @Override
+ public Class> apply(RMIServiceInfo rmiServiceInfo) throws Exception {
+ return rmiServiceInfo.getAdapter();
+ }
+ })
+ .map(new Function, Object>() {
+ @Override
+ public Object apply(Class> cls) throws Exception {
+ return cls.newInstance();
+ }
+ })
+ .cast(ServiceAdapter.class)
+ .map(new Function() {
+ @Override
+ public ServiceProxyFactory apply(ServiceAdapter serviceAdapter) throws Exception {
+ return serviceAdapter.getProxyFactory(info);
+ }
+ })
+ .map(new Function() {
+ @Override
+ public RMIServiceProxy apply(ServiceProxyFactory serviceProxyFactory) throws Exception {
+ return serviceProxyFactory.build();
+ }
+ })
+ .onErrorReturn(new Function() {
+ @Override
+ public RMIServiceProxy apply(Throwable throwable) throws Exception {
+ return RMIServiceProxy.NULL_PROXY;
+ }
+ })
+ .filter(new Predicate() {
+ @Override
+ public boolean test(RMIServiceProxy proxy) throws Exception {
+ return !RMIServiceProxy.NULL_PROXY.equals(proxy);
+ }
+ })
+ .blockingGet(RMIServiceProxy.NULL_PROXY);
+ }
+
public void copyFrom(RMIServiceInfo info) {
setProxyFactoryHint(info.getProxyFactoryHint());
setParams(info.getParams());
diff --git a/src/main/java/com/doodream/rmovjs/net/BaseServiceAdapter.java b/src/main/java/com/doodream/rmovjs/net/BaseServiceAdapter.java
index 62d693e..c0ea938 100644
--- a/src/main/java/com/doodream/rmovjs/net/BaseServiceAdapter.java
+++ b/src/main/java/com/doodream/rmovjs/net/BaseServiceAdapter.java
@@ -14,7 +14,6 @@
import io.reactivex.functions.BooleanSupplier;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
-import io.reactivex.functions.Predicate;
import io.reactivex.observables.GroupedObservable;
import io.reactivex.schedulers.Schedulers;
import lombok.NonNull;
@@ -22,7 +21,6 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Optional;
public abstract class BaseServiceAdapter implements ServiceAdapter {
@@ -123,19 +121,14 @@ public void accept(Request request) throws Exception {
});
}
})
- .doOnNext(new Consumer() {
+ .observeOn(Schedulers.io())
+ .subscribe(new Consumer() {
@Override
public void accept(Request request) throws Exception {
- request.setClient(adapter);
if(Log.isTraceEnabled()) {
Log.trace("Request <= {}", request);
}
- }
- })
- .observeOn(Schedulers.io())
- .subscribe(new Consumer() {
- @Override
- public void accept(Request request) throws Exception {
+ request.setClient(adapter);
final Response response = handleRequest.apply(request);
if(Log.isTraceEnabled()) {
Log.trace("Response => {}", response);
diff --git a/src/main/java/com/doodream/rmovjs/net/BaseServiceProxy.java b/src/main/java/com/doodream/rmovjs/net/BaseServiceProxy.java
index 944db94..ce395f5 100644
--- a/src/main/java/com/doodream/rmovjs/net/BaseServiceProxy.java
+++ b/src/main/java/com/doodream/rmovjs/net/BaseServiceProxy.java
@@ -12,9 +12,14 @@
import com.doodream.rmovjs.server.BasicService;
import com.doodream.rmovjs.server.svc.HealthCheckController;
import io.reactivex.Observable;
+import io.reactivex.ObservableEmitter;
+import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
+import io.reactivex.functions.BiFunction;
+import io.reactivex.functions.Consumer;
+import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,7 +27,6 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Base64;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -60,7 +64,7 @@ public class BaseServiceProxy implements RMIServiceProxy {
private ConcurrentHashMap requestWaitQueue;
private CompositeDisposable compositeDisposable;
private Disposable pingDisposable;
- private int requestNonce;
+ private AtomicInteger requestId;
private RMIServiceInfo serviceInfo;
private Converter converter;
private RMISocket socket;
@@ -68,7 +72,6 @@ public class BaseServiceProxy implements RMIServiceProxy {
private Writer writer;
private Scheduler mListener = Schedulers.from(Executors.newWorkStealingPool(10));
-
public static BaseServiceProxy create(RMIServiceInfo info, RMISocket socket) {
return new BaseServiceProxy(info, socket);
}
@@ -83,7 +86,7 @@ private BaseServiceProxy(RMIServiceInfo info, RMISocket socket) {
openSemaphore = new AtomicInteger(0);
pingSemaphore = new AtomicInteger(0);
- requestNonce = 0;
+ requestId = new AtomicInteger(0);
serviceInfo = info;
isOpened = false;
this.socket = socket;
@@ -92,9 +95,10 @@ private BaseServiceProxy(RMIServiceInfo info, RMISocket socket) {
@Override
public synchronized void open() throws IOException, IllegalAccessException, InstantiationException {
if(!markAsUse(openSemaphore)) {
+ Log.debug("already opened");
return;
}
- Log.debug("Initialized");
+
RMINegotiator negotiator = (RMINegotiator) serviceInfo.getNegotiator().newInstance();
converter = (Converter) serviceInfo.getConverter().newInstance();
@@ -106,37 +110,47 @@ public synchronized void open() throws IOException, IllegalAccessException, Inst
Log.trace("open proxy for {} : success", serviceInfo.getName());
isOpened = true;
- compositeDisposable.add(Observable.create(emitter -> {
- try {
- while (isOpened) {
- Response response = reader.read(Response.class);
- if (response == null) {
- return;
- }
- if (response.hasScm()) {
- handleSessionControlMessage(response);
- continue;
+ compositeDisposable.add(Observable.create(new ObservableOnSubscribe() {
+ @Override
+ public void subscribe(ObservableEmitter emitter) throws Exception {
+ try {
+ while (isOpened) {
+ Response response = reader.read(Response.class);
+ if (response == null) {
+ return;
+ }
+ if (response.hasScm()) {
+ handleSessionControlMessage(response);
+ continue;
+ }
+ emitter.onNext(response);
}
- emitter.onNext(response);
+ } catch (IOException ignore) {
+ isOpened = false;
+ } finally {
+ emitter.onComplete();
}
- } catch (IOException ignore) {
- isOpened = false;
}
- finally {
- emitter.onComplete();
+ }).subscribeOn(mListener).subscribe(new Consumer() {
+ @Override
+ public void accept(Response response) throws Exception {
+ Request request = requestWaitQueue.remove(response.getNonce());
+ if (request == null) {
+ Log.warn("no mapped request exists : {}", response);
+ return;
+ }
+ synchronized (request) {
+ Log.debug("request({}) is response({})", request, response);
+ request.setResponse(response);
+ request.notifyAll(); // wakeup waiting thread
+ }
}
- }).subscribeOn(mListener).subscribe(response -> {
- Request request = requestWaitQueue.remove(response.getNonce());
- if(request == null) {
- Log.warn("no mapped request exists : {}", response);
- return;
+ }, new Consumer() {
+ @Override
+ public void accept(Throwable throwable) throws Exception {
+ onError(throwable);
}
- synchronized (request) {
- Log.debug("request({}) is response({})", request, response);
- request.setResponse(response);
- request.notifyAll(); // wakeup waiting thread
- }
- }, this::onError, this::close));
+ }));
}
@Override
@@ -148,49 +162,66 @@ public boolean isOpen() {
public Response request(Endpoint endpoint, long timeoutInMillisec, Object ...args) {
return Observable.just(Request.fromEndpoint(endpoint, args))
- .doOnNext(request -> request.setNonce(++requestNonce))
- .doOnNext(request -> {
- final BlobSession session = request.getSession();
- if(session != null) {
- registerSession(session);
+ .doOnNext(new Consumer() {
+ @Override
+ public void accept(Request request) throws Exception {
+ request.setNonce(requestId.incrementAndGet());
+ final BlobSession session = request.getSession();
+ if(session != null) {
+ registerSession(session);
+ }
+ if(Log.isTraceEnabled()) {
+ Log.trace("Request => {}", request);
+ }
}
})
- .doOnNext(request -> Log.trace("Request => {}", request))
- .map(request -> {
- requestWaitQueue.put(request.getNonce(), request);
- try {
- synchronized (request) {
- writer.write(request);
- if(timeoutInMillisec > 0) {
- request.wait(timeoutInMillisec);
- } else {
- request.wait();
- }
+ .map(new Function() {
+ @Override
+ public Response apply(Request request) throws Exception {
+ requestWaitQueue.put(request.getNonce(), request);
+ try {
+ synchronized (request) {
+ writer.write(request);
+ if (timeoutInMillisec > 0) {
+ request.wait(timeoutInMillisec);
+ } else {
+ request.wait();
+ }
+ }
+ } catch (InterruptedException e) {
+ return RMIError.CLOSED.getResponse();
+ }
+ if (request.getResponse() == null) {
+ return RMIError.TIMEOUT.getResponse();
+ }
+ return request.getResponse();
+ }
+ })
+ .doOnNext(new Consumer() {
+ @Override
+ public void accept(Response response) throws Exception {
+ if (response.isSuccessful()) {
+ response.resolve(converter, endpoint.getUnwrappedRetType());
}
- } catch (InterruptedException e) {
- return Optional.of(RMIError.CLOSED.getResponse());
- }
- if(request.getResponse() == null) {
- return Optional.of(RMIError.TIMEOUT.getResponse());
}
- return Optional.of(request.getResponse());
})
- .filter(Optional::isPresent)
- .map(Optional::get)
- .doOnNext(response -> {
- if(response.isSuccessful()) {
- response.resolve(converter, endpoint.getUnwrappedRetType());
+ .map(new Function() {
+ @Override
+ public Response apply(Response response) throws Exception {
+ if (response.isHasSessionSwitch()) {
+ return handleBlobResponse(response);
+ } else {
+ return response;
+ }
}
})
- .map(response -> {
- if(response.isHasSessionSwitch()) {
- return handleBlobResponse(response);
- } else {
- return response;
+ .defaultIfEmpty(RMIError.UNHANDLED.getResponse())
+ .doOnError(new Consumer() {
+ @Override
+ public void accept(Throwable throwable) throws Exception {
+ onError(throwable);
}
})
- .defaultIfEmpty(RMIError.UNHANDLED.getResponse())
- .doOnError(this::onError)
.blockingSingle();
}
@@ -213,12 +244,16 @@ private void handleSessionControlMessage(Response response) throws IOException,
BlobSession session;
session = sessionRegistry.get(scm.getKey());
if (session == null) {
- Log.warn("Session not available for {}", scm);
+ Log.warn("session not available for {} @ {}", scm.getCommand(), scm.getKey());
return;
}
session.handle(scm);
}
+ /**
+ * register session
+ * @param session
+ */
private void registerSession(BlobSession session) {
if (sessionRegistry.put(session.getKey(), session) != null) {
Log.warn("session : {} collision in registry", session.getKey());
@@ -236,9 +271,9 @@ private void unregisterSession(BlobSession session ) {
}
private void onError(Throwable throwable) {
- Log.error("Proxy closed {}", throwable);
+ Log.error("proxy closed {}({})", socket.getRemoteName() ,serviceInfo.getName(), throwable);
try {
- close();
+ actualClose();
} catch (IOException ignored) { }
}
@@ -246,24 +281,37 @@ public void close() throws IOException {
if(!markAsUnuse(openSemaphore)) {
return;
}
- stopPeriodicQosUpdate();
- if(!socket.isClosed()) {
- socket.close();
- }
+ actualClose();
+ }
+
+ /**
+ * close socket for this service proxy and stop listening to response from the remote service
+ * @throws IOException try to close socket when it is already closed by peer or has not been opened at all
+ */
+ private synchronized void actualClose() throws IOException {
if(!compositeDisposable.isDisposed()) {
compositeDisposable.dispose();
}
compositeDisposable.clear();
- // wake blocked thread from wait queue
- requestWaitQueue.values().forEach(request -> {
+ if(!pingDisposable.isDisposed()) {
+ pingDisposable.dispose();
+ }
+ if(!socket.isClosed()) {
+ socket.close();
+ }
+ for (Request request : requestWaitQueue.values()) {
synchronized (request) {
+ // put error response on the request
+ request.setResponse(RMIError.CLOSED.getResponse());
+ // wake blocked threads from wait queue
request.notifyAll();
}
- });
+ }
isOpened = false;
Log.debug("proxy for {} closed", serviceInfo.getName());
}
+
@Override
public void startPeriodicQosUpdate(long timeout, long interval, TimeUnit timeUnit) {
if(!markAsUse(pingSemaphore)) {
@@ -271,7 +319,12 @@ public void startPeriodicQosUpdate(long timeout, long interval, TimeUnit timeUni
}
pingDisposable = Observable.interval(interval, timeUnit)
.subscribeOn(Schedulers.io())
- .subscribe(aLong -> getQosUpdate(timeout));
+ .subscribe(new Consumer() {
+ @Override
+ public void accept(Long aLong) throws Exception {
+ getQosUpdate(timeout);
+ }
+ });
}
@Override
@@ -337,9 +390,24 @@ public String who() {
@Override
public boolean provide(Class controller) {
return Observable.fromIterable(serviceInfo.getControllerInfos())
- .map(ControllerInfo::getStubCls)
- .map(controller::equals)
- .reduce((isThere1, isThere2) -> isThere1 || isThere2)
+ .map(new Function>() {
+ @Override
+ public Class> apply(ControllerInfo controllerInfo) throws Exception {
+ return controllerInfo.getStubCls();
+ }
+ })
+ .map(new Function, Boolean>() {
+ @Override
+ public Boolean apply(Class> stubCls) throws Exception {
+ return controller.equals(stubCls);
+ }
+ })
+ .reduce(new BiFunction() {
+ @Override
+ public Boolean apply(Boolean match1, Boolean match2) throws Exception {
+ return match1 || match2;
+ }
+ })
.blockingGet(false);
}
}
diff --git a/src/main/java/com/doodream/rmovjs/net/tcp/TcpServiceAdapter.java b/src/main/java/com/doodream/rmovjs/net/tcp/TcpServiceAdapter.java
index c2a060b..559b100 100644
--- a/src/main/java/com/doodream/rmovjs/net/tcp/TcpServiceAdapter.java
+++ b/src/main/java/com/doodream/rmovjs/net/tcp/TcpServiceAdapter.java
@@ -40,7 +40,7 @@ public TcpServiceAdapter() throws UnknownHostException {
@Override
public ServiceProxyFactory getProxyFactory(final RMIServiceInfo info) {
- if(!RMIServiceInfo.isComplete(info)) {
+ if(!RMIServiceInfo.isValid(info)) {
throw new IllegalArgumentException("Incomplete service info");
}
final String[] params = info.getParams().toArray(new String[0]);
@@ -70,7 +70,6 @@ public void accept(ServiceProxyFactory serviceProxyFactory) throws Exception {
@Override
protected void onStart() throws IOException {
serverSocket = new ServerSocket();
- Log.debug("service address {}", mAddress.getAddress().getHostAddress());
serverSocket.bind(mAddress);
Log.debug("service started @ {}", mAddress.getAddress().getHostAddress());
}
diff --git a/src/main/java/com/doodream/rmovjs/parameter/Param.java b/src/main/java/com/doodream/rmovjs/parameter/Param.java
index 736b439..3b3b2ca 100644
--- a/src/main/java/com/doodream/rmovjs/parameter/Param.java
+++ b/src/main/java/com/doodream/rmovjs/parameter/Param.java
@@ -71,7 +71,7 @@ public void apply(T value) {
public Object resolve(Converter converter, Type type) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
if(Types.isCastable(value, type)) {
- return (T) value;
+ return value;
}
return converter.resolve(value, type);
}
diff --git a/src/main/java/com/doodream/rmovjs/sdp/BaseServiceDiscovery.java b/src/main/java/com/doodream/rmovjs/sdp/BaseServiceDiscovery.java
index bc5772a..ee7fca2 100644
--- a/src/main/java/com/doodream/rmovjs/sdp/BaseServiceDiscovery.java
+++ b/src/main/java/com/doodream/rmovjs/sdp/BaseServiceDiscovery.java
@@ -1,22 +1,19 @@
package com.doodream.rmovjs.sdp;
import com.doodream.rmovjs.model.RMIServiceInfo;
-import com.doodream.rmovjs.net.RMIServiceProxy;
-import com.doodream.rmovjs.net.ServiceAdapter;
-import com.doodream.rmovjs.net.ServiceProxyFactory;
import com.doodream.rmovjs.serde.Converter;
import com.google.common.base.Preconditions;
-import io.reactivex.Observable;
+import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
+import io.reactivex.schedulers.Schedulers;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
@@ -26,12 +23,47 @@ public abstract class BaseServiceDiscovery implements ServiceDiscovery {
private static final Logger Log = LoggerFactory.getLogger(BaseServiceDiscovery.class);
+ protected interface DiscoveryEventListener {
+ void onStart();
+ void onDiscovered(RMIServiceInfo info);
+ void onError(Throwable e);
+ void onStop();
+ }
+
+
+ private static class ServiceInfoSource implements ObservableOnSubscribe, DiscoveryEventListener {
+ private ObservableEmitter emitter;
+
+ @Override
+ public void onStart() {
+ }
+
+ @Override
+ public void onDiscovered(RMIServiceInfo info) {
+ emitter.onNext(info);
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ emitter.onError(e);
+ }
+
+ @Override
+ public void onStop() {
+ emitter.onComplete();
+ }
+
+ @Override
+ public void subscribe(ObservableEmitter observableEmitter) throws Exception {
+ this.emitter = observableEmitter;
+ }
+ }
+
+
private static final long TIMEOUT_IN_SEC = 5L;
- private long tickIntervalInMilliSec;
- private HashMap disposableMap;
+ private final HashMap disposableMap;
- BaseServiceDiscovery(long interval, TimeUnit unit) {
- tickIntervalInMilliSec = unit.toMillis(interval);
+ BaseServiceDiscovery() {
disposableMap = new HashMap<>();
}
@@ -41,10 +73,6 @@ public void startDiscovery(@NonNull Class service, boolean once, @NonNull Servic
}
- private Observable observeTick() {
- return Observable.interval(0L, tickIntervalInMilliSec, TimeUnit.MILLISECONDS);
- }
-
@Override
public void startDiscovery(@NonNull final Class service, final boolean once, long timeout, @NonNull TimeUnit unit, @NonNull final ServiceDiscoveryListener listener) throws IllegalAccessException, InstantiationException {
if(disposableMap.containsKey(service)) {
@@ -56,15 +84,12 @@ public void startDiscovery(@NonNull final Class service, final boolean once, lon
Preconditions.checkNotNull(converter, "converter is not declared");
final HashSet discoveryCache = new HashSet<>();
+ final ServiceInfoSource serviceInfoSource = new ServiceInfoSource();
+
+ onStartDiscovery(serviceInfoSource);
listener.onDiscoveryStarted();
- Observable serviceInfoObservable = observeTick()
- .map(new Function() {
- @Override
- public RMIServiceInfo apply(Long aLong) throws Exception {
- return receiveServiceInfo(converter);
- }
- })
+ disposableMap.put(service, Observable.create(serviceInfoSource)
.doOnNext(new Consumer() {
@Override
public void accept(RMIServiceInfo svcInfo) throws Exception {
@@ -99,44 +124,17 @@ public void accept(RMIServiceInfo discovered) throws Exception {
info.copyFrom(discovered);
}
})
- .timeout(timeout, unit);
-
-
- disposableMap.put(service, serviceInfoObservable
- .map(new Function>() {
- @Override
- public Class> apply(RMIServiceInfo rmiServiceInfo) throws Exception {
- return rmiServiceInfo.getAdapter();
- }
- })
- .map(new Function, Object>() {
- @Override
- public Object apply(Class> cls) throws Exception {
- return cls.newInstance();
- }
- })
- .cast(ServiceAdapter.class)
- .map(new Function() {
- @Override
- public ServiceProxyFactory apply(ServiceAdapter serviceAdapter) throws Exception {
- return serviceAdapter.getProxyFactory(info);
- }
- })
- .map(new Function() {
- @Override
- public RMIServiceProxy apply(ServiceProxyFactory serviceProxyFactory) throws Exception {
- return serviceProxyFactory.build();
- }
- })
+ .timeout(timeout, unit)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
- close();
+ onStopDiscovery();
disposableMap.remove(service);
listener.onDiscoveryFinished();
}
})
- .doOnError(new Consumer() {
+ .subscribeOn(Schedulers.io())
+ .subscribe(listener::onDiscovered, new Consumer() {
@Override
public void accept(Throwable throwable) throws Exception {
if(throwable instanceof TimeoutException) {
@@ -144,41 +142,22 @@ public void accept(Throwable throwable) throws Exception {
} else {
Log.warn("{}", throwable);
}
- close();
+ onStopDiscovery();
disposableMap.remove(service);
listener.onDiscoveryFinished();
}
- })
- .doOnComplete(new Action() {
+ }, new Action() {
@Override
public void run() throws Exception {
- close();
+ onStopDiscovery();
disposableMap.remove(service);
listener.onDiscoveryFinished();
}
- })
- .onErrorReturn(new Function() {
- @Override
- public RMIServiceProxy apply(Throwable throwable) throws Exception {
- return RMIServiceProxy.NULL_PROXY;
- }
- })
- .filter(new Predicate() {
- @Override
- public boolean test(RMIServiceProxy proxy) throws Exception {
- return !RMIServiceProxy.NULL_PROXY.equals(proxy);
- }
- })
- .subscribe(new Consumer() {
- @Override
- public void accept(RMIServiceProxy rmiServiceProxy) throws Exception {
- listener.onDiscovered(rmiServiceProxy);
- }
}));
-
}
+
@Override
public void cancelDiscovery(Class service) {
Disposable disposable = disposableMap.get(service);
@@ -191,7 +170,6 @@ public void cancelDiscovery(Class service) {
disposable.dispose();
}
-
- protected abstract RMIServiceInfo receiveServiceInfo(Converter converter) throws IOException;
- protected abstract void close();
+ protected abstract void onStartDiscovery(DiscoveryEventListener listener);
+ protected abstract void onStopDiscovery();
}
diff --git a/src/main/java/com/doodream/rmovjs/sdp/ServiceAdvertiser.java b/src/main/java/com/doodream/rmovjs/sdp/ServiceAdvertiser.java
index 0ddca8e..94f36ff 100644
--- a/src/main/java/com/doodream/rmovjs/sdp/ServiceAdvertiser.java
+++ b/src/main/java/com/doodream/rmovjs/sdp/ServiceAdvertiser.java
@@ -13,7 +13,7 @@ public interface ServiceAdvertiser {
* @param block
* @throws IOException
*/
- void startAdvertiser(RMIServiceInfo info, Converter converter, boolean block) throws IOException;
+ void startAdvertiser(RMIServiceInfo info, boolean block) throws IOException;
/**
* stop advertising
diff --git a/src/main/java/com/doodream/rmovjs/sdp/ServiceDiscoveryListener.java b/src/main/java/com/doodream/rmovjs/sdp/ServiceDiscoveryListener.java
index e085fb9..97dbc8e 100644
--- a/src/main/java/com/doodream/rmovjs/sdp/ServiceDiscoveryListener.java
+++ b/src/main/java/com/doodream/rmovjs/sdp/ServiceDiscoveryListener.java
@@ -1,9 +1,9 @@
package com.doodream.rmovjs.sdp;
-import com.doodream.rmovjs.net.RMIServiceProxy;
+import com.doodream.rmovjs.model.RMIServiceInfo;
public interface ServiceDiscoveryListener {
- void onDiscovered(RMIServiceProxy proxy);
+ void onDiscovered(RMIServiceInfo info);
void onDiscoveryStarted();
void onDiscoveryFinished() throws IllegalAccessException;
}
diff --git a/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceAdvertiser.java b/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceAdvertiser.java
index 9597404..857381c 100644
--- a/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceAdvertiser.java
+++ b/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceAdvertiser.java
@@ -2,6 +2,7 @@
import com.doodream.rmovjs.model.RMIServiceInfo;
import com.doodream.rmovjs.serde.Converter;
+import com.doodream.rmovjs.serde.json.JsonConverter;
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.functions.Consumer;
@@ -29,9 +30,10 @@ public class SimpleServiceAdvertiser implements ServiceAdvertiser {
private CompositeDisposable compositeDisposable = new CompositeDisposable();
@Override
- public synchronized void startAdvertiser(final RMIServiceInfo info, final Converter converter, boolean block) throws IOException {
+ public synchronized void startAdvertiser(final RMIServiceInfo info, boolean block) throws IOException {
Observable tickObservable = Observable.interval(0L, 3L, TimeUnit.SECONDS);
+ final JsonConverter converter = new JsonConverter();
compositeDisposable.add(tickObservable
.map(new Function() {
diff --git a/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceDiscovery.java b/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceDiscovery.java
index ab4da12..a1a97fd 100644
--- a/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceDiscovery.java
+++ b/src/main/java/com/doodream/rmovjs/sdp/SimpleServiceDiscovery.java
@@ -2,12 +2,19 @@
import com.doodream.rmovjs.model.RMIServiceInfo;
import com.doodream.rmovjs.serde.Converter;
+import com.doodream.rmovjs.serde.json.JsonConverter;
+import io.reactivex.Observable;
+import io.reactivex.disposables.CompositeDisposable;
+import io.reactivex.functions.Action;
+import io.reactivex.functions.Consumer;
+import io.reactivex.functions.Function;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.concurrent.TimeUnit;
/**
@@ -18,31 +25,59 @@
*/
public class SimpleServiceDiscovery extends BaseServiceDiscovery {
- private MulticastSocket serviceBroadcastSocket;
+ private DiscoveryEventListener listener;
+ private final CompositeDisposable disposable;
+ private final MulticastSocket serviceBroadcastSocket;
+ private final JsonConverter converter;
public SimpleServiceDiscovery() throws IOException {
- super(100L, TimeUnit.MILLISECONDS);
+ super();
+ disposable = new CompositeDisposable();
+ converter = new JsonConverter();
serviceBroadcastSocket = new MulticastSocket(SimpleServiceAdvertiser.BROADCAST_PORT);
serviceBroadcastSocket.joinGroup(InetAddress.getByName(SimpleServiceAdvertiser.MULTICAST_GROUP_IP));
}
@Override
- protected RMIServiceInfo receiveServiceInfo(Converter converter) throws IOException {
- byte[] buffer = new byte[64 * 1024];
- Arrays.fill(buffer, (byte) 0);
- DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
- serviceBroadcastSocket.receive(packet);
- return converter.invert(packet.getData(), RMIServiceInfo.class);
+ protected void onStartDiscovery(DiscoveryEventListener listener) {
+ this.listener = listener;
+ disposable.add(Observable.interval(1000L, TimeUnit.MILLISECONDS)
+ .map(new Function() {
+ @Override
+ public RMIServiceInfo apply(Long aLong) throws Exception {
+ byte[] buffer = new byte[64 * 1024];
+ Arrays.fill(buffer, (byte) 0);
+ DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
+ serviceBroadcastSocket.receive(packet);
+ return converter.invert(packet.getData(), RMIServiceInfo.class);
+ }
+ })
+ .subscribe(new Consumer() {
+ @Override
+ public void accept(RMIServiceInfo info) throws Exception {
+ listener.onDiscovered(info);
+ }
+ }, new Consumer() {
+ @Override
+ public void accept(Throwable throwable) throws Exception {
+ listener.onError(throwable);
+ }
+ }, new Action() {
+ @Override
+ public void run() throws Exception {
+ listener.onStop();
+ }
+ }));
}
@Override
- protected void close() {
- if(serviceBroadcastSocket == null) {
- return;
+ protected void onStopDiscovery() {
+ if(!disposable.isDisposed()) {
+ disposable.dispose();
}
- if(serviceBroadcastSocket.isClosed()) {
- return;
+ disposable.clear();
+ if(!serviceBroadcastSocket.isClosed()) {
+ serviceBroadcastSocket.close();
}
- serviceBroadcastSocket.close();
}
}
diff --git a/src/main/java/com/doodream/rmovjs/serde/bson/BsonConverter.java b/src/main/java/com/doodream/rmovjs/serde/bson/BsonConverter.java
index 647efc7..2d86541 100644
--- a/src/main/java/com/doodream/rmovjs/serde/bson/BsonConverter.java
+++ b/src/main/java/com/doodream/rmovjs/serde/bson/BsonConverter.java
@@ -106,22 +106,23 @@ public T invert(byte[] b, Class cls) {
@Override
- public Object resolve(final Object unresolved, Type type) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
+ public Object resolve(final Object unresolved, Type type) throws InstantiationException, IllegalAccessException {
if(unresolved == null) {
return null;
}
Class clsz;
- if(type instanceof ParameterizedType) {
- clsz = Class.forName(((ParameterizedType) type).getRawType().getTypeName());
- } else {
- clsz = Class.forName(type.getTypeName());
+ try {
+ if (type instanceof ParameterizedType) {
+ clsz = Class.forName(((ParameterizedType) type).getRawType().getTypeName());
+ } else {
+ clsz = Class.forName(type.getTypeName());
+ }
+ } catch (ClassNotFoundException e) {
+ return unresolved;
}
final Class cls = clsz;
final Class unresolvedCls = unresolved.getClass();
- if(cls.equals(unresolvedCls) ||
- Types.isCastable(unresolved, type)) {
- return cls.cast(unresolved);
- }
+ Log.debug("resolve {} -> {}", unresolvedCls, cls);
if(unresolvedCls.equals(LinkedHashMap.class)) {
return resolveKvMap((Map, ?>) unresolved, cls);
@@ -134,19 +135,37 @@ public Object resolve(final Object unresolved, Type type) throws ClassNotFoundEx
}
ArrayList unresolvedList = (ArrayList) unresolved;
return Observable.fromIterable(unresolvedList).map(new Function() {
- @Override
- public Object apply(Object o) throws Exception {
- return resolve(o, typeArguments[0]);
- }}).toList().blockingGet();
+ @Override
+ public Object apply(Object o) throws Exception {
+ return resolve(o, typeArguments[0]);
+ }}).toList().blockingGet();
+ }
+
+ if(cls.equals(unresolvedCls) ||
+ Types.isCastable(unresolved, type) ||
+ Types.isCastable(unresolved, cls)) {
+ return cls.cast(unresolved);
+ }
+
+
+
+ if(unresolvedCls.getSuperclass().equals(Number.class)) {
+
}
try {
Constructor> constructor = cls.getConstructor(unresolvedCls);
return constructor.newInstance(unresolved);
- } catch (NoSuchMethodException | InvocationTargetException e) {
- Log.error("",e);
- return unresolved;
+ } catch (NoSuchMethodException | InvocationTargetException ignored) {
+
}
+ try {
+ Method valueOf = cls.getMethod("valueOf", String.class);
+ return valueOf.invoke(null, String.valueOf(unresolved));
+ } catch (NoSuchMethodException | InvocationTargetException ignored) {
+
+ }
+ return unresolved;
}
@@ -171,7 +190,4 @@ public void accept(Field field) throws Exception {
return resolved;
}
- private T handlePrimitive(Object unresolved, Class cls) {
- return (T) cls.cast(unresolved);
- }
}
diff --git a/src/main/java/com/doodream/rmovjs/server/RMIService.java b/src/main/java/com/doodream/rmovjs/server/RMIService.java
index fd2a7eb..bddc337 100644
--- a/src/main/java/com/doodream/rmovjs/server/RMIService.java
+++ b/src/main/java/com/doodream/rmovjs/server/RMIService.java
@@ -304,7 +304,7 @@ public Response apply(Request request) throws Exception {
return routeRequest(request);
}
}));
- advertiser.startAdvertiser(serviceInfo, converter, block);
+ advertiser.startAdvertiser(serviceInfo, block);
}
/**
diff --git a/src/main/java/com/doodream/rmovjs/util/Types.java b/src/main/java/com/doodream/rmovjs/util/Types.java
index e7205ee..6d9d254 100644
--- a/src/main/java/com/doodream/rmovjs/util/Types.java
+++ b/src/main/java/com/doodream/rmovjs/util/Types.java
@@ -1,5 +1,8 @@
package com.doodream.rmovjs.util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
@@ -8,6 +11,7 @@
import java.util.regex.Pattern;
public class Types {
+ private static final Logger Log = LoggerFactory.getLogger(Types.class);
private static Pattern INTERNAL_TYPE_SELECTOR_PATTERN = Pattern.compile("([^\\<\\>]+)\\<([\\s\\S]+)\\>");
public static Type[] unwrapType(String typeName) throws ClassNotFoundException, IllegalArgumentException {
@@ -98,15 +102,17 @@ public static boolean isCastable(T body, Class> rawCls) {
try {
rawCls.cast(body);
return true;
- } catch (ClassCastException ignored) { }
+ } catch (ClassCastException ignored) {
+ }
return false;
}
public static boolean isCastable(T body, Type type) {
try {
- type.getClass().cast(body);
+ ((Class) type).cast(body);
return true;
- } catch (ClassCastException ignored) { }
+ } catch (ClassCastException ignored) {
+ }
return false;
}
diff --git a/src/test/java/com/doodream/rmovjs/test/BsonConverterTest.java b/src/test/java/com/doodream/rmovjs/test/BsonConverterTest.java
deleted file mode 100644
index b8ee599..0000000
--- a/src/test/java/com/doodream/rmovjs/test/BsonConverterTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package com.doodream.rmovjs.test;
-
-
-import com.doodream.rmovjs.model.Response;
-import com.doodream.rmovjs.serde.Converter;
-import com.doodream.rmovjs.serde.Reader;
-import com.doodream.rmovjs.serde.Writer;
-import com.doodream.rmovjs.serde.bson.BsonConverter;
-import com.doodream.rmovjs.test.service.User;
-import com.doodream.rmovjs.util.Types;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-public class BsonConverterTest {
-
-
-
-
- @Test
- public void testNumericObject() throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
- Long id = 102L;
- Long result = testObjectTransfer(id, Long.class);
- Assert.assertEquals(id, result);
- }
-
- @Test
- public void testSimpleObject() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
- User user = new User();
- user.setAge(30);
- User userBody = testObjectTransfer(user, User.class);
- Assert.assertEquals(userBody, user);
- }
-
- @Test
- public void testGenericObject() throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
- List users = new LinkedList<>();
- User testUser = new User();
- testUser.setAge(30);
- testUser.setName("James");
- users.add(testUser);
- List usersResult = testObjectTransfer(users, Types.getType(List.class, User.class));
- Assert.assertEquals(users, usersResult);
- }
-
- @Test
- public void testComplexGeneric() throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
- List users = new ArrayList<>();
- User user = new User();
- user.setName("david");
- user.setAge(30);
- List> userLists = new ArrayList<>();
- userLists.add(users);
-
- List> userListResult = testObjectTransfer(userLists, Types.getType(List.class, Types.getType(List.class, User.class)));
- Assert.assertEquals(userListResult, userLists);
- }
-
- private T testObjectTransfer(T src, Type type) throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
- final Response response = Response.success(src);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- Converter converter = new BsonConverter();
- Writer writer = converter.writer(baos);
- writer.write(response);
- ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
- Reader reader = converter.reader(bais);
- Response parsedResponse = reader.read(Response.class);
- return (T) converter.resolve(parsedResponse.getBody(), type);
- }
-}
diff --git a/src/test/java/com/doodream/rmovjs/test/ConverterTest.java b/src/test/java/com/doodream/rmovjs/test/ConverterTest.java
new file mode 100644
index 0000000..a3ee1b9
--- /dev/null
+++ b/src/test/java/com/doodream/rmovjs/test/ConverterTest.java
@@ -0,0 +1,105 @@
+package com.doodream.rmovjs.test;
+
+
+import com.doodream.rmovjs.model.Response;
+import com.doodream.rmovjs.serde.Converter;
+import com.doodream.rmovjs.serde.Reader;
+import com.doodream.rmovjs.serde.Writer;
+import com.doodream.rmovjs.serde.bson.BsonConverter;
+import com.doodream.rmovjs.serde.json.JsonConverter;
+import com.doodream.rmovjs.test.service.User;
+import com.doodream.rmovjs.util.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+public class ConverterTest {
+
+
+ private List converters;
+
+ @Before
+ public void setup() {
+ converters = Arrays.asList(
+ new JsonConverter(),
+ new BsonConverter()
+ );
+ }
+
+ @Test
+ public void converterSerDeserTest() throws ClassNotFoundException, IOException, InstantiationException, IllegalAccessException {
+ for (Converter converter : converters) {
+ Assert.assertTrue(testPrimitiveType(converter, 1.3, double.class));
+ Assert.assertTrue(testPrimitiveType(converter, 1, int.class));
+ Assert.assertTrue(testNumericObject(converter, 1.3f));
+ Assert.assertTrue(testNumericObject(converter, 100L));
+ Assert.assertTrue(testNumericObject(converter, 100));
+ Assert.assertTrue(testNumericObject(converter, 1.3));
+ Assert.assertTrue(testSimpleObject(converter));
+ Assert.assertTrue(testGenericObject(converter));
+ Assert.assertTrue(testComplexGeneric(converter));
+ }
+ }
+
+ private boolean testPrimitiveType(Converter converter, T v, Class> cls) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
+ Object resolved = converter.resolve(v, cls);
+ return resolved.equals(v);
+ }
+
+ private boolean testNumericObject(Converter converter, T v) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
+ T result = testObjectTransfer(converter, v, v.getClass());
+ return v.equals(result);
+ }
+
+ private boolean testSimpleObject(Converter converter) throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ User user = new User();
+ user.setAge(30);
+ User userBody = testObjectTransfer(converter, user, User.class);
+ return userBody.equals(user);
+ }
+
+ public boolean testGenericObject(Converter converter) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
+ List users = new LinkedList<>();
+ User testUser = new User();
+ testUser.setAge(30);
+ testUser.setName("James");
+ users.add(testUser);
+ List usersResult = testObjectTransfer(converter, users, Types.getType(List.class, User.class));
+ return users.equals(usersResult);
+ }
+
+ private boolean testComplexGeneric(Converter converter) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
+ List users = new ArrayList<>();
+ User user = new User();
+ user.setName("david");
+ user.setAge(30);
+ List> userLists = new ArrayList<>();
+ userLists.add(users);
+
+ List> userListResult = testObjectTransfer(converter, userLists, Types.getType(List.class, Types.getType(List.class, User.class)));
+ return userListResult.equals(userLists);
+ }
+
+ private T testObjectTransfer(Converter converter, T src, Type type) throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ final Response response = Response.success(src);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Writer writer = converter.writer(baos);
+ writer.write(response);
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ Reader reader = converter.reader(bais);
+ Response parsedResponse = reader.read(Response.class);
+ if(Types.isCastable(parsedResponse.getBody(), type)) {
+ return (T) parsedResponse.getBody();
+ }
+ return (T) converter.resolve(parsedResponse.getBody(), type);
+ }
+}