Skip to content

Commit

Permalink
feat!: close streams gracefully (#458)
Browse files Browse the repository at this point in the history
* feat!: close streams gracefully

- Updates all libp2p related deps
- Switches `it-pb-stream` for `it-length-prefixed-stream` since we only use the methods from that module
- Stream close methods are now async

BREAKING CHANGE: stream close methods are now asyc, requires libp2p@0.46.x or later

* chore: restore prettier and eslint

* chore: restore extra eslint deps
  • Loading branch information
achingbrain committed Aug 3, 2023
1 parent 83b8e61 commit 3153ebf
Show file tree
Hide file tree
Showing 32 changed files with 13,721 additions and 15,434 deletions.
28,823 changes: 13,561 additions & 15,262 deletions package-lock.json

Large diffs are not rendered by default.

72 changes: 28 additions & 44 deletions package.json
Expand Up @@ -3,8 +3,10 @@
"version": "9.1.0",
"description": "A typescript implementation of gossipsub",
"files": [
"src",
"dist",
"src"
"!dist/test",
"!**/*.tsbuildinfo"
],
"type": "module",
"types": "dist/src/index.d.ts",
Expand Down Expand Up @@ -70,71 +72,53 @@
},
"homepage": "https://github.com/ChainSafe/js-libp2p-gossipsub#readme",
"dependencies": {
"@libp2p/crypto": "^1.0.3",
"@libp2p/interface-connection": "^5.0.1",
"@libp2p/interface-connection-manager": "^3.0.1",
"@libp2p/interface-keys": "^1.0.3",
"@libp2p/interface-peer-id": "^2.0.0",
"@libp2p/interface-peer-store": "^2.0.3",
"@libp2p/interface-pubsub": "^4.0.0",
"@libp2p/interface-registrar": "^2.0.3",
"@libp2p/interfaces": "^3.2.0",
"@libp2p/logger": "^2.0.0",
"@libp2p/peer-id": "^2.0.0",
"@libp2p/peer-record": "^5.0.0",
"@libp2p/pubsub": "^7.0.1",
"@libp2p/topology": "^4.0.0",
"@multiformats/multiaddr": "^12.0.0",
"@libp2p/crypto": "^2.0.0",
"@libp2p/interface": "^0.1.0",
"@libp2p/interface-internal": "^0.1.0",
"@libp2p/logger": "^3.0.0",
"@libp2p/peer-id": "^3.0.0",
"@libp2p/pubsub": "^8.0.0",
"@multiformats/multiaddr": "^12.1.3",
"abortable-iterator": "^5.0.1",
"denque": "^1.5.0",
"denque": "^2.1.0",
"it-length-prefixed": "^9.0.1",
"it-pipe": "^3.0.1",
"it-pushable": "^3.1.0",
"multiformats": "^11.0.0",
"protobufjs": "^6.11.2",
"uint8arraylist": "^2.3.2",
"uint8arrays": "^4.0.2"
"it-pushable": "^3.2.0",
"multiformats": "^12.0.1",
"protobufjs": "^7.2.4",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.4"
},
"devDependencies": {
"@chainsafe/as-sha256": "^0.2.4",
"@dapplion/benchmark": "^0.2.2",
"@libp2p/floodsub": "^7.0.1",
"@libp2p/interface-libp2p": "^3.1.0",
"@libp2p/interface-mocks": "^12.0.1",
"@libp2p/interface-pubsub-compliance-tests": "^5.0.0",
"@libp2p/peer-id-factory": "^2.0.0",
"@libp2p/peer-store": "^8.1.2",
"@dapplion/benchmark": "^0.2.4",
"@libp2p/floodsub": "^8.0.0",
"@libp2p/interface-compliance-tests": "^4.0.0",
"@libp2p/peer-id-factory": "^3.0.0",
"@libp2p/peer-store": "^9.0.0",
"@types/node": "^17.0.21",
"@typescript-eslint/eslint-plugin": "^3.0.2",
"@typescript-eslint/parser": "^3.0.2",
"aegir": "^38.1.8",
"benchmark": "^2.1.4",
"aegir": "^40.0.1",
"datastore-core": "^9.1.1",
"delay": "^5.0.0",
"detect-node": "^2.1.0",
"delay": "^6.0.0",
"eslint": "^7.1.0",
"eslint-config-standard": "^14.1.1",
"eslint-plugin-import": "^2.20.2",
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-prettier": "^4.0.0",
"eslint-plugin-promise": "^4.2.1",
"eslint-plugin-standard": "^4.0.1",
"it-pair": "^2.0.2",
"lodash": "^4.17.15",
"mkdirp": "^1.0.4",
"os": "^0.1.1",
"mkdirp": "^3.0.1",
"p-defer": "^4.0.0",
"p-event": "^5.0.1",
"p-event": "^6.0.0",
"p-retry": "^5.1.2",
"p-times": "^4.0.0",
"p-wait-for": "^5.0.0",
"p-wait-for": "^5.0.2",
"prettier": "^2.0.5",
"promisify-es6": "^1.0.3",
"sinon": "^15.0.3",
"sinon": "^15.1.2",
"time-cache": "^0.3.0",
"ts-node": "^10.7.0",
"ts-sinon": "^2.0.2",
"util": "^0.12.3"
"ts-sinon": "^2.0.2"
},
"engines": {
"npm": ">=8.7.0"
Expand Down
87 changes: 42 additions & 45 deletions src/index.ts
@@ -1,22 +1,21 @@
import { pipe } from 'it-pipe'
import type { Connection, Stream } from '@libp2p/interface-connection'
import type { Connection, Stream } from '@libp2p/interface/connection'
import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id'
import { Logger, logger } from '@libp2p/logger'
import { createTopology } from '@libp2p/topology'
import type { PeerId } from '@libp2p/interface-peer-id'
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
import { type Logger, logger } from '@libp2p/logger'
import type { PeerId } from '@libp2p/interface/peer-id'
import { CustomEvent, EventEmitter } from '@libp2p/interface/events'

import { MessageCache, MessageCacheRecord } from './message-cache.js'
import { RPC, IRPC } from './message/rpc.js'
import { MessageCache, type MessageCacheRecord } from './message-cache.js'
import { RPC, type IRPC } from './message/rpc.js'
import * as constants from './constants.js'
import { shuffle, messageIdToString } from './utils/index.js'
import {
PeerScore,
PeerScoreParams,
PeerScoreThresholds,
type PeerScoreParams,
type PeerScoreThresholds,
createPeerScoreParams,
createPeerScoreThresholds,
PeerScoreStatsDump
type PeerScoreStatsDump
} from './score/index.js'
import { IWantTracer } from './tracer.js'
import { SimpleTimeCache } from './utils/time-cache.js'
Expand All @@ -31,56 +30,54 @@ import {
getMetrics,
IHaveIgnoreReason,
InclusionReason,
Metrics,
MetricsRegister,
type Metrics,
type MetricsRegister,
ScorePenalty,
TopicStrToLabel,
ToSendGroupCount
type TopicStrToLabel,
type ToSendGroupCount
} from './metrics.js'
import {
MsgIdFn,
PublishConfig,
TopicStr,
MsgIdStr,
type MsgIdFn,
type PublishConfig,
type TopicStr,
type MsgIdStr,
ValidateError,
PeerIdStr,
type PeerIdStr,
MessageStatus,
RejectReason,
RejectReasonObj,
FastMsgIdFn,
AddrInfo,
DataTransform,
type RejectReasonObj,
type FastMsgIdFn,
type AddrInfo,
type DataTransform,
rejectReasonFromAcceptance,
MsgIdToStrFn,
MessageId,
PublishOpts
type MsgIdToStrFn,
type MessageId,
type PublishOpts
} from './types.js'
import { buildRawMessage, validateToRawMessage } from './utils/buildRawMessage.js'
import { msgIdFnStrictNoSign, msgIdFnStrictSign } from './utils/msgIdFn.js'
import { computeAllPeersScoreWeights } from './score/scoreMetrics.js'
import { getPublishConfigFromPeerId } from './utils/publishConfig.js'
import type { GossipsubOptsSpec } from './config.js'
import {
import type {
Message,
PublishResult,
PubSub,
PubSubEvents,
PubSubInit,
StrictNoSign,
StrictSign,
SubscriptionChangeData,
TopicValidatorFn,
TopicValidatorResult
} from '@libp2p/interface-pubsub'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar'
TopicValidatorFn
} from '@libp2p/interface/pubsub'
import { StrictSign, StrictNoSign, TopicValidatorResult } from '@libp2p/interface/pubsub'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar'
import { removeFirstNItemsFromSet, removeItemsFromSet } from './utils/set.js'
import { pushable } from 'it-pushable'
import { InboundStream, OutboundStream } from './stream.js'
import { Uint8ArrayList } from 'uint8arraylist'
import { decodeRpc, DecodeRPCLimits, defaultDecodeRpcLimits } from './message/decodeRpc.js'
import { ConnectionManager } from '@libp2p/interface-connection-manager'
import { Peer, PeerStore } from '@libp2p/interface-peer-store'
import { Multiaddr } from '@multiformats/multiaddr'
import type { Uint8ArrayList } from 'uint8arraylist'
import { decodeRpc, type DecodeRPCLimits, defaultDecodeRpcLimits } from './message/decodeRpc.js'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { Peer, PeerStore } from '@libp2p/interface/peer-store'
import type { Multiaddr } from '@multiformats/multiaddr'
import { multiaddrToIPStr } from './utils/multiaddr.js'

type ConnectionDirection = 'inbound' | 'outbound'
Expand Down Expand Up @@ -589,10 +586,10 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G

// register protocol with topology
// Topology callbacks called on connection manager changes
const topology = createTopology({
const topology = {
onConnect: this.onPeerConnected.bind(this),
onDisconnect: this.onPeerDisconnected.bind(this)
})
}
const registrarTopologyIds = await Promise.all(
this.multicodecs.map((multicodec) => registrar.register(multicodec, topology))
)
Expand Down Expand Up @@ -698,7 +695,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G

const peerId = connection.remotePeer
// add peer to router
this.addPeer(peerId, connection.stat.direction, connection.remoteAddr)
this.addPeer(peerId, connection.direction, connection.remoteAddr)
// create inbound stream
this.createInboundStream(peerId, stream)
// attempt to create outbound stream
Expand All @@ -709,14 +706,14 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
* Registrar notifies an established connection with pubsub protocol
*/
private onPeerConnected(peerId: PeerId, connection: Connection): void {
this.metrics?.newConnectionCount.inc({ status: connection.stat.status })
this.metrics?.newConnectionCount.inc({ status: connection.status })
// libp2p may emit a closed connection and never issue peer:disconnect event
// see https://github.com/ChainSafe/js-libp2p-gossipsub/issues/398
if (!this.isStarted() || connection.stat.status !== 'OPEN') {
if (!this.isStarted() || connection.status !== 'open') {
return
}

this.addPeer(peerId, connection.stat.direction, connection.remoteAddr)
this.addPeer(peerId, connection.direction, connection.remoteAddr)
this.outboundInflightQueue.push({ peerId, connection })
}

Expand Down Expand Up @@ -1733,7 +1730,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
const connection = await this.components.connectionManager.openConnection(peerId)
for (const multicodec of this.multicodecs) {
for (const topology of this.components.registrar.getTopologies(multicodec)) {
topology.onConnect(peerId, connection)
topology.onConnect?.(peerId, connection)
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions src/metrics.ts
@@ -1,7 +1,14 @@
import { TopicValidatorResult } from '@libp2p/interface-pubsub'
import type { TopicValidatorResult } from '@libp2p/interface/pubsub'
import type { IRPC } from './message/rpc.js'
import type { PeerScoreThresholds } from './score/peer-score-thresholds.js'
import { MessageStatus, PeerIdStr, RejectReason, RejectReasonObj, TopicStr, ValidateError } from './types.js'
import {
MessageStatus,
type PeerIdStr,
RejectReason,
type RejectReasonObj,
type TopicStr,
ValidateError
} from './types.js'

/** Topic label as provided in `topicStrToLabel` */
export type TopicLabel = string
Expand Down
2 changes: 1 addition & 1 deletion src/score/peer-score-params.ts
@@ -1,5 +1,5 @@
import { ERR_INVALID_PEER_SCORE_PARAMS } from './constants.js'
import { CodeError } from '@libp2p/interfaces/errors'
import { CodeError } from '@libp2p/interface/errors'

// This file defines PeerScoreParams and TopicScoreParams interfaces
// as well as constructors, default constructors, and validation functions
Expand Down
2 changes: 1 addition & 1 deletion src/score/peer-score-thresholds.ts
@@ -1,5 +1,5 @@
import { ERR_INVALID_PEER_SCORE_THRESHOLDS } from './constants.js'
import { CodeError } from '@libp2p/interfaces/errors'
import { CodeError } from '@libp2p/interface/errors'

// This file defines PeerScoreThresholds interface
// as well as a constructor, default constructor, and validation function
Expand Down
4 changes: 2 additions & 2 deletions src/score/peer-score.ts
@@ -1,9 +1,9 @@
import { PeerScoreParams, validatePeerScoreParams } from './peer-score-params.js'
import { type PeerScoreParams, validatePeerScoreParams } from './peer-score-params.js'
import type { PeerStats, TopicStats } from './peer-stats.js'
import { computeScore } from './compute-score.js'
import { MessageDeliveries, DeliveryRecordStatus } from './message-deliveries.js'
import { logger } from '@libp2p/logger'
import { MsgIdStr, PeerIdStr, RejectReason, TopicStr, IPStr } from '../types.js'
import { type MsgIdStr, type PeerIdStr, RejectReason, type TopicStr, type IPStr } from '../types.js'
import type { Metrics, ScorePenalty } from '../metrics.js'
import { MapDef } from '../utils/set.js'

Expand Down
8 changes: 4 additions & 4 deletions src/stream.ts
@@ -1,9 +1,9 @@
import { Stream } from '@libp2p/interface-connection'
import type { Stream } from '@libp2p/interface/connection'
import { abortableSource } from 'abortable-iterator'
import { pipe } from 'it-pipe'
import { pushable, Pushable } from 'it-pushable'
import { pushable, type Pushable } from 'it-pushable'
import { encode, decode } from 'it-length-prefixed'
import { Uint8ArrayList } from 'uint8arraylist'
import type { Uint8ArrayList } from 'uint8arraylist'

type OutboundStreamOpts = {
/** Max size in bytes for pushable buffer. If full, will throw on .push */
Expand Down Expand Up @@ -34,7 +34,7 @@ export class OutboundStream {

get protocol(): string {
// TODO remove this non-nullish assertion after https://github.com/libp2p/js-libp2p-interfaces/pull/265 is incorporated
return this.rawStream.stat.protocol!
return this.rawStream.protocol!
}

push(data: Uint8Array): void {
Expand Down
2 changes: 1 addition & 1 deletion src/tracer.ts
@@ -1,4 +1,4 @@
import { MsgIdStr, MsgIdToStrFn, PeerIdStr, RejectReason } from './types.js'
import { type MsgIdStr, type MsgIdToStrFn, type PeerIdStr, RejectReason } from './types.js'
import type { Metrics } from './metrics.js'

/**
Expand Down
6 changes: 3 additions & 3 deletions src/types.ts
@@ -1,8 +1,8 @@
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PrivateKey } from '@libp2p/interface-keys'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { PrivateKey } from '@libp2p/interface/keys'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { RPC } from './message/rpc.js'
import { Message, TopicValidatorResult } from '@libp2p/interface-pubsub'
import { type Message, TopicValidatorResult } from '@libp2p/interface/pubsub'

export type MsgIdStr = string
export type PeerIdStr = string
Expand Down
8 changes: 4 additions & 4 deletions src/utils/buildRawMessage.ts
Expand Up @@ -3,12 +3,12 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { marshalPublicKey, unmarshalPublicKey } from '@libp2p/crypto/keys'
import { randomBytes } from '@libp2p/crypto'
import { peerIdFromBytes } from '@libp2p/peer-id'
import type { PublicKey } from '@libp2p/interface-keys'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PublicKey } from '@libp2p/interface/keys'
import type { PeerId } from '@libp2p/interface/peer-id'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import { RPC } from '../message/rpc.js'
import { PublishConfig, PublishConfigType, TopicStr, ValidateError } from '../types.js'
import { StrictSign, StrictNoSign, Message } from '@libp2p/interface-pubsub'
import { type PublishConfig, PublishConfigType, type TopicStr, ValidateError } from '../types.js'
import { StrictSign, StrictNoSign, type Message } from '@libp2p/interface/pubsub'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'

export const SignPrefix = uint8ArrayFromString('libp2p-pubsub:')
Expand Down
2 changes: 1 addition & 1 deletion src/utils/msgIdFn.ts
@@ -1,5 +1,5 @@
import { sha256 } from 'multiformats/hashes/sha2'
import type { Message } from '@libp2p/interface-pubsub'
import type { Message } from '@libp2p/interface/pubsub'
import { msgId } from '@libp2p/pubsub/utils'

/**
Expand Down
2 changes: 1 addition & 1 deletion src/utils/multiaddr.ts
@@ -1,4 +1,4 @@
import { Multiaddr } from '@multiformats/multiaddr'
import type { Multiaddr } from '@multiformats/multiaddr'
import { convertToString } from '@multiformats/multiaddr/convert'

// Protocols https://github.com/multiformats/multiaddr/blob/master/protocols.csv
Expand Down
6 changes: 3 additions & 3 deletions src/utils/publishConfig.ts
@@ -1,7 +1,7 @@
import { unmarshalPrivateKey } from '@libp2p/crypto/keys'
import { StrictSign, StrictNoSign } from '@libp2p/interface-pubsub'
import type { PeerId } from '@libp2p/interface-peer-id'
import { PublishConfig, PublishConfigType } from '../types.js'
import { StrictSign, StrictNoSign } from '@libp2p/interface/pubsub'
import type { PeerId } from '@libp2p/interface/peer-id'
import { type PublishConfig, PublishConfigType } from '../types.js'

/**
* Prepare a PublishConfig object from a PeerId.
Expand Down

0 comments on commit 3153ebf

Please sign in to comment.