Skip to content

Commit

Permalink
resolve Java 8 compatibility problem in legacy android version and fi…
Browse files Browse the repository at this point in the history
…x other major bug regarding to ser-deser of BSON

because Android SDK version under 23 doesn't support (or isn't compatible to) Java 8, all the language features of Java 8 are removed (Lambda, Method reference, etc.)
there was two major bugs listed below
- type resolution of body in Response was misbehaving in handling types other than concrete class (List<>, Primitives...)
- there was bug in invoke call proxy in HaRMIClient which causes IllegalArgumentsException

- closes #48 : remove Java 8 features
- closes #49 : fix call proxy invocation
  • Loading branch information
fritzprix committed Aug 31, 2018
1 parent 197bc2d commit 63bf9dc
Show file tree
Hide file tree
Showing 24 changed files with 1,330 additions and 387 deletions.
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
Expand Down Expand Up @@ -165,12 +165,12 @@
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.3.0-alpha4</version>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>1.3.0-alpha4</version>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
Expand Down
121 changes: 88 additions & 33 deletions src/main/java/com/doodream/rmovjs/client/HaRMIClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,19 @@
import com.doodream.rmovjs.sdp.ServiceDiscoveryListener;
import com.google.common.base.Preconditions;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.annotation.Annotation;
import java.lang.ref.WeakReference;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
Expand Down Expand Up @@ -48,7 +55,12 @@ public Object selectNext(List<Object> proxies, Object lastSelected) {
@Override
public Object selectNext(List<Object> proxies, Object lastSelected) {
return Observable.fromIterable(proxies)
.sorted(Comparator.comparing(RMIClient::access))
.sorted(new Comparator<Object>() {
@Override
public int compare(Object o1, Object o2) {
return RMIClient.access(o1).compareTo(RMIClient.access(o2));
}
})
.blockingFirst();
}
};
Expand Down Expand Up @@ -85,20 +97,37 @@ public static void destroy(Object callProxy, boolean force) {
client.close(force);
}

public static <T> T create(ServiceDiscovery discovery, long qos, Class svc, Class<T> ctrl, RequestRoutePolicy policy, AvailabilityChangeListener listener) {
public static <T> T create(ServiceDiscovery discovery, long qos, Class svc, final Class<T> ctrl, RequestRoutePolicy policy, AvailabilityChangeListener listener) {

Service service = (Service) svc.getAnnotation(Service.class);
Preconditions.checkNotNull(service);


Controller controller = Observable.fromArray(svc.getDeclaredFields())
.filter(field -> field.getType().equals(ctrl))
.map(field -> field.getAnnotation(Controller.class))
.filter(new Predicate<Field>() {
@Override
public boolean test(Field field) throws Exception {
return field.getType().equals(ctrl);
}
})
.map(new Function<Field, Controller>() {
@Override
public Controller apply(Field field) throws Exception {
return field.getAnnotation(Controller.class);
}
})
.blockingFirst(null);


List<Method> validMethods = Observable.fromArray(ctrl.getMethods())
.filter(RMIMethod::isValidMethod).toList().blockingGet();
.filter(new Predicate<Method>() {
@Override
public boolean test(Method method) throws Exception {
return RMIMethod.isValidMethod(method);
}
})
.toList()
.blockingGet();

final RMIServiceInfo serviceInfo = RMIServiceInfo.from(svc);
Preconditions.checkNotNull(serviceInfo, "Invalid Service Class %s", svc);
Expand All @@ -107,14 +136,24 @@ public static <T> T create(ServiceDiscovery discovery, long qos, Class svc, Clas
Preconditions.checkNotNull(controller, "no matched controller");
Preconditions.checkArgument(ctrl.isInterface());

HaRMIClient<T> haRMIClient = new HaRMIClient<>(svc, ctrl, qos, DEFAULT_QOS_UPDATE_PERIOD, TimeUnit.MILLISECONDS, policy);
final HaRMIClient<T> haRMIClient = new HaRMIClient<>(svc, ctrl, qos, DEFAULT_QOS_UPDATE_PERIOD, TimeUnit.MILLISECONDS, policy);
haRMIClient.availabilityChangeListener = listener;


CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(startDiscovery(discovery, svc)
.subscribeOn(Schedulers.newThread())
.subscribe(haRMIClient::registerProxy, haRMIClient::onError));
.subscribe(new Consumer<RMIServiceProxy>() {
@Override
public void accept(RMIServiceProxy rmiServiceProxy) throws Exception {
haRMIClient.registerProxy(rmiServiceProxy);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
haRMIClient.onError(throwable);
}
}));

haRMIClient.setDisposable(compositeDisposable);

Expand Down Expand Up @@ -178,39 +217,52 @@ private synchronized void registerProxy(RMIServiceProxy serviceProxy) {
Log.debug("client is added");
}

listenerInvoker.submit(() -> {
synchronized (clients) {
availabilityChangeListener.onAvailabilityChanged(clients.size());
clients.notifyAll();
listenerInvoker.submit(new Runnable() {
@Override
public void run() {
synchronized (clients) {
availabilityChangeListener.onAvailabilityChanged(clients.size());
clients.notifyAll();
}
}
});
}


private static Observable<RMIServiceProxy> startDiscovery(ServiceDiscovery discovery, Class svc) {
return Observable.create(emitter -> discovery.startDiscovery(svc, false, new ServiceDiscoveryListener() {
@Override
public void onDiscovered(RMIServiceProxy proxy) {
emitter.onNext(proxy);
}

private static Observable<RMIServiceProxy> startDiscovery(final ServiceDiscovery discovery, final Class svc) {
return Observable.create(new ObservableOnSubscribe<RMIServiceProxy>() {
@Override
public void onDiscoveryStarted() {

public void subscribe(final ObservableEmitter<RMIServiceProxy> emitter) throws Exception {
discovery.startDiscovery(svc, false, new ServiceDiscoveryListener() {
@Override
public void onDiscovered(RMIServiceProxy proxy) {
emitter.onNext(proxy);
}

@Override
public void onDiscoveryStarted() {

}

@Override
public void onDiscoveryFinished() {
emitter.onComplete();
}
});
}

@Override
public void onDiscoveryFinished() {
emitter.onComplete();
}
}));
});
}


private void close(boolean force) {
private void close(final boolean force) {
compositeDisposable.dispose();
listenerInvoker.shutdown();
clients.forEach(proxy -> RMIClient.destroy(proxy, force));
clients.forEach(new java.util.function.Consumer<Object>() {
@Override
public void accept(Object proxy) {
RMIClient.destroy(proxy, force);
}
});
}


Expand Down Expand Up @@ -244,7 +296,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
Preconditions.checkNotNull(lastProxy);
Log.debug("invoking : {} , {}", method, args);
try {
return method.invoke(lastProxy, method, args);
return method.invoke(lastProxy, args);
} catch (RMIException e) {
if(RMIError.isServiceBad(e.code())) {
// bad service
Expand All @@ -263,10 +315,13 @@ private synchronized void purgeBadProxy(Object badProxy) {
Log.warn("client ({}) is not in the discovered set", client.who());
}
RMIClient.destroy(badProxy, true);
listenerInvoker.submit(() -> {
synchronized (client) {
availabilityChangeListener.onAvailabilityChanged(clients.size());
client.notifyAll();
listenerInvoker.submit(new Runnable() {
@Override
public void run() {
synchronized (client) {
availabilityChangeListener.onAvailabilityChanged(clients.size());
client.notifyAll();
}
}
});
}
Expand Down
Loading

0 comments on commit 63bf9dc

Please sign in to comment.