Skip to content

Commit

Permalink
feat: rate limit msgs/events, send command results
Browse files Browse the repository at this point in the history
  • Loading branch information
cameri committed Nov 16, 2022
1 parent a46fcc6 commit ff9b87f
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 68 deletions.
2 changes: 1 addition & 1 deletion src/@types/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export type Range<F extends number, T extends number> = Exclude<
Enumerate<F>
>

export type Factory<TOutput = any, TInput = any> = (input: TInput) => TOutput
export type Factory<TOutput = any, TInput = void> = (input: TInput) => TOutput

export type DatabaseClient = Knex

Expand Down
11 changes: 10 additions & 1 deletion src/@types/messages.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { EventId, Range } from './base'
import { SubscriptionFilter, SubscriptionId } from './subscription'
import { Event } from './event'
import { Range } from './base'

export enum MessageType {
REQ = 'REQ',
EVENT = 'EVENT',
CLOSE = 'CLOSE',
NOTICE = 'NOTICE',
EOSE = 'EOSE',
OK = 'OK'
}

export type IncomingMessage =
Expand All @@ -20,6 +21,7 @@ export type OutgoingMessage =
| OutgoingEventMessage
| EndOfStoredEventsNotice
| NoticeMessage
| CommandResult

export type SubscribeMessage = {
[index in Range<2, 100>]: SubscriptionFilter
Expand Down Expand Up @@ -51,6 +53,13 @@ export interface NoticeMessage {
1: string
}

export interface CommandResult {
0: MessageType.OK
1: EventId
2: boolean
3: string
}

export interface EndOfStoredEventsNotice {
0: MessageType.EOSE
1: SubscriptionId
Expand Down
47 changes: 44 additions & 3 deletions src/adapters/web-socket-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { attemptValidation } from '../utils/validation'
import { createLogger } from '../factories/logger-factory'
import { Event } from '../@types/event'
import { Factory } from '../@types/base'
import { IRateLimiter } from '../@types/utils'
import { ISettings } from '../@types/settings'
import { isEventMatchingFilter } from '../utils/event'
import { messageSchema } from '../schemas/message-schema'

Expand All @@ -21,7 +23,7 @@ const debugHeartbeat = debug.extend('heartbeat')

export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter {
public clientId: string
// private clientAddress: string
private clientAddress: string
private alive: boolean
private subscriptions: Map<SubscriptionId, SubscriptionFilter[]>

Expand All @@ -30,13 +32,17 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
private readonly request: IncomingHttpMessage,
private readonly webSocketServer: IWebSocketServerAdapter,
private readonly createMessageHandler: Factory<IMessageHandler, [IncomingMessage, IWebSocketAdapter]>,
private readonly slidingWindowRateLimiter: Factory<IRateLimiter>,
private readonly settingsFactory: Factory<ISettings>,
) {
super()
this.alive = true
this.subscriptions = new Map()

this.clientId = Buffer.from(this.request.headers['sec-websocket-key'], 'base64').toString('hex')
// this.clientAddress = this.request.headers['x-forwarded-for'] as string
this.clientAddress = (this.request.headers['x-forwarded-for'] ?? this.request.socket.remoteAddress) as string

debug('client %s from address %s', this.clientId, this.clientAddress)

this.client
.on('message', this.onClientMessage.bind(this))
Expand Down Expand Up @@ -120,10 +126,15 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
private async onClientMessage(raw: Buffer) {
let abort: () => void
try {
if (await this.isRateLimited(this.clientAddress)) {
this.sendMessage(createNoticeMessage('rate limited'))
return
}

const message = attemptValidation(messageSchema)(JSON.parse(raw.toString('utf8')))

const messageHandler = this.createMessageHandler([message, this]) as IMessageHandler & IAbortable
if (typeof messageHandler.abort === 'function') {
if (typeof messageHandler?.abort === 'function') {
abort = messageHandler.abort.bind(messageHandler)
this.client.prependOnceListener('close', abort)
}
Expand All @@ -145,6 +156,36 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter
}
}

private async isRateLimited(client: string): Promise<boolean> {
const {
rateLimits,
ipWhitelist = [],
} = this.settingsFactory().limits?.message ?? {}

if (ipWhitelist.includes(client)) {
debug('rate limit check %s: skipped', client)
return false
}

const rateLimiter = this.slidingWindowRateLimiter()

const hit = (period: number, rate: number) =>
rateLimiter.hit(
`${client}:message:${period}`,
1,
{ period: period, rate: rate },
)

const hits = await Promise.all(
rateLimits
.map(({ period, rate }) => hit(period, rate))
)

debug('rate limit check %s: %o = %o', client, rateLimits.map(({ period }) => period), hits)

return hits.some((thresholdCrossed) => thresholdCrossed)
}

private onClientPong() {
debugHeartbeat('client %s pong', this.clientId)
this.alive = true
Expand Down
6 changes: 5 additions & 1 deletion src/factories/websocket-adapter-factory.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { IncomingMessage } from 'http'
import { WebSocket } from 'ws'

import { createSettings } from './settings-factory'
import { IEventRepository } from '../@types/repositories'
import { IWebSocketServerAdapter } from '../@types/adapters'
import { messageHandlerFactory } from './message-handler-factory'
import { slidingWindowRateLimiterFactory } from './rate-limiter-factory'
import { WebSocketAdapter } from '../adapters/web-socket-adapter'


Expand All @@ -14,5 +16,7 @@ export const webSocketAdapterFactory = (
client,
request,
webSocketServerAdapter,
messageHandlerFactory(eventRepository)
messageHandlerFactory(eventRepository),
slidingWindowRateLimiterFactory,
createSettings,
)
36 changes: 21 additions & 15 deletions src/handlers/delegated-event-message-handler.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import { mergeDeepLeft } from 'ramda'

import { DelegatedEvent, Event } from '../@types/event'
import { EventDelegatorMetadataKey, EventTags } from '../constants/base'
import { createCommandResult } from '../utils/messages'
import { createLogger } from '../factories/logger-factory'
import { createNoticeMessage } from '../utils/messages'
import { DelegatedEvent } from '../@types/event'
import { EventMessageHandler } from './event-message-handler'
import { IMessageHandler } from '../@types/message-handlers'
import { IncomingEventMessage } from '../@types/messages'
Expand All @@ -14,50 +12,58 @@ const debug = createLogger('delegated-event-message-handler')

export class DelegatedEventMessageHandler extends EventMessageHandler implements IMessageHandler {
public async handleMessage(message: IncomingEventMessage): Promise<void> {
debug('received message: %o', message)
const [, event] = message

let reason = this.canAcceptEvent(event)
let reason = await this.isEventValid(event)
if (reason) {
debug('event %s rejected: %s', event.id, reason)
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
return
}

if (await this.isRateLimited(event)) {
debug('event %s rejected: rate-limited')
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'rate-limited: slow down'))
return
}

reason = await this.isEventValid(event)
reason = this.canAcceptEvent(event)
if (reason) {
debug('event %s rejected: %s', event.id, reason)
this.webSocket.emit(WebSocketAdapterEvent.Message, createNoticeMessage(`Event rejected: ${reason}`))
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, reason))
return
}

const [, delegator] = event.tags.find((tag) => tag.length === 4 && tag[0] === EventTags.Delegation)
const delegatedEvent: DelegatedEvent = mergeDeepLeft(
event,
{
const delegatedEvent: DelegatedEvent = {
...event,
[EventDelegatorMetadataKey]: delegator,
}
)
}

const strategy = this.strategyFactory([delegatedEvent, this.webSocket])

if (typeof strategy?.execute !== 'function') {
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: event not supported'))
return
}

try {
await strategy.execute(delegatedEvent)
} catch (error) {
debug('error handling message %o: %o', message, error)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: unable to process event'))
}
}

protected async isEventValid(event: Event): Promise<string | undefined> {
protected async isEventValid(event: DelegatedEvent): Promise<string | undefined> {
const reason = await super.isEventValid(event)
if (reason) {
return reason
}

if (!await isDelegatedEventValid(event)) {
return `Event with id ${event.id} from ${event.pubkey} is invalid delegated event`
return 'invalid: delegation verification failed'
}
}
}

0 comments on commit ff9b87f

Please sign in to comment.