Skip to content

Commit

Permalink
simplify implementation for service discovery and improve genericity
Browse files Browse the repository at this point in the history
Prior to this change, the implementation for service discovery should handle listed below..
1. retrieve service information from service advertiser
2. convert it to service proxy
that was too complicated to be applied widely used service discovery technology (like mdns ..)

1.separate service info converting functionality from the discovery
2.remove ad-hoc tick timer for service information retrival which is not well compatible to another service discovery implementation
close #49, #48, #37, #39
  • Loading branch information
fritzprix committed Nov 6, 2018
1 parent fce19b0 commit 9b5ee7e
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 89 deletions.
5 changes: 3 additions & 2 deletions src/main/java/com/doodream/rmovjs/client/HaRMIClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,10 @@ private synchronized void registerProxy(RMIServiceProxy serviceProxy) {

private static Observable<RMIServiceProxy> startDiscovery(ServiceDiscovery discovery, Class svc) {
return Observable.create(emitter -> discovery.startDiscovery(svc, false, new ServiceDiscoveryListener() {

@Override
public void onDiscovered(RMIServiceProxy proxy) {
emitter.onNext(proxy);
public void onDiscovered(RMIServiceInfo info) {
emitter.onNext(RMIServiceInfo.toServiceProxy(info));
}

@Override
Expand Down
50 changes: 49 additions & 1 deletion src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@

import com.doodream.rmovjs.Properties;
import com.doodream.rmovjs.annotation.server.Service;
import com.doodream.rmovjs.net.RMIServiceProxy;
import com.doodream.rmovjs.net.ServiceAdapter;
import com.doodream.rmovjs.net.ServiceProxyFactory;
import com.doodream.rmovjs.server.RMIController;
import com.google.gson.annotations.SerializedName;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
Expand All @@ -15,6 +20,7 @@
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;

@Builder
@EqualsAndHashCode(exclude = {"proxyFactoryHint"})
Expand Down Expand Up @@ -92,11 +98,53 @@ public void accept(List<ControllerInfo> controllerInfos) throws Exception {
return builder.build();
}

public static boolean isComplete(RMIServiceInfo info) {
public static boolean isValid(RMIServiceInfo info) {
return (info.getProxyFactoryHint() != null) &&
(info.getControllerInfos() != null);
}

public static RMIServiceProxy toServiceProxy(RMIServiceInfo info) {
return Single.just(info)
.map(new Function<RMIServiceInfo, Class<?>>() {
@Override
public Class<?> apply(RMIServiceInfo rmiServiceInfo) throws Exception {
return rmiServiceInfo.getAdapter();
}
})
.map(new Function<Class<?>, Object>() {
@Override
public Object apply(Class<?> cls) throws Exception {
return cls.newInstance();
}
})
.cast(ServiceAdapter.class)
.map(new Function<ServiceAdapter, ServiceProxyFactory>() {
@Override
public ServiceProxyFactory apply(ServiceAdapter serviceAdapter) throws Exception {
return serviceAdapter.getProxyFactory(info);
}
})
.map(new Function<ServiceProxyFactory, RMIServiceProxy>() {
@Override
public RMIServiceProxy apply(ServiceProxyFactory serviceProxyFactory) throws Exception {
return serviceProxyFactory.build();
}
})
.onErrorReturn(new Function<Throwable, RMIServiceProxy>() {
@Override
public RMIServiceProxy apply(Throwable throwable) throws Exception {
return RMIServiceProxy.NULL_PROXY;
}
})
.filter(new Predicate<RMIServiceProxy>() {
@Override
public boolean test(RMIServiceProxy proxy) throws Exception {
return !RMIServiceProxy.NULL_PROXY.equals(proxy);
}
})
.blockingGet(RMIServiceProxy.NULL_PROXY);
}

public void copyFrom(RMIServiceInfo info) {
setProxyFactoryHint(info.getProxyFactoryHint());
setParams(info.getParams());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public TcpServiceAdapter() throws UnknownHostException {

@Override
public ServiceProxyFactory getProxyFactory(final RMIServiceInfo info) {
if(!RMIServiceInfo.isComplete(info)) {
if(!RMIServiceInfo.isValid(info)) {
throw new IllegalArgumentException("Incomplete service info");
}
final String[] params = info.getParams().toArray(new String[0]);
Expand Down Expand Up @@ -70,7 +70,6 @@ public void accept(ServiceProxyFactory serviceProxyFactory) throws Exception {
@Override
protected void onStart() throws IOException {
serverSocket = new ServerSocket();
Log.debug("service address {}", mAddress.getAddress().getHostAddress());
serverSocket.bind(mAddress);
Log.debug("service started @ {}", mAddress.getAddress().getHostAddress());
}
Expand Down
120 changes: 51 additions & 69 deletions src/main/java/com/doodream/rmovjs/sdp/BaseServiceDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import com.doodream.rmovjs.net.ServiceProxyFactory;
import com.doodream.rmovjs.serde.Converter;
import com.google.common.base.Preconditions;
import io.reactivex.Observable;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,12 +27,47 @@ public abstract class BaseServiceDiscovery implements ServiceDiscovery {

private static final Logger Log = LoggerFactory.getLogger(BaseServiceDiscovery.class);

protected interface DiscoveryEventListener {
void onStart();
void onDiscovered(RMIServiceInfo info);
void onError(Throwable e);
void onStop();
}


private static class ServiceInfoSource implements ObservableOnSubscribe<RMIServiceInfo>, DiscoveryEventListener {
private ObservableEmitter<RMIServiceInfo> emitter;

@Override
public void onStart() {
}

@Override
public void onDiscovered(RMIServiceInfo info) {
emitter.onNext(info);
}

@Override
public void onError(Throwable e) {
emitter.onError(e);
}

@Override
public void onStop() {
emitter.onComplete();
}

@Override
public void subscribe(ObservableEmitter<RMIServiceInfo> observableEmitter) throws Exception {
this.emitter = observableEmitter;
}
}


private static final long TIMEOUT_IN_SEC = 5L;
private long tickIntervalInMilliSec;
private HashMap<Class, Disposable> disposableMap;
private final HashMap<Class, Disposable> disposableMap;

BaseServiceDiscovery(long interval, TimeUnit unit) {
tickIntervalInMilliSec = unit.toMillis(interval);
BaseServiceDiscovery() {
disposableMap = new HashMap<>();
}

Expand All @@ -41,10 +77,6 @@ public void startDiscovery(@NonNull Class service, boolean once, @NonNull Servic
}


private Observable<Long> observeTick() {
return Observable.interval(0L, tickIntervalInMilliSec, TimeUnit.MILLISECONDS);
}

@Override
public void startDiscovery(@NonNull final Class service, final boolean once, long timeout, @NonNull TimeUnit unit, @NonNull final ServiceDiscoveryListener listener) throws IllegalAccessException, InstantiationException {
if(disposableMap.containsKey(service)) {
Expand All @@ -56,15 +88,12 @@ public void startDiscovery(@NonNull final Class service, final boolean once, lon
Preconditions.checkNotNull(converter, "converter is not declared");

final HashSet<RMIServiceInfo> discoveryCache = new HashSet<>();
final ServiceInfoSource serviceInfoSource = new ServiceInfoSource();

onStartDiscovery(serviceInfoSource);
listener.onDiscoveryStarted();

Observable<RMIServiceInfo> serviceInfoObservable = observeTick()
.map(new Function<Long, RMIServiceInfo>() {
@Override
public RMIServiceInfo apply(Long aLong) throws Exception {
return receiveServiceInfo(converter);
}
})
disposableMap.put(service, Observable.create(serviceInfoSource)
.doOnNext(new Consumer<RMIServiceInfo>() {
@Override
public void accept(RMIServiceInfo svcInfo) throws Exception {
Expand Down Expand Up @@ -99,35 +128,7 @@ public void accept(RMIServiceInfo discovered) throws Exception {
info.copyFrom(discovered);
}
})
.timeout(timeout, unit);


disposableMap.put(service, serviceInfoObservable
.map(new Function<RMIServiceInfo, Class<?>>() {
@Override
public Class<?> apply(RMIServiceInfo rmiServiceInfo) throws Exception {
return rmiServiceInfo.getAdapter();
}
})
.map(new Function<Class<?>, Object>() {
@Override
public Object apply(Class<?> cls) throws Exception {
return cls.newInstance();
}
})
.cast(ServiceAdapter.class)
.map(new Function<ServiceAdapter, ServiceProxyFactory>() {
@Override
public ServiceProxyFactory apply(ServiceAdapter serviceAdapter) throws Exception {
return serviceAdapter.getProxyFactory(info);
}
})
.map(new Function<ServiceProxyFactory, RMIServiceProxy>() {
@Override
public RMIServiceProxy apply(ServiceProxyFactory serviceProxyFactory) throws Exception {
return serviceProxyFactory.build();
}
})
.timeout(timeout, unit)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Expand All @@ -136,7 +137,8 @@ public void run() throws Exception {
listener.onDiscoveryFinished();
}
})
.doOnError(new Consumer<Throwable>() {
.subscribeOn(Schedulers.io())
.subscribe(listener::onDiscovered, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
if(throwable instanceof TimeoutException) {
Expand All @@ -148,37 +150,18 @@ public void accept(Throwable throwable) throws Exception {
disposableMap.remove(service);
listener.onDiscoveryFinished();
}
})
.doOnComplete(new Action() {
}, new Action() {
@Override
public void run() throws Exception {
close();
disposableMap.remove(service);
listener.onDiscoveryFinished();
}
})
.onErrorReturn(new Function<Throwable, RMIServiceProxy>() {
@Override
public RMIServiceProxy apply(Throwable throwable) throws Exception {
return RMIServiceProxy.NULL_PROXY;
}
})
.filter(new Predicate<RMIServiceProxy>() {
@Override
public boolean test(RMIServiceProxy proxy) throws Exception {
return !RMIServiceProxy.NULL_PROXY.equals(proxy);
}
})
.subscribe(new Consumer<RMIServiceProxy>() {
@Override
public void accept(RMIServiceProxy rmiServiceProxy) throws Exception {
listener.onDiscovered(rmiServiceProxy);
}
}));

}



@Override
public void cancelDiscovery(Class service) {
Disposable disposable = disposableMap.get(service);
Expand All @@ -191,7 +174,6 @@ public void cancelDiscovery(Class service) {
disposable.dispose();
}


protected abstract RMIServiceInfo receiveServiceInfo(Converter converter) throws IOException;
protected abstract void onStartDiscovery(DiscoveryEventListener listener);
protected abstract void close();
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.doodream.rmovjs.sdp;

import com.doodream.rmovjs.net.RMIServiceProxy;
import com.doodream.rmovjs.model.RMIServiceInfo;

public interface ServiceDiscoveryListener {
void onDiscovered(RMIServiceProxy proxy);
void onDiscovered(RMIServiceInfo info);
void onDiscoveryStarted();
void onDiscoveryFinished() throws IllegalAccessException;
}
Loading

0 comments on commit 9b5ee7e

Please sign in to comment.