Skip to content

Commit

Permalink
Merge pull request #1217 from relu91/fix_1216
Browse files Browse the repository at this point in the history
Align `value` function to Scripting API
  • Loading branch information
relu91 committed Jan 17, 2024
2 parents 6901b5a + ac4e5ae commit cee07c2
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 54 deletions.
2 changes: 1 addition & 1 deletion packages/binding-http/test/http-server-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class HttpServerTest {
let test: DataSchemaValue;
testThing.setPropertyReadHandler("test", (_) => Promise.resolve(test));
testThing.setPropertyWriteHandler("test", async (value) => {
test = await value.value();
test = Buffer.from(await value.arrayBuffer()).toString("utf-8");
});

testThing.setActionHandler("try", async (input: WoT.InteractionOutput) => {
Expand Down
23 changes: 20 additions & 3 deletions packages/binding-opcua/test/full-opcua-thing-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const thingDescription: WoT.ThingDescription = {
observable: true,
readOnly: true,
unit: "°C",
type: "number",
"opcua:nodeId": { root: "i=84", path: "/Objects/1:MySensor/2:ParameterSet/1:Temperature" },
// Don't specifu type here as it could be multi form: type: [ "object", "number" ],
forms: [
Expand Down Expand Up @@ -111,6 +112,7 @@ const thingDescription: WoT.ThingDescription = {
description: "the temperature set point",
observable: true,
unit: "°C",
type: "number",
// dont't
forms: [
{
Expand Down Expand Up @@ -358,10 +360,25 @@ describe("Full OPCUA Thing Test", () => {

return { thing, servient };
}
async function doTest(thing: WoT.ConsumedThing, propertyName: string, localOptions: InteractionOptions) {
async function doTest(
thing: WoT.ConsumedThing,
propertyName: string,
localOptions: InteractionOptions,
forceParsing = false
) {
debug("------------------------------------------------------");
try {
const content = await thing.readProperty(propertyName, localOptions);
if (forceParsing) {
// In opcua binding it is possible to return a special response that contains
// richer details than the bare value. However this makes the returned value
// not complaint with its data schema. Therefore we have to fallback to
// custom parsing.
const raw = await content.arrayBuffer();
const json = JSON.parse(Buffer.from(raw).toString("utf-8"));
debug(json?.toString());
return json;
}
const json = await content.value();
debug(json?.toString());
return json;
Expand Down Expand Up @@ -395,13 +412,13 @@ describe("Full OPCUA Thing Test", () => {
const json1 = await doTest(thing, propertyName, { formIndex: 1 });
expect(json1).to.eql(25);

const json2 = await doTest(thing, propertyName, { formIndex: 2 });
const json2 = await doTest(thing, propertyName, { formIndex: 2 }, true);
expect(json2).to.eql({ Type: 11, Body: 25 });

expect(thingDescription.properties?.temperature.forms[3].contentType).eql(
"application/opcua+json;type=DataValue"
);
const json3 = await doTest(thing, propertyName, { formIndex: 3 });
const json3 = await doTest(thing, propertyName, { formIndex: 3 }, true);
debug(json3?.toString());
expect((json3 as Record<string, unknown>).Value).to.eql({ Type: 11, Body: 25 });
} finally {
Expand Down
79 changes: 41 additions & 38 deletions packages/core/src/interaction-output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import * as util from "util";
import * as WoT from "wot-typescript-definitions";
import { ContentSerdes } from "./content-serdes";
import { ProtocolHelpers } from "./core";
import { DataSchemaError, NotSupportedError } from "./errors";
import { DataSchemaError, NotReadableError, NotSupportedError } from "./errors";
import { Content } from "./content";
import Ajv from "ajv";
import { createLoggers } from "./logger";
Expand All @@ -32,17 +32,17 @@ const { debug } = createLoggers("core", "interaction-output");
const ajv = new Ajv({ strict: false });

export class InteractionOutput implements WoT.InteractionOutput {
private content: Content;
private parsedValue: unknown;
private buffer?: ArrayBuffer;
private _stream?: ReadableStream;
#content: Content;
#value: unknown;
#buffer?: ArrayBuffer;
#stream?: ReadableStream;
dataUsed: boolean;
form?: WoT.Form;
schema?: WoT.DataSchema;

public get data(): ReadableStream {
if (this._stream) {
return this._stream;
if (this.#stream) {
return this.#stream;
}

if (this.dataUsed) {
Expand All @@ -51,71 +51,74 @@ export class InteractionOutput implements WoT.InteractionOutput {
// Once the stream is created data might be pulled unpredictably
// therefore we assume that it is going to be used to be safe.
this.dataUsed = true;
return (this._stream = ProtocolHelpers.toWoTStream(this.content.body) as ReadableStream);
return (this.#stream = ProtocolHelpers.toWoTStream(this.#content.body) as ReadableStream);
}

constructor(content: Content, form?: WoT.Form, schema?: WoT.DataSchema) {
this.content = content;
this.#content = content;
this.form = form;
this.schema = schema;
this.dataUsed = false;
}

async arrayBuffer(): Promise<ArrayBuffer> {
if (this.buffer) {
return this.buffer;
if (this.#buffer) {
return this.#buffer;
}

if (this.dataUsed) {
throw new Error("Can't read the stream once it has been already used");
}

const data = await this.content.toBuffer();
const data = await this.#content.toBuffer();
this.dataUsed = true;
this.buffer = data;
this.#buffer = data;

return data;
}

async value<T extends WoT.DataSchemaValue>(): Promise<T> {
// the value has been already read?
if (this.parsedValue !== undefined) {
return this.parsedValue as T;
if (this.#value !== undefined) {
return this.#value as T;
}

if (this.dataUsed) {
throw new Error("Can't read the stream once it has been already used");
throw new NotReadableError("Can't read the stream once it has been already used");
}

if (this.form == null) {
throw new NotReadableError("No form defined");
}

if (this.schema == null || this.schema.type == null) {
throw new NotReadableError("No schema defined");
}

// is content type valid?
if (!this.form || !ContentSerdes.get().isSupported(this.content.type)) {
const message = !this.form ? "Missing form" : `Content type ${this.content.type} not supported`;
if (!ContentSerdes.get().isSupported(this.#content.type)) {
const message = `Content type ${this.#content.type} not supported`;
throw new NotSupportedError(message);
}

// read fully the stream
const data = await this.content.toBuffer();
const bytes = await this.#content.toBuffer();
this.dataUsed = true;
this.buffer = data;

// call the contentToValue
// TODO: should be fixed contentToValue MUST define schema as nullable
const value = ContentSerdes.get().contentToValue({ type: this.content.type, body: data }, this.schema ?? {});

// any data (schema)?
if (this.schema) {
// validate the schema
const validate = ajv.compile<T>(this.schema);

if (!validate(value)) {
debug(`schema = ${util.inspect(this.schema, { depth: 10, colors: true })}`);
debug(`value: ${value}`);
debug(`Errror: ${validate.errors}`);
throw new DataSchemaError("Invalid value according to DataSchema", value as WoT.DataSchemaValue);
}
this.#buffer = bytes;

const json = ContentSerdes.get().contentToValue({ type: this.#content.type, body: bytes }, this.schema);

// validate the schema
const validate = ajv.compile<T>(this.schema);

if (!validate(json)) {
debug(`schema = ${util.inspect(this.schema, { depth: 10, colors: true })}`);
debug(`value: ${json}`);
debug(`Errror: ${validate.errors}`);
throw new DataSchemaError("Invalid value according to DataSchema", json as WoT.DataSchemaValue);
}

this.parsedValue = value;
return this.parsedValue as T;
this.#value = json;
return json;
}
}
22 changes: 10 additions & 12 deletions packages/core/src/wot-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,34 @@ import Helpers from "./helpers";
import { createLoggers } from "./logger";
import ContentManager from "./content-serdes";
import { getLastValidationErrors, isThingDescription } from "./validation";
import { inspect } from "util";

const { debug } = createLoggers("core", "wot-impl");

class ThingDiscoveryProcess implements WoT.ThingDiscoveryProcess {
constructor(rawThingDescriptions: WoT.DataSchemaValue, filter?: WoT.ThingFilter) {
constructor(private directory: ConsumedThing, public filter?: WoT.ThingFilter) {
this.filter = filter;
this.done = false;
this.rawThingDescriptions = rawThingDescriptions;
}

rawThingDescriptions: WoT.DataSchemaValue;

filter?: WoT.ThingFilter | undefined;
done: boolean;
error?: Error | undefined;
async stop(): Promise<void> {
this.done = true;
}

async *[Symbol.asyncIterator](): AsyncIterator<WoT.ThingDescription> {
if (!(this.rawThingDescriptions instanceof Array)) {
this.error = new Error("Encountered an invalid output value.");
let rawThingDescriptions: WoT.ThingDescription[];
try {
const thingsPropertyOutput = await this.directory.readProperty("things");
rawThingDescriptions = (await thingsPropertyOutput.value()) as WoT.ThingDescription[];
} catch (error) {
this.error = error instanceof Error ? error : new Error(inspect(error));
this.done = true;
return;
}

for (const outputValue of this.rawThingDescriptions) {
for (const outputValue of rawThingDescriptions) {
if (this.done) {
return;
}
Expand Down Expand Up @@ -81,10 +82,7 @@ export default class WoTImpl {
const directoyThingDescription = await this.requestThingDescription(url);
const consumedDirectoy = await this.consume(directoyThingDescription);

const thingsPropertyOutput = await consumedDirectoy.readProperty("things");
const rawThingDescriptions = await thingsPropertyOutput.value();

return new ThingDiscoveryProcess(rawThingDescriptions, filter);
return new ThingDiscoveryProcess(consumedDirectoy, filter);
}

/** @inheritDoc */
Expand Down
1 change: 1 addition & 0 deletions packages/core/test/DiscoveryTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ function createDirectoryTestTd(title: string, thingsPropertyHref: string) {
},
properties: {
things: {
type: "array",
forms: [
{
href: thingsPropertyHref,
Expand Down

0 comments on commit cee07c2

Please sign in to comment.