Skip to content

Commit

Permalink
Merge pull request #371 from NordicSemiconductor/feature/progress-ind…
Browse files Browse the repository at this point in the history
…ication-as-flow

Progress indication as flow
  • Loading branch information
philips77 committed Mar 22, 2022
2 parents 793ef75 + fba63e6 commit 0110295
Show file tree
Hide file tree
Showing 25 changed files with 540 additions and 80 deletions.
Expand Up @@ -45,6 +45,7 @@ val BleManager.bondingState: BondState
* Multiple calls for this method return the same object.
* If a different connection observer was set using [BleManager.setConnectionObserver], this
* method will throw [IllegalStateException].
* @since 2.3.0
*/
fun BleManager.stateAsFlow(): StateFlow<ConnectionState> = with(connectionObserver) {
when (this) {
Expand All @@ -59,6 +60,7 @@ fun BleManager.stateAsFlow(): StateFlow<ConnectionState> = with(connectionObserv
* Multiple calls for this method return the same object.
* If a different bond state observer was set using [BleManager.setBondingObserver], this
* method will throw [IllegalStateException].
* @since 2.3.0
*/
fun BleManager.bondingStateAsFlow(): StateFlow<BondState> = with(bondingObserver) {
when (this) {
Expand Down
@@ -1,3 +1,5 @@
@file:Suppress("unused")

package no.nordicsemi.android.ble.ktx

import android.bluetooth.BluetoothGattCharacteristic
Expand All @@ -14,6 +16,7 @@ import java.util.UUID
* manually.
*
* @return GATT characteristic object or null if no characteristic was found.
* @since 2.3.0
*/
fun BluetoothGattService.getCharacteristic(
uuid: UUID,
Expand Down
@@ -0,0 +1,207 @@
@file:Suppress("unused")

package no.nordicsemi.android.ble.ktx

import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import no.nordicsemi.android.ble.*
import no.nordicsemi.android.ble.data.DataMerger
import no.nordicsemi.android.ble.data.DataSplitter
import no.nordicsemi.android.ble.data.DefaultMtuSplitter

/**
* The upload or download progress indication.
*
* @since 2.4.0
* @property index The 0-based index of the packet. Only the packets that passed the filter
* will be reported. As the number of expected packets is not know, it is up to the
* application to calculate the real progress based on the index and data length.
* @property data The latest received packet as it was sent by the remote device.
*/
data class ProgressIndication(val index: Int, val data: ByteArray?) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as ProgressIndication

if (index != other.index) return false
if (data != null) {
if (other.data == null) return false
if (!data.contentEquals(other.data)) return false
} else if (other.data != null) return false

return true
}

override fun hashCode(): Int {
var result = index
result = 31 * result + (data?.contentHashCode() ?: 0)
return result
}
}

/**
* Adds a merger that will be used to merge multiple packets into a single Data.
* The merger may modify each packet if necessary.
*
* The returned flow will be notified each time a new packet is received.
*
* @return The flow with progress indications.
* @since 2.4.0
*/
fun ReadRequest.mergeWithProgressFlow(merger: DataMerger): Flow<ProgressIndication> {
// Make sure the callbacks are called without unnecessary delay.
setHandler(null)
// Create a temporary callback that will be used to emit progress.
var callback: ((ProgressIndication) -> Unit)? = null
// Set the merger, which will invoke the temporary callback on progress.
// The merger must be called here, not in the callbackFlow.
merge(merger) { _, data, index ->
// The temporary callback will be set in the callbackFlow below.
callback?.invoke(ProgressIndication(index, data))
}
// Return the callback flow. It will be closed when the request is complete or has failed.
return callbackFlow {
callback = { trySend(it) }
then { close() }
awaitClose { callback = null }
}
}

/**
* Adds a merger that will be used to merge multiple packets into a single Data.
* The merger may modify each packet if necessary.
*
* The returned flow will be notified each time a new packet is received.
*
* @return The flow with progress indications.
* @since 2.4.0
*/
fun WaitForValueChangedRequest.mergeWithProgressFlow(merger: DataMerger): Flow<ProgressIndication> {
// Make sure the callbacks are called without unnecessary delay.
setHandler(null)
// Create a temporary callback that will be used to emit progress.
var callback: ((ProgressIndication) -> Unit)? = null
// Set the merger, which will invoke the temporary callback on progress.
// The merger must be called here, not in the callbackFlow.
merge(merger) { _, data, index ->
// The temporary callback will be set in the callbackFlow below.
callback?.invoke(ProgressIndication(index, data))
}
// Return the callback flow. It will be closed when the request is complete or has failed.
return callbackFlow {
callback = { trySend(it) }
then { close() }
awaitClose { callback = null }
}
}

/**
* Adds a merger that will be used to merge multiple packets into a single Data.
* The merger may modify each packet if necessary.
*
* The returned flow will be notified each time a new packet is received.
*
* @return The flow with progress indications.
* @since 2.4.0
*/
fun ValueChangedCallback.mergeWithProgressFlow(merger: DataMerger): Flow<ProgressIndication> {
// Make sure the callbacks are called without unnecessary delay.
setHandler(null)
// Create a temporary callback that will be used to emit progress.
var callback: ((ProgressIndication) -> Unit)? = null
// Set the merger, which will invoke the temporary callback on progress.
// The merger must be called here, not in the callbackFlow.
merge(merger) { _, data, index ->
// The temporary callback will be set in the callbackFlow below.
callback?.invoke(ProgressIndication(index, data))
}
// Return the callback flow. It will be closed when the request is complete or has failed.
return callbackFlow {
callback = { trySend(it) }
then { close() }
awaitClose { callback = null }
}
}

/**
* Adds a default MTU splitter that will be used to cut given data into at-most MTU-3
* bytes long packets.
*
* The returned flow will be notified each time a new packet is sent.
*
* @return The flow with progress indications.
* @since 2.4.0
*/
fun WriteRequest.splitWithProgressFlow(): Flow<ProgressIndication> = splitWithProgressFlow(DefaultMtuSplitter())

/**
* Adds a splitter that will be used to cut given data into multiple packets.
* The splitter may modify each packet if necessary, i.e. add a flag indicating first packet,
* continuation or the last packet.
*
* The returned flow will be notified each time a new packet is sent.
*
* @return The flow with progress indications.
* @since 2.4.0
*/
fun WriteRequest.splitWithProgressFlow(splitter: DataSplitter): Flow<ProgressIndication> {
// Make sure the callbacks are called without unnecessary delay.
setHandler(null)
// Create a temporary callback that will be used to emit progress.
var callback: ((ProgressIndication) -> Unit)? = null
// Set the splitter, which will invoke the temporary callback on progress.
// The splitter must be called here, not in the callbackFlow.
split(splitter) { _, data, index ->
// The temporary callback will be set in the callbackFlow below.
callback?.invoke(ProgressIndication(index, data))
}
// Return the callback flow. It will be closed when the request is complete or has failed.
return callbackFlow {
callback = { trySend(it) }
then { close() }
awaitClose { callback = null }
}
}

/**
* Adds a default MTU splitter that will be used to cut given data into at-most MTU-3
* bytes long packets.
*
* The returned flow will be notified each time a new packet is sent.
*
* @return The flow with progress indications.
* @since 2.4.0
*/
fun WaitForReadRequest.splitWithProgressFlow(): Flow<ProgressIndication> = splitWithProgressFlow(DefaultMtuSplitter())

/**
* Adds a splitter that will be used to cut given data into multiple packets.
* The splitter may modify each packet if necessary, i.e. add a flag indicating first packet,
* continuation or the last packet.
*
* The returned flow will be notified each time a new packet is sent.
*
* @return The flow with progress indications.
* @since 2.4.0
*/
fun WaitForReadRequest.splitWithProgressFlow(splitter: DataSplitter): Flow<ProgressIndication> {
// Make sure the callbacks are called without unnecessary delay.
setHandler(null)
// Set the splitter, which will invoke the temporary callback on progress.
// The splitter must be called here, not in the callbackFlow.
var callback: ((ProgressIndication) -> Unit)? = null
// Set the splitter, which invokes the temporary callback.
split(splitter) { _, data, index ->
// The temporary callback will be set in the callbackFlow below.
callback?.invoke(ProgressIndication(index, data))
}
// Return the callback flow. It will be closed when the request is complete or has failed.
return callbackFlow {
callback = { trySend(it) }
then { close() }
awaitClose { callback = null }
}
}
@@ -1,3 +1,5 @@
@file:Suppress("unused")

package no.nordicsemi.android.ble.ktx

import android.bluetooth.BluetoothDevice
Expand All @@ -14,6 +16,7 @@ import kotlin.coroutines.resumeWithException

/**
* Suspends the coroutine until the request is completed.
* @since 2.3.0
*/
@Throws(
BluetoothDisabledException::class,
Expand All @@ -26,6 +29,7 @@ suspend fun Request.suspend() = suspendCancellable()
/**
* Suspends the coroutine until the data have been written.
* @return The data written.
* @since 2.3.0
*/
@Throws(
BluetoothDisabledException::class,
Expand Down Expand Up @@ -56,6 +60,7 @@ suspend fun WriteRequest.suspend(): Data {
* ).suspendForResponse()
*
* @return The data written parsed to required type.
* @since 2.4.0
*/
@Throws(
BluetoothDisabledException::class,
Expand All @@ -76,6 +81,7 @@ suspend inline fun <reified T: WriteResponse> WriteRequest.suspendForResponse():
/**
* Suspends the coroutine until the data have been read.
* @return The data read.
* @since 2.3.0
*/
@Throws(
BluetoothDisabledException::class,
Expand All @@ -101,6 +107,7 @@ suspend fun ReadRequest.suspend(): Data {
* .suspendForResponse()
*
* @return The data read parsed to required type.
* @since 2.4.0
*/
@Throws(
BluetoothDisabledException::class,
Expand Down Expand Up @@ -129,6 +136,7 @@ suspend inline fun <reified T: ReadResponse> ReadRequest.suspendForResponse(): T
* .suspendForValidResponse()
*
* @return The data read parsed to required type.
* @since 2.4.0
*/
@Throws(
BluetoothDisabledException::class,
Expand All @@ -145,6 +153,7 @@ suspend inline fun <reified T: ProfileReadResponse> ReadRequest.suspendForValidR
/**
* Suspends the coroutine until the RSSI value is received.
* @return The current RSSI value.
* @since 2.3.0
*/
@Throws(
BluetoothDisabledException::class,
Expand All @@ -163,6 +172,7 @@ suspend fun ReadRssiRequest.suspend(): Int {
/**
* Suspends the coroutine until the MTU value is received.
* @return The current MTU value.
* @since 2.3.0
*/
@Throws(
BluetoothDisabledException::class,
Expand All @@ -181,6 +191,7 @@ suspend fun MtuRequest.suspend(): Int {
/**
* Suspends the coroutine until the TX and RX PHY values are received.
* @return A pair of TX and RX PHYs.
* @since 2.3.0
*/
@Throws(
BluetoothDisabledException::class,
Expand All @@ -199,6 +210,7 @@ suspend fun PhyRequest.suspend(): Pair<Int, Int> {
/**
* Suspends the coroutine until the value of the attribute has changed.
* @return The new value of the attribute.
* @since 2.3.0
*/
@Throws(
BluetoothDisabledException::class,
Expand Down Expand Up @@ -234,6 +246,7 @@ suspend fun WaitForValueChangedRequest.suspend(): Data = suspendCancellableCoro
* .suspendForResponse()
*
* @return The new value of the attribute.
* @since 2.4.0
*/
@Throws(
BluetoothDisabledException::class,
Expand Down Expand Up @@ -261,6 +274,7 @@ suspend inline fun <reified T: ReadResponse> WaitForValueChangedRequest.suspendF
* .suspendForValidResponse()
*
* @return The new value of the attribute.
* @since 2.4.0
*/
@Throws(
BluetoothDisabledException::class,
Expand All @@ -277,6 +291,7 @@ suspend inline fun <reified T: ProfileReadResponse> WaitForValueChangedRequest.s
/**
* Suspends the coroutine until the value of the attribute has changed.
* @return The new value of the attribute.
* @since 2.3.0
*/
@Throws(
BluetoothDisabledException::class,
Expand All @@ -287,6 +302,8 @@ suspend inline fun <reified T: ProfileReadResponse> WaitForValueChangedRequest.s
suspend fun WaitForReadRequest.suspend(): Data = suspendCancellableCoroutine { continuation ->
var data: Data? = null
this
// Make sure the callbacks are called without unnecessary delay.
.setHandler(null)
// DON'T USE .before callback here, it's used to get BluetoothDevice instance above.
.with { _, d -> data = d }
.invalid { continuation.resumeWithException(InvalidRequestException(this)) }
Expand Down Expand Up @@ -314,6 +331,7 @@ suspend fun WaitForReadRequest.suspend(): Data = suspendCancellableCoroutine {
* .suspendForResponse()
*
* @return The new value of the attribute.
* @since 2.4.0
*/
@Throws(
BluetoothDisabledException::class,
Expand All @@ -333,6 +351,8 @@ suspend inline fun <reified T: WriteResponse> WaitForReadRequest.suspendForRespo

private suspend fun Request.suspendCancellable(): Unit = suspendCancellableCoroutine { continuation ->
this
// Make sure the callbacks are called without unnecessary delay.
.setHandler(null)
// DON'T USE .before callback here, it's used to get BluetoothDevice instance above.
.invalid { continuation.resumeWithException(InvalidRequestException(this)) }
.fail { _, status ->
Expand Down

0 comments on commit 0110295

Please sign in to comment.