Skip to content

Commit

Permalink
feat: pss send and subscribe (#49)
Browse files Browse the repository at this point in the history
* feat: pss send and subscribe

* chore: fix linter issues

* test: add peer tests with connectivity endpoints

* test: change debug url

* test: fix linter error

* feat: add pss send, receive and subscribe to Bee class

* test: add await to pssSend

* refactor: change pssReceive to use pssSubscribe

* fix: sleep types in test utils

* test: changed tests to promise style

* fix: leaking handles in tests

* test: avoid stuck tests when failing expectations

* test: fix empty message issues

* docs: add comment to pssReceive

* chore: add export to interfaces
  • Loading branch information
agazso committed Jan 4, 2021
1 parent 5930948 commit c0b2ac1
Show file tree
Hide file tree
Showing 10 changed files with 456 additions and 21 deletions.
17 changes: 15 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
},
"dependencies": {
"axios": "^0.21.0",
"tar-js": "^0.3.0"
"isomorphic-ws": "^4.0.1",
"tar-js": "^0.3.0",
"ws": "^7.4.1"
},
"devDependencies": {
"@babel/core": "^7.12.3",
Expand All @@ -59,6 +61,7 @@
"@types/tar-stream": "^2.1.0",
"@types/terser-webpack-plugin": "^5.0.2",
"@types/webpack-bundle-analyzer": "^3.9.0",
"@types/ws": "^7.4.0",
"@types/yargs-parser": "^15.0.0",
"@typescript-eslint/eslint-plugin": "^4.8.2",
"@typescript-eslint/parser": "^4.8.2",
Expand Down
143 changes: 142 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,20 @@ import * as collection from './modules/collection'
import * as tag from './modules/tag'
import * as pinning from './modules/pinning'
import * as bytes from './modules/bytes'
import { Tag, FileData, Reference, UploadOptions } from './types'
import * as pss from './modules/pss'
import * as connectivity from './modules/debug/connectivity'
import {
Tag,
FileData,
Reference,
UploadOptions,
PublicKey,
AddressPrefix,
Address,
PssMessageHandler,
PssSubscription,
} from './types'
import { BeeError } from './utils/error'

/**
* The Bee class provides a way of interacting with the Bee APIs based on the provided url
Expand Down Expand Up @@ -209,6 +222,134 @@ export class Bee {
unpinData(reference: Reference): Promise<pinning.Response> {
return pinning.unpinData(this.url, reference)
}

/**
* Send to recipient or target with Postal Service for Swarm
*
* @param topic Topic name
* @param target Target message address prefix
* @param data Message to be sent
* @param recipient Recipient public key
*
*/
pssSend(
topic: string,
target: AddressPrefix,
data: string | Uint8Array,
recipient?: PublicKey,
): Promise<pss.Response> {
return pss.send(this.url, topic, target, data, recipient)
}

/**
* Subscribe to messages with Postal Service for Swarm
*
* @param topic Topic name
* @param handler Message handler interface
*
* @returns Subscription to a given topic
*/
pssSubscribe(topic: string, handler: PssMessageHandler): PssSubscription {
const ws = pss.subscribe(this.url, topic)

let cancelled = false
const cancel = () => {
if (cancelled === false) {
cancelled = true
// although the WebSocket API offers a `close` function, it seems that
// with the library that we are using (isomorphic-ws) it doesn't close
// the websocket properly, whereas `terminate` does
ws.terminate()
}
}

const subscription = {
topic,
cancel,
}

ws.onmessage = ev => {
const data = new Uint8Array(Buffer.from(ev.data))

// ignore empty messages
if (data.length > 0) {
handler.onMessage(data, subscription)
}
}
ws.onerror = ev => {
// ignore errors after subscription was cancelled
if (!cancelled) {
handler.onError(new BeeError(ev.message), subscription)
}
}

return subscription
}

/**
* Receive message with Postal Service for Swarm
*
* Because sending a PSS message is slow and CPU intensive,
* it is not supposed to be used for general messaging but
* most likely for setting up an encrypted communication
* channel by sending a one-off message.
*
* This is a helper function to wait for exactly one message to
* arrive and then cancel the subscription. Additionally a
* timeout can be provided for the message to arrive or else
* an error will be thrown.
*
* @param topic Topic name
* @param timeoutMsec Timeout in milliseconds
*
* @returns Message in byte array
*/
pssReceive(topic: string, timeoutMsec = 0): Promise<Uint8Array> {
return new Promise((resolve, reject) => {
let timeout: number | undefined
const subscription = this.pssSubscribe(topic, {
onError: error => {
clearTimeout(timeout)
subscription.cancel()
reject(error.message)
},
onMessage: message => {
clearTimeout(timeout)
subscription.cancel()
resolve(message)
},
})

if (timeoutMsec > 0) {
// we need to cast the type because Typescript is getting confused with Node.js'
// alternative type definitions
timeout = (setTimeout(() => {
subscription.cancel()
reject(new BeeError('pssReceive timeout'))
}, timeoutMsec) as unknown) as number
}
})
}
}

