Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
4ffecd4
api: Add fixed # tasks per round. Closes space-meridian/roadmap#82
juliangruber Aug 2, 2024
7486597
docs
juliangruber Aug 2, 2024
f1635e5
fix tests
juliangruber Aug 2, 2024
ab8b302
disable dynamic task count in tests
juliangruber Aug 2, 2024
3013d3f
Merge branch 'main' into api/add/fixed-task-count-per-round
juliangruber Aug 2, 2024
329e318
fix more tests
juliangruber Aug 2, 2024
4fb6739
fix more tests
juliangruber Aug 2, 2024
46ea7d3
fix more tests
juliangruber Aug 2, 2024
86a4f7d
Merge branch 'main' into api/add/fixed-task-count-per-round
juliangruber Sep 9, 2024
3a04f92
use new `measurement_count` tracked internally
juliangruber Sep 9, 2024
4095686
docs
juliangruber Sep 9, 2024
b9508a2
fix publish query
juliangruber Sep 9, 2024
1b35585
fix test
juliangruber Sep 9, 2024
b6f44c9
try fix query
juliangruber Sep 9, 2024
e101f7c
fix 0 measurements case
juliangruber Sep 9, 2024
d8d8d5e
publish: fix include contract address in query
juliangruber Sep 9, 2024
d050342
fix test
juliangruber Sep 9, 2024
e240e69
wip
juliangruber Sep 10, 2024
6fa4a30
implement ratio between node and round tasks
juliangruber Sep 12, 2024
361f9ee
docs
juliangruber Sep 12, 2024
047eb3f
select spark round id, not meridian round number
juliangruber Sep 12, 2024
2a7f284
fix
juliangruber Sep 12, 2024
26810b6
fix
juliangruber Sep 12, 2024
37a7d1a
fix
juliangruber Sep 12, 2024
767c117
convert to int
juliangruber Sep 12, 2024
bb30def
add telemetry
juliangruber Sep 12, 2024
65ad1a0
fix lint
juliangruber Sep 12, 2024
f278c5a
fix no previous round case
juliangruber Sep 12, 2024
c6c5576
handle 0 (not just null)
juliangruber Sep 12, 2024
1e61e91
fix query
juliangruber Sep 12, 2024
b84d342
cast to int
juliangruber Sep 12, 2024
b662bbc
telemetry: handle no prev round case
juliangruber Sep 12, 2024
faf8bfe
mock one telemetry call
juliangruber Sep 12, 2024
22aa040
fix lint
juliangruber Sep 12, 2024
4540f90
fix missing export
juliangruber Sep 12, 2024
6cb915a
fix test
juliangruber Sep 12, 2024
d584596
fix test
juliangruber Sep 12, 2024
555d532
fix lint
juliangruber Sep 12, 2024
f38e3d3
tests
juliangruber Sep 12, 2024
ea580c7
tests
juliangruber Sep 12, 2024
8d863e0
tests
juliangruber Sep 12, 2024
a7280d9
tests
juliangruber Sep 12, 2024
f6861b0
tests
juliangruber Sep 12, 2024
16daa72
tests
juliangruber Sep 12, 2024
88a8621
tests
juliangruber Sep 12, 2024
e6ad4a8
fix signature
juliangruber Sep 12, 2024
7280eb9
tests
juliangruber Sep 12, 2024
9e5f1fe
add passing test
juliangruber Sep 13, 2024
e465dae
add passing test
juliangruber Sep 13, 2024
2eaa2f4
add passing test
juliangruber Sep 13, 2024
fdb7513
refactor
juliangruber Sep 17, 2024
c5ab20f
NODE_TASKS_TO_ROUND_TASKS_RATIO -> ROUND_TASKS_TO_NODE_TASKS_RATIO
juliangruber Sep 17, 2024
cca8c15
fixup! refactor
juliangruber Sep 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion api/bin/spark.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import pg from 'pg'
import { startRoundTracker } from '../lib/round-tracker.js'
import { migrate } from '../../migrations/index.js'
import { clearNetworkInfoStationIdsSeen } from '../lib/network-info-logger.js'
import { recordNetworkInfoTelemetry } from '../../common/telemetry.js'

