From b13534077adab56c802d6ec33cba191296d1c2e4 Mon Sep 17 00:00:00 2001 From: Rebecca Williams Date: Fri, 1 Nov 2024 09:44:35 +0000 Subject: [PATCH 1/7] Switch to use PVWS as the backend PV server ... instead of Coniql. Rename class and update references. Update tests using a mocked websocket connection requiring a new package to be installed. --- .eslintrc.json | 1 + package-lock.json | 27 +- package.json | 3 +- src/connection/coniql.test.ts | 144 ---------- src/connection/coniql.ts | 478 ---------------------------------- src/connection/pvws.test.ts | 81 ++++++ src/connection/pvws.ts | 284 ++++++++++++++++++++ src/redux/store.ts | 29 +-- 8 files changed, 408 insertions(+), 639 deletions(-) delete mode 100644 src/connection/coniql.test.ts delete mode 100644 src/connection/coniql.ts create mode 100644 src/connection/pvws.test.ts create mode 100644 src/connection/pvws.ts diff --git a/.eslintrc.json b/.eslintrc.json index dd76ebad..3b728335 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -36,6 +36,7 @@ } ], "import/no-cycle": "error", + "import/no-named-as-default": 0, "prettier/prettier": [ "error", { diff --git a/package-lock.json b/package-lock.json index 4c0b64e2..46518ef9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -58,6 +58,7 @@ "vite-plugin-node-polyfills": "^0.22.0", "vitest": "^2.0.5", "vitest-canvas-mock": "^0.3.3", + "vitest-websocket-mock": "^0.4.0", "xml-js": "^1.6.11" }, "peerDependencies": { @@ -9662,6 +9663,16 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/mock-socket": { + "version": "9.3.1", + "resolved": "https://registry.npmjs.org/mock-socket/-/mock-socket-9.3.1.tgz", + "integrity": "sha512-qxBgB7Qa2sEQgHFjj0dSigq7fX4k6Saisd5Nelwp2q8mlbAFh5dHV9JTTlF8viYJLSSWgMCZFUom8PJcMNBoJw==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">= 8" + } + }, "node_modules/moo-color": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/moo-color/-/moo-color-1.0.3.tgz", @@ -13884,6 +13895,20 @@ "vitest": "*" } }, + "node_modules/vitest-websocket-mock": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/vitest-websocket-mock/-/vitest-websocket-mock-0.4.0.tgz", + "integrity": "sha512-tGnOwE2nC8jfioQXDrX+lZ8EVrF+IO2NVqe1vV9h945W/hlR0S6ZYbMqCJGG3Nyd//c5XSe1IGLD2ZgE2D1I7Q==", + "dev": true, + "license": "MIT", + "dependencies": { + "@vitest/utils": "^2.0.3", + "mock-socket": "^9.2.1" + }, + "peerDependencies": { + "vitest": ">=2" + } + }, "node_modules/vitest/node_modules/magic-string": { "version": "0.30.11", "resolved": "https://registry.npmjs.org/magic-string/-/magic-string-0.30.11.tgz", @@ -14220,4 +14245,4 @@ } } } -} \ No newline at end of file +} diff --git a/package.json b/package.json index 92102474..a27eb11f 100644 --- a/package.json +++ b/package.json @@ -67,6 +67,7 @@ "vite-plugin-node-polyfills": "^0.22.0", "vitest": "^2.0.5", "vitest-canvas-mock": "^0.3.3", + "vitest-websocket-mock": "^0.4.0", "xml-js": "^1.6.11" }, "peerDependencies": { @@ -85,4 +86,4 @@ "trailingComma": "none", "arrowParens": "avoid" } -} \ No newline at end of file +} diff --git a/src/connection/coniql.test.ts b/src/connection/coniql.test.ts deleted file mode 100644 index 67e3978c..00000000 --- a/src/connection/coniql.test.ts +++ /dev/null @@ -1,144 +0,0 @@ -import { ApolloClient } from "@apollo/client"; -import { - ConiqlPlugin, - ConiqlStatus, - ConiqlTime, - ConiqlBase64Array, - DEVICE_QUERY -} from "./coniql"; -import { DType } from "../types/dtypes"; -import { Mock, vi } from "vitest"; - -/* This mocks the observable returned by apolloclient.subscribe(). - Its subscribe method calls the next() method on its parameter - with the data expected to be returned by Coniql. */ -class MockObservable { - private float?: number; - private string?: string; - private array?: ConiqlBase64Array; - private time?: ConiqlTime; - private status?: ConiqlStatus; - - public constructor(content: { - float?: number; - string?: string; - array?: ConiqlBase64Array; - time?: ConiqlTime; - status?: ConiqlStatus; - }) { - const { float, string, array, time, status } = content; - this.float = float; - this.string = string; - this.array = array; - this.time = time; - this.status = status; - } - - public subscribe(x: any): void { - x.next({ - data: { - subscribeChannel: { - value: { - float: this.float, - string: this.string, - base64Array: this.array - }, - time: this.time - } - } - }); - } -} - -describe("ConiqlPlugin", (): void => { - let cp: ConiqlPlugin; - let mockConnUpdate: Mock; - let mockValUpdate: Mock; - let mockDevUpdate: Mock; - beforeEach((): void => { - cp = new ConiqlPlugin("a.b.c:100", false); - mockConnUpdate = vi.fn(); - mockValUpdate = vi.fn(); - mockDevUpdate = vi.fn(); - cp.connect(mockConnUpdate, mockValUpdate, mockDevUpdate); - }); - - it("handles update to value", (): void => { - ApolloClient.prototype.subscribe = vi.fn( - (_): MockObservable => new MockObservable({ float: 42 }) - ) as Mock; - cp.subscribe("hello", { string: true }); - expect(ApolloClient.prototype.subscribe).toHaveBeenCalled(); - expect(mockValUpdate).toHaveBeenCalledWith( - "hello", - new DType({ doubleValue: 42 }, undefined, undefined, undefined, true) - ); - }); - - it("handles update to array value", (): void => { - ApolloClient.prototype.subscribe = vi.fn( - (_): MockObservable => - new MockObservable( - // Corresponds to Int32Array with values [0, 1, 2] - { - array: { - numberType: "INT32", - base64: "AAAAAAEAAAACAAAA" - } - } - ) - ) as Mock; - cp.subscribe("hello", { string: true }); - expect(ApolloClient.prototype.subscribe).toHaveBeenCalled(); - expect(mockValUpdate).toHaveBeenCalledWith( - "hello", - new DType( - { arrayValue: Int32Array.from([0, 1, 2]) }, - undefined, - undefined, - undefined, - true - ) - ); - }); - - it("handles update to time", (): void => { - ApolloClient.prototype.subscribe = vi.fn( - (_): MockObservable => - new MockObservable({ - time: { - datetime: new Date(2017, 1, 1) - } - }) - ) as Mock; - cp.subscribe("hello", { string: true }); - expect(ApolloClient.prototype.subscribe).toHaveBeenCalled(); - const calls = mockValUpdate.mock.calls; - expect(calls.length).toBe(1); - const [pv, value] = mockValUpdate.mock.calls[0]; - expect(pv).toBe("hello"); - expect(value.time?.datetime?.getFullYear()).toBe(2017); - }); - - it("queries with device query", (): void => { - const catchFunc = vi.fn(); - const thenFunc = vi.fn(() => { - return { catch: catchFunc }; - }); - const query: any = vi.fn(() => { - return { then: thenFunc }; - }); - - // 'as any' as client is a private property - (cp as any).client.query = query; - cp.getDevice("dev://stuff"); - expect((cp as any).client.query).toHaveBeenCalledWith({ - query: DEVICE_QUERY, - variables: { device: "stuff" } - }); - }); - - it("unsubscribes_with_no_errors", (): void => { - expect(() => cp.unsubscribe("hello")).not.toThrow(TypeError); - }); -}); diff --git a/src/connection/coniql.ts b/src/connection/coniql.ts deleted file mode 100644 index d34e9287..00000000 --- a/src/connection/coniql.ts +++ /dev/null @@ -1,478 +0,0 @@ -/* Module that handles a GraphQL connection to the Coniql server. - See https://github.com/DiamondLightSource/coniql - */ -import log from "loglevel"; -import base64js from "base64-js"; -import { ApolloClient, ApolloLink, from } from "@apollo/client"; -import { RetryLink } from "apollo-link-retry"; -import { HttpLink } from "@apollo/client/link/http"; -import { onError } from "@apollo/client/link/error"; -import { InMemoryCache, NormalizedCacheObject } from "@apollo/client/cache"; -import { - ObservableSubscription, - getMainDefinition -} from "@apollo/client/utilities"; -import { gql } from "graphql-tag"; -import { - Connection, - ConnectionChangedCallback, - ValueChangedCallback, - nullConnCallback, - nullValueCallback, - SubscriptionType, - DeviceQueriedCallback, - nullDeviceCallback -} from "./plugin"; -import { Client, createClient } from "graphql-ws"; -import { GraphQLWsLink } from "@apollo/client/link/subscriptions"; -import { - DType, - DTime, - DAlarm, - AlarmQuality, - DDisplay, - DRange, - ChannelRole, - DisplayForm -} from "../types/dtypes"; - -export interface ConiqlStatus { - quality: "ALARM" | "WARNING" | "VALID" | "INVALID" | "UNDEFINED" | "CHANGING"; - message: string; - mutable: boolean; -} - -const QUALITY_TYPES = { - VALID: AlarmQuality.VALID, - ALARM: AlarmQuality.ALARM, - WARNING: AlarmQuality.WARNING, - INVALID: AlarmQuality.INVALID, - UNDEFINED: AlarmQuality.UNDEFINED, - CHANGING: AlarmQuality.CHANGING -}; - -interface ConiqlRange { - min: number; - max: number; -} - -interface ConiqlDisplay { - description: string; - role: "RW" | "WO" | "RO"; - controlRange: ConiqlRange; - displayRange: ConiqlRange; - warningRange: ConiqlRange; - alarmRange: ConiqlRange; - units: string; - precision: number; - form: FORM; - choices: string[]; -} - -const ROLES = { - RW: ChannelRole.RW, - RO: ChannelRole.RO, - WO: ChannelRole.WO -}; - -type FORM = - | "DEFAULT" - | "STRING" - | "BINARY" - | "DECIMAL" - | "HEX" - | "EXPONENTIAL" - | "ENGINEERING"; - -const FORMS = { - DEFAULT: DisplayForm.DEFAULT, - STRING: DisplayForm.STRING, - BINARY: DisplayForm.BINARY, - DECIMAL: DisplayForm.DECIMAL, - HEX: DisplayForm.HEX, - EXPONENTIAL: DisplayForm.EXPONENTIAL, - ENGINEERING: DisplayForm.ENGINEERING -}; - -type CONIQL_TYPE = - | "INT8" - | "UINT8" - | "INT16" - | "UINT16" - | "INT32" - | "UINT32" - | "INT32" - | "INT64" - | "FLOAT32" - | "FLOAT64"; - -const ARRAY_TYPES = { - INT8: Int8Array, - UINT8: Uint8Array, - INT16: Int16Array, - UINT16: Uint16Array, - INT32: Int32Array, - UINT32: Uint32Array, - INT64: BigInt64Array, - UINT64: BigUint64Array, - FLOAT32: Float32Array, - FLOAT64: Float64Array -}; - -export interface ConiqlBase64Array { - numberType: CONIQL_TYPE; - base64: string; -} - -interface ConiqlValue { - string: string; - float: number; - base64Array: ConiqlBase64Array; - stringArray: string[]; -} - -export interface ConiqlTime { - datetime: Date; -} - -function coniqlToDType( - value: ConiqlValue, - timeVal: ConiqlTime, - status: ConiqlStatus, - display: ConiqlDisplay -): DType { - let alarm = undefined; - let ddisplay = undefined; - if (status) { - alarm = new DAlarm(QUALITY_TYPES[status.quality], status.message); - } - if (display) { - ddisplay = new DDisplay({ - description: display.description, - role: display.role ? ROLES[display.role] : undefined, - controlRange: display.controlRange - ? new DRange(display.controlRange.min, display.controlRange.max) - : undefined, - alarmRange: display.alarmRange - ? new DRange(display.alarmRange.min, display.alarmRange.max) - : undefined, - warningRange: display.warningRange - ? new DRange(display.warningRange.min, display.warningRange.max) - : undefined, - units: display.units, - precision: display.precision, - form: display.form ? FORMS[display.form] : undefined, - choices: display.choices - }); - } - let array = undefined; - if (value?.base64Array) { - const bd = base64js.toByteArray(value.base64Array.base64); - array = new ARRAY_TYPES[value.base64Array.numberType as CONIQL_TYPE]( - bd.buffer - ); - } - let dtime = undefined; - if (timeVal?.datetime) { - dtime = new DTime(timeVal.datetime); - } - return new DType( - { - stringValue: value?.string, - doubleValue: value?.float, - arrayValue: array - }, - alarm, - dtime, - ddisplay, - // Coniql only returns changed values so these DTypes are - // always partial. - true - ); -} - -const PV_SUBSCRIPTION = gql` - subscription sub1($pvName: ID!) { - subscribeChannel(id: $pvName) { - id - time { - datetime - } - value { - string - float - base64Array { - numberType - base64 - } - } - status { - quality - message - mutable - } - display { - units - form - controlRange { - max - min - } - choices - precision - } - } - } -`; - -const PV_MUTATION = gql` - mutation put1($pvName: ID!, $value: String!) { - putChannels(ids: [$pvName], values: [$value]) { - id - } - } -`; - -export const DEVICE_QUERY = gql` - query deviceQuery($device: ID!) { - getDevice(id: $device) { - id - children(flatten: true) { - name - label - child { - __typename - ... on Channel { - id - display { - description - widget - } - } - ... on Device { - id - } - ... on Group { - layout - children { - name - } - } - } - } - } - } -`; - -export class ConiqlPlugin implements Connection { - private wsProtocol = "ws"; - private httpProtocol = "http"; - private client: ApolloClient; - private onConnectionUpdate: ConnectionChangedCallback; - private onValueUpdate: ValueChangedCallback; - private deviceQueried: DeviceQueriedCallback; - private connected: boolean; - private wsClient: Client; - private disconnected: string[] = []; - private subscriptions: { [pvName: string]: ObservableSubscription }; - - public constructor(socket: string, ssl: boolean) { - if (ssl) { - this.wsProtocol = "wss"; - this.httpProtocol = "https"; - } - const cache = new InMemoryCache({ - possibleTypes: { - name: [ - "FunctionMeta", - "ObjectMeta", - "EnumMeta", - "NumberMeta", - "TableMeta" - ] - } - }); - this.wsClient = createClient({ - url: `${this.wsProtocol}://${socket}/ws`, - retryAttempts: Infinity, - shouldRetry: () => true, - on: { - closed: () => { - if (this.connected) { - for (const pvName of Object.keys(this.subscriptions)) { - // Websocket closed so set connection status to disconnected and - // readonly - this.onConnectionUpdate(pvName, { - isConnected: false, - isReadonly: true - }); - } - } - this.connected = false; - }, - connected: () => { - this.connected = true; - } - } - }); - const link = this.createLink(socket); - this.client = new ApolloClient({ link, cache }); - this.onConnectionUpdate = nullConnCallback; - this.onValueUpdate = nullValueCallback; - this.deviceQueried = nullDeviceCallback; - this.connected = false; - this.subscriptions = {}; - } - - private createLink(socket: string): ApolloLink { - const wsLink = new GraphQLWsLink(this.wsClient); - const errorLink = onError(({ graphQLErrors, networkError }) => { - if (graphQLErrors) { - log.error("GraphQL errors:"); - graphQLErrors.forEach((error): void => { - log.error(error); - }); - } - if (networkError) { - log.error("Network error:"); - log.error(networkError); - } - }); - const httpLink = new HttpLink({ - uri: `${this.httpProtocol}://${socket}/graphql` - }); - const retryLink = new RetryLink({ - delay: { - initial: 300, - max: 60000, - jitter: true - }, - attempts: (count, operation, e) => { - if (e && e.response && e.response.status === 401) { - return false; - } - return count < 30; - } - }); - const link: ApolloLink = ApolloLink.split( - ({ query }): boolean => { - // https://github.com/apollographql/apollo-client/issues/3090 - const definition = getMainDefinition(query); - return ( - definition.kind === "OperationDefinition" && - definition.operation === "subscription" - ); - }, - from([retryLink as any, errorLink, wsLink]), - from([retryLink as any, errorLink, httpLink]) - ); - - return link; - } - - public connect( - connectionCallback: ConnectionChangedCallback, - valueCallback: ValueChangedCallback, - deviceQueried: DeviceQueriedCallback - ): void { - this.onConnectionUpdate = connectionCallback; - this.onValueUpdate = valueCallback; - this.deviceQueried = deviceQueried; - this.connected = true; - } - - public isConnected(): boolean { - return this.connected; - } - - private _process(data: any, pvName: string, operation: string): void { - // Process an update to a channel either from getChannel or subscribeChannel. - const { value, time, status, display } = data.data[operation]; - if (status) { - this.onConnectionUpdate(pvName, { - isConnected: true, - isReadonly: !status.mutable - }); - } - const dtype = coniqlToDType(value, time, status, display); - this.onValueUpdate(pvName, dtype); - } - - private _processDevice(data: any, device: string): void { - this.deviceQueried( - device, - new DType({ stringValue: JSON.stringify(data.data) }) - ); - } - - private _subscribe(pvName: string): ObservableSubscription { - return this.client - .subscribe({ - query: PV_SUBSCRIPTION, - variables: { pvName: pvName } - }) - .subscribe({ - next: (data): void => { - this._process(data, pvName, "subscribeChannel"); - }, - error: (err): void => { - log.error("err", err); - }, - complete: (): void => { - // complete is called when the websocket is disconnected. - this.onConnectionUpdate(pvName, { - isConnected: false, - isReadonly: true - }); - this.disconnected.push(pvName); - } - }); - } - - public subscribe(pvName: string, type?: SubscriptionType): string { - // TODO: How to handle multiple subscriptions of different types to the same channel? - if (this.subscriptions[pvName] === undefined) { - this.subscriptions[pvName] = this._subscribe(pvName); - } - return pvName; - } - - public getDevice(device: string): void { - this.client - .query({ - query: DEVICE_QUERY, - // Note: This currently splits the prefix, as currently devices are not - // accessible on ca or pva - variables: { device: device.split("://")[1] } - }) - .then(response => this._processDevice(response, device)) - .catch(error => { - log.error(`Failed to query device ${device}`); - log.error(error); - }); - } - - public putPv(pvName: string, value: DType): void { - log.debug(`Putting ${value} to ${pvName}.`); - const variables = { - pvName: pvName, - value: DType.coerceString(value) - }; - this.client - .mutate({ mutation: PV_MUTATION, variables: variables }) - .catch(error => { - log.error(`Failed to write ${value} to ${pvName}`); - log.error(error); - }); - } - - public unsubscribe(pvName: string): void { - // Note that connectionMiddleware handles multiple subscriptions - // for the same PV at present, so if this method is called then - // there is no further need for this PV. - if (this.subscriptions[pvName]) { - this.subscriptions[pvName].unsubscribe(); - delete this.subscriptions[pvName]; - } - } -} diff --git a/src/connection/pvws.test.ts b/src/connection/pvws.test.ts new file mode 100644 index 00000000..6d34e21a --- /dev/null +++ b/src/connection/pvws.test.ts @@ -0,0 +1,81 @@ +import { PvwsPlugin } from "./pvws"; +import WS from "vitest-websocket-mock"; +import { DType } from "../types/dtypes"; + +describe("PvwsPlugin", (): void => { + let cp: PvwsPlugin; + let mockConnUpdate: jest.Mock; + let mockValUpdate: jest.Mock; + let ws: WS; + beforeEach(async () => { + ws = new WS("ws://a.b.c:100/pvws/pv"); + cp = new PvwsPlugin("a.b.c:100", false); + mockConnUpdate = jest.fn(); + mockValUpdate = jest.fn(); + cp.connect(mockConnUpdate, mockValUpdate); + await ws.connected; + }); + afterEach(() => { + WS.clean(); + }); + + it("handles update to value", (): void => { + cp.subscribe("hello", { string: true }); + ws.send(JSON.stringify({ type: "update", pv: "hello", value: 42 })); + expect(mockValUpdate).toHaveBeenCalledWith( + "hello", + new DType( + { + stringValue: "42", + doubleValue: 42, + arrayValue: undefined + }, + undefined, + undefined, + undefined, + true + ) + ); + }); + + it("handles update to array value", (): void => { + cp.subscribe("hello", { string: true }); + ws.send( + JSON.stringify({ + type: "update", + pv: "hello", + b64int: "AAAAAAEAAAACAAAA" + }) + ); + expect(mockValUpdate).toHaveBeenCalledWith( + "hello", + new DType( + { + arrayValue: Int32Array.from([0, 1, 2]), + stringValue: undefined, + doubleValue: undefined + }, + undefined, + undefined, + undefined, + true + ) + ); + }); + + it("handles update to time", (): void => { + cp.subscribe("hello", { string: true }); + ws.send( + JSON.stringify({ type: "update", pv: "hello", seconds: "1483272000" }) + ); + const calls = mockValUpdate.mock.calls; + expect(calls.length).toBe(1); + const [pv, value] = mockValUpdate.mock.calls[0]; + expect(pv).toBe("hello"); + expect(value.time?.datetime?.getFullYear()).toBe(2017); + }); + + it("unsubscribes_with_no_errors", (): void => { + expect(() => cp.unsubscribe("hello")).not.toThrow(TypeError); + }); +}); diff --git a/src/connection/pvws.ts b/src/connection/pvws.ts new file mode 100644 index 00000000..13f80fbf --- /dev/null +++ b/src/connection/pvws.ts @@ -0,0 +1,284 @@ +/* Module that handles a GraphQL connection to the PVWS server. + See https://github.com/ornl-epics/pvws + */ +import base64js from "base64-js"; +import { + Connection, + ConnectionChangedCallback, + ValueChangedCallback, + nullConnCallback, + nullValueCallback, + SubscriptionType +} from "./plugin"; + +import { + DType, + DTime, + DAlarm, + AlarmQuality, + DDisplay, + DRange +} from "../types/dtypes"; +import log from "loglevel"; + +export interface PvwsStatus { + quality: "ALARM" | "WARNING" | "VALID" | "INVALID" | "UNDEFINED" | "CHANGING"; + message: string; + mutable: boolean; +} + +type PVWS_TYPE = + | "INT8" + | "UINT8" + | "INT16" + | "UINT16" + | "INT32" + | "UINT32" + | "INT32" + | "INT64" + | "FLOAT32" + | "FLOAT64"; + +const ARRAY_TYPES = { + INT8: Int8Array, + UINT8: Uint8Array, + INT16: Int16Array, + UINT16: Uint16Array, + INT32: Int32Array, + UINT32: Uint32Array, + INT64: BigInt64Array, + UINT64: BigUint64Array, + FLOAT32: Float32Array, + FLOAT64: Float64Array +}; +export interface PvwsBase64Array { + numberType: PVWS_TYPE; + base64: string; +} + +export interface PvwsTime { + datetime: Date; +} + +function pvwsToDType(data: any): DType { + let alarm = undefined; + let ddisplay = undefined; + if (data.severity !== undefined) { + if (data.severity === "MAJOR") { + alarm = new DAlarm(AlarmQuality.ALARM, ""); + } else if (data.severity === "MINOR") { + alarm = new DAlarm(AlarmQuality.WARNING, ""); + } else { + alarm = new DAlarm(AlarmQuality.VALID, ""); + } + } + ddisplay = new DDisplay({ + description: undefined, + role: undefined, + controlRange: undefined, + alarmRange: data.alarm_low + ? new DRange(data.alarm_low, data.alarm_high) + : undefined, + warningRange: data.warn_low + ? new DRange(data.warn_low, data.warn_high) + : undefined, + units: data.units, + precision: data.precision, + form: undefined, + choices: data.labels ? data.labels : undefined + }); + + let array = undefined; + if (data.b64int !== undefined) { + const bd = base64js.toByteArray(data.b64int); + array = new ARRAY_TYPES["INT32"](bd.buffer); + } else if (data.b64dbl !== undefined) { + const bd = base64js.toByteArray(data.b64dbl); + array = new ARRAY_TYPES["FLOAT64"](bd.buffer); + } + + let dtime = undefined; + if (data.seconds) { + const datetime = new Date(0); + datetime.setSeconds(data.seconds); + dtime = new DTime(datetime); + } + + let stringVal = undefined; + if (data.text !== undefined) { + stringVal = data.text; + } else if (data.value !== undefined) { + stringVal = data.value.toString(); + } + return new DType( + { + stringValue: stringVal, + doubleValue: data.value, + arrayValue: array + }, + alarm, + dtime, + ddisplay, + // PVWS only returns changed values so these DTypes are + // always partial. + true + ); +} + +export class PvwsPlugin implements Connection { + private wsProtocol = "ws"; + private onConnectionUpdate: ConnectionChangedCallback; + private onValueUpdate: ValueChangedCallback; + private connected: boolean; + private disconnected: string[] = []; + private subscriptions: { [pvName: string]: boolean }; + private url = ""; + private socket!: WebSocket; + private reconnect_ms = 5000; + + public constructor(socket: string, ssl: boolean) { + if (ssl) { + this.wsProtocol = "wss"; + } + this.url = `${this.wsProtocol}://${socket}/pvws/pv`; + this.open(false); + this.onConnectionUpdate = nullConnCallback; + this.onValueUpdate = nullValueCallback; + this.connected = false; + this.subscriptions = {}; + } + + /** Open the web socket, i.e. start PV communication */ + private open(reconnection: boolean) { + this.socket = new WebSocket(this.url); + this.socket.onopen = event => this.handleConnection(); + this.socket.onmessage = event => this.handleMessage(event.data); + this.socket.onclose = event => this.handleClose(event); + this.socket.onerror = event => this.handleError(event); + + if (reconnection) { + this.connected = true; + } + } + + private handleConnection() { + log.debug("Connected to " + this.url); + while (this.disconnected.length) { + const pvName = this.disconnected.pop(); + if (pvName !== undefined) { + this.subscribe(pvName); + this.subscriptions[pvName] = true; + } + } + } + + private handleMessage(message: string) { + const jm = JSON.parse(message); + if (jm.type === "update") { + if (jm.readonly !== undefined) { + this.onConnectionUpdate(jm.pv, { + isConnected: true, + isReadonly: jm.readonly + }); + } + const dtype = pvwsToDType(jm); + this.onValueUpdate(jm.pv, dtype); + } + } + + private handleError(event: Event) { + log.error("Error from " + this.url); + this.close(); + } + + private handleClose(event: CloseEvent) { + let message = "Web socket closed (" + event.code; + if (event.reason) { + message += ", " + event.reason; + } + message += ")"; + log.debug(message); + log.debug( + "Scheduling re-connect to " + this.url + " in " + this.reconnect_ms + "ms" + ); + + if (this.connected) { + for (const pvName of Object.keys(this.subscriptions)) { + // Websocket closed so set connection status to disconnected and + // readonly + this.onConnectionUpdate(pvName, { + isConnected: false, + isReadonly: true + }); + this.unsubscribe(pvName); + this.disconnected.push(pvName); + } + } + this.connected = false; + setTimeout(() => this.open(true), this.reconnect_ms); + } + + private close() { + this.socket.close(); + } + + public connect( + connectionCallback: ConnectionChangedCallback, + valueCallback: ValueChangedCallback + ): void { + this.onConnectionUpdate = connectionCallback; + this.onValueUpdate = valueCallback; + this.connected = true; + } + + public isConnected(): boolean { + return this.connected; + } + + private _subscribe(pvName: string) { + this.socket.send(JSON.stringify({ type: "subscribe", pvs: [pvName] })); + } + + public subscribe(pvName: string, type?: SubscriptionType): string { + // TODO: How to handle multiple subscriptions of different types to the same channel? + if (this.subscriptions[pvName] === undefined) { + this._subscribe(pvName); + this.subscriptions[pvName] = true; + } + return pvName; + } + + public getDevice(device: string): void { + //console.log("Not implemented"); + } + + public putPv(pvName: string, value: DType): void { + if (value.value.stringValue === undefined) { + this.socket.send( + JSON.stringify({ + type: "write", + pv: pvName, + value: value.value.doubleValue + }) + ); + } else { + this.socket.send( + JSON.stringify({ + type: "write", + pv: pvName, + value: value.value.stringValue + }) + ); + } + } + + public unsubscribe(pvName: string): void { + // Note that connectionMiddleware handles multiple subscriptions + // for the same PV at present, so if this method is called then + // there is no further need for this PV. + if (this.subscriptions[pvName]) { + this.socket.send(JSON.stringify({ type: "clear", pvs: [pvName] })); + delete this.subscriptions[pvName]; + } + } +} diff --git a/src/redux/store.ts b/src/redux/store.ts index 3b8cf2dc..108854c0 100644 --- a/src/redux/store.ts +++ b/src/redux/store.ts @@ -5,13 +5,13 @@ import { connectionMiddleware } from "./connectionMiddleware"; import { throttleMiddleware, UpdateThrottle } from "./throttleMiddleware"; import { Connection } from "../connection/plugin"; import { SimulatorPlugin } from "../connection/sim"; -import { ConiqlPlugin } from "../connection/coniql"; +import { PvwsPlugin } from "../connection/pvws"; import { ConnectionForwarder } from "../connection/forwarder"; -const CONIQL_SOCKET = - process.env.VITE_CONIQL_SOCKET ?? import.meta.env.VITE_CONIQL_SOCKET; -const CONIQL_SSL = - (process.env.VITE_CONIQL_SSL ?? import.meta.env.VITE_CONIQL_SSL) === "true"; +const PVWS_SOCKET = + process.env.VITE_PVWS_SOCKET ?? import.meta.env.VITE_PVWS_SOCKET; +const PVWS_SSL = + (process.env.VITE_PVWS_SSL ?? import.meta.env.VITE_PVWS_SSL) === "true"; const THROTTLE_PERIOD = parseFloat( process.env.VITE_THROTTLE_PERIOD ?? import.meta.env.VITE_THROTTLE_PERIOD ?? @@ -19,16 +19,15 @@ const THROTTLE_PERIOD = parseFloat( ); const simulator = new SimulatorPlugin(); -const plugins: [string, Connection][] = [ - ["sim://", simulator], - ["loc://", simulator] -]; -if (CONIQL_SOCKET !== undefined) { - const coniql = new ConiqlPlugin(CONIQL_SOCKET, CONIQL_SSL); - plugins.unshift(["pva://", coniql]); - plugins.unshift(["ca://", coniql]); - plugins.unshift(["ssim://", coniql]); - plugins.unshift(["dev://", coniql]); +const plugins: [string, Connection][] = [["sim://", simulator]]; +if (PVWS_SOCKET !== undefined) { + const pvws = new PvwsPlugin(PVWS_SOCKET, PVWS_SSL); + plugins.unshift(["pva://", pvws]); + plugins.unshift(["ca://", pvws]); + plugins.unshift(["loc://", pvws]); + plugins.unshift(["sim://", pvws]); + plugins.unshift(["ssim://", pvws]); + plugins.unshift(["dev://", pvws]); } const connection = new ConnectionForwarder(plugins); From 08e41fbb0510b08c1aba8cc0a5437bb355510cd6 Mon Sep 17 00:00:00 2001 From: Rebecca Williams Date: Fri, 8 Mar 2024 12:56:42 +0000 Subject: [PATCH 2/7] Fix how sting values are handled --- src/connection/pvws.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/connection/pvws.ts b/src/connection/pvws.ts index 13f80fbf..0d218e07 100644 --- a/src/connection/pvws.ts +++ b/src/connection/pvws.ts @@ -105,9 +105,7 @@ function pvwsToDType(data: any): DType { } let stringVal = undefined; - if (data.text !== undefined) { - stringVal = data.text; - } else if (data.value !== undefined) { + if (data.value !== undefined) { stringVal = data.value.toString(); } return new DType( @@ -181,6 +179,7 @@ export class PvwsPlugin implements Connection { isReadonly: jm.readonly }); } + const dtype = pvwsToDType(jm); this.onValueUpdate(jm.pv, dtype); } From 64f67cf6569c10fdea5630df12ec86feacd1afe1 Mon Sep 17 00:00:00 2001 From: Rebecca Williams Date: Fri, 8 Mar 2024 13:18:46 +0000 Subject: [PATCH 3/7] Fix linting --- src/connection/pvws.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connection/pvws.ts b/src/connection/pvws.ts index 0d218e07..5e4758fb 100644 --- a/src/connection/pvws.ts +++ b/src/connection/pvws.ts @@ -179,7 +179,7 @@ export class PvwsPlugin implements Connection { isReadonly: jm.readonly }); } - + const dtype = pvwsToDType(jm); this.onValueUpdate(jm.pv, dtype); } From aafaa375e19dcb346dd7dd54e5e9314336aa869e Mon Sep 17 00:00:00 2001 From: Rebecca Williams Date: Wed, 15 May 2024 10:48:13 +0100 Subject: [PATCH 4/7] Handle different PVWS array types --- src/connection/pvws.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/connection/pvws.ts b/src/connection/pvws.ts index 5e4758fb..64ecbd5e 100644 --- a/src/connection/pvws.ts +++ b/src/connection/pvws.ts @@ -95,6 +95,15 @@ function pvwsToDType(data: any): DType { } else if (data.b64dbl !== undefined) { const bd = base64js.toByteArray(data.b64dbl); array = new ARRAY_TYPES["FLOAT64"](bd.buffer); + } else if (data.b64flt !== undefined) { + const bd = base64js.toByteArray(data.b64flt); + array = new ARRAY_TYPES["FLOAT32"](bd.buffer); + } else if (data.b64srt !== undefined) { + const bd = base64js.toByteArray(data.b64srt); + array = new ARRAY_TYPES["INT16"](bd.buffer); + } else if (data.b64byt !== undefined) { + const bd = base64js.toByteArray(data.b64byt); + array = new ARRAY_TYPES["INT8"](bd.buffer); } let dtime = undefined; From 966229dff9c6b7a1180a582a02d4ddc67a934629 Mon Sep 17 00:00:00 2001 From: Rebecca Williams Date: Fri, 24 May 2024 14:38:55 +0100 Subject: [PATCH 5/7] Fix handling of string PVs --- src/connection/pvws.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/connection/pvws.ts b/src/connection/pvws.ts index 64ecbd5e..a481fc28 100644 --- a/src/connection/pvws.ts +++ b/src/connection/pvws.ts @@ -114,7 +114,9 @@ function pvwsToDType(data: any): DType { } let stringVal = undefined; - if (data.value !== undefined) { + if (data.text !== undefined) { + stringVal = data.text; + } else if (data.value !== undefined) { stringVal = data.value.toString(); } return new DType( From 1c196508cbc2a9f16540665557c5100868fe9d55 Mon Sep 17 00:00:00 2001 From: Rebecca Williams Date: Wed, 29 May 2024 09:37:17 +0100 Subject: [PATCH 6/7] Fix connection status issue so widgets display as connected Add test to check readonly property is correctly set. --- src/connection/pvws.test.ts | 16 ++++++++++++++++ src/connection/pvws.ts | 19 +++++++++++++++++-- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/src/connection/pvws.test.ts b/src/connection/pvws.test.ts index 6d34e21a..a9b8a7f7 100644 --- a/src/connection/pvws.test.ts +++ b/src/connection/pvws.test.ts @@ -78,4 +78,20 @@ describe("PvwsPlugin", (): void => { it("unsubscribes_with_no_errors", (): void => { expect(() => cp.unsubscribe("hello")).not.toThrow(TypeError); }); + + it("handles update readonly value", (): void => { + cp.subscribe("hello", { string: true }); + ws.send(JSON.stringify({ type: "update", pv: "hello", value: 42 })); + expect(mockConnUpdate).toHaveBeenLastCalledWith("hello", { + isConnected: true, + isReadonly: true + }); + + ws.send(JSON.stringify({ type: "update", pv: "hello", readonly: false })); + expect(mockConnUpdate).toBeCalledTimes(2); + expect(mockConnUpdate).toHaveBeenLastCalledWith("hello", { + isConnected: true, + isReadonly: false + }); + }); }); diff --git a/src/connection/pvws.ts b/src/connection/pvws.ts index a481fc28..4af8cbab 100644 --- a/src/connection/pvws.ts +++ b/src/connection/pvws.ts @@ -141,6 +141,7 @@ export class PvwsPlugin implements Connection { private connected: boolean; private disconnected: string[] = []; private subscriptions: { [pvName: string]: boolean }; + private initMsgRcvd: { [pvName: string]: boolean }; private url = ""; private socket!: WebSocket; private reconnect_ms = 5000; @@ -155,6 +156,7 @@ export class PvwsPlugin implements Connection { this.onValueUpdate = nullValueCallback; this.connected = false; this.subscriptions = {}; + this.initMsgRcvd = {}; } /** Open the web socket, i.e. start PV communication */ @@ -184,10 +186,22 @@ export class PvwsPlugin implements Connection { private handleMessage(message: string) { const jm = JSON.parse(message); if (jm.type === "update") { - if (jm.readonly !== undefined) { + let updateConnection = false; + // PVWS only sends the readonly attribute if false + // so set true by default and update later. + let readonly = true; + if (!this.initMsgRcvd[jm.pv]) { + updateConnection = true; + this.initMsgRcvd[jm.pv] = true; + } else if (jm.readonly !== undefined) { + updateConnection = true; + // Update readonly from PVWS message + readonly = jm.readonly; + } + if (updateConnection) { this.onConnectionUpdate(jm.pv, { isConnected: true, - isReadonly: jm.readonly + isReadonly: readonly }); } @@ -254,6 +268,7 @@ export class PvwsPlugin implements Connection { if (this.subscriptions[pvName] === undefined) { this._subscribe(pvName); this.subscriptions[pvName] = true; + this.initMsgRcvd[pvName] = false; } return pvName; } From b9cffcb3b8ea41961267f0c4b3cb68f292d1cb65 Mon Sep 17 00:00:00 2001 From: Rebecca Williams Date: Fri, 8 Nov 2024 10:19:00 +0000 Subject: [PATCH 7/7] Move from using jest to vitest in pvws test --- src/connection/pvws.test.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/connection/pvws.test.ts b/src/connection/pvws.test.ts index a9b8a7f7..3cdf8a1e 100644 --- a/src/connection/pvws.test.ts +++ b/src/connection/pvws.test.ts @@ -1,17 +1,18 @@ import { PvwsPlugin } from "./pvws"; import WS from "vitest-websocket-mock"; import { DType } from "../types/dtypes"; +import { vi, describe, beforeEach, afterEach, it, Mock } from "vitest"; describe("PvwsPlugin", (): void => { let cp: PvwsPlugin; - let mockConnUpdate: jest.Mock; - let mockValUpdate: jest.Mock; + let mockConnUpdate: Mock; + let mockValUpdate: Mock; let ws: WS; beforeEach(async () => { ws = new WS("ws://a.b.c:100/pvws/pv"); cp = new PvwsPlugin("a.b.c:100", false); - mockConnUpdate = jest.fn(); - mockValUpdate = jest.fn(); + mockConnUpdate = vi.fn(); + mockValUpdate = vi.fn(); cp.connect(mockConnUpdate, mockValUpdate); await ws.connected; });