Skip to content

Commit

Permalink
fix(channel): ignore messages that can't be handled, print to console
Browse files Browse the repository at this point in the history
  • Loading branch information
davidyuk committed Jan 24, 2022
1 parent 22d6dbb commit aaad8e3
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 13 deletions.
36 changes: 23 additions & 13 deletions src/channel/internal.js
Expand Up @@ -110,16 +110,21 @@ async function handleMessage (channel, message) {
}

async function dequeueMessage (channel) {
const queue = messageQueue.get(channel)
if (messageQueueLocked.get(channel) || !queue.length) {
return
}
const [message, ...remaining] = queue
messageQueue.set(channel, remaining || [])
if (messageQueueLocked.get(channel)) return
const messages = messageQueue.get(channel)
if (!messages.length) return
messageQueueLocked.set(channel, true)
await handleMessage(channel, message)
while (messages.length) {
const message = messages.shift()
try {
await handleMessage(channel, message)
} catch (error) {
console.error('Error handling incoming message:')
console.error(message)
console.error(error)
}
}
messageQueueLocked.set(channel, false)
dequeueMessage(channel)
}

function ping (channel) {
Expand Down Expand Up @@ -151,20 +156,24 @@ function onMessage (channel, data) {
} finally {
rpcCallbacks.get(channel).delete(message.id)
}
} else if (message.method === 'channels.message') {
return
}
if (message.method === 'channels.message') {
emit(channel, 'message', message.params.data.message)
} else if (message.method === 'channels.system.pong') {
return
}
if (message.method === 'channels.system.pong') {
if (
(message.params.channel_id === channelId.get(channel)) ||
// Skip channelId check if channelId is not known yet
(channelId.get(channel) == null)
) {
ping(channel)
}
} else {
messageQueue.set(channel, [...(messageQueue.get(channel) || []), message])
dequeueMessage(channel)
return
}
messageQueue.get(channel).push(message)
dequeueMessage(channel)
}

function wrapCallErrorMessage (message) {
Expand Down Expand Up @@ -200,6 +209,7 @@ export async function initialize (channel, { url, ...channelOptions }) {
eventEmitters.set(channel, new EventEmitter())
sequence.set(channel, 0)
rpcCallbacks.set(channel, new Map())
messageQueue.set(channel, [])

const wsUrl = new URL(url)
Object.entries(channelOptions)
Expand Down
17 changes: 17 additions & 0 deletions test/integration/channel.js
Expand Up @@ -25,6 +25,7 @@ import { generateKeyPair, encodeBase64Check } from '../../src/utils/crypto'
import { unpackTx, buildTx, buildTxHash } from '../../src/tx/builder'
import { decode } from '../../src/tx/builder/helpers'
import Channel from '../../src/channel'
import { send } from '../../src/channel/internal'
import MemoryAccount from '../../src/account/memory'
import {
IllegalArgumentError,
Expand Down Expand Up @@ -140,6 +141,22 @@ describe('Channel', function () {
responderTx.should.eql({ ...responderTx, ...expectedTxParams })
})

it('prints error on handling incoming messages', async () => {
const received = new Promise(resolve => sinon.stub(console, 'error').callsFake(resolve))
send(initiatorCh, {
jsonrpc: '2.0',
method: 'not-existing-method',
params: {}
})
await received
expect(console.error.callCount).to.be.equal(3)
expect(console.error.getCall(0).firstArg).to.be.equal('Error handling incoming message:')
expect(console.error.getCall(1).firstArg.error.message).to.be.equal('Method not found')
expect(console.error.getCall(2).firstArg.toString())
.to.be.equal('UnknownChannelStateError: State Channels FSM entered unknown state')
console.error.restore()
})

it('can post update and accept', async () => {
responderShouldRejectUpdate = false
const roundBefore = initiatorCh.round()
Expand Down

0 comments on commit aaad8e3

Please sign in to comment.