From 1d3b6c0fbbe3b63541dd5528c518abbf2251869e Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Tue, 19 Nov 2024 15:03:24 +0100 Subject: [PATCH 1/2] update invitations in real-time --- apps/events/src/routes/playground.tsx | 6 ++++++ apps/server/src/index.ts | 25 +++++++++++++++++++++---- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/apps/events/src/routes/playground.tsx b/apps/events/src/routes/playground.tsx index 23594040..ca012d42 100644 --- a/apps/events/src/routes/playground.tsx +++ b/apps/events/src/routes/playground.tsx @@ -270,6 +270,12 @@ const App = ({ } const message: EventMessage = { type: 'event', event: spaceEvent.value, spaceId: invitation.spaceId }; websocketConnection?.send(JSON.stringify(message)); + + // temporary until we have define a strategy for accepting invitations response + setTimeout(() => { + const message2: RequestListInvitations = { type: 'list-invitations' }; + websocketConnection?.send(JSON.stringify(message2)); + }, 1000); }} />

Spaces

diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index 5f4b9355..80e2914b 100755 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -1,13 +1,12 @@ import cors from 'cors'; import 'dotenv/config'; -import { parse } from 'node:url'; import { Effect, Exit, Schema } from 'effect'; import express from 'express'; import type { ResponseListInvitations, ResponseListSpaces, ResponseSpace } from 'graph-framework-messages'; import { RequestMessage } from 'graph-framework-messages'; import { applyEvent } from 'graph-framework-space-events'; -import type WebSocket from 'ws'; -import { WebSocketServer } from 'ws'; +import { parse } from 'node:url'; +import WebSocket, { WebSocketServer } from 'ws'; import { applySpaceEvent } from './handlers/applySpaceEvent.js'; import { createSpace } from './handlers/createSpace.js'; import { getSpace } from './handlers/getSpace.js'; @@ -15,6 +14,10 @@ import { listInvitations } from './handlers/listInvitations.js'; import { listSpaces } from './handlers/listSpaces.js'; import { tmpInitAccount } from './handlers/tmpInitAccount.js'; import { assertExhaustive } from './utils/assertExhaustive.js'; +interface CustomWebSocket extends WebSocket { + accountId: string; + subscribedSpaces: Set; +} const decodeRequestMessage = Schema.decodeUnknownEither(RequestMessage); @@ -38,13 +41,15 @@ const server = app.listen(PORT, () => { console.log(`Listening on port ${PORT}`); }); -webSocketServer.on('connection', async (webSocket: WebSocket, request: Request) => { +webSocketServer.on('connection', async (webSocket: CustomWebSocket, request: Request) => { const params = parse(request.url, true); if (!params.query.accountId || typeof params.query.accountId !== 'string') { webSocket.close(); return; } const accountId = params.query.accountId; + webSocket.accountId = accountId; + webSocket.subscribedSpaces = new Set(); console.log('Connection established', accountId); webSocket.on('message', async (message) => { @@ -101,6 +106,18 @@ webSocketServer.on('connection', async (webSocket: WebSocket, request: Request) type: 'space', }; webSocket.send(JSON.stringify(outgoingMessage)); + for (const client of webSocketServer.clients as Set) { + if ( + client.readyState === WebSocket.OPEN && + client.accountId === data.event.transaction.signaturePublicKey + ) { + const invitations = await listInvitations({ accountId: client.accountId }); + const outgoingMessage: ResponseListInvitations = { type: 'list-invitations', invitations: invitations }; + // for now sending the entire list of invitations to the client - we could send only a single one + client.send(JSON.stringify(outgoingMessage)); + } + } + break; } case 'event': { From a53a866505d6ffb7044823815c844302fc9e947b Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Tue, 19 Nov 2024 21:33:58 +0100 Subject: [PATCH 2/2] broadcast events via websocket --- apps/events/src/routes/playground.tsx | 88 +++++++++++++------ apps/server/src/index.ts | 57 ++++++++---- .../graph-framework-messages/src/types.ts | 26 ++++-- .../src/apply-event.ts | 1 - 4 files changed, 120 insertions(+), 52 deletions(-) diff --git a/apps/events/src/routes/playground.tsx b/apps/events/src/routes/playground.tsx index ca012d42..af729ffa 100644 --- a/apps/events/src/routes/playground.tsx +++ b/apps/events/src/routes/playground.tsx @@ -8,8 +8,8 @@ import { createFileRoute } from '@tanstack/react-router'; import { Effect, Exit } from 'effect'; import * as Schema from 'effect/Schema'; import type { - EventMessage, Invitation, + RequestAcceptInvitationEvent, RequestCreateInvitationEvent, RequestCreateSpaceEvent, RequestListInvitations, @@ -80,13 +80,40 @@ const App = ({ const [spaces, setSpaces] = useState([]); const [invitations, setInvitations] = useState([]); + // Create a stable WebSocket connection that only depends on accountId useEffect(() => { - // temporary until we have a way to create accounts and authenticate them const websocketConnection = new WebSocket(`ws://localhost:3030/?accountId=${accountId}`); setWebsocketConnection(websocketConnection); + const onOpen = () => { + console.log('websocket connected'); + }; + + const onError = (event: Event) => { + console.log('websocket error', event); + }; + + const onClose = (event: CloseEvent) => { + console.log('websocket close', event); + }; + + websocketConnection.addEventListener('open', onOpen); + websocketConnection.addEventListener('error', onError); + websocketConnection.addEventListener('close', onClose); + + return () => { + websocketConnection.removeEventListener('open', onOpen); + websocketConnection.removeEventListener('error', onError); + websocketConnection.removeEventListener('close', onClose); + websocketConnection.close(); + }; + }, [accountId]); // Only recreate when accountId changes + + // Handle WebSocket messages in a separate effect + useEffect(() => { + if (!websocketConnection) return; + const onMessage = async (event: MessageEvent) => { - console.log('message received', event.data); const data = JSON.parse(event.data); const message = decodeResponseMessage(data); if (message._tag === 'Right') { @@ -157,8 +184,31 @@ const App = ({ ); break; } - case 'event': { - console.log('event', response); + case 'space-event': { + const space = spaces.find((s) => s.id === response.spaceId); + if (!space) { + console.error('Space not found', response.spaceId); + return; + } + if (!space.state) { + console.error('Space has no state', response.spaceId); + return; + } + + const applyEventResult = await Effect.runPromiseExit( + applyEvent({ event: response.event, state: space.state }), + ); + if (Exit.isSuccess(applyEventResult)) { + setSpaces((spaces) => + spaces.map((space) => { + if (space.id === response.spaceId) { + return { ...space, state: applyEventResult.value, events: [...space.events, response.event] }; + } + return space; + }), + ); + } + break; } case 'list-invitations': { @@ -170,31 +220,13 @@ const App = ({ } } }; - websocketConnection.addEventListener('message', onMessage); - const onOpen = () => { - console.log('websocket connected'); - }; - websocketConnection.addEventListener('open', onOpen); - - const onError = (event: Event) => { - console.log('websocket error', event); - }; - websocketConnection.addEventListener('error', onError); - - const onClose = (event: CloseEvent) => { - console.log('websocket close', event); - }; - websocketConnection.addEventListener('close', onClose); + websocketConnection.addEventListener('message', onMessage); return () => { websocketConnection.removeEventListener('message', onMessage); - websocketConnection.removeEventListener('open', onOpen); - websocketConnection.removeEventListener('error', onError); - websocketConnection.removeEventListener('close', onClose); - websocketConnection.close(); }; - }, [accountId, encryptionPrivateKey]); + }, [websocketConnection, encryptionPrivateKey, spaces]); return ( <> @@ -268,7 +300,11 @@ const App = ({ console.error('Failed to accept invitation', spaceEvent); return; } - const message: EventMessage = { type: 'event', event: spaceEvent.value, spaceId: invitation.spaceId }; + const message: RequestAcceptInvitationEvent = { + type: 'accept-invitation-event', + event: spaceEvent.value, + spaceId: invitation.spaceId, + }; websocketConnection?.send(JSON.stringify(message)); // temporary until we have define a strategy for accepting invitations response diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index 80e2914b..cd7cb432 100755 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -1,11 +1,17 @@ import cors from 'cors'; import 'dotenv/config'; +import { parse } from 'node:url'; import { Effect, Exit, Schema } from 'effect'; import express from 'express'; -import type { ResponseListInvitations, ResponseListSpaces, ResponseSpace } from 'graph-framework-messages'; +import type { + ResponseListInvitations, + ResponseListSpaces, + ResponseSpace, + ResponseSpaceEvent, +} from 'graph-framework-messages'; import { RequestMessage } from 'graph-framework-messages'; +import type { SpaceEvent } from 'graph-framework-space-events'; import { applyEvent } from 'graph-framework-space-events'; -import { parse } from 'node:url'; import WebSocket, { WebSocketServer } from 'ws'; import { applySpaceEvent } from './handlers/applySpaceEvent.js'; import { createSpace } from './handlers/createSpace.js'; @@ -41,6 +47,25 @@ const server = app.listen(PORT, () => { console.log(`Listening on port ${PORT}`); }); +function broadcastSpaceEvents({ + spaceId, + event, + currentClient, +}: { spaceId: string; event: SpaceEvent; currentClient: CustomWebSocket }) { + for (const client of webSocketServer.clients as Set) { + if (currentClient === client) continue; + + const outgoingMessage: ResponseSpaceEvent = { + type: 'space-event', + spaceId, + event, + }; + if (client.readyState === WebSocket.OPEN && client.subscribedSpaces.has(spaceId)) { + client.send(JSON.stringify(outgoingMessage)); + } + } +} + webSocketServer.on('connection', async (webSocket: CustomWebSocket, request: Request) => { const params = parse(request.url, true); if (!params.query.accountId || typeof params.query.accountId !== 'string') { @@ -64,6 +89,7 @@ webSocketServer.on('connection', async (webSocket: CustomWebSocket, request: Req ...space, type: 'space', }; + webSocket.subscribedSpaces.add(data.id); webSocket.send(JSON.stringify(outgoingMessage)); break; } @@ -101,6 +127,7 @@ webSocketServer.on('connection', async (webSocket: CustomWebSocket, request: Req keyBoxes: data.keyBoxes.map((keyBox) => keyBox), }); const spaceWithEvents = await getSpace({ accountId, spaceId: data.spaceId }); + // TODO send back confirmation instead of the entire space const outgoingMessage: ResponseSpace = { ...spaceWithEvents, type: 'space', @@ -118,24 +145,18 @@ webSocketServer.on('connection', async (webSocket: CustomWebSocket, request: Req } } + broadcastSpaceEvents({ spaceId: data.spaceId, event: data.event, currentClient: webSocket }); break; } - case 'event': { - switch (data.event.transaction.type) { - case 'delete-space': { - break; - } - case 'accept-invitation': { - await applySpaceEvent({ accountId, spaceId: data.spaceId, event: data.event, keyBoxes: [] }); - const spaceWithEvents = await getSpace({ accountId, spaceId: data.spaceId }); - const outgoingMessage: ResponseSpace = { - ...spaceWithEvents, - type: 'space', - }; - webSocket.send(JSON.stringify(outgoingMessage)); - break; - } - } + case 'accept-invitation-event': { + await applySpaceEvent({ accountId, spaceId: data.spaceId, event: data.event, keyBoxes: [] }); + const spaceWithEvents = await getSpace({ accountId, spaceId: data.spaceId }); + const outgoingMessage: ResponseSpace = { + ...spaceWithEvents, + type: 'space', + }; + webSocket.send(JSON.stringify(outgoingMessage)); + broadcastSpaceEvents({ spaceId: data.spaceId, event: data.event, currentClient: webSocket }); break; } default: diff --git a/packages/graph-framework-messages/src/types.ts b/packages/graph-framework-messages/src/types.ts index 0136e966..a262fd38 100644 --- a/packages/graph-framework-messages/src/types.ts +++ b/packages/graph-framework-messages/src/types.ts @@ -3,7 +3,6 @@ import { AcceptInvitationEvent, CreateInvitationEvent, CreateSpaceEvent, - DeleteSpaceEvent, SpaceEvent, } from 'graph-framework-space-events'; @@ -42,13 +41,13 @@ export const RequestCreateInvitationEvent = Schema.Struct({ export type RequestCreateInvitationEvent = Schema.Schema.Type; -export const EventMessage = Schema.Struct({ - type: Schema.Literal('event'), +export const RequestAcceptInvitationEvent = Schema.Struct({ + type: Schema.Literal('accept-invitation-event'), spaceId: Schema.String, - event: Schema.Union(DeleteSpaceEvent, AcceptInvitationEvent), + event: AcceptInvitationEvent, }); -export type EventMessage = Schema.Schema.Type; +export type RequestAcceptInvitationEvent = Schema.Schema.Type; export const RequestSubscribeToSpace = Schema.Struct({ type: Schema.Literal('subscribe-space'), @@ -72,7 +71,7 @@ export type RequestListInvitations = Schema.Schema.Type; +export const ResponseSpaceEvent = Schema.Struct({ + type: Schema.Literal('space-event'), + spaceId: Schema.String, + event: SpaceEvent, +}); + +export type ResponseSpaceEvent = Schema.Schema.Type; + export const ResponseSpace = Schema.Struct({ type: Schema.Literal('space'), id: Schema.String, @@ -115,6 +122,11 @@ export const ResponseSpace = Schema.Struct({ export type ResponseSpace = Schema.Schema.Type; -export const ResponseMessage = Schema.Union(EventMessage, ResponseListSpaces, ResponseListInvitations, ResponseSpace); +export const ResponseMessage = Schema.Union( + ResponseListSpaces, + ResponseListInvitations, + ResponseSpace, + ResponseSpaceEvent, +); export type ResponseMessage = Schema.Schema.Type; diff --git a/packages/graph-framework-space-events/src/apply-event.ts b/packages/graph-framework-space-events/src/apply-event.ts index 618f9627..856fed7b 100644 --- a/packages/graph-framework-space-events/src/apply-event.ts +++ b/packages/graph-framework-space-events/src/apply-event.ts @@ -34,7 +34,6 @@ export const applyEvent = ({ return Effect.fail(new InvalidEventError()); } if (event.transaction.previousEventHash !== state.lastEventHash) { - console.log('WEEEEE', event.transaction.previousEventHash, state.lastEventHash); return Effect.fail(new InvalidEventError()); } }