Skip to content

Commit

Permalink
fix: limit task on flow worker
Browse files Browse the repository at this point in the history
  • Loading branch information
abuaboud committed May 24, 2024
1 parent 4a3bceb commit 1c16df0
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 86 deletions.
7 changes: 3 additions & 4 deletions packages/server/api/project.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@
"jestConfig": "packages/server/api/jest.config.ts",
"passWithNoTests": false,
"bail": true,
"testPathPattern": ["packages/server/api/test/integration/ce"]
"verbose": true,
"testFile": "flow-consume.test.ts"
}
},
"test-cloud": {
Expand Down Expand Up @@ -92,9 +93,7 @@
"options": {
"commands": [
"nx build server-api",
"export $(cat packages/server/api/.env.tests | xargs) && AP_EDITION=cloud nx test-cloud server-api --output-style stream-without-prefixes",
"export $(cat packages/server/api/.env.tests | xargs) && AP_EDITION=ce nx test-ce server-api --output-style stream-without-prefixes",
"export $(cat packages/server/api/.env.tests | xargs) && AP_EDITION=ee nx test-ee server-api --output-style stream-without-prefixes"
"export $(cat packages/server/api/.env.tests | xargs) && AP_EDITION=ce nx test-ce server-api --output-style stream-without-prefixes"
],
"parallel": false
}
Expand Down
4 changes: 0 additions & 4 deletions packages/server/api/src/app/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import { customDomainModule } from './ee/custom-domains/custom-domain.module'
import { enterpriseFlagsHooks } from './ee/flags/enterprise-flags.hooks'
import { platformRunHooks } from './ee/flow-run/cloud-flow-run-hooks'
import { platformFlowTemplateModule } from './ee/flow-template/platform-flow-template.module'
import { platformWorkerHooks } from './ee/flow-worker/cloud-flow-worker-hooks'
import { gitRepoModule } from './ee/git-repos/git-repo.module'
import { platformDomainHelper } from './ee/helper/platform-domain-helper'
import { issuesModule } from './ee/issues/issues-module'
Expand Down Expand Up @@ -84,7 +83,6 @@ import { webhookModule } from './webhooks/webhook-module'
import { websocketService } from './websockets/websockets.service'
import { flowQueueConsumer } from './workers/flow-worker/consumer/flow-queue-consumer'
import { engineResponseWatcher } from './workers/flow-worker/engine-response-watcher'
import { flowWorkerHooks } from './workers/flow-worker/flow-worker-hooks'
import { flowWorkerModule } from './workers/flow-worker/flow-worker-module'
import { setupBullMQBoard } from './workers/flow-worker/queues/redis/redis-bullboard'
import {
Expand Down Expand Up @@ -321,7 +319,6 @@ export const setupApp = async (): Promise<FastifyInstance> => {
})
eventsHooks.set(auditLogService)
appConnectionsHooks.setHooks(cloudAppConnectionsHooks)
flowWorkerHooks.setHooks(platformWorkerHooks)
flowRunHooks.setHooks(platformRunHooks)
pieceMetadataServiceHooks.set(enterprisePieceMetadataServiceHooks)
flagHooks.set(enterpriseFlagsHooks)
Expand Down Expand Up @@ -351,7 +348,6 @@ export const setupApp = async (): Promise<FastifyInstance> => {
})
eventsHooks.set(auditLogService)
flowRunHooks.setHooks(platformRunHooks)
flowWorkerHooks.setHooks(platformWorkerHooks)
authenticationServiceHooks.set(enterpriseAuthenticationServiceHooks)
pieceMetadataServiceHooks.set(enterprisePieceMetadataServiceHooks)
flagHooks.set(enterpriseFlagsHooks)
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
import { engineHelper, generateWorkerToken } from '../../helper/engine-helper'
import { getPiecePackage } from '../../pieces/piece-metadata-service'
import { EngineHttpResponse, engineResponseWatcher } from './engine-response-watcher'
import { flowWorkerHooks } from './flow-worker-hooks'
import { OneTimeJobData } from './job-data'
import { exceptionHandler, logger } from '@activepieces/server-shared'
import {
Expand Down Expand Up @@ -41,6 +40,7 @@ import {
TriggerType,
} from '@activepieces/shared'
import { logSerializer, Sandbox, SandBoxCacheType, sandboxProvisioner, serverApiService } from 'server-worker'
import { tasksLimit } from '../../ee/project-plan/tasks-limit'

type FinishExecutionParams = {
flowRunId: FlowRunId
Expand Down Expand Up @@ -195,11 +195,11 @@ async function executeFlow(jobData: OneTimeJobData): Promise<void> {
})
return
}
await flowWorkerHooks
.getHooks()
.preExecute({ projectId: jobData.projectId, runId: jobData.runId })

try {
await tasksLimit.limit({
projectId: jobData.projectId,
})
const { input, logFileId } = await loadInputAndLogFileId({
flowVersion: flow.version,
jobData,
Expand Down

0 comments on commit 1c16df0

Please sign in to comment.