diff --git a/.changeset/solid-boats-try.md b/.changeset/solid-boats-try.md new file mode 100644 index 00000000..74a5d662 --- /dev/null +++ b/.changeset/solid-boats-try.md @@ -0,0 +1,5 @@ +--- +"nostream": minor +--- + +added wot graph service and unit test diff --git a/.knip.json b/.knip.json index eb3256db..3de2a881 100644 --- a/.knip.json +++ b/.knip.json @@ -14,7 +14,8 @@ "lzma-native" ], "ignore": [ - ".nostr/**" + ".nostr/**", + "src/services/wot-service.ts" ], "commitlint": false, "eslint": false, diff --git a/resources/default-settings.yaml b/resources/default-settings.yaml index 1f8948a5..f67b315b 100755 --- a/resources/default-settings.yaml +++ b/resources/default-settings.yaml @@ -73,6 +73,10 @@ wot: minimumFollowers: 1 # Hours between full trust graph rebuilds. refreshIntervalHours: 24 + # Relay URLs to fetch follow lists from when building the trust graph. + seedRelays: + - "wss://relay.damus.io" + - "wss://relay.nostr.band" network: maxPayloadSize: 524288 # Uncomment only when using a trusted reverse proxy and configuring trustedProxies. diff --git a/src/@types/services.ts b/src/@types/services.ts index 5d8ea229..2de698ea 100644 --- a/src/@types/services.ts +++ b/src/@types/services.ts @@ -1,5 +1,6 @@ import { Invoice } from './invoice' import { Pubkey } from './base' +import { WoTSettings } from './settings' export interface IMaintenanceService { clearOldEvents(): Promise @@ -14,3 +15,10 @@ export interface IPaymentsService { sendInvoiceUpdateNotification(invoice: Invoice): Promise getPendingInvoices(): Promise } + +export interface IWotService { + buildGraph(settings: WoTSettings): Promise + isTrusted(pubkey: string): boolean + isReady(): boolean + reset(): void +} diff --git a/src/@types/settings.ts b/src/@types/settings.ts index 348efdae..924f4857 100644 --- a/src/@types/settings.ts +++ b/src/@types/settings.ts @@ -283,6 +283,11 @@ export interface WoTSettings { * Defaults to 24. */ refreshIntervalHours: number + /** + * Relay URLs to fetch follow lists from when building the trust graph. + * Falls back to the relay's own URL if empty. + */ + seedRelays: string[] } export interface Settings { diff --git a/src/services/wot-service.ts b/src/services/wot-service.ts new file mode 100644 index 00000000..cda66741 --- /dev/null +++ b/src/services/wot-service.ts @@ -0,0 +1,257 @@ +import { RawData, WebSocket } from 'ws' +import { randomUUID } from 'crypto' + +import { createLogger } from '../factories/logger-factory' +import { IWotService } from '../@types/services' +import { WoTSettings } from '../@types/settings' + +const logger = createLogger('wot-service') + +const PHASE1_TIMEOUT_MS = 5_000 +const PHASE2_BATCH_SIZE = 500 +const PHASE2_CONCURRENCY = 5 +const PHASE2_BATCH_TIMEOUT_MS = 30_000 + +// Kind 3 — contact / follow list +const KIND_FOLLOW_LIST = 3 + +/** + * Open a WebSocket to `relayUrl`, send a REQ for `filter`, collect all EVENT + * payloads, close on EOSE or timeout, resolve with collected events. + */ +function fetchEvents( + relayUrl: string, + filter: Record, + timeoutMs: number, +): Promise { + return new Promise((resolve) => { + const subId = `wot-${randomUUID().slice(0, 8)}` + const events: any[] = [] + let settled = false + + const finish = () => { + if (settled) { + return + } + settled = true + clearTimeout(timer) + try { ws.close() } catch { /* ignore */ } + resolve(events) + } + + const timer = setTimeout(finish, timeoutMs) + + let ws: WebSocket + try { + ws = new WebSocket(relayUrl, { timeout: timeoutMs }) + } catch (err) { + logger.warn('wot-service: could not create WebSocket to %s: %o', relayUrl, err) + clearTimeout(timer) + resolve(events) + return + } + + ws.on('open', () => { + ws.send(JSON.stringify(['REQ', subId, filter])) + }) + + ws.on('message', (raw: RawData) => { + try { + const msg = JSON.parse(raw.toString('utf8')) + if (!Array.isArray(msg)) { + return + } + + if (msg[0] === 'EVENT' && msg[1] === subId && msg[2]) { + events.push(msg[2]) + } else if (msg[0] === 'EOSE' && msg[1] === subId) { + finish() + } + } catch { /* malformed message — ignore */ } + }) + + ws.on('error', (err) => { + logger.warn('wot-service: WebSocket error for %s: %o', relayUrl, err) + finish() + }) + + ws.on('close', finish) + }) +} + +/** + * Fetch from multiple relays in parallel, deduplicate events by id. + * Exported so it can be injected / replaced in tests. + */ +export async function fetchFromRelays( + relayUrls: string[], + filter: Record, + timeoutMs: number, +): Promise { + const results = await Promise.all( + relayUrls.map((url) => fetchEvents(url, filter, timeoutMs)) + ) + + const seen = new Set() + const deduped: any[] = [] + + for (const batch of results) { + for (const event of batch) { + if (typeof event.id === 'string' && !seen.has(event.id)) { + seen.add(event.id) + deduped.push(event) + } + } + } + + return deduped +} + +/** + * Run up to `concurrency` async tasks from `items` at a time. + */ +async function runConcurrent( + items: T[], + concurrency: number, + task: (item: T) => Promise, +): Promise { + const queue = [...items] + const workers = Array.from({ length: Math.min(concurrency, queue.length) }, async () => { + while (queue.length > 0) { + const item = queue.shift() + if (item !== undefined) { + await task(item) + } + } + }) + await Promise.all(workers) +} + +// static service + +export type RelayFetcher = ( + relayUrls: string[], + filter: Record, + timeoutMs: number, +) => Promise + +export class WotService implements IWotService { + private trustMap: Map = new Map() + private booted = false + private building = false + + /** + * @param fetcher - relay fetch function. Defaults to the real WebSocket + * implementation. Pass a stub in tests. + */ + public constructor(private readonly fetcher: RelayFetcher = fetchFromRelays) {} + + public async buildGraph(settings: WoTSettings): Promise { + if (this.building) { + logger('build already in progress — skipping') + return + } + + this.building = true + logger.info('starting WoT graph build, seed=%s relays=%o', settings.seedPubkey, settings.seedRelays) + + try { + // local accumulators — never touch live state during the build + const followerCount = new Map() + const oneHopSet = new Set() + + // ── Phase 1: fetch owner's follow list (1-hop) ────────────────────────── + const phase1Events = await this.fetcher( + settings.seedRelays, + { authors: [settings.seedPubkey], kinds: [KIND_FOLLOW_LIST] }, + PHASE1_TIMEOUT_MS, + ) + + for (const event of phase1Events) { + if (!Array.isArray(event.tags)) { + continue + } + for (const tag of event.tags) { + if (Array.isArray(tag) && tag[0] === 'p' && typeof tag[1] === 'string' && tag[1].length === 64) { + const pubkey = tag[1] + oneHopSet.add(pubkey) + followerCount.set(pubkey, (followerCount.get(pubkey) ?? 0) + 1) + } + } + } + + logger.info('phase 1 complete: %d 1-hop pubkeys', oneHopSet.size) + + // ── Phase 2: fetch 2-hop follow lists (batched + concurrent) ─────────── + const oneHopList = Array.from(oneHopSet) + const batches: string[][] = [] + + for (let i = 0; i < oneHopList.length; i += PHASE2_BATCH_SIZE) { + batches.push(oneHopList.slice(i, i + PHASE2_BATCH_SIZE)) + } + + await runConcurrent(batches, PHASE2_CONCURRENCY, async (batch) => { + try { + const events = await this.fetcher( + settings.seedRelays, + { authors: batch, kinds: [KIND_FOLLOW_LIST] }, + PHASE2_BATCH_TIMEOUT_MS, + ) + + for (const event of events) { + if (!Array.isArray(event.tags)) { + continue + } + for (const tag of event.tags) { + if (Array.isArray(tag) && tag[0] === 'p' && typeof tag[1] === 'string' && tag[1].length === 64) { + const pubkey = tag[1] + followerCount.set(pubkey, (followerCount.get(pubkey) ?? 0) + 1) + } + } + } + } catch (err) { + logger.warn('wot-service: phase 2 batch failed: %o', err) + } + }) + + logger.info('phase 2 complete: %d unique pubkeys in follower map', followerCount.size) + + // ── Phase 3: build new trust map and swap atomically ─────────────────── + const newTrustMap = new Map() + + // owner is always trusted + newTrustMap.set(settings.seedPubkey, true) + + for (const [pubkey, count] of followerCount) { + if (count >= settings.minimumFollowers) { + newTrustMap.set(pubkey, true) + } + } + + // atomic swap + this.trustMap = newTrustMap + this.booted = true + + logger.info('WoT graph build complete: %d trusted pubkeys', newTrustMap.size) + } catch (err) { + logger.error('wot-service: graph build failed: %o', err) + throw err + } finally { + this.building = false + } + } + + public isTrusted(pubkey: string): boolean { + return this.trustMap.get(pubkey) === true + } + + public isReady(): boolean { + return this.booted + } + + public reset(): void { + this.trustMap = new Map() + this.booted = false + this.building = false + } +} diff --git a/test/unit/services/wot-service.spec.ts b/test/unit/services/wot-service.spec.ts new file mode 100644 index 00000000..bf683bae --- /dev/null +++ b/test/unit/services/wot-service.spec.ts @@ -0,0 +1,239 @@ +import chai from 'chai' +import chaiAsPromised from 'chai-as-promised' +import Sinon from 'sinon' +import sinonChai from 'sinon-chai' + +import { WoTSettings } from '../../../src/@types/settings' +import { WotService, RelayFetcher } from '../../../src/services/wot-service' + +chai.use(sinonChai) +chai.use(chaiAsPromised) + +const { expect } = chai + +const SEED_PUBKEY = 'a'.repeat(64) +const TRUSTED_PUBKEY = 'b'.repeat(64) +const UNTRUSTED_PUBKEY = 'c'.repeat(64) +const UNKNOWN_PUBKEY = 'd'.repeat(64) + +const baseSettings: WoTSettings = { + enabled: true, + seedPubkey: SEED_PUBKEY, + minimumFollowers: 1, + refreshIntervalHours: 24, + seedRelays: ['wss://relay.example.com'], +} + +/** Building a minimal kind-3 event */ +function makeFollowEvent(id: string, authorPubkey: string, follows: string[]): object { + return { + id, + pubkey: authorPubkey, + kind: 3, + tags: follows.map((pk) => ['p', pk]), + content: '', + created_at: 1_700_000_000, + sig: 'f'.repeat(128), + } +} + +describe('WotService', () => { + let sandbox: Sinon.SinonSandbox + let fetcher: Sinon.SinonStub + let service: WotService + + beforeEach(() => { + sandbox = Sinon.createSandbox() + // Default: return empty list for every relay fetch call + fetcher = sandbox.stub, ReturnType>().resolves([]) + service = new WotService(fetcher) + }) + + afterEach(() => { + sandbox.restore() + }) + + // testing initial state: + + describe('before any build', () => { + it('isReady() returns false', () => { + expect(service.isReady()).to.equal(false) + }) + + it('isTrusted() returns false for any pubkey', () => { + expect(service.isTrusted(SEED_PUBKEY)).to.equal(false) + expect(service.isTrusted(TRUSTED_PUBKEY)).to.equal(false) + }) + }) + + // testing the build graph state: + + describe('buildGraph()', () => { + it('sets isReady() to true after a successful build', async () => { + await service.buildGraph(baseSettings) + + expect(service.isReady()).to.equal(true) + }) + + it('seedPubkey is always trusted after build regardless of follower count', async () => { + // fetcher returns nothing — seed has no followers at all + fetcher.resolves([]) + + await service.buildGraph(baseSettings) + + expect(service.isTrusted(SEED_PUBKEY)).to.equal(true) + }) + + it('trusts a pubkey that meets minimumFollowers threshold', async () => { + // phase 1: seed follows TRUSTED_PUBKEY (follower count = 1) + fetcher.onFirstCall().resolves([ + makeFollowEvent('evt1', SEED_PUBKEY, [TRUSTED_PUBKEY]), + ]) + + await service.buildGraph({ ...baseSettings, minimumFollowers: 1 }) + + expect(service.isTrusted(TRUSTED_PUBKEY)).to.equal(true) + }) + + it('does not trust a pubkey below minimumFollowers threshold', async () => { + // phase 1: seed follows TRUSTED_PUBKEY + // phase 2: TRUSTED_PUBKEY follows UNTRUSTED_PUBKEY + // UNTRUSTED_PUBKEY has count=1, minimumFollowers=2 → not trusted + fetcher.onFirstCall().resolves([ + makeFollowEvent('evt1', SEED_PUBKEY, [TRUSTED_PUBKEY]), + ]) + fetcher.onSecondCall().resolves([ + makeFollowEvent('evt2', TRUSTED_PUBKEY, [UNTRUSTED_PUBKEY]), + ]) + + await service.buildGraph({ ...baseSettings, minimumFollowers: 2 }) + + expect(service.isTrusted(UNTRUSTED_PUBKEY)).to.equal(false) + }) + + it('returns false for a pubkey not seen in any follow list', async () => { + fetcher.onFirstCall().resolves([ + makeFollowEvent('evt1', SEED_PUBKEY, [TRUSTED_PUBKEY]), + ]) + + await service.buildGraph(baseSettings) + + expect(service.isTrusted(UNKNOWN_PUBKEY)).to.equal(false) + }) + + it('is a no-op when a build is already in flight', async () => { + // first call hangs — never resolves + let releaseHang!: () => void + fetcher.onFirstCall().returns( + new Promise((resolve) => { releaseHang = () => resolve([]) }) + ) + + // start first build — hangs on phase 1 fetch + const first = service.buildGraph(baseSettings) + + // second call while first is in flight — must return immediately + await service.buildGraph(baseSettings) + + // fetcher called only once (by the first build) + expect(fetcher.callCount).to.equal(1) + + // clean up + releaseHang() + await first + }) + + it('clears the building flag even when buildGraph throws', async () => { + fetcher.onFirstCall().rejects(new Error('relay unreachable')) + + await expect(service.buildGraph(baseSettings)).to.eventually.be.rejectedWith('relay unreachable') + + // building flag must be cleared — a subsequent build must proceed + fetcher.reset() + fetcher.resolves([]) + + await expect(service.buildGraph(baseSettings)).to.eventually.be.fulfilled + }) + + it('phase 1 tag parsing ignores non-p tags', async () => { + fetcher.onFirstCall().resolves([ + { + id: 'evt1', + pubkey: SEED_PUBKEY, + kind: 3, + tags: [ + ['e', TRUSTED_PUBKEY], // wrong tag type — must be ignored + ['p', TRUSTED_PUBKEY], // correct + ], + content: '', + created_at: 1_700_000_000, + sig: 'f'.repeat(128), + }, + ]) + + await service.buildGraph(baseSettings) + + expect(service.isTrusted(TRUSTED_PUBKEY)).to.equal(true) + }) + + it('phase 1 tag parsing ignores pubkeys that are not 64 hex chars', async () => { + fetcher.onFirstCall().resolves([ + { + id: 'evt1', + pubkey: SEED_PUBKEY, + kind: 3, + tags: [ + ['p', 'tooshort'], // invalid length — must be ignored + ['p', TRUSTED_PUBKEY], // valid + ], + content: '', + created_at: 1_700_000_000, + sig: 'f'.repeat(128), + }, + ]) + + await service.buildGraph(baseSettings) + + expect(service.isTrusted('tooshort')).to.equal(false) + expect(service.isTrusted(TRUSTED_PUBKEY)).to.equal(true) + }) + }) + + // testting reset functionality: + + describe('reset()', () => { + it('clears booted state — isReady() returns false after reset', async () => { + await service.buildGraph(baseSettings) + expect(service.isReady()).to.equal(true) + + service.reset() + + expect(service.isReady()).to.equal(false) + }) + + it('clears trust map — isTrusted() returns false for all pubkeys after reset', async () => { + fetcher.onFirstCall().resolves([ + makeFollowEvent('evt1', SEED_PUBKEY, [TRUSTED_PUBKEY]), + ]) + await service.buildGraph(baseSettings) + expect(service.isTrusted(SEED_PUBKEY)).to.equal(true) + expect(service.isTrusted(TRUSTED_PUBKEY)).to.equal(true) + + service.reset() + + expect(service.isTrusted(SEED_PUBKEY)).to.equal(false) + expect(service.isTrusted(TRUSTED_PUBKEY)).to.equal(false) + }) + + it('allows a fresh build after reset', async () => { + await service.buildGraph(baseSettings) + service.reset() + + fetcher.reset() + fetcher.resolves([]) + + await service.buildGraph(baseSettings) + + expect(service.isReady()).to.equal(true) + }) + }) +}) diff --git a/test/unit/utils/settings.spec.ts b/test/unit/utils/settings.spec.ts index dff99e6a..b2536589 100644 --- a/test/unit/utils/settings.spec.ts +++ b/test/unit/utils/settings.spec.ts @@ -270,6 +270,8 @@ describe('SettingsStatic', () => { expect(defaults).to.have.nested.property('wot.seedPubkey', '') expect(defaults).to.have.nested.property('wot.minimumFollowers', 1) expect(defaults).to.have.nested.property('wot.refreshIntervalHours', 24) + expect(defaults).to.have.nested.property('wot.seedRelays') + expect((defaults as any).wot.seedRelays).to.be.an('array').with.length.greaterThan(0) }) it('user config wot block overrides defaults', () => { @@ -283,6 +285,16 @@ describe('SettingsStatic', () => { expect(merged.wot?.minimumFollowers).to.equal(3) // non-overridden fields stay as defaults expect(merged.wot?.refreshIntervalHours).to.equal(24) + expect(merged.wot?.seedRelays).to.be.an('array').with.length.greaterThan(0) + }) + + it('user config seedRelays overrides default relay list', () => { + const defaults = SettingsStatic.loadAndParseYamlFile( + SettingsStatic.getDefaultSettingsFilePath() + ) + const userConfig = { wot: { seedRelays: ['wss://my-relay.com'] } } + const merged = mergeDeepRight(defaults, userConfig) as Settings + expect(merged.wot?.seedRelays).to.deep.equal(['wss://my-relay.com']) }) }) })