Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hotfix pubsub subscribe #735

Merged
merged 20 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a136c95
fix: remove anchor service listeners if it is in context
v-stickykeys Dec 31, 2020
7343b9a
feat: resubscribe to pubsub topic on interval
v-stickykeys Dec 31, 2020
4dc52fb
fix: lint
v-stickykeys Dec 31, 2020
072ff85
feat: wait for pubsub subscription
v-stickykeys Dec 31, 2020
5f056bf
fix: update dispatcher test with additional subscribe arg
v-stickykeys Dec 31, 2020
3ddd272
fix: use try catch for pubsub ls
v-stickykeys Jan 1, 2021
dd9b096
fix: mock ipfs pubsub in ceramic api tests
v-stickykeys Jan 1, 2021
f6414db
fix: remove reject from ipfs ls mock
v-stickykeys Jan 4, 2021
94aabf9
feat: update constants for dispatcher timeout and delay
v-stickykeys Jan 4, 2021
3f7f7d8
fix: disable pubsub resub and timeout when testing
v-stickykeys Jan 4, 2021
dc5071d
Revert "fix: mock ipfs pubsub in ceramic api tests"
v-stickykeys Jan 4, 2021
5e043ee
feat: mock pubsub in ceramic daemon test
v-stickykeys Jan 4, 2021
56d0ff9
fmt: use testing variable
v-stickykeys Jan 4, 2021
5fa3927
fmt: Update packages/core/src/dispatcher.ts
v-stickykeys Jan 5, 2021
3ed6275
feat: resubscribe to pubsub unconditionally
v-stickykeys Jan 5, 2021
16ad93c
fmt: update conditional statements
v-stickykeys Jan 5, 2021
7aef116
feat: use same handler for pubsub subscribe
v-stickykeys Jan 5, 2021
dc7f9d0
feat: use arrow function for pubsub handle message
v-stickykeys Jan 5, 2021
d1397ef
feat: replace already subscribed log
v-stickykeys Jan 5, 2021
f48ca28
fix: replace already subscribed log
v-stickykeys Jan 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion packages/cli/src/__tests__/ceramic-daemon.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const waitChange = (doc: EventEmitter, count = 1): Promise<void> => {
}
const port = 7777
const apiUrl = 'http://localhost:' + port
const topic = '/ceramic'

