Skip to content

Commit

Permalink
Start a basic server which runs a job on POST requests
Browse files Browse the repository at this point in the history
  • Loading branch information
josephjclark committed Aug 25, 2022
1 parent 0ffbf66 commit b9394eb
Show file tree
Hide file tree
Showing 13 changed files with 806 additions and 121 deletions.
9 changes: 7 additions & 2 deletions packages/runtime-manager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,28 @@
"scripts": {
"test": "pnpm ava",
"build": "rimraf dist/ .rollup.cache && rollup -c",
"watch": "pnpm rollup -cw --no-watch.clearScreen"
"watch": "pnpm rollup -cw --no-watch.clearScreen",
"serve": "nodemon -e ts --exec 'tsm --experimental-vm-modules src/server/index.ts'"
},
"author": "Joe Clark <jclark@openfn.org>",
"license": "ISC",
"dependencies": {
"@openfn/runtime": "workspace:^0.0.1",
"@types/koa": "^2.13.5",
"@types/workerpool": "^6.1.0",
"koa": "^2.13.4",
"piscina": "^3.2.0",
"tsm": "^2.2.2",
"workerpool": "^6.2.1"
},
"devDependencies": {
"@rollup/plugin-typescript": "^8.3.2",
"@types/node": "^17.0.31",
"ava": "^4.2.0",
"nodemon": "^2.0.19",
"rollup": "^2.72.1",
"rollup-plugin-dts": "^4.2.1",
"ts-node": "^10.7.0",
"ts-node": "^10.8.1",
"tslib": "^2.4.0",
"typescript": "^4.6.4"
}
Expand Down
13 changes: 0 additions & 13 deletions packages/runtime-manager/rollup.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,6 @@ export default [
typescript({ tsconfig: "./tsconfig.json" }),
],
},
{
input: "test/t.ts",
output: [
{
file: "dist/t.js",
format: "esm",
sourcemap: true,
},
],
plugins: [
typescript({ tsconfig: "./tsconfig.json" }),
],
},
{
input: "src/worker.ts",
output: [{
Expand Down
34 changes: 19 additions & 15 deletions packages/runtime-manager/src/Manager.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import { EventEmitter } from 'node:events';
import path from 'node:path';
//import workerpool from 'workerpool';
import Piscina from 'piscina';

class Bus extends EventEmitter {};

type JobRegistry = Record<string, string>;

// Manages a pool of workers
Expand All @@ -14,14 +10,23 @@ const Manager = function() {
const workers = new Piscina({
filename: path.resolve('./dist/worker.js')
});
workers.on('ready', console.log)

workers.on('message', console.log)

// Maintain state of each job
// I really really want some details about the thread its running in...
const state: Record<string, any> = {};

// Run a job in a worker
// Accepts the name of a registered job
const run = async (name: string) => {
const run = async (name: string, state: any) => {
const src = registry[name];
if (src) {
//return await workers.exec('runJob', [src])
return await workers.run(src)
// need a unique job + process id to go here
state[name] = true
const result = await workers.run([src, state])
delete state[name];
return result;
}
throw new Error("Job not found: " + name);
};
Expand All @@ -38,18 +43,17 @@ const Manager = function() {

const getRegisteredJobs = () => Object.keys(registry);

// const getActiveJobs = () => {

// }

const bus = new Bus();

const getActiveJobs = () => {
return workers.threads;
}

return {
run,
registerJob,
getRegisteredJobs,
on: (evt: string, fn: () => void) => bus.on(evt, fn),
getActiveJobs,
// subscribe directly to worker events
on: (evt: string, fn: () => void) => workers.on(evt, fn),
// I don't think we actually want a publish event?
}
};
Expand Down
67 changes: 66 additions & 1 deletion packages/runtime-manager/src/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,67 @@
import koa from 'koa';
import fs from 'node:fs/promises';
import path from 'node:path';
import Manager from '../Manager';

const loadJobs = async () => {
for (const name of ['slow-random']) {
const source = await fs.readFile(path.resolve(`src/server/jobs/${name}.js`), { encoding: 'utf8' });
runtime.registerJob(name, source);
}
console.log('Jobs loaded:')
console.log(runtime.getRegisteredJobs());
};

const app = new koa();

console.log('starting server')

const runtime = Manager();

loadJobs();

// Create http server
export default {}
// GET works return alist of workers

// on post to job/name we run that job

// need a web socket to listen to and report changes

const handlePost = (ctx: koa.Context) => {
ctx;
// start a job
runJob('slow-random')
};

const runJob = async (name: string) => {
console.log(`Running job: ${name}...`)

const result = await runtime.run(name, {
configuration: {
delay: 4000
}
});

console.log('--')
console.log(`Job ${name} finished`)
console.log(result)
console.log('--')
report();
}

const report = () => {
const threadCount = runtime.getActiveJobs().length;
console.log(`active threads: ${threadCount}`)
}

app.use((ctx) => {
if (ctx.method === "POST") {
handlePost(ctx);
}
})

app.listen(1234)

report();

export default {}
9 changes: 9 additions & 0 deletions packages/runtime-manager/src/server/jobs/slow-random.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// This job takes a random number of seconds and returns a random number
const slowmo = (state) => new Promise((resolve) => {
const done = () => {
resolve({ data: { result: Math.random() * 100 }})
};
setTimeout(done, state.configuration?.delay ?? 500);
});

export default [slowmo];
6 changes: 6 additions & 0 deletions packages/runtime-manager/src/server/manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// middleware to handle the worker manager
const middleware = () => {

}

export default middleware;
18 changes: 3 additions & 15 deletions packages/runtime-manager/src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,11 @@
// // Dedicated worker for running jobs
// // Interesting plot twist: how do we enable vm modules in the thread?
// import workerpool from 'workerpool';
// import run from '@openfn/runtime';

// const runJob = async (src: string) => {
// return await run(src);
// };

// workerpool.worker({
// runJob
// })

// Dedicated worker for running jobs
// Security thoughts: the process inherits the node command arguments
// (it has to for experimental modules to work)
// Is this a concern? If secrets are passed in they could be visible
// The sandbox should hep
import run from '@openfn/runtime';

export default async (src: string) => {
return await run(src);
export default async (args: [string, any]) => {
const [src, state] = args;
return await run(src, state);
};
74 changes: 74 additions & 0 deletions packages/runtime-manager/test/jobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import test from 'ava';
import execute from '@openfn/runtime';
import slowmo from '../src/server/jobs/slow-random';

type SlowMoState = {
data: {
result: number;
}
}

const wait = async(time: number) => new Promise(resolve => {
setTimeout(resolve, time);
});

test('slowmo should return a value', async (t) => {
const result = await execute(slowmo) as SlowMoState;

t.assert(result);
t.assert(result.data.result);
t.falsy(isNaN(result.data.result))
});

test('slowmo should return after 500ms', async (t) => {
let result;

execute(slowmo).then((r)=> {
result = r;
});

// Should not return immediately
t.falsy(result);

await wait(100)
t.falsy(result);

// Should have returned by now
await wait(500);
// @ts-ignore
t.falsy(isNaN(result.data.result))
});

test('slowmo should accept a delay time as config', async (t) => {
let result;

const state = {
configuration: {
delay: 10
},
data: {}
};
execute(slowmo, state).then((r)=> {
result = r;
});

// Should not return immediately
t.falsy(result);

// Should have data already
await wait(100)
// @ts-ignore
t.falsy(isNaN(result.data.result))
});

test('slowmo should return random numbers', async (t) => {
const state = {
configuration: {
delay: 1
},
data: {}
};
const a = await execute(slowmo, state)
const b = await execute(slowmo, state)
t.assert(a !== b);
})
42 changes: 42 additions & 0 deletions packages/runtime-manager/test/manager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import test from 'ava';
import Manager from '../src/Manager';

test('Should create a new manager', (t) => {
const m = Manager();
t.assert(m);
t.assert(m.run);
});

test('Should register a job', (t) => {
const m = Manager();
m.registerJob('my_job', 'x');
t.assert(m);
});

test('Should throw if registering a job that already exists', (t) => {
const m = Manager();
m.registerJob('my_job', 'x');
t.throws(() => m.registerJob('my_job', 'x'));
});

test('Should return a registered job list', (t) => {
const m = Manager();
m.registerJob('my_job', 'x');
m.registerJob('my_other_job', 'x');

t.deepEqual(m.getRegisteredJobs(), ['my_job', 'my_other_job']);
});

// The ava test runner doesn't seem to be getting the experimental_vm_modules flag and so this fails :(
test.skip('Should run a simple job', async (t) => {
const m = Manager();
m.registerJob('test', 'export default [() => 10];');
const result = await m.run('test');
// @ts-ignore
t.assert(result === 10);
});

// should publish an event when a job starts
// should publish an event when a job stops
// should return a job list
// should return a list of active jobs
41 changes: 0 additions & 41 deletions packages/runtime-manager/test/manger.test.ts

This file was deleted.

Loading

0 comments on commit b9394eb

Please sign in to comment.