diff --git a/packages/cli/src/__tests__/ceramic-daemon.test.ts b/packages/cli/src/__tests__/ceramic-daemon.test.ts index ff68c729de..7652fe0d0a 100644 --- a/packages/cli/src/__tests__/ceramic-daemon.test.ts +++ b/packages/cli/src/__tests__/ceramic-daemon.test.ts @@ -28,6 +28,7 @@ const waitChange = (doc: EventEmitter, count = 1): Promise => { } const port = 7777 const apiUrl = 'http://localhost:' + port +const topic = '/ceramic' /** * Create an IPFS instance @@ -64,6 +65,10 @@ describe('Ceramic interop: core <> http-client', () => { }, Bootstrap: [] } }) + if (!ipfs.pubsub) { + ipfs.pubsub = {} + } + ipfs.pubsub.subscribe = jest.fn() }) afterAll(async () => { @@ -76,7 +81,7 @@ describe('Ceramic interop: core <> http-client', () => { // performed yet by the time the test checks. To eliminate this race condition we should set // anchorOnRequest to false in the config for the InMemoryAnchorService and anchor manually // throughout the tests. - core = await Ceramic.create(ipfs) + core = await Ceramic.create(ipfs, {pubsubTopic: topic}) const doctypeHandler = new TileDoctypeHandler() doctypeHandler.verifyJWS = (): Promise => { return } diff --git a/packages/core/src/__tests__/dispatcher.test.ts b/packages/core/src/__tests__/dispatcher.test.ts index 85af38de5c..7bc7f197a5 100644 --- a/packages/core/src/__tests__/dispatcher.test.ts +++ b/packages/core/src/__tests__/dispatcher.test.ts @@ -4,6 +4,7 @@ import Document from "../document" import { TileDoctype } from "@ceramicnetwork/doctype-tile" import DocID from "@ceramicnetwork/docid"; +const TOPIC = '/ceramic' const FAKE_CID = new CID('bafybeig6xv5nwphfmvcnektpnojts33jqcuam7bmye2pb54adnrtccjlsu') const FAKE_CID2 = new CID('bafybeig6xv5nwphfmvcnektpnojts44jqcuam7bmye2pb54adnrtccjlsu') const FAKE_DOC_ID = "kjzl6cwe1jw147dvq16zluojmraqvwdmbh61dx9e0c59i344lcrsgqfohexp60s" @@ -23,7 +24,6 @@ const ipfs = { }, id: (): any => ({ id: 'ipfsid' }) } -const TOPIC = '/ceramic' class TileDoctypeMock extends TileDoctype { get doctype() { @@ -50,9 +50,13 @@ describe('Dispatcher', () => { await dispatcher.init() }) + afterEach(async () => { + await dispatcher.close() + }) + it('is constructed correctly', async () => { expect(dispatcher._documents).toEqual({}) - expect(ipfs.pubsub.subscribe).toHaveBeenCalledWith(TOPIC, expect.anything()) + expect(ipfs.pubsub.subscribe).toHaveBeenCalledWith(TOPIC, expect.anything(), expect.anything()) }) it('closes correctly', async () => { @@ -62,8 +66,16 @@ describe('Dispatcher', () => { }) it('makes registration correctly', async () => { - const doc = new Document(DocID.fromString(FAKE_DOC_ID), dispatcher, null) - doc._doctype = new TileDoctypeMock() + const doc = new Document( + DocID.fromString(FAKE_DOC_ID), + dispatcher, + null, + false, + {}, + null, + null + ) + doc['_doctype'] = new TileDoctypeMock(null, {}) await dispatcher.register(doc) const publishArgs = ipfs.pubsub.publish.mock.calls[0] @@ -94,8 +106,16 @@ describe('Dispatcher', () => { }) it('handle message correctly', async () => { - const doc = new Document(DocID.fromString(FAKE_DOC_ID), dispatcher, null) - doc._doctype = new TileDoctypeMock() + const doc = new Document( + DocID.fromString(FAKE_DOC_ID), + dispatcher, + null, + false, + {}, + null, + null + ) + doc['_doctype'] = new TileDoctypeMock(null, {}) await dispatcher.register(doc) // Store the query ID sent when the doc is registered so we can use it as the response ID later diff --git a/packages/core/src/dispatcher.ts b/packages/core/src/dispatcher.ts index 677aaa4ecc..340936b58b 100644 --- a/packages/core/src/dispatcher.ts +++ b/packages/core/src/dispatcher.ts @@ -12,6 +12,8 @@ import DocID from "@ceramicnetwork/docid"; const IPFS_GET_TIMEOUT = 30000 // 30 seconds const IPFS_MAX_RECORD_SIZE = 256000 // 256 KB +const IPFS_RESUBSCRIBE_INTERVAL_DELAY = 1000 * 60 // 1 minute +const TESTING = process.env.NODE_ENV == 'test' /** * Ceramic Pub/Sub message type. @@ -45,6 +47,7 @@ export default class Dispatcher extends EventEmitter { private logger: Logger private _isRunning = true + private _resubscribeInterval: any constructor (public _ipfs: IpfsApi, public topic: string) { super() @@ -58,8 +61,42 @@ export default class Dispatcher extends EventEmitter { */ async init(): Promise { this._peerId = this._peerId || (await this._ipfs.id()).id - await this._ipfs.pubsub.subscribe(this.topic, this.handleMessage.bind(this)) - this._log({ peer: this._peerId, event: 'subscribed', topic: this.topic }) + await this._subscribe() + if (!TESTING) { + this._resubscribe() + } + } + + /** + * Subscribes IPFS pubsub to `this.topic` and logs a `subscribe` event. + * + * Logs error if subscribe fails. + */ + async _subscribe(): Promise { + try { + await this._ipfs.pubsub.subscribe( + this.topic, + this.handleMessage, + {timeout: TESTING ? null : IPFS_GET_TIMEOUT} + ) + this._log({peer: this._peerId, event: 'subscribed', topic: this.topic }) + } catch (error) { + // TODO: use logger + if (error.message.includes('Already subscribed')) { + this.logger.debug(error.message) + } else { + console.error(error) + } + } + } + + /** + * Periodically subscribes to IPFS pubsub topic. + */ + _resubscribe(): void { + this._resubscribeInterval = setInterval(async () => { + await this._subscribe() + }, IPFS_RESUBSCRIBE_INTERVAL_DELAY) } /** @@ -193,7 +230,7 @@ export default class Dispatcher extends EventEmitter { * * @param message - Message data */ - async handleMessage (message: any): Promise { + handleMessage = async (message: any): Promise => { if (!this._isRunning) { this.logger.error('Dispatcher has been closed') return @@ -319,6 +356,8 @@ export default class Dispatcher extends EventEmitter { async close(): Promise { this._isRunning = false + clearInterval(this._resubscribeInterval) + await Promise.all(Object.values(this._documents).map(async (doc) => await doc.close())) await this._ipfs.pubsub.unsubscribe(this.topic) diff --git a/packages/core/src/document.ts b/packages/core/src/document.ts index 6e01ad05d2..64d26b9526 100644 --- a/packages/core/src/document.ts +++ b/packages/core/src/document.ts @@ -746,7 +746,7 @@ class Document extends EventEmitter { await this._applyQueue.onEmpty() - this._context.anchorService.removeAllListeners(this.id.toString()) + this._context.anchorService && this._context.anchorService.removeAllListeners(this.id.toString()) await Utils.awaitCondition(() => this._isProcessing, () => false, 500) }