Skip to content

Commit

Permalink
Added Bluetooth Adapter state monitoring to connection (#275)
Browse files Browse the repository at this point in the history
`DeadObjectException` is usually thrown when interacting with `BluetoothAdapter` or `BluetoothGatt` instance that was obtained before bluetooth being turned off. Prior to library version `1.3.0` all errors raised when interacting with `BluetoothGatt` or recieved by `BluetoothGattCallback` were closing the connection (were emitted by `RxBleDevice.establishConnection()`). After `1.3.0` only errors raised in `RxBleRadioOperationConnect` and recieved by `BluetoothGattCallback.onConnectionStateChange()` were closing the connection so it was possible that `DeadObjectException`s raised repetedly by subscribing to i.e. `RxBleConnection.readRssi()` would not close the connection. Added monitoring of BluetoothAdapter’s state to prevent `DeadObjectException`s and inform the user about the connection loss as soon as possible.
  • Loading branch information
dariuszseweryn committed Sep 5, 2017
1 parent 53d85dc commit d44afab
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 179 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.polidea.rxandroidble.internal.connection;


import com.jakewharton.rxrelay.BehaviorRelay;
import com.polidea.rxandroidble.RxBleAdapterStateObservable;
import com.polidea.rxandroidble.exceptions.BleDisconnectedException;
import com.polidea.rxandroidble.exceptions.BleException;
import com.polidea.rxandroidble.internal.DeviceModule;
import javax.inject.Inject;
import javax.inject.Named;
import rx.Observable;
import rx.functions.Func1;

/**
* A class that is responsible for routing all potential sources of disconnection to an Observable that emits only errors.
*/
@ConnectionScope
class DisconnectionRouter {

private final BehaviorRelay<BleException> disconnectionErrorRelay = BehaviorRelay.create();

private final Observable disconnectionErrorObservable;

@Inject
DisconnectionRouter(
@Named(DeviceModule.MAC_ADDRESS) final String macAddress,
Observable<RxBleAdapterStateObservable.BleAdapterState> adapterStateObservable
) {
disconnectionErrorObservable = Observable.merge(
disconnectionErrorRelay
.flatMap(new Func1<BleException, Observable<?>>() {
@Override
public Observable<?> call(BleException e) {
return Observable.error(e);
}
}),
adapterStateObservable
.filter(new Func1<RxBleAdapterStateObservable.BleAdapterState, Boolean>() {
@Override
public Boolean call(RxBleAdapterStateObservable.BleAdapterState bleAdapterState) {
return !bleAdapterState.isUsable();
}
})
.flatMap(new Func1<RxBleAdapterStateObservable.BleAdapterState, Observable<?>>() {
@Override
public Observable<?> call(RxBleAdapterStateObservable.BleAdapterState bleAdapterState) {
return Observable.error(new BleDisconnectedException(macAddress)); // TODO: Introduce BleDisabledException?
}
})
)
.replay()
.autoConnect(0);
}

/**
* Method to be called whenever a connection braking exception happens. It will be routed to {@link #asObservable()}.
*
* @param bleException the exception that happened
*/
void route(BleException bleException) {
disconnectionErrorRelay.call(bleException);
}

/**
* Function returning an Observable that will only throw error in case of a disconnection
*
* @param <T> the type of returned observable
* @return the Observable
*/
<T> Observable<T> asObservable() {
//noinspection unchecked
return disconnectionErrorObservable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import android.bluetooth.BluetoothGattCallback;
import android.bluetooth.BluetoothGattCharacteristic;
import android.bluetooth.BluetoothGattDescriptor;
import android.util.Pair;

import com.jakewharton.rxrelay.PublishRelay;
import com.jakewharton.rxrelay.SerializedRelay;
Expand Down Expand Up @@ -35,7 +34,8 @@ public class RxBleGattCallback {

private final Scheduler callbackScheduler;
private final BluetoothGattProvider bluetoothGattProvider;
private final Output<Pair<BluetoothGatt, RxBleConnectionState>> gattAndConnectionStateOutput = new Output<>();
private final DisconnectionRouter disconnectionRouter;
private final PublishRelay<RxBleConnectionState> connectionStatePublishRelay = PublishRelay.create();
private final Output<RxBleDeviceServices> servicesDiscoveredOutput = new Output<>();
private final Output<ByteAssociation<UUID>> readCharacteristicOutput = new Output<>();
private final Output<ByteAssociation<UUID>> writeCharacteristicOutput = new Output<>();
Expand All @@ -45,39 +45,20 @@ public class RxBleGattCallback {
private final Output<ByteAssociation<BluetoothGattDescriptor>> writeDescriptorOutput = new Output<>();
private final Output<Integer> readRssiOutput = new Output<>();
private final Output<Integer> changedMtuOutput = new Output<>();
private final Func1<BleGattException, Object> errorMapper = new Func1<BleGattException, Object>() {
private final Func1<BleGattException, Observable<?>> errorMapper = new Func1<BleGattException, Observable<?>>() {
@Override
public Object call(BleGattException bleGattException) {
throw bleGattException;
public Observable<?> call(BleGattException bleGattException) {
return Observable.error(bleGattException);
}
};
private final Observable disconnectedErrorObservable = gattAndConnectionStateOutput.valueRelay
.filter(new Func1<Pair<BluetoothGatt, RxBleConnectionState>, Boolean>() {
@Override
public Boolean call(Pair<BluetoothGatt, RxBleConnectionState> pair) {
return isDisconnectedOrDisconnecting(pair);
}
})
.map(new Func1<Pair<BluetoothGatt, RxBleConnectionState>, Object>() {
@Override
public Object call(Pair<BluetoothGatt, RxBleConnectionState> bluetoothGattRxBleConnectionStatePair) {
throw new BleDisconnectedException(bluetoothGattRxBleConnectionStatePair.first.getDevice().getAddress());
}
})
.mergeWith(gattAndConnectionStateOutput.errorRelay.map(errorMapper))
.replay()
.autoConnect(0);

@Inject
public RxBleGattCallback(@Named(ClientComponent.NamedSchedulers.GATT_CALLBACK) Scheduler callbackScheduler,
BluetoothGattProvider bluetoothGattProvider) {
BluetoothGattProvider bluetoothGattProvider,
DisconnectionRouter disconnectionRouter) {
this.callbackScheduler = callbackScheduler;
this.bluetoothGattProvider = bluetoothGattProvider;
}

private boolean isDisconnectedOrDisconnecting(Pair<BluetoothGatt, RxBleConnectionState> pair) {
RxBleConnectionState rxBleConnectionState = pair.second;
return rxBleConnectionState == RxBleConnectionState.DISCONNECTED || rxBleConnectionState == RxBleConnectionState.DISCONNECTING;
this.disconnectionRouter = disconnectionRouter;
}

private BluetoothGattCallback bluetoothGattCallback = new BluetoothGattCallback() {
Expand All @@ -88,8 +69,17 @@ public void onConnectionStateChange(BluetoothGatt gatt, int status, int newState
super.onConnectionStateChange(gatt, status, newState);
bluetoothGattProvider.updateBluetoothGatt(gatt);

propagateErrorIfOccurred(gattAndConnectionStateOutput, gatt, status, BleGattOperationType.CONNECTION_STATE);
gattAndConnectionStateOutput.valueRelay.call(new Pair<>(gatt, mapConnectionStateToRxBleConnectionStatus(newState)));
if (isDisconnectedOrDisconnecting(newState)) {
disconnectionRouter.route(new BleDisconnectedException(gatt.getDevice().getAddress()));
} else if (status != BluetoothGatt.GATT_SUCCESS) {
disconnectionRouter.route(new BleGattException(gatt, status, BleGattOperationType.CONNECTION_STATE));
}

connectionStatePublishRelay.call(mapConnectionStateToRxBleConnectionStatus(newState));
}

private boolean isDisconnectedOrDisconnecting(int newState) {
return newState == BluetoothGatt.STATE_DISCONNECTED || newState == BluetoothGatt.STATE_DISCONNECTING;
}

@Override
Expand Down Expand Up @@ -253,10 +243,9 @@ private boolean propagateStatusError(Output output, BleGattException exception)
private <T> Observable<T> withDisconnectionHandling(Output<T> output) {
//noinspection unchecked
return Observable.merge(
disconnectedErrorObservable,
gattAndConnectionStateOutput.errorRelay.map(errorMapper),
disconnectionRouter.<T>asObservable(),
output.valueRelay,
output.errorRelay.map(errorMapper)
(Observable<T>) output.errorRelay.flatMap(errorMapper)
);
}

Expand All @@ -271,23 +260,15 @@ public BluetoothGattCallback getBluetoothGattCallback() {
*/
public <T> Observable<T> observeDisconnect() {
//noinspection unchecked
return disconnectedErrorObservable;
return disconnectionRouter.asObservable();
}

/**
* @return Observable that emits RxBleConnectionState that matches BluetoothGatt's state.
* Does NOT emit errors even if status != GATT_SUCCESS.
*/
public Observable<RxBleConnectionState> getOnConnectionStateChange() {
return gattAndConnectionStateOutput.valueRelay.map(
new Func1<Pair<BluetoothGatt, RxBleConnectionState>, RxBleConnectionState>() {
@Override
public RxBleConnectionState call(
Pair<BluetoothGatt, RxBleConnectionState> bluetoothGattRxBleConnectionStatePair) {
return bluetoothGattRxBleConnectionStatePair.second;
}
}
).observeOn(callbackScheduler);
return connectionStatePublishRelay.observeOn(callbackScheduler);
}

public Observable<RxBleDeviceServices> getOnServicesDiscovered() {
Expand All @@ -309,7 +290,7 @@ public Observable<ByteAssociation<UUID>> getOnCharacteristicWrite() {
public Observable<CharacteristicChangedEvent> getOnCharacteristicChanged() {
//noinspection unchecked
return Observable.merge(
disconnectedErrorObservable,
disconnectionRouter.<CharacteristicChangedEvent>asObservable(),
changedCharacteristicSerializedPublishRelay
)
.observeOn(callbackScheduler);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.polidea.rxandroidble.internal.connection

import static com.polidea.rxandroidble.RxBleAdapterStateObservable.BleAdapterState.STATE_OFF
import static com.polidea.rxandroidble.RxBleAdapterStateObservable.BleAdapterState.STATE_TURNING_OFF
import static com.polidea.rxandroidble.RxBleAdapterStateObservable.BleAdapterState.STATE_TURNING_ON

import com.polidea.rxandroidble.RxBleAdapterStateObservable
import com.polidea.rxandroidble.exceptions.BleDisconnectedException
import com.polidea.rxandroidble.exceptions.BleException
import org.robospock.RoboSpecification
import rx.observers.TestSubscriber
import rx.subjects.PublishSubject
import spock.lang.Unroll

class DisconnectionRouterTest extends RoboSpecification {

String mockMacAddress = "1234"
PublishSubject<RxBleAdapterStateObservable.BleAdapterState> mockAdapterStateSubject = PublishSubject.create()
DisconnectionRouter objectUnderTest = new DisconnectionRouter(mockMacAddress, mockAdapterStateSubject)
TestSubscriber testSubscriber = new TestSubscriber()

def "should emit exception from .asObservable() when got one from .route()"() {

given:
BleException testException = new BleException()
objectUnderTest.asObservable().subscribe(testSubscriber)

when:
objectUnderTest.route(testException)

then:
testSubscriber.assertError(testException)
}

def "should emit exception from .asObservable() when got one from .route() even before subscription"() {

given:
BleException testException = new BleException()
objectUnderTest.route(testException)

when:
objectUnderTest.asObservable().subscribe(testSubscriber)

then:
testSubscriber.assertError(testException)
}

@Unroll
def "should emit exception from .asObservable() when adapterStateObservable emits STATE_TURNING_ON/STATE_TURNING_OFF/STATE_OFF"() {

given:
objectUnderTest.asObservable().subscribe(testSubscriber)

when:
mockAdapterStateSubject.onNext(bleAdapterState)

then:
testSubscriber.assertError({ BleDisconnectedException e -> e.bluetoothDeviceAddress == mockMacAddress })

where:
bleAdapterState << [ STATE_TURNING_ON, STATE_TURNING_OFF, STATE_OFF ]
}

@Unroll
def "should emit exception from .asObservable() when adapterStateObservable emits STATE_TURNING_ON/STATE_TURNING_OFF/STATE_OFF even before subscription"() {

given:
mockAdapterStateSubject.onNext(bleAdapterState)

when:
objectUnderTest.asObservable().subscribe(testSubscriber)

then:
testSubscriber.assertError({ BleDisconnectedException e -> e.bluetoothDeviceAddress == mockMacAddress })

where:
bleAdapterState << [ STATE_TURNING_ON, STATE_TURNING_OFF, STATE_OFF ]
}

def "should not emit exception from .asObservable() when adapterStateObservable emits STATE_ON"() {

given:
objectUnderTest.asObservable().subscribe(testSubscriber)

when:
mockAdapterStateSubject.onNext(RxBleAdapterStateObservable.BleAdapterState.STATE_ON)

then:
testSubscriber.assertNoErrors()
}
}
Loading

0 comments on commit d44afab

Please sign in to comment.