Skip to content

Commit

Permalink
0.0.5 snapshot ha (#56)
Browse files Browse the repository at this point in the history
* solve random request drop
there was random request drop happened in concurrent method invocation

request and response has 1:1 relationship with simple incremental unique value.
however, increase of the value was not atomic, which means two different requests can have the  same value.
so one request remains not notified by response while the other is notified normally.

* change field name

* bug fix (reflect API update)

* 0.0.5 snapshot primitive sedes (#54)

* Add support for Primitive type (e.g. int, double, etc.) in serialization & deserialization

* simplify implementation for service discovery and improve genericity
Prior to this change, the implementation for service discovery should handle listed below..
1. retrieve service information from service advertiser
2. convert it to service proxy
that was too complicated to be applied widely used service discovery technology (like mdns ..)

1.separate service info converting functionality from the discovery
2.remove ad-hoc tick timer for service information retrival which is not well compatible to another service discovery implementation
close #49, #48, #37, #39

* change method name for service discovery

* 0.0.5 snapshot primitive sedes (#55)

* Add support for Primitive type (e.g. int, double, etc.) in serialization & deserialization

* simplify implementation for service discovery and improve genericity
Prior to this change, the implementation for service discovery should handle listed below..
1. retrieve service information from service advertiser
2. convert it to service proxy
that was too complicated to be applied widely used service discovery technology (like mdns ..)

1.separate service info converting functionality from the discovery
2.remove ad-hoc tick timer for service information retrival which is not well compatible to another service discovery implementation
close #49, #48, #37, #39

* change method name for service discovery

* update README
  • Loading branch information
fritzprix committed Nov 6, 2018
1 parent d42c7eb commit d960160
Show file tree
Hide file tree
Showing 17 changed files with 467 additions and 293 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ yarmi is yet anotehr RMI based on JSON. it's simple yet powerful when developing
<dependency>
<groupId>com.doodream</groupId>
<artifactId>yarmi-core</artifactId>
<version>0.0.4</version>
<version>0.0.5</version>
</dependency>
</dependencies>
```
Expand Down Expand Up @@ -148,8 +148,8 @@ public static class SimpleClient {
SimpleServiceDiscovery discovery = new SimpleServiceDiscovery();
discovery.startDiscovery(TestService.class, new ServiceDiscoveryListener() {
@Override
public void onDiscovered(RMIServiceProxy proxy) {
discoveredService.add(proxy);
public void onDiscovered(RMIServiceInfo info) {
discoveredService.add(RMIServiceInfo.toServiceProxy(info));
}

@Override
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/com/doodream/rmovjs/client/HaRMIClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,10 @@ private synchronized void registerProxy(RMIServiceProxy serviceProxy) {

private static Observable<RMIServiceProxy> startDiscovery(ServiceDiscovery discovery, Class svc) {
return Observable.create(emitter -> discovery.startDiscovery(svc, false, new ServiceDiscoveryListener() {

@Override
public void onDiscovered(RMIServiceProxy proxy) {
emitter.onNext(proxy);
public void onDiscovered(RMIServiceInfo info) {
emitter.onNext(RMIServiceInfo.toServiceProxy(info));
}

@Override
Expand Down
50 changes: 49 additions & 1 deletion src/main/java/com/doodream/rmovjs/model/RMIServiceInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@

import com.doodream.rmovjs.Properties;
import com.doodream.rmovjs.annotation.server.Service;
import com.doodream.rmovjs.net.RMIServiceProxy;
import com.doodream.rmovjs.net.ServiceAdapter;
import com.doodream.rmovjs.net.ServiceProxyFactory;
import com.doodream.rmovjs.server.RMIController;
import com.google.gson.annotations.SerializedName;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
Expand All @@ -15,6 +20,7 @@
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;

@Builder
@EqualsAndHashCode(exclude = {"proxyFactoryHint"})
Expand Down Expand Up @@ -92,11 +98,53 @@ public void accept(List<ControllerInfo> controllerInfos) throws Exception {
return builder.build();
}

public static boolean isComplete(RMIServiceInfo info) {
public static boolean isValid(RMIServiceInfo info) {
return (info.getProxyFactoryHint() != null) &&
(info.getControllerInfos() != null);
}

public static RMIServiceProxy toServiceProxy(RMIServiceInfo info) {
return Single.just(info)
.map(new Function<RMIServiceInfo, Class<?>>() {
@Override
public Class<?> apply(RMIServiceInfo rmiServiceInfo) throws Exception {
return rmiServiceInfo.getAdapter();
}
})
.map(new Function<Class<?>, Object>() {
@Override
public Object apply(Class<?> cls) throws Exception {
return cls.newInstance();
}
})
.cast(ServiceAdapter.class)
.map(new Function<ServiceAdapter, ServiceProxyFactory>() {
@Override
public ServiceProxyFactory apply(ServiceAdapter serviceAdapter) throws Exception {
return serviceAdapter.getProxyFactory(info);
}
})
.map(new Function<ServiceProxyFactory, RMIServiceProxy>() {
@Override
public RMIServiceProxy apply(ServiceProxyFactory serviceProxyFactory) throws Exception {
return serviceProxyFactory.build();
}
})
.onErrorReturn(new Function<Throwable, RMIServiceProxy>() {
@Override
public RMIServiceProxy apply(Throwable throwable) throws Exception {
return RMIServiceProxy.NULL_PROXY;
}
})
.filter(new Predicate<RMIServiceProxy>() {
@Override
public boolean test(RMIServiceProxy proxy) throws Exception {
return !RMIServiceProxy.NULL_PROXY.equals(proxy);
}
})
.blockingGet(RMIServiceProxy.NULL_PROXY);
}

public void copyFrom(RMIServiceInfo info) {
setProxyFactoryHint(info.getProxyFactoryHint());
setParams(info.getParams());
Expand Down
13 changes: 3 additions & 10 deletions src/main/java/com/doodream/rmovjs/net/BaseServiceAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
import io.reactivex.functions.BooleanSupplier;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.observables.GroupedObservable;
import io.reactivex.schedulers.Schedulers;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Optional;

public abstract class BaseServiceAdapter implements ServiceAdapter {

Expand Down Expand Up @@ -123,19 +121,14 @@ public void accept(Request request) throws Exception {
});
}
})
.doOnNext(new Consumer<Request>() {
.observeOn(Schedulers.io())
.subscribe(new Consumer<Request>() {
@Override
public void accept(Request request) throws Exception {
request.setClient(adapter);
if(Log.isTraceEnabled()) {
Log.trace("Request <= {}", request);
}
}
})
.observeOn(Schedulers.io())
.subscribe(new Consumer<Request>() {
@Override
public void accept(Request request) throws Exception {
request.setClient(adapter);
final Response response = handleRequest.apply(request);
if(Log.isTraceEnabled()) {
Log.trace("Response => {}", response);
Expand Down

0 comments on commit d960160

Please sign in to comment.