Skip to content

Commit

Permalink
Prepare DisconnectionRouter for concurrent access. (#442)
Browse files Browse the repository at this point in the history
DisconnectionRouter is storing emitters waiting for information about disconnection in a Queue. This Queue may be accessed concurrently and it was not prepared for this scenario. Additionally there could be a race if an exception would happen when a new emitter would get subscribed.
Fixes #441
  • Loading branch information
dariuszseweryn committed Jun 13, 2018
1 parent e043356 commit b4c771a
Showing 1 changed file with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.polidea.rxandroidble2.internal.RxBleLog;
import com.polidea.rxandroidble2.internal.util.RxBleAdapterWrapper;

import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Queue;

import bleshadow.javax.inject.Inject;
Expand All @@ -30,7 +30,7 @@
class DisconnectionRouter implements DisconnectionRouterInput, DisconnectionRouterOutput {

private static final String TAG = "DisconnectionRouter";
private final Queue<ObservableEmitter<BleException>> exceptionEmitters = new LinkedList<>();
private final Queue<ObservableEmitter<BleException>> exceptionEmitters = new ConcurrentLinkedQueue<>();
private BleException exceptionOccurred;
private Disposable adapterMonitoringDisposable;

Expand Down Expand Up @@ -60,14 +60,14 @@ public BleException apply(Boolean isAdapterUsable) {
.firstElement()
.subscribe(new Consumer<BleException>() {
@Override
public void accept(BleException exception) throws Exception {
public void accept(BleException exception) {
RxBleLog.d(TAG, "An exception received, indicating that the adapter has became unusable.");
exceptionOccurred = exception;
notifySubscribersAboutException();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
public void accept(Throwable throwable) {
RxBleLog.w(TAG, "Failed to monitor adapter state.", throwable);
}
});
Expand Down Expand Up @@ -107,7 +107,7 @@ public void onGattConnectionStateException(BleGattException disconnectedGattExce
onExceptionOccurred(disconnectedGattException);
}

private void onExceptionOccurred(BleException exception) {
private synchronized void onExceptionOccurred(BleException exception) {
if (exceptionOccurred == null) {
exceptionOccurred = exception;
notifySubscribersAboutException();
Expand All @@ -133,12 +133,14 @@ private void notifySubscribersAboutException() {
public Observable<BleException> asValueOnlyObservable() {
return Observable.create(new ObservableOnSubscribe<BleException>() {
@Override
public void subscribe(final ObservableEmitter<BleException> emitter) throws Exception {
if (exceptionOccurred != null) {
emitter.onNext(exceptionOccurred);
emitter.onComplete();
} else {
storeEmitterToBeNotifiedInTheFuture(emitter);
public void subscribe(final ObservableEmitter<BleException> emitter) {
synchronized (DisconnectionRouter.this) {
if (exceptionOccurred != null) {
emitter.onNext(exceptionOccurred);
emitter.onComplete();
} else {
storeEmitterToBeNotifiedInTheFuture(emitter);
}
}
}
});
Expand All @@ -148,7 +150,7 @@ private void storeEmitterToBeNotifiedInTheFuture(final ObservableEmitter<BleExce
exceptionEmitters.add(emitter);
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
public void cancel() {
exceptionEmitters.remove(emitter);
}
});
Expand Down

0 comments on commit b4c771a

Please sign in to comment.