From 597b4b65419e5b298ae37c5a34971fc627b0e80b Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 1 Sep 2025 14:23:22 +0200 Subject: [PATCH 01/22] bump version --- .changeset/lazy-birds-wait.md | 6 ------ packages/create-hypergraph/CHANGELOG.md | 5 +++++ packages/create-hypergraph/package.json | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) delete mode 100644 .changeset/lazy-birds-wait.md diff --git a/.changeset/lazy-birds-wait.md b/.changeset/lazy-birds-wait.md deleted file mode 100644 index 48fd9890..00000000 --- a/.changeset/lazy-birds-wait.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -"create-hypergraph": patch ---- - -improve geo connect box based on authentication state in all templates - \ No newline at end of file diff --git a/packages/create-hypergraph/CHANGELOG.md b/packages/create-hypergraph/CHANGELOG.md index 88555c4b..f3258838 100644 --- a/packages/create-hypergraph/CHANGELOG.md +++ b/packages/create-hypergraph/CHANGELOG.md @@ -1,5 +1,10 @@ # create-hypergraph +## 0.5.4 +### Patch Changes + +- de0d153: improve geo connect box based on authentication state in all templates + ## 0.5.3 ### Patch Changes diff --git a/packages/create-hypergraph/package.json b/packages/create-hypergraph/package.json index 2ba4bf42..8bc16dad 100644 --- a/packages/create-hypergraph/package.json +++ b/packages/create-hypergraph/package.json @@ -1,6 +1,6 @@ { "name": "create-hypergraph", - "version": "0.5.3", + "version": "0.5.4", "description": "CLI toolchain to scaffold a Hypergraph-enabled application with a given template.", "type": "module", "bin": { From 9f729409ac0ce0fa397c41289b9d640c9fc6edcb Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Tue, 2 Sep 2025 14:46:52 +0200 Subject: [PATCH 02/22] introduce mailbox --- apps/server-new/src/server.ts | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index a6cbf865..2efce467 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -1,14 +1,14 @@ -import { createServer } from 'node:http'; +import * as NodeHttpServer from '@effect/platform-node/NodeHttpServer'; import * as HttpApiScalar from '@effect/platform/HttpApiScalar'; import * as HttpLayerRouter from '@effect/platform/HttpLayerRouter'; import * as HttpMiddleware from '@effect/platform/HttpMiddleware'; import * as HttpServerRequest from '@effect/platform/HttpServerRequest'; import * as HttpServerResponse from '@effect/platform/HttpServerResponse'; -import * as NodeHttpServer from '@effect/platform-node/NodeHttpServer'; import * as Effect from 'effect/Effect'; import * as Layer from 'effect/Layer'; -import * as Schedule from 'effect/Schedule'; +import * as Mailbox from 'effect/Mailbox'; import * as Stream from 'effect/Stream'; +import { createServer } from 'node:http'; import { serverPortConfig } from './config/server.ts'; import { hypergraphApi } from './http/api.ts'; import { HandlersLive } from './http/handlers.ts'; @@ -24,18 +24,30 @@ const ApiLayer = HttpLayerRouter.addHttpApi(hypergraphApi, { openapiPath: '/docs/openapi.json', }).pipe(Layer.provide(HandlersLive)); +type OutgoingMessage = { + type: 'message'; + message: string; +}; + // Create websocket layer at /ws. const WebSocketLayer = HttpLayerRouter.add( 'GET', '/ws', - // TODO: Implement actual websocket logic here. - Stream.fromSchedule(Schedule.spaced(1000)).pipe( - Stream.map(JSON.stringify), - Stream.pipeThroughChannel(HttpServerRequest.upgradeChannel()), - Stream.decodeText(), - Stream.runForEach((_) => Effect.log(_)), - Effect.as(HttpServerResponse.empty()), - ), + Effect.gen(function* () { + const outgoingMailbox = yield* Mailbox.make(); + const incomingMailbox = yield* Mailbox.make(); + + yield* outgoingMailbox.offer({ type: 'message', message: 'Hello, world!' }); + + // TODO: Implement actual websocket logic here. + return yield* Mailbox.toStream(outgoingMailbox).pipe( + Stream.map(JSON.stringify), + Stream.pipeThroughChannel(HttpServerRequest.upgradeChannel()), + Stream.decodeText(), + Stream.runForEach((_) => Effect.log(_)), + Effect.as(HttpServerResponse.empty()), + ); + }), ); // Merge router layers together and add the cors middleware layer. From 3e8f06ccc7fd06f1661294a0bc4098096524b65c Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Tue, 2 Sep 2025 17:01:00 +0200 Subject: [PATCH 03/22] return response and move websocket to root --- apps/server-new/src/http/api.ts | 2 +- apps/server-new/src/server.ts | 33 ++++++++++++++++++++++----------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/apps/server-new/src/http/api.ts b/apps/server-new/src/http/api.ts index 8194564e..63320ce9 100644 --- a/apps/server-new/src/http/api.ts +++ b/apps/server-new/src/http/api.ts @@ -65,7 +65,7 @@ export class IdentityQuery extends Schema.Class('IdentityQuery')( /** * Health endpoints */ -export const statusEndpoint = HttpApiEndpoint.get('status')`/`.addSuccess(Schema.String); +export const statusEndpoint = HttpApiEndpoint.get('status')`/status`.addSuccess(Schema.String); export const healthGroup = HttpApiGroup.make('Health').add(statusEndpoint); diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index 2efce467..f7184390 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -7,6 +7,7 @@ import * as HttpServerResponse from '@effect/platform/HttpServerResponse'; import * as Effect from 'effect/Effect'; import * as Layer from 'effect/Layer'; import * as Mailbox from 'effect/Mailbox'; +import * as Schema from 'effect/Schema'; import * as Stream from 'effect/Stream'; import { createServer } from 'node:http'; import { serverPortConfig } from './config/server.ts'; @@ -24,27 +25,37 @@ const ApiLayer = HttpLayerRouter.addHttpApi(hypergraphApi, { openapiPath: '/docs/openapi.json', }).pipe(Layer.provide(HandlersLive)); -type OutgoingMessage = { - type: 'message'; - message: string; +const Domain = { + Request: Schema.Struct({ + type: Schema.String, + message: Schema.String, + }), + Response: Schema.Struct({ + type: Schema.String, + message: Schema.String, + }), }; -// Create websocket layer at /ws. +type Request = Schema.Schema.Type; + const WebSocketLayer = HttpLayerRouter.add( 'GET', - '/ws', + '/', Effect.gen(function* () { - const outgoingMailbox = yield* Mailbox.make(); - const incomingMailbox = yield* Mailbox.make(); + const requests = yield* Mailbox.make(); - yield* outgoingMailbox.offer({ type: 'message', message: 'Hello, world!' }); + yield* requests.offer({ type: 'message', message: 'Hello, world!' }); - // TODO: Implement actual websocket logic here. - return yield* Mailbox.toStream(outgoingMailbox).pipe( + return yield* Mailbox.toStream(requests).pipe( Stream.map(JSON.stringify), Stream.pipeThroughChannel(HttpServerRequest.upgradeChannel()), Stream.decodeText(), - Stream.runForEach((_) => Effect.log(_)), + Stream.runForEach((message) => + Effect.gen(function* () { + yield* Effect.log('RECEIVED: ' + message); + yield* requests.offer({ type: 'message', message: 'RECEIVED' }); + }), + ), Effect.as(HttpServerResponse.empty()), ); }), From 9aaed199166d831a858d55ee47eff64fec0c874f Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Tue, 2 Sep 2025 19:12:55 +0200 Subject: [PATCH 04/22] add session validation --- apps/server-new/src/server.ts | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index f7184390..8db265c3 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -4,6 +4,7 @@ import * as HttpLayerRouter from '@effect/platform/HttpLayerRouter'; import * as HttpMiddleware from '@effect/platform/HttpMiddleware'; import * as HttpServerRequest from '@effect/platform/HttpServerRequest'; import * as HttpServerResponse from '@effect/platform/HttpServerResponse'; +import { isArray } from 'effect/Array'; import * as Effect from 'effect/Effect'; import * as Layer from 'effect/Layer'; import * as Mailbox from 'effect/Mailbox'; @@ -13,6 +14,7 @@ import { createServer } from 'node:http'; import { serverPortConfig } from './config/server.ts'; import { hypergraphApi } from './http/api.ts'; import { HandlersLive } from './http/handlers.ts'; +import * as AppIdentityService from './services/app-identity.ts'; // Create scalar openapi browser layer at /docs. const DocsLayer = HttpApiScalar.layerHttpLayerRouter({ @@ -42,6 +44,20 @@ const WebSocketLayer = HttpLayerRouter.add( 'GET', '/', Effect.gen(function* () { + const request = yield* HttpServerRequest.HttpServerRequest; + + const searchParams = HttpServerRequest.searchParamsFromURL(new URL(request.url, 'http://localhost')); + const token = isArray(searchParams.token) ? searchParams.token[0] : searchParams.token; + + if (!token) { + return yield* HttpServerResponse.empty({ status: 400 }); + } + + const appIdentityService = yield* AppIdentityService.AppIdentityService; + const { accountAddress } = yield* appIdentityService.getBySessionToken(token).pipe(Effect.orDie); + + yield* Effect.log(accountAddress); + const requests = yield* Mailbox.make(); yield* requests.offer({ type: 'message', message: 'Hello, world!' }); @@ -58,7 +74,7 @@ const WebSocketLayer = HttpLayerRouter.add( ), Effect.as(HttpServerResponse.empty()), ); - }), + }).pipe(Effect.provide(AppIdentityService.layer)), ); // Merge router layers together and add the cors middleware layer. From 4cc2a5c35590d86e2155c1b8f3bc2591a947b24d Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Tue, 2 Sep 2025 21:08:32 +0200 Subject: [PATCH 05/22] implement websocket list-spaces --- apps/server-new/src/server.ts | 47 ++++++++++++++------------ apps/server-new/src/services/spaces.ts | 40 ++++++++++++++++++++++ 2 files changed, 65 insertions(+), 22 deletions(-) diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index 8db265c3..c1f32c2b 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -4,6 +4,7 @@ import * as HttpLayerRouter from '@effect/platform/HttpLayerRouter'; import * as HttpMiddleware from '@effect/platform/HttpMiddleware'; import * as HttpServerRequest from '@effect/platform/HttpServerRequest'; import * as HttpServerResponse from '@effect/platform/HttpServerResponse'; +import { Messages } from '@graphprotocol/hypergraph'; import { isArray } from 'effect/Array'; import * as Effect from 'effect/Effect'; import * as Layer from 'effect/Layer'; @@ -15,6 +16,7 @@ import { serverPortConfig } from './config/server.ts'; import { hypergraphApi } from './http/api.ts'; import { HandlersLive } from './http/handlers.ts'; import * as AppIdentityService from './services/app-identity.ts'; +import * as SpacesService from './services/spaces.ts'; // Create scalar openapi browser layer at /docs. const DocsLayer = HttpApiScalar.layerHttpLayerRouter({ @@ -27,24 +29,16 @@ const ApiLayer = HttpLayerRouter.addHttpApi(hypergraphApi, { openapiPath: '/docs/openapi.json', }).pipe(Layer.provide(HandlersLive)); -const Domain = { - Request: Schema.Struct({ - type: Schema.String, - message: Schema.String, - }), - Response: Schema.Struct({ - type: Schema.String, - message: Schema.String, - }), -}; - -type Request = Schema.Schema.Type; +const decodeJson = Schema.decodeUnknownSync(Schema.parseJson()); +const decodeRequestMessage = Schema.decodeUnknownEither(Messages.RequestMessage); const WebSocketLayer = HttpLayerRouter.add( 'GET', '/', Effect.gen(function* () { const request = yield* HttpServerRequest.HttpServerRequest; + const spacesService = yield* SpacesService.SpacesService; + const responseMailbox = yield* Mailbox.make(); const searchParams = HttpServerRequest.searchParamsFromURL(new URL(request.url, 'http://localhost')); const token = isArray(searchParams.token) ? searchParams.token[0] : searchParams.token; @@ -54,27 +48,36 @@ const WebSocketLayer = HttpLayerRouter.add( } const appIdentityService = yield* AppIdentityService.AppIdentityService; - const { accountAddress } = yield* appIdentityService.getBySessionToken(token).pipe(Effect.orDie); - - yield* Effect.log(accountAddress); - - const requests = yield* Mailbox.make(); + const { accountAddress, address } = yield* appIdentityService.getBySessionToken(token).pipe(Effect.orDie); - yield* requests.offer({ type: 'message', message: 'Hello, world!' }); + // yield* responseMailbox.offer(JSON.stringify({ type: 'message', message: 'Hello, world!' })); - return yield* Mailbox.toStream(requests).pipe( + return yield* Mailbox.toStream(responseMailbox).pipe( Stream.map(JSON.stringify), Stream.pipeThroughChannel(HttpServerRequest.upgradeChannel()), Stream.decodeText(), Stream.runForEach((message) => Effect.gen(function* () { - yield* Effect.log('RECEIVED: ' + message); - yield* requests.offer({ type: 'message', message: 'RECEIVED' }); + const json = decodeJson(message); + const request = yield* decodeRequestMessage(json); + yield* Effect.log('RECEIVED: ' + request); + switch (request.type) { + case 'list-spaces': { + const spaces = yield* spacesService.listByAppIdentity(address); + const outgoingMessage: Messages.ResponseListSpaces = { type: 'list-spaces', spaces: spaces }; + // TODO: fix Messages.serialize + yield* responseMailbox.offer(outgoingMessage); + break; + } + } + // yield* responseMailbox.offer(Messages.serialize({ type: 'message', message: 'RECEIVED' })); }), ), Effect.as(HttpServerResponse.empty()), ); - }).pipe(Effect.provide(AppIdentityService.layer)), + }) + .pipe(Effect.provide(AppIdentityService.layer)) + .pipe(Effect.provide(SpacesService.layer)), ); // Merge router layers together and add the cors middleware layer. diff --git a/apps/server-new/src/services/spaces.ts b/apps/server-new/src/services/spaces.ts index b0a8c121..b73c7a76 100644 --- a/apps/server-new/src/services/spaces.ts +++ b/apps/server-new/src/services/spaces.ts @@ -22,6 +22,11 @@ export interface SpaceInfo { }>; } +export interface SpaceListEntry { + id: string; + name: string; +} + export interface CreateSpaceParams { accountAddress: string; event: SpaceEvents.CreateSpaceEvent; @@ -48,6 +53,9 @@ export class SpacesService extends Context.Tag('SpacesService')< readonly addAppIdentityToSpaces: ( params: AddAppIdentityToSpacesParams, ) => Effect.Effect; + readonly listByAppIdentity: ( + appIdentityAddress: string, + ) => Effect.Effect; } >() {} @@ -107,6 +115,37 @@ export const layer = Effect.gen(function* () { })); }); + const listByAppIdentity = Effect.fn('listByAppIdentity')(function* (appIdentityAddress: string) { + return yield* use((client) => + client.space.findMany({ + where: { + appIdentities: { + some: { + address: appIdentityAddress, + }, + }, + }, + include: { + appIdentities: { + select: { + address: true, + appId: true, + }, + }, + keys: { + include: { + keyBoxes: { + where: { + appIdentityAddress, + }, + }, + }, + }, + }, + }), + ); + }); + const createSpace = Effect.fn('createSpace')(function* (params: CreateSpaceParams) { const { accountAddress, event, keyBox, infoContent, infoSignatureHex, infoSignatureRecovery, name } = params; @@ -229,6 +268,7 @@ export const layer = Effect.gen(function* () { return { listByAccount, + listByAppIdentity, createSpace, addAppIdentityToSpaces, } as const; From a5a4d90a7d1f0002c2264e375a7f56d8f9eee2bb Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Tue, 2 Sep 2025 21:25:34 +0200 Subject: [PATCH 06/22] list invitations --- apps/server-new/src/server.ts | 15 ++++- apps/server-new/src/services/invitations.ts | 62 +++++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 apps/server-new/src/services/invitations.ts diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index c1f32c2b..738e16c0 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -16,6 +16,7 @@ import { serverPortConfig } from './config/server.ts'; import { hypergraphApi } from './http/api.ts'; import { HandlersLive } from './http/handlers.ts'; import * as AppIdentityService from './services/app-identity.ts'; +import * as InvitationsService from './services/invitations.ts'; import * as SpacesService from './services/spaces.ts'; // Create scalar openapi browser layer at /docs. @@ -38,6 +39,7 @@ const WebSocketLayer = HttpLayerRouter.add( Effect.gen(function* () { const request = yield* HttpServerRequest.HttpServerRequest; const spacesService = yield* SpacesService.SpacesService; + const invitationsService = yield* InvitationsService.InvitationsService; const responseMailbox = yield* Mailbox.make(); const searchParams = HttpServerRequest.searchParamsFromURL(new URL(request.url, 'http://localhost')); @@ -69,6 +71,16 @@ const WebSocketLayer = HttpLayerRouter.add( yield* responseMailbox.offer(outgoingMessage); break; } + case 'list-invitations': { + const invitations = yield* invitationsService.listByAppIdentity(accountAddress); + const outgoingMessage: Messages.ResponseListInvitations = { + type: 'list-invitations', + invitations, + }; + // TODO: fix Messages.serialize + yield* responseMailbox.offer(outgoingMessage); + break; + } } // yield* responseMailbox.offer(Messages.serialize({ type: 'message', message: 'RECEIVED' })); }), @@ -77,7 +89,8 @@ const WebSocketLayer = HttpLayerRouter.add( ); }) .pipe(Effect.provide(AppIdentityService.layer)) - .pipe(Effect.provide(SpacesService.layer)), + .pipe(Effect.provide(SpacesService.layer)) + .pipe(Effect.provide(InvitationsService.layer)), ); // Merge router layers together and add the cors middleware layer. diff --git a/apps/server-new/src/services/invitations.ts b/apps/server-new/src/services/invitations.ts new file mode 100644 index 00000000..b2b3af10 --- /dev/null +++ b/apps/server-new/src/services/invitations.ts @@ -0,0 +1,62 @@ +import { type Messages, SpaceEvents } from '@graphprotocol/hypergraph'; +import { Context, Effect, Layer } from 'effect'; +import * as Schema from 'effect/Schema'; +import * as DatabaseService from './database.js'; +import * as IdentityService from './identity.js'; + +export class InvitationsService extends Context.Tag('InvitationsService')< + InvitationsService, + { + readonly listByAppIdentity: ( + appIdentityAddress: string, + ) => Effect.Effect; + } +>() {} + +const decodeSpaceState = Schema.decodeUnknownEither(SpaceEvents.SpaceState); + +export const layer = Effect.gen(function* () { + const { use } = yield* DatabaseService.DatabaseService; + + const listByAppIdentity = Effect.fn('listByAccount')(function* (accountAddress: string) { + const invitations = yield* use((client) => + client.invitation.findMany({ + where: { + inviteeAccountAddress: accountAddress, + }, + include: { + space: { + include: { + events: { + orderBy: { + counter: 'desc', + }, + take: 1, + }, + }, + }, + }, + }), + ); + + return invitations + .map((invitation) => { + const result = decodeSpaceState(JSON.parse(invitation.space.events[0].state)); + if (result._tag === 'Right') { + const state = result.right; + return { + id: invitation.id, + previousEventHash: state.lastEventHash, + spaceId: invitation.spaceId, + }; + } + console.error('Invalid space state from the DB', result.left); + return null; + }) + .filter((invitation) => invitation !== null); + }); + + return { + listByAppIdentity, + } as const; +}).pipe(Layer.effect(InvitationsService), Layer.provide(DatabaseService.layer), Layer.provide(IdentityService.layer)); From d947a2928e823aa66bcd125fa40eb4039c2be031 Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Sun, 7 Sep 2025 07:14:00 +0200 Subject: [PATCH 07/22] fix name --- apps/server-new/src/services/invitations.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/server-new/src/services/invitations.ts b/apps/server-new/src/services/invitations.ts index b2b3af10..67c011d8 100644 --- a/apps/server-new/src/services/invitations.ts +++ b/apps/server-new/src/services/invitations.ts @@ -18,7 +18,7 @@ const decodeSpaceState = Schema.decodeUnknownEither(SpaceEvents.SpaceState); export const layer = Effect.gen(function* () { const { use } = yield* DatabaseService.DatabaseService; - const listByAppIdentity = Effect.fn('listByAccount')(function* (accountAddress: string) { + const listByAppIdentity = Effect.fn('listByAppIdentity')(function* (accountAddress: string) { const invitations = yield* use((client) => client.invitation.findMany({ where: { From 8db28de0e82d4c6e8dcecff74c6acea396baa742 Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 16:01:49 +0200 Subject: [PATCH 08/22] manage updates --- apps/server-new/src/server.ts | 63 ++++++-- apps/server-new/src/services/spaces.ts | 144 +++++++++++++++++- apps/server-new/src/services/updates.ts | 73 +++++++++ packages/hypergraph/src/messages/serialize.ts | 14 ++ 4 files changed, 283 insertions(+), 11 deletions(-) create mode 100644 apps/server-new/src/services/updates.ts diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index 738e16c0..1a427725 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -16,8 +16,10 @@ import { serverPortConfig } from './config/server.ts'; import { hypergraphApi } from './http/api.ts'; import { HandlersLive } from './http/handlers.ts'; import * as AppIdentityService from './services/app-identity.ts'; +import * as IdentityService from './services/identity.ts'; import * as InvitationsService from './services/invitations.ts'; import * as SpacesService from './services/spaces.ts'; +import * as UpdatesService from './services/updates.ts'; // Create scalar openapi browser layer at /docs. const DocsLayer = HttpApiScalar.layerHttpLayerRouter({ @@ -30,7 +32,6 @@ const ApiLayer = HttpLayerRouter.addHttpApi(hypergraphApi, { openapiPath: '/docs/openapi.json', }).pipe(Layer.provide(HandlersLive)); -const decodeJson = Schema.decodeUnknownSync(Schema.parseJson()); const decodeRequestMessage = Schema.decodeUnknownEither(Messages.RequestMessage); const WebSocketLayer = HttpLayerRouter.add( @@ -40,6 +41,7 @@ const WebSocketLayer = HttpLayerRouter.add( const request = yield* HttpServerRequest.HttpServerRequest; const spacesService = yield* SpacesService.SpacesService; const invitationsService = yield* InvitationsService.InvitationsService; + const updatesService = yield* UpdatesService.UpdatesService; const responseMailbox = yield* Mailbox.make(); const searchParams = HttpServerRequest.searchParamsFromURL(new URL(request.url, 'http://localhost')); @@ -50,25 +52,23 @@ const WebSocketLayer = HttpLayerRouter.add( } const appIdentityService = yield* AppIdentityService.AppIdentityService; + const identityService = yield* IdentityService.IdentityService; const { accountAddress, address } = yield* appIdentityService.getBySessionToken(token).pipe(Effect.orDie); - // yield* responseMailbox.offer(JSON.stringify({ type: 'message', message: 'Hello, world!' })); - return yield* Mailbox.toStream(responseMailbox).pipe( Stream.map(JSON.stringify), Stream.pipeThroughChannel(HttpServerRequest.upgradeChannel()), Stream.decodeText(), Stream.runForEach((message) => Effect.gen(function* () { - const json = decodeJson(message); + const json = Messages.deserialize(message); const request = yield* decodeRequestMessage(json); - yield* Effect.log('RECEIVED: ' + request); switch (request.type) { case 'list-spaces': { const spaces = yield* spacesService.listByAppIdentity(address); const outgoingMessage: Messages.ResponseListSpaces = { type: 'list-spaces', spaces: spaces }; // TODO: fix Messages.serialize - yield* responseMailbox.offer(outgoingMessage); + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); break; } case 'list-invitations': { @@ -77,12 +77,53 @@ const WebSocketLayer = HttpLayerRouter.add( type: 'list-invitations', invitations, }; - // TODO: fix Messages.serialize - yield* responseMailbox.offer(outgoingMessage); + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + break; + } + case 'subscribe-space': { + const space = yield* spacesService.getSpace({ + spaceId: request.id, + accountAddress, + appIdentityAddress: address, + }); + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + break; + } + case 'create-update': { + const signer = Messages.recoverUpdateMessageSigner(request); + const identity = yield* identityService.getAppOrConnectIdentity({ + accountAddress: request.accountAddress, + signaturePublicKey: signer, + }); + if (identity.accountAddress !== accountAddress) { + // TODO: improve error handling + return yield* Effect.die(new Error('Invalid signature')); + } + + const update = yield* updatesService.createUpdate({ + accountAddress: request.accountAddress, + update: request.update, + spaceId: request.spaceId, + signatureHex: request.signature.hex, + signatureRecovery: request.signature.recovery, + updateId: request.updateId, + }); + const outgoingMessage: Messages.ResponseUpdateConfirmed = { + type: 'update-confirmed', + updateId: request.updateId, + clock: update.clock, + spaceId: request.spaceId, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + // TODO: broadcast updates break; } } - // yield* responseMailbox.offer(Messages.serialize({ type: 'message', message: 'RECEIVED' })); }), ), Effect.as(HttpServerResponse.empty()), @@ -90,7 +131,9 @@ const WebSocketLayer = HttpLayerRouter.add( }) .pipe(Effect.provide(AppIdentityService.layer)) .pipe(Effect.provide(SpacesService.layer)) - .pipe(Effect.provide(InvitationsService.layer)), + .pipe(Effect.provide(InvitationsService.layer)) + .pipe(Effect.provide(IdentityService.layer)) + .pipe(Effect.provide(UpdatesService.layer)), ); // Merge router layers together and add the cors middleware layer. diff --git a/apps/server-new/src/services/spaces.ts b/apps/server-new/src/services/spaces.ts index b73c7a76..821625bc 100644 --- a/apps/server-new/src/services/spaces.ts +++ b/apps/server-new/src/services/spaces.ts @@ -1,5 +1,7 @@ -import { Identity, type Messages, SpaceEvents, Utils } from '@graphprotocol/hypergraph'; +import { Identity, type Inboxes, type Messages, SpaceEvents, Utils } from '@graphprotocol/hypergraph'; import { Context, Effect, Layer } from 'effect'; +import * as Predicate from 'effect/Predicate'; +import { ResourceNotFoundError } from '../http/errors.js'; import * as DatabaseService from './database.js'; import * as IdentityService from './identity.js'; @@ -22,6 +24,41 @@ export interface SpaceInfo { }>; } +export interface GetSpaceResult { + id: string; + name: string; + events: SpaceEvents.SpaceEvent[]; + keyBoxes: Array<{ + id: string; + ciphertext: string; + nonce: string; + authorPublicKey: string; + accountAddress: string; + }>; + inboxes: Array<{ + inboxId: string; + isPublic: boolean; + authPolicy: Inboxes.InboxSenderAuthPolicy; + encryptionPublicKey: string; + secretKey: string; + }>; + updates: + | { + updates: Array<{ + accountAddress: string; + update: Uint8Array; + signature: { + hex: string; + recovery: number; + }; + updateId: string; + }>; + firstUpdateClock: number; + lastUpdateClock: number; + } + | undefined; +} + export interface SpaceListEntry { id: string; name: string; @@ -37,6 +74,12 @@ export interface CreateSpaceParams { name: string; } +export interface GetSpaceParams { + spaceId: string; + accountAddress: string; + appIdentityAddress: string; +} + export interface AddAppIdentityToSpacesParams { appIdentityAddress: string; accountAddress: string; @@ -56,6 +99,9 @@ export class SpacesService extends Context.Tag('SpacesService')< readonly listByAppIdentity: ( appIdentityAddress: string, ) => Effect.Effect; + readonly getSpace: ( + params: GetSpaceParams, + ) => Effect.Effect; } >() {} @@ -146,6 +192,101 @@ export const layer = Effect.gen(function* () { ); }); + const getSpace = Effect.fn('getSpace')(function* (params: GetSpaceParams) { + const { spaceId, accountAddress, appIdentityAddress } = params; + + const space = yield* use((client) => + client.space.findUnique({ + where: { + id: spaceId, + members: { + some: { + address: accountAddress, + }, + }, + }, + include: { + events: { + orderBy: { + counter: 'asc', + }, + }, + keys: { + include: { + keyBoxes: { + where: { + accountAddress, + appIdentityAddress, + }, + select: { + nonce: true, + ciphertext: true, + authorPublicKey: true, + }, + }, + }, + }, + updates: { + orderBy: { + clock: 'asc', + }, + }, + inboxes: { + select: { + id: true, + isPublic: true, + authPolicy: true, + encryptionPublicKey: true, + encryptedSecretKey: true, + }, + }, + }, + }), + ).pipe( + Effect.filterOrFail(Predicate.isNotNull, () => new ResourceNotFoundError({ resource: 'Space', id: spaceId })), + ); + + const keyBoxes = space.keys.flatMap((key) => { + return { + id: key.id, + nonce: key.keyBoxes[0].nonce, + ciphertext: key.keyBoxes[0].ciphertext, + accountAddress, + authorPublicKey: key.keyBoxes[0].authorPublicKey, + }; + }); + + return { + id: space.id, + name: space.name, + events: space.events.map((wrapper) => JSON.parse(wrapper.event)), + keyBoxes, + inboxes: space.inboxes.map((inbox) => ({ + inboxId: inbox.id, + isPublic: inbox.isPublic, + authPolicy: inbox.authPolicy as Inboxes.InboxSenderAuthPolicy, + encryptionPublicKey: inbox.encryptionPublicKey, + secretKey: inbox.encryptedSecretKey, + })), + updates: + space.updates.length > 0 + ? { + updates: space.updates.map((update) => ({ + accountAddress: update.accountAddress, + update: new Uint8Array(update.content), + signature: { + hex: update.signatureHex, + recovery: update.signatureRecovery, + }, + updateId: update.updateId, + })), + firstUpdateClock: space.updates[0].clock, + lastUpdateClock: space.updates[space.updates.length - 1].clock, + } + : undefined, + }; + }); + const createSpace = Effect.fn('createSpace')(function* (params: CreateSpaceParams) { const { accountAddress, event, keyBox, infoContent, infoSignatureHex, infoSignatureRecovery, name } = params; @@ -269,6 +410,7 @@ export const layer = Effect.gen(function* () { return { listByAccount, listByAppIdentity, + getSpace, createSpace, addAppIdentityToSpaces, } as const; diff --git a/apps/server-new/src/services/updates.ts b/apps/server-new/src/services/updates.ts new file mode 100644 index 00000000..426bd8e9 --- /dev/null +++ b/apps/server-new/src/services/updates.ts @@ -0,0 +1,73 @@ +import { Context, Effect, Layer } from 'effect'; +import * as DatabaseService from './database.js'; + +type CreateUpdateParams = { + accountAddress: string; + update: Uint8Array; + spaceId: string; + signatureHex: string; + signatureRecovery: number; + updateId: string; +}; + +type CreateUpdateResult = { + clock: number; + content: Uint8Array; + signatureHex: string; + signatureRecovery: number; + updateId: string; + accountAddress: string; + spaceId: string; +}; + +export class UpdatesService extends Context.Tag('UpdatesService')< + UpdatesService, + { + readonly createUpdate: ( + params: CreateUpdateParams, + ) => Effect.Effect; + } +>() {} + +export const layer = Effect.gen(function* () { + const { use } = yield* DatabaseService.DatabaseService; + + const createUpdate = Effect.fn('createUpdate')(function* ({ + accountAddress, + update, + spaceId, + signatureHex, + signatureRecovery, + updateId, + }: CreateUpdateParams) { + // TODO: implement retries + const result = yield* use((client) => { + return client.$transaction(async (prisma) => { + const lastUpdate = await prisma.update.findFirst({ + where: { spaceId }, + orderBy: { clock: 'desc' }, + }); + + const clock = lastUpdate ? lastUpdate.clock + 1 : 0; + + return await prisma.update.create({ + data: { + space: { connect: { id: spaceId } }, + clock, + content: Buffer.from(update), + signatureHex, + signatureRecovery, + updateId, + account: { connect: { address: accountAddress } }, + }, + }); + }); + }); + + return result; + }); + + return { + createUpdate, + } as const; +}).pipe(Layer.effect(UpdatesService), Layer.provide(DatabaseService.layer)); diff --git a/packages/hypergraph/src/messages/serialize.ts b/packages/hypergraph/src/messages/serialize.ts index 1950fead..8ad3a863 100644 --- a/packages/hypergraph/src/messages/serialize.ts +++ b/packages/hypergraph/src/messages/serialize.ts @@ -11,6 +11,20 @@ export function serialize(obj: any): string { }); } +export function serializeV2(obj: any): any { + return JSON.parse( + JSON.stringify(obj, (_key, value) => { + if (value instanceof Uint8Array) { + return { __type: 'Uint8Array', data: Array.from(value) }; + } + if (value instanceof Date) { + return { __type: 'Date', data: value.toISOString() }; + } + return value; + }), + ); +} + export function deserialize(json: string): unknown { return JSON.parse(json, (_key, value) => { if (value && value.__type === 'Uint8Array') { From 1cf4d3a88fba5f363ddf57a8cc84c26fd55b836c Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 16:20:27 +0200 Subject: [PATCH 09/22] improve logging --- apps/server-new/src/server.ts | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index 1a427725..ddf09106 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -4,6 +4,7 @@ import * as HttpLayerRouter from '@effect/platform/HttpLayerRouter'; import * as HttpMiddleware from '@effect/platform/HttpMiddleware'; import * as HttpServerRequest from '@effect/platform/HttpServerRequest'; import * as HttpServerResponse from '@effect/platform/HttpServerResponse'; +import * as Socket from '@effect/platform/Socket'; import { Messages } from '@graphprotocol/hypergraph'; import { isArray } from 'effect/Array'; import * as Effect from 'effect/Effect'; @@ -126,6 +127,26 @@ const WebSocketLayer = HttpLayerRouter.add( } }), ), + Effect.catchAll((error) => + Effect.gen(function* () { + // Only log error if it's not a SocketCloseError + if (!Socket.SocketCloseError.is(error)) { + yield* Effect.logInfo('WebSocket disconnected due to error', { + error: error.message || String(error), + accountAddress, + appIdentityAddress: address, + }); + } + }), + ), + Effect.ensuring( + Effect.gen(function* () { + yield* Effect.logInfo('WebSocket connection closed', { + accountAddress, + appIdentityAddress: address, + }); + }), + ), Effect.as(HttpServerResponse.empty()), ); }) From 098e00efd953c472d48181db6030455cf721f5df Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 16:59:20 +0200 Subject: [PATCH 10/22] add connections manager --- apps/server-new/src/index.ts | 4 +- apps/server-new/src/server.ts | 51 +++++- apps/server-new/src/services/connections.ts | 169 ++++++++++++++++++++ 3 files changed, 219 insertions(+), 5 deletions(-) create mode 100644 apps/server-new/src/services/connections.ts diff --git a/apps/server-new/src/index.ts b/apps/server-new/src/index.ts index 1a7bdb7b..c021742f 100644 --- a/apps/server-new/src/index.ts +++ b/apps/server-new/src/index.ts @@ -25,7 +25,9 @@ const Observability = Layer.unwrapEffect( ); const layer = server.pipe( - Layer.provide(Logger.structured), + // TODO: add structured logger back in + // Layer.provide(Logger.structured), + Layer.provide(Logger.pretty), Layer.provide(Observability), Layer.provide(PlatformConfigProvider.layerDotEnvAdd('.env')), Layer.provide(NodeContext.layer), diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index ddf09106..fd243011 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -1,10 +1,11 @@ -import * as NodeHttpServer from '@effect/platform-node/NodeHttpServer'; +import { createServer } from 'node:http'; import * as HttpApiScalar from '@effect/platform/HttpApiScalar'; import * as HttpLayerRouter from '@effect/platform/HttpLayerRouter'; import * as HttpMiddleware from '@effect/platform/HttpMiddleware'; import * as HttpServerRequest from '@effect/platform/HttpServerRequest'; import * as HttpServerResponse from '@effect/platform/HttpServerResponse'; import * as Socket from '@effect/platform/Socket'; +import * as NodeHttpServer from '@effect/platform-node/NodeHttpServer'; import { Messages } from '@graphprotocol/hypergraph'; import { isArray } from 'effect/Array'; import * as Effect from 'effect/Effect'; @@ -12,11 +13,11 @@ import * as Layer from 'effect/Layer'; import * as Mailbox from 'effect/Mailbox'; import * as Schema from 'effect/Schema'; import * as Stream from 'effect/Stream'; -import { createServer } from 'node:http'; import { serverPortConfig } from './config/server.ts'; import { hypergraphApi } from './http/api.ts'; import { HandlersLive } from './http/handlers.ts'; import * as AppIdentityService from './services/app-identity.ts'; +import * as ConnectionsService from './services/connections.ts'; import * as IdentityService from './services/identity.ts'; import * as InvitationsService from './services/invitations.ts'; import * as SpacesService from './services/spaces.ts'; @@ -43,6 +44,7 @@ const WebSocketLayer = HttpLayerRouter.add( const spacesService = yield* SpacesService.SpacesService; const invitationsService = yield* InvitationsService.InvitationsService; const updatesService = yield* UpdatesService.UpdatesService; + const connectionsService = yield* ConnectionsService.ConnectionsService; const responseMailbox = yield* Mailbox.make(); const searchParams = HttpServerRequest.searchParamsFromURL(new URL(request.url, 'http://localhost')); @@ -56,6 +58,13 @@ const WebSocketLayer = HttpLayerRouter.add( const identityService = yield* IdentityService.IdentityService; const { accountAddress, address } = yield* appIdentityService.getBySessionToken(token).pipe(Effect.orDie); + // Register this connection + const connectionId = yield* connectionsService.registerConnection({ + accountAddress, + appIdentityAddress: address, + mailbox: responseMailbox, + }); + return yield* Mailbox.toStream(responseMailbox).pipe( Stream.map(JSON.stringify), Stream.pipeThroughChannel(HttpServerRequest.upgradeChannel()), @@ -87,6 +96,10 @@ const WebSocketLayer = HttpLayerRouter.add( accountAddress, appIdentityAddress: address, }); + + // Track this subscription + yield* connectionsService.subscribeToSpace(connectionId, request.id); + const outgoingMessage: Messages.ResponseSpace = { type: 'space', ...space, @@ -121,7 +134,32 @@ const WebSocketLayer = HttpLayerRouter.add( }; yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); - // TODO: broadcast updates + // Broadcast the update to all subscribed clients + const updates: Messages.Updates = { + updates: [ + { + update: update.content, + accountAddress: update.accountAddress, + signature: { hex: update.signatureHex, recovery: update.signatureRecovery }, + updateId: update.updateId, + }, + ], + firstUpdateClock: update.clock, + lastUpdateClock: update.clock, + }; + + const broadcastMessage: Messages.ResponseUpdatesNotification = { + type: 'updates-notification', + updates, + spaceId: request.spaceId, + }; + + yield* connectionsService.broadcastToSpace({ + spaceId: request.spaceId, + message: broadcastMessage, + excludeConnectionId: connectionId, + }); + break; } } @@ -141,6 +179,8 @@ const WebSocketLayer = HttpLayerRouter.add( ), Effect.ensuring( Effect.gen(function* () { + // Clean up the connection when it closes + yield* connectionsService.removeConnection(connectionId); yield* Effect.logInfo('WebSocket connection closed', { accountAddress, appIdentityAddress: address, @@ -166,4 +206,7 @@ const HttpServerLayer = serverPortConfig.pipe( Layer.unwrapEffect, ); -export const server = HttpLayerRouter.serve(AppLayer).pipe(Layer.provide(HttpServerLayer)); +export const server = HttpLayerRouter.serve(AppLayer).pipe( + Layer.provide(HttpServerLayer), + Layer.provide(ConnectionsService.layer), +); diff --git a/apps/server-new/src/services/connections.ts b/apps/server-new/src/services/connections.ts new file mode 100644 index 00000000..ed5b2b86 --- /dev/null +++ b/apps/server-new/src/services/connections.ts @@ -0,0 +1,169 @@ +import { Messages } from '@graphprotocol/hypergraph'; +import { Context, Effect, Layer, Ref } from 'effect'; +import type * as Mailbox from 'effect/Mailbox'; + +type Connection = { + accountAddress: string; + appIdentityAddress: string; + mailbox: Mailbox.Mailbox; + subscribedSpaces: Set; +}; + +export class ConnectionsService extends Context.Tag('ConnectionsService')< + ConnectionsService, + { + readonly registerConnection: (params: { + accountAddress: string; + appIdentityAddress: string; + mailbox: Mailbox.Mailbox; + }) => Effect.Effect; + readonly removeConnection: (connectionId: string) => Effect.Effect; + readonly subscribeToSpace: (connectionId: string, spaceId: string) => Effect.Effect; + readonly unsubscribeFromSpace: (connectionId: string, spaceId: string) => Effect.Effect; + readonly broadcastToSpace: (params: { + spaceId: string; + message: Messages.ResponseMessage; + excludeConnectionId?: string; + }) => Effect.Effect; + readonly getConnection: (connectionId: string) => Effect.Effect; + } +>() {} + +export const layer = Effect.gen(function* () { + // Store connections by a unique connection ID + const connections = yield* Ref.make(new Map()); + const connectionCounter = yield* Ref.make(0); + + const registerConnection = Effect.fn('registerConnection')(function* ({ + accountAddress, + appIdentityAddress, + mailbox, + }: { + accountAddress: string; + appIdentityAddress: string; + mailbox: Mailbox.Mailbox; + }) { + const nextId = yield* Ref.updateAndGet(connectionCounter, (n) => n + 1); + yield* Effect.logInfo('Next ID', { + nextId, + }); + const connectionId = `conn-${nextId}`; + const connection: Connection = { + accountAddress, + appIdentityAddress, + mailbox, + subscribedSpaces: new Set(), + }; + + yield* Ref.update(connections, (map) => new Map(map).set(connectionId, connection)); + + yield* Effect.logInfo('Registered new connection', { + connectionId, + accountAddress, + appIdentityAddress, + }); + + return connectionId; + }); + + const removeConnection = Effect.fn('removeConnection')(function* (connectionId: string) { + const currentConnections = yield* Ref.get(connections); + const connection = currentConnections.get(connectionId); + + if (connection) { + yield* Effect.logInfo('Removing connection', { + connectionId, + accountAddress: connection.accountAddress, + subscribedSpaces: Array.from(connection.subscribedSpaces), + }); + + yield* Ref.update(connections, (map) => { + const newMap = new Map(map); + newMap.delete(connectionId); + return newMap; + }); + } + }); + + const subscribeToSpace = Effect.fn('subscribeToSpace')(function* (connectionId: string, spaceId: string) { + const currentConnections = yield* Ref.get(connections); + const connection = currentConnections.get(connectionId); + + if (connection) { + yield* Ref.update(connections, (map) => { + const newMap = new Map(map); + const conn = newMap.get(connectionId); + if (conn) { + conn.subscribedSpaces.add(spaceId); + } + return newMap; + }); + + yield* Effect.logInfo('Subscribed connection to space', { + connectionId, + spaceId, + accountAddress: connection.accountAddress, + }); + } + }); + + const unsubscribeFromSpace = Effect.fn('unsubscribeFromSpace')(function* (connectionId: string, spaceId: string) { + const currentConnections = yield* Ref.get(connections); + const connection = currentConnections.get(connectionId); + + if (connection) { + yield* Ref.update(connections, (map) => { + const newMap = new Map(map); + const conn = newMap.get(connectionId); + if (conn) { + conn.subscribedSpaces.delete(spaceId); + } + return newMap; + }); + + yield* Effect.logInfo('Unsubscribed connection from space', { + connectionId, + spaceId, + accountAddress: connection.accountAddress, + }); + } + }); + + const broadcastToSpace = Effect.fn('broadcastToSpace')(function* ({ + spaceId, + message, + excludeConnectionId, + }: { + spaceId: string; + message: Messages.ResponseMessage; + excludeConnectionId?: string; + }) { + const currentConnections = yield* Ref.get(connections); + + for (const [connectionId, connection] of currentConnections) { + // Skip if this is the excluded connection (sender) + if (excludeConnectionId && connectionId === excludeConnectionId) { + continue; + } + + // Only send to connections subscribed to this space + if (connection.subscribedSpaces.has(spaceId)) { + yield* connection.mailbox.offer(Messages.serializeV2(message)); + } + } + }); + + const getConnection = Effect.fn('getConnection')(function* (connectionId: string) { + const currentConnections = yield* Ref.get(connections); + return currentConnections.get(connectionId); + }); + + return { + registerConnection, + removeConnection, + subscribeToSpace, + unsubscribeFromSpace, + broadcastToSpace, + getConnection, + } as const; +}).pipe(Layer.effect(ConnectionsService)); From 540267a921aeebeebbdb319c0d3f95db329d60e3 Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 17:31:43 +0200 Subject: [PATCH 11/22] add create-space --- apps/server-new/src/server.ts | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index fd243011..720bff18 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -1,11 +1,10 @@ -import { createServer } from 'node:http'; +import * as NodeHttpServer from '@effect/platform-node/NodeHttpServer'; import * as HttpApiScalar from '@effect/platform/HttpApiScalar'; import * as HttpLayerRouter from '@effect/platform/HttpLayerRouter'; import * as HttpMiddleware from '@effect/platform/HttpMiddleware'; import * as HttpServerRequest from '@effect/platform/HttpServerRequest'; import * as HttpServerResponse from '@effect/platform/HttpServerResponse'; import * as Socket from '@effect/platform/Socket'; -import * as NodeHttpServer from '@effect/platform-node/NodeHttpServer'; import { Messages } from '@graphprotocol/hypergraph'; import { isArray } from 'effect/Array'; import * as Effect from 'effect/Effect'; @@ -13,6 +12,7 @@ import * as Layer from 'effect/Layer'; import * as Mailbox from 'effect/Mailbox'; import * as Schema from 'effect/Schema'; import * as Stream from 'effect/Stream'; +import { createServer } from 'node:http'; import { serverPortConfig } from './config/server.ts'; import { hypergraphApi } from './http/api.ts'; import { HandlersLive } from './http/handlers.ts'; @@ -160,6 +160,33 @@ const WebSocketLayer = HttpLayerRouter.add( excludeConnectionId: connectionId, }); + break; + } + case 'create-space-event': { + // Create the new space + const spaceResult = yield* spacesService.createSpace({ + accountAddress, + event: request.event, + keyBox: request.keyBox, + infoContent: new Uint8Array(), // TODO: Get from request when available + infoSignatureHex: '', + infoSignatureRecovery: 0, + name: request.name, + }); + + // Get the full space data to send back + const space = yield* spacesService.getSpace({ + spaceId: spaceResult.id, + accountAddress, + appIdentityAddress: address, + }); + + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + break; } } From 6ef1e9dbf5d973550db1b781134164550c744b9a Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 17:59:35 +0200 Subject: [PATCH 12/22] add missing events --- apps/server-new/src/server.ts | 203 +++++++++++++++++- apps/server-new/src/services/account-inbox.ts | 132 ++++++++++++ apps/server-new/src/services/space-inbox.ts | 43 ++++ apps/server-new/src/services/spaces.ts | 128 ++++++++++- 4 files changed, 502 insertions(+), 4 deletions(-) diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index 720bff18..91dfbada 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -1,10 +1,11 @@ -import * as NodeHttpServer from '@effect/platform-node/NodeHttpServer'; +import { createServer } from 'node:http'; import * as HttpApiScalar from '@effect/platform/HttpApiScalar'; import * as HttpLayerRouter from '@effect/platform/HttpLayerRouter'; import * as HttpMiddleware from '@effect/platform/HttpMiddleware'; import * as HttpServerRequest from '@effect/platform/HttpServerRequest'; import * as HttpServerResponse from '@effect/platform/HttpServerResponse'; import * as Socket from '@effect/platform/Socket'; +import * as NodeHttpServer from '@effect/platform-node/NodeHttpServer'; import { Messages } from '@graphprotocol/hypergraph'; import { isArray } from 'effect/Array'; import * as Effect from 'effect/Effect'; @@ -12,14 +13,15 @@ import * as Layer from 'effect/Layer'; import * as Mailbox from 'effect/Mailbox'; import * as Schema from 'effect/Schema'; import * as Stream from 'effect/Stream'; -import { createServer } from 'node:http'; import { serverPortConfig } from './config/server.ts'; import { hypergraphApi } from './http/api.ts'; import { HandlersLive } from './http/handlers.ts'; +import * as AccountInboxService from './services/account-inbox.ts'; import * as AppIdentityService from './services/app-identity.ts'; import * as ConnectionsService from './services/connections.ts'; import * as IdentityService from './services/identity.ts'; import * as InvitationsService from './services/invitations.ts'; +import * as SpaceInboxService from './services/space-inbox.ts'; import * as SpacesService from './services/spaces.ts'; import * as UpdatesService from './services/updates.ts'; @@ -45,6 +47,8 @@ const WebSocketLayer = HttpLayerRouter.add( const invitationsService = yield* InvitationsService.InvitationsService; const updatesService = yield* UpdatesService.UpdatesService; const connectionsService = yield* ConnectionsService.ConnectionsService; + const accountInboxService = yield* AccountInboxService.AccountInboxService; + const spaceInboxService = yield* SpaceInboxService.SpaceInboxService; const responseMailbox = yield* Mailbox.make(); const searchParams = HttpServerRequest.searchParamsFromURL(new URL(request.url, 'http://localhost')); @@ -187,6 +191,197 @@ const WebSocketLayer = HttpLayerRouter.add( }; yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + break; + } + case 'create-invitation-event': { + // Apply the invitation event to the space + yield* spacesService.applySpaceEvent({ + accountAddress, + spaceId: request.spaceId, + event: request.event, + keyBoxes: [...request.keyBoxes], // Convert readonly array to mutable + }); + + // Get the updated space data + const space = yield* spacesService.getSpace({ + spaceId: request.spaceId, + accountAddress, + appIdentityAddress: address, + }); + + // Send the updated space back to the client + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + // Broadcast the space event to other subscribers + const spaceEventMessage: Messages.ResponseSpaceEvent = { + type: 'space-event', + spaceId: request.spaceId, + event: request.event, + }; + + yield* connectionsService.broadcastToSpace({ + spaceId: request.spaceId, + message: spaceEventMessage, + excludeConnectionId: connectionId, + }); + + // Note: Invitee notification would require adding a method to ConnectionsService + // to find connections by account address and broadcast to them + + break; + } + case 'accept-invitation-event': { + // Apply the invitation acceptance event to the space + yield* spacesService.applySpaceEvent({ + accountAddress, + spaceId: request.spaceId, + event: request.event, + keyBoxes: [], // No keyBoxes needed for accepting invitations + }); + + // Get the updated space data + const space = yield* spacesService.getSpace({ + spaceId: request.spaceId, + accountAddress, + appIdentityAddress: address, + }); + + // Send the updated space back to the client + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + // Broadcast the space event to other subscribers + const spaceEventMessage: Messages.ResponseSpaceEvent = { + type: 'space-event', + spaceId: request.spaceId, + event: request.event, + }; + + yield* connectionsService.broadcastToSpace({ + spaceId: request.spaceId, + message: spaceEventMessage, + excludeConnectionId: connectionId, + }); + + break; + } + case 'create-space-inbox-event': { + // Apply the space inbox creation event to the space + yield* spacesService.applySpaceEvent({ + accountAddress, + spaceId: request.spaceId, + event: request.event, + keyBoxes: [], // No keyBoxes needed for creating space inboxes + }); + + // Get the updated space data + const space = yield* spacesService.getSpace({ + spaceId: request.spaceId, + accountAddress, + appIdentityAddress: address, + }); + + // Send the updated space back to the client + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + // Broadcast the space event to other subscribers + const spaceEventMessage: Messages.ResponseSpaceEvent = { + type: 'space-event', + spaceId: request.spaceId, + event: request.event, + }; + + yield* connectionsService.broadcastToSpace({ + spaceId: request.spaceId, + message: spaceEventMessage, + excludeConnectionId: connectionId, + }); + + break; + } + case 'create-account-inbox': { + // Validate that the account matches the authenticated user + if (request.accountAddress !== accountAddress) { + // TODO: Better error handling + return yield* Effect.fail(new Error('Invalid accountAddress')); + } + + // Create the account inbox + yield* accountInboxService.createAccountInbox(request); + + // TODO: Broadcast the inbox to other clients from the same account + // This would require adding a method to ConnectionsService to broadcast by account + + break; + } + case 'get-latest-space-inbox-messages': { + // Check that the user has access to this space + yield* spacesService.getSpace({ + spaceId: request.spaceId, + accountAddress, + appIdentityAddress: address, + }); + + // Get the latest messages from the space inbox + const messages = yield* spaceInboxService.getLatestSpaceInboxMessages({ + inboxId: request.inboxId, + since: request.since, + }); + + const outgoingMessage: Messages.ResponseSpaceInboxMessages = { + type: 'space-inbox-messages', + spaceId: request.spaceId, + inboxId: request.inboxId, + messages, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + break; + } + case 'get-latest-account-inbox-messages': { + // Check that the user has access to this inbox + yield* accountInboxService.getAccountInbox({ + accountAddress, + inboxId: request.inboxId, + }); + + // Get the latest messages from the account inbox + const messages = yield* accountInboxService.getLatestAccountInboxMessages({ + inboxId: request.inboxId, + since: request.since, + }); + + const outgoingMessage: Messages.ResponseAccountInboxMessages = { + type: 'account-inbox-messages', + accountAddress, + inboxId: request.inboxId, + messages, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + break; + } + case 'get-account-inboxes': { + // List all inboxes for the authenticated account + const inboxes = yield* accountInboxService.listAccountInboxes({ accountAddress }); + + const outgoingMessage: Messages.ResponseAccountInboxes = { + type: 'account-inboxes', + inboxes, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + break; } } @@ -221,7 +416,9 @@ const WebSocketLayer = HttpLayerRouter.add( .pipe(Effect.provide(SpacesService.layer)) .pipe(Effect.provide(InvitationsService.layer)) .pipe(Effect.provide(IdentityService.layer)) - .pipe(Effect.provide(UpdatesService.layer)), + .pipe(Effect.provide(UpdatesService.layer)) + .pipe(Effect.provide(AccountInboxService.layer)) + .pipe(Effect.provide(SpaceInboxService.layer)), ); // Merge router layers together and add the cors middleware layer. diff --git a/apps/server-new/src/services/account-inbox.ts b/apps/server-new/src/services/account-inbox.ts index 0ba6c40f..79d23a69 100644 --- a/apps/server-new/src/services/account-inbox.ts +++ b/apps/server-new/src/services/account-inbox.ts @@ -37,6 +37,16 @@ export class AccountInboxService extends Context.Tag('AccountInboxService')< Messages.InboxMessage, ResourceNotFoundError | ValidationError | AuthorizationError | DatabaseService.DatabaseError >; + readonly createAccountInbox: ( + data: Messages.RequestCreateAccountInbox, + ) => Effect.Effect; + readonly getLatestAccountInboxMessages: (params: { + inboxId: string; + since: Date; + }) => Effect.Effect; + readonly listAccountInboxes: (params: { + accountAddress: string; + }) => Effect.Effect; } >() {} @@ -258,9 +268,131 @@ export const layer = Effect.gen(function* () { return createdMessage; }); + const createAccountInbox = Effect.fn('createAccountInbox')(function* (data: Messages.RequestCreateAccountInbox) { + const { accountAddress, inboxId, isPublic, authPolicy, encryptionPublicKey, signature } = data; + + // Verify the signature is valid for the corresponding accountAddress + const signer = Inboxes.recoverAccountInboxCreatorKey(data); + const signerAccount = yield* getAppOrConnectIdentity({ + accountAddress: data.accountAddress, + signaturePublicKey: signer, + }).pipe(Effect.mapError(() => new AuthorizationError({ message: 'Invalid signature' }))); + + if (signerAccount.accountAddress !== accountAddress) { + return yield* Effect.fail(new AuthorizationError({ message: 'Invalid signature' })); + } + + // Create the inbox (will throw an error if it already exists) + const inbox = yield* use((client) => + client.accountInbox.create({ + data: { + id: inboxId, + isPublic, + authPolicy, + encryptionPublicKey, + signatureHex: signature.hex, + signatureRecovery: signature.recovery, + account: { connect: { address: accountAddress } }, + }, + }), + ); + + return { + inboxId: inbox.id, + accountAddress, + isPublic: inbox.isPublic, + authPolicy: inbox.authPolicy as Inboxes.InboxSenderAuthPolicy, + encryptionPublicKey: inbox.encryptionPublicKey, + signature: { + hex: inbox.signatureHex, + recovery: inbox.signatureRecovery, + }, + }; + }); + + const getLatestAccountInboxMessages = Effect.fn('getLatestAccountInboxMessages')(function* ({ + inboxId, + since, + }: { + inboxId: string; + since: Date; + }) { + const messages = yield* use((client) => + client.accountInboxMessage.findMany({ + where: { + accountInboxId: inboxId, + createdAt: { + gte: since, + }, + }, + orderBy: { + createdAt: 'asc', + }, + }), + ); + + return messages.map( + (msg): Messages.InboxMessage => ({ + id: msg.id, + ciphertext: msg.ciphertext, + signature: + msg.signatureHex != null && msg.signatureRecovery != null + ? { + hex: msg.signatureHex, + recovery: msg.signatureRecovery, + } + : undefined, + authorAccountAddress: msg.authorAccountAddress ?? undefined, + createdAt: msg.createdAt, + }), + ); + }); + + const listAccountInboxes = Effect.fn('listAccountInboxes')(function* ({ + accountAddress, + }: { + accountAddress: string; + }) { + const inboxes = yield* use((client) => + client.accountInbox.findMany({ + where: { accountAddress }, + select: { + id: true, + isPublic: true, + authPolicy: true, + encryptionPublicKey: true, + account: { + select: { + address: true, + }, + }, + signatureHex: true, + signatureRecovery: true, + }, + }), + ); + + return inboxes.map( + (inbox): Messages.AccountInbox => ({ + inboxId: inbox.id, + accountAddress: inbox.account.address, + isPublic: inbox.isPublic, + authPolicy: inbox.authPolicy as Inboxes.InboxSenderAuthPolicy, + encryptionPublicKey: inbox.encryptionPublicKey, + signature: { + hex: inbox.signatureHex, + recovery: inbox.signatureRecovery, + }, + }), + ); + }); + return { listPublicAccountInboxes, getAccountInbox, postAccountInboxMessage, + createAccountInbox, + getLatestAccountInboxMessages, + listAccountInboxes, } as const; }).pipe(Layer.effect(AccountInboxService), Layer.provide(DatabaseService.layer), Layer.provide(IdentityService.layer)); diff --git a/apps/server-new/src/services/space-inbox.ts b/apps/server-new/src/services/space-inbox.ts index 1822b862..0a9ffe95 100644 --- a/apps/server-new/src/services/space-inbox.ts +++ b/apps/server-new/src/services/space-inbox.ts @@ -33,6 +33,10 @@ export class SpaceInboxService extends Context.Tag('SpaceInboxService')< Messages.InboxMessage, ResourceNotFoundError | ValidationError | AuthorizationError | DatabaseService.DatabaseError >; + readonly getLatestSpaceInboxMessages: (params: { + inboxId: string; + since: Date; + }) => Effect.Effect; } >() {} @@ -242,9 +246,48 @@ export const layer = Effect.gen(function* () { return createdMessage; }); + const getLatestSpaceInboxMessages = Effect.fn('getLatestSpaceInboxMessages')(function* ({ + inboxId, + since, + }: { + inboxId: string; + since: Date; + }) { + const messages = yield* use((client) => + client.spaceInboxMessage.findMany({ + where: { + spaceInboxId: inboxId, + createdAt: { + gte: since, + }, + }, + orderBy: { + createdAt: 'asc', + }, + }), + ); + + return messages.map( + (msg): Messages.InboxMessage => ({ + id: msg.id, + ciphertext: msg.ciphertext, + signature: + msg.signatureHex != null && msg.signatureRecovery != null + ? { + hex: msg.signatureHex, + recovery: msg.signatureRecovery, + } + : undefined, + authorAccountAddress: msg.authorAccountAddress ?? undefined, + createdAt: msg.createdAt, + }), + ); + }); + return { listPublicSpaceInboxes, getSpaceInbox, postSpaceInboxMessage, + getLatestSpaceInboxMessages, } as const; }).pipe(Layer.effect(SpaceInboxService), Layer.provide(DatabaseService.layer), Layer.provide(IdentityService.layer)); diff --git a/apps/server-new/src/services/spaces.ts b/apps/server-new/src/services/spaces.ts index 821625bc..b5301a9b 100644 --- a/apps/server-new/src/services/spaces.ts +++ b/apps/server-new/src/services/spaces.ts @@ -1,5 +1,5 @@ import { Identity, type Inboxes, type Messages, SpaceEvents, Utils } from '@graphprotocol/hypergraph'; -import { Context, Effect, Layer } from 'effect'; +import { Context, Effect, Exit, Layer } from 'effect'; import * as Predicate from 'effect/Predicate'; import { ResourceNotFoundError } from '../http/errors.js'; import * as DatabaseService from './database.js'; @@ -86,6 +86,13 @@ export interface AddAppIdentityToSpacesParams { spacesInput: Messages.RequestConnectAddAppIdentityToSpaces['spacesInput']; } +export interface ApplySpaceEventParams { + accountAddress: string; + spaceId: string; + event: SpaceEvents.SpaceEvent; + keyBoxes: Messages.KeyBoxWithKeyId[]; +} + export class SpacesService extends Context.Tag('SpacesService')< SpacesService, { @@ -102,6 +109,9 @@ export class SpacesService extends Context.Tag('SpacesService')< readonly getSpace: ( params: GetSpaceParams, ) => Effect.Effect; + readonly applySpaceEvent: ( + params: ApplySpaceEventParams, + ) => Effect.Effect; } >() {} @@ -407,11 +417,127 @@ export const layer = Effect.gen(function* () { ); }); + const applySpaceEvent = Effect.fn('applySpaceEvent')(function* (params: ApplySpaceEventParams) { + const { accountAddress, spaceId, event, keyBoxes } = params; + + if (event.transaction.type === 'create-space') { + return yield* Effect.fail(new Error('applySpaceEvent does not support create-space events.')); + } + + yield* use((client) => + client.$transaction(async (transaction) => { + if (event.transaction.type === 'accept-invitation') { + // verify that the account is the invitee + await transaction.invitation.findFirstOrThrow({ + where: { inviteeAccountAddress: event.author.accountAddress }, + }); + } else { + // verify that the account is a member of the space + // TODO verify that the account is an admin of the space + await transaction.space.findUniqueOrThrow({ + where: { + id: spaceId, + members: { some: { address: accountAddress } }, + }, + }); + } + + const lastEvent = await transaction.spaceEvent.findFirstOrThrow({ + where: { spaceId }, + orderBy: { counter: 'desc' }, + }); + + // Create the getVerifiedIdentity function for event validation + const getVerifiedIdentity = (accountAddressToFetch: string, publicKey: string) => { + // applySpaceEvent is only allowed to be called by the account that is applying the event + if (accountAddressToFetch !== accountAddress) { + return Effect.fail(new Identity.InvalidIdentityError()); + } + + return getAppOrConnectIdentity({ + accountAddress: accountAddressToFetch, + signaturePublicKey: publicKey, + }).pipe(Effect.mapError(() => new Identity.InvalidIdentityError())); + }; + + const result = await Effect.runPromiseExit( + SpaceEvents.applyEvent({ + event, + state: JSON.parse(lastEvent.state), + getVerifiedIdentity, + }), + ); + + if (Exit.isFailure(result)) { + throw new Error('Invalid event'); + } + + if (event.transaction.type === 'create-invitation') { + const inviteeAccountAddress = event.transaction.inviteeAccountAddress; + await transaction.invitation.create({ + data: { + id: event.transaction.id, + spaceId, + accountAddress: event.author.accountAddress, + inviteeAccountAddress, + }, + }); + await transaction.spaceKeyBox.createMany({ + data: keyBoxes.map((keyBox) => ({ + id: `${keyBox.id}-${inviteeAccountAddress}`, + nonce: keyBox.nonce, + ciphertext: keyBox.ciphertext, + accountAddress: inviteeAccountAddress, + authorPublicKey: keyBox.authorPublicKey, + spaceKeyId: keyBox.id, + })), + }); + } + + if (event.transaction.type === 'accept-invitation') { + await transaction.invitation.delete({ + where: { spaceId_inviteeAccountAddress: { spaceId, inviteeAccountAddress: event.author.accountAddress } }, + }); + + await transaction.space.update({ + where: { id: spaceId }, + data: { members: { connect: { address: event.author.accountAddress } } }, + }); + } + + await transaction.spaceEvent.create({ + data: { + spaceId, + counter: lastEvent.counter + 1, + event: JSON.stringify(event), + id: event.transaction.id, + state: JSON.stringify(result.value), + }, + }); + + if (event.transaction.type === 'create-space-inbox') { + await transaction.spaceInbox.create({ + data: { + id: event.transaction.inboxId, + isPublic: event.transaction.isPublic, + authPolicy: event.transaction.authPolicy, + encryptionPublicKey: event.transaction.encryptionPublicKey, + encryptedSecretKey: event.transaction.secretKey, + space: { connect: { id: spaceId } }, + spaceEvent: { connect: { id: event.transaction.id } }, + }, + }); + } + }), + ); + }); + return { listByAccount, listByAppIdentity, getSpace, createSpace, addAppIdentityToSpaces, + applySpaceEvent, } as const; }).pipe(Layer.effect(SpacesService), Layer.provide(DatabaseService.layer), Layer.provide(IdentityService.layer)); From bd4fc589b8ac2506183f327aff408d142660184f Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 18:10:52 +0200 Subject: [PATCH 13/22] broadcast a new account inbox to other clients from the same account --- apps/server-new/src/server.ts | 22 ++++++++++++--- apps/server-new/src/services/connections.ts | 30 +++++++++++++++++++++ 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index 91dfbada..d58d1e6f 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -318,10 +318,26 @@ const WebSocketLayer = HttpLayerRouter.add( } // Create the account inbox - yield* accountInboxService.createAccountInbox(request); + const inbox = yield* accountInboxService.createAccountInbox(request); + + // Broadcast the new inbox to other clients from the same account + const inboxMessage: Messages.ResponseAccountInbox = { + type: 'account-inbox', + inbox: { + accountAddress: inbox.accountAddress, + inboxId: inbox.inboxId, + isPublic: inbox.isPublic, + authPolicy: inbox.authPolicy, + encryptionPublicKey: inbox.encryptionPublicKey, + signature: inbox.signature, + }, + }; - // TODO: Broadcast the inbox to other clients from the same account - // This would require adding a method to ConnectionsService to broadcast by account + yield* connectionsService.broadcastToAccount({ + accountAddress, + message: inboxMessage, + excludeConnectionId: connectionId, + }); break; } diff --git a/apps/server-new/src/services/connections.ts b/apps/server-new/src/services/connections.ts index ed5b2b86..7f73855a 100644 --- a/apps/server-new/src/services/connections.ts +++ b/apps/server-new/src/services/connections.ts @@ -25,6 +25,11 @@ export class ConnectionsService extends Context.Tag('ConnectionsService')< message: Messages.ResponseMessage; excludeConnectionId?: string; }) => Effect.Effect; + readonly broadcastToAccount: (params: { + accountAddress: string; + message: Messages.ResponseMessage; + excludeConnectionId?: string; + }) => Effect.Effect; readonly getConnection: (connectionId: string) => Effect.Effect; } >() {} @@ -153,6 +158,30 @@ export const layer = Effect.gen(function* () { } }); + const broadcastToAccount = Effect.fn('broadcastToAccount')(function* ({ + accountAddress, + message, + excludeConnectionId, + }: { + accountAddress: string; + message: Messages.ResponseMessage; + excludeConnectionId?: string; + }) { + const currentConnections = yield* Ref.get(connections); + + for (const [connectionId, connection] of currentConnections) { + // Skip if this is the excluded connection (sender) + if (excludeConnectionId && connectionId === excludeConnectionId) { + continue; + } + + // Only send to connections from the same account + if (connection.accountAddress === accountAddress) { + yield* connection.mailbox.offer(Messages.serializeV2(message)); + } + } + }); + const getConnection = Effect.fn('getConnection')(function* (connectionId: string) { const currentConnections = yield* Ref.get(connections); return currentConnections.get(connectionId); @@ -164,6 +193,7 @@ export const layer = Effect.gen(function* () { subscribeToSpace, unsubscribeFromSpace, broadcastToSpace, + broadcastToAccount, getConnection, } as const; }).pipe(Layer.effect(ConnectionsService)); From 73dfc9f646ca09706b6a9893a1a2e60f9cb2f215 Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 18:13:58 +0200 Subject: [PATCH 14/22] broadcast new invitations --- apps/server-new/src/server.ts | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index d58d1e6f..b418d522 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -229,8 +229,23 @@ const WebSocketLayer = HttpLayerRouter.add( excludeConnectionId: connectionId, }); - // Note: Invitee notification would require adding a method to ConnectionsService - // to find connections by account address and broadcast to them + // Notify the invitee if they're connected + if (request.event.transaction.type === 'create-invitation') { + const inviteeAccountAddress = request.event.transaction.inviteeAccountAddress; + + // Get the updated invitation list for the invitee + const invitations = yield* invitationsService.listByAppIdentity(inviteeAccountAddress); + const invitationMessage: Messages.ResponseListInvitations = { + type: 'list-invitations', + invitations, + }; + + // Broadcast to all connections for the invitee account + yield* connectionsService.broadcastToAccount({ + accountAddress: inviteeAccountAddress, + message: invitationMessage, + }); + } break; } From 7275c3a78eb8a1d9b1fbd6edd113cffee4f0b410 Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 18:40:18 +0200 Subject: [PATCH 15/22] add create update retry --- apps/server-new/src/services/updates.ts | 58 +++++++++++++++++++++---- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/apps/server-new/src/services/updates.ts b/apps/server-new/src/services/updates.ts index 426bd8e9..f3b735d9 100644 --- a/apps/server-new/src/services/updates.ts +++ b/apps/server-new/src/services/updates.ts @@ -1,4 +1,5 @@ -import { Context, Effect, Layer } from 'effect'; +import { Context, Effect, Layer, Predicate, Schedule } from 'effect'; +import { ResourceNotFoundError } from '../http/errors.js'; import * as DatabaseService from './database.js'; type CreateUpdateParams = { @@ -25,10 +26,24 @@ export class UpdatesService extends Context.Tag('UpdatesService')< { readonly createUpdate: ( params: CreateUpdateParams, - ) => Effect.Effect; + ) => Effect.Effect; } >() {} +// Retry with Effect's built-in retry mechanism for database lock errors +const retrySchedule = Schedule.exponential('100 millis').pipe( + Schedule.intersect(Schedule.recurs(5)), + Schedule.whileInput((error: DatabaseService.DatabaseError) => { + // Check if it's a database lock error that should trigger retry + const cause = error.cause as { code?: string; message?: string }; + const shouldRetry = + cause?.code === 'P2034' || // Prisma transaction conflict + cause?.code === 'P1008' || // Prisma connection timeout + Boolean(cause?.message?.includes('database is locked')); + return shouldRetry; + }), +); + export const layer = Effect.gen(function* () { const { use } = yield* DatabaseService.DatabaseService; @@ -40,9 +55,17 @@ export const layer = Effect.gen(function* () { signatureRecovery, updateId, }: CreateUpdateParams) { - // TODO: implement retries - const result = yield* use((client) => { - return client.$transaction(async (prisma) => { + // First verify the account is a member of the space + yield* use((client) => + client.space.findUnique({ + where: { id: spaceId, members: { some: { address: accountAddress } } }, + }), + ).pipe( + Effect.filterOrFail(Predicate.isNotNull, () => new ResourceNotFoundError({ resource: 'Space', id: spaceId })), + ); + + const result = yield* use((client) => + client.$transaction(async (prisma) => { const lastUpdate = await prisma.update.findFirst({ where: { spaceId }, orderBy: { clock: 'desc' }, @@ -61,10 +84,29 @@ export const layer = Effect.gen(function* () { account: { connect: { address: accountAddress } }, }, }); - }); - }); + }), + ).pipe( + Effect.retry(retrySchedule), + Effect.tapError((error) => { + const cause = error.cause as { code?: string; message?: string }; + return Effect.logError('Failed to create update after retries', { + error: cause?.message || String(error), + code: cause?.code, + spaceId, + updateId, + }); + }), + ); - return result; + return { + clock: result.clock, + content: new Uint8Array(result.content), + signatureHex: result.signatureHex, + signatureRecovery: result.signatureRecovery, + updateId: result.updateId, + accountAddress: result.accountAddress, + spaceId: result.spaceId, + }; }); return { From 6add14dd8e7dcb4a95fc4a5556455a1c2d119262 Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 18:40:44 +0200 Subject: [PATCH 16/22] add claude settings --- .claude/settings.local.json | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 .claude/settings.local.json diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 00000000..cd4fb8b6 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,11 @@ +{ + "permissions": { + "allow": [ + "Bash(pnpm lint:fix:*)", + "Bash(pnpm typecheck:*)", + "Bash(pnpm check:*)" + ], + "deny": [], + "ask": [] + } +} \ No newline at end of file From 65797189adc27368dbccc1830f8e19613e118ae1 Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 18:55:16 +0200 Subject: [PATCH 17/22] lint fixes --- .claude/settings.local.json | 8 ++------ packages/hypergraph/src/messages/serialize.ts | 1 + 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/.claude/settings.local.json b/.claude/settings.local.json index cd4fb8b6..f43a83b3 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -1,11 +1,7 @@ { "permissions": { - "allow": [ - "Bash(pnpm lint:fix:*)", - "Bash(pnpm typecheck:*)", - "Bash(pnpm check:*)" - ], + "allow": ["Bash(pnpm lint:fix:*)", "Bash(pnpm typecheck:*)", "Bash(pnpm check:*)"], "deny": [], "ask": [] } -} \ No newline at end of file +} diff --git a/packages/hypergraph/src/messages/serialize.ts b/packages/hypergraph/src/messages/serialize.ts index 8ad3a863..54e7a855 100644 --- a/packages/hypergraph/src/messages/serialize.ts +++ b/packages/hypergraph/src/messages/serialize.ts @@ -11,6 +11,7 @@ export function serialize(obj: any): string { }); } +// biome-ignore lint/suspicious/noExplicitAny: same as stringify and parse export function serializeV2(obj: any): any { return JSON.parse( JSON.stringify(obj, (_key, value) => { From 09e16f15e57bd5e3f1af2911afe275bfbf6b9ecf Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 18:59:44 +0200 Subject: [PATCH 18/22] extract websocket to a separate file --- apps/server-new/src/server.ts | 436 +------------------------------ apps/server-new/src/websocket.ts | 434 ++++++++++++++++++++++++++++++ 2 files changed, 437 insertions(+), 433 deletions(-) create mode 100644 apps/server-new/src/websocket.ts diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index b418d522..a9b37d2f 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -1,29 +1,15 @@ -import { createServer } from 'node:http'; +import * as NodeHttpServer from '@effect/platform-node/NodeHttpServer'; import * as HttpApiScalar from '@effect/platform/HttpApiScalar'; import * as HttpLayerRouter from '@effect/platform/HttpLayerRouter'; import * as HttpMiddleware from '@effect/platform/HttpMiddleware'; -import * as HttpServerRequest from '@effect/platform/HttpServerRequest'; -import * as HttpServerResponse from '@effect/platform/HttpServerResponse'; -import * as Socket from '@effect/platform/Socket'; -import * as NodeHttpServer from '@effect/platform-node/NodeHttpServer'; -import { Messages } from '@graphprotocol/hypergraph'; -import { isArray } from 'effect/Array'; import * as Effect from 'effect/Effect'; import * as Layer from 'effect/Layer'; -import * as Mailbox from 'effect/Mailbox'; -import * as Schema from 'effect/Schema'; -import * as Stream from 'effect/Stream'; +import { createServer } from 'node:http'; import { serverPortConfig } from './config/server.ts'; import { hypergraphApi } from './http/api.ts'; import { HandlersLive } from './http/handlers.ts'; -import * as AccountInboxService from './services/account-inbox.ts'; -import * as AppIdentityService from './services/app-identity.ts'; import * as ConnectionsService from './services/connections.ts'; -import * as IdentityService from './services/identity.ts'; -import * as InvitationsService from './services/invitations.ts'; -import * as SpaceInboxService from './services/space-inbox.ts'; -import * as SpacesService from './services/spaces.ts'; -import * as UpdatesService from './services/updates.ts'; +import { WebSocketLayer } from './websocket.ts'; // Create scalar openapi browser layer at /docs. const DocsLayer = HttpApiScalar.layerHttpLayerRouter({ @@ -36,422 +22,6 @@ const ApiLayer = HttpLayerRouter.addHttpApi(hypergraphApi, { openapiPath: '/docs/openapi.json', }).pipe(Layer.provide(HandlersLive)); -const decodeRequestMessage = Schema.decodeUnknownEither(Messages.RequestMessage); - -const WebSocketLayer = HttpLayerRouter.add( - 'GET', - '/', - Effect.gen(function* () { - const request = yield* HttpServerRequest.HttpServerRequest; - const spacesService = yield* SpacesService.SpacesService; - const invitationsService = yield* InvitationsService.InvitationsService; - const updatesService = yield* UpdatesService.UpdatesService; - const connectionsService = yield* ConnectionsService.ConnectionsService; - const accountInboxService = yield* AccountInboxService.AccountInboxService; - const spaceInboxService = yield* SpaceInboxService.SpaceInboxService; - const responseMailbox = yield* Mailbox.make(); - - const searchParams = HttpServerRequest.searchParamsFromURL(new URL(request.url, 'http://localhost')); - const token = isArray(searchParams.token) ? searchParams.token[0] : searchParams.token; - - if (!token) { - return yield* HttpServerResponse.empty({ status: 400 }); - } - - const appIdentityService = yield* AppIdentityService.AppIdentityService; - const identityService = yield* IdentityService.IdentityService; - const { accountAddress, address } = yield* appIdentityService.getBySessionToken(token).pipe(Effect.orDie); - - // Register this connection - const connectionId = yield* connectionsService.registerConnection({ - accountAddress, - appIdentityAddress: address, - mailbox: responseMailbox, - }); - - return yield* Mailbox.toStream(responseMailbox).pipe( - Stream.map(JSON.stringify), - Stream.pipeThroughChannel(HttpServerRequest.upgradeChannel()), - Stream.decodeText(), - Stream.runForEach((message) => - Effect.gen(function* () { - const json = Messages.deserialize(message); - const request = yield* decodeRequestMessage(json); - switch (request.type) { - case 'list-spaces': { - const spaces = yield* spacesService.listByAppIdentity(address); - const outgoingMessage: Messages.ResponseListSpaces = { type: 'list-spaces', spaces: spaces }; - // TODO: fix Messages.serialize - yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); - break; - } - case 'list-invitations': { - const invitations = yield* invitationsService.listByAppIdentity(accountAddress); - const outgoingMessage: Messages.ResponseListInvitations = { - type: 'list-invitations', - invitations, - }; - yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); - break; - } - case 'subscribe-space': { - const space = yield* spacesService.getSpace({ - spaceId: request.id, - accountAddress, - appIdentityAddress: address, - }); - - // Track this subscription - yield* connectionsService.subscribeToSpace(connectionId, request.id); - - const outgoingMessage: Messages.ResponseSpace = { - type: 'space', - ...space, - }; - yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); - break; - } - case 'create-update': { - const signer = Messages.recoverUpdateMessageSigner(request); - const identity = yield* identityService.getAppOrConnectIdentity({ - accountAddress: request.accountAddress, - signaturePublicKey: signer, - }); - if (identity.accountAddress !== accountAddress) { - // TODO: improve error handling - return yield* Effect.die(new Error('Invalid signature')); - } - - const update = yield* updatesService.createUpdate({ - accountAddress: request.accountAddress, - update: request.update, - spaceId: request.spaceId, - signatureHex: request.signature.hex, - signatureRecovery: request.signature.recovery, - updateId: request.updateId, - }); - const outgoingMessage: Messages.ResponseUpdateConfirmed = { - type: 'update-confirmed', - updateId: request.updateId, - clock: update.clock, - spaceId: request.spaceId, - }; - yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); - - // Broadcast the update to all subscribed clients - const updates: Messages.Updates = { - updates: [ - { - update: update.content, - accountAddress: update.accountAddress, - signature: { hex: update.signatureHex, recovery: update.signatureRecovery }, - updateId: update.updateId, - }, - ], - firstUpdateClock: update.clock, - lastUpdateClock: update.clock, - }; - - const broadcastMessage: Messages.ResponseUpdatesNotification = { - type: 'updates-notification', - updates, - spaceId: request.spaceId, - }; - - yield* connectionsService.broadcastToSpace({ - spaceId: request.spaceId, - message: broadcastMessage, - excludeConnectionId: connectionId, - }); - - break; - } - case 'create-space-event': { - // Create the new space - const spaceResult = yield* spacesService.createSpace({ - accountAddress, - event: request.event, - keyBox: request.keyBox, - infoContent: new Uint8Array(), // TODO: Get from request when available - infoSignatureHex: '', - infoSignatureRecovery: 0, - name: request.name, - }); - - // Get the full space data to send back - const space = yield* spacesService.getSpace({ - spaceId: spaceResult.id, - accountAddress, - appIdentityAddress: address, - }); - - const outgoingMessage: Messages.ResponseSpace = { - type: 'space', - ...space, - }; - yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); - - break; - } - case 'create-invitation-event': { - // Apply the invitation event to the space - yield* spacesService.applySpaceEvent({ - accountAddress, - spaceId: request.spaceId, - event: request.event, - keyBoxes: [...request.keyBoxes], // Convert readonly array to mutable - }); - - // Get the updated space data - const space = yield* spacesService.getSpace({ - spaceId: request.spaceId, - accountAddress, - appIdentityAddress: address, - }); - - // Send the updated space back to the client - const outgoingMessage: Messages.ResponseSpace = { - type: 'space', - ...space, - }; - yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); - - // Broadcast the space event to other subscribers - const spaceEventMessage: Messages.ResponseSpaceEvent = { - type: 'space-event', - spaceId: request.spaceId, - event: request.event, - }; - - yield* connectionsService.broadcastToSpace({ - spaceId: request.spaceId, - message: spaceEventMessage, - excludeConnectionId: connectionId, - }); - - // Notify the invitee if they're connected - if (request.event.transaction.type === 'create-invitation') { - const inviteeAccountAddress = request.event.transaction.inviteeAccountAddress; - - // Get the updated invitation list for the invitee - const invitations = yield* invitationsService.listByAppIdentity(inviteeAccountAddress); - const invitationMessage: Messages.ResponseListInvitations = { - type: 'list-invitations', - invitations, - }; - - // Broadcast to all connections for the invitee account - yield* connectionsService.broadcastToAccount({ - accountAddress: inviteeAccountAddress, - message: invitationMessage, - }); - } - - break; - } - case 'accept-invitation-event': { - // Apply the invitation acceptance event to the space - yield* spacesService.applySpaceEvent({ - accountAddress, - spaceId: request.spaceId, - event: request.event, - keyBoxes: [], // No keyBoxes needed for accepting invitations - }); - - // Get the updated space data - const space = yield* spacesService.getSpace({ - spaceId: request.spaceId, - accountAddress, - appIdentityAddress: address, - }); - - // Send the updated space back to the client - const outgoingMessage: Messages.ResponseSpace = { - type: 'space', - ...space, - }; - yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); - - // Broadcast the space event to other subscribers - const spaceEventMessage: Messages.ResponseSpaceEvent = { - type: 'space-event', - spaceId: request.spaceId, - event: request.event, - }; - - yield* connectionsService.broadcastToSpace({ - spaceId: request.spaceId, - message: spaceEventMessage, - excludeConnectionId: connectionId, - }); - - break; - } - case 'create-space-inbox-event': { - // Apply the space inbox creation event to the space - yield* spacesService.applySpaceEvent({ - accountAddress, - spaceId: request.spaceId, - event: request.event, - keyBoxes: [], // No keyBoxes needed for creating space inboxes - }); - - // Get the updated space data - const space = yield* spacesService.getSpace({ - spaceId: request.spaceId, - accountAddress, - appIdentityAddress: address, - }); - - // Send the updated space back to the client - const outgoingMessage: Messages.ResponseSpace = { - type: 'space', - ...space, - }; - yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); - - // Broadcast the space event to other subscribers - const spaceEventMessage: Messages.ResponseSpaceEvent = { - type: 'space-event', - spaceId: request.spaceId, - event: request.event, - }; - - yield* connectionsService.broadcastToSpace({ - spaceId: request.spaceId, - message: spaceEventMessage, - excludeConnectionId: connectionId, - }); - - break; - } - case 'create-account-inbox': { - // Validate that the account matches the authenticated user - if (request.accountAddress !== accountAddress) { - // TODO: Better error handling - return yield* Effect.fail(new Error('Invalid accountAddress')); - } - - // Create the account inbox - const inbox = yield* accountInboxService.createAccountInbox(request); - - // Broadcast the new inbox to other clients from the same account - const inboxMessage: Messages.ResponseAccountInbox = { - type: 'account-inbox', - inbox: { - accountAddress: inbox.accountAddress, - inboxId: inbox.inboxId, - isPublic: inbox.isPublic, - authPolicy: inbox.authPolicy, - encryptionPublicKey: inbox.encryptionPublicKey, - signature: inbox.signature, - }, - }; - - yield* connectionsService.broadcastToAccount({ - accountAddress, - message: inboxMessage, - excludeConnectionId: connectionId, - }); - - break; - } - case 'get-latest-space-inbox-messages': { - // Check that the user has access to this space - yield* spacesService.getSpace({ - spaceId: request.spaceId, - accountAddress, - appIdentityAddress: address, - }); - - // Get the latest messages from the space inbox - const messages = yield* spaceInboxService.getLatestSpaceInboxMessages({ - inboxId: request.inboxId, - since: request.since, - }); - - const outgoingMessage: Messages.ResponseSpaceInboxMessages = { - type: 'space-inbox-messages', - spaceId: request.spaceId, - inboxId: request.inboxId, - messages, - }; - yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); - - break; - } - case 'get-latest-account-inbox-messages': { - // Check that the user has access to this inbox - yield* accountInboxService.getAccountInbox({ - accountAddress, - inboxId: request.inboxId, - }); - - // Get the latest messages from the account inbox - const messages = yield* accountInboxService.getLatestAccountInboxMessages({ - inboxId: request.inboxId, - since: request.since, - }); - - const outgoingMessage: Messages.ResponseAccountInboxMessages = { - type: 'account-inbox-messages', - accountAddress, - inboxId: request.inboxId, - messages, - }; - yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); - - break; - } - case 'get-account-inboxes': { - // List all inboxes for the authenticated account - const inboxes = yield* accountInboxService.listAccountInboxes({ accountAddress }); - - const outgoingMessage: Messages.ResponseAccountInboxes = { - type: 'account-inboxes', - inboxes, - }; - yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); - - break; - } - } - }), - ), - Effect.catchAll((error) => - Effect.gen(function* () { - // Only log error if it's not a SocketCloseError - if (!Socket.SocketCloseError.is(error)) { - yield* Effect.logInfo('WebSocket disconnected due to error', { - error: error.message || String(error), - accountAddress, - appIdentityAddress: address, - }); - } - }), - ), - Effect.ensuring( - Effect.gen(function* () { - // Clean up the connection when it closes - yield* connectionsService.removeConnection(connectionId); - yield* Effect.logInfo('WebSocket connection closed', { - accountAddress, - appIdentityAddress: address, - }); - }), - ), - Effect.as(HttpServerResponse.empty()), - ); - }) - .pipe(Effect.provide(AppIdentityService.layer)) - .pipe(Effect.provide(SpacesService.layer)) - .pipe(Effect.provide(InvitationsService.layer)) - .pipe(Effect.provide(IdentityService.layer)) - .pipe(Effect.provide(UpdatesService.layer)) - .pipe(Effect.provide(AccountInboxService.layer)) - .pipe(Effect.provide(SpaceInboxService.layer)), -); - // Merge router layers together and add the cors middleware layer. const CorsMiddleware = HttpLayerRouter.middleware(HttpMiddleware.cors()); const AppLayer = Layer.mergeAll(ApiLayer, DocsLayer, WebSocketLayer).pipe(Layer.provide(CorsMiddleware.layer)); diff --git a/apps/server-new/src/websocket.ts b/apps/server-new/src/websocket.ts new file mode 100644 index 00000000..90f04a03 --- /dev/null +++ b/apps/server-new/src/websocket.ts @@ -0,0 +1,434 @@ +import * as HttpLayerRouter from '@effect/platform/HttpLayerRouter'; +import * as HttpServerRequest from '@effect/platform/HttpServerRequest'; +import * as HttpServerResponse from '@effect/platform/HttpServerResponse'; +import * as Socket from '@effect/platform/Socket'; +import { Messages } from '@graphprotocol/hypergraph'; +import { isArray } from 'effect/Array'; +import * as Effect from 'effect/Effect'; +import * as Mailbox from 'effect/Mailbox'; +import * as Schema from 'effect/Schema'; +import * as Stream from 'effect/Stream'; +import * as AccountInboxService from './services/account-inbox.ts'; +import * as AppIdentityService from './services/app-identity.ts'; +import * as ConnectionsService from './services/connections.ts'; +import * as IdentityService from './services/identity.ts'; +import * as InvitationsService from './services/invitations.ts'; +import * as SpaceInboxService from './services/space-inbox.ts'; +import * as SpacesService from './services/spaces.ts'; +import * as UpdatesService from './services/updates.ts'; + +const decodeRequestMessage = Schema.decodeUnknownEither(Messages.RequestMessage); + +export const WebSocketLayer = HttpLayerRouter.add( + 'GET', + '/', + Effect.gen(function* () { + const request = yield* HttpServerRequest.HttpServerRequest; + const spacesService = yield* SpacesService.SpacesService; + const invitationsService = yield* InvitationsService.InvitationsService; + const updatesService = yield* UpdatesService.UpdatesService; + const connectionsService = yield* ConnectionsService.ConnectionsService; + const accountInboxService = yield* AccountInboxService.AccountInboxService; + const spaceInboxService = yield* SpaceInboxService.SpaceInboxService; + const responseMailbox = yield* Mailbox.make(); + + const searchParams = HttpServerRequest.searchParamsFromURL(new URL(request.url, 'http://localhost')); + const token = isArray(searchParams.token) ? searchParams.token[0] : searchParams.token; + + if (!token) { + return yield* HttpServerResponse.empty({ status: 400 }); + } + + const appIdentityService = yield* AppIdentityService.AppIdentityService; + const identityService = yield* IdentityService.IdentityService; + const { accountAddress, address } = yield* appIdentityService.getBySessionToken(token).pipe(Effect.orDie); + + // Register this connection + const connectionId = yield* connectionsService.registerConnection({ + accountAddress, + appIdentityAddress: address, + mailbox: responseMailbox, + }); + + return yield* Mailbox.toStream(responseMailbox).pipe( + Stream.map(JSON.stringify), + Stream.pipeThroughChannel(HttpServerRequest.upgradeChannel()), + Stream.decodeText(), + Stream.runForEach((message) => + Effect.gen(function* () { + const json = Messages.deserialize(message); + const request = yield* decodeRequestMessage(json); + switch (request.type) { + case 'list-spaces': { + const spaces = yield* spacesService.listByAppIdentity(address); + const outgoingMessage: Messages.ResponseListSpaces = { type: 'list-spaces', spaces: spaces }; + // TODO: fix Messages.serialize + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + break; + } + case 'list-invitations': { + const invitations = yield* invitationsService.listByAppIdentity(accountAddress); + const outgoingMessage: Messages.ResponseListInvitations = { + type: 'list-invitations', + invitations, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + break; + } + case 'subscribe-space': { + const space = yield* spacesService.getSpace({ + spaceId: request.id, + accountAddress, + appIdentityAddress: address, + }); + + // Track this subscription + yield* connectionsService.subscribeToSpace(connectionId, request.id); + + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + break; + } + case 'create-update': { + const signer = Messages.recoverUpdateMessageSigner(request); + const identity = yield* identityService.getAppOrConnectIdentity({ + accountAddress: request.accountAddress, + signaturePublicKey: signer, + }); + if (identity.accountAddress !== accountAddress) { + // TODO: improve error handling + return yield* Effect.die(new Error('Invalid signature')); + } + + const update = yield* updatesService.createUpdate({ + accountAddress: request.accountAddress, + update: request.update, + spaceId: request.spaceId, + signatureHex: request.signature.hex, + signatureRecovery: request.signature.recovery, + updateId: request.updateId, + }); + const outgoingMessage: Messages.ResponseUpdateConfirmed = { + type: 'update-confirmed', + updateId: request.updateId, + clock: update.clock, + spaceId: request.spaceId, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + // Broadcast the update to all subscribed clients + const updates: Messages.Updates = { + updates: [ + { + update: update.content, + accountAddress: update.accountAddress, + signature: { hex: update.signatureHex, recovery: update.signatureRecovery }, + updateId: update.updateId, + }, + ], + firstUpdateClock: update.clock, + lastUpdateClock: update.clock, + }; + + const broadcastMessage: Messages.ResponseUpdatesNotification = { + type: 'updates-notification', + updates, + spaceId: request.spaceId, + }; + + yield* connectionsService.broadcastToSpace({ + spaceId: request.spaceId, + message: broadcastMessage, + excludeConnectionId: connectionId, + }); + + break; + } + case 'create-space-event': { + // Create the new space + const spaceResult = yield* spacesService.createSpace({ + accountAddress, + event: request.event, + keyBox: request.keyBox, + infoContent: new Uint8Array(), // TODO: Get from request when available + infoSignatureHex: '', + infoSignatureRecovery: 0, + name: request.name, + }); + + // Get the full space data to send back + const space = yield* spacesService.getSpace({ + spaceId: spaceResult.id, + accountAddress, + appIdentityAddress: address, + }); + + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + break; + } + case 'create-invitation-event': { + // Apply the invitation event to the space + yield* spacesService.applySpaceEvent({ + accountAddress, + spaceId: request.spaceId, + event: request.event, + keyBoxes: [...request.keyBoxes], // Convert readonly array to mutable + }); + + // Get the updated space data + const space = yield* spacesService.getSpace({ + spaceId: request.spaceId, + accountAddress, + appIdentityAddress: address, + }); + + // Send the updated space back to the client + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + // Broadcast the space event to other subscribers + const spaceEventMessage: Messages.ResponseSpaceEvent = { + type: 'space-event', + spaceId: request.spaceId, + event: request.event, + }; + + yield* connectionsService.broadcastToSpace({ + spaceId: request.spaceId, + message: spaceEventMessage, + excludeConnectionId: connectionId, + }); + + // Notify the invitee if they're connected + if (request.event.transaction.type === 'create-invitation') { + const inviteeAccountAddress = request.event.transaction.inviteeAccountAddress; + + // Get the updated invitation list for the invitee + const invitations = yield* invitationsService.listByAppIdentity(inviteeAccountAddress); + const invitationMessage: Messages.ResponseListInvitations = { + type: 'list-invitations', + invitations, + }; + + // Broadcast to all connections for the invitee account + yield* connectionsService.broadcastToAccount({ + accountAddress: inviteeAccountAddress, + message: invitationMessage, + }); + } + + break; + } + case 'accept-invitation-event': { + // Apply the invitation acceptance event to the space + yield* spacesService.applySpaceEvent({ + accountAddress, + spaceId: request.spaceId, + event: request.event, + keyBoxes: [], // No keyBoxes needed for accepting invitations + }); + + // Get the updated space data + const space = yield* spacesService.getSpace({ + spaceId: request.spaceId, + accountAddress, + appIdentityAddress: address, + }); + + // Send the updated space back to the client + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + // Broadcast the space event to other subscribers + const spaceEventMessage: Messages.ResponseSpaceEvent = { + type: 'space-event', + spaceId: request.spaceId, + event: request.event, + }; + + yield* connectionsService.broadcastToSpace({ + spaceId: request.spaceId, + message: spaceEventMessage, + excludeConnectionId: connectionId, + }); + + break; + } + case 'create-space-inbox-event': { + // Apply the space inbox creation event to the space + yield* spacesService.applySpaceEvent({ + accountAddress, + spaceId: request.spaceId, + event: request.event, + keyBoxes: [], // No keyBoxes needed for creating space inboxes + }); + + // Get the updated space data + const space = yield* spacesService.getSpace({ + spaceId: request.spaceId, + accountAddress, + appIdentityAddress: address, + }); + + // Send the updated space back to the client + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + // Broadcast the space event to other subscribers + const spaceEventMessage: Messages.ResponseSpaceEvent = { + type: 'space-event', + spaceId: request.spaceId, + event: request.event, + }; + + yield* connectionsService.broadcastToSpace({ + spaceId: request.spaceId, + message: spaceEventMessage, + excludeConnectionId: connectionId, + }); + + break; + } + case 'create-account-inbox': { + // Validate that the account matches the authenticated user + if (request.accountAddress !== accountAddress) { + // TODO: Better error handling + return yield* Effect.fail(new Error('Invalid accountAddress')); + } + + // Create the account inbox + const inbox = yield* accountInboxService.createAccountInbox(request); + + // Broadcast the new inbox to other clients from the same account + const inboxMessage: Messages.ResponseAccountInbox = { + type: 'account-inbox', + inbox: { + accountAddress: inbox.accountAddress, + inboxId: inbox.inboxId, + isPublic: inbox.isPublic, + authPolicy: inbox.authPolicy, + encryptionPublicKey: inbox.encryptionPublicKey, + signature: inbox.signature, + }, + }; + + yield* connectionsService.broadcastToAccount({ + accountAddress, + message: inboxMessage, + excludeConnectionId: connectionId, + }); + + break; + } + case 'get-latest-space-inbox-messages': { + // Check that the user has access to this space + yield* spacesService.getSpace({ + spaceId: request.spaceId, + accountAddress, + appIdentityAddress: address, + }); + + // Get the latest messages from the space inbox + const messages = yield* spaceInboxService.getLatestSpaceInboxMessages({ + inboxId: request.inboxId, + since: request.since, + }); + + const outgoingMessage: Messages.ResponseSpaceInboxMessages = { + type: 'space-inbox-messages', + spaceId: request.spaceId, + inboxId: request.inboxId, + messages, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + break; + } + case 'get-latest-account-inbox-messages': { + // Check that the user has access to this inbox + yield* accountInboxService.getAccountInbox({ + accountAddress, + inboxId: request.inboxId, + }); + + // Get the latest messages from the account inbox + const messages = yield* accountInboxService.getLatestAccountInboxMessages({ + inboxId: request.inboxId, + since: request.since, + }); + + const outgoingMessage: Messages.ResponseAccountInboxMessages = { + type: 'account-inbox-messages', + accountAddress, + inboxId: request.inboxId, + messages, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + break; + } + case 'get-account-inboxes': { + // List all inboxes for the authenticated account + const inboxes = yield* accountInboxService.listAccountInboxes({ accountAddress }); + + const outgoingMessage: Messages.ResponseAccountInboxes = { + type: 'account-inboxes', + inboxes, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + break; + } + } + }), + ), + Effect.catchAll((error) => + Effect.gen(function* () { + // Only log error if it's not a SocketCloseError + if (!Socket.SocketCloseError.is(error)) { + yield* Effect.logInfo('WebSocket disconnected due to error', { + error: error.message || String(error), + accountAddress, + appIdentityAddress: address, + }); + } + }), + ), + Effect.ensuring( + Effect.gen(function* () { + // Clean up the connection when it closes + yield* connectionsService.removeConnection(connectionId); + yield* Effect.logInfo('WebSocket connection closed', { + accountAddress, + appIdentityAddress: address, + }); + }), + ), + Effect.as(HttpServerResponse.empty()), + ); + }) + .pipe(Effect.provide(AppIdentityService.layer)) + .pipe(Effect.provide(SpacesService.layer)) + .pipe(Effect.provide(InvitationsService.layer)) + .pipe(Effect.provide(IdentityService.layer)) + .pipe(Effect.provide(UpdatesService.layer)) + .pipe(Effect.provide(AccountInboxService.layer)) + .pipe(Effect.provide(SpaceInboxService.layer)), +); From f364393c27b6ea2d24ca7082fb35534eada6c6d5 Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 19:03:29 +0200 Subject: [PATCH 19/22] add back structured logger --- apps/server-new/src/index.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/server-new/src/index.ts b/apps/server-new/src/index.ts index c021742f..e52f14d4 100644 --- a/apps/server-new/src/index.ts +++ b/apps/server-new/src/index.ts @@ -25,9 +25,8 @@ const Observability = Layer.unwrapEffect( ); const layer = server.pipe( - // TODO: add structured logger back in - // Layer.provide(Logger.structured), - Layer.provide(Logger.pretty), + Layer.provide(Logger.structured), + // Layer.provide(Logger.pretty), Layer.provide(Observability), Layer.provide(PlatformConfigProvider.layerDotEnvAdd('.env')), Layer.provide(NodeContext.layer), From 515f7bd3f049201416c629cbea4b46510eda1a11 Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 19:06:33 +0200 Subject: [PATCH 20/22] add lint fixes --- apps/server-new/src/server.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index a9b37d2f..0a7c32f5 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -1,10 +1,10 @@ -import * as NodeHttpServer from '@effect/platform-node/NodeHttpServer'; +import { createServer } from 'node:http'; import * as HttpApiScalar from '@effect/platform/HttpApiScalar'; import * as HttpLayerRouter from '@effect/platform/HttpLayerRouter'; import * as HttpMiddleware from '@effect/platform/HttpMiddleware'; +import * as NodeHttpServer from '@effect/platform-node/NodeHttpServer'; import * as Effect from 'effect/Effect'; import * as Layer from 'effect/Layer'; -import { createServer } from 'node:http'; import { serverPortConfig } from './config/server.ts'; import { hypergraphApi } from './http/api.ts'; import { HandlersLive } from './http/handlers.ts'; From 6ddae2f7694cfaf63c35c02df9006078a04cee63 Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 19:24:14 +0200 Subject: [PATCH 21/22] fix logging --- apps/server-new/src/services/invitations.ts | 30 ++++++++++----------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/apps/server-new/src/services/invitations.ts b/apps/server-new/src/services/invitations.ts index 67c011d8..3d15c6cc 100644 --- a/apps/server-new/src/services/invitations.ts +++ b/apps/server-new/src/services/invitations.ts @@ -39,21 +39,21 @@ export const layer = Effect.gen(function* () { }), ); - return invitations - .map((invitation) => { - const result = decodeSpaceState(JSON.parse(invitation.space.events[0].state)); - if (result._tag === 'Right') { - const state = result.right; - return { - id: invitation.id, - previousEventHash: state.lastEventHash, - spaceId: invitation.spaceId, - }; - } - console.error('Invalid space state from the DB', result.left); - return null; - }) - .filter((invitation) => invitation !== null); + const processedInvitations = []; + for (const invitation of invitations) { + const result = decodeSpaceState(JSON.parse(invitation.space.events[0].state)); + if (result._tag === 'Right') { + const state = result.right; + processedInvitations.push({ + id: invitation.id, + previousEventHash: state.lastEventHash, + spaceId: invitation.spaceId, + }); + } else { + yield* Effect.logError('Invalid space state from the DB', { error: result.left }); + } + } + return processedInvitations; }); return { From c6579087b813875b02996ba6de343ae3776f58c3 Mon Sep 17 00:00:00 2001 From: Nik Graf Date: Mon, 8 Sep 2025 19:27:19 +0200 Subject: [PATCH 22/22] return new account inbox after creation --- apps/server-new/src/websocket.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/server-new/src/websocket.ts b/apps/server-new/src/websocket.ts index 90f04a03..7b50552c 100644 --- a/apps/server-new/src/websocket.ts +++ b/apps/server-new/src/websocket.ts @@ -335,6 +335,8 @@ export const WebSocketLayer = HttpLayerRouter.add( excludeConnectionId: connectionId, }); + yield* responseMailbox.offer(Messages.serializeV2(inboxMessage)); + break; } case 'get-latest-space-inbox-messages': {