Skip to content

Commit

Permalink
fix: improve callback ordering (#501)
Browse files Browse the repository at this point in the history
  • Loading branch information
npaton committed Feb 5, 2024
1 parent e4edef5 commit 40a5d5c
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 61 deletions.
7 changes: 7 additions & 0 deletions .changeset/callback-ordering.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@empirica/core": patch
---

Improve callback ordering by only allowing 1 callback to run at a time. This is
change also make addScopes await for the scope changes to be added locally
before returning.
1 change: 1 addition & 0 deletions lib/@empirica/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@
"@swc/helpers": "0.4.2",
"@unocss/reset": "^0.51.4",
"archiver": "5.3.1",
"async-mutex": "0.4.1",
"rxjs": "7.5.5",
"stream-buffers": "3.0.2",
"zod": "3.17.3"
Expand Down
98 changes: 55 additions & 43 deletions lib/@empirica/core/src/admin/cake.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Mutex } from "async-mutex";
import { Observable } from "rxjs";
import { Attribute } from "../shared/attributes";
import { ScopeConstructor } from "../shared/scopes";
Expand All @@ -14,7 +15,7 @@ import {
TajEventListener,
TajribaEvent,
} from "./events";
import { subscribeAsync } from "./observables";
import { lockedAsyncSubscribe } from "./observables";
import { Connection, ConnectionMsg } from "./participants";
import { PromiseHandle, promiseHandle } from "./promises";
import { Scope, ScopeMsg } from "./scopes";
Expand All @@ -33,6 +34,7 @@ export class Cake<
postCallback: (() => Promise<void>) | undefined;
private stopped = false;
private unsubs: unsuber[] = [];
private mutex = new Mutex();

