From e959f814dfb35143e0914f860421e78d5532fe1d Mon Sep 17 00:00:00 2001 From: Andrew Ewing Date: Fri, 19 Aug 2022 02:54:40 -0700 Subject: [PATCH] feat(iridium): offline sync (#4335) Co-authored-by: Drew Ewing --- .gitignore | 4 +- libraries/Iridium/chat/ChatManager.ts | 78 +++++++++++++++++++++++---- 2 files changed, 71 insertions(+), 11 deletions(-) diff --git a/.gitignore b/.gitignore index bdc6b5ef5f..0899053caa 100644 --- a/.gitignore +++ b/.gitignore @@ -102,4 +102,6 @@ scratchpad.js .vscode/chrome/ solana-localnet -linked-iridium \ No newline at end of file + +./iridium +linked-iridium diff --git a/libraries/Iridium/chat/ChatManager.ts b/libraries/Iridium/chat/ChatManager.ts index 2dd03e3e89..7b34fc4c0c 100644 --- a/libraries/Iridium/chat/ChatManager.ts +++ b/libraries/Iridium/chat/ChatManager.ts @@ -5,10 +5,17 @@ import { didUtils, IridiumPubsubMessage, IridiumSetOptions, + encoding, } from '@satellite-im/iridium' +import type { IridiumDecodedPayload } from '@satellite-im/iridium/src/core/encoding' import type { AddOptions, AddResult } from 'ipfs-core-types/root' import type { IPFS } from 'ipfs-core-types' -import type { SyncSubscriptionResponse } from '@satellite-im/iridium/src/sync/agent' +import { CID } from 'multiformats' +import * as json from 'multiformats/codecs/json' +import type { + SyncFetchResponse, + SyncSubscriptionResponse, +} from '@satellite-im/iridium/src/sync/agent' import type { EmitterCallback } from '@satellite-im/iridium' import { Conversation, @@ -31,11 +38,13 @@ import { NotificationType, } from '~/libraries/Iridium/notifications/types' -export type ConversationPubsubEvent = IridiumMessage<{ - message?: ConversationMessage - cid?: string - type: 'chat/message' -}> +export type ConversationPubsubEvent = IridiumMessage< + IridiumDecodedPayload<{ + message?: ConversationMessage + cid?: string + type: 'chat/message' + }> +> export type State = { conversations: { [key: Conversation['id']]: Conversation } @@ -69,15 +78,30 @@ export default class ChatManager extends Emitter { const conversations = Object.values(this.state.conversations) // listen for sync node subscription responses this.iridium.connector?.p2p.on< - IridiumPubsubMessage + IridiumPubsubMessage> >('node/message/sync/subscribe', this.onSyncSubscriptionResponse.bind(this)) + this.iridium.connector?.p2p.on< + IridiumPubsubMessage> + >('node/message/sync/fetch', this.onSyncFetchResponse.bind(this)) + this.iridium.connector?.p2p.on('ready', async () => { - await new Promise((resolve) => setTimeout(resolve, 5000)) if (!this.iridium.connector?.p2p.primaryNodeID) { throw new Error('not connected to primary node') } - logger.info('iridium/chatmanager/init', 'p2p ready, initializing chat...') + + logger.info( + 'iridium/chatmanager/init', + 'p2p ready, initializing chat...', + { node: this.iridium.connector?.p2p.primaryNodeID }, + ) + // sync fetch + await this.iridium.connector?.p2p.send( + this.iridium.connector?.p2p.primaryNodeID, + { + type: 'sync/fetch', + }, + ) for (const conversation of conversations) { if (this.subscriptions[conversation.id] !== undefined) continue @@ -94,6 +118,7 @@ export default class ChatManager extends Emitter { { type: 'sync/subscribe', topic, + offlineSync: true, }, ) } @@ -107,7 +132,9 @@ export default class ChatManager extends Emitter { * @description - handle a sync subscription response from the sync node */ async onSyncSubscriptionResponse( - message: IridiumPubsubMessage, + message: IridiumPubsubMessage< + IridiumDecodedPayload + >, ) { logger.info( 'iridium/chatmanager/onSyncSubscriptionResponse', @@ -153,6 +180,36 @@ export default class ChatManager extends Emitter { ) } + async onSyncFetchResponse( + message: IridiumPubsubMessage>, + ) { + if (!message.payload.body.rows) { + return + } + await Promise.all( + message.payload.body.rows.map(async (row) => { + const stored = await this.iridium.connector?.dag.get(CID.parse(row.cid)) + if (!stored.body) { + return + } + const buffer = await this.iridium.connector?.did.decryptJWE(stored.body) + const message: any = buffer && json.decode(buffer) + if (stored.topic) { + logger.info( + 'iridium/chatmanager', + 'sync/fetch/row - emitting synced message', + message, + ) + await this.iridium.connector?.pubsub.emit(stored.topic, { + from: stored.from, + topic: stored.topic, + payload: { type: 'jwe', body: message }, + }) + } + }), + ) + } + get(path: string = '', options: any = {}) { return this.iridium.connector?.get(`/chat${path}`, options) } @@ -275,6 +332,7 @@ export default class ChatManager extends Emitter { { type: 'sync/subscribe', topic: `/chat/conversations/${id}`, + offlineSync: true, }, ) }