Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iridium): offline sync #4335

Merged
merged 4 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,6 @@ scratchpad.js
.vscode/chrome/

solana-localnet
linked-iridium

./iridium
molimauro marked this conversation as resolved.
Show resolved Hide resolved
linked-iridium
78 changes: 68 additions & 10 deletions libraries/Iridium/chat/ChatManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,17 @@ import {
didUtils,
IridiumPubsubMessage,
IridiumSetOptions,
encoding,
} from '@satellite-im/iridium'
import type { IridiumDecodedPayload } from '@satellite-im/iridium/src/core/encoding'
import type { AddOptions, AddResult } from 'ipfs-core-types/root'
import type { IPFS } from 'ipfs-core-types'
import type { SyncSubscriptionResponse } from '@satellite-im/iridium/src/sync/agent'
import { CID } from 'multiformats'
import * as json from 'multiformats/codecs/json'
import type {
SyncFetchResponse,
SyncSubscriptionResponse,
} from '@satellite-im/iridium/src/sync/agent'
import type { EmitterCallback } from '@satellite-im/iridium'
import {
Conversation,
Expand All @@ -31,11 +38,13 @@ import {
NotificationType,
} from '~/libraries/Iridium/notifications/types'

export type ConversationPubsubEvent = IridiumMessage<{
message?: ConversationMessage
cid?: string
type: 'chat/message'
}>
export type ConversationPubsubEvent = IridiumMessage<
IridiumDecodedPayload<{
message?: ConversationMessage
cid?: string
type: 'chat/message'
}>
>

export type State = {
conversations: { [key: Conversation['id']]: Conversation }
Expand Down Expand Up @@ -69,15 +78,30 @@ export default class ChatManager extends Emitter<ConversationMessage> {
const conversations = Object.values(this.state.conversations)
// listen for sync node subscription responses
this.iridium.connector?.p2p.on<
IridiumPubsubMessage<SyncSubscriptionResponse>
IridiumPubsubMessage<IridiumDecodedPayload<SyncSubscriptionResponse>>
>('node/message/sync/subscribe', this.onSyncSubscriptionResponse.bind(this))

this.iridium.connector?.p2p.on<
IridiumPubsubMessage<IridiumDecodedPayload<SyncFetchResponse>>
>('node/message/sync/fetch', this.onSyncFetchResponse.bind(this))

this.iridium.connector?.p2p.on('ready', async () => {
await new Promise((resolve) => setTimeout(resolve, 5000))
if (!this.iridium.connector?.p2p.primaryNodeID) {
throw new Error('not connected to primary node')
}
logger.info('iridium/chatmanager/init', 'p2p ready, initializing chat...')

logger.info(
'iridium/chatmanager/init',
'p2p ready, initializing chat...',
{ node: this.iridium.connector?.p2p.primaryNodeID },
)
// sync fetch
await this.iridium.connector?.p2p.send(
this.iridium.connector?.p2p.primaryNodeID,
{
type: 'sync/fetch',
},
)

for (const conversation of conversations) {
if (this.subscriptions[conversation.id] !== undefined) continue
Expand All @@ -94,6 +118,7 @@ export default class ChatManager extends Emitter<ConversationMessage> {
{
type: 'sync/subscribe',
topic,
offlineSync: true,
},
)
}
Expand All @@ -107,7 +132,9 @@ export default class ChatManager extends Emitter<ConversationMessage> {
* @description - handle a sync subscription response from the sync node
*/
async onSyncSubscriptionResponse(
message: IridiumPubsubMessage<SyncSubscriptionResponse>,
message: IridiumPubsubMessage<
IridiumDecodedPayload<SyncSubscriptionResponse>
>,
) {
logger.info(
'iridium/chatmanager/onSyncSubscriptionResponse',
Expand Down Expand Up @@ -153,6 +180,36 @@ export default class ChatManager extends Emitter<ConversationMessage> {
)
}

async onSyncFetchResponse(
message: IridiumPubsubMessage<IridiumDecodedPayload<SyncFetchResponse>>,
) {
if (!message.payload.body.rows) {
return
}
await Promise.all(
message.payload.body.rows.map(async (row) => {
const stored = await this.iridium.connector?.dag.get(CID.parse(row.cid))
if (!stored.body) {
return
}
const buffer = await this.iridium.connector?.did.decryptJWE(stored.body)
const message: any = buffer && json.decode(buffer)
if (stored.topic) {
logger.info(
'iridium/chatmanager',
'sync/fetch/row - emitting synced message',
message,
)
await this.iridium.connector?.pubsub.emit(stored.topic, {
from: stored.from,
topic: stored.topic,
payload: { type: 'jwe', body: message },
})
}
}),
)
}

get(path: string = '', options: any = {}) {
return this.iridium.connector?.get(`/chat${path}`, options)
}
Expand Down Expand Up @@ -275,6 +332,7 @@ export default class ChatManager extends Emitter<ConversationMessage> {
{
type: 'sync/subscribe',
topic: `/chat/conversations/${id}`,
offlineSync: true,
},
)
}
Expand Down