From 7b09739ba34915c548f8ec437376007913ef7956 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Tue, 21 Mar 2023 22:22:36 +0100 Subject: [PATCH 1/2] `station events` --- README.md | 31 +++++++++++++++++++++++++++---- bin/station.js | 5 +++++ commands/activity.js | 34 +++++++++++----------------------- commands/events.js | 24 ++++++++++++++++++++++++ commands/index.js | 1 + commands/metrics.js | 31 +++++-------------------------- lib/activity.js | 29 ++++++++++++++++++++++++++++- lib/metrics.js | 30 +++++++++++++++++++++++++++++- test/test.js | 27 +++++++++++++++++++++++++++ 9 files changed, 157 insertions(+), 55 deletions(-) create mode 100644 commands/events.js diff --git a/README.md b/README.md index 4a5ba3f6..57215a45 100644 --- a/README.md +++ b/README.md @@ -136,6 +136,28 @@ $ station logs --follow ... ``` +### `$ station events` + +Get combined real-time events from `$ station metrics` and `$ station activity`. + +```bash +$ station events +{"type":"jobs-completed","total":36} +{"type":"activity:info","module":"Saturn","message":"Saturn Node was able to connect to the Orchestrator and will now start connecting to the Saturn network..."} +... +``` + +The following event types exist: + +- `jobs-completed` + - `total` +- `activity:info` + - `module` + - `message` +- `activity:error` + - `module` + - `message` + ### `$ station --help` Show help. @@ -145,10 +167,11 @@ $ station --help Usage: station [options] Commands: - station Start Station [default] - station metrics Show metrics - station activity Show activity log - station logs [module] Show module logs + station Start Station [default] + station metrics Show metrics + station activity Show activity log + station logs [module] Show module logs + station events Events stream Options: -v, --version Show version number [boolean] diff --git a/bin/station.js b/bin/station.js index 63b06032..2e0be536 100755 --- a/bin/station.js +++ b/bin/station.js @@ -7,6 +7,8 @@ import { paths } from '../lib/paths.js' import * as Sentry from '@sentry/node' import yargs from 'yargs/yargs' import { hideBin } from 'yargs/helpers' +import { maybeCreateMetricsFile } from '../lib/metrics.js' +import { maybeCreateActivityFile } from '../lib/activity.js' const pkg = JSON.parse(await fs.readFile(join(paths.repoRoot, 'package.json'))) @@ -19,6 +21,8 @@ Sentry.init({ await fs.mkdir(join(paths.moduleStorage, 'saturn-L2-node'), { recursive: true }) await fs.mkdir(paths.moduleLogs, { recursive: true }) +await maybeCreateMetricsFile() +await maybeCreateActivityFile() yargs(hideBin(process.argv)) .usage('Usage: $0 [options]') @@ -27,6 +31,7 @@ yargs(hideBin(process.argv)) .commands('activity', 'Show activity log', () => {}, commands.activity) .command('logs [module]', 'Show module logs', () => {}, commands.logs) .choices('module', ['saturn-l2-node']) + .command('events', 'Events stream', () => {}, commands.events) .version(`${pkg.name}: ${pkg.version}`) .alias('v', 'version') .alias('h', 'help') diff --git a/commands/activity.js b/commands/activity.js index d0a172ce..7e6dcc0a 100644 --- a/commands/activity.js +++ b/commands/activity.js @@ -1,32 +1,20 @@ -import fs from 'node:fs/promises' -import { Tail } from 'tail' -import { paths } from '../lib/paths.js' -import { parseLog, formatLog } from '../lib/log.js' -import { maybeCreateFile } from '../lib/util.js' +import { formatLog } from '../lib/log.js' +import { followActivity, getActivity } from '../lib/activity.js' -const formatLogLine = line => { - const { date, text } = parseLog(line) - const { type, message } = JSON.parse(text) +const formatActivityObject = ({ type, message, date }) => { return formatLog(`${type.toUpperCase().padEnd(5)} ${message}`, date) } -const followActivity = () => { - const tail = new Tail(paths.activity, { nLines: 10 }) - tail.on('line', line => process.stdout.write(formatLogLine(line))) -} - -const getActivity = async () => { - const activityLog = await fs.readFile(paths.activity, 'utf-8') - for (const line of activityLog.trim().split('\n').filter(Boolean)) { - process.stdout.write(formatLogLine(line)) - } -} - export const activity = async ({ follow }) => { - await maybeCreateFile(paths.activity) if (follow) { - followActivity() + for await (const obj of followActivity()) { + process.stdout.write(formatActivityObject(obj)) + } } else { - await getActivity() + process.stdout.write( + (await getActivity()) + .map(obj => formatActivityObject(obj)) + .join('') + ) } } diff --git a/commands/events.js b/commands/events.js new file mode 100644 index 00000000..4940a6eb --- /dev/null +++ b/commands/events.js @@ -0,0 +1,24 @@ +import { followMetrics } from '../lib/metrics.js' +import { followActivity } from '../lib/activity.js' + +export const events = async () => { + await Promise.all([ + (async () => { + for await (const metrics of followMetrics()) { + console.log(JSON.stringify({ + type: 'jobs-completed', + total: metrics.totalJobsCompleted + })) + } + })(), + (async () => { + for await (const activity of followActivity()) { + console.log(JSON.stringify({ + type: `activity:${activity.type}`, + module: activity.source, + message: activity.message + })) + } + })() + ]) +} diff --git a/commands/index.js b/commands/index.js index ce66e882..9055a60f 100644 --- a/commands/index.js +++ b/commands/index.js @@ -2,3 +2,4 @@ export { station } from './station.js' export { metrics } from './metrics.js' export { logs } from './logs.js' export { activity } from './activity.js' +export { events } from './events.js' diff --git a/commands/metrics.js b/commands/metrics.js index d773dd8f..d2a7edb2 100644 --- a/commands/metrics.js +++ b/commands/metrics.js @@ -1,32 +1,11 @@ -import fs from 'node:fs/promises' -import { Tail } from 'tail' -import { paths } from '../lib/paths.js' -import { formatLog, parseLog } from '../lib/log.js' -import { maybeCreateFile } from '../lib/util.js' - -const metricsLogLineToJSON = metrics => - JSON.stringify(JSON.parse(parseLog(metrics).text), 0, 2) - -const followMetrics = () => { - const tail = new Tail(paths.metrics, { nLines: 1 }) - tail.on('line', line => console.log(metricsLogLineToJSON(line))) -} - -const getLatestMetrics = async () => { - const metrics = await fs.readFile(paths.metrics, 'utf-8') - console.log(metricsLogLineToJSON(metrics.trim().split('\n').pop())) -} +import { followMetrics, getLatestMetrics } from '../lib/metrics.js' export const metrics = async ({ follow }) => { - await maybeCreateFile( - paths.metrics, - formatLog( - JSON.stringify({ totalJobsCompleted: 0, totalEarnings: '0' }) + '\n' - ) - ) if (follow) { - followMetrics() + for await (const obj of followMetrics()) { + console.log(JSON.stringify(obj, 0, 2)) + } } else { - await getLatestMetrics() + console.log(JSON.stringify(await getLatestMetrics(), 0, 2)) } } diff --git a/lib/activity.js b/lib/activity.js index 36f4ab4e..376c442f 100644 --- a/lib/activity.js +++ b/lib/activity.js @@ -1,6 +1,10 @@ import { paths } from './paths.js' -import { createLogStream, SerializeStream } from './log.js' +import { createLogStream, SerializeStream, parseLog } from './log.js' import { Transform } from 'node:stream' +import { maybeCreateFile } from './util.js' +import { Tail } from 'tail' +import { on } from 'node:events' +import fs from 'node:fs/promises' class AddSourceStream extends Transform { constructor (source) { @@ -20,3 +24,26 @@ export const createActivityStream = source => { .pipe(createLogStream(paths.activity)) return addSourceStream } + +const activityLogLineToObject = line => { + const { date, text } = parseLog(line) + const { type, message, source } = JSON.parse(text) + return { date, type, source, message } +} + +export const followActivity = async function * ({ signal } = {}) { + const tail = new Tail(paths.activity, { nLines: 10 }) + for await (const [line] of on(tail, 'line', { signal })) { + yield activityLogLineToObject(line) + } +} + +export const getActivity = async () => { + return (await fs.readFile(paths.activity, 'utf-8')) + .trim() + .split('\n') + .filter(Boolean) + .map(line => activityLogLineToObject(line)) +} + +export const maybeCreateActivityFile = () => maybeCreateFile(paths.activity) diff --git a/lib/metrics.js b/lib/metrics.js index 7db42fa8..bcf9e21d 100644 --- a/lib/metrics.js +++ b/lib/metrics.js @@ -1,7 +1,12 @@ -import { createLogStream, SerializeStream } from './log.js' +import { createLogStream, SerializeStream, parseLog, formatLog } from './log.js' import { Transform } from 'node:stream' import { writePoint } from './telemetry.js' import { Point } from '@influxdata/influxdb-client' +import fs from 'node:fs/promises' +import { on } from 'node:events' +import { paths } from './paths.js' +import { Tail } from 'tail' +import { maybeCreateFile } from './util.js' // Metrics stream // - Filters duplicate entries @@ -16,6 +21,29 @@ export const createMetricsStream = metricsPath => { return deduplicateStream } +export const maybeCreateMetricsFile = async () => { + await maybeCreateFile( + paths.metrics, + formatLog( + JSON.stringify({ totalJobsCompleted: 0, totalEarnings: '0' }) + '\n' + ) + ) +} + +const metricsLogLineToObject = metrics => JSON.parse(parseLog(metrics).text) + +export const getLatestMetrics = async () => { + const metrics = await fs.readFile(paths.metrics, 'utf-8') + return metricsLogLineToObject(metrics.trim().split('\n').pop()) +} + +export const followMetrics = async function * ({ signal } = {}) { + const tail = new Tail(paths.metrics, { nLines: 1 }) + for await (const [line] of on(tail, 'line', { signal })) { + yield metricsLogLineToObject(line) + } +} + class DeduplicateStream extends Transform { constructor () { super({ objectMode: true }) diff --git a/test/test.js b/test/test.js index f1913ca5..4c76bb7b 100644 --- a/test/test.js +++ b/test/test.js @@ -202,6 +202,33 @@ test('Activity', async t => { }) }) +test('Events', async t => { + const XDG_STATE_HOME = join(tmpdir(), randomUUID()) + await fs.mkdir( + dirname(getPaths(XDG_STATE_HOME).activity), + { recursive: true } + ) + await fs.writeFile( + getPaths(XDG_STATE_HOME).activity, + '[3/14/2023, 10:38:14 AM] {"source":"Saturn","type":"info","message":"beep boop"}\n' + ) + const ps = execa( + station, + ['events'], + { env: { XDG_STATE_HOME } } + ) + const events = [] + for await (const line of ps.stdout) { + events.push(JSON.parse(line.toString())) + if (events.length === 2) break + } + ps.kill() + t.same(events, [ + { type: 'jobs-completed', total: 0 }, + { type: 'activity:info', module: 'Saturn', message: 'beep boop' } + ]) +}) + test('Lockfile', async t => { const XDG_STATE_HOME = join(tmpdir(), randomUUID()) const ps = execa(station, { env: { XDG_STATE_HOME, FIL_WALLET_ADDRESS } }) From 9d886e31ceccf7c736a39cb5184a1dd4317cedd7 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Tue, 21 Mar 2023 22:27:58 +0100 Subject: [PATCH 2/2] fix lint --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 57215a45..6d681703 100644 --- a/README.md +++ b/README.md @@ -150,13 +150,13 @@ $ station events The following event types exist: - `jobs-completed` - - `total` + - `total` - `activity:info` - - `module` - - `message` + - `module` + - `message` - `activity:error` - - `module` - - `message` + - `module` + - `message` ### `$ station --help`