Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
Warnings:

- Added the required column `accountId` to the `Update` table without a default value. This is not possible if the table is not empty.
- Added the required column `updateId` to the `Update` table without a default value. This is not possible if the table is not empty.
- Added the required column `signatureHex` to the `Update` table without a default value. This is not possible if the table is not empty.
- Added the required column `signatureRecovery` to the `Update` table without a default value. This is not possible if the table is not empty.

*/
-- RedefineTables
PRAGMA defer_foreign_keys=ON;
PRAGMA foreign_keys=OFF;
CREATE TABLE "new_Update" (
"spaceId" TEXT NOT NULL,
"clock" INTEGER NOT NULL,
"content" BLOB NOT NULL,
"accountId" TEXT NOT NULL,
"signatureHex" TEXT NOT NULL,
"signatureRecovery" INTEGER NOT NULL,
"updateId" TEXT NOT NULL,

PRIMARY KEY ("spaceId", "clock"),
CONSTRAINT "Update_spaceId_fkey" FOREIGN KEY ("spaceId") REFERENCES "Space" ("id") ON DELETE RESTRICT ON UPDATE CASCADE,
CONSTRAINT "Update_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "Account" ("id") ON DELETE RESTRICT ON UPDATE CASCADE
);
INSERT INTO "new_Update" ("clock", "content", "spaceId") SELECT "clock", "content", "spaceId" FROM "Update";
DROP TABLE "Update";
ALTER TABLE "new_Update" RENAME TO "Update";
PRAGMA foreign_keys=ON;
PRAGMA defer_foreign_keys=OFF;
2 changes: 1 addition & 1 deletion apps/server/prisma/migrations/migration_lock.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Please do not edit this file manually
# It should be added in your version-control system (i.e. Git)
# It should be added in your version-control system (e.g., Git)
provider = "sqlite"
6 changes: 6 additions & 0 deletions apps/server/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ model Account {
sessionNonce String?
sessionToken String?
sessionTokenExpires DateTime?
updates Update[]

@@index([sessionToken])
}
Expand All @@ -83,6 +84,11 @@ model Update {
spaceId String
clock Int
content Bytes
account Account @relation(fields: [accountId], references: [id])
accountId String
signatureHex String
signatureRecovery Int
updateId String

@@id([spaceId, clock])
}
Expand Down
19 changes: 16 additions & 3 deletions apps/server/src/handlers/createUpdate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,19 @@ type Params = {
accountId: string;
update: Uint8Array;
spaceId: string;
signatureHex: string;
signatureRecovery: number;
updateId: string;
};

