Skip to content

Commit

Permalink
feat(state channels): add reconnect method (#662)
Browse files Browse the repository at this point in the history
  • Loading branch information
mpowaga authored and nduchak committed Sep 11, 2019
1 parent 15147af commit 9d8d1e8
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 2 deletions.
14 changes: 14 additions & 0 deletions es/channel/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import AsyncInit from '../utils/async-init'
import { snakeToPascal } from '../utils/string'
import { buildTx } from '../tx/builder'
import { TX_TYPE } from '../tx/builder/schema'
import * as handlers from './handlers'
import {
eventEmitters,
Expand Down Expand Up @@ -659,6 +661,15 @@ function sendMessage (message, recipient) {
send(this, { jsonrpc: '2.0', method: 'channels.message', params: { info, to: recipient } })
}

async function reconnect (options, txParams) {
const { sign } = options

return Channel({
...options,
reconnectTx: await sign('reconnect', buildTx(txParams, TX_TYPE.channelReconnect).tx)
})
}

/**
* Channel
*
Expand Down Expand Up @@ -730,6 +741,9 @@ const Channel = AsyncInit.compose({
getContractState,
disconnect,
cleanContractCalls
},
statics: {
reconnect
}
})

Expand Down
7 changes: 5 additions & 2 deletions es/channel/internal.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { EventEmitter } from 'events'
import * as R from 'ramda'
import JSONBig from '../utils/json-big'
import { pascalToSnake } from '../utils/string'
import { awaitingConnection } from './handlers'
import { awaitingConnection, channelClosed, channelOpen } from './handlers'

// Send ping message every 10 seconds
const PING_TIMEOUT_MS = 10000
Expand Down Expand Up @@ -222,13 +222,16 @@ async function initialize (channel, channelOptions) {
const wsUrl = channelURL(url, { ...params, protocol: 'json-rpc' })

options.set(channel, channelOptions)
fsm.set(channel, { handler: awaitingConnection })
fsm.set(channel, { handler: params.reconnectTx ? channelClosed : awaitingConnection })
eventEmitters.set(channel, new EventEmitter())
sequence.set(channel, 0)
rpcCallbacks.set(channel, new Map())
const ws = await WebSocket(wsUrl, {
onopen: () => {
changeStatus(channel, 'connected')
if (params.reconnectTx) {
enterState(channel, { handler: channelOpen })
}
ping(channel)
},
onclose: () => {
Expand Down
17 changes: 17 additions & 0 deletions es/tx/builder/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const OBJECT_TAG_CHANNEL_OFFCHAIN_UPDATE_DEPOSIT_TX = 571
const OBJECT_TAG_CHANNEL_OFFCHAIN_UPDATE_WITHDRAWAL_TX = 572
const OBJECT_TAG_CHANNEL_OFFCHAIN_CREATE_CONTRACT_TX = 573
const OBJECT_TAG_CHANNEL_OFFCHAIN_CALL_CONTRACT_TX = 574
const OBJECT_TAG_CHANNEL_RECONNECT_TX = 575
const OBJECT_TAG_PROOF_OF_INCLUSION = 60
const OBJECT_TAG_STATE_TREES = 62
const OBJECT_TAG_MERKLE_PATRICIA_TREE = 63
Expand Down Expand Up @@ -132,6 +133,7 @@ export const TX_TYPE = {
channelOffChainUpdateWithdrawal: 'channelOffChainUpdateWithdrawal',
channelOffChainCreateContract: 'channelOffChainCreateContract',
channelOffChainCallContract: 'channelOffChainCallContract',
channelReconnect: 'channelReconnect',
proofOfInclusion: 'proofOfInclusion',
stateTrees: 'stateTrees',
merklePatriciaTree: 'merklePatriciaTree',
Expand Down Expand Up @@ -235,6 +237,7 @@ export const OBJECT_ID_TX_TYPE = {
[OBJECT_TAG_CHANNEL_OFFCHAIN_UPDATE_WITHDRAWAL_TX]: TX_TYPE.channelOffChainUpdateWithdrawal,
[OBJECT_TAG_CHANNEL_OFFCHAIN_CREATE_CONTRACT_TX]: TX_TYPE.channelOffChainCreateContract,
[OBJECT_TAG_CHANNEL_OFFCHAIN_CALL_CONTRACT_TX]: TX_TYPE.channelOffChainCallContract,
[OBJECT_TAG_CHANNEL_RECONNECT_TX]: TX_TYPE.channelReconnect,
[OBJECT_TAG_PROOF_OF_INCLUSION]: TX_TYPE.proofOfInclusion,
[OBJECT_TAG_STATE_TREES]: TX_TYPE.stateTrees,
[OBJECT_TAG_MERKLE_PATRICIA_TREE]: TX_TYPE.merklePatriciaTree,
Expand Down Expand Up @@ -735,6 +738,14 @@ const CHANNEL_OFFCHAIN_CALL_CONTRACT_TX = [
TX_FIELD('gasLimit', FIELD_TYPES.int)
]

const CHANNEL_RECONNECT_TX = [
...BASE_TX,
TX_FIELD('channelId', FIELD_TYPES.id, 'ch'),
TX_FIELD('round', FIELD_TYPES.int),
TX_FIELD('role', FIELD_TYPES.string),
TX_FIELD('pubkey', FIELD_TYPES.id, 'ak')
]

const CHANNEL_OFFCHAIN_UPDATE_TRANSFER_TX = [
...BASE_TX,
TX_FIELD('from', FIELD_TYPES.id, 'ak'),
Expand Down Expand Up @@ -912,6 +923,9 @@ export const TX_SERIALIZATION_SCHEMA = {
[TX_TYPE.channelOffChainCallContract]: {
1: TX_SCHEMA_FIELD(CHANNEL_OFFCHAIN_CALL_CONTRACT_TX, OBJECT_TAG_CHANNEL_OFFCHAIN_CALL_CONTRACT_TX)
},
[TX_TYPE.channelReconnect]: {
1: TX_SCHEMA_FIELD(CHANNEL_RECONNECT_TX, OBJECT_TAG_CHANNEL_RECONNECT_TX)
},
[TX_TYPE.proofOfInclusion]: {
1: TX_SCHEMA_FIELD(PROOF_OF_INCLUSION_TX, OBJECT_TAG_PROOF_OF_INCLUSION)
},
Expand Down Expand Up @@ -1047,6 +1061,9 @@ export const TX_DESERIALIZATION_SCHEMA = {
[OBJECT_TAG_CHANNEL_OFFCHAIN_CALL_CONTRACT_TX]: {
1: TX_SCHEMA_FIELD(CHANNEL_OFFCHAIN_CALL_CONTRACT_TX, OBJECT_TAG_CHANNEL_OFFCHAIN_CALL_CONTRACT_TX)
},
[OBJECT_TAG_CHANNEL_RECONNECT_TX]: {
1: TX_SCHEMA_FIELD(CHANNEL_RECONNECT_TX, OBJECT_TAG_CHANNEL_RECONNECT_TX)
},
[OBJECT_TAG_PROOF_OF_INCLUSION]: {
1: TX_SCHEMA_FIELD(PROOF_OF_INCLUSION_TX, OBJECT_TAG_PROOF_OF_INCLUSION)
},
Expand Down
34 changes: 34 additions & 0 deletions test/integration/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,40 @@ describe('Channel', function () {
await initiator.sendTransaction(await initiator.signTransaction(snapshotSoloTx), { waitMined: true })
})

it('can reconnect', async () => {
initiatorCh.disconnect()
responderCh.disconnect()
initiatorCh = await Channel({
...sharedParams,
role: 'initiator',
port: 3006,
sign: initiatorSign
})
responderCh = await Channel({
...sharedParams,
role: 'responder',
port: 3006,
sign: responderSign
})
await Promise.all([waitForChannel(initiatorCh), waitForChannel(responderCh)])
const channelId = await initiatorCh.id()
const round = Number(unpackTx((await initiatorCh.state()).signedTx).tx.encodedTx.tx.nonce)
initiatorCh.disconnect()
const ch = await Channel.reconnect({
...sharedParams,
role: 'initiator',
port: 3006,
sign: initiatorSign
}, {
channelId,
round,
role: 'initiator',
pubkey: await initiator.address()
})
await waitForChannel(ch)
ch.state().should.eventually.be.fulfilled
})

describe('throws errors', function () {
before(async function () {
initiatorCh.disconnect()
Expand Down

0 comments on commit 9d8d1e8

Please sign in to comment.