Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.0.5 snapshot primitive sedes #55

Merged
merged 5 commits into from
Nov 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ yarmi is yet anotehr RMI based on JSON. it's simple yet powerful when developing
<dependency>
<groupId>com.doodream</groupId>
<artifactId>yarmi-core</artifactId>
<version>0.0.4</version>
<version>0.0.5</version>
</dependency>
</dependencies>
```
Expand Down Expand Up @@ -148,8 +148,8 @@ public static class SimpleClient {
SimpleServiceDiscovery discovery = new SimpleServiceDiscovery();
discovery.startDiscovery(TestService.class, new ServiceDiscoveryListener() {
@Override
public void onDiscovered(RMIServiceProxy proxy) {
discoveredService.add(proxy);
public void onDiscovered(RMIServiceInfo info) {
discoveredService.add(RMIServiceInfo.toServiceProxy(info));
}

@Override
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/com/doodream/rmovjs/client/HaRMIClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,10 @@ private synchronized void registerProxy(RMIServiceProxy serviceProxy) {

private static Observable<RMIServiceProxy> startDiscovery(ServiceDiscovery discovery, Class svc) {
return Observable.create(emitter -> discovery.startDiscovery(svc, false, new ServiceDiscoveryListener() {

@Override
public void onDiscovered(RMIServiceProxy proxy) {
emitter.onNext(proxy);
public void onDiscovered(RMIServiceInfo info) {
emitter.onNext(RMIServiceInfo.toServiceProxy(info));
}

@Override
Expand Down
50 changes: 49 additions & 1 deletion src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@

import com.doodream.rmovjs.Properties;
import com.doodream.rmovjs.annotation.server.Service;
import com.doodream.rmovjs.net.RMIServiceProxy;
import com.doodream.rmovjs.net.ServiceAdapter;
import com.doodream.rmovjs.net.ServiceProxyFactory;
import com.doodream.rmovjs.server.RMIController;
import com.google.gson.annotations.SerializedName;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
Expand All @@ -15,6 +20,7 @@
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;

@Builder
@EqualsAndHashCode(exclude = {"proxyFactoryHint"})
Expand Down Expand Up @@ -92,11 +98,53 @@ public void accept(List<ControllerInfo> controllerInfos) throws Exception {
return builder.build();
}

public static boolean isComplete(RMIServiceInfo info) {
public static boolean isValid(RMIServiceInfo info) {
return (info.getProxyFactoryHint() != null) &&
(info.getControllerInfos() != null);
}

public static RMIServiceProxy toServiceProxy(RMIServiceInfo info) {
return Single.just(info)
.map(new Function<RMIServiceInfo, Class<?>>() {
@Override
public Class<?> apply(RMIServiceInfo rmiServiceInfo) throws Exception {
return rmiServiceInfo.getAdapter();
}
})
.map(new Function<Class<?>, Object>() {
@Override
public Object apply(Class<?> cls) throws Exception {
return cls.newInstance();
}
})
.cast(ServiceAdapter.class)
.map(new Function<ServiceAdapter, ServiceProxyFactory>() {
@Override
public ServiceProxyFactory apply(ServiceAdapter serviceAdapter) throws Exception {
return serviceAdapter.getProxyFactory(info);
}
})
.map(new Function<ServiceProxyFactory, RMIServiceProxy>() {
@Override
public RMIServiceProxy apply(ServiceProxyFactory serviceProxyFactory) throws Exception {
return serviceProxyFactory.build();
}
})
.onErrorReturn(new Function<Throwable, RMIServiceProxy>() {
@Override
public RMIServiceProxy apply(Throwable throwable) throws Exception {
return RMIServiceProxy.NULL_PROXY;
}
})
.filter(new Predicate<RMIServiceProxy>() {
@Override
public boolean test(RMIServiceProxy proxy) throws Exception {
return !RMIServiceProxy.NULL_PROXY.equals(proxy);
}
})
.blockingGet(RMIServiceProxy.NULL_PROXY);
}

public void copyFrom(RMIServiceInfo info) {
setProxyFactoryHint(info.getProxyFactoryHint());
setParams(info.getParams());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public TcpServiceAdapter() throws UnknownHostException {

@Override
public ServiceProxyFactory getProxyFactory(final RMIServiceInfo info) {
if(!RMIServiceInfo.isComplete(info)) {
if(!RMIServiceInfo.isValid(info)) {
throw new IllegalArgumentException("Incomplete service info");
}
final String[] params = info.getParams().toArray(new String[0]);
Expand Down Expand Up @@ -70,7 +70,6 @@ public void accept(ServiceProxyFactory serviceProxyFactory) throws Exception {
@Override
protected void onStart() throws IOException {
serverSocket = new ServerSocket();
Log.debug("service address {}", mAddress.getAddress().getHostAddress());
serverSocket.bind(mAddress);
Log.debug("service started @ {}", mAddress.getAddress().getHostAddress());
}
Expand Down
132 changes: 55 additions & 77 deletions src/main/java/com/doodream/rmovjs/sdp/BaseServiceDiscovery.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
package com.doodream.rmovjs.sdp;

import com.doodream.rmovjs.model.RMIServiceInfo;
import com.doodream.rmovjs.net.RMIServiceProxy;
import com.doodream.rmovjs.net.ServiceAdapter;
import com.doodream.rmovjs.net.ServiceProxyFactory;
import com.doodream.rmovjs.serde.Converter;
import com.google.common.base.Preconditions;
import io.reactivex.Observable;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
Expand All @@ -26,12 +23,47 @@ public abstract class BaseServiceDiscovery implements ServiceDiscovery {

private static final Logger Log = LoggerFactory.getLogger(BaseServiceDiscovery.class);

protected interface DiscoveryEventListener {
void onStart();
void onDiscovered(RMIServiceInfo info);
void onError(Throwable e);
void onStop();
}


private static class ServiceInfoSource implements ObservableOnSubscribe<RMIServiceInfo>, DiscoveryEventListener {
private ObservableEmitter<RMIServiceInfo> emitter;

@Override
public void onStart() {
}

@Override
public void onDiscovered(RMIServiceInfo info) {
emitter.onNext(info);
}

@Override
public void onError(Throwable e) {
emitter.onError(e);
}

@Override
public void onStop() {
emitter.onComplete();
}

@Override
public void subscribe(ObservableEmitter<RMIServiceInfo> observableEmitter) throws Exception {
this.emitter = observableEmitter;
}
}


private static final long TIMEOUT_IN_SEC = 5L;
private long tickIntervalInMilliSec;
private HashMap<Class, Disposable> disposableMap;
private final HashMap<Class, Disposable> disposableMap;

BaseServiceDiscovery(long interval, TimeUnit unit) {
tickIntervalInMilliSec = unit.toMillis(interval);
BaseServiceDiscovery() {
disposableMap = new HashMap<>();
}

Expand All @@ -41,10 +73,6 @@ public void startDiscovery(@NonNull Class service, boolean once, @NonNull Servic
}


private Observable<Long> observeTick() {
return Observable.interval(0L, tickIntervalInMilliSec, TimeUnit.MILLISECONDS);
}

@Override
public void startDiscovery(@NonNull final Class service, final boolean once, long timeout, @NonNull TimeUnit unit, @NonNull final ServiceDiscoveryListener listener) throws IllegalAccessException, InstantiationException {
if(disposableMap.containsKey(service)) {
Expand All @@ -56,15 +84,12 @@ public void startDiscovery(@NonNull final Class service, final boolean once, lon
Preconditions.checkNotNull(converter, "converter is not declared");

final HashSet<RMIServiceInfo> discoveryCache = new HashSet<>();
final ServiceInfoSource serviceInfoSource = new ServiceInfoSource();

onStartDiscovery(serviceInfoSource);
listener.onDiscoveryStarted();

Observable<RMIServiceInfo> serviceInfoObservable = observeTick()
.map(new Function<Long, RMIServiceInfo>() {
@Override
public RMIServiceInfo apply(Long aLong) throws Exception {
return receiveServiceInfo(converter);
}
})
disposableMap.put(service, Observable.create(serviceInfoSource)
.doOnNext(new Consumer<RMIServiceInfo>() {
@Override
public void accept(RMIServiceInfo svcInfo) throws Exception {
Expand Down Expand Up @@ -99,86 +124,40 @@ public void accept(RMIServiceInfo discovered) throws Exception {
info.copyFrom(discovered);
}
})
.timeout(timeout, unit);


disposableMap.put(service, serviceInfoObservable
.map(new Function<RMIServiceInfo, Class<?>>() {
@Override
public Class<?> apply(RMIServiceInfo rmiServiceInfo) throws Exception {
return rmiServiceInfo.getAdapter();
}
})
.map(new Function<Class<?>, Object>() {
@Override
public Object apply(Class<?> cls) throws Exception {
return cls.newInstance();
}
})
.cast(ServiceAdapter.class)
.map(new Function<ServiceAdapter, ServiceProxyFactory>() {
@Override
public ServiceProxyFactory apply(ServiceAdapter serviceAdapter) throws Exception {
return serviceAdapter.getProxyFactory(info);
}
})
.map(new Function<ServiceProxyFactory, RMIServiceProxy>() {
@Override
public RMIServiceProxy apply(ServiceProxyFactory serviceProxyFactory) throws Exception {
return serviceProxyFactory.build();
}
})
.timeout(timeout, unit)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
close();
onStopDiscovery();
disposableMap.remove(service);
listener.onDiscoveryFinished();
}
})
.doOnError(new Consumer<Throwable>() {
.subscribeOn(Schedulers.io())
.subscribe(listener::onDiscovered, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
if(throwable instanceof TimeoutException) {
Log.debug("Discovery Timeout");
} else {
Log.warn("{}", throwable);
}
close();
onStopDiscovery();
disposableMap.remove(service);
listener.onDiscoveryFinished();
}
})
.doOnComplete(new Action() {
}, new Action() {
@Override
public void run() throws Exception {
close();
onStopDiscovery();
disposableMap.remove(service);
listener.onDiscoveryFinished();
}
})
.onErrorReturn(new Function<Throwable, RMIServiceProxy>() {
@Override
public RMIServiceProxy apply(Throwable throwable) throws Exception {
return RMIServiceProxy.NULL_PROXY;
}
})
.filter(new Predicate<RMIServiceProxy>() {
@Override
public boolean test(RMIServiceProxy proxy) throws Exception {
return !RMIServiceProxy.NULL_PROXY.equals(proxy);
}
})
.subscribe(new Consumer<RMIServiceProxy>() {
@Override
public void accept(RMIServiceProxy rmiServiceProxy) throws Exception {
listener.onDiscovered(rmiServiceProxy);
}
}));

}



@Override
public void cancelDiscovery(Class service) {
Disposable disposable = disposableMap.get(service);
Expand All @@ -191,7 +170,6 @@ public void cancelDiscovery(Class service) {
disposable.dispose();
}


protected abstract RMIServiceInfo receiveServiceInfo(Converter converter) throws IOException;
protected abstract void close();
protected abstract void onStartDiscovery(DiscoveryEventListener listener);
protected abstract void onStopDiscovery();
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.doodream.rmovjs.sdp;

import com.doodream.rmovjs.net.RMIServiceProxy;
import com.doodream.rmovjs.model.RMIServiceInfo;

public interface ServiceDiscoveryListener {
void onDiscovered(RMIServiceProxy proxy);
void onDiscovered(RMIServiceInfo info);
void onDiscoveryStarted();
void onDiscoveryFinished() throws IllegalAccessException;
}
Loading