Skip to content

Commit

Permalink
Setup basic scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
AdiRishi committed Jan 31, 2024
1 parent 1998311 commit ad3a981
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 28 deletions.
29 changes: 29 additions & 0 deletions src/business-logic/schedule-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Env } from '..';
import { bulkUpdateStatus, getEventsToBeScheduled } from '../db/data-layer';
import { chunkArray, groupBy } from '../utils/array';

export async function scheduleAllPendingEvents(env: Env) {
const queuesBySlug = new Map<string, Queue>();
queuesBySlug.set('OUTBOUND_QUEUE', env.OUTBOUND_QUEUE);

for await (const events of getEventsToBeScheduled(env)) {
const eventsByQueue = groupBy(events, (e) => e.queueSlug);
for (const [queueSlug, queueEvents] of eventsByQueue) {
const queue = queuesBySlug.get(queueSlug);
if (!queue) {
throw new Error(`Queue not found for slug: ${queueSlug}`);
}
const chunks = chunkArray(queueEvents, 100);
for (const chunk of chunks) {
await queue.sendBatch(
chunk.map((e) => ({ body: JSON.parse(e.data) as unknown, contentType: 'json' }))
);
await bulkUpdateStatus(
chunk.map((e) => e.id),
'SUCCEEDED',
env
);
}
}
}
}
42 changes: 41 additions & 1 deletion src/db/data-layer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { eq } from 'drizzle-orm';
import { eq, sql, inArray } from 'drizzle-orm';
import { Env } from '..';
import { getDrizzleClient } from './client';
import { eventTable, EventInsertType, EventSelectType } from './schema';
Expand All @@ -12,3 +12,43 @@ export async function getEventById(id: string, env: Env): Promise<EventSelectTyp
const db = getDrizzleClient(env);
return await db.select().from(eventTable).where(eq(eventTable.id, id)).get();
}

export async function bulkUpdateStatus(
ids: string[],
newStatus: EventSelectType['status'],
env: Env
) {
const db = getDrizzleClient(env);
await db
.update(eventTable)
.set({ status: newStatus })
.where(inArray(eventTable.id, ids))
.execute();
}