/**
* The BeeDebug class provides a way of interacting with the Bee debug APIs based on the provided url
*
* @param url URL of a running Bee node
*/
export class BeeDebug {
constructor(readonly url: string) {}

async getOverlayAddress(): Promise<Address> {
const nodeAddresses = await connectivity.getNodeAddresses(this.url)

return nodeAddresses.overlay
}

async getPssPublicKey(): Promise<PublicKey> {
const nodeAddresses = await connectivity.getNodeAddresses(this.url)

return nodeAddresses.pss_public_key
}
}
export default Bee
34 changes: 34 additions & 0 deletions src/modules/debug/connectivity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { safeAxios } from '../../utils/safeAxios'

export interface NodeAddresses {
overlay: string
underlay: string[]
ethereum: string
public_key: string
pss_public_key: string
}

export async function getNodeAddresses(url: string): Promise<NodeAddresses> {
const response = await safeAxios<NodeAddresses>({
url: url + '/addresses',
responseType: 'json',
})

return response.data
}

export interface Peer {
address: string
}
export interface Peers {
peers: Peer[]
}

export async function getPeers(url: string): Promise<Peers> {
const response = await safeAxios<Peers>({
url: url + '/peers',
responseType: 'json',
})

return response.data
}
50 changes: 50 additions & 0 deletions src/modules/pss.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import WebSocket from 'isomorphic-ws'

import { PublicKey } from '../types'
import { prepareData } from '../utils/data'
import { safeAxios } from '../utils/safeAxios'

const endpoint = '/pss'

export interface Response {
message: string
code: number
}

/**
* Send to recipient or target with Postal Service for Swarm
*
* @param url Bee url
* @param topic Topic name
* @param target Target message address prefix
* @param recipient Recipient public key
*
*/
export async function send(
url: string,
topic: string,
target: string,
data: string | Uint8Array,
recipient?: PublicKey,
): Promise<Response> {
const response = await safeAxios<Response>({
method: 'post',
url: `${url}${endpoint}/send/${topic}/${target.slice(0, 4)}`,
data: await prepareData(data),
responseType: 'json',
params: { recipient },
})

return response.data
}

/**
* Subscribe for messages on the given topic
*
* @param topic Topic name
*/
export function subscribe(url: string, topic: string): WebSocket {
const wsUrl = url.replace(/^http/i, 'ws')

return new WebSocket(`${wsUrl}${endpoint}/subscribe/${topic}`)
}
16 changes: 16 additions & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import { BeeError } from '../utils/error'

export interface Dictionary<T> {
[Key: string]: T
}

export type Reference = string
export type PublicKey = string

export type Address = string
export type AddressPrefix = Address

export const REFERENCE_LENGTH = 64
export const ENCRYPTED_REFERENCE_LENGTH = 2 * REFERENCE_LENGTH
Expand Down Expand Up @@ -58,3 +64,13 @@ export interface CollectionEntry<T> {
* Represents Collections
*/
export type Collection<T> = Array<CollectionEntry<T>>

export interface PssSubscription {
readonly topic: string
cancel: () => void
}

export interface PssMessageHandler {
onMessage: (message: Uint8Array, subscription: PssSubscription) => void
onError: (error: BeeError, subscription: PssSubscription) => void
}
Loading

0 comments on commit c0b2ac1

Please sign in to comment.