Skip to content

Commit

Permalink
feature: Add Unsubscribe API to SDK
Browse files Browse the repository at this point in the history
Closes: #18, #20
Signed-Off-By: Andre Weber <andre.weber3@etas.com>
  • Loading branch information
wba2hi committed Oct 23, 2023
1 parent 3ea86a6 commit 69c6294
Show file tree
Hide file tree
Showing 11 changed files with 537 additions and 139 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/

package org.eclipse.kuksa

import io.grpc.Context
import io.grpc.ManagedChannel
import io.grpc.StatusRuntimeException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.eclipse.kuksa.proto.v1.KuksaValV1
import org.eclipse.kuksa.proto.v1.Types
import org.eclipse.kuksa.proto.v1.Types.Field
import org.eclipse.kuksa.proto.v1.VALGrpc
import org.eclipse.kuksa.subscription.Subscription

/**
* Encapsulates the interactions with the DataBroker using gRPC.
*/
internal class DataBrokerApiInteraction(
private val managedChannel: ManagedChannel,
private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default,
) {
/**
* Sends a request to the DataBroker to respond with the specified [vssPath] and [fields] values.
*
* Throws a [DataBrokerException] in case the connection to the DataBroker is no longer active
*/
suspend fun fetchProperty(
vssPath: String,
fields: List<Field>,
): KuksaValV1.GetResponse {
return withContext(defaultDispatcher) {
val blockingStub = VALGrpc.newBlockingStub(managedChannel)
val entryRequest = KuksaValV1.EntryRequest.newBuilder()
.setPath(vssPath)
.addAllFields(fields)
.build()
val request = KuksaValV1.GetRequest.newBuilder()
.addEntries(entryRequest)
.build()

return@withContext try {
blockingStub.get(request)
} catch (e: StatusRuntimeException) {
throw DataBrokerException(e.message, e)
}
}
}

/**
* Sends a request to the DataBroker to update the specified [fields] of the [vssPath] and replace it's value with
* the specified [updatedDatapoint].
*
* Throws a [DataBrokerException] in case the connection to the DataBroker is no longer active
*/
suspend fun updateProperty(
vssPath: String,
fields: List<Field>,
updatedDatapoint: Types.Datapoint,
): KuksaValV1.SetResponse {
return withContext(defaultDispatcher) {
val blockingStub = VALGrpc.newBlockingStub(managedChannel)
val dataEntry = Types.DataEntry.newBuilder()
.setPath(vssPath)
.setValue(updatedDatapoint)
.build()
val entryUpdate = KuksaValV1.EntryUpdate.newBuilder()
.setEntry(dataEntry)
.addAllFields(fields)
.build()
val request = KuksaValV1.SetRequest.newBuilder()
.addUpdates(entryUpdate)
.build()

return@withContext try {
blockingStub.set(request)
} catch (e: StatusRuntimeException) {
throw DataBrokerException(e.message, e)
}
}
}

/**
* Sends a request to the DataBroker to subscribe to updates of the specified [vssPath] and [field].
* Returns a [Subscription] which can be used to register or unregister additional listeners or cancel / closing
* the subscription.
*
* Throws a [DataBrokerException] in case the connection to the DataBroker is no longer active
*/
fun subscribe(
vssPath: String,
field: Field,
): Subscription {
val asyncStub = VALGrpc.newStub(managedChannel)

val subscribeEntry = KuksaValV1.SubscribeEntry.newBuilder()
.setPath(vssPath)
.addFields(field)
.build()

val request = KuksaValV1.SubscribeRequest.newBuilder()
.addEntries(subscribeEntry)
.build()

val currentContext = Context.current()
val cancellableContext = currentContext.withCancellation()

val subscription = Subscription(vssPath, field, cancellableContext)
cancellableContext.run {
try {
val streamObserver = subscription.SubscriptionStreamObserver()
asyncStub.subscribe(request, streamObserver)
} catch (e: StatusRuntimeException) {
throw DataBrokerException(e.message, e)
}
}

return subscription
}
}
159 changes: 37 additions & 122 deletions kuksa-sdk/src/main/kotlin/org/eclipse/kuksa/DataBrokerConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,18 @@ package org.eclipse.kuksa
import android.util.Log
import io.grpc.ConnectivityState
import io.grpc.ManagedChannel
import io.grpc.StatusRuntimeException
import io.grpc.stub.StreamObserver
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.eclipse.kuksa.extension.TAG
import org.eclipse.kuksa.extension.copy
import org.eclipse.kuksa.model.Property
import org.eclipse.kuksa.pattern.listener.MultiListener
import org.eclipse.kuksa.proto.v1.KuksaValV1
import org.eclipse.kuksa.proto.v1.KuksaValV1.GetResponse
import org.eclipse.kuksa.proto.v1.KuksaValV1.SetResponse
import org.eclipse.kuksa.proto.v1.KuksaValV1.SubscribeResponse
import org.eclipse.kuksa.proto.v1.Types
import org.eclipse.kuksa.proto.v1.Types.Datapoint
import org.eclipse.kuksa.proto.v1.VALGrpc
import org.eclipse.kuksa.subscription.SubscriptionManager
import org.eclipse.kuksa.vsscore.model.VssProperty
import org.eclipse.kuksa.vsscore.model.VssSpecification
import org.eclipse.kuksa.vsscore.model.heritage
Expand All @@ -52,6 +48,9 @@ class DataBrokerConnection internal constructor(
) {
val disconnectListeners = MultiListener<DisconnectListener>()

private val dataBrokerApiInteraction = DataBrokerApiInteraction(managedChannel, dispatcher)
private val subscriptionManager = SubscriptionManager(dataBrokerApiInteraction)

@Suppress("unused")
val subscriptions: Set<Property>
get() = subscribedProperties.copy()
Expand All @@ -74,53 +73,30 @@ class DataBrokerConnection internal constructor(
}

/**
* Subscribes to the specified vssPath with the provided propertyObserver. Once subscribed the application will be
* notified about any changes to the specified vssPath.
* Subscribes to the specified [property] and notifies the provided [propertyObserver] about updates.
*
* @throws DataBrokerException in case the connection to the DataBroker is no longer active
* Throws a [DataBrokerException] in case the connection to the DataBroker is no longer active
*/
fun subscribe(
properties: List<Property>,
property: Property,
propertyObserver: PropertyObserver,
) {
val asyncStub = VALGrpc.newStub(managedChannel)

val subscribeEntries = properties.map { property ->
KuksaValV1.SubscribeEntry.newBuilder()
.addAllFields(property.fields)
.setPath(property.vssPath)
.build()
}
val request = KuksaValV1.SubscribeRequest.newBuilder()
.addAllEntries(subscribeEntries)
.build()

val callback = object : StreamObserver<SubscribeResponse> {
override fun onNext(value: SubscribeResponse) {
Log.v(TAG, "onNext() called with: value = $value")

for (entryUpdate in value.updatesList) {
val entry = entryUpdate.entry
propertyObserver.onPropertyChanged(entry.path, entry)
}
}

override fun onError(t: Throwable?) {
Log.e(TAG, "onError() called with: t = $t, cause: ${t?.cause}")
t?.let { propertyObserver.onError(t) }
}

override fun onCompleted() {
Log.d(TAG, "onCompleted() called")
}
val vssPath = property.vssPath
property.fields.forEach { field ->
subscriptionManager.subscribe(vssPath, field, propertyObserver)
}
}

try {
asyncStub.subscribe(request, callback)

subscribedProperties.addAll(properties)
} catch (e: StatusRuntimeException) {
throw DataBrokerException(e.message, e)
/**
* Unsubscribes the [propertyObserver] from updates of the specified [property].
*/
fun unsubscribe(
property: Property,
propertyObserver: PropertyObserver,
) {
val vssPath = property.vssPath
property.fields.forEach { field ->
subscriptionManager.unsubscribe(vssPath, field, propertyObserver)
}
}

Expand All @@ -134,54 +110,27 @@ class DataBrokerConnection internal constructor(
*
* @throws DataBrokerException in case the connection to the DataBroker is no longer active
*/
@Suppress("exceptions:TooGenericExceptionCaught") // Handling is bundled together
@JvmOverloads
fun <T : VssSpecification> subscribe(
specification: T,
fields: List<Types.Field> = listOf(Types.Field.FIELD_VALUE),
observer: VssSpecificationObserver<T>,
) {
val vssPathToVssProperty = specification.heritage
.ifEmpty { setOf(specification) }
.filterIsInstance<VssProperty<*>>() // Only final leafs with a value can be observed
.groupBy { it.vssPath }
.mapValues { it.value.first() } // Always one result because the vssPath is unique
val leafProperties = vssPathToVssProperty.values
.map { Property(it.vssPath, fields) }
.toList()

try {
Log.d(TAG, "Subscribing to the following properties: $leafProperties")

// TODO: Remove as soon as the server supports subscribing to vssPaths which are not VssProperties
// Reduces the load on the observer for big VssSpecifications. We wait for the initial update
// of all VssProperties before notifying the observer about the first batch
val initialSubscriptionUpdates = leafProperties.associate { it.vssPath to false }.toMutableMap()

// This is currently needed because we get multiple subscribe responses for every heir. Otherwise we
// would override the last heir value with every new response.
var updatedVssSpecification = specification
val propertyObserver = object : PropertyObserver {
override fun onPropertyChanged(vssPath: String, updatedValue: Types.DataEntry) {
Log.v(TAG, "Update from subscribed property: $vssPath - $updatedValue")

updatedVssSpecification = updatedVssSpecification.copy(vssPath, updatedValue.value)

initialSubscriptionUpdates[vssPath] = true
val isInitialSubscriptionComplete = initialSubscriptionUpdates.values.all { it }
if (isInitialSubscriptionComplete) {
Log.d(TAG, "Initial update for subscribed property complete: $vssPath - $updatedValue")
observer.onSpecificationChanged(updatedVssSpecification)
}
}
fields.forEach { field ->
subscriptionManager.subscribe(specification, field, observer)
}
}

override fun onError(throwable: Throwable) {
observer.onError(throwable)
}
}
subscribe(leafProperties, propertyObserver)
} catch (e: Exception) {
throw DataBrokerException(e.message, e)
/**
* Unsubscribes the [observer] from updates of the specified [fields] and [specification].
*/
fun <T : VssSpecification> unsubscribe(
specification: T,
fields: List<Types.Field> = listOf(Types.Field.FIELD_VALUE),
observer: VssSpecificationObserver<T>,
) {
fields.forEach { field ->
subscriptionManager.unsubscribe(specification, field, observer)
}
}

Expand All @@ -192,22 +141,7 @@ class DataBrokerConnection internal constructor(
*/
suspend fun fetch(property: Property): GetResponse {
Log.d(TAG, "fetchProperty() called with: property: $property")
return withContext(dispatcher) {
val blockingStub = VALGrpc.newBlockingStub(managedChannel)
val entryRequest = KuksaValV1.EntryRequest.newBuilder()
.setPath(property.vssPath)
.addAllFields(property.fields)
.build()
val request = KuksaValV1.GetRequest.newBuilder()
.addEntries(entryRequest)
.build()

return@withContext try {
blockingStub.get(request)
} catch (e: StatusRuntimeException) {
throw DataBrokerException(e.message, e)
}
}
return dataBrokerApiInteraction.fetchProperty(property.vssPath, property.fields)
}

/**
Expand Down Expand Up @@ -261,26 +195,7 @@ class DataBrokerConnection internal constructor(
updatedDatapoint: Datapoint,
): SetResponse {
Log.d(TAG, "updateProperty() called with: updatedProperty = $property")
return withContext(dispatcher) {
val blockingStub = VALGrpc.newBlockingStub(managedChannel)
val dataEntry = Types.DataEntry.newBuilder()
.setPath(property.vssPath)
.setValue(updatedDatapoint)
.build()
val entryUpdate = KuksaValV1.EntryUpdate.newBuilder()
.setEntry(dataEntry)
.addAllFields(property.fields)
.build()
val request = KuksaValV1.SetRequest.newBuilder()
.addUpdates(entryUpdate)
.build()

return@withContext try {
blockingStub.set(request)
} catch (e: StatusRuntimeException) {
throw DataBrokerException(e.message, e)
}
}
return dataBrokerApiInteraction.updateProperty(property.vssPath, property.fields, updatedDatapoint)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@

package org.eclipse.kuksa

import org.eclipse.kuksa.pattern.listener.Listener
import org.eclipse.kuksa.proto.v1.Types.DataEntry
import org.eclipse.kuksa.proto.v1.Types.Field
import org.eclipse.kuksa.vsscore.model.VssSpecification

/**
* The Observer is used to notify about changes to subscribed properties.
*/
interface PropertyObserver {
interface PropertyObserver : Listener {
/**
* Will be triggered with the [updatedValue] when the underlying [vssPath] changed it's value.
* Will be triggered with the [updatedValue] when the underlying [field] of the [vssPath] changed it's value.
*/
fun onPropertyChanged(vssPath: String, updatedValue: DataEntry)
fun onPropertyChanged(vssPath: String, field: Field, updatedValue: DataEntry)

/**
* Will be triggered when an error happens during subscription and forwards the [throwable].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,10 @@ interface ListenerCollection<T : Listener> : Iterable<T> {
* Removes a [listener] and returns true if the [listener] was successfully removed, returns false otherwise.
*/
fun unregister(listener: T): Boolean

/**
* isEmpty checks the number of registered listeners and returns true, if no listener is registered or false if at
* least one listener is registered.
*/
fun isEmpty(): Boolean
}

0 comments on commit 69c6294

Please sign in to comment.