From e6822779190a8275b0d20f9714b34b343147e42b Mon Sep 17 00:00:00 2001 From: Drew Ewing Date: Mon, 15 Aug 2022 15:03:01 -0700 Subject: [PATCH 1/3] feat(iridium): sync pins for messaging --- libraries/Iridium/chat/ChatManager.ts | 68 ++++++++++++++++----------- nuxt.config.js | 2 +- 2 files changed, 41 insertions(+), 29 deletions(-) diff --git a/libraries/Iridium/chat/ChatManager.ts b/libraries/Iridium/chat/ChatManager.ts index 3b40d17fa4..1e06096350 100644 --- a/libraries/Iridium/chat/ChatManager.ts +++ b/libraries/Iridium/chat/ChatManager.ts @@ -4,6 +4,8 @@ import { Emitter, didUtils, IridiumPubsubMessage, + encoding, + IridiumSetOptions, } from '@satellite-im/iridium' import type { AddOptions, AddResult } from 'ipfs-core-types/root' import type { IPFS } from 'ipfs-core-types' @@ -28,7 +30,8 @@ import { blobToStream } from '~/utilities/BlobManip' import isNSFW from '~/utilities/NSFW' export type ConversationPubsubEvent = IridiumMessage<{ - message: ConversationMessage + message?: ConversationMessage + cid?: string type: 'chat/message' }> @@ -144,7 +147,7 @@ export default class ChatManager extends Emitter { return this.iridium.connector?.get(`/chat${path}`, options) } - set(path: string = '', payload: any, options: any = {}) { + set(path: string = '', payload: any, options: IridiumSetOptions = {}) { return this.iridium.connector?.set(`/chat${path}`, payload, options) } @@ -162,8 +165,24 @@ export default class ChatManager extends Emitter { ) { throw new Error(ChatError.CONVERSATION_NOT_FOUND) } - const { type, message } = payload.body - if (type === 'chat/message' && message) { + const { type, cid } = payload.body + if (type === 'chat/message') { + let message: ConversationMessage + if (cid) { + message = await this.iridium.connector.load(cid) + message.id = cid + } else if (payload.body.message) { + message = payload.body.message + } else { + throw new Error('no message in payload') + } + + logger.info( + 'iridium/chatmanager/onConversationMessage', + 'message received', + { message, cid, from, conversationId }, + ) + Vue.set( this.state.conversations[conversationId].message, message.id, @@ -334,47 +353,40 @@ export default class ChatManager extends Emitter { const { conversationId } = payload const conversation = this.getConversation(conversationId) - const messageID = await this.iridium.connector.store( - { - ...payload, - from: this.iridium.connector.id, - reactions: {}, - attachments: payload.attachments, - }, - { - encrypt: { recipients: conversation.participants }, - }, - ) - if (!messageID) { - throw new Error(ChatError.MESSAGE_NOT_SENT) + const message: Partial = { + ...payload, + from: this.iridium.connector.id, + reactions: {}, + attachments: payload.attachments, } + message.id = ( + await this.iridium.connector.store(message, { + syncPin: true, + encrypt: { recipients: conversation.participants }, + }) + ).toString() if (!this._subscriptions[conversationId]) { // we're not subscribed yet throw new Error(`not yet subscribed to conversation ${conversationId}`) } - const messageCID = messageID.toString() - const message: ConversationMessage = { - ...payload, - from: this.iridium.connector.id, - reactions: {}, - attachments: payload.attachments, - id: messageCID, - } Vue.set( this.state.conversations[conversationId].message, - messageCID, + message.id, + message, + ) + await this.set( + `/conversations/${conversationId}/message/${message.id}`, message, ) - this.set(`/conversations/${conversationId}/message/${messageCID}`, message) // broadcast the message to connected peers await this.iridium.connector.publish( `/chat/conversations/${conversationId}`, { type: 'chat/message', - message, + cid: message.id, }, { encrypt: { recipients: conversation.participants }, diff --git a/nuxt.config.js b/nuxt.config.js index 5a20fb9cf2..08470bc6c3 100644 --- a/nuxt.config.js +++ b/nuxt.config.js @@ -5,7 +5,7 @@ export default defineNuxtConfig({ alias: { tslib: 'tslib/tslib.es6.js', 'merge-options': 'merge-options/index.js', - '@satellite-im/iridium': '@satellite-im/iridium/dist/index.browser.js', + '@satellite-im/iridium': '@satellite-im/iridium/dist/browser/index.js', }, bridge: { nitro: false, From 0713f9294c8b3b7cffaba01c87587d79d95df7fa Mon Sep 17 00:00:00 2001 From: Drew Ewing Date: Mon, 15 Aug 2022 15:03:27 -0700 Subject: [PATCH 2/3] feat(iridium): sync pins for messaging --- libraries/Iridium/chat/ChatManager.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/libraries/Iridium/chat/ChatManager.ts b/libraries/Iridium/chat/ChatManager.ts index 1e06096350..564176baef 100644 --- a/libraries/Iridium/chat/ChatManager.ts +++ b/libraries/Iridium/chat/ChatManager.ts @@ -4,14 +4,12 @@ import { Emitter, didUtils, IridiumPubsubMessage, - encoding, IridiumSetOptions, } from '@satellite-im/iridium' import type { AddOptions, AddResult } from 'ipfs-core-types/root' import type { IPFS } from 'ipfs-core-types' import type { SyncSubscriptionResponse } from '@satellite-im/iridium/src/sync/agent' import type { EmitterCallback } from '@satellite-im/iridium' -import { ItemErrors } from '../files/types' import { Conversation, ConversationMessage, From 94899cf68cd5ac13f9d287b2ddcac972998d6d2d Mon Sep 17 00:00:00 2001 From: Drew Ewing Date: Wed, 17 Aug 2022 17:27:10 -0700 Subject: [PATCH 3/3] feat(iridium): offline message sync --- .gitignore | 2 +- libraries/Iridium/chat/ChatManager.ts | 78 +++++++++++++++++++++++---- 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/.gitignore b/.gitignore index 6003790c5e..8d7c24050e 100644 --- a/.gitignore +++ b/.gitignore @@ -103,4 +103,4 @@ scratchpad.js solana-localnet -iridium \ No newline at end of file +./iridium diff --git a/libraries/Iridium/chat/ChatManager.ts b/libraries/Iridium/chat/ChatManager.ts index 564176baef..305cf3215a 100644 --- a/libraries/Iridium/chat/ChatManager.ts +++ b/libraries/Iridium/chat/ChatManager.ts @@ -5,10 +5,17 @@ import { didUtils, IridiumPubsubMessage, IridiumSetOptions, + encoding, } from '@satellite-im/iridium' +import type { IridiumDecodedPayload } from '@satellite-im/iridium/src/core/encoding' import type { AddOptions, AddResult } from 'ipfs-core-types/root' import type { IPFS } from 'ipfs-core-types' -import type { SyncSubscriptionResponse } from '@satellite-im/iridium/src/sync/agent' +import { CID } from 'multiformats' +import * as json from 'multiformats/codecs/json' +import type { + SyncFetchResponse, + SyncSubscriptionResponse, +} from '@satellite-im/iridium/src/sync/agent' import type { EmitterCallback } from '@satellite-im/iridium' import { Conversation, @@ -27,11 +34,13 @@ import { FILE_TYPE } from '~/libraries/Files/types/file' import { blobToStream } from '~/utilities/BlobManip' import isNSFW from '~/utilities/NSFW' -export type ConversationPubsubEvent = IridiumMessage<{ - message?: ConversationMessage - cid?: string - type: 'chat/message' -}> +export type ConversationPubsubEvent = IridiumMessage< + IridiumDecodedPayload<{ + message?: ConversationMessage + cid?: string + type: 'chat/message' + }> +> export type State = { conversations: { [key: Conversation['id']]: Conversation } @@ -65,15 +74,30 @@ export default class ChatManager extends Emitter { const conversations = Object.values(this.state.conversations) // listen for sync node subscription responses this.iridium.connector?.p2p.on< - IridiumPubsubMessage + IridiumPubsubMessage> >('node/message/sync/subscribe', this.onSyncSubscriptionResponse.bind(this)) + this.iridium.connector?.p2p.on< + IridiumPubsubMessage> + >('node/message/sync/fetch', this.onSyncFetchResponse.bind(this)) + this.iridium.connector?.p2p.on('ready', async () => { - await new Promise((resolve) => setTimeout(resolve, 5000)) if (!this.iridium.connector?.p2p.primaryNodeID) { throw new Error('not connected to primary node') } - logger.info('iridium/chatmanager/init', 'p2p ready, initializing chat...') + + logger.info( + 'iridium/chatmanager/init', + 'p2p ready, initializing chat...', + { node: this.iridium.connector?.p2p.primaryNodeID }, + ) + // sync fetch + await this.iridium.connector?.p2p.send( + this.iridium.connector?.p2p.primaryNodeID, + { + type: 'sync/fetch', + }, + ) for (const conversation of conversations) { if (this._subscriptions[conversation.id] !== undefined) continue @@ -90,6 +114,7 @@ export default class ChatManager extends Emitter { { type: 'sync/subscribe', topic, + offlineSync: true, }, ) } @@ -103,7 +128,9 @@ export default class ChatManager extends Emitter { * @description - handle a sync subscription response from the sync node */ async onSyncSubscriptionResponse( - message: IridiumPubsubMessage, + message: IridiumPubsubMessage< + IridiumDecodedPayload + >, ) { logger.info( 'iridium/chatmanager/onSyncSubscriptionResponse', @@ -141,6 +168,36 @@ export default class ChatManager extends Emitter { ) } + async onSyncFetchResponse( + message: IridiumPubsubMessage>, + ) { + if (!message.payload.body.rows) { + return + } + await Promise.all( + message.payload.body.rows.map(async (row) => { + const stored = await this.iridium.connector?.dag.get(CID.parse(row.cid)) + if (!stored.body) { + return + } + const buffer = await this.iridium.connector?.did.decryptJWE(stored.body) + const message: any = buffer && json.decode(buffer) + if (stored.topic) { + logger.info( + 'iridium/chatmanager', + 'sync/fetch/row - emitting synced message', + message, + ) + await this.iridium.connector?.pubsub.emit(stored.topic, { + from: stored.from, + topic: stored.topic, + payload: { type: 'jwe', body: message }, + }) + } + }), + ) + } + get(path: string = '', options: any = {}) { return this.iridium.connector?.get(`/chat${path}`, options) } @@ -247,6 +304,7 @@ export default class ChatManager extends Emitter { { type: 'sync/subscribe', topic: `/chat/conversations/${id}`, + offlineSync: true, }, ) }