Skip to content

Commit

Permalink
feat(cosmic-swingset): leave inbound in actionQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
mhofman committed Mar 23, 2023
1 parent e795236 commit a32299d
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 105 deletions.
21 changes: 5 additions & 16 deletions golang/cosmos/x/swingset/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package swingset
import (
// "fmt"
// "os"
"encoding/json"
"time"

"github.com/cosmos/cosmos-sdk/telemetry"
Expand All @@ -23,10 +22,6 @@ type beginBlockAction struct {
Params types.Params `json:"params"`
}

type beginBlockResult struct {
QueueAllowed []types.QueueSize `json:"queue_allowed"`
}

type endBlockAction struct {
Type string `json:"type"`
BlockHeight int64 `json:"blockHeight"`
Expand All @@ -50,20 +45,14 @@ func BeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, keeper Keeper) erro
ChainID: ctx.ChainID(),
Params: keeper.GetParams(ctx),
}
out, err := keeper.BlockingSend(ctx, action)
_, err := keeper.BlockingSend(ctx, action)
// fmt.Fprintf(os.Stderr, "BEGIN_BLOCK Returned from SwingSet: %s, %v\n", out, err)

if out != "" {
var result beginBlockResult
err := json.Unmarshal([]byte(out), &result)
if err != nil {
panic(err)
}
state := keeper.GetState(ctx)
state.QueueAllowed = result.QueueAllowed
keeper.SetState(ctx, state)
if err != nil {
panic(err)
}

err = keeper.UpdateQueueAllowed(ctx)

return err
}

Expand Down
33 changes: 33 additions & 0 deletions golang/cosmos/x/swingset/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,39 @@ func (k Keeper) ActionQueueLength(ctx sdk.Context) (int32, error) {
return int32(int64Size), nil
}

func (k Keeper) UpdateQueueAllowed(ctx sdk.Context) error {
params := k.GetParams(ctx)
inboundQueueMax, found := types.QueueSizeEntry(params.QueueMax, types.QueueInbound)
if !found {
return errors.New("could not find max inboundQueue size in params")
}
inboundMempoolQueueMax := inboundQueueMax / 2

inboundQueueSize, err := k.ActionQueueLength(ctx)
if err != nil {
return err
}

var inboundQueueAllowed int32
if inboundQueueMax > inboundQueueSize {
inboundQueueAllowed = inboundQueueMax - inboundQueueSize
}

var inboundMempoolQueueAllowed int32
if inboundMempoolQueueMax > inboundQueueSize {
inboundMempoolQueueAllowed = inboundMempoolQueueMax - inboundQueueSize
}

state := k.GetState(ctx)
state.QueueAllowed = []types.QueueSize{
{Key: types.QueueInbound, Size_: inboundQueueAllowed},
{Key: types.QueueInboundMempool, Size_: inboundMempoolQueueAllowed},
}
k.SetState(ctx, state)

return nil
}

