Skip to content

Commit

Permalink
Merge pull request #3630 from activepieces/chore/make-update-flow-pub…
Browse files Browse the repository at this point in the history
…lished-version-a-transaction

chore: use db transactions for "update published flow version" request
  • Loading branch information
khaledmashaly committed Jan 15, 2024
2 parents fe52b2b + d2c6c7c commit 1673e92
Show file tree
Hide file tree
Showing 14 changed files with 189 additions and 100 deletions.
3 changes: 2 additions & 1 deletion packages/backend/.eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@
"@typescript-eslint/no-floating-promises": "error",
"@typescript-eslint/no-misused-promises": "warn",
"no-return-await": "off",
"@typescript-eslint/return-await": ["error", "in-try-catch"]
"@typescript-eslint/return-await": ["error", "in-try-catch"],
"default-case-last": "error"
}
}
]
Expand Down
29 changes: 29 additions & 0 deletions packages/backend/src/app/core/db/repo-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Repository, EntityManager, ObjectLiteral, EntitySchema } from 'typeorm'
import { databaseConnection } from '../../database/database-connection'

/**
* If given an {@link EntityManager}, returns a {@link Repository} for the current transaction.
* Otherwise, returns the {@link Repository} for the default connection.
*/
type RepoGetter<T extends ObjectLiteral = ObjectLiteral> = (entityManager?: EntityManager) => Repository<T>

const instances = new Map<EntitySchema, RepoGetter>()

/**
* Creates a {@link RepoGetter} for the given entity.
* @param entity The entity to create a {@link RepoGetter} for.
* @returns A {@link RepoGetter} for the given entity.
*/
export const repoFactory = <T extends ObjectLiteral>(entity: EntitySchema<T>): RepoGetter<T> => {
if (instances.has(entity)) {
return instances.get(entity) as RepoGetter<T>
}

const newInstance: RepoGetter<T> = (entityManager?: EntityManager) => {
return entityManager?.getRepository(entity)
?? databaseConnection.getRepository(entity)
}

instances.set(entity, newInstance as RepoGetter)
return newInstance
}
6 changes: 6 additions & 0 deletions packages/backend/src/app/core/db/transaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { EntityManager } from 'typeorm'
import { databaseConnection } from '../../database/database-connection'

