From b9fd549a92d9576367b73ed38904cf9650687b86 Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Tue, 4 Apr 2023 18:11:21 +0200 Subject: [PATCH] refactor: Upsert workflow statistics to suppress unnecessary error messages (#5863) --- packages/cli/src/events/WorkflowStatistics.ts | 110 +++++++++++++----- packages/cli/test/unit/Events.test.ts | 12 +- 2 files changed, 91 insertions(+), 31 deletions(-) diff --git a/packages/cli/src/events/WorkflowStatistics.ts b/packages/cli/src/events/WorkflowStatistics.ts index 34b191b45ae6c..006dc506d4ff7 100644 --- a/packages/cli/src/events/WorkflowStatistics.ts +++ b/packages/cli/src/events/WorkflowStatistics.ts @@ -1,10 +1,79 @@ import type { INode, IRun, IWorkflowBase } from 'n8n-workflow'; +import { LoggerProxy } from 'n8n-workflow'; import * as Db from '@/Db'; import { StatisticsNames } from '@db/entities/WorkflowStatistics'; import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper'; import { QueryFailedError } from 'typeorm'; import { Container } from 'typedi'; import { InternalHooks } from '@/InternalHooks'; +import config from '@/config'; + +enum StatisticsUpsertResult { + insert = 'insert', + update = 'update', + failed = 'failed', +} + +async function upsertWorkflowStatistics( + eventName: StatisticsNames, + workflowId: string, +): Promise { + const dbType = config.getEnv('database.type'); + const tablePrefix = config.getEnv('database.tablePrefix'); + try { + if (dbType === 'sqlite') { + await Db.collections.WorkflowStatistics + .query(`INSERT INTO "${tablePrefix}workflow_statistics" ("count", "name", "workflowId", "latestEvent") + VALUES (1, "${eventName}", "${workflowId}", CURRENT_TIMESTAMP) + ON CONFLICT (workflowId, name) DO UPDATE SET + count = count + 1, + latestEvent = CURRENT_TIMESTAMP returning count + `); + // SQLite does not offer a reliable way to know whether or not an insert or update happened. + // We'll use a naive approach in this case. Query again after and it might cause us to miss the + // first production execution sometimes due to concurrency, but it's the only way. + + const counter = await Db.collections.WorkflowStatistics.findOne({ + where: { + name: eventName, + workflowId, + }, + }); + + if (counter?.count === 1) { + return StatisticsUpsertResult.insert; + } + return StatisticsUpsertResult.update; + } else if (dbType === 'postgresdb') { + const queryResult = (await Db.collections.WorkflowStatistics + .query(`insert into "${tablePrefix}workflow_statistics" ("count", "name", "workflowId", "latestEvent") + values (1, '${eventName}', '${workflowId}', CURRENT_TIMESTAMP) on conflict ("name", "workflowId") + do update set "count" = "${tablePrefix}workflow_statistics"."count" + 1, "latestEvent" = CURRENT_TIMESTAMP returning *;`)) as Array<{ + count: number; + }>; + if (queryResult[0].count === 1) { + return StatisticsUpsertResult.insert; + } + return StatisticsUpsertResult.update; + } else { + const queryResult = (await Db.collections.WorkflowStatistics + .query(`insert into \`${tablePrefix}workflow_statistics\` (count, + latestEvent, + name, + workflowId) + values (1, NOW(), "${eventName}", "${workflowId}") ON DUPLICATE KEY UPDATE count = count + 1, latestEvent = NOW();`)) as { + affectedRows: number; + }; + if (queryResult.affectedRows === 1) { + return StatisticsUpsertResult.insert; + } + // MySQL returns 2 affected rows on update + return StatisticsUpsertResult.update; + } + } catch (error) { + return StatisticsUpsertResult.failed; + } +} export async function workflowExecutionCompleted( workflowData: IWorkflowBase, @@ -27,36 +96,23 @@ export async function workflowExecutionCompleted( const workflowId = workflowData.id; if (!workflowId) return; - // Try insertion and if it fails due to key conflicts then update the existing entry instead try { - await Db.collections.WorkflowStatistics.insert({ - count: 1, - name, - workflowId, - latestEvent: new Date(), - }); - - // If we're here we can check if we're sending the first production success metric - if (name !== StatisticsNames.productionSuccess) return; - - // Get the owner of the workflow so we can send the metric - const owner = await getWorkflowOwner(workflowId); - const metrics = { - user_id: owner.id, - workflow_id: workflowId, - }; + const upsertResult = await upsertWorkflowStatistics(name, workflowId); - // Send the metrics - await Container.get(InternalHooks).onFirstProductionWorkflowSuccess(metrics); - } catch (error) { - if (!(error instanceof QueryFailedError)) { - throw error; + if ( + name === StatisticsNames.productionSuccess && + upsertResult === StatisticsUpsertResult.insert + ) { + const owner = await getWorkflowOwner(workflowId); + const metrics = { + user_id: owner.id, + workflow_id: workflowId, + }; + // Send the metrics + await Container.get(InternalHooks).onFirstProductionWorkflowSuccess(metrics); } - - await Db.collections.WorkflowStatistics.update( - { workflowId, name }, - { count: () => 'count + 1', latestEvent: new Date() }, - ); + } catch (error) { + LoggerProxy.verbose('Unable to fire first workflow success telemetry event'); } } diff --git a/packages/cli/test/unit/Events.test.ts b/packages/cli/test/unit/Events.test.ts index 25c28973523c6..80433466f0ffb 100644 --- a/packages/cli/test/unit/Events.test.ts +++ b/packages/cli/test/unit/Events.test.ts @@ -17,7 +17,11 @@ type WorkflowStatisticsRepository = Repository; jest.mock('@/Db', () => { return { collections: { - WorkflowStatistics: mock(), + WorkflowStatistics: mock({ + findOne: jest.fn(() => ({ + count: 1, + })), + }), }, }; }); @@ -101,9 +105,9 @@ describe('Events', () => { test('should not send metrics for updated entries', async () => { // Call the function with a fail insert, ensure update is called *and* metrics aren't sent - workflowStatisticsRepository.insert.mockImplementationOnce(() => { - throw new QueryFailedError('invalid insert', [], ''); - }); + workflowStatisticsRepository.findOne.mockImplementationOnce(() => ({ + count: 2, + })); const workflow = { id: '1', name: '',