Skip to content

Commit

Permalink
refactor pubsub/monitor to use libp2p.pubsub
Browse files Browse the repository at this point in the history
Closes #28
  • Loading branch information
tabcat committed Nov 23, 2022
1 parent b6435ea commit 81e6afd
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 53 deletions.
95 changes: 42 additions & 53 deletions src/pubsub/monitor.ts
Original file line number Diff line number Diff line change
@@ -1,80 +1,69 @@
import EventEmitter from 'events'
import { IPFS } from 'ipfs-core-types'
import { Startable } from '@libp2p/interfaces/startable'
import { Libp2p } from 'libp2p'

const peerJoin = 'peer-join'
const peerLeave = 'peer-leave'
const update = 'update'
import { peerIdString } from '~utils/index'

export class Monitor extends EventEmitter implements Startable {
readonly ipfs: IPFS
readonly topic: string
readonly interval: number
const peersJoin = 'peers-join'
const peersLeave = 'peers-leave'
const update = 'update'

export class Monitor extends EventEmitter {
private _isConnected: boolean
peers: Set<string>

private _isStarted: boolean

isStarted (): boolean {
return this._isStarted
isConnected (): boolean {
return this._isConnected
}

start (): void {
if (this.isStarted()) {
return
}

this._isStarted = true
constructor (
readonly libp2p: Libp2p,
readonly topic: string
) {
super()

const refresh = (): void => {
void this._poll().then(() => {
if (this.isStarted()) {
setTimeout(refresh, this.interval)
}
})
}
this._isConnected = false
this.peers = new Set()

refresh()
this.on(peersJoin, () => this.emit(update))
this.on(peersLeave, () => this.emit(update))
}

stop (): void {
if (!this.isStarted()) {
return
connect (): void {
if (!this.isConnected()) {
this.libp2p.pubsub.addEventListener('subscription-change', this._refreshPeers)
this.peers = new Set(this.libp2p.pubsub.getSubscribers(this.topic).map(peerIdString))
this._isConnected = true
}

this._isStarted = false
}

constructor (ipfs: IPFS, topic: string, interval: number = 1000) {
super()
this.ipfs = ipfs
this.topic = topic
this.interval = 1000

this.peers = new Set()

this.on(peerJoin, () => this.emit(update))
this.on(peerLeave, () => this.emit(update))

this._isStarted = false
disconnect (): void {
if (this.isConnected()) {
this.libp2p.pubsub.removeEventListener('subscription-change', this._refreshPeers)
this.peers = new Set()
this._isConnected = false
}
}

async _poll (): Promise<void> {
_refreshPeers (): void {
const _peers = this.peers
const peers = new Set((await this.ipfs.pubsub.peers(this.topic)).map(String))
this.peers = new Set(this.libp2p.pubsub.getSubscribers(this.topic).map(peerIdString))

if (!this.isStarted()) {
return
const join = new Set()
for (const peer of this.peers) {
!_peers.has(peer) && join.add(peer)
}

this.peers = peers
const leave = new Set()
for (const peer of _peers) {
!this.peers.has(peer) && leave.add(peer)
}

for (const peer of this.peers) {
!_peers.has(peer) && this.emit(peerJoin, peer)
if (join.size > 0) {
this.emit(peersJoin, join)
}

for (const peer of _peers) {
!this.peers.has(peer) && this.emit(peerLeave, peer)
if (leave.size > 0) {
this.emit(peersLeave, leave)
}
}
}
5 changes: 5 additions & 0 deletions src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import path from 'path'
import { CID } from 'multiformats/cid'
import { base32 } from 'multiformats/bases/base32'
import { peerIdFromString } from '@libp2p/peer-id'
import { PeerId } from '@libp2p/interface-peer-id'

import { Registry } from '~registry/index.js'
import { ManifestData } from '~manifest/interface.js'
Expand All @@ -16,6 +18,9 @@ export const parsedcid = (string: string): CID => CID.parse(string, base32)
export const encodedcid = (cid: CID): Uint8Array => cid.bytes
export const decodedcid = (bytes: Uint8Array): CID => CID.decode(bytes)

export const peerIdString = (peerId: PeerId): string => peerId.toCID().toString(base32)
export const parsedPeerId = (peerId: string): PeerId => peerIdFromString(peerId)

export interface DirsReturn {
[name: string]: string
}
Expand Down

0 comments on commit 81e6afd

Please sign in to comment.