const {
PORT = 8080,
Expand Down Expand Up @@ -41,7 +42,10 @@ console.log('Initializing round tracker...')
const start = Date.now()

try {
const currentRound = await startRoundTracker({ pgPool: client })
const currentRound = await startRoundTracker({
pgPool: client,
recordTelemetry: recordNetworkInfoTelemetry
})
console.log(
'Initialized round tracker in %sms. SPARK round number at service startup: %s',
Date.now() - start,
Expand Down
94 changes: 69 additions & 25 deletions api/lib/round-tracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,24 @@ import assert from 'node:assert'
import * as Sentry from '@sentry/node'
import { createMeridianContract } from './ie-contract.js'

// The number of tasks per round is proportionate to the SPARK round length - longer rounds require
// more tasks per round.
//
// See https://www.notion.so/pl-strflt/SPARK-tasking-v2-604e26d57f6b4892946525bcb3a77104?pvs=4#ded1cd98c2664a2289453d38e2715643
// for more details, this constant represents TC (tasks per committee).
//
// We will need to tweak this value based on measurements; that's why I put it here as a constant.
export const TASKS_PER_ROUND = 1000

// How many tasks is each SPARK checker node expected to complete every round (at most).
export const MAX_TASKS_PER_NODE = 15
// Tweak this to control the network's overall task count.
export const TASKS_EXECUTED_PER_ROUND = 500_000

// Baseline values for how many tasks should be completed every round, and how
// many tasks each SPARK checker node is expected to complete (every round, at
// most). The actual value will be set dynamically based on
// TASKS_EXECUTED_PER_ROUND and the number of tasks executed in the last round.
export const BASELINE_TASKS_PER_ROUND = 1000
export const BASELINE_TASKS_PER_NODE = 15

export const ROUND_TASKS_TO_NODE_TASKS_RATIO = BASELINE_TASKS_PER_ROUND / BASELINE_TASKS_PER_NODE

/** @typedef {Awaited<ReturnType<import('./ie-contract.js').createMeridianContract>>} MeridianContract */

/**
* @param {object} args
* @param {import('pg').Pool} args.pgPool
* @param {import('../../common/typings.js').RecordTelemetryFn} recordTelemetry
* @param {AbortSignal} [args.signal]
* @returns {
* sparkRoundNumber: bigint;
Expand All @@ -27,7 +28,7 @@ export const MAX_TASKS_PER_NODE = 15
* roundStartEpoch: bigint;
* }
*/
export async function startRoundTracker ({ pgPool, signal }) {
export async function startRoundTracker ({ pgPool, signal, recordTelemetry }) {
const contract = await createMeridianContract()

const onRoundStart = (newRoundIndex, ...args) => {
Expand All @@ -41,7 +42,7 @@ export async function startRoundTracker ({ pgPool, signal }) {
)
}

updateSparkRound(pgPool, contract, newRoundIndex, blockNumber).catch(err => {
updateSparkRound(pgPool, contract, newRoundIndex, recordTelemetry, blockNumber).catch(err => {
console.error('Cannot handle RoundStart:', err)
Sentry.captureException(err)
})
Expand All @@ -53,17 +54,18 @@ export async function startRoundTracker ({ pgPool, signal }) {
})
}

const currentRound = await updateSparkRound(pgPool, contract, await contract.currentRoundIndex())
const currentRound = await updateSparkRound(pgPool, contract, await contract.currentRoundIndex(), recordTelemetry)
return currentRound
}

/**
* @param {import('pg').Pool} pgPool
* @param {MeridianContract} contract
* @param {bigint} newRoundIndex
* @param {import('../../common/typings.js').RecordTelemetryFn} recordTelemetry
* @param {number} [roundStartEpoch]
*/
async function updateSparkRound (pgPool, contract, newRoundIndex, roundStartEpoch) {
async function updateSparkRound (pgPool, contract, newRoundIndex, recordTelemetry, roundStartEpoch) {
const meridianRoundIndex = BigInt(newRoundIndex)
const meridianContractAddress = await contract.getAddress()

Expand All @@ -78,7 +80,8 @@ async function updateSparkRound (pgPool, contract, newRoundIndex, roundStartEpoc
meridianContractAddress,
meridianRoundIndex,
roundStartEpoch,
pgClient
pgClient,
recordTelemetry
})
await pgClient.query('COMMIT')
console.log('SPARK round started: %s (epoch: %s)', sparkRoundNumber, roundStartEpoch)
Expand Down Expand Up @@ -169,7 +172,8 @@ export async function mapCurrentMeridianRoundToSparkRound ({
meridianContractAddress,
meridianRoundIndex,
roundStartEpoch,
pgClient
pgClient,
recordTelemetry
}) {
let sparkRoundNumber

Expand Down Expand Up @@ -223,7 +227,8 @@ export async function mapCurrentMeridianRoundToSparkRound ({
sparkRoundNumber,
meridianContractAddress,
meridianRoundIndex,
roundStartEpoch
roundStartEpoch,
recordTelemetry
})

return sparkRoundNumber
Expand All @@ -233,29 +238,68 @@ export async function maybeCreateSparkRound (pgClient, {
sparkRoundNumber,
meridianContractAddress,
meridianRoundIndex,
roundStartEpoch
roundStartEpoch,
recordTelemetry
}) {
const { rowCount } = await pgClient.query(`
// maxTasksPerNode(round(n)) =
// BASELINE_TASKS_PER_NODE
// if n=0
// BASELINE_TASKS_PER_NODE
// if measurementCount(round(n-1)) = 0
// maxTasksPerNode(round(n-1)) * (TASKS_EXECUTED_PER_ROUND / measurementCount(round(n-1)))
// otherwise
const { rows: [previousRound] } = await pgClient.query(`
SELECT measurement_count, max_tasks_per_node
FROM spark_rounds
WHERE id = $1 - 1::bigint
`, [
sparkRoundNumber
])
const { rows, rowCount } = await pgClient.query(`
INSERT INTO spark_rounds
(id, created_at, meridian_address, meridian_round, start_epoch, max_tasks_per_node)
VALUES ($1, now(), $2, $3, $4, $5)
VALUES (
$1,
now(),
$2,
$3,
$4,
(
$5::int /* previousRound.max_tasks_per_node || BASELINE_TASKS_PER_NODE */
* $6::int /* TASKS_EXECUTED_PER_ROUND */
/ $7::int /* previousRound.measurement_count || TASKS_EXECUTED_PER_ROUND */
)
)
ON CONFLICT DO NOTHING
RETURNING max_tasks_per_node
`, [
sparkRoundNumber,
meridianContractAddress,
meridianRoundIndex,
roundStartEpoch,
MAX_TASKS_PER_NODE
previousRound?.max_tasks_per_node || BASELINE_TASKS_PER_NODE,
TASKS_EXECUTED_PER_ROUND,
previousRound?.measurement_count || TASKS_EXECUTED_PER_ROUND
])

if (rowCount) {
// We created a new SPARK round. Let's define retrieval tasks for this new round.
// This is a short- to medium-term solution until we move to fully decentralized tasking
await defineTasksForRound(pgClient, sparkRoundNumber)
const taskCount = Math.floor(
rows[0].max_tasks_per_node * ROUND_TASKS_TO_NODE_TASKS_RATIO
)
await defineTasksForRound(pgClient, sparkRoundNumber, taskCount)
recordTelemetry('round', point => {
point.intField('current_round_measurement_count_target', TASKS_EXECUTED_PER_ROUND)
point.intField('current_round_task_count', taskCount)
point.intField('current_round_node_max_task_count', rows[0].max_tasks_per_node)
point.intField('previous_round_measurement_count', previousRound?.measurement_count ?? 0)
point.intField('previous_round_node_max_task_count', previousRound?.max_tasks_per_node ?? 0)
})
}
}

async function defineTasksForRound (pgClient, sparkRoundNumber) {
async function defineTasksForRound (pgClient, sparkRoundNumber, taskCount) {
await pgClient.query(`
INSERT INTO retrieval_tasks (round_id, cid, miner_id, clients)
WITH selected AS (
Expand All @@ -273,7 +317,7 @@ async function defineTasksForRound (pgClient, sparkRoundNumber) {
GROUP BY selected.cid, selected.miner_id;
`, [
sparkRoundNumber,
TASKS_PER_ROUND
taskCount
])
}

Expand Down
Loading