Skip to content
This repository was archived by the owner on Jun 30, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,28 @@ $ station logs --follow
...
```

### `$ station events`

Get combined real-time events from `$ station metrics` and `$ station activity`.

```bash
$ station events
{"type":"jobs-completed","total":36}
{"type":"activity:info","module":"Saturn","message":"Saturn Node was able to connect to the Orchestrator and will now start connecting to the Saturn network..."}
...
```

The following event types exist:

- `jobs-completed`
- `total`
- `activity:info`
- `module`
- `message`
- `activity:error`
- `module`
- `message`

### `$ station --help`

Show help.
Expand All @@ -145,10 +167,11 @@ $ station --help
Usage: station <command> [options]

Commands:
station Start Station [default]
station metrics Show metrics
station activity Show activity log
station logs [module] Show module logs
station Start Station [default]
station metrics Show metrics
station activity Show activity log
station logs [module] Show module logs
station events Events stream

Options:
-v, --version Show version number [boolean]
Expand Down
5 changes: 5 additions & 0 deletions bin/station.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { paths } from '../lib/paths.js'
import * as Sentry from '@sentry/node'
import yargs from 'yargs/yargs'
import { hideBin } from 'yargs/helpers'
import { maybeCreateMetricsFile } from '../lib/metrics.js'
import { maybeCreateActivityFile } from '../lib/activity.js'

const pkg = JSON.parse(await fs.readFile(join(paths.repoRoot, 'package.json')))

Expand All @@ -19,6 +21,8 @@ Sentry.init({

await fs.mkdir(join(paths.moduleStorage, 'saturn-L2-node'), { recursive: true })
await fs.mkdir(paths.moduleLogs, { recursive: true })
await maybeCreateMetricsFile()
await maybeCreateActivityFile()

yargs(hideBin(process.argv))
.usage('Usage: $0 <command> [options]')
Expand All @@ -27,6 +31,7 @@ yargs(hideBin(process.argv))
.commands('activity', 'Show activity log', () => {}, commands.activity)
.command('logs [module]', 'Show module logs', () => {}, commands.logs)
.choices('module', ['saturn-l2-node'])
.command('events', 'Events stream', () => {}, commands.events)
.version(`${pkg.name}: ${pkg.version}`)
.alias('v', 'version')
.alias('h', 'help')
Expand Down
34 changes: 11 additions & 23 deletions commands/activity.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,20 @@
import fs from 'node:fs/promises'
import { Tail } from 'tail'
import { paths } from '../lib/paths.js'
import { parseLog, formatLog } from '../lib/log.js'
import { maybeCreateFile } from '../lib/util.js'
import { formatLog } from '../lib/log.js'
import { followActivity, getActivity } from '../lib/activity.js'

const formatLogLine = line => {
const { date, text } = parseLog(line)
const { type, message } = JSON.parse(text)
const formatActivityObject = ({ type, message, date }) => {
return formatLog(`${type.toUpperCase().padEnd(5)} ${message}`, date)
}

const followActivity = () => {
const tail = new Tail(paths.activity, { nLines: 10 })
tail.on('line', line => process.stdout.write(formatLogLine(line)))
}

const getActivity = async () => {
const activityLog = await fs.readFile(paths.activity, 'utf-8')
for (const line of activityLog.trim().split('\n').filter(Boolean)) {
process.stdout.write(formatLogLine(line))
}
}

export const activity = async ({ follow }) => {
await maybeCreateFile(paths.activity)
if (follow) {
followActivity()
for await (const obj of followActivity()) {
process.stdout.write(formatActivityObject(obj))
}
} else {
await getActivity()
process.stdout.write(
(await getActivity())
.map(obj => formatActivityObject(obj))
.join('')
)
}
}
24 changes: 24 additions & 0 deletions commands/events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { followMetrics } from '../lib/metrics.js'
import { followActivity } from '../lib/activity.js'

export const events = async () => {
await Promise.all([
(async () => {
for await (const metrics of followMetrics()) {
console.log(JSON.stringify({
type: 'jobs-completed',
total: metrics.totalJobsCompleted
}))
}
})(),
(async () => {
for await (const activity of followActivity()) {
console.log(JSON.stringify({
type: `activity:${activity.type}`,
module: activity.source,
message: activity.message
}))
}
})()
])
}
1 change: 1 addition & 0 deletions commands/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export { station } from './station.js'
export { metrics } from './metrics.js'
export { logs } from './logs.js'
export { activity } from './activity.js'
export { events } from './events.js'
31 changes: 5 additions & 26 deletions commands/metrics.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,11 @@
import fs from 'node:fs/promises'
import { Tail } from 'tail'
import { paths } from '../lib/paths.js'
import { formatLog, parseLog } from '../lib/log.js'
import { maybeCreateFile } from '../lib/util.js'

const metricsLogLineToJSON = metrics =>
JSON.stringify(JSON.parse(parseLog(metrics).text), 0, 2)

const followMetrics = () => {
const tail = new Tail(paths.metrics, { nLines: 1 })
tail.on('line', line => console.log(metricsLogLineToJSON(line)))
}

const getLatestMetrics = async () => {
const metrics = await fs.readFile(paths.metrics, 'utf-8')
console.log(metricsLogLineToJSON(metrics.trim().split('\n').pop()))
}
import { followMetrics, getLatestMetrics } from '../lib/metrics.js'

export const metrics = async ({ follow }) => {
await maybeCreateFile(
paths.metrics,
formatLog(
JSON.stringify({ totalJobsCompleted: 0, totalEarnings: '0' }) + '\n'
)
)
if (follow) {
followMetrics()
for await (const obj of followMetrics()) {
console.log(JSON.stringify(obj, 0, 2))
}
} else {
await getLatestMetrics()
console.log(JSON.stringify(await getLatestMetrics(), 0, 2))
}
}
29 changes: 28 additions & 1 deletion lib/activity.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { paths } from './paths.js'
import { createLogStream, SerializeStream } from './log.js'
import { createLogStream, SerializeStream, parseLog } from './log.js'
import { Transform } from 'node:stream'
import { maybeCreateFile } from './util.js'
import { Tail } from 'tail'
import { on } from 'node:events'
import fs from 'node:fs/promises'

class AddSourceStream extends Transform {
constructor (source) {
Expand All @@ -20,3 +24,26 @@ export const createActivityStream = source => {
.pipe(createLogStream(paths.activity))
return addSourceStream
}

const activityLogLineToObject = line => {
const { date, text } = parseLog(line)
const { type, message, source } = JSON.parse(text)
return { date, type, source, message }
}

export const followActivity = async function * ({ signal } = {}) {
const tail = new Tail(paths.activity, { nLines: 10 })
for await (const [line] of on(tail, 'line', { signal })) {
yield activityLogLineToObject(line)
}
}

export const getActivity = async () => {
return (await fs.readFile(paths.activity, 'utf-8'))
.trim()
.split('\n')
.filter(Boolean)
.map(line => activityLogLineToObject(line))
}

export const maybeCreateActivityFile = () => maybeCreateFile(paths.activity)
30 changes: 29 additions & 1 deletion lib/metrics.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import { createLogStream, SerializeStream } from './log.js'
import { createLogStream, SerializeStream, parseLog, formatLog } from './log.js'
import { Transform } from 'node:stream'
import { writePoint } from './telemetry.js'
import { Point } from '@influxdata/influxdb-client'
import fs from 'node:fs/promises'
import { on } from 'node:events'
import { paths } from './paths.js'
import { Tail } from 'tail'
import { maybeCreateFile } from './util.js'

// Metrics stream
// - Filters duplicate entries
Expand All @@ -16,6 +21,29 @@ export const createMetricsStream = metricsPath => {
return deduplicateStream
}

export const maybeCreateMetricsFile = async () => {
await maybeCreateFile(
paths.metrics,
formatLog(
JSON.stringify({ totalJobsCompleted: 0, totalEarnings: '0' }) + '\n'
)
)
}

const metricsLogLineToObject = metrics => JSON.parse(parseLog(metrics).text)

export const getLatestMetrics = async () => {
const metrics = await fs.readFile(paths.metrics, 'utf-8')
return metricsLogLineToObject(metrics.trim().split('\n').pop())
}

export const followMetrics = async function * ({ signal } = {}) {
const tail = new Tail(paths.metrics, { nLines: 1 })
for await (const [line] of on(tail, 'line', { signal })) {
yield metricsLogLineToObject(line)
}
}

class DeduplicateStream extends Transform {
constructor () {
super({ objectMode: true })
Expand Down
27 changes: 27 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,33 @@ test('Activity', async t => {
})
})

test('Events', async t => {
const XDG_STATE_HOME = join(tmpdir(), randomUUID())
await fs.mkdir(
dirname(getPaths(XDG_STATE_HOME).activity),
{ recursive: true }
)
await fs.writeFile(
getPaths(XDG_STATE_HOME).activity,
'[3/14/2023, 10:38:14 AM] {"source":"Saturn","type":"info","message":"beep boop"}\n'
)
const ps = execa(
station,
['events'],
{ env: { XDG_STATE_HOME } }
)
const events = []
for await (const line of ps.stdout) {
events.push(JSON.parse(line.toString()))
if (events.length === 2) break
}
ps.kill()
t.same(events, [
{ type: 'jobs-completed', total: 0 },
{ type: 'activity:info', module: 'Saturn', message: 'beep boop' }
])
})

test('Lockfile', async t => {
const XDG_STATE_HOME = join(tmpdir(), randomUUID())
const ps = execa(station, { env: { XDG_STATE_HOME, FIL_WALLET_ADDRESS } })
Expand Down