Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(orchestration): introducing the scheduler #2132

Merged
merged 3 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ COPY packages/data-ingestion/package.json ./packages/data-ingestion/package.json
COPY packages/logs/package.json ./packages/logs/package.json
COPY packages/records/package.json ./packages/records/package.json
COPY packages/types/package.json ./packages/types/package.json
COPY packages/scheduler/package.json ./packages/scheduler/package.json
COPY package*.json ./

# Install every dependencies
Expand Down
27 changes: 26 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"packages/records",
"packages/server",
"packages/runner",
"packages/scheduler",
bodinsamuel marked this conversation as resolved.
Show resolved Hide resolved
"packages/persist",
"packages/jobs",
"packages/webapp",
Expand Down
14 changes: 0 additions & 14 deletions packages/logs/lib/vitest.d.ts

This file was deleted.

2 changes: 1 addition & 1 deletion packages/logs/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
"path": "../utils"
}
],
"include": ["lib/**/*"]
"include": ["lib/**/*", "../utils/lib/vitest.d.ts"]
}
14 changes: 0 additions & 14 deletions packages/records/lib/vitest.d.ts

This file was deleted.

2 changes: 1 addition & 1 deletion packages/records/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
"outDir": "dist"
},
"references": [{ "path": "../utils" }],
"include": ["lib/**/*"]
"include": ["lib/**/*", "../utils/lib/vitest.d.ts"]
}
4 changes: 4 additions & 0 deletions packages/scheduler/lib/db/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import knex from 'knex';
import { config } from './config.js';

export const db = knex(config);
30 changes: 30 additions & 0 deletions packages/scheduler/lib/db/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { envs } from '../env.js';
import type { Knex } from 'knex';

export const schema = envs.SCHEDULER_DATABASE_SCHEMA;
export const databaseUrl =
envs.SCHEDULER_DATABASE_URL ||
envs.NANGO_DATABASE_URL ||
`postgres://${envs.NANGO_DB_USER}:${envs.NANGO_DB_PASSWORD}@${envs.NANGO_DB_HOST}:${envs.NANGO_DB_PORT}/${envs.NANGO_DB_NAME}`;

const runningMigrationOnly = process.argv.some((v) => v === 'migrate:latest');
const isJS = !runningMigrationOnly;

const config: Knex.Config = {
client: 'postgres',
connection: {
connectionString: databaseUrl,
statement_timeout: 60000
},
searchPath: schema,
pool: { min: 2, max: 50 },
migrations: {
extension: isJS ? 'js' : 'ts',
directory: 'migrations',
tableName: 'migrations',
loadExtensions: [isJS ? '.js' : '.ts'],
schemaName: schema
}
};

export { config };
21 changes: 21 additions & 0 deletions packages/scheduler/lib/db/migrate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { logger } from '../utils/logger.js';
import { db } from './client.js';
import { schema, config } from './config.js';
import { dirname } from '../env.js';
import path from 'node:path';

export async function migrate(): Promise<void> {
logger.info('[scheduler] migration');
const dir = path.join(dirname, 'scheduler/dist/db/migrations');
await db.raw(`CREATE SCHEMA IF NOT EXISTS ${schema}`);

const [, pendingMigrations] = (await db.migrate.list({ ...config.migrations, directory: dir })) as [unknown, string[]];

if (pendingMigrations.length === 0) {
logger.info('[scheduler] nothing to do');
return;
}

await db.migrate.latest({ ...config.migrations, directory: dir });
logger.info('[scheduler] migrations completed.');
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import type { Knex } from 'knex';
import { TASKS_TABLE } from '../../models/tasks.js';

export async function up(knex: Knex): Promise<void> {
await knex.transaction(async (trx) => {
await trx.raw(`
CREATE TYPE task_states AS ENUM (
'CREATED',
'STARTED',
'SUCCEEDED',
'FAILED',
'EXPIRED',
'CANCELLED'
);
`);
await trx.raw(`
CREATE TABLE ${TASKS_TABLE} (
id uuid PRIMARY KEY,
name varchar(255) NOT NULL,
payload json NOT NULL,
group_key varchar(255) NOT NULL,
retry_max integer NOT NULL default(0),
retry_count integer NOT NULL default(0),
starts_after timestamp with time zone NOT NULL,
created_to_started_timeout_secs integer NOT NULL,
started_to_completed_timeout_secs integer NOT NULL,
heartbeat_timeout_secs integer NOT NULL,
created_at timestamp with time zone NOT NULL,
state task_states NOT NULL,
last_state_transition_at timestamp with time zone NOT NULL,
last_heartbeat_at timestamp with time zone NOT NULL,
output json NULL,
terminated boolean
);
`);
// TODO: add indexes
});
}

export async function down(knex: Knex): Promise<void> {
await knex.raw(`DROP TABLE IF EXISTS ${TASKS_TABLE}`);
await knex.raw(`DROP TYPE IF EXISTS task_states`);
}
10 changes: 10 additions & 0 deletions packages/scheduler/lib/db/test.helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { db } from './client.js';
import { schema } from './config.js';

/*********************************/
/* WARNING: to use only in tests */
/*********************************/

export async function clearDb(): Promise<void> {
await db.raw(`DROP SCHEMA IF EXISTS ${schema} CASCADE`);
}
7 changes: 7 additions & 0 deletions packages/scheduler/lib/env.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { ENVS, parseEnvs } from '@nangohq/utils';

export const envs = parseEnvs(ENVS);
export const filename = fileURLToPath(import.meta.url);
export const dirname = path.dirname(path.join(filename, '../../'));
3 changes: 3 additions & 0 deletions packages/scheduler/lib/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './db/migrate.js';
export * from './scheduler.js';
export * from './types.js';
Loading
Loading