Skip to content

Commit

Permalink
Add exportEvents function (#408)
Browse files Browse the repository at this point in the history
* export events via vm upgrade

* cleaner exportEvents upgrade, add RetryError

* add basic tests

* test more things, fix buffer length issue

* fix type

* add missing vm method

* add plugin scaffold to imports

* Use JSDoc style for tooltips and fix onEvent typing

* stringClamp

* remove dead code

* add consts

* log locally

* better log

* it's a hub now

* less events in benchmark to hopefully deflake a test

* fix type bug

* fix awkward bug

* add statsd for export event jobs

* typefix and rename

* fix ! in test

* flush on teardown

* config as a string, as it should

Co-authored-by: Michael Matloka <dev@twixes.com>
  • Loading branch information
mariusandra and Twixes committed May 27, 2021
1 parent a465836 commit a00dd0a
Show file tree
Hide file tree
Showing 12 changed files with 547 additions and 56 deletions.
4 changes: 2 additions & 2 deletions benchmarks/vm/memory.benchmark.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { PluginEvent } from '@posthog/plugin-scaffold/src/types'

import { Plugin, PluginConfig, PluginConfigVMReponse } from '../../src/types'
import { Plugin, PluginConfig, PluginConfigVMResponse } from '../../src/types'
import { createHub } from '../../src/utils/db/hub'
import { createPluginConfigVM } from '../../src/worker/vm/vm'
import { commonOrganizationId } from '../../tests/helpers/plugins'
Expand Down Expand Up @@ -77,7 +77,7 @@ test('test vm memory usage', async () => {
const usedAtStart = getUsed()

let used = usedAtStart
const vms: PluginConfigVMReponse[] = []
const vms: PluginConfigVMResponse[] = []

for (let i = 0; i < numVMs; i++) {
const vm = await createPluginConfigVM(hub, mockConfig, indexJs)
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"@maxmind/geoip2-node": "^2.3.1",
"@posthog/clickhouse": "^1.7.0",
"@posthog/piscina": "^2.2.0-posthog",
"@posthog/plugin-contrib": "^0.0.3",
"@posthog/plugin-contrib": "^0.0.5",
"@sentry/node": "^5.29.0",
"@sentry/tracing": "^5.29.0",
"@types/lru-cache": "^5.1.0",
Expand Down Expand Up @@ -83,7 +83,7 @@
},
"devDependencies": {
"@babel/cli": "^7.13.0",
"@posthog/plugin-scaffold": "0.5.0",
"@posthog/plugin-scaffold": "0.10.0",
"@types/adm-zip": "^0.4.33",
"@types/babel__standalone": "^7.1.3",
"@types/generic-pool": "^3.1.9",
Expand Down
31 changes: 20 additions & 11 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import ClickHouse from '@posthog/clickhouse'
import { PluginAttachment, PluginConfigSchema, PluginEvent, Properties } from '@posthog/plugin-scaffold'
import { Meta, PluginAttachment, PluginConfigSchema, PluginEvent, Properties } from '@posthog/plugin-scaffold'
import { Pool as GenericPool } from 'generic-pool'
import { StatsD } from 'hot-shots'
import { Redis } from 'ioredis'
Expand Down Expand Up @@ -293,18 +293,27 @@ export type WorkerMethods = {
ingestEvent: (event: PluginEvent) => Promise<IngestEventResponse>
}

export interface PluginConfigVMReponse {
export type VMMethods = {
setupPlugin?: () => Promise<void>
teardownPlugin?: () => Promise<void>
onEvent?: (event: PluginEvent) => Promise<void>
onSnapshot?: (event: PluginEvent) => Promise<void>
exportEvents?: (events: PluginEvent[]) => Promise<void>
processEvent?: (event: PluginEvent) => Promise<PluginEvent>
// DEPRECATED
processEventBatch?: (batch: PluginEvent[]) => Promise<PluginEvent[]>
}

export interface PluginConfigVMResponse {
vm: VM
methods: {
setupPlugin?: () => Promise<void>
teardownPlugin?: () => Promise<void>
onEvent?: (event: PluginEvent) => Promise<void>
onSnapshot?: (event: PluginEvent) => Promise<void>
processEvent?: (event: PluginEvent) => Promise<PluginEvent>
// DEPRECATED
processEventBatch?: (batch: PluginEvent[]) => Promise<PluginEvent[]>
}
methods: VMMethods
tasks: Record<PluginTaskType, Record<string, PluginTask>>
}

export interface PluginConfigVMInternalResponse<M extends Meta = Meta> {
methods: VMMethods
tasks: Record<PluginTaskType, Record<string, PluginTask>>
meta: M
}

export interface EventUsage {
Expand Down
9 changes: 9 additions & 0 deletions src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -566,3 +566,12 @@ export function filterIncrementProperties(incrementProperties: unknown): Record<

return filteredIncrementProperties
}

export function clamp(value: number, min: number, max: number): number {
return value > max ? max : value < min ? min : value
}

export function stringClamp(value: string, def: number, min: number, max: number): number {
const nanToNull = (nr: number): null | number => (isNaN(nr) ? null : nr)
return clamp(nanToNull(parseInt(value)) ?? def, min, max)
}
2 changes: 1 addition & 1 deletion src/worker/plugins/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export async function runProcessEvent(server: Hub, event: PluginEvent): Promise<

try {
returnedEvent = (await processEvent(returnedEvent)) || null
if (returnedEvent.team_id !== teamId) {
if (returnedEvent && returnedEvent.team_id !== teamId) {
returnedEvent.team_id = teamId
throw new IllegalOperationError('Plugin tried to change event.team_id')
}
Expand Down
2 changes: 2 additions & 0 deletions src/worker/vm/imports.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { BigQuery } from '@google-cloud/bigquery'
import * as contrib from '@posthog/plugin-contrib'
import * as scaffold from '@posthog/plugin-scaffold'
import * as AWS from 'aws-sdk'
import crypto from 'crypto'
import * as genericPool from 'generic-pool'
Expand All @@ -17,6 +18,7 @@ export const imports = {
'node-fetch': fetch,
'snowflake-sdk': snowflake,
'@google-cloud/bigquery': { BigQuery },
'@posthog/plugin-scaffold': scaffold,
'@posthog/plugin-contrib': contrib,
'aws-sdk': AWS,
pg: pg,
Expand Down
20 changes: 12 additions & 8 deletions src/worker/vm/lazy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
Hub,
PluginCapabilities,
PluginConfig,
PluginConfigVMReponse,
PluginConfigVMResponse,
PluginLogEntrySource,
PluginLogEntryType,
PluginTask,
Expand All @@ -18,7 +18,7 @@ import { createPluginConfigVM } from './vm'
export class LazyPluginVM {
initialize?: (hub: Hub, pluginConfig: PluginConfig, indexJs: string, logInfo: string) => Promise<void>
failInitialization?: () => void
resolveInternalVm: Promise<PluginConfigVMReponse | null>
resolveInternalVm: Promise<PluginConfigVMResponse | null>

constructor() {
this.resolveInternalVm = new Promise((resolve) => {
Expand Down Expand Up @@ -56,23 +56,27 @@ export class LazyPluginVM {
})
}

async getOnEvent(): Promise<PluginConfigVMReponse['methods']['onEvent'] | null> {
async getExportEvents(): Promise<PluginConfigVMResponse['methods']['exportEvents'] | null> {
return (await this.resolveInternalVm)?.methods.exportEvents || null
}

async getOnEvent(): Promise<PluginConfigVMResponse['methods']['onEvent'] | null> {
return (await this.resolveInternalVm)?.methods.onEvent || null
}

async getOnSnapshot(): Promise<PluginConfigVMReponse['methods']['onSnapshot'] | null> {
async getOnSnapshot(): Promise<PluginConfigVMResponse['methods']['onSnapshot'] | null> {
return (await this.resolveInternalVm)?.methods.onSnapshot || null
}

async getProcessEvent(): Promise<PluginConfigVMReponse['methods']['processEvent'] | null> {
async getProcessEvent(): Promise<PluginConfigVMResponse['methods']['processEvent'] | null> {
return (await this.resolveInternalVm)?.methods.processEvent || null
}

async getProcessEventBatch(): Promise<PluginConfigVMReponse['methods']['processEventBatch'] | null> {
async getProcessEventBatch(): Promise<PluginConfigVMResponse['methods']['processEventBatch'] | null> {
return (await this.resolveInternalVm)?.methods.processEventBatch || null
}

async getTeardownPlugin(): Promise<PluginConfigVMReponse['methods']['teardownPlugin'] | null> {
async getTeardownPlugin(): Promise<PluginConfigVMResponse['methods']['teardownPlugin'] | null> {
return (await this.resolveInternalVm)?.methods.teardownPlugin || null
}

Expand All @@ -87,7 +91,7 @@ export class LazyPluginVM {
private async inferPluginCapabilities(
hub: Hub,
pluginConfig: PluginConfig,
vm: PluginConfigVMReponse
vm: PluginConfigVMResponse
): Promise<void> {
if (!pluginConfig.plugin) {
throw new Error(`'PluginConfig missing plugin: ${pluginConfig}`)
Expand Down
152 changes: 152 additions & 0 deletions src/worker/vm/upgrades/export-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import { createBuffer } from '@posthog/plugin-contrib'
import { Plugin, PluginEvent, PluginMeta, RetryError } from '@posthog/plugin-scaffold'

import { Hub, PluginConfig, PluginConfigVMInternalResponse, PluginTaskType } from '../../../types'
import { status } from '../../../utils/status'
import { stringClamp } from '../../../utils/utils'

const MAXIMUM_RETRIES = 15
const EXPORT_BUFFER_BYTES_MINIMUM = 1
const EXPORT_BUFFER_BYTES_DEFAULT = 1024 * 1024
const EXPORT_BUFFER_BYTES_MAXIMUM = 100 * 1024 * 1024
const EXPORT_BUFFER_SECONDS_MINIMUM = 1
const EXPORT_BUFFER_SECONDS_DEFAULT = 10
const EXPORT_BUFFER_SECONDS_MAXIMUM = 600

type ExportEventsUpgrade = Plugin<{
global: {
exportEventsBuffer: ReturnType<typeof createBuffer>
exportEventsToIgnore: Set<string>
exportEventsWithRetry: (payload: ExportEventsJobPayload, meta: PluginMeta<ExportEventsUpgrade>) => Promise<void>
}
config: {
exportEventsBufferBytes: string
exportEventsBufferSeconds: string
exportEventsToIgnore: string
}
jobs: {
exportEventsWithRetry: ExportEventsJobPayload
}
}>

interface ExportEventsJobPayload extends Record<string, any> {
batch: PluginEvent[]
batchId: number
retriesPerformedSoFar: number
}

/**
* Inject export abstraction code into plugin VM if it has method `exportEvents`:
* - add `global`/`config`/`jobs` stuff specified in the `ExportEventsUpgrade` type above,
* - patch `onEvent` with code to add the event to a buffer.
*/
export function upgradeExportEvents(
hub: Hub,
pluginConfig: PluginConfig,
response: PluginConfigVMInternalResponse<PluginMeta<ExportEventsUpgrade>>
): void {
const { methods, tasks, meta } = response

if (!methods.exportEvents) {
return
}

const uploadBytes = stringClamp(
meta.config.exportEventsBufferBytes,
EXPORT_BUFFER_BYTES_DEFAULT,
EXPORT_BUFFER_BYTES_MINIMUM,
EXPORT_BUFFER_BYTES_MAXIMUM
)
const uploadSeconds = stringClamp(
meta.config.exportEventsBufferSeconds,
EXPORT_BUFFER_SECONDS_DEFAULT,
EXPORT_BUFFER_SECONDS_MINIMUM,
EXPORT_BUFFER_SECONDS_MAXIMUM
)

meta.global.exportEventsToIgnore = new Set(
meta.config.exportEventsToIgnore
? meta.config.exportEventsToIgnore.split(',').map((event: string) => event.trim())
: null
)

meta.global.exportEventsBuffer = createBuffer({
limit: uploadBytes,
timeoutSeconds: uploadSeconds,
onFlush: async (batch) => {
const jobPayload = {
batch,
batchId: Math.floor(Math.random() * 1000000),
retriesPerformedSoFar: 0,
}
// Running the first export code directly, without a job in between
await meta.global.exportEventsWithRetry(jobPayload, meta)
},
})

meta.global.exportEventsWithRetry = async (
payload: ExportEventsJobPayload,
meta: PluginMeta<ExportEventsUpgrade>
) => {
const start = new Date()
try {
await methods.exportEvents?.(payload.batch)
hub.statsd?.timing('plugin.export_events.success', start, {
plugin: pluginConfig.plugin?.name ?? '?',
teamId: pluginConfig.team_id.toString(),
})
} catch (err) {
if (err instanceof RetryError) {
if (payload.retriesPerformedSoFar < MAXIMUM_RETRIES) {
const nextRetrySeconds = 2 ** (payload.retriesPerformedSoFar + 1) * 3
await meta.jobs
.exportEventsWithRetry({ ...payload, retriesPerformedSoFar: payload.retriesPerformedSoFar + 1 })
.runIn(nextRetrySeconds, 'seconds')

status.info(
'馃殐',
`Enqueued PluginConfig ${pluginConfig.id} batch ${payload.batchId} for retry #${
payload.retriesPerformedSoFar + 1
} in ${Math.round(nextRetrySeconds)}s`
)
hub.statsd?.increment('plugin.export_events.retry_enqueued', {
retry: `${payload.retriesPerformedSoFar + 1}`,
plugin: pluginConfig.plugin?.name ?? '?',
teamId: pluginConfig.team_id.toString(),
})
} else {
status.info(
'鈽狅笍',
`Dropped PluginConfig ${pluginConfig.id} batch ${payload.batchId} after retrying ${payload.retriesPerformedSoFar} times`
)
hub.statsd?.increment('plugin.export_events.retry_dropped', {
retry: `${payload.retriesPerformedSoFar}`,
plugin: pluginConfig.plugin?.name ?? '?',
teamId: pluginConfig.team_id.toString(),
})
}
} else {
throw err
}
}
}

tasks.job['exportEventsWithRetry'] = {
name: 'exportEventsWithRetry',
type: PluginTaskType.Job,
exec: (payload) => meta.global.exportEventsWithRetry(payload as ExportEventsJobPayload, meta),
}

const oldOnEvent = methods.onEvent
methods.onEvent = async (event: PluginEvent) => {
if (!meta.global.exportEventsToIgnore.has(event.event)) {
meta.global.exportEventsBuffer.add(event, JSON.stringify(event).length)
}
await oldOnEvent?.(event)
}

const oldTeardownPlugin = methods.teardownPlugin
methods.teardownPlugin = async () => {
await Promise.all([meta.global.exportEventsBuffer.flush(), oldTeardownPlugin?.()])
}
}
Loading

0 comments on commit a00dd0a

Please sign in to comment.