Skip to content

Commit

Permalink
Added a way to execute custom RxBleRadioOperation
Browse files Browse the repository at this point in the history
The RxBleConnection interface has now a new method named queue that enable
library consumer to queue their own RxBleRadioOperation. The queue method
receives in parameter a RxBleRadioOperationCustom implementation. This
said implementation receives the underlying Android BLE API objects and
is responsible of returning an Observable<T> that executes the operation
asynchrounously.

The Observable<T> returned is then executed as part of normal queue
processing logic. The queue is released when the Observable<T> terminates
either via an error or normally via complete.

This feature shall be used only by people that studied the RxAndroidBLE
source code and understand the implication of running their own operation.
  • Loading branch information
maoueh authored and dariuszseweryn committed Feb 27, 2017
1 parent eb2eab4 commit fb2eccc
Show file tree
Hide file tree
Showing 17 changed files with 491 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.polidea.rxandroidble.NotificationSetupMode;
import com.polidea.rxandroidble.RxBleConnection;
import com.polidea.rxandroidble.RxBleDeviceServices;
import com.polidea.rxandroidble.RxBleRadioOperationCustom;
import com.polidea.rxandroidble.exceptions.BleConflictingNotificationAlreadySetException;
import com.polidea.rxandroidble.internal.connection.ImmediateSerializedBatchAckStrategy;
import com.polidea.rxandroidble.internal.util.ObservableUtil;
Expand Down Expand Up @@ -164,7 +165,7 @@ public Observable<Observable<byte[]>> setupNotification(@NonNull BluetoothGattCh

@Override
public Observable<Observable<byte[]>> setupIndication(@NonNull UUID characteristicUuid) {
return setupIndication(characteristicUuid, NotificationSetupMode.DEFAULT);
return setupIndication(characteristicUuid, NotificationSetupMode.DEFAULT);
}

@Override
Expand Down Expand Up @@ -470,4 +471,9 @@ public Boolean call(byte[] ignored) {
}

}

