Skip to content

Commit

Permalink
Merge pull request #866 from activepieces/feat/scheduler-piece
Browse files Browse the repository at this point in the history
Feat/scheduler piece
  • Loading branch information
abuaboud committed Mar 24, 2023
2 parents 62c7635 + 317d93e commit 95074ae
Show file tree
Hide file tree
Showing 82 changed files with 723 additions and 463 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release-beta.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ jobs:
linux/arm/v7
push: true
tags: |
activepieces/activepieces:${{ env.RELEASE }}.5-beta
activepieces/activepieces:${{ env.RELEASE }}.2-beta
33 changes: 23 additions & 10 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"@ngrx/store": "^15.1.0",
"@ngrx/store-devtools": "^15.1.0",
"@sentry/node": "^7.38.0",
"@sinclair/typebox": "^0.26.3",
"airtable": "^0.11.6",
"ajv": "^8.12.0",
"angular-svg-icon": "^15.0.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { FastifyInstance, FastifyPluginOptions, FastifyRequest } from "fastify";
import { FastifyInstance, FastifyRequest } from "fastify";
import { webhookService } from "../webhooks/webhook-service";
import { appEventRoutingService } from "./app-event-routing.service";
import { engineHelper } from "../helper/engine-helper";
import { logger } from "../helper/logger";

