Skip to content

Commit

Permalink
Add monitor (#375)
Browse files Browse the repository at this point in the history
* Add monitor

* Simplify monitor + tests

* Remove 'done' event

* load the local state on download

* Change how local state of the file is loaded

* Skip bitfield for now

* Use speedometer

* expose upload/download speed methods

* gc monitors

* Remove monitor from the Set on close

* Resolve comments

* Add a note
  • Loading branch information
MKPLKN committed Sep 9, 2024
1 parent 951e511 commit f0ae34e
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 0 deletions.
16 changes: 16 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const safetyCatch = require('safety-catch')
const crypto = require('hypercore-crypto')
const Hypercore = require('hypercore')
const { BLOCK_NOT_AVAILABLE, BAD_ARGUMENT } = require('hypercore-errors')
const Monitor = require('./lib/monitor')

const keyEncoding = new SubEncoder('files', 'utf-8')

Expand All @@ -30,6 +31,7 @@ module.exports = class Hyperdrive extends ReadyResource {
this.blobs = null
this.supportsMetadata = true
this.encryptionKey = opts.encryptionKey || null
this.monitors = new Set()

this._active = opts.active !== false
this._openingBlobs = null
Expand Down Expand Up @@ -188,6 +190,8 @@ module.exports = class Hyperdrive extends ReadyResource {
if (!this._checkout && !this._batching) {
await this.corestore.close()
}

await this.closeMonitors()
}

async _openBlobsFromHeader (opts) {
Expand Down Expand Up @@ -278,6 +282,18 @@ module.exports = class Hyperdrive extends ReadyResource {
return this.blobs
}

monitor (name, opts = {}) {
const monitor = new Monitor(this, { name, ...opts })
this.monitors.add(monitor)
return monitor
}

async closeMonitors () {
const closing = []
for (const monitor of this.monitors) closing.push(monitor.close())
await Promise.allSettled(closing)
}

async get (name, opts) {
const node = await this.entry(name, opts)
if (!node?.value.blob) return null
Expand Down
111 changes: 111 additions & 0 deletions lib/monitor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
const ReadyResource = require('ready-resource')
const safetyCatch = require('safety-catch')
const speedometer = require('speedometer')

module.exports = class Monitor extends ReadyResource {
constructor (drive, opts = {}) {
super()
this.drive = drive
this.blobs = null
this.name = opts.name || null
this.entry = opts.entry || null

this._boundOnUpload = this._onUpload.bind(this)
this._boundOnDownload = this._onDownload.bind(this)

const stats = {
startTime: 0,
percentage: 0,
peers: 0,
speed: 0,
blocks: 0,
totalBytes: 0, // local + bytes loaded during monitoring
monitoringBytes: 0, // bytes loaded during monitoring
targetBytes: 0,
targetBlocks: 0
}

// Updated on each upload/download event
this.uploadStats = { ...stats }
this.downloadStats = { ...stats }

this.uploadSpeedometer = null
this.downloadSpeedometer = null

this.ready().catch(safetyCatch)
}

async _open () {
await this.drive.ready()
this.blobs = await this.drive.getBlobs()
if (!this.entry && this.name) this.entry = await this.drive.entry(this.name)
if (this.entry) this._setEntryInfo()

// Handlers
this.blobs.core.on('upload', this._boundOnUpload)
this.blobs.core.on('download', this._boundOnDownload)
}

async _close () {
this.blobs.core.off('upload', this._boundOnUpload)
this.blobs.core.off('download', this._boundOnDownload)
this.drive.monitors.delete(this)
}

_setEntryInfo () {
if (!this.downloadStats.targetBytes || !this.downloadStats.targetBlocks) {
this.downloadStats.targetBytes = this.entry.value.blob.byteLength
this.downloadStats.targetBlocks = this.entry.value.blob.blockLength
}

if (!this.uploadStats.targetBytes || !this.uploadStats.targetBlocks) {
this.uploadStats.targetBytes = this.entry.value.blob.byteLength
this.uploadStats.targetBlocks = this.entry.value.blob.blockLength
}
}

_onUpload (index, bytes, from) {
if (!this.uploadSpeedometer) this.uploadSpeedometer = speedometer()
this.uploadStats.speed = this.uploadSpeedometer(bytes)
this._updateStats(this.uploadStats, index, bytes, from)
}

_onDownload (index, bytes, from) {
if (!this.downloadSpeedometer) this.downloadSpeedometer = speedometer()
this.downloadStats.speed = this.downloadSpeedometer(bytes)
this._updateStats(this.downloadStats, index, bytes, from)
}

_updateStats (stats, index, bytes, from) {
if (!this.entry || this.closing) return
if (!isWithinRange(index, this.entry)) return

if (!stats.startTime) stats.startTime = Date.now()
stats.peers = from.replicator.peers.length
stats.blocks++
stats.monitoringBytes += bytes
stats.totalBytes += bytes
// NOTE: you should not rely on the percentage until the monitor is initialized with the local state of the file
stats.percentage = toFixed(stats.blocks / stats.targetBlocks * 100)

this.emit('update')
}

downloadSpeed () {
return this.downloadSpeedometer ? this.downloadSpeedometer() : 0
}

uploadSpeed () {
return this.uploadSpeedometer ? this.uploadSpeedometer() : 0
}
}

function isWithinRange (index, entry) {
if (!entry || !entry.value) return
const { blockOffset, blockLength } = entry.value.blob
return index >= blockOffset && index < blockOffset + blockLength
}

function toFixed (n) {
return Math.round(n * 100) / 100
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"mirror-drive": "^1.2.0",
"ready-resource": "^1.0.0",
"safety-catch": "^1.0.2",
"speedometer": "^1.1.0",
"streamx": "^2.12.4",
"sub-encoder": "^2.1.1",
"unix-path-resolve": "^1.0.2"
Expand Down
63 changes: 63 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1562,6 +1562,69 @@ test('drive.list (recursive false) ignore', async (t) => {
t.alike(entries, expectedEntries)
})

test('upload/download can be monitored', async (t) => {
t.plan(27)
const { corestore, drive, swarm, mirror } = await testenv(t.teardown)
swarm.on('connection', (conn) => corestore.replicate(conn))
swarm.join(drive.discoveryKey, { server: true, client: false })
await swarm.flush()

mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn))
mirror.swarm.join(drive.discoveryKey, { server: false, client: true })
await mirror.swarm.flush()

const file = '/example.md'
const bytes = 1024 * 100 // big enough to trigger more than one update event
const buffer = Buffer.alloc(bytes, '0')
await drive.put(file, buffer)

{
// Start monitoring upload
const monitor = drive.monitor(file)
await monitor.ready()
t.is(monitor.name, file)
const expectedBlocks = [2, 1]
const expectedBytes = [bytes, 65536]
monitor.on('update', () => {
t.is(monitor.uploadStats.blocks, expectedBlocks.pop())
t.is(monitor.uploadStats.monitoringBytes, expectedBytes.pop())
t.is(monitor.uploadStats.targetBlocks, 2)
t.is(monitor.uploadStats.targetBytes, bytes)
t.is(monitor.uploadSpeed(), monitor.uploadStats.speed)
if (!expectedBlocks.length) t.is(monitor.uploadStats.percentage, 100)
t.absent(monitor.downloadStats.blocks)
})
}

{
// Start monitoring download
const monitor = mirror.drive.monitor(file)
await monitor.ready()
const expectedBlocks = [2, 1]
const expectedBytes = [bytes, 65536]
monitor.on('update', () => {
t.is(monitor.downloadStats.blocks, expectedBlocks.pop())
t.is(monitor.downloadStats.monitoringBytes, expectedBytes.pop())
t.is(monitor.downloadStats.targetBlocks, 2)
t.is(monitor.downloadStats.targetBytes, bytes)
t.is(monitor.downloadSpeed(), monitor.downloadStats.speed)
if (!expectedBlocks.length) t.is(monitor.downloadStats.percentage, 100)
t.absent(monitor.uploadStats.blocks)
})
}

await mirror.drive.get(file)
})

test('monitor is removed from the Set on close', async (t) => {
const { drive } = await testenv(t.teardown)
const monitor = drive.monitor('/example.md')
await monitor.ready()
t.is(drive.monitors.size, 1)
await monitor.close()
t.is(drive.monitors.size, 0)
})

async function testenv (teardown) {
const corestore = new Corestore(RAM)
await corestore.ready()
Expand Down

0 comments on commit f0ae34e

Please sign in to comment.