@Override
public <T> Observable<T> queue(RxBleRadioOperationCustom<T> operation) {
throw new UnsupportedOperationException("Mock does not support queuing custom operation.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ public String getMacAddress() {

@Override
public BluetoothDevice getBluetoothDevice() {
throw new UnsupportedOperationException("Mock does not support returning a "
+ "BluetoothDevice.");
throw new UnsupportedOperationException("Mock does not support returning a BluetoothDevice.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.Executors;
import rx.Observable;
import rx.Scheduler;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -64,7 +65,7 @@ class RxBleClientImpl extends RxBleClient {
public static RxBleClientImpl getInstance(@NonNull Context context) {
final Context applicationContext = context.getApplicationContext();
final RxBleAdapterWrapper rxBleAdapterWrapper = new RxBleAdapterWrapper(BluetoothAdapter.getDefaultAdapter());
final RxBleRadioImpl rxBleRadio = new RxBleRadioImpl();
final RxBleRadioImpl rxBleRadio = new RxBleRadioImpl(getRxBleRadioScheduler());
final RxBleAdapterStateObservable adapterStateObservable = new RxBleAdapterStateObservable(applicationContext);
final BleConnectionCompat bleConnectionCompat = new BleConnectionCompat(context);
final ExecutorService executor = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -194,4 +195,13 @@ public RxBleScanResult call(RxBleInternalScanResult scanResult) {
})
.share();
}

/**
* In some implementations (i.e. Samsung Android 4.3) calling BluetoothDevice.connectGatt()
* from thread other than main thread ends in connecting with status 133. It's safer to make bluetooth calls
* on the main thread.
*/
private static Scheduler getRxBleRadioScheduler() {
return AndroidSchedulers.mainThread();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.polidea.rxandroidble;

import android.bluetooth.BluetoothGatt;
import android.bluetooth.BluetoothGattCharacteristic;
import android.bluetooth.BluetoothGattDescriptor;
import android.content.Context;
Expand All @@ -13,11 +14,13 @@
import com.polidea.rxandroidble.exceptions.BleGattCannotStartException;
import com.polidea.rxandroidble.exceptions.BleGattException;
import com.polidea.rxandroidble.exceptions.BleGattOperationType;
import com.polidea.rxandroidble.internal.connection.RxBleGattCallback;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Scheduler;

/**
* The BLE connection handle, supporting GATT operations. Operations are enqueued and the library makes sure that they are not
Expand Down Expand Up @@ -417,4 +420,33 @@ Observable<Observable<byte[]>> setupIndication(@NonNull BluetoothGattCharacteris
*/
@RequiresApi(api = Build.VERSION_CODES.LOLLIPOP)
Observable<Integer> requestMtu(int mtu);

/**
* <b>This method requires deep knowledge of RxAndroidBLE internals. Use it only as a last resort if you know
* what your are doing.</b>
* <p>
* Queue an operation for future execution. The method accepts a {@link RxBleRadioOperationCustom} concrete implementation
* and will queue it inside connection operation queue. When ready to execute, the {@link Observable<T>} returned
* by the {@link RxBleRadioOperationCustom#asObservable(BluetoothGatt, RxBleGattCallback, Scheduler)} will be
* subscribed to.
* <p>
* Every event emitted by the {@link Observable<T>} returned by
* {@link RxBleRadioOperationCustom#asObservable(BluetoothGatt, RxBleGattCallback, Scheduler)} will be forwarded
* to the {@link Observable<T>} returned by this method.
* <p>
* You <b>must</b> ensure the custom operation's {@link Observable<T>} do terminate either via {@code onCompleted}
* or {@code onError(Throwable)}. Otherwise, the internal queue orchestrator will wait forever for
* your {@link Observable<T>} to complete. Normal queue processing will be resumed after the {@link Observable<T>}
* returned by {@link RxBleRadioOperationCustom#asObservable(BluetoothGatt, RxBleGattCallback, Scheduler)}
* completes.
* <p>
* The operation will be added to the queue using a {@link com.polidea.rxandroidble.internal.RxBleRadioOperation.Priority#NORMAL}
* priority.
*
* @param operation The custom radio operation to queue.
* @param <T> The type returned by the {@link RxBleRadioOperationCustom} instance.
* @return Observable emitting the value after execution or an error in case of failure.
*/
<T> Observable<T> queue(RxBleRadioOperationCustom<T> operation);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.polidea.rxandroidble;

import android.bluetooth.BluetoothGatt;
import android.support.annotation.NonNull;

import com.polidea.rxandroidble.internal.RxBleRadio;
import com.polidea.rxandroidble.internal.connection.RxBleGattCallback;

import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.android.schedulers.AndroidSchedulers;

/**
* Represents a custom operation that will be enqueued for future execution within the client instance.
*/
public interface RxBleRadioOperationCustom<T> {

/**
* Return an observable that implement a custom radio operation using low-level Android BLE API.
* <p>
* The {@link Observable<T>} returned by this method will be subscribed to by the {@link RxBleRadio}
* when it determines that the custom operation should be the next to be run.
* <p>
* The method receives everything needed to access the low-level Android BLE API objects mainly the
* {@link BluetoothGatt} to interact with Android BLE GATT operations and {@link RxBleGattCallback}
* to be notified when GATT operations completes.
* <p>
* Every event emitted by the returned {@link Observable<T>} will be forwarded to the observable
* returned by {@link RxBleConnection#queue(RxBleRadioOperationCustom)}
* <p>
* As the implementer, your contract is to return an {@link Observable<T>} that completes at some
* point in time. When the returned observable terminates, either via the {@link Observer#onCompleted()} or
* {@link Observer#onError(Throwable)} callback, the {@link RxBleRadio} queue's lock is released so that
* queue operations can continue.
* <p>
* You <b>must</b> ensure the returned {@link Observable<T>} do terminate either via {@code onCompleted}
* or {@code onError(Throwable)}. Otherwise, the internal queue orchestrator will wait forever for
* your {@link Observable<T>} to complete and the it will not continue to process queued operations.
*
* @param bluetoothGatt The Android API GATT instance
* @param rxBleGattCallback The internal Rx ready bluetooth gatt callback to be notified of GATT operations
* @param scheduler The RxBleRadio scheduler used to asObservable operation (currently {@link AndroidSchedulers#mainThread()}
* @throws Throwable Any exception that your custom operation might throw
*/
@NonNull
Observable<T> asObservable(BluetoothGatt bluetoothGatt,
RxBleGattCallback rxBleGattCallback,
Scheduler scheduler) throws Throwable;
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.polidea.rxandroidble.internal;

import rx.Observable;
import rx.Scheduler;

public interface RxBleRadio {

Scheduler scheduler();

<T> Observable<T> queue(RxBleRadioOperation<T> rxBleRadioOperation);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public final void run() {
}

/**
* This method will be overriden in concrete operation implementations and
* This method will be overridden in concrete operation implementations and
* will contain specific operation logic.
*/
protected abstract void protectedRun() throws Throwable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import android.bluetooth.BluetoothGatt;
import android.bluetooth.BluetoothGattCharacteristic;
import android.support.annotation.NonNull;

import com.polidea.rxandroidble.RxBleConnection;
import com.polidea.rxandroidble.internal.RxBleRadio;
import com.polidea.rxandroidble.internal.operations.RxBleRadioOperationCharacteristicLongWrite;

import java.util.UUID;
import java.util.concurrent.Callable;

import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

Expand Down Expand Up @@ -109,7 +111,7 @@ public Observable<byte[]> call(BluetoothGattCharacteristic bluetoothGattCharacte
maxBatchSizeCallable,
writeOperationAckStrategy,
bytes,
AndroidSchedulers.mainThread(),
rxBleRadio.scheduler(),
Schedulers.computation()
));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@
import android.bluetooth.BluetoothGattDescriptor;
import android.bluetooth.BluetoothGattService;
import android.os.Build;
import android.os.DeadObjectException;
import android.support.annotation.NonNull;
import android.support.annotation.RequiresApi;

import com.polidea.rxandroidble.NotificationSetupMode;
import com.polidea.rxandroidble.RxBleConnection;
import com.polidea.rxandroidble.RxBleDeviceServices;
import com.polidea.rxandroidble.RxBleRadioOperationCustom;
import com.polidea.rxandroidble.exceptions.BleCannotSetCharacteristicNotificationException;
import com.polidea.rxandroidble.exceptions.BleConflictingNotificationAlreadySetException;
import com.polidea.rxandroidble.exceptions.BleDisconnectedException;
import com.polidea.rxandroidble.exceptions.BleException;
import com.polidea.rxandroidble.internal.RxBleRadio;
import com.polidea.rxandroidble.internal.RxBleRadioOperation;
import com.polidea.rxandroidble.internal.operations.RxBleRadioOperationCharacteristicRead;
import com.polidea.rxandroidble.internal.operations.RxBleRadioOperationCharacteristicWrite;
import com.polidea.rxandroidble.internal.operations.RxBleRadioOperationDescriptorRead;
Expand All @@ -22,8 +27,8 @@
import com.polidea.rxandroidble.internal.operations.RxBleRadioOperationReadRssi;
import com.polidea.rxandroidble.internal.operations.RxBleRadioOperationServicesDiscover;
import com.polidea.rxandroidble.internal.util.ByteAssociation;
import com.polidea.rxandroidble.internal.util.CharacteristicNotificationId;
import com.polidea.rxandroidble.internal.util.CharacteristicChangedEvent;
import com.polidea.rxandroidble.internal.util.CharacteristicNotificationId;
import com.polidea.rxandroidble.internal.util.ObservableUtil;

import java.util.HashMap;
Expand Down Expand Up @@ -528,4 +533,30 @@ public Observable<byte[]> writeDescriptor(BluetoothGattDescriptor bluetoothGattD
public Observable<Integer> readRssi() {
return rxBleRadio.queue(new RxBleRadioOperationReadRssi(gattCallback, bluetoothGatt, timeoutScheduler));
}

@Override
public <T> Observable<T> queue(final RxBleRadioOperationCustom<T> operation) {
return rxBleRadio.queue(new RxBleRadioOperation<T>() {
@Override
@SuppressWarnings("ConstantConditions")
protected void protectedRun() throws Throwable {
Observable<T> operationObservable = operation.asObservable(bluetoothGatt, gattCallback, rxBleRadio.scheduler());
if (operationObservable == null) {
throw new IllegalArgumentException("The custom operation asObservable method must return a non-null observable");
}

operationObservable.doOnCompleted(new Action0() {
@Override
public void call() {
releaseRadio();
}
}).subscribe(getSubscriber());
}

@Override
protected BleException provideException(DeadObjectException deadObjectException) {
return new BleDisconnectedException(deadObjectException, bluetoothGatt.getDevice().getAddress());
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class RxBleRadioOperationCharacteristicLongWrite extends RxBleRadioOperat

private final byte[] bytesToWrite;

private final Scheduler mainThreadScheduler;
private final Scheduler scheduler;

private final Scheduler timeoutScheduler;

Expand All @@ -60,7 +60,7 @@ public RxBleRadioOperationCharacteristicLongWrite(
Callable<Integer> batchSizeProvider,
RxBleConnection.WriteOperationAckStrategy writeOperationAckStrategy,
byte[] bytesToWrite,
Scheduler mainThreadScheduler,
Scheduler scheduler,
Scheduler timeoutScheduler
) {
this.bluetoothGatt = bluetoothGatt;
Expand All @@ -69,7 +69,7 @@ public RxBleRadioOperationCharacteristicLongWrite(
this.batchSizeProvider = batchSizeProvider;
this.writeOperationAckStrategy = writeOperationAckStrategy;
this.bytesToWrite = bytesToWrite;
this.mainThreadScheduler = mainThreadScheduler;
this.scheduler = scheduler;
this.timeoutScheduler = timeoutScheduler;
}

Expand All @@ -86,7 +86,7 @@ protected void protectedRun() throws Throwable {
final ByteBuffer byteBuffer = ByteBuffer.wrap(bytesToWrite);

writeBatchAndObserve(batchSize, byteBuffer)
.subscribeOn(mainThreadScheduler)
.subscribeOn(scheduler)
.takeFirst(writeResponseForMatchingCharacteristic())
.timeout(
SINGLE_BATCH_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@
import java.util.concurrent.Semaphore;

import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.Scheduler;
import rx.functions.Action0;
import rx.functions.Action1;

public class RxBleRadioImpl implements RxBleRadio {

private OperationPriorityFifoBlockingQueue queue = new OperationPriorityFifoBlockingQueue();
private final Scheduler scheduler;

public RxBleRadioImpl(final Scheduler scheduler) {
this.scheduler = scheduler;

public RxBleRadioImpl() {
new Thread(new Runnable() {
@Override
public void run() {
Expand All @@ -34,20 +37,14 @@ public void run() {

rxBleRadioOperation.setRadioBlockingSemaphore(semaphore);

/**
* In some implementations (i.e. Samsung Android 4.3) calling BluetoothDevice.connectGatt()
* from thread other than main thread ends in connecting with status 133. It's safer to make bluetooth calls
* on the main thread.
*/
Observable.just(rxBleRadioOperation)
.observeOn(AndroidSchedulers.mainThread())
.observeOn(RxBleRadioImpl.this.scheduler)
.subscribe(new Action1<RxBleRadioOperation>() {
@Override
public void call(RxBleRadioOperation rxBleRadioOperation1) {
rxBleRadioOperation1.run();
}
});

semaphore.acquire();
log("FINISHED", rxBleRadioOperation);
} catch (InterruptedException e) {
Expand All @@ -58,6 +55,11 @@ public void call(RxBleRadioOperation rxBleRadioOperation1) {
}).start();
}

@Override
public Scheduler scheduler() {
return scheduler;
}

@Override
public <T> Observable<T> queue(final RxBleRadioOperation<T> rxBleRadioOperation) {
return rxBleRadioOperation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@ package com.polidea.rxandroidble
import com.polidea.rxandroidble.internal.RxBleRadio
import com.polidea.rxandroidble.internal.RxBleRadioOperation
import rx.Observable
import rx.Scheduler
import rx.schedulers.Schedulers

class FlatRxBleRadio implements RxBleRadio {
public final MockSemaphore semaphore = new MockSemaphore()

def Scheduler scheduler() {
return Schedulers.immediate()
}

@Override
def <T> Observable<T> queue(RxBleRadioOperation<T> rxBleRadioOperation) {
return rxBleRadioOperation
Expand Down

0 comments on commit fb2eccc

Please sign in to comment.