export async function* getEventsToBeScheduled(env: Env, batchSize = 1000, cutoffLength = 10_000) {
const db = getDrizzleClient(env);
const queryStatement = db
.select({
id: eventTable.id,
data: eventTable.data,
queueSlug: eventTable.queueSlug,
dateScheduledUtc: eventTable.dateScheduledUtc,
})
.from(eventTable)
.where(
sql`${eventTable.dateScheduledUtc} <= unixepoch() AND ${eventTable.status} = 'SCHEDULED'`
)
.limit(sql.placeholder('batchSize'))
.prepare();

let totalEventsProcessed = 0;
let noMoreEvents = false;

do {
const events = await queryStatement.all({ batchSize });
totalEventsProcessed += events.length;
noMoreEvents = events.length === 0;
yield events;
} while (totalEventsProcessed < cutoffLength && !noMoreEvents);
}
4 changes: 1 addition & 3 deletions src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ export const eventTable = sqliteTable(
.notNull()
.primaryKey()
.$default(() => createId()),
dataKey: text('data_key').notNull(),
dataLocation: text('data_location', { enum: ['R2', 'KV'] }).notNull(),
data: text('data').notNull(),
queueSlug: text('queue_slug').notNull(),
dateAddedUtc: epochSeconds('date_added_utc')
.notNull()
Expand All @@ -23,7 +22,6 @@ export const eventTable = sqliteTable(
.default('SCHEDULED'),
},
(table) => ({
dataKeyIdx: index('event_data_key_idx').on(table.dataKey),
dateScheduledUtcStatusIdx: index('event_date_scheduled_utc_status_idx').on(
table.dateScheduledUtc,
table.status
Expand Down
6 changes: 6 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import { scheduleAllPendingEvents } from './business-logic/schedule-events';
import { app } from './routes';

export type Env = {
ENVIRONMENT: 'testing' | 'development' | 'production';
__D1_BETA_DB: D1Database;
DB: D1Database;
OUTBOUND_QUEUE: Queue;
};

export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
return app.fetch(request, env, ctx);
},

async scheduled(_event: ScheduledEvent, env: Env, _ctx: ExecutionContext): Promise<void> {
await scheduleAllPendingEvents(env);
},
};
66 changes: 44 additions & 22 deletions src/routes/v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,52 @@ import { createId } from '../db/createId';

export const v1Router = new Hono<{ Bindings: Env }>();

const v1EventSchema = z.object({
data: z.record(z.unknown()),
queueIdentifier: z.string(),
scheduleAt: z.coerce.date(),
});

v1Router.post('/event', zValidator('json', v1EventSchema), async (c) => {
const bodyData = c.req.valid('json');
const eventId = createId();
const event = await saveEvent(
{
id: eventId,
data: JSON.stringify(bodyData.data),
queueSlug: bodyData.queueIdentifier,
dateScheduledUtc: bodyData.scheduleAt,
},
c.env
);
return c.json(event);
});

v1Router.post(
'/event',
zValidator(
'json',
z.object({
data: z.record(z.unknown()),
dataLocation: z.enum(['KV', 'R2']),
queueIdentifier: z.string(),
scheduleAt: z.coerce.date(),
})
),
'/event/batch',
zValidator('json', z.object({ events: z.array(v1EventSchema).max(512) })),
async (c) => {
const bodyData = c.req.valid('json');
const eventId = createId();
const event = await saveEvent(
{
id: eventId,
dataKey: `${bodyData.queueIdentifier}:${eventId}`,
dataLocation: bodyData.dataLocation,
queueSlug: bodyData.queueIdentifier,
dateScheduledUtc: bodyData.scheduleAt,
},
c.env
);
return c.json(event);
const eventPromises = bodyData.events.map(async (event) => {
const eventId = createId();
return await saveEvent(
{
id: eventId,
data: JSON.stringify(event.data),
queueSlug: event.queueIdentifier,
dateScheduledUtc: event.scheduleAt,
},
c.env
);
});
const settledEvents = await Promise.allSettled(eventPromises);
const events = settledEvents.map((settledEvent) => {
if (settledEvent.status === 'rejected') {
console.log(settledEvent.reason);
return { error: settledEvent.reason as unknown };
}
return settledEvent.value;
});
return c.json({ events });
}
);
41 changes: 41 additions & 0 deletions src/utils/array.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Splits an array into chunks of the specified size.
*
* @param {Array<T>} array The array to split.
* @param {number} chunkSize The size of each chunk.
* @returns {Array<Array<T>>} An array of chunks.
*/
export function chunkArray<T>(array: T[], chunkSize: number): T[][] {
if (chunkSize < 1) {
throw new Error(`chunkSize must be greater than 0, got ${chunkSize}`);
}
const chunkedArray: T[][] = [];
for (let i = 0; i < array.length; i += chunkSize) {
chunkedArray.push(array.slice(i, i + chunkSize));
}
return chunkedArray;
}

/**
* Groups an array of items by a callback.
*
* @param {Array<T>} array The array to group.
* @param {GroupByCallback<T>} callback The callback to group by.
* @returns {Map<string, T[]>} A map of groups.
*/
export function groupBy<T>(array: T[], callback: GroupByCallback<T>): Map<string, T[]> {
const map = new Map<string, T[]>();

for (const item of array) {
const key = callback(item);
const collection = map.get(key);
if (!collection) {
map.set(key, [item]);
} else {
collection.push(item);
}
}

return map;
}
type GroupByCallback<T> = (item: T) => string;
4 changes: 2 additions & 2 deletions wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ ENVIRONMENT = 'production'

[[d1_databases]]
binding = "DB" # i.e. available in your Worker on env.DB
database_name = "cloudflare-queues-scheduler-db"
database_id = "06bc7967-bda8-403c-8f08-80be47c7c6fc"
database_name = "scheduler-db"
database_id = "aa0a2253-b40c-49c5-88a8-6d67671f8012"
migrations_table = "d1_migrations"
migrations_dir = "d1_migrations"

0 comments on commit ad3a981

Please sign in to comment.