This repository has been archived by the owner on Mar 7, 2024. It is now read-only.

Add bq_ingested_timestamp #9

Merged · 9 commits · Jun 15, 2021
Changes from 4 commits
148 changes: 108 additions & 40 deletions index.ts
@@ -1,17 +1,17 @@
import { createBuffer } from '@posthog/plugin-contrib'
import { Plugin, PluginMeta, PluginEvent } from '@posthog/plugin-scaffold'
import { BigQuery, Table } from '@google-cloud/bigquery'

class RetryError extends Error {}
import { Plugin, PluginMeta, PluginEvent, RetryError } from '@posthog/plugin-scaffold'
import { BigQuery, Table, TableField, TableMetadata } from '@google-cloud/bigquery'

type BigQueryPlugin = Plugin<{
global: {
bigQueryClient: BigQuery
bigQueryTable: Table
bigQueryTableFields: TableField[]

exportEventsBuffer: ReturnType<typeof createBuffer>
exportEventsToIgnore: Set<string>
exportEventsWithRetry: (payload: UploadJobPayload, meta: PluginMeta<BigQueryPlugin>) => Promise<void>
deduplicateEvents: boolean
}
config: {
datasetId: string
@@ -20,6 +20,7 @@ type BigQueryPlugin = Plugin<{
exportEventsBufferBytes: string
exportEventsBufferSeconds: string
exportEventsToIgnore: string
deduplicateEvents: string
}
jobs: {
exportEventsWithRetry: UploadJobPayload
@@ -44,43 +45,87 @@ export const setupPlugin: BigQueryPlugin['setupPlugin'] = async (meta) => {
throw new Error('Table ID not provided!')
}

if (config.deduplicateEvents === 'Yes') {
global.deduplicateEvents = true
} else {
global.deduplicateEvents = false
}

const credentials = JSON.parse(attachments.googleCloudKeyJson.contents.toString())
global.bigQueryClient = new BigQuery({
projectId: credentials['project_id'],
credentials,
autoRetry: false,
})
global.bigQueryTable = global.bigQueryClient.dataset(config.datasetId).table(config.tableId)

global.bigQueryTableFields = [
{ name: 'uuid', type: 'STRING' },
{ name: 'event', type: 'STRING' },
{ name: 'properties', type: 'STRING' },
{ name: 'elements', type: 'STRING' },
{ name: 'set', type: 'STRING' },
{ name: 'set_once', type: 'STRING' },
{ name: 'distinct_id', type: 'STRING' },
{ name: 'team_id', type: 'INT64' },
{ name: 'ip', type: 'STRING' },
{ name: 'site_url', type: 'STRING' },
{ name: 'timestamp', type: 'TIMESTAMP' },
{ name: 'bq_ingested_timestamp', type: 'TIMESTAMP' },
]

try {
// check if the table exists
await global.bigQueryTable.get()
const [metadata]: TableMetadata[] = await global.bigQueryTable.getMetadata()

if (!metadata.schema || !metadata.schema.fields) {
throw new Error("Can not get metadata for table. Please check if the table schema is defined.")
}

const existingFields = metadata.schema.fields
const fieldsToAdd = global.bigQueryTableFields.filter(
({ name }) => !existingFields.find((f: any) => f.name === name)
)

if (fieldsToAdd.length > 0) {
console.info(
`Incomplete schema on BigQuery table! Adding the following fields to reach parity: ${JSON.stringify(
fieldsToAdd
)}`
)

try {
metadata.schema.fields = metadata.schema.fields.concat(fieldsToAdd)
await global.bigQueryTable.setMetadata(metadata)
} catch (error) {
// setMetadata failed (possibly racing another worker), so re-fetch the schema before giving up
const [updatedMetadata]: TableMetadata[] = await global.bigQueryTable.getMetadata()
const fieldsToStillAdd = global.bigQueryTableFields.filter(
({ name }) => !updatedMetadata.schema?.fields?.find((f: any) => f.name === name)
)

if (fieldsToStillAdd.length > 0) {
throw new Error(
`Tried adding fields ${JSON.stringify(fieldsToAdd)}, but ${JSON.stringify(
fieldsToStillAdd
)} still to add. Can not start plugin.`
)
}
}
}
} catch (error) {
// some other error? abort!
if (!error.message.includes('Not found')) {
throw new Error(error)
}
console.log(`Creating BigQuery Table - ${config.datasetId}:${config.tableId}`)

const schema = [
{ name: 'uuid', type: 'STRING' },
{ name: 'event', type: 'STRING' },
{ name: 'properties', type: 'STRING' },
{ name: 'elements', type: 'STRING' },
{ name: 'set', type: 'STRING' },
{ name: 'set_once', type: 'STRING' },
{ name: 'distinct_id', type: 'STRING' },
{ name: 'team_id', type: 'INT64' },
{ name: 'ip', type: 'STRING' },
{ name: 'site_url', type: 'STRING' },
{ name: 'timestamp', type: 'TIMESTAMP' },
]

try {
await global.bigQueryClient.dataset(config.datasetId).createTable(config.tableId, { schema })
await global.bigQueryClient
.dataset(config.datasetId)
.createTable(config.tableId, { schema: global.bigQueryTableFields })
} catch (error) {
// a different worker already created the table
if (!error.message.includes('Already Exists')) {
throw new Error()
throw error
}
}
}
@@ -89,6 +134,12 @@ export const setupPlugin: BigQueryPlugin['setupPlugin'] = async (meta) => {
}

export async function exportEventsToBigQuery(events: PluginEvent[], { global }: PluginMeta<BigQueryPlugin>) {
const insertOptions = {
createInsertId: false,
partialRetries: 0,
raw: true,
}

if (!global.bigQueryTable) {
throw new Error('No BigQuery client initialized!')
}
@@ -119,24 +170,40 @@ export async function exportEventsToBigQuery(events: PluginEvent[], { global }:
elements = $elements
}

return {
uuid,
event: eventName,
properties: JSON.stringify(ingestedProperties || {}),
elements: JSON.stringify(elements || {}),
set: JSON.stringify($set || {}),
set_once: JSON.stringify($set_once || {}),
distinct_id,
team_id,
ip,
site_url,
timestamp: timestamp ? global.bigQueryClient.timestamp(timestamp) : null,
const object: {json: Record<string, any>, insertId?: string} = {
Review comment (Collaborator, Author): Can we simplify this object if not using insertId?

Review comment (Contributor): I could remove the insertId from the type, but other than that, nope, can't do, because the SDK by default inserts a random insertId unless I'm using the raw version, which requires the object to look like this^

json: {
uuid,
event: eventName,
properties: JSON.stringify(ingestedProperties || {}),
elements: JSON.stringify(elements || {}),
set: JSON.stringify($set || {}),
set_once: JSON.stringify($set_once || {}),
distinct_id,
team_id,
ip,
site_url,
timestamp: timestamp,
bq_ingested_timestamp: new Date().toISOString(),
}
}

if (global.deduplicateEvents) {
Review comment (Contributor): Given we've disabled BQ retrying, is this used for anything?

Review comment (Contributor): We've disabled retrying via the BigQuery library, not via the plugin-server, so yes, this does work as long as the retry doesn't happen more than a few minutes later.

Review comment (Contributor): I meant why do we need to dedupe?

Review comment (Contributor): Oh, because exceeding timeouts via the async guard doesn't stop the request from succeeding (most times, depending on the case), so we're making two requests with the same data to BigQuery, which leads to duplicates. This helps resolve some of them. (https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency)

Review comment (Contributor): I see now, makes sense!
object.insertId = uuid
}
return object
})
await global.bigQueryTable.insert(rows)
console.log(`Inserted ${events.length} ${events.length > 1 ? 'events' : 'event'} to BigQuery`)

const start = Date.now()
await global.bigQueryTable.insert(rows, insertOptions)
const end = Date.now() - start

console.log(`Inserted ${events.length} ${events.length > 1 ? 'events' : 'event'} to BigQuery. Took ${end/1000} seconds.`)

} catch (error) {
console.error(`Error inserting ${events.length} ${events.length > 1 ? 'events' : 'event'} into BigQuery: `, error)
console.error(
`Error inserting ${events.length} ${events.length > 1 ? 'events' : 'event'} into BigQuery: `,
error
)
throw new RetryError(`Error inserting into BigQuery! ${JSON.stringify(error.errors)}`)
}
}
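For context on the insertId discussion in the review comments above, here is a minimal, self-contained sketch (not code from this PR; the dataset, table, and event values are placeholders) of how raw streaming inserts with an explicit insertId enable BigQuery's best-effort deduplication:

```ts
import { BigQuery } from '@google-cloud/bigquery'

// With `raw: true` the client sends rows exactly as provided and does not
// generate a random insertId, so a retried row that reuses the same uuid
// can be collapsed by BigQuery's best-effort streaming deduplication.
async function insertWithDedup() {
    const table = new BigQuery().dataset('example_dataset').table('example_table')

    const rows = [
        {
            insertId: 'event-uuid-1', // reuse the event uuid so retried inserts dedupe
            json: { uuid: 'event-uuid-1', event: '$pageview', properties: '{}' },
        },
    ]

    await table.insert(rows, { raw: true, createInsertId: false, partialRetries: 0 })
}
```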
@@ -147,7 +214,8 @@ const setupBufferExportCode = (
meta: PluginMeta<BigQueryPlugin>,
exportEvents: (events: PluginEvent[], meta: PluginMeta<BigQueryPlugin>) => Promise<void>
) => {
const uploadBytes = Math.max(1, Math.min(parseInt(meta.config.exportEventsBufferBytes) || 1024 * 1024, 100))

const uploadBytes = Math.max(1024*1024, Math.min(parseInt(meta.config.exportEventsBufferBytes) || 1024 * 1024, 1024*1024*10))
const uploadSeconds = Math.max(1, Math.min(parseInt(meta.config.exportEventsBufferSeconds) || 30, 600))

meta.global.exportEventsToIgnore = new Set(
@@ -199,12 +267,12 @@

export const jobs: BigQueryPlugin['jobs'] = {
exportEventsWithRetry: async (payload, meta) => {
meta.global.exportEventsWithRetry(payload, meta)
await meta.global.exportEventsWithRetry(payload, meta)
},
}

export const onEvent: BigQueryPlugin['onEvent'] = (event, { global }) => {
if (!global.exportEventsToIgnore.has(event.event)) {
global.exportEventsBuffer.add(event)
global.exportEventsBuffer.add(event, JSON.stringify(event).length)
Review comment (Collaborator, Author): 💯
}
}
10 changes: 9 additions & 1 deletion plugin.json
@@ -37,14 +37,22 @@
"name": "Maximum upload size in bytes",
"type": "string",
"default": "1048576",
"hint": "Default 1MB. Upload events after buffering this many of them. BigQuery has a 250KB limit per row, so events are still sent individually after the buffer is flushed. The value must be between 1 and 10 MB."
"hint": "Default 1MB. Upload events after buffering this many of them. The value must be between 1 MB and 10 MB."
},
{
"key": "exportEventsBufferSeconds",
"name": "Export events at least every X seconds",
"type": "string",
"default": "30",
"hint": "Default 30 seconds. If there are events to upload and this many seconds has passed since the last upload, then upload the queued events. The value must be between 1 and 600 seconds."
},
{
"key": "deduplicateEvents",
"name": "Attempt deduplicating events",
Review comment (Contributor): Given we have yes and no, might make sense wording as a question?

Review comment (Contributor): You mean something like: "Attempt deduplicating events?"

Review comment (Contributor): Yeah, for example
"type": "choice",
"choices": ["Yes", "No"],
"default": "Yes",
"hint": "Default 'Yes'. We try to deduplicate events being sent to Bigquery if set to 'Yes'. Set to 'No' to increase Bigquery insertion quota limit"
}
]
}
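For reference, here is a rough sketch of how the two buffer settings above (exportEventsBufferBytes and exportEventsBufferSeconds) are typically wired into createBuffer from @posthog/plugin-contrib; the option names limit, timeoutSeconds, and onFlush are assumptions based on other PostHog export plugins, not something this diff shows:

```ts
import { createBuffer } from '@posthog/plugin-contrib'

// Assumed createBuffer options: { limit, timeoutSeconds, onFlush }.
const configBytes = '1048576' // would come from meta.config.exportEventsBufferBytes
const configSeconds = '30' // would come from meta.config.exportEventsBufferSeconds

// Clamp to the ranges described in the hints: 1 MB..10 MB and 1..600 seconds.
const uploadBytes = Math.max(1024 * 1024, Math.min(parseInt(configBytes) || 1024 * 1024, 10 * 1024 * 1024))
const uploadSeconds = Math.max(1, Math.min(parseInt(configSeconds) || 30, 600))

const buffer = createBuffer({
    limit: uploadBytes,
    timeoutSeconds: uploadSeconds,
    onFlush: async (events) => {
        // the queued events would be exported to BigQuery here
        console.log(`flushing ${events.length} events`)
    },
})

// Each event counts its serialized size toward the byte limit:
// buffer.add(event, JSON.stringify(event).length)
```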