Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: decouple round tracker from API routes #292

Merged
merged 5 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions bin/spark.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ import Sentry from '@sentry/node'
import fs from 'node:fs/promises'
import { join, dirname } from 'node:path'
import { fileURLToPath } from 'node:url'
import { createRoundGetter } from '../lib/round-tracker.js'
import { startRoundTracker } from '../lib/round-tracker.js'
import { migrate } from '../lib/migrate.js'
import assert from 'node:assert'

const {
PORT = 8080,
Expand Down Expand Up @@ -59,11 +58,20 @@ client.on('error', err => {
})
await migrate(client)

const getCurrentRound = await createRoundGetter(client)
console.log('Initializing round tracker...')
const start = Date.now()

const round = getCurrentRound()
assert(!!round, 'cannot obtain the current Spark round number')
console.log('SPARK round number at service startup:', round.sparkRoundNumber)
try {
const currentRound = await startRoundTracker(client)
console.log(
'Initialized round tracker in %sms. SPARK round number at service startup: %s',
Date.now() - start,
currentRound.sparkRoundNumber
)
} catch (err) {
console.error('Cannot obtain the current Spark round number:', err)
process.exit(1)
}

const logger = {
error: console.error,
Expand All @@ -74,7 +82,6 @@ const logger = {
const handler = await createHandler({
client,
logger,
getCurrentRound,
domain: DOMAIN
})
const server = http.createServer(handler)
Expand Down
35 changes: 21 additions & 14 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { mapRequestToInetGroup } from './lib/inet-grouping.js'
import { satisfies } from 'compare-versions'
import { ethAddressFromDelegated } from '@glif/filecoin-address'

const handler = async (req, res, client, getCurrentRound, domain) => {
const handler = async (req, res, client, domain) => {
if (req.headers.host.split(':')[0] !== domain) {
return redirect(res, `https://${domain}${req.url}`)
}
Expand All @@ -19,22 +19,21 @@ const handler = async (req, res, client, getCurrentRound, domain) => {
} else if (segs[0] === 'retrievals' && req.method === 'GET') {
assert.fail(410, 'This API endpoint is no longer supported.')
} else if (segs[0] === 'measurements' && req.method === 'POST') {
await createMeasurement(req, res, client, getCurrentRound)
await createMeasurement(req, res, client)
} else if (segs[0] === 'measurements' && req.method === 'GET') {
await getMeasurement(req, res, client, Number(segs[1]))
} else if (segs[0] === 'rounds' && segs[1] === 'meridian' && req.method === 'GET') {
await getMeridianRoundDetails(req, res, client, segs[2], segs[3])
} else if (segs[0] === 'rounds' && req.method === 'GET') {
await getRoundDetails(req, res, client, getCurrentRound, segs[1])
await getRoundDetails(req, res, client, segs[1])
} else if (segs[0] === 'inspect-request' && req.method === 'GET') {
await inspectRequest(req, res)
} else {
notFound(res)
}
}

const createMeasurement = async (req, res, client, getCurrentRound) => {
const { sparkRoundNumber } = getCurrentRound()
const createMeasurement = async (req, res, client) => {
const body = await getRawBody(req, { limit: '100kb' })
const measurement = JSON.parse(body)
validate(measurement, 'sparkVersion', { type: 'string', required: false })
Expand Down Expand Up @@ -104,9 +103,12 @@ const createMeasurement = async (req, res, client, getCurrentRound) => {
provider_id,
completed_at_round
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21
)
SELECT
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
id as completed_at_round
FROM spark_rounds
ORDER BY id DESC
LIMIT 1
RETURNING id
`, [
measurement.sparkVersion,
Expand All @@ -128,8 +130,7 @@ const createMeasurement = async (req, res, client, getCurrentRound) => {
measurement.carChecksum,
measurement.indexerResult,
measurement.minerId,
measurement.providerId,
sparkRoundNumber
measurement.providerId
])
json(res, { id: rows[0].id })
}
Expand Down Expand Up @@ -168,9 +169,16 @@ const getMeasurement = async (req, res, client, measurementId) => {
})
}

const getRoundDetails = async (req, res, client, getCurrentRound, roundParam) => {
const getRoundDetails = async (req, res, client, roundParam) => {
if (roundParam === 'current') {
const { meridianContractAddress, meridianRoundIndex } = getCurrentRound()
const { rows: [round] } = await client.query(`
SELECT meridian_address, meridian_round FROM spark_rounds
ORDER BY id DESC
LIMIT 1
`)
assert(!!round, 'No rounds found in "spark_rounds" table.')
const meridianContractAddress = round.meridian_address
const meridianRoundIndex = BigInt(round.meridian_round)
const addr = encodeURIComponent(meridianContractAddress)
const idx = encodeURIComponent(meridianRoundIndex)
const location = `/rounds/meridian/${addr}/${idx}`
Expand Down Expand Up @@ -305,13 +313,12 @@ export const inspectRequest = async (req, res) => {
export const createHandler = async ({
client,
logger,
getCurrentRound,
domain
}) => {
return (req, res) => {
const start = new Date()
logger.request(`${req.method} ${req.url} ...`)
handler(req, res, client, getCurrentRound, domain)
handler(req, res, client, domain)
.catch(err => errorHandler(res, err, logger))
.then(() => {
logger.request(`${req.method} ${req.url} ${res.statusCode} (${new Date() - start}ms)`)
Expand Down
33 changes: 17 additions & 16 deletions lib/round-tracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,42 @@ export const MAX_TASKS_PER_NODE = 15

/**
* @param {import('pg').Pool} pgPool
* @returns {() => {
* @returns {
* sparkRoundNumber: bigint;
* meridianContractAddress: string;
* meridianRoundIndex: bigint;
* }}
* roundStartEpoch: bigint;
* }
*/
export async function createRoundGetter (pgPool) {
export async function startRoundTracker (pgPool) {
const contract = await createMeridianContract()

let sparkRoundNumber, meridianContractAddress, meridianRoundIndex

const updateSparkRound = async (newRoundIndex) => {
meridianRoundIndex = BigInt(newRoundIndex)
meridianContractAddress = contract.address
const meridianRoundIndex = BigInt(newRoundIndex)
const meridianContractAddress = contract.address

const roundStartEpoch = await getRoundStartEpoch(contract, meridianRoundIndex)

const pgClient = await pgPool.connect()
try {
await pgClient.query('BEGIN')
sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({
const sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({
meridianContractAddress,
meridianRoundIndex,
roundStartEpoch,
pgClient
})
await pgClient.query('COMMIT')
console.log('SPARK round started: %s', sparkRoundNumber)
console.log('SPARK round started: %s (epoch: %s)', sparkRoundNumber, roundStartEpoch)
return {
sparkRoundNumber,
meridianContractAddress,
meridianRoundIndex,
roundStartEpoch
}
} catch (err) {
await pgClient.query('ROLLBACK')
throw err
} finally {
pgClient.release()
}
Expand All @@ -58,13 +64,8 @@ export async function createRoundGetter (pgPool) {
})
})

await updateSparkRound(await contract.currentRoundIndex())

return () => ({
sparkRoundNumber,
meridianContractAddress,
meridianRoundIndex
})
const currentRound = await updateSparkRound(await contract.currentRoundIndex())
return currentRound
}

/*
Expand Down
14 changes: 0 additions & 14 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,6 @@ describe('Routes', () => {
error: console.error,
request () {}
},
getCurrentRound () {
return {
sparkRoundNumber: currentSparkRoundNumber,
meridianContractAddress: '0x1a',
meridianRoundIndex: 123n
}
},
domain: '127.0.0.1'
})
server = http.createServer(handler)
Expand Down Expand Up @@ -630,13 +623,6 @@ describe('Routes', () => {
error: console.error,
request () {}
},
getCurrentRound () {
return {
sparkRoundNumber: currentSparkRoundNumber,
meridianContractAddress: '0x1a',
meridianRoundIndex: 123n
}
},
domain: 'foobar'
})
server = http.createServer(handler)
Expand Down