Skip to content

Commit

Permalink
feat(core): implement subscription interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
JKRhb committed Jan 18, 2022
1 parent b6c79a7 commit 60354a5
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 52 deletions.
166 changes: 147 additions & 19 deletions lib/src/core/consumed_thing.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
// SPDX-License-Identifier: MIT OR Apache-2.0

import '../../scripting_api.dart' as scripting_api;
import '../../scripting_api.dart'
hide ConsumedThing, InteractionOutput, Subscription;
import '../../scripting_api.dart' hide ConsumedThing, InteractionOutput;
import '../definitions/data_schema.dart';
import '../definitions/form.dart';
import '../definitions/interaction_affordances/interaction_affordance.dart';
import '../definitions/security_scheme.dart';
Expand All @@ -19,7 +19,6 @@ import 'interaction_output.dart';
import 'operation_type.dart';
import 'protocol_interfaces/protocol_client.dart';
import 'servient.dart';
import 'subscription.dart';

enum _AffordanceType {
action,
Expand Down Expand Up @@ -52,6 +51,10 @@ class ConsumedThing implements scripting_api.ConsumedThing {
final List<String> _security = [];
final Map<String, SecurityScheme> _securityDefinitions = {};

final Map<String, scripting_api.Subscription> _subscribedEvents = {};

final Map<String, scripting_api.Subscription> _observedProperties = {};

/// Constructor
ConsumedThing(this.servient, this.thingDescription)
: title = thingDescription.title {
Expand All @@ -73,14 +76,14 @@ class ConsumedThing implements scripting_api.ConsumedThing {
}

ProtocolClient client;
Form form;
Form foundForm;

final int? formIndex = options?.formIndex;

if (formIndex != null) {
if (formIndex >= 0 && formIndex < forms.length) {
form = forms[formIndex];
final scheme = Uri.parse(form.href).scheme;
foundForm = forms[formIndex];
final scheme = Uri.parse(foundForm.href).scheme;
client = servient.clientFor(scheme);
} else {
throw ArgumentError('ConsumedThing "$title" missing formIndex for '
Expand All @@ -90,15 +93,16 @@ class ConsumedThing implements scripting_api.ConsumedThing {
// ignore: unused_local_variable
final schemes = forms.map((form) => Uri.parse(form.href).scheme);

form =
forms.firstWhere((form) => hasClientFor(Uri.parse(form.href).scheme));
final scheme = Uri.parse(form.href).scheme;
foundForm = forms.firstWhere((form) =>
hasClientFor(Uri.parse(form.href).scheme) &&
_supportsOperationType(form, affordanceType, operationType));
final scheme = Uri.parse(foundForm.href).scheme;
client = servient.clientFor(scheme);
}

_ensureClientSecurity(client, form);
_ensureClientSecurity(client, foundForm);

return _ClientAndForm(client, form);
return _ClientAndForm(client, foundForm);
}

@override
Expand Down Expand Up @@ -219,9 +223,88 @@ class ConsumedThing implements scripting_api.ConsumedThing {
@override
Future<Subscription> observeProperty(
String propertyName, scripting_api.InteractionListener listener,
[scripting_api.ErrorListener? onError, InteractionOptions? options]) {
// TODO(JKRhb): implement observeProperty
throw UnimplementedError();
[scripting_api.ErrorListener? onError,
InteractionOptions? options]) async {
final property = thingDescription.properties[propertyName];

if (property == null) {
throw StateError(
'ConsumedThing $title does not have property $propertyName');
}

if (_observedProperties.containsKey(propertyName)) {
throw ArgumentError("ConsumedThing '$title' already has a function "
"subscribed to $propertyName. You can only observe once");
}

return _createSubscription(property, options, listener, onError,
propertyName, property, SubscriptionType.property);
}

Future<Subscription> _createSubscription(
InteractionAffordance affordance,
scripting_api.InteractionOptions? options,
scripting_api.InteractionListener listener,
scripting_api.ErrorListener? onError,
String affordanceName,
DataSchema? dataSchema,
SubscriptionType subscriptionType,
) async {
OperationType operationType;
_AffordanceType affordanceType;
if (subscriptionType == SubscriptionType.property) {
operationType = OperationType.observeproperty;
affordanceType = _AffordanceType.property;
} else {
operationType = OperationType.subscribeevent;
affordanceType = _AffordanceType.event;
}

final clientAndForm = _getClientFor(
affordance.augmentedForms, operationType, affordanceType, options);

final form = clientAndForm.form; // TODO(JKRhb): Handle URI variables
final client = clientAndForm.client;

final subscription = await client.subscribeResource(
form, () => removeSubscription(affordanceName, subscriptionType),
(content) {
try {
listener(InteractionOutput(
content, servient.contentSerdes, form, dataSchema));
} on Exception {
// Exception is handled by onError function. Not sure if this is the
// best design, though.
// TODO(JKRhb): Check if this try-catch-block can be removed.
}
}, (error) {
if (onError != null) {
onError(error);
}
}, () {
// TODO(JKRhb): current scripting api cannot handle this (apparently)
});

if (subscriptionType == SubscriptionType.property) {
_observedProperties[affordanceName] = subscription;
} else {
_subscribedEvents[affordanceName] = subscription;
}

return subscription;
}

Future<PropertyReadMap> _readProperties(
List<String> propertyNames, InteractionOptions? options) async {
final Map<String, Future<InteractionOutput>> outputs = {};

for (final propertyName in propertyNames) {
outputs[propertyName] = readProperty(propertyName, options);
}

final outputList = await Future.wait(outputs.values);

return Map.fromIterables(outputs.keys, outputList);
}

@override
Expand All @@ -241,15 +324,60 @@ class ConsumedThing implements scripting_api.ConsumedThing {
Future<Subscription> subscribeEvent(
String eventName, scripting_api.InteractionListener listener,
[scripting_api.ErrorListener? onError, InteractionOptions? options]) {
// TODO(JKRhb): implement subscribeEvent
throw UnimplementedError();
// TODO(JKRhb): Handle subscription and cancellation data.
final event = thingDescription.events[eventName];

if (event == null) {
throw StateError('ConsumedThing $title does not have event $eventName');
}

if (_subscribedEvents.containsKey(eventName)) {
throw ArgumentError("ConsumedThing '$title' already has a function "
"subscribed to $eventName. You can only subscribe once.");
}

return _createSubscription(event, options, listener, onError, eventName,
event.data, SubscriptionType.event);
}

@override
Future<void> writeMultipleProperties(PropertyWriteMap valueMap,
[InteractionOptions? options]) {
// TODO(JKRhb): implement writeMultipleProperties
throw UnimplementedError();
[InteractionOptions? options]) async {
await Future.wait(
valueMap.keys.map((key) => writeProperty(key, valueMap[key])));
}

/// Removes a subscription with a specified [key] and [type].
void removeSubscription(String key, SubscriptionType type) {
switch (type) {
case SubscriptionType.property:
_observedProperties.remove(key);
break;
case SubscriptionType.event:
_subscribedEvents.remove(key);
break;
}
}

static bool _supportsOperationType(
Form form, _AffordanceType affordanceType, OperationType operationType) {
List<String>? operationTypes = form.op;

// TODO(JKRhb): Replace with constants or stringified OperationType enum
// values.
switch (affordanceType) {
case _AffordanceType.property:
operationTypes ??= ["readproperty", "writeproperty"];
break;
case _AffordanceType.action:
operationTypes ??= ["invokeaction"];
break;
case _AffordanceType.event:
operationTypes ??= ["subscribeevent", "unsubscribeevent"];
break;
}

return operationTypes.contains(operationType.toShortString());
}
}

Expand Down
3 changes: 2 additions & 1 deletion lib/src/core/protocol_interfaces/protocol_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@

import '../../definitions/form.dart';
import '../../definitions/security_scheme.dart';
import '../../scripting_api/subscription.dart';
import '../content.dart';
import '../credentials.dart';
import '../subscription.dart';

/// Base class for a Protocol Client.
abstract class ProtocolClient {
Expand All @@ -37,6 +37,7 @@ abstract class ProtocolClient {
/// [form].
Future<Subscription> subscribeResource(
Form form,
void Function() deregisterObservation,
void Function(Content content) next,
void Function(Exception error)? error,
void Function()? complete);
Expand Down
32 changes: 0 additions & 32 deletions lib/src/core/subscription.dart

This file was deleted.

0 comments on commit 60354a5

Please sign in to comment.