Skip to content

Commit

Permalink
feat: limit rpc fields using protons api
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Feb 9, 2024
1 parent a82b44f commit 79fb5d7
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 370 deletions.
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.3",
"multiformats": "^13.0.1",
"protons-runtime": "^5.3.0",
"protons-runtime": "5.4.0",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.0.1"
},
Expand All @@ -105,7 +105,7 @@
"p-retry": "^6.2.0",
"p-wait-for": "^5.0.2",
"sinon": "^17.0.1",
"protons": "^7.4.0",
"protons": "^7.5.0",
"time-cache": "^0.3.0",
"ts-sinon": "^2.0.2"
},
Expand Down
19 changes: 17 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
ACCEPT_FROM_WHITELIST_THRESHOLD_SCORE,
BACKOFF_SLACK
} from './constants.js'
import { decodeRpc, type DecodeRPCLimits, defaultDecodeRpcLimits } from './message/decodeRpc.js'
import { type DecodeRPCLimits, defaultDecodeRpcLimits } from './message/decodeRpc.js'
import { RPC } from './message/rpc.js'
import { MessageCache, type MessageCacheRecord } from './message-cache.js'
import {
Expand Down Expand Up @@ -955,7 +955,22 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
const rpcBytes = data.subarray()
// Note: This function may throw, it must be wrapped in a try {} catch {} to prevent closing the stream.
// TODO: What should we do if the entire RPC is invalid?
const rpc = decodeRpc(rpcBytes, this.decodeRpcLimits)
// const rpc = decodeRpc(rpcBytes, this.decodeRpcLimits)
const rpc = RPC.decode(rpcBytes, {
limits: {
subscriptions: this.decodeRpcLimits.maxSubscriptions,
messages: this.decodeRpcLimits.maxMessages,
control$: {
ihave: this.decodeRpcLimits.maxIhaveMessageIDs,
iwant: this.decodeRpcLimits.maxIwantMessageIDs,
graft: this.decodeRpcLimits.maxControlMessages,
prune: this.decodeRpcLimits.maxControlMessages,
prune$: {
peers: this.decodeRpcLimits.maxPeerInfos
}
}
}
})

this.metrics?.onRpcRecv(rpc, rpcBytes.length)

Expand Down
207 changes: 0 additions & 207 deletions src/message/decodeRpc.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import { reader as r, type Reader } from 'protons-runtime'
import { RPC } from './rpc.js'

export interface DecodeRPCLimits {
maxSubscriptions: number
maxMessages: number
Expand All @@ -18,207 +15,3 @@ export const defaultDecodeRpcLimits: DecodeRPCLimits = {
maxControlMessages: Infinity,
maxPeerInfos: Infinity
}

/**
* Copied code from src/message/rpc.ts but with decode limits to prevent OOM attacks
*/
export function decodeRpc (bytes: Uint8Array, opts: DecodeRPCLimits): RPC {
// Mutate to use the option as stateful counter. Must limit the total count of messageIDs across all IWANT, IHAVE
// else one count put 100 messageIDs into each 100 IWANT and "get around" the limit
opts = { ...opts }

const reader = r(bytes)
const obj: any = {
subscriptions: [],
messages: []
}

const end = reader.len

while (reader.pos < end) {
const tag = reader.uint32()

switch (tag >>> 3) {
case 1:
if (obj.subscriptions.length < opts.maxSubscriptions) {
obj.subscriptions.push(RPC.SubOpts.codec().decode(reader, reader.uint32()))
} else {
reader.skipType(tag & 7)
}
break
case 2:
if (obj.messages.length < opts.maxMessages) {
obj.messages.push(RPC.Message.codec().decode(reader, reader.uint32()))
} else {
reader.skipType(tag & 7)
}
break
case 3:
obj.control = decodeControlMessage(reader, reader.uint32(), opts)
break
default:
reader.skipType(tag & 7)
break
}
}
return obj
}

function decodeControlMessage (reader: Reader, length: number, opts: DecodeRPCLimits): RPC.ControlMessage {
const obj: any = {
ihave: [],
iwant: [],
graft: [],
prune: []
}

const end = length == null ? reader.len : reader.pos + length

while (reader.pos < end) {
const tag = reader.uint32()

switch (tag >>> 3) {
case 1:
if (obj.ihave.length < opts.maxControlMessages) {
obj.ihave.push(decodeControlIHave(reader, reader.uint32(), opts))
} else {
reader.skipType(tag & 7)
}
break
case 2:
if (obj.iwant.length < opts.maxControlMessages) {
obj.iwant.push(decodeControlIWant(reader, reader.uint32(), opts))
} else {
reader.skipType(tag & 7)
}
break
case 3:
if (obj.graft.length < opts.maxControlMessages) {
obj.graft.push(decodeControlGraft(reader, reader.uint32()))
} else {
reader.skipType(tag & 7)
}
break
case 4:
if (obj.prune.length < opts.maxControlMessages) {
obj.prune.push(decodeControlPrune(reader, reader.uint32(), opts))
} else {
reader.skipType(tag & 7)
}
break
default:
reader.skipType(tag & 7)
break
}
}

return obj
}

function decodeControlIHave (reader: Reader, length: number, opts: DecodeRPCLimits): RPC.ControlIHave {
const obj: any = {
messageIDs: []
}

const end = length == null ? reader.len : reader.pos + length

while (reader.pos < end) {
const tag = reader.uint32()

switch (tag >>> 3) {
case 1:
obj.topicID = reader.string()
break
case 2:
if (opts.maxIhaveMessageIDs-- > 0) obj.messageIDs.push(reader.bytes())
else reader.skipType(tag & 7)
break
default:
reader.skipType(tag & 7)
break
}
}

return obj
}

function decodeControlIWant (reader: Reader, length: number, opts: DecodeRPCLimits): RPC.ControlIWant {
const obj: any = {
messageIDs: []
}

const end = length == null ? reader.len : reader.pos + length

while (reader.pos < end) {
const tag = reader.uint32()

switch (tag >>> 3) {
case 1:
if (opts.maxIwantMessageIDs-- > 0) {
obj.messageIDs.push(reader.bytes())
} else {
reader.skipType(tag & 7)
}
break
default:
reader.skipType(tag & 7)
break
}
}

return obj
}

function decodeControlGraft (reader: Reader, length: number): RPC.ControlGraft {
const obj: any = {}

const end = length == null ? reader.len : reader.pos + length

while (reader.pos < end) {
const tag = reader.uint32()

switch (tag >>> 3) {
case 1:
obj.topicID = reader.string()
break
default:
reader.skipType(tag & 7)
break
}
}

return obj
}

function decodeControlPrune (reader: Reader, length: number, opts: DecodeRPCLimits): RPC.ControlPrune {
const obj: any = {
peers: []
}

const end = length == null ? reader.len : reader.pos + length

while (reader.pos < end) {
const tag = reader.uint32()

switch (tag >>> 3) {
case 1:
obj.topicID = reader.string()
break
case 2:
if (opts.maxPeerInfos-- > 0) {
obj.peers.push(RPC.PeerInfo.codec().decode(reader, reader.uint32()))
} else {
reader.skipType(tag & 7)
}
break
case 3:
obj.backoff = reader.uint64()
break
default:
reader.skipType(tag & 7)
break
}
}

return obj
}
Loading

0 comments on commit 79fb5d7

Please sign in to comment.