Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Before merging to main, check out the release branch locally and run the followi
1. Run `pnpm changeset tag` to generate tags
1. Push tags `git push --tags`

Rememebr tags may need updating if commits come in after the tags are first generated.
Remember tags may need updating if commits come in after the tags are first generated.

## TypeSync

Expand Down
38 changes: 18 additions & 20 deletions packages/engine-multi/src/util/ensure-payload-size.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
export const REDACTED_STATE = {
data: '[REDACTED_STATE]',
_$REDACTED$_: true,
};
// This specifies which keys of an event payload to potentially redact
// if they are too big
const KEYS_TO_VERIFY = ['state', 'final_state', 'log'];

export const REDACTED_LOG = {
message: ['[REDACTED: Message length exceeds payload limit]'],
_$REDACTED$_: true,
const replacements: Record<string, any> = {
log: {
message: ['[REDACTED: Message length exceeds payload limit]'],
},
default: {
data: '[REDACTED]',
},
};

export const verify = (value: any, limit_mb: number = 10) => {
Expand All @@ -32,19 +35,14 @@ export const verify = (value: any, limit_mb: number = 10) => {
export default (payload: any, limit_mb: number = 10) => {
const newPayload = { ...payload };

// The payload could be any of the runtime events
// The bits we might want to redact are state and message
try {
verify(payload.state, limit_mb);
} catch (e) {
newPayload.state = REDACTED_STATE;
newPayload.redacted = true;
}
try {
verify(payload.log, limit_mb);
} catch (e) {
Object.assign(newPayload.log, REDACTED_LOG);
newPayload.redacted = true;
for (const key of KEYS_TO_VERIFY) {
try {
verify(payload[key], limit_mb);
} catch (e) {
Object.assign(newPayload[key], replacements[key] ?? replacements.default);
newPayload.redacted = true;
}
}

return newPayload;
};
6 changes: 2 additions & 4 deletions packages/engine-multi/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import type { ExecutionPlan } from '@openfn/lexicon';

import createAPI from '../src/api';
import type { RuntimeEngine } from '../src';
import { REDACTED_STATE, REDACTED_LOG } from '../src/util/ensure-payload-size';

const logger = createMockLogger(undefined, { level: 'debug' });
let api: RuntimeEngine;
Expand Down Expand Up @@ -536,7 +535,7 @@ export default [(state) => {
.execute(plan, emptyState, options)
.on('workflow-complete', ({ state }) => {
t.log(state);
t.deepEqual(REDACTED_STATE, state);
t.is(state.data, '[REDACTED]');
done();
});
});
Expand Down Expand Up @@ -566,9 +565,8 @@ export default [(state) => {
api
.execute(plan, emptyState, options)
.on('workflow-log', (evt) => {
console.log(evt);
if (evt.name === 'JOB') {
t.deepEqual(evt.message, REDACTED_LOG.message);
t.true(/redacted/i.test(evt.message[0]));
}
})
.on('workflow-complete', () => {
Expand Down
28 changes: 21 additions & 7 deletions packages/engine-multi/test/util/ensure-payload-size.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import test from 'ava';
import ensurePayloadSize, {
REDACTED_LOG,
REDACTED_STATE,
verify,
} from '../../src/util/ensure-payload-size';
import ensurePayloadSize, { verify } from '../../src/util/ensure-payload-size';

const mb = (bytes: number) => bytes / 1024 / 1024;

Expand Down Expand Up @@ -62,7 +58,9 @@ test('redact payload with state', (t) => {
};

const newPayload = ensurePayloadSize(payload, 1);
t.deepEqual(newPayload.state, REDACTED_STATE);
t.deepEqual(newPayload.state, {
data: '[REDACTED]',
});
t.true(newPayload.redacted);
});

Expand All @@ -74,6 +72,22 @@ test('redact payload with log message', (t) => {
};

const newPayload = ensurePayloadSize(payload, 1);
t.deepEqual(newPayload.log, REDACTED_LOG);
t.deepEqual(newPayload.log, {
message: ['[REDACTED: Message length exceeds payload limit]'],
});
t.true(newPayload.redacted);
});

test('redact payload with final_state', (t) => {
const payload = {
final_state: {
data: new Array(1024 * 1024).fill('z').join(''),
},
};

const newPayload = ensurePayloadSize(payload, 1);
t.deepEqual(newPayload.final_state, {
data: '[REDACTED]',
});
t.true(newPayload.redacted);
});
7 changes: 7 additions & 0 deletions packages/lexicon/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# lexicon

## 1.2.5

### Patch Changes

- 09dd4b2: - Add `final_state` object to `workflow:complete` event
- Remove unused `final_dataclip_id` from `workflow:complete` payload

## 1.2.4

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lexicon/lightning.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ export type RunStartReply = {}; // no payload

export type RunCompletePayload = ExitReason & {
timestamp: TimeInMicroSeconds;
final_dataclip_id?: string; // TODO this will be removed soon
final_state?: any; // The aggregated final state from the workflow (handles branching)
};
export type RunCompleteReply = undefined;

Expand Down
2 changes: 1 addition & 1 deletion packages/lexicon/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lexicon",
"version": "1.2.4",
"version": "1.2.5",
"description": "Central repo of names and type definitions",
"author": "Open Function Group <admin@openfn.org>",
"license": "ISC",
Expand Down
12 changes: 12 additions & 0 deletions packages/lightning-mock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# @openfn/lightning-mock

## 2.3.0

### Minor Changes

- 09dd4b2: - Add `final_state` object to `workflow:complete` event
- Remove unused `final_dataclip_id` from `workflow:complete` payload

### Patch Changes

- Updated dependencies [09dd4b2]
- @openfn/lexicon@1.2.4

## 2.2.9

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lightning-mock",
"version": "2.2.9",
"version": "2.3.0",
"private": true,
"description": "A mock Lightning server",
"main": "dist/index.js",
Expand Down
3 changes: 1 addition & 2 deletions packages/lightning-mock/src/api-dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ const setupDevAPI = (
}) => {
if (evt.runId === runId) {
state.events.removeListener(RUN_COMPLETE, handler);
const result = state.dataclips[evt.payload.final_dataclip_id!];
resolve(result);
resolve(evt.payload.final_state);
}
};
state.events.addListener(RUN_COMPLETE, handler);
Expand Down
10 changes: 5 additions & 5 deletions packages/lightning-mock/src/api-sockets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ const createSocketAPI = (
) {
const { ref, join_ref, topic } = evt;
const {
final_dataclip_id,
final_state,
reason,
error_type,
error_message,
Expand All @@ -367,16 +367,16 @@ const createSocketAPI = (
} = evt.payload;

logger?.info('Completed run ', runId);
logger?.debug(final_dataclip_id);
logger?.debug(final_state);

state.pending[runId].status = 'complete';

// TODO we'll remove this stuff soon
// Store the final state directly from the payload
if (!state.results[runId]) {
state.results[runId] = { state: null, workerId: 'mock' };
}
if (final_dataclip_id) {
state.results[runId].state = state.dataclips[final_dataclip_id];
if (final_state) {
state.results[runId].state = final_state;
}

let payload: any = validateReasons(evt.payload);
Expand Down
9 changes: 3 additions & 6 deletions packages/lightning-mock/test/channels/run.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,10 @@ test.serial('complete an run through the run channel', async (t) => {
const a = run1;
server.registerRun(a);
server.startRun(a.id);
server.addDataclip('abc', { answer: 42 });

const channel = await join(`run:${a.id}`, { token: 'a.b.c' });
channel
.push(RUN_COMPLETE, { reason: 'success', final_dataclip_id: 'abc' })
.push(RUN_COMPLETE, { reason: 'success', final_state: { answer: 42 } })
.receive('ok', () => {
const { pending, results } = server.getState();
t.deepEqual(pending[a.id], {
Expand Down Expand Up @@ -197,7 +196,6 @@ test.serial(
const result = { answer: 42 };

server.startRun(run1.id);
server.addDataclip('result', result);

server.waitForResult(run1.id).then((dataclip: DataClip) => {
t.deepEqual(result, dataclip);
Expand All @@ -206,7 +204,7 @@ test.serial(

const channel = await join(`run:${run1.id}`, { token: 'a.b.c' });
channel.push(RUN_COMPLETE, {
final_dataclip_id: 'result',
final_state: result,
} as RunCompletePayload);
});
}
Expand All @@ -220,7 +218,6 @@ test.serial(
const result = { answer: 42 };

server.startRun(run1.id);
server.addDataclip('result', result);

server.waitForResult(run1.id).then(() => {
const dataclip = server.getResult(run1.id);
Expand All @@ -230,7 +227,7 @@ test.serial(

const channel = await join(`run:${run1.id}`, { token: 'a.b.c' });
channel.push(RUN_COMPLETE, {
final_dataclip_id: 'result',
final_state: result,
} as RunCompletePayload);
});
}
Expand Down
12 changes: 12 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# ws-worker

## 1.17.0

### Minor Changes

- 09dd4b2: - Add `final_state` object to `workflow:complete` event
- Remove unused `final_dataclip_id` from `workflow:complete` payload

### Patch Changes

- Updated dependencies [09dd4b2]
- @openfn/lexicon@1.2.4

## 1.16.1

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.16.1",
"version": "1.17.0",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
8 changes: 4 additions & 4 deletions packages/ws-worker/src/events/run-complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ export default async function onWorkflowComplete(
) {
const { state, onFinish, logger } = context;

// TODO I dont think the run final dataclip IS the last job dataclip
// Especially not in parallelisation
const result = state.dataclips[state.lastDataclipId!];
// Use the aggregated final state from the runtime
// This handles branching workflows correctly by returning all leaf states
const result = event.state;

const reason = calculateRunExitReason(state);
await logFinalReason(context, reason);

try {
await sendEvent<RunCompletePayload>(context, RUN_COMPLETE, {
final_dataclip_id: state.lastDataclipId!,
final_state: result,
Comment thread
josephjclark marked this conversation as resolved.
timestamp: timeInMicroseconds(event.time),
...reason,
});
Expand Down
1 change: 0 additions & 1 deletion packages/ws-worker/src/events/run-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ export default async function onRunError(
await logFinalReason(context, reason);

await sendEvent<RunCompletePayload>(context, RUN_COMPLETE, {
final_dataclip_id: state.lastDataclipId!,
...reason,
});

Expand Down
12 changes: 10 additions & 2 deletions packages/ws-worker/src/mock/runtime-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,19 @@ async function createMock() {
},
};
setTimeout(async () => {
const startTime = Date.now();
dispatch('workflow-start', { workflowId: id, threadId: threadId });

try {
await run(xplan, input, opts as any);
dispatch('workflow-complete', { workflowId: id, threadId: threadId });
const result = await run(xplan, input, opts as any);
const duration = Date.now() - startTime;
dispatch('workflow-complete', {
workflowId: id,
threadId: threadId,
state: result,
duration,
time: BigInt(Date.now()) * BigInt(1000000), // Convert to nanoseconds
});
} catch (e: any) {
dispatch('workflow-error', {
threadId: threadId,
Expand Down
Loading