Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/ripe-wolves-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@openfn/engine-multi': patch
'@openfn/ws-worker': patch
---

Fix an issue where memory may not be released after runs
File renamed without changes.
14 changes: 0 additions & 14 deletions packages/engine-multi/src/api/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,9 @@ export const workflowComplete = (
const { workflowId, state: result, threadId } = event;

logger.success('complete workflow ', workflowId);
//logger.info(event.state);

// TODO I don't know how we'd get here in this architecture
// if (!allWorkflows.has(workflowId)) {
// throw new Error(`Workflow with id ${workflowId} is not defined`);
// }

state.status = 'done';
state.duration = Date.now() - state.startTime!;

// Important! We do NOT write the result back to this state object
// It has a tendency to not get garbage collected and causing memory problems

// TODO do we have to remove this from the active workflows array?
// const idx = activeWorkflows.findIndex((id) => id === workflowId);
// activeWorkflows.splice(idx, 1);

// forward the event on to any external listeners
context.emit(externalEvents.WORKFLOW_COMPLETE, {
threadId,
Expand Down
22 changes: 1 addition & 21 deletions packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import type {
EventHandler,
ExecuteOptions,
RuntimeEngine,
WorkflowState,
} from './types';
import type { AutoinstallOptions } from './api/autoinstall';

Expand Down Expand Up @@ -100,7 +99,6 @@ const createEngine = async (
options: EngineOptions,
workerPath?: string
): Promise<InternalEngine> => {
const states: Record<string, WorkflowState> = {};
const contexts: Record<string, ExecutionContext> = {};
const deferredListeners: Record<string, Record<string, EventHandler>[]> = {};

Expand Down Expand Up @@ -144,17 +142,6 @@ const createEngine = async (
retries: options.workerValidationRetries,
});

const registerWorkflow = (plan: ExecutionPlan, input: State) => {
// TODO throw if already registered?
const state = createState(plan, input);
states[state.id] = state;
return state;
};

const getWorkflowState = (workflowId: string) => states[workflowId];

const getWorkflowStatus = (workflowId: string) => states[workflowId]?.status;

// TODO too much logic in this execute function, needs farming out
// I don't mind having a wrapper here but it must be super thin
// TODO maybe engine options is too broad?
Expand All @@ -165,13 +152,9 @@ const createEngine = async (
) => {
options.logger!.debug('executing plan ', plan?.id ?? '<no id>');
const workflowId = plan.id!;
// TODO throw if plan is invalid
// Wait, don't throw because the server will die
// Maybe return null instead
const state = registerWorkflow(plan, input);

const context = new ExecutionContext({
state,
state: createState(plan, input),
logger: options.logger!,
callWorker,
options: {
Expand Down Expand Up @@ -244,9 +227,6 @@ const createEngine = async (
options,
workerPath: resolvedWorkerPath,
logger: options.logger,
registerWorkflow,
getWorkflowState,
getWorkflowStatus,
execute: executeWrapper,
listen,
destroy,
Expand Down
23 changes: 0 additions & 23 deletions packages/engine-multi/test/engine.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import test from 'ava';
import path from 'node:path';
import { createMockLogger } from '@openfn/logger';
import type { ExecutionPlan } from '@openfn/lexicon';

import createEngine, { InternalEngine } from '../src/engine';
import * as e from '../src/events';
Expand Down Expand Up @@ -53,28 +52,6 @@ test.serial('create an engine', async (t) => {

test.todo('throw if the worker is invalid');

test.serial('register a workflow', async (t) => {
const plan = { id: 'z' };
engine = await createEngine(options);

const state = engine.registerWorkflow(plan);

t.is(state.status, 'pending');
t.is(state.id, plan.id);
t.deepEqual(state.plan, plan);
});

test.serial('get workflow state', async (t) => {
const plan = { id: 'z' } as ExecutionPlan;
engine = await createEngine(options);

const s = engine.registerWorkflow(plan);

const state = engine.getWorkflowState(plan.id);

t.deepEqual(state, s);
});

test.serial('use the default worker path', async (t) => {
engine = await createEngine({ logger, repoDir: '.' });
t.true(engine.workerPath.endsWith('worker/thread/run.js'));
Expand Down
32 changes: 24 additions & 8 deletions packages/engine-multi/test/memtest.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// run this file with limited memory
// --max-old-space-size=50 or whatever
// NODE_OPTIONS="--max-old-space-size=50"

import { getHeapStatistics } from 'node:v8';
import { createMockLogger } from '@openfn/logger';
import { randomUUID } from 'node:crypto';
import createAPI from '../src/api';
Expand All @@ -11,10 +11,16 @@ const logger = createMockLogger();

let api: any;

function heap(reason: string) {
const { used_heap_size } = getHeapStatistics();
const mb = used_heap_size / 1024 / 1024;
console.log(`>> [${reason}] Used heap at ${mb.toFixed(2)}mb`);
}

function run() {
const job = `
export default [(state) => {
state.data = new Array(1024 * 1024 * 4).fill('z').join('')
state.data = new Array(1024 * 1024 * 7).fill('z').join('')
return state
}]`;

Expand All @@ -29,29 +35,39 @@ function run() {
},
options: {},
};
console.log('>> running', plan.id);
// console.log('>> running', plan.id);

api.execute(plan, {});

api.listen(plan.id!, {
'workflow-complete': () => {
completedCount++;
heap('workflow-complete');
console.log('>> Finished', completedCount);

setTimeout(() => {
run();
}, 10);
// setTimeout(() => {
// run();
// }, 10);
},
});
}

const runBatch = () => {
for (let i = 0; i < 4; i++) {
run();
}
};

async function start() {
api = await createAPI({
logger,
maxWorkers: 1,
maxWorkers: 4,
});

run();
runBatch();
setInterval(() => {
runBatch();
}, 200);
}

start();
115 changes: 115 additions & 0 deletions packages/ws-worker/perf/artillery.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// WORKER_DISABLE_EXIT_LISTENERS=true clinic heapprofiler -- node clinic.js 1 --open false --collect-only
// start lightning: pnpm start --port 9991
import { getHeapStatistics } from 'node:v8';
import createWorker from './dist/index.js';
import createRTE from '@openfn/engine-multi';
import createLightningServer from '@openfn/lightning-mock';

import payload from './payload.json' with { type: 'json' };
import { createMockLogger } from '@openfn/logger';
const obj = JSON.stringify({
data: payload.data,

// Only send part of the payload or the mock will throw an error
// This is enough data to prove the point
references: payload.references.slice(0,10)
})

let lng;
let worker;
let idgen = 1;

const WORKFLOW_COUNT = 1e6;
let workflowsFinished = 0;

setInterval(() => {
heap('POLL');
}, 1000);

await setup();
heap('SETUP');
await test();


function heap(reason) {
const { used_heap_size } = getHeapStatistics();
const mb = used_heap_size / 1024 / 1024;
console.log(`>> [${reason}] Used heap at ${mb.toFixed(2)}mb`);
}

// start the server
async function setup() {
const engineOptions = {
repoDir: process.env.OPENFN_REPO_DIR,
maxWorkers: 4,
logger: createMockLogger()
};

const engine = await createRTE(engineOptions);

worker = createWorker(engine, {
port: 9992,
lightning: 'ws://localhost:9991/worker',
maxWorkflows: 4,
backoff: {
min: 10,
max: 5e4,
},
logger: createMockLogger()
});

worker.on('workflow-complete', (evt) => {
heap('WORKFLOW:COMPLETE');
if (++workflowsFinished === WORKFLOW_COUNT) {
console.log('>> all done!');
// console.log('>> Hit CTRL+C to exit and generate heap profile');
// process.send('SIGINT');
// process.abort();
process.exit(0);
}
});
}

// send a bunch of jobs through
async function test() {
const sleep = (duration = 100) =>
new Promise((resolve) => setTimeout(resolve, duration));

let count = 0;
const max = 1;

while (count++ < WORKFLOW_COUNT) {
const w = wf();
await fetch(`http://localhost:9991/run`, {
method: 'POST',
body: JSON.stringify(w),
headers: {
'content-type': 'application/json',
},
keepalive: true,

});
// await sleep(5 * 1000);
await sleep(500);
}
}



function wf() {
const step = `
export default [() => {
return ${obj};
}]`;
return {
id: `run-${idgen++}`,
triggers: [],
edges: [],
jobs: [
{
id: 'a',
body: step,
},
],
};
}
File renamed without changes.
12 changes: 12 additions & 0 deletions packages/ws-worker/perf/mem-empty.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { getHeapStatistics } from 'node:v8';

function heap(reason) {
const { used_heap_size } = getHeapStatistics();
const mb = used_heap_size / 1024 / 1024;
console.log(`>> [${reason}] Used heap at ${mb.toFixed(2)}mb`);
}

heap('start');
setTimeout(() => {
heap('end');
}, 1000);
219 changes: 219 additions & 0 deletions packages/ws-worker/perf/mem-large.js

Large diffs are not rendered by default.

26 changes: 26 additions & 0 deletions packages/ws-worker/perf/mem-small.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { getHeapStatistics } from 'node:v8';

function heap(reason) {
const { used_heap_size } = getHeapStatistics();
const mb = used_heap_size / 1024 / 1024;
console.log(`>> [${reason}] Used heap at ${mb.toFixed(2)}mb`);
}

const json = `{
"id": "a",
"triggers": [],
"edges": [],
"jobs": [
{
"id": "job1",
"state": { "data": { "done": true } },
"adaptor": "@openfn/language-common@1.7.7",
"body": { "result": 42 }
}
]
}
`;

heap('start');
const obj = JSON.parse(json);
heap('end');
194 changes: 194 additions & 0 deletions packages/ws-worker/perf/payload.json

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion packages/ws-worker/src/api/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,13 @@ export function execute(
const lightningEvent = eventMap[eventName] ?? eventName;
try {
// updateSentryEvent(eventName);
let start = Date.now();
await handler(context, event);
logger.info(`${plan.id} :: ${lightningEvent} :: OK`);
logger.info(
`${plan.id} :: sent ${lightningEvent} :: OK :: ${
Date.now() - start
}ms`
);
} catch (e: any) {
if (!e.reportedToSentry) {
Sentry.captureException(e);
Expand Down
17 changes: 0 additions & 17 deletions packages/ws-worker/test.js

This file was deleted.