/**
* Create an IPFS instance
Expand Down Expand Up @@ -64,6 +65,11 @@ describe('Ceramic interop: core <> http-client', () => {
}, Bootstrap: []
}
})
if (!ipfs.pubsub) {
ipfs.pubsub = {}
}
ipfs.pubsub.subscribe = jest.fn()
ipfs.pubsub.ls = jest.fn(() => Promise.resolve([topic]))
})

afterAll(async () => {
Expand All @@ -76,7 +82,7 @@ describe('Ceramic interop: core <> http-client', () => {
// performed yet by the time the test checks. To eliminate this race condition we should set
// anchorOnRequest to false in the config for the InMemoryAnchorService and anchor manually
// throughout the tests.
core = await Ceramic.create(ipfs)
core = await Ceramic.create(ipfs, {pubsubTopic: topic})

const doctypeHandler = new TileDoctypeHandler()
doctypeHandler.verifyJWS = (): Promise<void> => { return }
Expand Down
35 changes: 29 additions & 6 deletions packages/core/src/__tests__/dispatcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import Document from "../document"
import { TileDoctype } from "@ceramicnetwork/doctype-tile"
import DocID from "@ceramicnetwork/docid";

const TOPIC = '/ceramic'
const FAKE_CID = new CID('bafybeig6xv5nwphfmvcnektpnojts33jqcuam7bmye2pb54adnrtccjlsu')
const FAKE_CID2 = new CID('bafybeig6xv5nwphfmvcnektpnojts44jqcuam7bmye2pb54adnrtccjlsu')
const FAKE_DOC_ID = "kjzl6cwe1jw147dvq16zluojmraqvwdmbh61dx9e0c59i344lcrsgqfohexp60s"
Expand All @@ -12,6 +13,9 @@ const ipfs = {
pubsub: {
subscribe: jest.fn(),
unsubscribe: jest.fn(),
ls: jest.fn(() => new Promise((resolve) => {
resolve([TOPIC])
})),
publish: jest.fn()
},
dag: {
Expand All @@ -23,7 +27,6 @@ const ipfs = {
},
id: (): any => ({ id: 'ipfsid' })
}
const TOPIC = '/ceramic'

class TileDoctypeMock extends TileDoctype {
get doctype() {
Expand All @@ -50,9 +53,13 @@ describe('Dispatcher', () => {
await dispatcher.init()
})

afterEach(async () => {
await dispatcher.close()
})

it('is constructed correctly', async () => {
expect(dispatcher._documents).toEqual({})
expect(ipfs.pubsub.subscribe).toHaveBeenCalledWith(TOPIC, expect.anything())
expect(ipfs.pubsub.subscribe).toHaveBeenCalledWith(TOPIC, expect.anything(), expect.anything())
})

it('closes correctly', async () => {
Expand All @@ -62,8 +69,16 @@ describe('Dispatcher', () => {
})

it('makes registration correctly', async () => {
const doc = new Document(DocID.fromString(FAKE_DOC_ID), dispatcher, null)
doc._doctype = new TileDoctypeMock()
const doc = new Document(
v-stickykeys marked this conversation as resolved.
Show resolved Hide resolved
DocID.fromString(FAKE_DOC_ID),
dispatcher,
null,
false,
{},
null,
null
)
doc['_doctype'] = new TileDoctypeMock(null, {})
await dispatcher.register(doc)

const publishArgs = ipfs.pubsub.publish.mock.calls[0]
Expand Down Expand Up @@ -94,8 +109,16 @@ describe('Dispatcher', () => {
})

it('handle message correctly', async () => {
const doc = new Document(DocID.fromString(FAKE_DOC_ID), dispatcher, null)
doc._doctype = new TileDoctypeMock()
const doc = new Document(
DocID.fromString(FAKE_DOC_ID),
dispatcher,
null,
false,
{},
null,
null
)
doc['_doctype'] = new TileDoctypeMock(null, {})
await dispatcher.register(doc)

// Store the query ID sent when the doc is registered so we can use it as the response ID later
Expand Down
72 changes: 70 additions & 2 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import DocID from "@ceramicnetwork/docid";

const IPFS_GET_TIMEOUT = 30000 // 30 seconds
const IPFS_MAX_RECORD_SIZE = 256000 // 256 KB
const IPFS_RESUBSCRIBE_INTERVAL_DELAY = 1000 * 60 // 1 minute
const TESTING = process.env.NODE_ENV == 'test'
v-stickykeys marked this conversation as resolved.
Show resolved Hide resolved

/**
* Ceramic Pub/Sub message type.
Expand Down Expand Up @@ -45,6 +47,8 @@ export default class Dispatcher extends EventEmitter {

private logger: Logger
private _isRunning = true
private _isSubscribed = false
private _resubscribeInterval: any

constructor (public _ipfs: IpfsApi, public topic: string) {
super()
Expand All @@ -58,8 +62,70 @@ export default class Dispatcher extends EventEmitter {
*/
async init(): Promise<void> {
this._peerId = this._peerId || (await this._ipfs.id()).id
await this._ipfs.pubsub.subscribe(this.topic, this.handleMessage.bind(this))
this._log({ peer: this._peerId, event: 'subscribed', topic: this.topic })
await this._subscribe()
!TESTING && this._resubscribeOnDisconnect()
v-stickykeys marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Subscribes IPFS pubsub to `this.topic` and logs an `subscribe` event.
*/
async _subscribe(): Promise<void> {
if (this._isSubscribed) {
console.log(`Pubsub subscribe skipped. Already subscribed to topic ${this.topic}`)
v-stickykeys marked this conversation as resolved.
Show resolved Hide resolved
} else {
try {
await this._ipfs.pubsub.subscribe(
this.topic,
this.handleMessage.bind(this),
{timeout: !TESTING && IPFS_GET_TIMEOUT}
v-stickykeys marked this conversation as resolved.
Show resolved Hide resolved
)

const { isSubscribed, error } = await this._confirmIsSubscribed()
if (isSubscribed) {
this._log({peer: this._peerId, event: 'subscribed', topic: this.topic })
} else if (error) {
throw error
} else {
throw new Error(`Pubsub subscribe failed for topic ${this.topic}`)
}
} catch (error) {
console.error(error)
v-stickykeys marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

/**
* Periodically checks that IPFS pubsub is subscribed to `this.topic` and
* attempts to subscribe if not.
*/
_resubscribeOnDisconnect(): void {
this._resubscribeInterval = setInterval(async () => {
const { isSubscribed } = await this._confirmIsSubscribed()
!isSubscribed && await this._subscribe()
}, IPFS_RESUBSCRIBE_INTERVAL_DELAY)
}

/**
* Sets `this._iSubscribed` if the `this.topic` is listed as an IPFS pubsub
* subscription.
* @returns A tuple of `this._isSubscribed` and an error if one was caught.
*/
async _confirmIsSubscribed(): Promise<{isSubscribed: boolean, error: Error | null}> {
let isSubscribed = true
let error = null

try {
const subscriptions = await this._ipfs.pubsub.ls({ timeout: IPFS_GET_TIMEOUT })
v-stickykeys marked this conversation as resolved.
Show resolved Hide resolved
if (!subscriptions.includes(this.topic)) {
isSubscribed = false
}
} catch (_error) {
isSubscribed = false
error = _error
}
this._isSubscribed = isSubscribed

return { isSubscribed: this._isSubscribed, error }
}

/**
Expand Down Expand Up @@ -319,6 +385,8 @@ export default class Dispatcher extends EventEmitter {
async close(): Promise<void> {
this._isRunning = false

clearInterval(this._resubscribeInterval)

await Promise.all(Object.values(this._documents).map(async (doc) => await doc.close()))

await this._ipfs.pubsub.unsubscribe(this.topic)
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ class Document extends EventEmitter {

await this._applyQueue.onEmpty()

this._context.anchorService.removeAllListeners(this.id.toString())
this._context.anchorService && this._context.anchorService.removeAllListeners(this.id.toString())
await Utils.awaitCondition(() => this._isProcessing, () => false, 500)
}

Expand Down