Skip to content

Tutorial: Connection Observable sharing

Dariusz Seweryn edited this page Mar 29, 2019 · 1 revision

Introduction

* This page contains opinions — the reader may not agree with them

The library is designed to not allow for subscribing to RxBleDevice.establishConnection() twice or more if the previously established connection has not been ended. An BleAlreadyConnectedException is emitted to the second or next subscribers.

Motivation

Quite often it happens that communication protocol between a BLE central and a peripheral is using a request-response pattern. Sometimes the response itself contains no information about the request that triggered it so it is only understandable with the request. A code that could model this looks like this:

Single.zip(
    rxBleConnection.writeCharacteristic(uuid, data), // write the request
    characteristicNotificationObservable.take(1).singleOrError(), // and await the first response
    { requestData, responseData -> /* interpretation */ }
)

The problem is that if one would try to call this code two times around the same moment the responses could get mixed. These situations tend to happen seemingly at random so they are hard to debug. To prevent them the BleAlreadyConnectedException is emitted.

You may think:

Ok, I am already aware of this possibility but I still need to share the connection — how can I continue?

Sharing The Connection

There are three major ways of of interacting with RxBleConnection from different entry points. Starting from the best (in the opinion of the library's author) to the worst in terms of maintainability.

1. Use a single .subscribe()

An approach derived from an excellent Jake Wharton's talk about Managing State with RxJava.

Pros

  • Whole flow specified in a single place
  • Easiest to follow and comprehend once familiar with Rx programming
  • No state managing
  • Most control
  • Least error prone

Cons

  • May be difficult for beginners

Example

Click to expand

Every app tends to have a unique data flow. Let's say this time there is a need to do one thing when the connection is established, a thing that is not related to the connection and a second thing on the connection. All the things should happen during a single connection.

rxBleDevice.establishConnection(false)
    .flatMapCompletable { connection ->
        Completable.concat(listOf(
            connection.discoverServices().ignoreElement(), // one thing with connection
            otherCompletableNotRelatedToConnection,
            connection.writeCharacteristic(uuid, data).ignoreElement() // second thing with connection
        ))
    }
    .subscribe()

2. Multiple subscribes with RxReplayingShare

Already mentioned Jake Wharton has published a library that allows for easy sharing of an Observable. The difference between standard RxJava .share() operator and this utility is that if an emission from the shared Observable has already happened before a second subscription the previous emission would get replayed.

Pros

  • Easy to follow given all .subscribe() points
  • No state managing

Cons

  • Lack of control over how many connections interactions will happen

Example

Click to expand
val sharedConnectionObs = rxBleDevice.establishConnection(false).compose(ReplayingShare.instance())

sharedConnectionObs.subscribe({ /* ignored */ }, { /* ignored */ } // used only to keep the connection alive during otherCompletableNotRelatedToConnection below

Completable.concat(listOf(
    sharedConnectionObs.flatMap { /* do something with the connection */ }.take(1).ignoreElements(),
    otherCompletableNotRelatedToConnection,
    sharedConnectionObs.flatMap { /* do something else with the connection */ }.take(1).ignoreElements()
)).subscribe()

Over how many connections the above Completable will happen?

One cannot be sure. If the sharedConnectionObs would disconnect while otherCompletableNotRelatedToConnection is running the second sharedConnectionObs would try to connect to the peripheral again. Is this a problem? It may be — depending on the use-case.

3. Stateful connection manager

Pros

  • Easy to understand for people coming from object oriented programming background
  • Good control

Cons

  • Stateful (and therefore error prone)
  • Poorly models asynchronous nature of connections

Example

Taken from this issue by davordev

Click to expand

The whole idea behind this is to have a single source of truth for active bluetooth connection. So, in order to do anything with the ble device, you should call bleSessionManager.connection which will either return an active connection or throw an exception (preferably set by developer) notifying the user that the connection isn't established. In order to establish an connection, the following code should be called first:

bleSessionManager.apply {
    setMacAddress("MAC_ADDRESS_OF_DEVICE")
    setupConnection()
}

Preferably, you can call RxBleDevice.observeConnectionStateChanges() to track the connection state.

If you want to terminate the connection, you can either:

  • call bleSessionManager.disconnect() to close down the connection instantly, or
  • call bleSessionManager.disconnectButFirst { [..code to execute..] } to close down the connection after the code has been executed

BleSessionManager.kt

/**
 * Manages Bluetooth connection.
 * @property rxBleClient RxBleClient
 * @property mSchedulerProvider SchedulerProvider
 * @property eventStream CompositeDisposable
 * @property macAddress String?
 * @property disconnectTriggerSubject PublishSubject<Boolean>
 * @property _connection Observable<RxBleConnection>?
 * @property connection Observable<RxBleConnection>
 * @constructor
 */
class BleSessionManager(
    val rxBleClient: RxBleClient,
    val mSchedulerProvider: SchedulerProvider
) {

    private val eventStream: CompositeDisposable = CompositeDisposable()
    private var macAddress: String? = null
    private val disconnectTriggerSubject: PublishSubject<Boolean> = PublishSubject.create<Boolean>()
    private var _connection: Observable<RxBleConnection>? = null

    var connection: Observable<RxBleConnection>
        get() {
            return _connection ?: connectionFallback()
        }
        private set(value) {
            this._connection = value
        }

    /**
     * Fallback in case connection isn't valid.
     * @return Observable<RxBleConnection>
     */
    private fun connectionFallback(): Observable<RxBleConnection> {
        return Observable.error(NotConnectedException())
    }

    /**
     * Used to set mac address.
     * @param macAddress String
     */
    fun setMacAddress(macAddress: String) {
        this.macAddress = macAddress
    }

    /**
     * Configures connection. Called after `setMacAddress()`.
     * @throws NotFoundException May throw NotFoundException if mac address is not set.
     */
    fun setupConnection() {
        val address = this.macAddress ?: throw NotFoundException()

        val observable = rxBleClient.getBleDevice(address)
            .establishConnection(true, Timeout(BLE_CONNECT_OPERATION_TIMEOUT, TimeUnit.SECONDS))
            .takeUntil(disconnectTriggerSubject)
            .compose(ReplayingShare.instance())

        // Subscribe actions should be implemented even though they have no implementation.
        val disposable = observable
            .subscribeOn(mSchedulerProvider.io())
            .retry(10)
            .subscribe(
                {},
                {},
                {}
            )
        eventStream.add(disposable)
        connection = observable
    }

    /**
     * Shuts down the connection and clears event stream.
     */
    private fun closeConnection() {
        disconnectTriggerSubject.onNext(true)
        eventStream.clear()
    }

    /**
     * Nullifies connection. Triggered by End User.
     */
    fun disconnect() {
        closeConnection()
        _connection = null
    }

    /**
     * Nullifies connection, but executes the given code before. Triggered by End User.
     */
    fun disconnectButFirst(extensionCode: () -> Unit) {
        extensionCode()
        disconnect()
    }
}

BluetoothService.kt

class BluetoothService(
    val bluetoothClient: RxBleClient,
    val bluetoothScanSettings: ScanSettings,
    val mSchedulerProvider: SchedulerProvider,
    val bleSessionManager: BleSessionManager
) : IBluetoothService {

    private fun observeConnection(device: RxBleDevice): Observable<ConnectionState> {
        return device
            .observeConnectionStateChanges()
            .throttleLatest(500, TimeUnit.MILLISECONDS)
            .subscribeOn(mSchedulerProvider.io())
            .map { it.mapToConnectionState() }
    }

    override fun connectToDevice(macAddress: String): Observable<ConnectionState> {
        val device = bluetoothClient.getBleDevice(macAddress)
        bleSessionManager.setMacAddress(macAddress)
        bleSessionManager.setupConnection()
        return observeConnection(device)
    }

    override fun getDeviceShadow(): Single<HelixDeviceShadow> {

        return bleSessionManager.connection
            .subscribeOn(mSchedulerProvider.io())
            .firstOrError()
            .flatMap { it.readCharacteristic(MY_UUID) }
            .map {
                it.mapToMyResponse()
            }
            .doOnError { it.mapBluetoothGattOperationExceptions() }
    }

    override fun disconnect() {

        var disposable: Disposable? = null

        disposable = bleSessionManager.connection
            .subscribeOn(mSchedulerProvider.io())
            .firstOrError()
            .flatMap {
                it.writeCharacteristic(SLEEP_UUID, putHelixToSleep())
            }
            .ignoreElement()
            .onErrorComplete()
            .doFinally {

                disposable?.dispose()
                otaUpdateDisposables.clear()
                bleSessionManager.disconnect()
            }
            .subscribe()
    }