From d3f023cfa88fae2f327d03a50198fe1619331b29 Mon Sep 17 00:00:00 2001 From: Mohammad AbuAboud Date: Fri, 24 May 2024 17:18:35 +0000 Subject: [PATCH] refactor: move job data to worker --- .../app/flows/flow-run/flow-run-side-effects.ts | 8 ++++---- .../flows/trigger/hooks/enable-trigger-hook.ts | 8 ++++---- .../api/src/app/webhooks/webhook-controller.ts | 2 +- .../flow-worker/consumer/flow-queue-consumer.ts | 16 ++++++++-------- .../flow-worker/consumer/webook-consumer.ts | 2 +- .../src/app/workers/flow-worker/flow-worker.ts | 3 +-- .../flow-worker/queues/memory/memory-queue.ts | 9 ++++----- .../src/app/workers/flow-worker/queues/queue.ts | 6 +++--- .../flow-worker/queues/redis/redis-consumer.ts | 2 +- .../flow-worker/queues/redis/redis-queue.ts | 14 +++++++------- packages/server/worker/src/index.ts | 3 ++- .../flow-worker => worker/src/lib}/job-data.ts | 0 12 files changed, 36 insertions(+), 37 deletions(-) rename packages/server/{api/src/app/workers/flow-worker => worker/src/lib}/job-data.ts (100%) diff --git a/packages/server/api/src/app/flows/flow-run/flow-run-side-effects.ts b/packages/server/api/src/app/flows/flow-run/flow-run-side-effects.ts index 7a83876cab..1008dd8734 100644 --- a/packages/server/api/src/app/flows/flow-run/flow-run-side-effects.ts +++ b/packages/server/api/src/app/flows/flow-run/flow-run-side-effects.ts @@ -2,10 +2,6 @@ import dayjs from 'dayjs' import { issuesService } from '../../ee/issues/issues-service' import { notifications } from '../../helper/notifications' import { flowQueue } from '../../workers/flow-worker/flow-queue' -import { - LATEST_JOB_DATA_SCHEMA_VERSION, - RepeatableJobType, -} from '../../workers/flow-worker/job-data' import { JobType } from '../../workers/flow-worker/queues/queue' import { flowRunHooks } from './flow-run-hooks' import { logger } from '@activepieces/server-shared' @@ -20,6 +16,10 @@ import { ProgressUpdateType, RunEnvironment, } from '@activepieces/shared' +import { + LATEST_JOB_DATA_SCHEMA_VERSION, + RepeatableJobType, +} from 'server-worker' type StartParams = { flowRun: FlowRun diff --git a/packages/server/api/src/app/flows/trigger/hooks/enable-trigger-hook.ts b/packages/server/api/src/app/flows/trigger/hooks/enable-trigger-hook.ts index 11b316aaa4..b34de899dc 100644 --- a/packages/server/api/src/app/flows/trigger/hooks/enable-trigger-hook.ts +++ b/packages/server/api/src/app/flows/trigger/hooks/enable-trigger-hook.ts @@ -8,10 +8,6 @@ import { import { getEdition } from '../../../helper/secret-helper' import { webhookService } from '../../../webhooks/webhook-service' import { flowQueue } from '../../../workers/flow-worker/flow-queue' -import { - LATEST_JOB_DATA_SCHEMA_VERSION, - RepeatableJobType, -} from '../../../workers/flow-worker/job-data' import { JobType } from '../../../workers/flow-worker/queues/queue' import { getPieceTrigger } from './trigger-utils' import { DEFAULT_FREE_PLAN_LIMIT } from '@activepieces/ee-shared' @@ -31,6 +27,10 @@ import { TriggerHookType, TriggerType, } from '@activepieces/shared' +import { + LATEST_JOB_DATA_SCHEMA_VERSION, + RepeatableJobType, +} from 'server-worker' const POLLING_FREQUENCY_CRON_EXPRESSON = constructEveryXMinuteCron( system.getNumber(SystemProp.TRIGGER_DEFAULT_POLL_INTERVAL) ?? 5, diff --git a/packages/server/api/src/app/webhooks/webhook-controller.ts b/packages/server/api/src/app/webhooks/webhook-controller.ts index 5603dca31b..e1b7e2ca5d 100644 --- a/packages/server/api/src/app/webhooks/webhook-controller.ts +++ b/packages/server/api/src/app/webhooks/webhook-controller.ts @@ -7,7 +7,6 @@ import { flowService } from '../flows/flow/flow.service' import { getEdition } from '../helper/secret-helper' import { EngineHttpResponse, engineResponseWatcher } from '../workers/flow-worker/engine-response-watcher' import { flowQueue } from '../workers/flow-worker/flow-queue' -import { LATEST_JOB_DATA_SCHEMA_VERSION } from '../workers/flow-worker/job-data' import { JobType } from '../workers/flow-worker/queues/queue' import { logger } from '@activepieces/server-shared' import { @@ -23,6 +22,7 @@ import { isNil, WebhookUrlParams, } from '@activepieces/shared' +import { LATEST_JOB_DATA_SCHEMA_VERSION } from 'server-worker' export const webhookController: FastifyPluginAsyncTypebox = async (app) => { diff --git a/packages/server/api/src/app/workers/flow-worker/consumer/flow-queue-consumer.ts b/packages/server/api/src/app/workers/flow-worker/consumer/flow-queue-consumer.ts index ca50fa7389..29bff8c0a1 100644 --- a/packages/server/api/src/app/workers/flow-worker/consumer/flow-queue-consumer.ts +++ b/packages/server/api/src/app/workers/flow-worker/consumer/flow-queue-consumer.ts @@ -5,14 +5,6 @@ import { triggerHooks } from '../../../flows/trigger' import { dedupeService } from '../../../flows/trigger/dedupe' import { flowQueue } from '../flow-queue' import { flowWorker } from '../flow-worker' -import { - DelayedJobData, - OneTimeJobData, - RenewWebhookJobData, - RepeatableJobType, - RepeatingJobData, - ScheduledJobData, -} from '../job-data' import { consumeJobsInMemory } from '../queues/memory/memory-consumer' import { memoryQueueManager } from '../queues/memory/memory-queue' import { redisConsumer } from '../queues/redis/redis-consumer' @@ -28,6 +20,14 @@ import { ActivepiecesError, TriggerPayload, TriggerType, } from '@activepieces/shared' +import { + DelayedJobData, + OneTimeJobData, + RenewWebhookJobData, + RepeatableJobType, + RepeatingJobData, + ScheduledJobData, +} from 'server-worker' const queueMode = system.getOrThrow(SystemProp.QUEUE_MODE) diff --git a/packages/server/api/src/app/workers/flow-worker/consumer/webook-consumer.ts b/packages/server/api/src/app/workers/flow-worker/consumer/webook-consumer.ts index 816c309e69..3079bd979d 100644 --- a/packages/server/api/src/app/workers/flow-worker/consumer/webook-consumer.ts +++ b/packages/server/api/src/app/workers/flow-worker/consumer/webook-consumer.ts @@ -2,8 +2,8 @@ import { StatusCodes } from 'http-status-codes' import { flowService } from '../../../flows/flow/flow.service' import { webhookService } from '../../../webhooks/webhook-service' import { EngineHttpResponse, engineResponseWatcher } from '../engine-response-watcher' -import { WebhookJobData } from '../job-data' import { FlowStatus, isNil } from '@activepieces/shared' +import { WebhookJobData } from 'server-worker' export const webhookConsumer = { async consumeWebhook(data: WebhookJobData): Promise { diff --git a/packages/server/api/src/app/workers/flow-worker/flow-worker.ts b/packages/server/api/src/app/workers/flow-worker/flow-worker.ts index 712e58a16e..f670d5a9e4 100644 --- a/packages/server/api/src/app/workers/flow-worker/flow-worker.ts +++ b/packages/server/api/src/app/workers/flow-worker/flow-worker.ts @@ -4,7 +4,6 @@ import { } from '../../flows/flow-run/flow-run-service' import { engineHelper, generateWorkerToken } from '../../helper/engine-helper' import { getPiecePackage } from '../../pieces/piece-metadata-service' -import { OneTimeJobData } from './job-data' import { exceptionHandler, logger } from '@activepieces/server-shared' import { Action, ActionType, @@ -28,7 +27,7 @@ import { Trigger, TriggerType, } from '@activepieces/shared' -import { Sandbox, SandBoxCacheType, sandboxProvisioner, serverApiService } from 'server-worker' +import { OneTimeJobData, Sandbox, SandBoxCacheType, sandboxProvisioner, serverApiService } from 'server-worker' type LoadInputAndLogFileIdParams = { flowVersion: FlowVersion diff --git a/packages/server/api/src/app/workers/flow-worker/queues/memory/memory-queue.ts b/packages/server/api/src/app/workers/flow-worker/queues/memory/memory-queue.ts index d9d419fab9..6caeb35f0f 100644 --- a/packages/server/api/src/app/workers/flow-worker/queues/memory/memory-queue.ts +++ b/packages/server/api/src/app/workers/flow-worker/queues/memory/memory-queue.ts @@ -4,10 +4,6 @@ import { flowService } from '../../../../flows/flow/flow.service' import { flowRunRepo } from '../../../../flows/flow-run/flow-run-service' import { flowVersionService } from '../../../../flows/flow-version/flow-version.service' import { getPieceTrigger } from '../../../../flows/trigger/hooks/trigger-utils' -import { - LATEST_JOB_DATA_SCHEMA_VERSION, - RepeatableJobType, -} from '../../job-data' import { AddParams, DelayedJobAddParams, @@ -30,7 +26,10 @@ import { RunEnvironment, TriggerType, } from '@activepieces/shared' -import { ApMemoryQueue } from 'server-worker' +import { + ApMemoryQueue, + LATEST_JOB_DATA_SCHEMA_VERSION, + RepeatableJobType } from 'server-worker' type FlowWithRenewWebhook = { flow: Flow diff --git a/packages/server/api/src/app/workers/flow-worker/queues/queue.ts b/packages/server/api/src/app/workers/flow-worker/queues/queue.ts index e04d61978e..2904ac1874 100644 --- a/packages/server/api/src/app/workers/flow-worker/queues/queue.ts +++ b/packages/server/api/src/app/workers/flow-worker/queues/queue.ts @@ -1,3 +1,5 @@ +import { QueueMode, system, SystemProp } from '@activepieces/server-shared' +import { ApId, ScheduleOptions } from '@activepieces/shared' import { DelayedJobData, JobData, @@ -5,9 +7,7 @@ import { RenewWebhookJobData, RepeatingJobData, WebhookJobData, -} from '../job-data' -import { QueueMode, system, SystemProp } from '@activepieces/server-shared' -import { ApId, ScheduleOptions } from '@activepieces/shared' +} from 'server-worker' export const queueMode = system.getOrThrow(SystemProp.QUEUE_MODE) as QueueMode diff --git a/packages/server/api/src/app/workers/flow-worker/queues/redis/redis-consumer.ts b/packages/server/api/src/app/workers/flow-worker/queues/redis/redis-consumer.ts index 40e3c72523..3d6e1a4ae6 100644 --- a/packages/server/api/src/app/workers/flow-worker/queues/redis/redis-consumer.ts +++ b/packages/server/api/src/app/workers/flow-worker/queues/redis/redis-consumer.ts @@ -3,10 +3,10 @@ import { Worker } from 'bullmq' import { createRedisClient } from '../../../../database/redis-connection' import { flowQueueConsumer } from '../../consumer/flow-queue-consumer' import { webhookConsumer } from '../../consumer/webook-consumer' -import { OneTimeJobData, ScheduledJobData, WebhookJobData } from '../../job-data' import { ONE_TIME_JOB_QUEUE, SCHEDULED_JOB_QUEUE, WEBHOOK_JOB_QUEUE } from './redis-queue' import { system, SystemProp } from '@activepieces/server-shared' import { ApId } from '@activepieces/shared' +import { OneTimeJobData, ScheduledJobData, WebhookJobData } from 'server-worker' let redisScheduledJobConsumer: Worker let redisOneTimeJobConsumer: Worker diff --git a/packages/server/api/src/app/workers/flow-worker/queues/redis/redis-queue.ts b/packages/server/api/src/app/workers/flow-worker/queues/redis/redis-queue.ts index e630b56fe4..1f5f3c6035 100644 --- a/packages/server/api/src/app/workers/flow-worker/queues/redis/redis-queue.ts +++ b/packages/server/api/src/app/workers/flow-worker/queues/redis/redis-queue.ts @@ -3,19 +3,19 @@ import { DefaultJobOptions, Job, Queue } from 'bullmq' import { createRedisClient } from '../../../../database/redis-connection' import { flowRepo } from '../../../../flows/flow/flow.repo' import { acquireLock } from '../../../../helper/lock' -import { - LATEST_JOB_DATA_SCHEMA_VERSION, - OneTimeJobData, - RepeatableJobType, - ScheduledJobData, - WebhookJobData, -} from '../../job-data' import { AddParams, JobType, QueueManager, RemoveParams } from '../queue' import { exceptionHandler, logger } from '@activepieces/server-shared' import { ActivepiecesError, ApId, ErrorCode, ExecutionType, isNil, RunEnvironment, ScheduleType, } from '@activepieces/shared' +import { + LATEST_JOB_DATA_SCHEMA_VERSION, + OneTimeJobData, + RepeatableJobType, + ScheduledJobData, + WebhookJobData, +} from 'server-worker' export const WEBHOOK_JOB_QUEUE = 'webhookJobs' export const ONE_TIME_JOB_QUEUE = 'oneTimeJobs' diff --git a/packages/server/worker/src/index.ts b/packages/server/worker/src/index.ts index eed95f838d..1bc23a4b84 100644 --- a/packages/server/worker/src/index.ts +++ b/packages/server/worker/src/index.ts @@ -4,4 +4,5 @@ export { sandboxProvisioner } from './lib/sandbox/provisioner/sandbox-provisione export { SandBoxCacheType } from './lib/sandbox/provisioner/sandbox-cache-key' export * from './lib/sandbox/sandbox-manager' export * from './lib/sandbox' -export * from './lib/utils/log-serializer' \ No newline at end of file +export * from './lib/utils/log-serializer' +export * from './lib/job-data' \ No newline at end of file diff --git a/packages/server/api/src/app/workers/flow-worker/job-data.ts b/packages/server/worker/src/lib/job-data.ts similarity index 100% rename from packages/server/api/src/app/workers/flow-worker/job-data.ts rename to packages/server/worker/src/lib/job-data.ts