Skip to content

Commit

Permalink
fix(solo): query WebSocket for mailbox instead of ag-cosmos-helper
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Sep 9, 2021
1 parent 9d58314 commit 9a23c34
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 84 deletions.
2 changes: 1 addition & 1 deletion golang/cosmos/x/swingset/keeper/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func queryStorage(ctx sdk.Context, path string, req abci.RequestQuery, keeper Ke
value := storage.Value

if value == "" {
return []byte{}, sdkerrors.Wrap(sdkerrors.ErrUnknownRequest, "could not get storage")
return []byte{}, sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "could not get storage %+v", path)
}

bz, err2 := codec.MarshalJSONIndent(legacyQuerierCdc, types.Storage{Value: value})
Expand Down
147 changes: 64 additions & 83 deletions packages/solo/src/chain-cosmos-sdk.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* global setTimeout */
/* global setTimeout Buffer */
import path from 'path';
import fs from 'fs';
import { execFile } from 'child_process';
Expand Down Expand Up @@ -44,9 +44,6 @@ const SEND_RETRY_DELAY_MS = 1_000;

const MAX_BUFFER_SIZE = 10 * 1024 * 1024;

// Wait for at least this long between errors when initially polling mailbox.
const MIN_MAILBOX_POLL_BACKOFF_MS = 1_000;

// How much of each delay to leave to randomness.
const RANDOM_SCALE = 0.1;

Expand Down Expand Up @@ -193,7 +190,6 @@ export async function connectToChain(
}

let goodRpcHref = rpcHrefs[0];
let pollMailboxBackoffMs = 0;
const runHelper = (args, stdin = undefined) => {
const fullArgs = [
...args,
Expand Down Expand Up @@ -230,31 +226,6 @@ export async function connectToChain(
await fs.promises.writeFile(dstFile, stdout);
};

const getMailbox = async () => {
const { stdout, stderr } = await runHelper([
'query',
'swingset',
'mailbox',
clientAddr,
'-ojson',
]);

const errMsg = stderr.trimRight();
if (errMsg) {
console.error(errMsg);
}
if (stdout) {
console.debug(`helper said: ${stdout}`);
try {
// Try to parse the stdout.
return JSON.parse(JSON.parse(stdout).value);
} catch (e) {
assert.fail(X`failed to parse output: ${e}`);
}
}
return undefined;
};

// Validate that our chain egress exists.
await retryRpcHref(async rpcHref => {
const args = ['query', 'swingset', 'egress', clientAddr];
Expand Down Expand Up @@ -334,7 +305,8 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress(

// This magic identifier just distinguishes our subscription
// from other noise on the Websocket, if there is any.
const MAGIC_ID = 13254;
const MAILBOX_SUBSCRIPTION_ID = 13254;
const MAILBOX_QUERY_ID = 198772;
const mailboxPath = `mailbox.${clientAddr}`;
let firstUpdate = true;
ws.addEventListener('open', _ => {
Expand All @@ -343,7 +315,7 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress(
const obj = {
// JSON-RPC version 2.0.
jsonrpc: '2.0',
id: MAGIC_ID,
id: MAILBOX_SUBSCRIPTION_ID,
// We want to subscribe.
method: 'subscribe',
params: {
Expand All @@ -354,65 +326,57 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress(
// Send that message, and wait for the subscription.
ws.send(JSON.stringify(obj));

// Poll to establish our initial mailbox.
const pollMailboxWhileFirstUpdate = async () => {
if (!firstUpdate) {
// We're too late.
return;
}
const mb = await getMailbox().then(
res => res,
e => {
console.error(`Cannot get first mailbox:`, e);
},
);
if (!firstUpdate) {
// We're too late.
return;
}
// console.error('got first mb', mb);
if (mb !== undefined) {
// Found the initial mailbox.
// console.error('Updating in pollMailboxWhileFirstUpdate');
updater.updateState(mb);
firstUpdate = false;
return;
}

// Back off and try again.
pollMailboxBackoffMs = pollMailboxBackoffMs
? pollMailboxBackoffMs * 2
: MIN_MAILBOX_POLL_BACKOFF_MS;
setTimeout(
pollMailboxWhileFirstUpdate,
randomizeDelay(pollMailboxBackoffMs),
);
// Query for our initial mailbox.
const obj2 = {
jsonrpc: '2.0',
id: MAILBOX_QUERY_ID,
method: 'abci_query',
params: {
path: `/custom/swingset/storage/${mailboxPath}`,
},
};

// This is a new connection, so we have to poll again.
pollMailboxBackoffMs = 0;
pollMailboxWhileFirstUpdate().catch(e =>
console.error(
`Unexpected rejection while polling ${goodRpcHref} until first update:`,
e,
),
);
ws.send(JSON.stringify(obj2));
});
ws.addEventListener('message', ev => {
// We received a message.
// console.info('got message', ev.data);
const obj = JSON.parse(ev.data);
if (obj.id !== MAGIC_ID) {

const handleMailboxQuery = obj => {
// We received our initial mailbox query.
// console.info('got mailbox query', obj);
if (!firstUpdate) {
return;
}
if (obj.result && obj.result.response && obj.result.response.value) {
// Decode all the layers.
const { value: b64JsonStorage } = obj.result.response;
const jsonStorage = Buffer.from(b64JsonStorage, 'base64').toString(
'utf8',
);
const { value: mailboxValue } = JSON.parse(jsonStorage);

const mb = JSON.parse(mailboxValue);
// console.info('got mailbox value', mb);
updater.updateState(mb);
firstUpdate = false;
} else if (
obj.result &&
obj.result.response &&
obj.result.response.code === 6
) {
// No need to try again, just a missing mailbox that our subscription
// will pick up.
} else {
console.error('Error from mailbox query', obj);
ws.close();
}
};

const handleEventSubscription = obj => {
if (obj.error) {
console.error(`Error subscribing to events`, obj.error);
ws.close();
return;
}

// It matches our subscription, so maybe notify.
// It matches our subscription, so maybe notify the mailbox.
const events = obj.result.events;
if (!events) {
return;
Expand All @@ -437,11 +401,28 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress(
// Update our notifier.
// console.error('Updating in ws.message');
updater.updateState(mb);
firstUpdate = false;
};

ws.addEventListener('message', ev => {
// We received a message.
// console.info('got message', ev.data);
const obj = JSON.parse(ev.data);
switch (obj.id) {
case MAILBOX_SUBSCRIPTION_ID: {
handleEventSubscription(obj);
break;
}
case MAILBOX_QUERY_ID: {
handleMailboxQuery(obj);
break;
}
default: {
console.error('Unknown JSON-RPC message ID', obj);
}
}
});

ws.addEventListener('close', _ => {
// Stop trying to poll.
firstUpdate = false;
// The value `undefined` as the resolution of this retry
// tells the caller to retry again with a different RPC server.
retryPK.resolve(undefined);
Expand Down

0 comments on commit 9a23c34

Please sign in to comment.