export const createUpdate = async ({ accountId, update, spaceId }: Params) => {
export const createUpdate = async ({
accountId,
update,
spaceId,
signatureHex,
signatureRecovery,
updateId,
}: Params) => {
// throw error if account is not a member of the space
await prisma.space.findUniqueOrThrow({
where: { id: spaceId, members: { some: { id: accountId } } },
Expand Down Expand Up @@ -36,14 +46,17 @@ export const createUpdate = async ({ accountId, update, spaceId }: Params) => {

return await prisma.update.create({
data: {
spaceId,
space: { connect: { id: spaceId } },
clock,
content: Buffer.from(update),
signatureHex,
signatureRecovery,
updateId,
account: { connect: { id: accountId } },
},
});
});
success = true;
return result;
} catch (error) {
const dbError = error as { code?: string; message?: string };
if (dbError.code === 'P2034' || dbError.code === 'P1008' || dbError.message?.includes('database is locked')) {
Expand Down
14 changes: 13 additions & 1 deletion apps/server/src/handlers/getSpace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,26 @@ export const getSpace = async ({ spaceId, accountId }: Params) => {
};
});

const formatUpdate = (update) => {
return {
accountId: update.accountId,
update: new Uint8Array(update.content),
signature: {
hex: update.signatureHex,
recovery: update.signatureRecovery,
},
updateId: update.updateId,
};
};

return {
id: space.id,
events: space.events.map((wrapper) => JSON.parse(wrapper.event)),
keyBoxes,
updates:
space.updates.length > 0
? {
updates: space.updates.map((update) => new Uint8Array(update.content)),
updates: space.updates.map(formatUpdate),
firstUpdateClock: space.updates[0].clock,
lastUpdateClock: space.updates[space.updates.length - 1].clock,
}
Expand Down
59 changes: 42 additions & 17 deletions apps/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,24 +393,49 @@ webSocketServer.on('connection', async (webSocket: CustomWebSocket, request: Req
break;
}
case 'create-update': {
const update = await createUpdate({ accountId, spaceId: data.spaceId, update: data.update });
const outgoingMessage: Messages.ResponseUpdateConfirmed = {
type: 'update-confirmed',
ephemeralId: data.ephemeralId,
clock: update.clock,
spaceId: data.spaceId,
};
webSocket.send(Messages.serialize(outgoingMessage));
try {
// Check that the update was signed by a valid identity
// belonging to this accountId
const signer = Messages.recoverUpdateMessageSigner(data);
const identity = await getIdentity({ signaturePublicKey: signer });
if (identity.accountId !== accountId) {
throw new Error('Invalid signature');
}
const update = await createUpdate({
accountId,
spaceId: data.spaceId,
update: data.update,
signatureHex: data.signature.hex,
signatureRecovery: data.signature.recovery,
updateId: data.updateId,
});
const outgoingMessage: Messages.ResponseUpdateConfirmed = {
type: 'update-confirmed',
updateId: data.updateId,
clock: update.clock,
spaceId: data.spaceId,
};
webSocket.send(Messages.serialize(outgoingMessage));

broadcastUpdates({
spaceId: data.spaceId,
updates: {
updates: [new Uint8Array(update.content)],
firstUpdateClock: update.clock,
lastUpdateClock: update.clock,
},
currentClient: webSocket,
});
broadcastUpdates({
spaceId: data.spaceId,
updates: {
updates: [
{
accountId,
update: data.update,
signature: data.signature,
updateId: data.updateId,
},
],
firstUpdateClock: update.clock,
lastUpdateClock: update.clock,
},
currentClient: webSocket,
});
} catch (err) {
console.error('Error creating update:', err);
}
break;
}
default:
Expand Down
121 changes: 71 additions & 50 deletions packages/hypergraph-react/src/HypergraphAppContext.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import * as automerge from '@automerge/automerge';
import { uuid } from '@automerge/automerge';
import type { DocHandle } from '@automerge/automerge-repo';
import { RepoContext } from '@automerge/automerge-repo-react-hooks';
import { Identity, Key, Messages, SpaceEvents, type SpaceStorageEntry, Utils, store } from '@graphprotocol/hypergraph';
import { useSelector as useSelectorStore } from '@xstate/store/react';
Expand Down Expand Up @@ -464,11 +465,66 @@ export function HypergraphAppProvider({
// Handle WebSocket messages in a separate effect
useEffect(() => {
if (!websocketConnection) return;
if (!accountId) {
console.error('No accountId found');
return;
}
const encryptionPrivateKey = keys?.encryptionPrivateKey;
if (!encryptionPrivateKey) {
console.error('No encryption private key found');
return;
}
const signaturePrivateKey = keys?.signaturePrivateKey;
if (!signaturePrivateKey) {
console.error('No signature private key found.');
return;
}

const applyUpdates = async (
spaceId: string,
spaceSecretKey: string,
automergeDocHandle: DocHandle<unknown>,
updates: Messages.Updates,
) => {
const verifiedUpdates = await Promise.all(
updates.updates.map(async (update) => {
const signer = Messages.recoverUpdateMessageSigner({
update: update.update,
spaceId,
updateId: update.updateId,
signature: update.signature,
accountId: update.accountId,
});
const authorIdentity = await getUserIdentity(update.accountId);
if (authorIdentity.signaturePublicKey !== signer) {
console.error(
`Received invalid signature, recovered signer is ${signer},
expected ${authorIdentity.signaturePublicKey}`,
);
return { valid: false, update: new Uint8Array([]) };
}
return {
valid: true,
update: Messages.decryptMessage({
nonceAndCiphertext: update.update,
secretKey: Utils.hexToBytes(spaceSecretKey),
}),
};
}),
);
const validUpdates = verifiedUpdates.filter((update) => update.valid).map((update) => update.update);
automergeDocHandle.update((existingDoc) => {
const [newDoc] = automerge.applyChanges(existingDoc, validUpdates);
return newDoc;
});

store.send({
type: 'applyUpdate',
spaceId,
firstUpdateClock: updates.firstUpdateClock,
lastUpdateClock: updates.lastUpdateClock,
});
};

const onMessage = async (event: MessageEvent) => {
const data = Messages.deserialize(event.data);
Expand Down Expand Up @@ -526,26 +582,7 @@ export function HypergraphAppProvider({
}

if (response.updates) {
const updates = response.updates?.updates.map((update) => {
return Messages.decryptMessage({
nonceAndCiphertext: update,
secretKey: Utils.hexToBytes(keys[0].key),
});
});

for (const update of updates) {
automergeDocHandle.update((existingDoc) => {
const [newDoc] = automerge.applyChanges(existingDoc, [update]);
return newDoc;
});
}

store.send({
type: 'applyUpdate',
spaceId: response.id,
firstUpdateClock: response.updates?.firstUpdateClock,
lastUpdateClock: response.updates?.lastUpdateClock,
});
await applyUpdates(response.id, keys[0].key, automergeDocHandle, response.updates);
}

automergeDocHandle.on('change', (result) => {
Expand All @@ -558,19 +595,16 @@ export function HypergraphAppProvider({
const storeState = store.getSnapshot();
const space = storeState.context.spaces[0];

const ephemeralId = uuid();
const updateId = uuid();

const nonceAndCiphertext = Messages.encryptMessage({
const messageToSend = Messages.signedUpdateMessage({
accountId,
updateId,
spaceId: space.id,
message: lastLocalChange,
secretKey: Utils.hexToBytes(space.keys[0].key),
secretKey: space.keys[0].key,
signaturePrivateKey,
});

const messageToSend = {
type: 'create-update',
ephemeralId,
update: nonceAndCiphertext,
spaceId: space.id,
} as const satisfies Messages.RequestCreateUpdate;
websocketConnection.send(Messages.serialize(messageToSend));
} catch (error) {
console.error('Error sending message', error);
Expand Down Expand Up @@ -614,7 +648,7 @@ export function HypergraphAppProvider({
case 'update-confirmed': {
store.send({
type: 'removeUpdateInFlight',
ephemeralId: response.ephemeralId,
updateId: response.updateId,
});
store.send({
type: 'updateConfirmed',
Expand All @@ -631,25 +665,12 @@ export function HypergraphAppProvider({
console.error('Space not found', response.spaceId);
return;
}
if (!space.automergeDocHandle) {
console.error('No automergeDocHandle found', response.spaceId);
return;
}

const automergeUpdates = response.updates.updates.map((update) => {
return Messages.decryptMessage({
nonceAndCiphertext: update,
secretKey: Utils.hexToBytes(space.keys[0].key),
});
});

space?.automergeDocHandle?.update((existingDoc) => {
const [newDoc] = automerge.applyChanges(existingDoc, automergeUpdates);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we gather all the automergeUpdates and then apply them. I actually didn't do it properly in the other case, but this should be the way to go. Can you update the function to do it this way?

The reason is that afaik this is much more performant than applying ever update in a for loop

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, will fix

return newDoc;
});

store.send({
type: 'applyUpdate',
spaceId: response.spaceId,
firstUpdateClock: response.updates.firstUpdateClock,
lastUpdateClock: response.updates.lastUpdateClock,
});
await applyUpdates(response.spaceId, space.keys[0].key, space.automergeDocHandle, response.updates);
break;
}
default: {
Expand All @@ -664,7 +685,7 @@ export function HypergraphAppProvider({
return () => {
websocketConnection.removeEventListener('message', onMessage);
};
}, [websocketConnection, spaces, keys?.encryptionPrivateKey]);
}, [websocketConnection, spaces, accountId, keys?.encryptionPrivateKey, keys?.signaturePrivateKey]);

const createSpaceForContext = async () => {
if (!accountId) {
Expand Down
1 change: 1 addition & 0 deletions packages/hypergraph/src/messages/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './decrypt-message.js';
export * from './encrypt-message.js';
export * from './serialize.js';
export * from './signed-update-message.js';
export * from './types.js';
Loading
Loading