export const appEventRoutingModule = async (app: FastifyInstance, _options: FastifyPluginOptions) => {
export const appEventRoutingModule = async (app: FastifyInstance) => {
app.register(appEventRoutingController, { prefix: "/v1/app-events" });
};

export const appEventRoutingController = async (fastify: FastifyInstance, options: FastifyPluginOptions) => {
export const appEventRoutingController = async (fastify: FastifyInstance) => {

fastify.post(
"/:pieceName",
Expand Down
4 changes: 3 additions & 1 deletion packages/backend/src/app/database/database-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { addtriggerevents1678621361185 } from "./migration/1678621361185-addtrig
import { removeCollectionVersion1678492809093 } from "./migration/1678492809093-removeCollectionVersion";
import { addEventRouting1678382946390 } from "./migration/1678382946390-add-event-routing";
import { bumpFixPieceVersions1678928503715 } from "./migration/1678928503715-bump-fix-piece-versions";
import { migrateSchedule1679014156667 } from "./migration/1679014156667-migrate-schedule";

const database = system.getOrThrow(SystemProp.POSTGRES_DATABASE);
const host = system.getOrThrow(SystemProp.POSTGRES_HOST);
Expand Down Expand Up @@ -62,7 +63,8 @@ const getMigrations = () => {
addtriggerevents1678621361185,
removeCollectionVersion1678492809093,
addEventRouting1678382946390,
bumpFixPieceVersions1678928503715
bumpFixPieceVersions1678928503715,
migrateSchedule1679014156667
];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export class bumpFixPieceVersions1678928503715 implements MigrationInterface {
logger.info("bumpFixPieceVersions1678928503715, finished bumping " + count + " flows " + " and connections count " + connectionCount);
}

public async down(queryRunner: QueryRunner): Promise<void> {
public async down(): Promise<void> {
// Ignored
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { MigrationInterface, QueryRunner } from "typeorm"
import { logger } from "../../helper/logger";

const FLOW_VERSION_TABLE = "flow_version";
export class migrateSchedule1679014156667 implements MigrationInterface {

public async up(queryRunner: QueryRunner): Promise<void> {
logger.info("migrateSchedule1679014156667, started");

let count = 0;
const flowVersionRepo = queryRunner.connection.getRepository(FLOW_VERSION_TABLE);
const flowVersions = await flowVersionRepo.find();

for (const flowVersion of flowVersions) {
const step = flowVersion.trigger;
if(step.type === "SCHEDULE") {
step.type = "PIECE_TRIGGER";
step.settings = {
input: {
cronExpression: step.settings.cronExpression
},
triggerName: "cron_expression",
pieceName: "schedule",
pieceVersion: "0.0.1",
}
count++;
await flowVersionRepo.update(flowVersion.id, flowVersion);
}
}
logger.info("migrateSchedule1679014156667, finished flows " + count);

}

public async down(queryRunner: QueryRunner): Promise<void> {
logger.info("rolling back migrateSchedule1679014156667, started");

let count = 0;
const flowVersionRepo = queryRunner.connection.getRepository(FLOW_VERSION_TABLE);
const flowVersions = await flowVersionRepo.find();

for (const flowVersion of flowVersions) {
const step = flowVersion.trigger;
if(step.type === "PIECE_TRIGGER") {
if(step.settings.pieceName === "schedule"){
step.type = "SCHEDULE";
step.settings = {
cronExpression: step.settings.input.cronExpression
}
count++;
await flowVersionRepo.update(flowVersion.id, flowVersion);
}
}
}
logger.info("rolling back migrateSchedule1679014156667, finished flows " + count);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ function validateProps(props: PieceProperty, input: Record<string, unknown>) {

function buildSchema(props: PieceProperty): TSchema {
const entries = Object.entries(props);
const nonNullableUnknownPropType = Type.Not(Type.Null(),Type.Unknown());
const propsSchema: Record<string, TSchema> = {};
for (let i = 0; i < entries.length; ++i) {
const property = entries[i][1];
Expand All @@ -250,17 +251,17 @@ function buildSchema(props: PieceProperty): TSchema {
});
break;
case PropertyType.CHECKBOX:
propsSchema[name] = Type.Boolean({});
propsSchema[name] = Type.Union([Type.Boolean(),Type.String({})]) ;
break;
case PropertyType.NUMBER:
// Because it could be a variable
propsSchema[name] = Type.String({});
break;
case PropertyType.STATIC_DROPDOWN:
propsSchema[name] = Type.Any({});
propsSchema[name] = nonNullableUnknownPropType;
break;
case PropertyType.DROPDOWN:
propsSchema[name] = Type.Any({});
propsSchema[name] = nonNullableUnknownPropType;
break;
case PropertyType.OAUTH2:
// Only accepts connections variable.
Expand Down
1 change: 0 additions & 1 deletion packages/backend/src/app/flows/flow.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ export const flowService = {
});
const queryBuilder = flowRepo.createQueryBuilder("flow").where({ collectionId, projectId });
const { data, cursor } = await paginator.paginate(queryBuilder.where({ collectionId }));
// TODO REPLACE WITH SQL QUERY
const flowVersionsPromises: Array<Promise<FlowVersion | null>> = [];
data.forEach((collection) => {
flowVersionsPromises.push(flowVersionService.getFlowVersion(projectId, collection.id, undefined, false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ export const triggerEventService = {
switch (trigger.type) {
case TriggerType.WEBHOOK:
break;
case TriggerType.SCHEDULE:
throw new Error("Not implemented");
case TriggerType.PIECE:
throw new Error("Not implemented");
case TriggerType.EMPTY:
Expand Down Expand Up @@ -62,8 +60,6 @@ function getSourceName(trigger: Trigger): string {
switch (trigger.type) {
case TriggerType.WEBHOOK:
return TriggerType.WEBHOOK;
case TriggerType.SCHEDULE:
return TriggerType.SCHEDULE;
case TriggerType.PIECE:{
const pieceTrigger = trigger as PieceTrigger;
return pieceTrigger.settings.pieceName +"@" + pieceTrigger.settings.pieceVersion + ":" + pieceTrigger.settings.triggerName;
Expand Down
38 changes: 6 additions & 32 deletions packages/backend/src/app/helper/trigger-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,11 @@ import {
import { ActivepiecesError, ErrorCode } from "@activepieces/shared";
import { flowQueue } from "../workers/flow-worker/flow-queue";
import { engineHelper } from "./engine-helper";
import { logger } from "../helper/logger";
import { getPiece } from "@activepieces/pieces-apps";
import { webhookService } from "../webhooks/webhook-service";
import { appEventRoutingService } from "../app-event-routing/app-event-routing.service";
import { captureException } from "@sentry/node";

const EVERY_FIVE_MINUTES = "*/5 * * * *";

export const triggerUtils = {
async executeTrigger({ payload, flowVersion, projectId, collectionId}: ExecuteTrigger): Promise<unknown[]> {
const flowTrigger = flowVersion.trigger;
Expand Down Expand Up @@ -65,23 +62,6 @@ export const triggerUtils = {
case TriggerType.PIECE:
await enablePieceTrigger({ collectionId, projectId, flowVersion });
break;

case TriggerType.SCHEDULE:
console.log("Created Schedule for flow version Id " + flowVersion.id);

await flowQueue.add({
id: flowVersion.id,
data: {
environment: RunEnvironment.PRODUCTION,
projectId: projectId,
collectionId,
flowVersion,
triggerType: TriggerType.SCHEDULE,
},
cronExpression: flowVersion.trigger.settings.cronExpression,
});

break;
default:
break;
}
Expand All @@ -92,14 +72,6 @@ export const triggerUtils = {
case TriggerType.PIECE:
await disablePieceTrigger({ collectionId, projectId, flowVersion });
break;

case TriggerType.SCHEDULE:
console.log("Deleted Schedule for flow version Id " + flowVersion.id);
await flowQueue.removeRepeatableJob({
id: flowVersion.id
});
break;

default:
break;
}
Expand Down Expand Up @@ -148,11 +120,12 @@ const enablePieceTrigger = async ({ flowVersion, projectId, collectionId }: Enab
for(const listener of listeners){
await appEventRoutingService.createListeners({projectId, flowId: flowVersion.flowId, appName, events: listener.events, identifierValue: listener.identifierValue });
}
}
break;
}
case TriggerStrategy.WEBHOOK:
break;
case TriggerStrategy.POLLING:
case TriggerStrategy.POLLING: {
const scheduleOptions = (response as ExecuteTriggerResponse).scheduleOptions;
await flowQueue.add({
id: flowVersion.id,
data: {
Expand All @@ -162,10 +135,11 @@ const enablePieceTrigger = async ({ flowVersion, projectId, collectionId }: Enab
flowVersion,
triggerType: TriggerType.PIECE,
},
cronExpression: EVERY_FIVE_MINUTES,
scheduleOptions: scheduleOptions,
});

break;

}
}
};

Expand Down
2 changes: 1 addition & 1 deletion packages/backend/src/app/webhooks/webhook-controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { FastifyPluginCallback, FastifyReply, FastifyRequest } from "fastify";
import { FastifyPluginCallback, FastifyRequest } from "fastify";
import { StatusCodes } from "http-status-codes";
import { Static, Type } from "@sinclair/typebox";
import { ApId } from "@activepieces/shared";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ const repeatableJobConsumer = new Worker<RepeatableJobData, unknown, ApId>(

try {
switch (data.triggerType) {
case TriggerType.SCHEDULE:
await consumeScheduleTrigger(data);
break;
case TriggerType.PIECE:
await consumePieceTrigger(data);
break;
Expand All @@ -62,14 +59,6 @@ const repeatableJobConsumer = new Worker<RepeatableJobData, unknown, ApId>(
}
);

const consumeScheduleTrigger = async (data: RepeatableJobData): Promise<void> => {
await flowRunService.start({
environment: data.environment,
flowVersionId: data.flowVersion.id,
collectionId: data.collectionId,
payload: null,
});
};

const consumePieceTrigger = async (data: RepeatableJobData): Promise<void> => {
const flowVersion = await flowVersionService.getOne(data.flowVersion.id);
Expand Down
Loading

0 comments on commit 95074ae

Please sign in to comment.