Skip to content

Commit

Permalink
fix: rebase errors and lint
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Jan 31, 2024
1 parent 8f4590a commit 31e0ddc
Show file tree
Hide file tree
Showing 8 changed files with 391 additions and 84 deletions.
389 changes: 354 additions & 35 deletions package-lock.json

Large diffs are not rendered by default.

45 changes: 19 additions & 26 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import {
type PublishOpts
} from './types.js'
import { buildRawMessage, validateToRawMessage } from './utils/buildRawMessage.js'
import { createGossipRpc, ensureControl } from './utils/create-gossip-rpc.js'
import { shuffle, messageIdToString } from './utils/index.js'
import { msgIdFnStrictNoSign, msgIdFnStrictSign } from './utils/msgIdFn.js'
import { multiaddrToIPStr } from './utils/multiaddr.js'
Expand Down Expand Up @@ -1053,13 +1054,12 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub

// Handle messages
// TODO: (up to limit)
if (rpc.messages != null) {
for (const message of rpc.messages) {
if ((this.allowedTopics != null) && !this.allowedTopics.has(message.topic)) {
// Not allowed: message cache data-structures are not bounded by topic count
// TODO: Should apply behaviour penalties?
continue
}
for (const message of rpc.messages) {
if ((this.allowedTopics != null) && !this.allowedTopics.has(message.topic)) {
// Not allowed: message cache data-structures are not bounded by topic count
// TODO: Should apply behaviour penalties?
continue
}

const handleReceivedMessagePromise = this.handleReceivedMessage(from, message)
// Should never throw, but handle just in case
Expand Down Expand Up @@ -2111,7 +2111,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.publishedMessageIds.put(msgIdStr)

const batchPublish = opts?.batchPublish ?? this.opts.batchPublish
const rpc = { messages: [rawMsg] }
const rpc = createGossipRpc([rawMsg])
if (batchPublish) {
this.sendRpcInBatch(tosend, rpc)
} else {
Expand Down Expand Up @@ -2163,8 +2163,8 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
* This is not only faster but also avoid allocating memory for each peer
* see https://github.com/ChainSafe/js-libp2p-gossipsub/issues/344
*/
private sendRpcInBatch (tosend: Set<PeerIdStr>, rpc: IRPC): void {
const rpcBytes = RPC.encode(rpc).finish()
private sendRpcInBatch (tosend: Set<PeerIdStr>, rpc: RPC): void {
const rpcBytes = RPC.encode(rpc)
const prefixedData = encode.single(rpcBytes)
for (const id of tosend) {
const outboundStream = this.streamsOutbound.get(id)
Expand Down Expand Up @@ -2316,31 +2316,24 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub

/** Mutates `outRpc` adding graft and prune control messages */
public piggybackControl (id: PeerIdStr, outRpc: RPC, ctrl: RPC.ControlMessage): void {
if (ctrl.graft != null) {
if (outRpc.control == null) outRpc.control = {}
if (outRpc.control.graft == null) outRpc.control.graft = []
for (const graft of ctrl.graft) {
if (graft.topicID != null && (this.mesh.get(graft.topicID)?.has(id) ?? false)) {
outRpc.control.graft.push(graft)
}
const rpc = ensureControl(outRpc)
for (const graft of ctrl.graft) {
if (graft.topicID != null && (this.mesh.get(graft.topicID)?.has(id) ?? false)) {
rpc.control.graft.push(graft)
}
}

if (ctrl.prune != null) {
if (outRpc.control == null) outRpc.control = {}
if (outRpc.control.prune == null) outRpc.control.prune = []
for (const prune of ctrl.prune) {
if (prune.topicID != null && !(this.mesh.get(prune.topicID)?.has(id) ?? false)) {
outRpc.control.prune.push(prune)
}
for (const prune of ctrl.prune) {
if (prune.topicID != null && !(this.mesh.get(prune.topicID)?.has(id) ?? false)) {
rpc.control.prune.push(prune)
}
}
}

/** Mutates `outRpc` adding ihave control messages */
private piggybackGossip (id: PeerIdStr, outRpc: RPC, ihave: RPC.ControlIHave[]): void {
if (outRpc.control == null) outRpc.control = {}
outRpc.control.ihave = ihave
const rpc = ensureControl(outRpc)
rpc.control.ihave = ihave
}

/**
Expand Down
12 changes: 6 additions & 6 deletions src/message/decodeRpc.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { RPC } from './rpc.js'
import { reader as r, type Reader } from 'protons-runtime'
import { RPC } from './rpc.js'

export interface DecodeRPCLimits {
maxSubscriptions: number
Expand Down Expand Up @@ -64,7 +64,7 @@ export function decodeRpc (bytes: Uint8Array, opts: DecodeRPCLimits): RPC {
return obj
}

function decodeControlMessage(reader: Reader, length: number, opts: DecodeRPCLimits): RPC.ControlMessage {
function decodeControlMessage (reader: Reader, length: number, opts: DecodeRPCLimits): RPC.ControlMessage {
const obj: any = {
ihave: [],
iwant: [],
Expand Down Expand Up @@ -115,7 +115,7 @@ function decodeControlMessage(reader: Reader, length: number, opts: DecodeRPCLim
return obj
}

function decodeControlIHave(reader: Reader, length: number, opts: DecodeRPCLimits): RPC.ControlIHave {
function decodeControlIHave (reader: Reader, length: number, opts: DecodeRPCLimits): RPC.ControlIHave {
const obj: any = {
messageIDs: []
}
Expand All @@ -142,7 +142,7 @@ function decodeControlIHave(reader: Reader, length: number, opts: DecodeRPCLimit
return obj
}

function decodeControlIWant(reader: Reader, length: number, opts: DecodeRPCLimits): RPC.ControlIWant {
function decodeControlIWant (reader: Reader, length: number, opts: DecodeRPCLimits): RPC.ControlIWant {
const obj: any = {
messageIDs: []
}
Expand All @@ -169,7 +169,7 @@ function decodeControlIWant(reader: Reader, length: number, opts: DecodeRPCLimit
return obj
}

function decodeControlGraft(reader: Reader, length: number) {
function decodeControlGraft (reader: Reader, length: number) {
const obj: any = {}

const end = length == null ? reader.len : reader.pos + length
Expand All @@ -190,7 +190,7 @@ function decodeControlGraft(reader: Reader, length: number) {
return obj
}

function decodeControlPrune(reader: Reader, length: number, opts: DecodeRPCLimits): RPC.ControlPrune {
function decodeControlPrune (reader: Reader, length: number, opts: DecodeRPCLimits): RPC.ControlPrune {
const obj: any = {
peers: []
}
Expand Down
3 changes: 1 addition & 2 deletions src/message/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */
/* eslint-disable @typescript-eslint/no-empty-interface */

import { encodeMessage, decodeMessage, message } from 'protons-runtime'
import type { Codec } from 'protons-runtime'
import { type Codec, decodeMessage, encodeMessage, message } from 'protons-runtime'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface RPC {
Expand Down
16 changes: 8 additions & 8 deletions src/utils/create-gossip-rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@ import type { RPC } from '../message/rpc.js'
/**
* Create a gossipsub RPC object
*/
export function createGossipRpc(messages: RPC.Message[] = [], control?: Partial<RPC.ControlMessage>): RPC {
export function createGossipRpc (messages: RPC.Message[] = [], control?: Partial<RPC.ControlMessage>): RPC {
return {
subscriptions: [],
messages,
control: control
control: control !== undefined
? {
graft: control.graft || [],
prune: control.prune || [],
ihave: control.ihave || [],
iwant: control.iwant || []
graft: control.graft ?? [],
prune: control.prune ?? [],
ihave: control.ihave ?? [],
iwant: control.iwant ?? []
}
: undefined
}
}

export function ensureControl(rpc: RPC): Required<RPC> {
if (!rpc.control) {
export function ensureControl (rpc: RPC): Required<RPC> {
if (rpc.control === undefined) {
rpc.control = {
graft: [],
prune: [],
Expand Down
2 changes: 1 addition & 1 deletion test/benchmark/protobuf.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ describe('protobuf', function () {
minMs: 60 * 1000
})

const testCases: { name: string; length: number }[] = [
const testCases: Array<{ name: string, length: number }> = [
// As of Oct 2023, Attestation length = 281
{ name: 'Attestation', length: 300 },
// A SignedBeaconBlock could be from 70_000 to 300_000
Expand Down
2 changes: 1 addition & 1 deletion test/decodeRpc.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ describe('decodeRpc', () => {
expect(endecode(rpc).control?.prune?.[1].peers).length(1, 'Wrong prune?.[1].peers len')
})

function endecode(rpc: RPC): RPC {
function endecode (rpc: RPC): RPC {
return decodeRpc(RPC.encode(rpc), decodeRpcLimits)
}
})
Expand Down
6 changes: 1 addition & 5 deletions test/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ export const createPeerId = async (): Promise<PeerId> => {
let seq = 0n
const defaultPeer = uint8ArrayFromString('12D3KooWBsYhazxNL7aeisdwttzc6DejNaM48889t5ifiS6tTrBf', 'base58btc')

<<<<<<< HEAD
export function makeTestMessage (i: number, topic: TopicStr, from?: PeerId): RPC.IMessage {
=======
export function makeTestMessage(i: number, topic: TopicStr, from?: PeerId): RPC.Message {
>>>>>>> b433f8e (feat: protons 7.2.1 protons-runtime 5.1.0)
export function makeTestMessage (i: number, topic: TopicStr, from?: PeerId): RPC.Message {
return {
seqno: uint8ArrayFromString((seq++).toString(16).padStart(16, '0'), 'base16'),
data: Uint8Array.from([i]),
Expand Down

0 comments on commit 31e0ddc

Please sign in to comment.