constructor(
private evtctx: EventContext<Context, Kinds>,
Expand Down Expand Up @@ -212,7 +214,8 @@ export class Cake<
until?: Scope<Context, Kinds>
) {
let handle: PromiseHandle | undefined = promiseHandle();
const unsub = subscribeAsync(
const unsub = lockedAsyncSubscribe(
this.mutex,
this.kindSubscription(kind),
async ({ scope, done }) => {
if (this.stopped) {
Expand Down Expand Up @@ -371,30 +374,34 @@ export class Cake<

transitionEvents: TajEventListener<EvtCtxCallback<Context, Kinds>>[] = [];
startTransitionAdd() {
const unsub = subscribeAsync(this.transitions, async (transition) => {
for (const callback of this.transitionEvents) {
if (this.stopped) {
return;
}
const unsub = lockedAsyncSubscribe(
this.mutex,
this.transitions,
async (transition) => {
for (const callback of this.transitionEvents) {
if (this.stopped) {
return;
}

debug(
`transition callback from '${transition.from}' to '${transition.to}'`
);

try {
await callback.callback(this.evtctx, {
transition,
step: transition.step,
});
} catch (err) {
prettyPrintError("transition", err as Error);
}
debug(
`transition callback from '${transition.from}' to '${transition.to}'`
);

try {
await callback.callback(this.evtctx, {
transition,
step: transition.step,
});
} catch (err) {
prettyPrintError("transition", err as Error);
}

if (this.postCallback) {
await this.postCallback();
if (this.postCallback) {
await this.postCallback();
}
}
}
});
);

this.unsubs.push(unsub);
}
Expand All @@ -403,7 +410,8 @@ export class Cake<
connectionsMap = new Map<string, Connection>();
async startConnected() {
let handle: PromiseHandle | undefined = promiseHandle();
const unsub = subscribeAsync(
const unsub = lockedAsyncSubscribe(
this.mutex,
this.connections,
async ({ connection, done }) => {
if (this.stopped) {
Expand Down Expand Up @@ -454,33 +462,37 @@ export class Cake<

disconnectedEvents: TajEventListener<EvtCtxCallback<Context, Kinds>>[] = [];
startDisconnected() {
const unsub = subscribeAsync(this.connections, async ({ connection }) => {
if (this.stopped) {
return;
}
const unsub = lockedAsyncSubscribe(
this.mutex,
this.connections,
async ({ connection }) => {
if (this.stopped) {
return;
}

if (!connection || connection.connected) {
return;
}
if (!connection || connection.connected) {
return;
}

this.connectionsMap.delete(connection.participant.id);
this.connectionsMap.delete(connection.participant.id);

for (const callback of this.disconnectedEvents) {
debug(`disconnected callback`);
for (const callback of this.disconnectedEvents) {
debug(`disconnected callback`);

try {
await callback.callback(this.evtctx, {
participant: connection.participant,
});
} catch (err) {
prettyPrintError("participant disconnect", err as Error);
}
try {
await callback.callback(this.evtctx, {
participant: connection.participant,
});
} catch (err) {
prettyPrintError("participant disconnect", err as Error);
}

if (this.postCallback) {
await this.postCallback();
if (this.postCallback) {
await this.postCallback();
}
}
}
});
);

this.unsubs.push(unsub);
}
Expand Down
1 change: 1 addition & 0 deletions lib/@empirica/core/src/admin/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ export class Flusher {

flushAfter(cb: () => Promise<void>): (() => Promise<void>) | void {
if (!this.postCallback) {
cb();
return;
}

Expand Down
32 changes: 32 additions & 0 deletions lib/@empirica/core/src/admin/observables.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { E_CANCELED, Mutex } from "async-mutex";
import { Observable, Subject, concatMap, takeUntil } from "rxjs";
import { warn } from "../utils/console";

Expand Down Expand Up @@ -62,6 +63,37 @@ export async function awaitObsValueChange<T>(obs: Observable<T>): Promise<T> {
return val;
}

// Subscribe to an observable and use the lock for sequential execution of async
// functions.
export function lockedAsyncSubscribe<T>(
mutex: Mutex,
obs: Observable<T>,
fn: (val: T) => Promise<any>
) {
return obs.subscribe({
next: async (val) => {
try {
const release = await mutex.acquire();
try {
await fn(val);
} catch (err) {
console.error("error in async observable subscription");
console.error(err);
} finally {
release();
}
} catch (err) {
if (err !== E_CANCELED) {
console.error(
"error acquiring lock in async observable subscription"
);
console.error(err);
}
}
},
});
}

// This does not behave correctly with a ReplaySubject
export function subscribeAsync<T>(
obs: Observable<T>,
Expand Down
27 changes: 10 additions & 17 deletions lib/@empirica/core/src/admin/runloop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ export class Runloop<
this.scopes,
new Flusher(this.postCallback.bind(this, true))
);

this.cake = new Cake(
this.evtctx,
this.scopes.scope.bind(this.scopes),
Expand Down Expand Up @@ -264,17 +265,12 @@ export class Runloop<

async addScopes(inputs: AddScopeInput[]) {
if (this.stopped) {
// warn("addScopes on stopped", inputs);

return [];
}

const addScopes = this.taj.addScopes(inputs).catch((err) => {
warn(err.message);
return [];
});
this.scopePromises.push(
addScopes.then((scopes) => {
const addScopes = this.taj
.addScopes(inputs)
.then((scopes) => {
for (const scope of scopes) {
for (const attrEdge of scope.attributes.edges) {
this.attributesSub.next({
Expand All @@ -293,15 +289,18 @@ export class Runloop<

return scopes;
})
);
.catch((err) => {
warn(err.message);
return [];
});

this.scopePromises.push(addScopes);

return addScopes;
}

async addGroups(inputs: AddGroupInput[]) {
if (this.stopped) {
// warn("addGroups on stopped", inputs);

return [];
}

Expand All @@ -312,8 +311,6 @@ export class Runloop<

async addLinks(inputs: LinkInput[]) {
if (this.stopped) {
// warn("addLinks on stopped", inputs);

return [];
}

Expand All @@ -329,8 +326,6 @@ export class Runloop<

async addSteps(inputs: AddStepInput[]) {
if (this.stopped) {
// warn("addSteps on stopped", inputs);

return [];
}

Expand All @@ -341,8 +336,6 @@ export class Runloop<

async addTransitions(inputs: TransitionInput[]) {
if (this.stopped) {
// warn("addTransitions on stopped", inputs);

return [];
}

Expand Down
11 changes: 10 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions tests/stress/tests/admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ export const adminNewBatch = ({
throw new Error("lastNewBatch not found");
}

console.log(`LAST BATCH ID: ${lastNewBatch.id}`);

// Get the line of the last batch
const lineSelector = `li[data-batch-line-id="${lastNewBatch.id}"]`;
const line = actor.page.locator(lineSelector);
Expand Down

0 comments on commit 40a5d5c

Please sign in to comment.