diff --git a/api/bin/spark.js b/api/bin/spark.js index d9de8df2..c3388ed8 100644 --- a/api/bin/spark.js +++ b/api/bin/spark.js @@ -6,6 +6,7 @@ import pg from 'pg' import { startRoundTracker } from '../lib/round-tracker.js' import { migrate } from '../../migrations/index.js' import { clearNetworkInfoStationIdsSeen } from '../lib/network-info-logger.js' +import { recordNetworkInfoTelemetry } from '../../common/telemetry.js' const { PORT = 8080, @@ -41,7 +42,10 @@ console.log('Initializing round tracker...') const start = Date.now() try { - const currentRound = await startRoundTracker({ pgPool: client }) + const currentRound = await startRoundTracker({ + pgPool: client, + recordTelemetry: recordNetworkInfoTelemetry + }) console.log( 'Initialized round tracker in %sms. SPARK round number at service startup: %s', Date.now() - start, diff --git a/api/lib/round-tracker.js b/api/lib/round-tracker.js index 0e1db99d..877f5ff1 100644 --- a/api/lib/round-tracker.js +++ b/api/lib/round-tracker.js @@ -2,23 +2,24 @@ import assert from 'node:assert' import * as Sentry from '@sentry/node' import { createMeridianContract } from './ie-contract.js' -// The number of tasks per round is proportionate to the SPARK round length - longer rounds require -// more tasks per round. -// -// See https://www.notion.so/pl-strflt/SPARK-tasking-v2-604e26d57f6b4892946525bcb3a77104?pvs=4#ded1cd98c2664a2289453d38e2715643 -// for more details, this constant represents TC (tasks per committee). -// -// We will need to tweak this value based on measurements; that's why I put it here as a constant. -export const TASKS_PER_ROUND = 1000 - -// How many tasks is each SPARK checker node expected to complete every round (at most). -export const MAX_TASKS_PER_NODE = 15 +// Tweak this to control the network's overall task count. +export const TASKS_EXECUTED_PER_ROUND = 500_000 + +// Baseline values for how many tasks should be completed every round, and how +// many tasks each SPARK checker node is expected to complete (every round, at +// most). The actual value will be set dynamically based on +// TASKS_EXECUTED_PER_ROUND and the number of tasks executed in the last round. +export const BASELINE_TASKS_PER_ROUND = 1000 +export const BASELINE_TASKS_PER_NODE = 15 + +export const ROUND_TASKS_TO_NODE_TASKS_RATIO = BASELINE_TASKS_PER_ROUND / BASELINE_TASKS_PER_NODE /** @typedef {Awaited>} MeridianContract */ /** * @param {object} args * @param {import('pg').Pool} args.pgPool + * @param {import('../../common/typings.js').RecordTelemetryFn} recordTelemetry * @param {AbortSignal} [args.signal] * @returns { * sparkRoundNumber: bigint; @@ -27,7 +28,7 @@ export const MAX_TASKS_PER_NODE = 15 * roundStartEpoch: bigint; * } */ -export async function startRoundTracker ({ pgPool, signal }) { +export async function startRoundTracker ({ pgPool, signal, recordTelemetry }) { const contract = await createMeridianContract() const onRoundStart = (newRoundIndex, ...args) => { @@ -41,7 +42,7 @@ export async function startRoundTracker ({ pgPool, signal }) { ) } - updateSparkRound(pgPool, contract, newRoundIndex, blockNumber).catch(err => { + updateSparkRound(pgPool, contract, newRoundIndex, recordTelemetry, blockNumber).catch(err => { console.error('Cannot handle RoundStart:', err) Sentry.captureException(err) }) @@ -53,7 +54,7 @@ export async function startRoundTracker ({ pgPool, signal }) { }) } - const currentRound = await updateSparkRound(pgPool, contract, await contract.currentRoundIndex()) + const currentRound = await updateSparkRound(pgPool, contract, await contract.currentRoundIndex(), recordTelemetry) return currentRound } @@ -61,9 +62,10 @@ export async function startRoundTracker ({ pgPool, signal }) { * @param {import('pg').Pool} pgPool * @param {MeridianContract} contract * @param {bigint} newRoundIndex + * @param {import('../../common/typings.js').RecordTelemetryFn} recordTelemetry * @param {number} [roundStartEpoch] */ -async function updateSparkRound (pgPool, contract, newRoundIndex, roundStartEpoch) { +async function updateSparkRound (pgPool, contract, newRoundIndex, recordTelemetry, roundStartEpoch) { const meridianRoundIndex = BigInt(newRoundIndex) const meridianContractAddress = await contract.getAddress() @@ -78,7 +80,8 @@ async function updateSparkRound (pgPool, contract, newRoundIndex, roundStartEpoc meridianContractAddress, meridianRoundIndex, roundStartEpoch, - pgClient + pgClient, + recordTelemetry }) await pgClient.query('COMMIT') console.log('SPARK round started: %s (epoch: %s)', sparkRoundNumber, roundStartEpoch) @@ -169,7 +172,8 @@ export async function mapCurrentMeridianRoundToSparkRound ({ meridianContractAddress, meridianRoundIndex, roundStartEpoch, - pgClient + pgClient, + recordTelemetry }) { let sparkRoundNumber @@ -223,7 +227,8 @@ export async function mapCurrentMeridianRoundToSparkRound ({ sparkRoundNumber, meridianContractAddress, meridianRoundIndex, - roundStartEpoch + roundStartEpoch, + recordTelemetry }) return sparkRoundNumber @@ -233,29 +238,68 @@ export async function maybeCreateSparkRound (pgClient, { sparkRoundNumber, meridianContractAddress, meridianRoundIndex, - roundStartEpoch + roundStartEpoch, + recordTelemetry }) { - const { rowCount } = await pgClient.query(` + // maxTasksPerNode(round(n)) = + // BASELINE_TASKS_PER_NODE + // if n=0 + // BASELINE_TASKS_PER_NODE + // if measurementCount(round(n-1)) = 0 + // maxTasksPerNode(round(n-1)) * (TASKS_EXECUTED_PER_ROUND / measurementCount(round(n-1))) + // otherwise + const { rows: [previousRound] } = await pgClient.query(` + SELECT measurement_count, max_tasks_per_node + FROM spark_rounds + WHERE id = $1 - 1::bigint + `, [ + sparkRoundNumber + ]) + const { rows, rowCount } = await pgClient.query(` INSERT INTO spark_rounds (id, created_at, meridian_address, meridian_round, start_epoch, max_tasks_per_node) - VALUES ($1, now(), $2, $3, $4, $5) + VALUES ( + $1, + now(), + $2, + $3, + $4, + ( + $5::int /* previousRound.max_tasks_per_node || BASELINE_TASKS_PER_NODE */ + * $6::int /* TASKS_EXECUTED_PER_ROUND */ + / $7::int /* previousRound.measurement_count || TASKS_EXECUTED_PER_ROUND */ + ) + ) ON CONFLICT DO NOTHING + RETURNING max_tasks_per_node `, [ sparkRoundNumber, meridianContractAddress, meridianRoundIndex, roundStartEpoch, - MAX_TASKS_PER_NODE + previousRound?.max_tasks_per_node || BASELINE_TASKS_PER_NODE, + TASKS_EXECUTED_PER_ROUND, + previousRound?.measurement_count || TASKS_EXECUTED_PER_ROUND ]) if (rowCount) { // We created a new SPARK round. Let's define retrieval tasks for this new round. // This is a short- to medium-term solution until we move to fully decentralized tasking - await defineTasksForRound(pgClient, sparkRoundNumber) + const taskCount = Math.floor( + rows[0].max_tasks_per_node * ROUND_TASKS_TO_NODE_TASKS_RATIO + ) + await defineTasksForRound(pgClient, sparkRoundNumber, taskCount) + recordTelemetry('round', point => { + point.intField('current_round_measurement_count_target', TASKS_EXECUTED_PER_ROUND) + point.intField('current_round_task_count', taskCount) + point.intField('current_round_node_max_task_count', rows[0].max_tasks_per_node) + point.intField('previous_round_measurement_count', previousRound?.measurement_count ?? 0) + point.intField('previous_round_node_max_task_count', previousRound?.max_tasks_per_node ?? 0) + }) } } -async function defineTasksForRound (pgClient, sparkRoundNumber) { +async function defineTasksForRound (pgClient, sparkRoundNumber, taskCount) { await pgClient.query(` INSERT INTO retrieval_tasks (round_id, cid, miner_id, clients) WITH selected AS ( @@ -273,7 +317,7 @@ async function defineTasksForRound (pgClient, sparkRoundNumber) { GROUP BY selected.cid, selected.miner_id; `, [ sparkRoundNumber, - TASKS_PER_ROUND + taskCount ]) } diff --git a/api/test/round-tracker.test.js b/api/test/round-tracker.test.js index 9dd2c2db..0a85b0bb 100644 --- a/api/test/round-tracker.test.js +++ b/api/test/round-tracker.test.js @@ -1,7 +1,10 @@ import assert from 'node:assert' import pg from 'pg' import { - TASKS_PER_ROUND, + BASELINE_TASKS_PER_ROUND, + BASELINE_TASKS_PER_NODE, + TASKS_EXECUTED_PER_ROUND, + ROUND_TASKS_TO_NODE_TASKS_RATIO, getRoundStartEpoch, mapCurrentMeridianRoundToSparkRound, startRoundTracker @@ -10,6 +13,7 @@ import { migrate } from '../../migrations/index.js' import { assertApproximately } from '../../test-helpers/assert.js' import { createMeridianContract } from '../lib/ie-contract.js' import { afterEach, beforeEach } from 'mocha' +import { createTelemetryRecorderStub } from '../../test-helpers/platform-test-helpers.js' const { DATABASE_URL } = process.env @@ -49,11 +53,13 @@ describe('Round Tracker', () => { describe('mapCurrentMeridianRoundToSparkRound', () => { it('handles meridian rounds from the same contract', async () => { + const { recordTelemetry, telemetry } = createTelemetryRecorderStub() let sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({ meridianContractAddress: '0x1a', meridianRoundIndex: 120n, roundStartEpoch: 321n, - pgClient + pgClient, + recordTelemetry }) assert.strictEqual(sparkRoundNumber, 1n) let sparkRounds = (await pgClient.query('SELECT * FROM spark_rounds ORDER BY id')).rows @@ -64,12 +70,28 @@ describe('Round Tracker', () => { // first round number was correctly initialised assert.strictEqual(await getFirstRoundForContractAddress(pgClient, '0x1a'), '1') + assert.deepStrictEqual( + telemetry.map(p => ({ _point: p.name, ...p.fields })), + [ + { + _point: 'round', + current_round_measurement_count_target: `${TASKS_EXECUTED_PER_ROUND}i`, + current_round_task_count: `${Math.floor( + BASELINE_TASKS_PER_NODE * ROUND_TASKS_TO_NODE_TASKS_RATIO + )}i`, + current_round_node_max_task_count: `${BASELINE_TASKS_PER_NODE}i`, + previous_round_measurement_count: '0i', + previous_round_node_max_task_count: '0i' + } + ] + ) sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({ meridianContractAddress: '0x1a', meridianRoundIndex: 121n, roundStartEpoch: 321n, - pgClient + pgClient, + recordTelemetry }) assert.strictEqual(sparkRoundNumber, 2n) sparkRounds = (await pgClient.query('SELECT * FROM spark_rounds ORDER BY id')).rows @@ -80,24 +102,55 @@ describe('Round Tracker', () => { // first round number was not changed assert.strictEqual(await getFirstRoundForContractAddress(pgClient, '0x1a'), '1') + assert.deepStrictEqual( + telemetry.map(p => ({ _point: p.name, ...p.fields }))[1], + { + _point: 'round', + current_round_measurement_count_target: `${TASKS_EXECUTED_PER_ROUND}i`, + current_round_task_count: `${Math.floor( + BASELINE_TASKS_PER_NODE * ROUND_TASKS_TO_NODE_TASKS_RATIO + )}i`, + current_round_node_max_task_count: `${BASELINE_TASKS_PER_NODE}i`, + previous_round_measurement_count: '0i', + previous_round_node_max_task_count: '15i' + } + ) }) it('handles deployment of a new smart contract', async () => { + const { recordTelemetry, telemetry } = createTelemetryRecorderStub() // First contract version `0x1a` let sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({ meridianContractAddress: '0x1a', meridianRoundIndex: 120n, roundStartEpoch: 321n, - pgClient + pgClient, + recordTelemetry }) assert.strictEqual(sparkRoundNumber, 1n) + assert.deepStrictEqual( + telemetry.map(p => ({ _point: p.name, ...p.fields })), + [ + { + _point: 'round', + current_round_measurement_count_target: `${TASKS_EXECUTED_PER_ROUND}i`, + current_round_task_count: `${Math.floor( + BASELINE_TASKS_PER_NODE * ROUND_TASKS_TO_NODE_TASKS_RATIO + )}i`, + current_round_node_max_task_count: `${BASELINE_TASKS_PER_NODE}i`, + previous_round_measurement_count: '0i', + previous_round_node_max_task_count: '0i' + } + ] + ) // New contract version `0x1b` sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({ meridianContractAddress: '0x1b', meridianRoundIndex: 10n, roundStartEpoch: 321n, - pgClient + pgClient, + recordTelemetry }) assert.strictEqual(sparkRoundNumber, 2n) @@ -114,7 +167,8 @@ describe('Round Tracker', () => { meridianContractAddress: '0x1b', meridianRoundIndex: 11n, roundStartEpoch: 321n, - pgClient + pgClient, + recordTelemetry }) assert.strictEqual(sparkRoundNumber, 3n) @@ -124,6 +178,19 @@ describe('Round Tracker', () => { // first round number was not changed assert.strictEqual(await getFirstRoundForContractAddress(pgClient, '0x1b'), '2') + assert.deepStrictEqual( + telemetry.map(p => ({ _point: p.name, ...p.fields }))[1], + { + _point: 'round', + current_round_measurement_count_target: `${TASKS_EXECUTED_PER_ROUND}i`, + current_round_task_count: `${Math.floor( + BASELINE_TASKS_PER_NODE * ROUND_TASKS_TO_NODE_TASKS_RATIO + )}i`, + current_round_node_max_task_count: `${BASELINE_TASKS_PER_NODE}i`, + previous_round_measurement_count: '0i', + previous_round_node_max_task_count: '15i' + } + ) }) it('handles duplicate RoundStarted event', async () => { @@ -131,12 +198,14 @@ describe('Round Tracker', () => { const meridianRoundIndex = 1n const meridianContractAddress = '0x1a' const roundStartEpoch = 321n + const { recordTelemetry, telemetry } = createTelemetryRecorderStub() let sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({ meridianContractAddress, meridianRoundIndex, roundStartEpoch, - pgClient + pgClient, + recordTelemetry }) assert.strictEqual(sparkRoundNumber, 1n) let sparkRounds = (await pgClient.query('SELECT * FROM spark_rounds ORDER BY id')).rows @@ -144,6 +213,21 @@ describe('Round Tracker', () => { assertApproximately(sparkRounds[0].created_at, now, 30_000) assert.strictEqual(sparkRounds[0].meridian_address, '0x1a') assert.strictEqual(sparkRounds[0].meridian_round, '1') + assert.deepStrictEqual( + telemetry.map(p => ({ _point: p.name, ...p.fields })), + [ + { + _point: 'round', + current_round_measurement_count_target: `${TASKS_EXECUTED_PER_ROUND}i`, + current_round_task_count: `${Math.floor( + BASELINE_TASKS_PER_NODE * ROUND_TASKS_TO_NODE_TASKS_RATIO + )}i`, + current_round_node_max_task_count: `${BASELINE_TASKS_PER_NODE}i`, + previous_round_measurement_count: '0i', + previous_round_node_max_task_count: '0i' + } + ] + ) sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({ meridianContractAddress, @@ -160,15 +244,17 @@ describe('Round Tracker', () => { }) it('creates tasks when a new round starts', async () => { + const { recordTelemetry, telemetry } = createTelemetryRecorderStub() const sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({ meridianContractAddress: '0x1a', meridianRoundIndex: 1n, roundStartEpoch: 321n, - pgClient + pgClient, + recordTelemetry }) const { rows: tasks } = await pgClient.query('SELECT * FROM retrieval_tasks ORDER BY id') - assert.strictEqual(tasks.length, TASKS_PER_ROUND) + assert.strictEqual(tasks.length, BASELINE_TASKS_PER_ROUND) for (const [ix, t] of tasks.entries()) { assert.strictEqual(BigInt(t.round_id), sparkRoundNumber) assert.strictEqual(typeof t.cid, 'string', `task#${ix} cid`) @@ -177,6 +263,21 @@ describe('Round Tracker', () => { assert.strictEqual(t.protocol, null, `task#${ix} protocol`) assert.match(t.miner_id, /^f0/, `task#${ix} miner_id should match /^f0/, found ${t.miner_id}`) } + assert.deepStrictEqual( + telemetry.map(p => ({ _point: p.name, ...p.fields })), + [ + { + _point: 'round', + current_round_measurement_count_target: `${TASKS_EXECUTED_PER_ROUND}i`, + current_round_task_count: `${Math.floor( + BASELINE_TASKS_PER_NODE * ROUND_TASKS_TO_NODE_TASKS_RATIO + )}i`, + current_round_node_max_task_count: `${BASELINE_TASKS_PER_NODE}i`, + previous_round_measurement_count: '0i', + previous_round_node_max_task_count: '0i' + } + ] + ) }) it('creates tasks only once per round', async () => { @@ -184,42 +285,210 @@ describe('Round Tracker', () => { const meridianContractAddress = '0x1a' const roundStartEpoch = 321n + const { recordTelemetry, telemetry } = createTelemetryRecorderStub() const firstRoundNumber = await mapCurrentMeridianRoundToSparkRound({ meridianContractAddress, meridianRoundIndex, roundStartEpoch, - pgClient + pgClient, + recordTelemetry }) const secondRoundNumber = await mapCurrentMeridianRoundToSparkRound({ meridianContractAddress, meridianRoundIndex, roundStartEpoch, - pgClient + pgClient, + recordTelemetry }) assert.strictEqual(firstRoundNumber, secondRoundNumber) const { rows: tasks } = await pgClient.query('SELECT * FROM retrieval_tasks ORDER BY id') - assert.strictEqual(tasks.length, TASKS_PER_ROUND) + assert.strictEqual(tasks.length, BASELINE_TASKS_PER_ROUND) for (const t of tasks) { assert.strictEqual(BigInt(t.round_id), firstRoundNumber) } + assert.deepStrictEqual( + telemetry.map(p => ({ _point: p.name, ...p.fields })), + [ + { + _point: 'round', + current_round_measurement_count_target: `${TASKS_EXECUTED_PER_ROUND}i`, + current_round_task_count: `${Math.floor( + BASELINE_TASKS_PER_NODE * ROUND_TASKS_TO_NODE_TASKS_RATIO + )}i`, + current_round_node_max_task_count: `${BASELINE_TASKS_PER_NODE}i`, + previous_round_measurement_count: '0i', + previous_round_node_max_task_count: '0i' + } + ] + ) }) it('sets tasks_per_round', async () => { const meridianRoundIndex = 1n const meridianContractAddress = '0x1a' const roundStartEpoch = 321n + const { recordTelemetry, telemetry } = createTelemetryRecorderStub() const sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({ meridianContractAddress, meridianRoundIndex, roundStartEpoch, - pgClient + pgClient, + recordTelemetry }) assert.strictEqual(sparkRoundNumber, 1n) const sparkRounds = (await pgClient.query('SELECT * FROM spark_rounds ORDER BY id')).rows assert.deepStrictEqual(sparkRounds.map(r => r.id), ['1']) assert.strictEqual(sparkRounds[0].max_tasks_per_node, 15) + assert.deepStrictEqual( + telemetry.map(p => ({ _point: p.name, ...p.fields })), + [ + { + _point: 'round', + current_round_measurement_count_target: `${TASKS_EXECUTED_PER_ROUND}i`, + current_round_task_count: `${Math.floor( + BASELINE_TASKS_PER_NODE * ROUND_TASKS_TO_NODE_TASKS_RATIO + )}i`, + current_round_node_max_task_count: `${BASELINE_TASKS_PER_NODE}i`, + previous_round_measurement_count: '0i', + previous_round_node_max_task_count: '0i' + } + ] + ) + }) + + describe('task scaling', async () => { + it('uses baseline values for the first round', async () => { + const { recordTelemetry, telemetry } = createTelemetryRecorderStub() + const sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({ + meridianContractAddress: '0x1a', + meridianRoundIndex: 120n, + roundStartEpoch: 321n, + pgClient, + recordTelemetry + }) + const { rows: [sparkRound] } = await pgClient.query( + 'SELECT * FROM spark_rounds WHERE id = $1', + [sparkRoundNumber] + ) + assert.strictEqual(sparkRound.max_tasks_per_node, BASELINE_TASKS_PER_NODE) + const { rows: retrievalTasks } = await pgClient.query( + 'SELECT * FROM retrieval_tasks' + ) + assert.strictEqual(retrievalTasks.length, BASELINE_TASKS_PER_ROUND) + assert.deepStrictEqual( + telemetry.map(p => ({ _point: p.name, ...p.fields })), + [ + { + _point: 'round', + current_round_measurement_count_target: `${TASKS_EXECUTED_PER_ROUND}i`, + current_round_task_count: `${Math.floor( + BASELINE_TASKS_PER_NODE * ROUND_TASKS_TO_NODE_TASKS_RATIO + )}i`, + current_round_node_max_task_count: `${BASELINE_TASKS_PER_NODE}i`, + previous_round_measurement_count: '0i', + previous_round_node_max_task_count: '0i' + } + ] + ) + }) + it('uses baseline values when the previous round was empty', async () => { + await mapCurrentMeridianRoundToSparkRound({ + meridianContractAddress: '0x1a', + meridianRoundIndex: 120n, + roundStartEpoch: 321n, + pgClient, + recordTelemetry: createTelemetryRecorderStub().recordTelemetry + }) + const { recordTelemetry, telemetry } = createTelemetryRecorderStub() + const sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({ + meridianContractAddress: '0x1a', + meridianRoundIndex: 121n, + roundStartEpoch: 321n, + pgClient, + recordTelemetry + }) + const { rows: [sparkRound] } = await pgClient.query( + 'SELECT * FROM spark_rounds WHERE id = $1', + [sparkRoundNumber] + ) + assert.strictEqual(sparkRound.max_tasks_per_node, BASELINE_TASKS_PER_NODE) + const { rows: retrievalTasks } = await pgClient.query( + 'SELECT * FROM retrieval_tasks WHERE round_id = $1', + [sparkRoundNumber] + ) + assert.strictEqual(retrievalTasks.length, BASELINE_TASKS_PER_ROUND) + assert.deepStrictEqual( + telemetry.map(p => ({ _point: p.name, ...p.fields })), + [ + { + _point: 'round', + current_round_measurement_count_target: `${TASKS_EXECUTED_PER_ROUND}i`, + current_round_task_count: `${Math.floor( + BASELINE_TASKS_PER_NODE * ROUND_TASKS_TO_NODE_TASKS_RATIO + )}i`, + current_round_node_max_task_count: `${BASELINE_TASKS_PER_NODE}i`, + previous_round_measurement_count: '0i', + previous_round_node_max_task_count: '15i' + } + ] + ) + }) + it('scales the task count to reach desired tasks executed', async () => { + const prevSparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({ + meridianContractAddress: '0x1a', + meridianRoundIndex: 120n, + roundStartEpoch: 321n, + pgClient, + recordTelemetry: createTelemetryRecorderStub().recordTelemetry + }) + await pgClient.query( + 'UPDATE spark_rounds SET measurement_count = $1 WHERE id = $2', + [TASKS_EXECUTED_PER_ROUND / 2, prevSparkRoundNumber] + ) + // It should double the node task count as the previous round only + // produces half of the desired measurements + const { recordTelemetry, telemetry } = createTelemetryRecorderStub() + const sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({ + meridianContractAddress: '0x1a', + meridianRoundIndex: 121n, + roundStartEpoch: 321n, + pgClient, + recordTelemetry + }) + const { rows: [sparkRound] } = await pgClient.query( + 'SELECT * FROM spark_rounds WHERE id = $1', + [sparkRoundNumber] + ) + assert.strictEqual(sparkRound.max_tasks_per_node, BASELINE_TASKS_PER_NODE * 2) + const { rows: retrievalTasks } = await pgClient.query( + 'SELECT * FROM retrieval_tasks WHERE round_id = $1', + [sparkRoundNumber] + ) + assert.strictEqual(retrievalTasks.length, BASELINE_TASKS_PER_ROUND * 2) + const { rows: [prevSparkRound] } = await pgClient.query( + 'SELECT * FROM spark_rounds WHERE id = $1', + [prevSparkRoundNumber] + ) + const { rows: prevRoundRetrievalTasks } = await pgClient.query( + 'SELECT * FROM retrieval_tasks WHERE round_id = $1', + [prevSparkRoundNumber] + ) + assert.deepStrictEqual( + telemetry.map(p => ({ _point: p.name, ...p.fields })), + [ + { + _point: 'round', + current_round_measurement_count_target: `${TASKS_EXECUTED_PER_ROUND}i`, + current_round_task_count: `${prevRoundRetrievalTasks.length * 2}i`, + current_round_node_max_task_count: `${prevSparkRound.max_tasks_per_node * 2}i`, + previous_round_measurement_count: `${TASKS_EXECUTED_PER_ROUND / 2}i`, + previous_round_node_max_task_count: '15i' + } + ] + ) + }) }) }) @@ -236,8 +505,28 @@ describe('Round Tracker', () => { describe('startRoundTracker', () => { it('detects the current round', async function () { this.timeout(TIMEOUT_WHEN_QUERYING_CHAIN) - const { sparkRoundNumber } = await startRoundTracker({ pgPool, signal: testFinished.signal }) + const { recordTelemetry, telemetry } = createTelemetryRecorderStub() + const { sparkRoundNumber } = await startRoundTracker({ + pgPool, + signal: testFinished.signal, + recordTelemetry + }) assert.strictEqual(typeof sparkRoundNumber, 'bigint') + assert.deepStrictEqual( + telemetry.map(p => ({ _point: p.name, ...p.fields })), + [ + { + _point: 'round', + current_round_measurement_count_target: `${TASKS_EXECUTED_PER_ROUND}i`, + current_round_task_count: `${Math.floor( + BASELINE_TASKS_PER_NODE * ROUND_TASKS_TO_NODE_TASKS_RATIO + )}i`, + current_round_node_max_task_count: `${BASELINE_TASKS_PER_NODE}i`, + previous_round_measurement_count: '0i', + previous_round_node_max_task_count: '0i' + } + ] + ) }) }) }) diff --git a/api/test/test.js b/api/test/test.js index 12d5ec2d..bb544abb 100644 --- a/api/test/test.js +++ b/api/test/test.js @@ -4,12 +4,13 @@ import { once } from 'node:events' import assert, { AssertionError } from 'node:assert' import pg from 'pg' import { - TASKS_PER_ROUND, + BASELINE_TASKS_PER_ROUND, maybeCreateSparkRound, mapCurrentMeridianRoundToSparkRound, - MAX_TASKS_PER_NODE + BASELINE_TASKS_PER_NODE } from '../lib/round-tracker.js' import { delegatedFromEthAddress } from '@glif/filecoin-address' +import { createTelemetryRecorderStub } from '../../test-helpers/platform-test-helpers.js' const { DATABASE_URL } = process.env const participantAddress = '0x000000000000000000000000000000000000dEaD' @@ -59,7 +60,8 @@ describe('Routes', () => { sparkRoundNumber: currentSparkRoundNumber, meridianContractAddress: '0x1a', meridianRoundIndex: 123n, - roundStartEpoch: 321n + roundStartEpoch: 321n, + recordTelemetry: createTelemetryRecorderStub().recordTelemetry }) const handler = await createHandler({ client, @@ -450,13 +452,15 @@ describe('Routes', () => { before(async () => { await client.query('DELETE FROM meridian_contract_versions') await client.query('DELETE FROM spark_rounds') + const { recordTelemetry } = createTelemetryRecorderStub() // round 1 managed by old contract version let num = await mapCurrentMeridianRoundToSparkRound({ pgClient: client, meridianContractAddress: '0xOLD', meridianRoundIndex: 10n, - roundStartEpoch: 321n + roundStartEpoch: 321n, + recordTelemetry }) assert.strictEqual(num, 1n) @@ -465,7 +469,8 @@ describe('Routes', () => { pgClient: client, meridianContractAddress: '0xNEW', meridianRoundIndex: 120n, - roundStartEpoch: 621n + roundStartEpoch: 621n, + recordTelemetry }) assert.strictEqual(num, 2n) @@ -474,7 +479,8 @@ describe('Routes', () => { pgClient: client, meridianContractAddress: '0xNEW', meridianRoundIndex: 121n, - roundStartEpoch: 921n + roundStartEpoch: 921n, + recordTelemetry }) assert.strictEqual(num, 3n) }) @@ -486,10 +492,10 @@ describe('Routes', () => { assert.deepStrictEqual(details, { roundId: '2', - maxTasksPerNode: MAX_TASKS_PER_NODE, + maxTasksPerNode: BASELINE_TASKS_PER_NODE, startEpoch: '621' }) - assert.strictEqual(retrievalTasks.length, TASKS_PER_ROUND) + assert.strictEqual(retrievalTasks.length, BASELINE_TASKS_PER_ROUND) for (const task of retrievalTasks) { assert.equal(typeof task.cid, 'string', 'all tasks have "cid"') @@ -506,10 +512,10 @@ describe('Routes', () => { assert.deepStrictEqual(details, { roundId: '1', - maxTasksPerNode: MAX_TASKS_PER_NODE, + maxTasksPerNode: BASELINE_TASKS_PER_NODE, startEpoch: '321' }) - assert.strictEqual(retrievalTasks.length, TASKS_PER_ROUND) + assert.strictEqual(retrievalTasks.length, BASELINE_TASKS_PER_ROUND) }) it('returns 404 for unknown round index', async () => { @@ -531,7 +537,8 @@ describe('Routes', () => { sparkRoundNumber: currentSparkRoundNumber, meridianContractAddress: '0x1a', meridianRoundIndex: 123n, - roundStartEpoch: 321n + roundStartEpoch: 321n, + recordTelemetry: createTelemetryRecorderStub().recordTelemetry }) }) @@ -575,7 +582,8 @@ describe('Routes', () => { sparkRoundNumber: currentSparkRoundNumber, meridianContractAddress: '0x1a', meridianRoundIndex: 123n, - roundStartEpoch: 321n + roundStartEpoch: 321n, + recordTelemetry: createTelemetryRecorderStub().recordTelemetry }) }) diff --git a/migrations/058.do.round-measurement-count.sql b/migrations/058.do.round-measurement-count.sql new file mode 100644 index 00000000..830204e9 --- /dev/null +++ b/migrations/058.do.round-measurement-count.sql @@ -0,0 +1 @@ +ALTER TABLE spark_rounds ADD COLUMN measurement_count BIGINT; diff --git a/publish/index.js b/publish/index.js index 3d522d47..299d0999 100644 --- a/publish/index.js +++ b/publish/index.js @@ -1,6 +1,7 @@ /* global File */ import pRetry from 'p-retry' +import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator' export const publish = async ({ client: pgPool, @@ -88,6 +89,16 @@ export const publish = async ({ cid.toString(), new Date() ]) + await pgClient.query(` + UPDATE spark_rounds + SET measurement_count = COALESCE(measurement_count, 0) + $1 + WHERE meridian_address = $2 AND meridian_round = $3 + `, [ + measurements.length, + SparkImpactEvaluator.ADDRESS, + roundIndex + ]) + await pgClient.query('COMMIT') } catch (err) { await pgClient.query('ROLLBACK') diff --git a/publish/test/test.js b/publish/test/test.js index 097a53a9..0891641f 100644 --- a/publish/test/test.js +++ b/publish/test/test.js @@ -2,6 +2,7 @@ import { publish } from '../index.js' import assert from 'node:assert' import { CID } from 'multiformats/cid' import pg from 'pg' +import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator' import { assertApproximately, @@ -86,6 +87,7 @@ describe('unit', () => { [1], undefined, [[]], + [0, SparkImpactEvaluator.ADDRESS, 1], undefined ]) assert.strictEqual(clientStatements.pop(), 'VACUUM measurements')