export const transaction = async <T>(operation: (entityManager: EntityManager) => Promise<T>): Promise<T> => {
return databaseConnection.transaction(operation)
}
2 changes: 1 addition & 1 deletion packages/backend/src/app/ee/git-repos/git-sync-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type UpsertFlowOperation = {
export type FlowSyncOperation = DeleteFlowFromGitOperation | UpsertFlowIntoProjectOperation | DeleteFlowFromProjectOperation | UpsertFlowOperation

async function fetchFlowsForProject(projectId: string): Promise<PopulatedFlow[]> {
const flows = await flowRepo.findBy({
const flows = await flowRepo().findBy({
projectId,
})
return Promise.all(flows.map(f => {
Expand Down
107 changes: 70 additions & 37 deletions packages/backend/src/app/flows/flow-version/flow-version.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import {
SeekPage,
UserId,
} from '@activepieces/shared'
import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity'
import { EntityManager } from 'typeorm'
import { ActivepiecesError, ErrorCode } from '@activepieces/shared'
import { databaseConnection } from '../../database/database-connection'
import { repoFactory } from '../../core/db/repo-factory'
import { FlowVersionEntity } from './flow-version-entity'
import { flowVersionSideEffects } from './flow-version-side-effects'
import { DEFAULT_SAMPLE_DATA_SETTINGS } from '@activepieces/shared'
Expand All @@ -38,83 +38,102 @@ import { paginationHelper } from '../../helper/pagination/pagination-utils'

const branchSettingsValidator = TypeCompiler.Compile(BranchActionSettingsWithValidation)
const loopSettingsValidator = TypeCompiler.Compile(LoopOnItemsActionSettingsWithValidation)
const flowVersionRepo = databaseConnection.getRepository(FlowVersionEntity)
const flowVersionRepo = repoFactory(FlowVersionEntity)

export const flowVersionService = {
async lockPieceVersions(projectId: ProjectId, mutatedFlowVersion: FlowVersion): Promise<FlowVersion> {
if (mutatedFlowVersion.state === FlowVersionState.LOCKED) {
return mutatedFlowVersion
async lockPieceVersions({ projectId, flowVersion, entityManager }: LockPieceVersionsParams): Promise<FlowVersion> {
if (flowVersion.state === FlowVersionState.LOCKED) {
return flowVersion
}
return flowHelper.transferFlowAsync(mutatedFlowVersion, async (step) => {

return flowHelper.transferFlowAsync(flowVersion, async (step) => {
const clonedStep = JSON.parse(JSON.stringify(step))
switch (step.type) {
case ActionType.PIECE:
case TriggerType.PIECE: {
const newVersion = await pieceMetadataService.getOrThrow({
projectId,
name: step.settings.pieceName,
version: step.settings.pieceVersion,
})
clonedStep.settings.pieceVersion = newVersion.version
break
}
default:
break
const stepTypeIsPiece = [ActionType.PIECE, TriggerType.PIECE].includes(step.type)

if (stepTypeIsPiece) {
const pieceMetadata = await pieceMetadataService.getOrThrow({
projectId,
name: step.settings.pieceName,
version: step.settings.pieceVersion,
entityManager,
})

clonedStep.settings.pieceVersion = pieceMetadata.version
}

return clonedStep
})
},
async applyOperation(userId: UserId, projectId: ProjectId, flowVersion: FlowVersion, userOperation: FlowOperationRequest): Promise<FlowVersion> {

async applyOperation({ flowVersion, projectId, userId, userOperation, entityManager }: ApplyOperationParams): Promise<FlowVersion> {
let operations: FlowOperationRequest[] = []
let mutatedFlowVersion: FlowVersion = flowVersion

switch (userOperation.type) {
case FlowOperationType.USE_AS_DRAFT: {
const previousVersion = await flowVersionService.getFlowVersionOrThrow({
flowId: flowVersion.flowId,
versionId: userOperation.request.versionId,
removeSecrets: false,
})

operations = handleImportFlowOperation(flowVersion, previousVersion)
break
}
case FlowOperationType.IMPORT_FLOW:

case FlowOperationType.IMPORT_FLOW: {
operations = handleImportFlowOperation(flowVersion, userOperation.request)
break
case FlowOperationType.LOCK_FLOW:
mutatedFlowVersion = await this.lockPieceVersions(projectId, mutatedFlowVersion)
operations = [userOperation]
break
default:
}

case FlowOperationType.LOCK_FLOW: {
mutatedFlowVersion = await this.lockPieceVersions({
projectId,
flowVersion: mutatedFlowVersion,
entityManager,
})

operations = [userOperation]
break
case FlowOperationType.DUPLICATE_ACTION:
}

case FlowOperationType.DUPLICATE_ACTION: {
mutatedFlowVersion = await this.getFlowVersionOrThrow({
flowId: flowVersion.flowId,
versionId: flowVersion.id,
})

operations = [userOperation]
break
}

default: {
operations = [userOperation]
break
}
}

for (const operation of operations) {
mutatedFlowVersion = await applySingleOperation(projectId, mutatedFlowVersion, operation)
}

mutatedFlowVersion.updated = dayjs().toISOString()
mutatedFlowVersion.updatedBy = userId
await flowVersionRepo.update(flowVersion.id, mutatedFlowVersion as QueryDeepPartialEntity<FlowVersion>)
return flowVersionRepo.findOneByOrFail({
id: flowVersion.id,
})

return flowVersionRepo(entityManager).save(mutatedFlowVersion)
},

async getOne(id: FlowVersionId): Promise<FlowVersion | null> {
if (isNil(id)) {
return null
}
return flowVersionRepo.findOneBy({
return flowVersionRepo().findOneBy({
id,
})
},

async getLatestLockedVersionOrThrow(flowId: FlowId): Promise<FlowVersion> {
return flowVersionRepo.findOneOrFail({
return flowVersionRepo().findOneOrFail({
where: {
flowId,
state: FlowVersionState.LOCKED,
Expand Down Expand Up @@ -150,13 +169,13 @@ export const flowVersionService = {
beforeCursor: decodedCursor.previousCursor,
},
})
const paginationResult = await paginator.paginate(flowVersionRepo.createQueryBuilder('flow_version').where({
const paginationResult = await paginator.paginate(flowVersionRepo().createQueryBuilder('flow_version').where({
flowId,
}))
return paginationHelper.createPage<FlowVersion>(paginationResult.data, paginationResult.cursor)
},
async getFlowVersionOrThrow({ flowId, versionId, removeSecrets = false }: GetFlowVersionOrThrowParams): Promise<FlowVersion> {
let flowVersion: FlowVersion | null = await flowVersionRepo.findOne({
let flowVersion: FlowVersion | null = await flowVersionRepo().findOne({
where: {
flowId,
id: versionId,
Expand Down Expand Up @@ -201,7 +220,7 @@ export const flowVersionService = {
valid: false,
state: FlowVersionState.DRAFT,
}
return flowVersionRepo.save(flowVersion)
return flowVersionRepo().save(flowVersion)
},
}

Expand Down Expand Up @@ -493,3 +512,17 @@ type GetFlowVersionOrThrowParams = {
}

type NewFlowVersion = Omit<FlowVersion, 'created' | 'updated'>

type ApplyOperationParams = {
userId: UserId
projectId: ProjectId
flowVersion: FlowVersion
userOperation: FlowOperationRequest
entityManager?: EntityManager
}

type LockPieceVersionsParams = {
projectId: ProjectId
flowVersion: FlowVersion
entityManager?: EntityManager
}
4 changes: 2 additions & 2 deletions packages/backend/src/app/flows/flow/flow.repo.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { databaseConnection } from '../../database/database-connection'
import { repoFactory } from '../../core/db/repo-factory'
import { FlowEntity } from './flow.entity'

export const flowRepo = databaseConnection.getRepository(FlowEntity)
export const flowRepo = repoFactory(FlowEntity)
Loading

0 comments on commit 1673e92

Please sign in to comment.