Skip to content

Commit

Permalink
feature: Correctly trigger Actuator Target Events
Browse files Browse the repository at this point in the history
Closes: #29
Signed-Off-By: Andre Weber <andre.weber3@etas.com>
  • Loading branch information
wba2hi committed Nov 3, 2023
1 parent 0535546 commit 3c2428b
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.eclipse.kuksa.proto.v1.KuksaValV1.GetResponse
import org.eclipse.kuksa.proto.v1.KuksaValV1.SetResponse
import org.eclipse.kuksa.proto.v1.Types
import org.eclipse.kuksa.proto.v1.Types.Datapoint
import org.eclipse.kuksa.proto.v1.Types.Field
import org.eclipse.kuksa.subscription.DataBrokerSubscriber
import org.eclipse.kuksa.vsscore.model.VssProperty
import org.eclipse.kuksa.vsscore.model.VssSpecification
Expand Down Expand Up @@ -117,7 +118,7 @@ class DataBrokerConnection internal constructor(
@JvmOverloads
fun <T : VssSpecification> subscribe(
specification: T,
fields: List<Types.Field> = listOf(Types.Field.FIELD_VALUE),
fields: List<Field> = listOf(Field.FIELD_VALUE),
listener: VssSpecificationListener<T>,
) {
fields.forEach { field ->
Expand All @@ -130,7 +131,7 @@ class DataBrokerConnection internal constructor(
*/
fun <T : VssSpecification> unsubscribe(
specification: T,
fields: List<Types.Field> = listOf(Types.Field.FIELD_VALUE),
fields: List<Field> = listOf(Field.FIELD_VALUE),
listener: VssSpecificationListener<T>,
) {
fields.forEach { field ->
Expand Down Expand Up @@ -160,7 +161,7 @@ class DataBrokerConnection internal constructor(
@JvmOverloads
suspend fun <T : VssSpecification> fetch(
specification: T,
fields: List<Types.Field> = listOf(Types.Field.FIELD_VALUE),
fields: List<Field> = listOf(Field.FIELD_VALUE),
): T {
return withContext(dispatcher) {
try {
Expand Down Expand Up @@ -204,18 +205,22 @@ class DataBrokerConnection internal constructor(

/**
* Only a [VssProperty] can be updated because they have an actual value. When provided with any parent
* [VssSpecification] then this [update] method will find all [VssProperty] children and updates them instead.
* [VssSpecification] then this [update] method will find all [VssProperty] children and updates their corresponding
* [fields] instead.
* Compared to [update] with only one [Property] and [Datapoint], here multiple [SetResponse] will be returned
* because a [VssSpecification] may consists of multiple values which may need to be updated.
*
* @throws DataBrokerException in case the connection to the DataBroker is no longer active
* @throws IllegalArgumentException if the [VssProperty] could not be converted to a [Datapoint].
*/
suspend fun update(vssSpecification: VssSpecification): List<SetResponse> {
suspend fun update(
vssSpecification: VssSpecification,
fields: List<Field> = listOf(Field.FIELD_VALUE),
): List<SetResponse> {
val responses = mutableListOf<SetResponse>()

vssSpecification.vssProperties.forEach { vssProperty ->
val property = Property(vssProperty.vssPath)
val property = Property(vssProperty.vssPath, fields)
val response = update(property, vssProperty.datapoint)
responses.add(response)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import io.grpc.stub.StreamObserver
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.eclipse.kuksa.extension.TAG
import org.eclipse.kuksa.proto.v1.KuksaValV1
import org.eclipse.kuksa.proto.v1.KuksaValV1.SubscribeResponse
import org.eclipse.kuksa.proto.v1.Types
Expand Down Expand Up @@ -94,16 +95,11 @@ internal class DataBrokerTransporter(
): KuksaValV1.SetResponse {
return withContext(defaultDispatcher) {
val blockingStub = VALGrpc.newBlockingStub(managedChannel)
val dataEntry = Types.DataEntry.newBuilder()
.setPath(vssPath)
.setValue(updatedDatapoint)
.build()
val entryUpdate = KuksaValV1.EntryUpdate.newBuilder()
.setEntry(dataEntry)
.addAllFields(fields)
.build()

val entryUpdates = fields.map { createEntryUpdate(vssPath, it, updatedDatapoint) }

val request = KuksaValV1.SetRequest.newBuilder()
.addUpdates(entryUpdate)
.addAllUpdates(entryUpdates)
.build()

return@withContext try {
Expand Down Expand Up @@ -162,7 +158,7 @@ internal class DataBrokerTransporter(
}

override fun onCompleted() {
Log.d("TAG", "onCompleted() called")
Log.d(TAG, "onCompleted() called")
}
}

Expand All @@ -176,4 +172,30 @@ internal class DataBrokerTransporter(

return subscription
}

private fun createEntryUpdate(
vssPath: String,
field: Field,
updatedDatapoint: Types.Datapoint,
): KuksaValV1.EntryUpdate {
val builder = Types.DataEntry.newBuilder()
.setPath(vssPath)

when (field) {
Field.FIELD_ACTUATOR_TARGET -> {
builder.actuatorTarget = updatedDatapoint
}

else -> {
builder.value = updatedDatapoint
}
}

builder.build()

return KuksaValV1.EntryUpdate.newBuilder()
.setEntry(builder.build())
.addFields(field)
.build()
}
}

0 comments on commit 3c2428b

Please sign in to comment.