Skip to content

Commit

Permalink
feat(iridium): offline sync (#4335)
Browse files Browse the repository at this point in the history
Co-authored-by: Drew Ewing <drew.ewing@satellite.im>
  • Loading branch information
aewing and Drew Ewing committed Aug 19, 2022
1 parent 2cc605d commit e959f81
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 11 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,6 @@ scratchpad.js
.vscode/chrome/

solana-localnet
linked-iridium

./iridium
linked-iridium
78 changes: 68 additions & 10 deletions libraries/Iridium/chat/ChatManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,17 @@ import {
didUtils,
IridiumPubsubMessage,
IridiumSetOptions,
encoding,
} from '@satellite-im/iridium'
import type { IridiumDecodedPayload } from '@satellite-im/iridium/src/core/encoding'
import type { AddOptions, AddResult } from 'ipfs-core-types/root'
import type { IPFS } from 'ipfs-core-types'
import type { SyncSubscriptionResponse } from '@satellite-im/iridium/src/sync/agent'
import { CID } from 'multiformats'
import * as json from 'multiformats/codecs/json'
import type {
SyncFetchResponse,
SyncSubscriptionResponse,
} from '@satellite-im/iridium/src/sync/agent'
import type { EmitterCallback } from '@satellite-im/iridium'
import {
Conversation,
Expand All @@ -31,11 +38,13 @@ import {
NotificationType,
} from '~/libraries/Iridium/notifications/types'

export type ConversationPubsubEvent = IridiumMessage<{
message?: ConversationMessage
cid?: string
type: 'chat/message'
}>
export type ConversationPubsubEvent = IridiumMessage<
IridiumDecodedPayload<{
message?: ConversationMessage
cid?: string
type: 'chat/message'
}>
>

export type State = {
conversations: { [key: Conversation['id']]: Conversation }
Expand Down Expand Up @@ -69,15 +78,30 @@ export default class ChatManager extends Emitter<ConversationMessage> {
const conversations = Object.values(this.state.conversations)
// listen for sync node subscription responses
this.iridium.connector?.p2p.on<
IridiumPubsubMessage<SyncSubscriptionResponse>
IridiumPubsubMessage<IridiumDecodedPayload<SyncSubscriptionResponse>>
>('node/message/sync/subscribe', this.onSyncSubscriptionResponse.bind(this))

this.iridium.connector?.p2p.on<
IridiumPubsubMessage<IridiumDecodedPayload<SyncFetchResponse>>
>('node/message/sync/fetch', this.onSyncFetchResponse.bind(this))

this.iridium.connector?.p2p.on('ready', async () => {
await new Promise((resolve) => setTimeout(resolve, 5000))
if (!this.iridium.connector?.p2p.primaryNodeID) {
throw new Error('not connected to primary node')
}
logger.info('iridium/chatmanager/init', 'p2p ready, initializing chat...')

logger.info(
'iridium/chatmanager/init',
'p2p ready, initializing chat...',
{ node: this.iridium.connector?.p2p.primaryNodeID },
)
// sync fetch
await this.iridium.connector?.p2p.send(
this.iridium.connector?.p2p.primaryNodeID,
{
type: 'sync/fetch',
},
)

for (const conversation of conversations) {
if (this.subscriptions[conversation.id] !== undefined) continue
Expand All @@ -94,6 +118,7 @@ export default class ChatManager extends Emitter<ConversationMessage> {
{
type: 'sync/subscribe',
topic,
offlineSync: true,
},
)
}
Expand All @@ -107,7 +132,9 @@ export default class ChatManager extends Emitter<ConversationMessage> {
* @description - handle a sync subscription response from the sync node
*/
async onSyncSubscriptionResponse(
message: IridiumPubsubMessage<SyncSubscriptionResponse>,
message: IridiumPubsubMessage<
IridiumDecodedPayload<SyncSubscriptionResponse>
>,
) {
logger.info(
'iridium/chatmanager/onSyncSubscriptionResponse',
Expand Down Expand Up @@ -153,6 +180,36 @@ export default class ChatManager extends Emitter<ConversationMessage> {
)
}

async onSyncFetchResponse(
message: IridiumPubsubMessage<IridiumDecodedPayload<SyncFetchResponse>>,
) {
if (!message.payload.body.rows) {
return
}
await Promise.all(
message.payload.body.rows.map(async (row) => {
const stored = await this.iridium.connector?.dag.get(CID.parse(row.cid))
if (!stored.body) {
return
}
const buffer = await this.iridium.connector?.did.decryptJWE(stored.body)
const message: any = buffer && json.decode(buffer)
if (stored.topic) {
logger.info(
'iridium/chatmanager',
'sync/fetch/row - emitting synced message',
message,
)
await this.iridium.connector?.pubsub.emit(stored.topic, {
from: stored.from,
topic: stored.topic,
payload: { type: 'jwe', body: message },
})
}
}),
)
}

get(path: string = '', options: any = {}) {
return this.iridium.connector?.get(`/chat${path}`, options)
}
Expand Down Expand Up @@ -275,6 +332,7 @@ export default class ChatManager extends Emitter<ConversationMessage> {
{
type: 'sync/subscribe',
topic: `/chat/conversations/${id}`,
offlineSync: true,
},
)
}
Expand Down

0 comments on commit e959f81

Please sign in to comment.