// BlockingSend sends a message to the controller and blocks the Golang process
// until the response. It is orthogonal to PushAction, and should only be used
// by SwingSet to perform block lifecycle events (BEGIN_BLOCK, END_BLOCK,
Expand Down
11 changes: 8 additions & 3 deletions packages/cosmic-swingset/src/kernel-stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,14 @@ export function makeInboundQueueMetrics(initialLength) {
let remove = 0;

return harden({
incStat: (delta = 1) => {
length += delta;
add += delta;
updateLength: newLength => {
const delta = newLength - length;
length = newLength;
if (delta > 0) {
add += delta;
} else {
remove -= delta;
}
},

decStat: (delta = 1) => {
Expand Down
95 changes: 15 additions & 80 deletions packages/cosmic-swingset/src/launch-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ import {
BeansPerBlockComputeLimit,
BeansPerVatCreation,
BeansPerXsnapComputron,
QueueInbound,
} from './sim-params.js';
import { parseParams, encodeQueueSizes } from './params.js';
import { parseParams } from './params.js';
import { makeQueue } from './make-queue.js';

const console = anylogger('launch-chain');
Expand Down Expand Up @@ -237,31 +236,9 @@ export async function launch({
});
const { kvStore, commit } = hostStorage;

// makeQueue() thinks it should commit/abort, but the kvStore doesn't provide
// those ('commit' is reserved for flushing the block buffer). Furthermore
// the kvStore only deals with string values.
// We create a storage wrapper that adds a prefix to keys, serializes values,
// and disables commit/abort.

const inboundQueuePrefix = getHostKey('inboundQueue.');
/** @type {import("./make-queue.js").QueueStorage} */
const inboundQueueStorage = harden({
get: key => {
return kvStore.get(inboundQueuePrefix + key);
},
set: (key, value) => {
typeof value === 'string' || Fail`value in inboundQueue must be a string`;
kvStore.set(inboundQueuePrefix + key, value);
},
delete: key => kvStore.delete(inboundQueuePrefix + key),
commit: () => {}, // disable
abort: () => {}, // disable
});
/** @typedef {ReturnType<typeof makeQueue<{context: any, action: any}>>} ActionQueue */
/** @type {ActionQueue} */
const actionQueue = makeQueue(actionQueueStorage);
/** @type {ActionQueue} */
const inboundQueue = makeQueue(inboundQueueStorage);

// Not to be confused with the gas model, this meter is for OpenTelemetry.
const metricMeter = metricsProvider.getMeter('ag-chain-cosmos');
Expand Down Expand Up @@ -292,7 +269,7 @@ export async function launch({
? parseInt(env.END_BLOCK_SPIN_MS, 10)
: 0;

const inboundQueueMetrics = makeInboundQueueMetrics(inboundQueue.size());
const inboundQueueMetrics = makeInboundQueueMetrics(actionQueue.size());
const { crankScheduler } = exportKernelStats({
controller,
metricMeter,
Expand All @@ -314,33 +291,6 @@ export async function launch({
}
}

let savedQueueAllowed = JSON.parse(
kvStore.get(getHostKey('queueAllowed')) || '{}',
);

function updateQueueAllowed(_blockHeight, _blockTime, params) {
assert(params.queueMax);
assert(QueueInbound in params.queueMax);

const inboundQueueMax = params.queueMax[QueueInbound];
const inboundMempoolQueueMax = Math.floor(inboundQueueMax / 2);

const inboundQueueSize = inboundQueue.size();

const inboundQueueAllowed = Math.max(0, inboundQueueMax - inboundQueueSize);
const inboundMempoolQueueAllowed = Math.max(
0,
inboundMempoolQueueMax - inboundQueueSize,
);

savedQueueAllowed = {
// Keep up-to-date with queue size keys defined in
// golang/cosmos/x/swingset/types/default-params.go
inbound: inboundQueueAllowed,
inbound_mempool: inboundMempoolQueueAllowed,
};
}

async function saveChainState() {
// Save the mailbox state.
await mailboxStorage.commit();
Expand All @@ -351,7 +301,6 @@ export async function launch({
kvStore.set(getHostKey('height'), `${blockHeight}`);
kvStore.set(getHostKey('blockTime'), `${blockTime}`);
kvStore.set(getHostKey('chainSends'), JSON.stringify(chainSends));
kvStore.set(getHostKey('queueAllowed'), JSON.stringify(savedQueueAllowed));

await commit();
}
Expand Down Expand Up @@ -516,19 +465,19 @@ export async function launch({

let keepGoing = await runSwingset();

// Then process as much as we can from the inboundQueue, which contains
// Then process as much as we can from the actionQueue, which contains
// first the old actions followed by the newActions, running the
// kernel to completion after each.
if (keepGoing) {
for (const { action, context } of inboundQueue.consumeAll()) {
for (const { action, context } of actionQueue.consumeAll()) {
const inboundNum = `${context.blockHeight}-${context.txHash}-${context.msgIdx}`;
inboundQueueMetrics.decStat();
// eslint-disable-next-line no-await-in-loop
await performAction(action, inboundNum);
// eslint-disable-next-line no-await-in-loop
keepGoing = await runSwingset();
if (!keepGoing) {
// any leftover actions will remain on the inboundQueue for possible
// any leftover actions will remain on the actionQueue for possible
// processing in the next block
break;
}
Expand All @@ -540,19 +489,14 @@ export async function launch({
}
}

async function endBlock(blockHeight, blockTime, params, newActions) {
async function endBlock(blockHeight, blockTime, params) {
// This is called once per block, during the END_BLOCK event, and
// only when we know that cosmos is in sync (else we'd skip kernel
// execution). 'newActions' are the bridge/mailbox/etc events that
// cosmos stored up for delivery to swingset in this block.

// First, push all newActions onto the end of the inboundQueue,
// remembering that inboundQueue might still have work from the
// previous block
for (const actionRecord of newActions) {
inboundQueue.push(actionRecord);
inboundQueueMetrics.incStat();
}
// execution).

// First, record new actions (bridge/mailbox/etc events that cosmos
// added up for delivery to swingset) into our inboundQueue metrics
inboundQueueMetrics.updateLength(actionQueue.size());

// We update the timer device at the start of each block, which might push
// work onto the end of the kernel run-queue (if any timers were ready to
Expand Down Expand Up @@ -746,19 +690,14 @@ export async function launch({
blockManagerConsole.info('block', blockHeight, 'begin');
runTime = 0;

if (blockNeedsExecution(blockHeight)) {
// We are not reevaluating, so compute a new queueAllowed
updateQueueAllowed(blockHeight, blockTime, blockParams);
}

controller.writeSlogObject({
type: 'cosmic-swingset-begin-block',
blockHeight,
blockTime,
queueAllowed: savedQueueAllowed,
actionQueueStats: inboundQueueMetrics.getStats(),
});

return { queue_allowed: encodeQueueSizes(savedQueueAllowed) };
return undefined;
}

case ActionType.END_BLOCK: {
Expand Down Expand Up @@ -794,12 +733,7 @@ export async function launch({
provideInstallationPublisher();

await processAction(action.type, async () =>
endBlock(
blockHeight,
blockTime,
blockParams,
actionQueue.consumeAll(),
),
endBlock(blockHeight, blockTime, blockParams),
);

// We write out our on-chain state as a number of chainSends.
Expand All @@ -815,6 +749,7 @@ export async function launch({
type: 'cosmic-swingset-end-block-finish',
blockHeight,
blockTime,
actionQueueStats: inboundQueueMetrics.getStats(),
});

return undefined;
Expand Down
14 changes: 8 additions & 6 deletions packages/cosmic-swingset/src/sim-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import anylogger from 'anylogger';
import { makeSlogSender } from '@agoric/telemetry';

import { resolve as importMetaResolve } from 'import-meta-resolve';
import { assert, Fail } from '@agoric/assert';
import { Fail } from '@agoric/assert';
import { makeWithQueue } from '@agoric/internal/src/queue.js';
import { makeBatchedDeliver } from '@agoric/internal/src/batched-deliver.js';
import stringify from './json-stable-stringify.js';
Expand Down Expand Up @@ -135,13 +135,15 @@ export async function connectToFakeChain(basedir, GCI, delay, inbound) {
blockTime,
params,
};
const beginBlockResult = await blockingSend(beginAction);
assert(beginBlockResult);
const queueAllowed = parseQueueSizes(beginBlockResult.queue_allowed);
assert(QueueInbound in queueAllowed);
await blockingSend(beginAction);
const inboundQueueMax = parseQueueSizes(params.queue_max)[QueueInbound];
const inboundQueueAllowed = Math.max(
0,
inboundQueueMax - actionQueue.size(),
);

// Gather up the new messages into the latest block.
const thisBlock = intoChain.splice(0, queueAllowed[QueueInbound]);
const thisBlock = intoChain.splice(0, inboundQueueAllowed);

for (const [i, [newMessages, acknum]] of thisBlock.entries()) {
actionQueue.push({
Expand Down

0 comments on commit a32299d

Please sign in to comment.