Skip to content

Commit

Permalink
refactor: move job data to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
abuaboud committed May 24, 2024
1 parent 7ffdb94 commit d3f023c
Show file tree
Hide file tree
Showing 12 changed files with 36 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ import dayjs from 'dayjs'
import { issuesService } from '../../ee/issues/issues-service'
import { notifications } from '../../helper/notifications'
import { flowQueue } from '../../workers/flow-worker/flow-queue'
import {
LATEST_JOB_DATA_SCHEMA_VERSION,
RepeatableJobType,
} from '../../workers/flow-worker/job-data'
import { JobType } from '../../workers/flow-worker/queues/queue'
import { flowRunHooks } from './flow-run-hooks'
import { logger } from '@activepieces/server-shared'
Expand All @@ -20,6 +16,10 @@ import {
ProgressUpdateType,
RunEnvironment,
} from '@activepieces/shared'
import {
LATEST_JOB_DATA_SCHEMA_VERSION,
RepeatableJobType,
} from 'server-worker'

type StartParams = {
flowRun: FlowRun
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ import {
import { getEdition } from '../../../helper/secret-helper'
import { webhookService } from '../../../webhooks/webhook-service'
import { flowQueue } from '../../../workers/flow-worker/flow-queue'
import {
LATEST_JOB_DATA_SCHEMA_VERSION,
RepeatableJobType,
} from '../../../workers/flow-worker/job-data'
import { JobType } from '../../../workers/flow-worker/queues/queue'
import { getPieceTrigger } from './trigger-utils'
import { DEFAULT_FREE_PLAN_LIMIT } from '@activepieces/ee-shared'
Expand All @@ -31,6 +27,10 @@ import {
TriggerHookType,
TriggerType,
} from '@activepieces/shared'
import {
LATEST_JOB_DATA_SCHEMA_VERSION,
RepeatableJobType,
} from 'server-worker'

const POLLING_FREQUENCY_CRON_EXPRESSON = constructEveryXMinuteCron(
system.getNumber(SystemProp.TRIGGER_DEFAULT_POLL_INTERVAL) ?? 5,
Expand Down
2 changes: 1 addition & 1 deletion packages/server/api/src/app/webhooks/webhook-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { flowService } from '../flows/flow/flow.service'
import { getEdition } from '../helper/secret-helper'
import { EngineHttpResponse, engineResponseWatcher } from '../workers/flow-worker/engine-response-watcher'
import { flowQueue } from '../workers/flow-worker/flow-queue'
import { LATEST_JOB_DATA_SCHEMA_VERSION } from '../workers/flow-worker/job-data'
import { JobType } from '../workers/flow-worker/queues/queue'
import { logger } from '@activepieces/server-shared'
import {
Expand All @@ -23,6 +22,7 @@ import {
isNil,
WebhookUrlParams,
} from '@activepieces/shared'
import { LATEST_JOB_DATA_SCHEMA_VERSION } from 'server-worker'

export const webhookController: FastifyPluginAsyncTypebox = async (app) => {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,6 @@ import { triggerHooks } from '../../../flows/trigger'
import { dedupeService } from '../../../flows/trigger/dedupe'
import { flowQueue } from '../flow-queue'
import { flowWorker } from '../flow-worker'
import {
DelayedJobData,
OneTimeJobData,
RenewWebhookJobData,
RepeatableJobType,
RepeatingJobData,
ScheduledJobData,
} from '../job-data'
import { consumeJobsInMemory } from '../queues/memory/memory-consumer'
import { memoryQueueManager } from '../queues/memory/memory-queue'
import { redisConsumer } from '../queues/redis/redis-consumer'
Expand All @@ -28,6 +20,14 @@ import { ActivepiecesError,
TriggerPayload,
TriggerType,
} from '@activepieces/shared'
import {
DelayedJobData,
OneTimeJobData,
RenewWebhookJobData,
RepeatableJobType,
RepeatingJobData,
ScheduledJobData,
} from 'server-worker'

const queueMode = system.getOrThrow<QueueMode>(SystemProp.QUEUE_MODE)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { StatusCodes } from 'http-status-codes'
import { flowService } from '../../../flows/flow/flow.service'
import { webhookService } from '../../../webhooks/webhook-service'
import { EngineHttpResponse, engineResponseWatcher } from '../engine-response-watcher'
import { WebhookJobData } from '../job-data'
import { FlowStatus, isNil } from '@activepieces/shared'
import { WebhookJobData } from 'server-worker'

export const webhookConsumer = {
async consumeWebhook(data: WebhookJobData): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {
} from '../../flows/flow-run/flow-run-service'
import { engineHelper, generateWorkerToken } from '../../helper/engine-helper'
import { getPiecePackage } from '../../pieces/piece-metadata-service'
import { OneTimeJobData } from './job-data'
import { exceptionHandler, logger } from '@activepieces/server-shared'
import {
Action, ActionType,
Expand All @@ -28,7 +27,7 @@ import {
Trigger,
TriggerType,
} from '@activepieces/shared'
import { Sandbox, SandBoxCacheType, sandboxProvisioner, serverApiService } from 'server-worker'
import { OneTimeJobData, Sandbox, SandBoxCacheType, sandboxProvisioner, serverApiService } from 'server-worker'

type LoadInputAndLogFileIdParams = {
flowVersion: FlowVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ import { flowService } from '../../../../flows/flow/flow.service'
import { flowRunRepo } from '../../../../flows/flow-run/flow-run-service'
import { flowVersionService } from '../../../../flows/flow-version/flow-version.service'
import { getPieceTrigger } from '../../../../flows/trigger/hooks/trigger-utils'
import {
LATEST_JOB_DATA_SCHEMA_VERSION,
RepeatableJobType,
} from '../../job-data'
import {
AddParams,
DelayedJobAddParams,
Expand All @@ -30,7 +26,10 @@ import {
RunEnvironment,
TriggerType,
} from '@activepieces/shared'
import { ApMemoryQueue } from 'server-worker'
import {
ApMemoryQueue,
LATEST_JOB_DATA_SCHEMA_VERSION,
RepeatableJobType } from 'server-worker'

type FlowWithRenewWebhook = {
flow: Flow
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { QueueMode, system, SystemProp } from '@activepieces/server-shared'
import { ApId, ScheduleOptions } from '@activepieces/shared'
import {
DelayedJobData,
JobData,
OneTimeJobData,
RenewWebhookJobData,
RepeatingJobData,
WebhookJobData,
} from '../job-data'
import { QueueMode, system, SystemProp } from '@activepieces/server-shared'
import { ApId, ScheduleOptions } from '@activepieces/shared'
} from 'server-worker'

export const queueMode = system.getOrThrow(SystemProp.QUEUE_MODE) as QueueMode

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import { Worker } from 'bullmq'
import { createRedisClient } from '../../../../database/redis-connection'
import { flowQueueConsumer } from '../../consumer/flow-queue-consumer'
import { webhookConsumer } from '../../consumer/webook-consumer'
import { OneTimeJobData, ScheduledJobData, WebhookJobData } from '../../job-data'
import { ONE_TIME_JOB_QUEUE, SCHEDULED_JOB_QUEUE, WEBHOOK_JOB_QUEUE } from './redis-queue'
import { system, SystemProp } from '@activepieces/server-shared'
import { ApId } from '@activepieces/shared'
import { OneTimeJobData, ScheduledJobData, WebhookJobData } from 'server-worker'

let redisScheduledJobConsumer: Worker<ScheduledJobData, unknown>
let redisOneTimeJobConsumer: Worker<OneTimeJobData, unknown>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ import { DefaultJobOptions, Job, Queue } from 'bullmq'
import { createRedisClient } from '../../../../database/redis-connection'
import { flowRepo } from '../../../../flows/flow/flow.repo'
import { acquireLock } from '../../../../helper/lock'
import {
LATEST_JOB_DATA_SCHEMA_VERSION,
OneTimeJobData,
RepeatableJobType,
ScheduledJobData,
WebhookJobData,
} from '../../job-data'
import { AddParams, JobType, QueueManager, RemoveParams } from '../queue'
import { exceptionHandler, logger } from '@activepieces/server-shared'
import {
ActivepiecesError,
ApId, ErrorCode, ExecutionType, isNil, RunEnvironment, ScheduleType,
} from '@activepieces/shared'
import {
LATEST_JOB_DATA_SCHEMA_VERSION,
OneTimeJobData,
RepeatableJobType,
ScheduledJobData,
WebhookJobData,
} from 'server-worker'

export const WEBHOOK_JOB_QUEUE = 'webhookJobs'
export const ONE_TIME_JOB_QUEUE = 'oneTimeJobs'
Expand Down
3 changes: 2 additions & 1 deletion packages/server/worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ export { sandboxProvisioner } from './lib/sandbox/provisioner/sandbox-provisione
export { SandBoxCacheType } from './lib/sandbox/provisioner/sandbox-cache-key'
export * from './lib/sandbox/sandbox-manager'
export * from './lib/sandbox'
export * from './lib/utils/log-serializer'
export * from './lib/utils/log-serializer'
export * from './lib/job-data'
File renamed without changes.

0 comments on commit d3f023c

Please sign in to comment.