Skip to content

Commit

Permalink
Merge pull request #4024 from activepieces/fix/rate-limit
Browse files Browse the repository at this point in the history
fix: use redis for task limit
  • Loading branch information
abuaboud committed Feb 26, 2024
2 parents a15d79c + 3306f19 commit c48c333
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 23 deletions.
48 changes: 43 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -57,7 +57,7 @@ async function sendProjectRecords(timestamp: number): Promise<void> {
assertNotNullOrUndefined(item, 'No item found for tasks')
const project = await projectService.getOneOrThrow(projectId)
const billingPeriod = projectUsageService.getCurrentingStartPeriod(project.created)
const usage = await projectUsageService.getDayUsageForBillingPeriod(projectId, billingPeriod)
const usage = await projectUsageService.getUsageForBillingPeriod(projectId, billingPeriod)
await stripe.subscriptionItems.createUsageRecord(item.id, {
quantity: Math.max(usage.tasks - projectBilling.includedTasks, 0),
timestamp: dayjs(timestamp).unix(),
Expand Down
19 changes: 18 additions & 1 deletion packages/server/api/src/app/ee/flow-run/cloud-flow-run-hooks.ts
@@ -1,4 +1,6 @@
import { ApEdition } from '@activepieces/shared'
import { FlowRunHooks } from '../../flows/flow-run/flow-run-hooks'
import { getEdition } from '../../helper/secret-helper'
import { tasksLimit } from '../project-plan/tasks-limit'

export const platformRunHooks: FlowRunHooks = {
Expand All @@ -7,4 +9,19 @@ export const platformRunHooks: FlowRunHooks = {
projectId,
})
},
}
async onFinish({
projectId,
tasks,
}: {
projectId: string
tasks: number
}): Promise<void> {
const edition = getEdition()
if ([ApEdition.CLOUD, ApEdition.ENTERPRISE].includes(edition)) {
await tasksLimit.incrementOrCreateRedisRecord(
projectId,
tasks,
)
}
},
}
53 changes: 39 additions & 14 deletions packages/server/api/src/app/ee/project-plan/tasks-limit.ts
Expand Up @@ -4,14 +4,17 @@ import {
ErrorCode,
ProjectId,
ProjectPlan,
isNil,
} from '@activepieces/shared'

import { projectLimitsService } from './project-plan.service'
import { exceptionHandler } from 'server-shared'
import dayjs from 'dayjs'
import { apDayjs } from '../../helper/dayjs-helper'
import { flowRunService } from '../../flows/flow-run/flow-run-service'
import { getEdition } from '../../helper/secret-helper'
import { projectUsageService } from '../../project/usage/project-usage-service'
import { createRedisClient } from '../../database/redis-connection'
import { Redis } from 'ioredis'
import { projectService } from '../../project/project-service'

async function limitTasksPerMonth({
projectPlan,
Expand Down Expand Up @@ -43,10 +46,7 @@ async function limit({ projectId }: { projectId: ProjectId }): Promise<void> {
if (!projectPlan) {
return
}
const consumedTasks = await flowRunService.getTasksUsedAfter({
projectId,
created: dayjs(nextResetDatetime(projectPlan.created)).subtract(30, 'days').toISOString(),
})
const consumedTasks = await incrementOrCreateRedisRecord(projectId, 0)
await limitTasksPerMonth({
consumedTasks,
projectPlan,
Expand All @@ -68,13 +68,38 @@ async function limit({ projectId }: { projectId: ProjectId }): Promise<void> {

export const tasksLimit = {
limit,
incrementOrCreateRedisRecord,
}

function nextResetDatetime(datetime: string): string {
const thirtyDaysInMs = 30 * 24 * 60 * 60 * 1000
const date = apDayjs(datetime)
const currentDate = apDayjs()
const nextResetInMs =
thirtyDaysInMs - (currentDate.diff(date, 'millisecond') % thirtyDaysInMs)
return currentDate.add(nextResetInMs, 'millisecond').toISOString()
}
const getRedisConnection = (() => {
let redis: Redis | null = null

return (): Redis => {
if (!isNil(redis)) {
return redis
}
redis = createRedisClient()
return redis
}
})()


async function incrementOrCreateRedisRecord(projectId: string, incrementBy: number): Promise<number> {
const project = await projectService.getOneOrThrow(projectId)
const billingPeriod = projectUsageService.getCurrentingStartPeriod(project.created)
const key = `project-usage:${projectId}:${billingPeriod}`
const redis = getRedisConnection()
const keyExists = await redis.exists(key)

if (keyExists === 0) {
const consumedTasks = await flowRunService.getTasksUsedAfter({
projectId,
created: billingPeriod,
})
await redis.set(key, consumedTasks)
return incrementBy
}
else {
return redis.incrby(key, incrementBy)
}
}
Expand Up @@ -137,7 +137,7 @@ async function enrichWithUsageAndPlan(
return {
...project,
plan: await projectLimitsService.getOrCreateDefaultPlan(project.id, DEFAULT_FREE_PLAN_LIMIT),
usage: await projectUsageService.getDayUsageForBillingPeriod(project.id, projectUsageService.getCurrentingStartPeriod(project.created)),
usage: await projectUsageService.getUsageForBillingPeriod(project.id, projectUsageService.getCurrentingStartPeriod(project.created)),
}
}

Expand Down
10 changes: 10 additions & 0 deletions packages/server/api/src/app/flows/flow-run/flow-run-hooks.ts
@@ -1,11 +1,21 @@
export type FlowRunHooks = {
onPreStart({ projectId }: { projectId: string }): Promise<void>
onFinish({
projectId,
tasks,
}: {
projectId: string
tasks: number
}): Promise<void>
}

const emptyHooks: FlowRunHooks = {
async onPreStart() {
// DO NOTHING
},
async onFinish() {
// DO NOTHING
},
}

let hooks = emptyHooks
Expand Down
Expand Up @@ -16,6 +16,7 @@ import {
import { JobType } from '../../workers/flow-worker/queues/queue'
import { notifications } from '../../helper/notifications'
import { HookType } from './flow-run-service'
import { flowRunHooks } from './flow-run-hooks'

type StartParams = {
flowRun: FlowRun
Expand Down Expand Up @@ -46,6 +47,10 @@ const calculateDelayForResumeJob = (

export const flowRunSideEffects = {
async finish({ flowRun }: { flowRun: FlowRun }): Promise<void> {
await flowRunHooks
.getHooks()
.onFinish({ projectId: flowRun.projectId, tasks: flowRun.tasks! })

await notifications.notifyRun({
flowRun,
})
Expand Down
Expand Up @@ -5,7 +5,7 @@ import { apDayjs } from '../../helper/dayjs-helper'


export const projectUsageService = {
async getDayUsageForBillingPeriod(projectId: string, startBillingPeriod: string): Promise<ProjectUsage> {
async getUsageForBillingPeriod(projectId: string, startBillingPeriod: string): Promise<ProjectUsage> {
const flowTasks = await flowRunService.getTasksUsedAfter({
projectId,
created: getCurrentingStartPeriod(startBillingPeriod),
Expand Down

0 comments on commit c48c333

Please sign in to comment.