Skip to content
Permalink
Browse files

feat(dispatch): piped dispatch

this feature allows to combine multiple reducers in a pipeline, which get executed in one roundtrip. This results in individually time tracked and logged sequential reducers but only a single overall action being reported to Redux DevTools

Related issue #89
  • Loading branch information...
gins3000 authored and zewa666 committed Jun 6, 2019
1 parent 8dc678e commit 4a78ad51c5fa867f88bd0f3eb4a99aa5df00ad47
Showing with 476 additions and 49 deletions.
  1. +4 −0 src/middleware.ts
  2. +96 −41 src/store.ts
  3. +2 −1 test/unit/helpers.ts
  4. +34 −1 test/unit/middleware.spec.ts
  5. +340 −6 test/unit/store.spec.ts
@@ -5,6 +5,10 @@ export const DEFAULT_LOCAL_STORAGE_KEY = "aurelia-store-state";
export interface CallingAction {
name: string;
params?: any[];
pipedActions?: {
name: string;
params?: any[];
}[];
}

export type Middleware<T, S = any> = (state: T, originalState: T | undefined, settings: S, action?: CallingAction) => T | Promise<T | undefined | false> | void | false;
@@ -25,13 +25,28 @@ export interface StoreOptions {
devToolsOptions?: DevToolsOptions;
}

interface DispatchQueueItem<T> {
export interface PipedDispatch<T> {
pipe: <P extends any[]>(reducer: Reducer<T, P> | string, ...params: P) => PipedDispatch<T>;
dispatch: () => Promise<void>;
}

interface DispatchAction<T> {
reducer: Reducer<T>;
params: any[];
}

interface DispatchQueueItem<T> {
actions: DispatchAction<T>[];
resolve: any;
reject: any;
}

export class UnregisteredActionError<T, P extends any[]> extends Error {
constructor(reducer?: string | Reducer<T, P>) {
super(`Tried to dispatch an unregistered action ${reducer && (typeof reducer === "string" ? reducer : reducer.name)}`);
}
}

export class Store<T> {
public readonly state: Observable<T>;

@@ -102,22 +117,52 @@ export class Store<T> {
this._state.next(state);
}

public dispatch<P extends any[]>(reducer: Reducer<T, P> | string, ...params: P) {
let action: Reducer<T, P>;
public dispatch<P extends any[]>(reducer: Reducer<T, P> | string, ...params: P): Promise<void> {
const action = this.lookupAction(reducer as Reducer<T> | string);
if (!action) {
return Promise.reject(new UnregisteredActionError(reducer));
}

if (typeof reducer === "string") {
const result = Array.from(this.actions)
.find((val) => val[1].type === reducer);
return this.queueDispatch([{
reducer: action,
params
}]);
}

public pipe<P extends any[]>(reducer: Reducer<T, P> | string, ...params: P): PipedDispatch<T> {
const pipeline: DispatchAction<T>[] = [];

const dispatchPipe: PipedDispatch<T> = {
dispatch: () => this.queueDispatch(pipeline),
pipe: <NextP extends any[]>(nextReducer: Reducer<T, NextP> | string, ...nextParams: NextP) => {
const action = this.lookupAction(nextReducer as Reducer<T> | string);
if (!action) {
throw new UnregisteredActionError(reducer);
}
pipeline.push({ reducer: action, params: nextParams });
return dispatchPipe;
}
};

return dispatchPipe.pipe(reducer, ...params);
}

private lookupAction(reducer: Reducer<T> | string): Reducer<T> | undefined {
if (typeof reducer === "string") {
const result = Array.from(this.actions).find(([_, action]) => action.type === reducer);
if (result) {
action = result[0];
return result[0];
}
} else {
action = reducer;
} else if (this.actions.has(reducer)) {
return reducer;
}

return undefined;
}

private queueDispatch(actions: DispatchAction<T>[]) {
return new Promise<void>((resolve, reject) => {
this.dispatchQueue.push({ reducer: action, params, resolve, reject } as any);
this.dispatchQueue.push({ actions, resolve, reject });
if (this.dispatchQueue.length === 1) {
this.handleQueue();
}
@@ -129,7 +174,7 @@ export class Store<T> {
const queueItem = this.dispatchQueue[0];

try {
await this.internalDispatch(queueItem.reducer, ...queueItem.params);
await this.internalDispatch(queueItem.actions);
queueItem.resolve();
} catch (e) {
queueItem.reject(e);
@@ -140,28 +185,37 @@ export class Store<T> {
}
}

private async internalDispatch(reducer: Reducer<T>, ...params: any[]) {
if (!this.actions.has(reducer)) {
throw new Error(`Tried to dispatch an unregistered action${reducer ? " " + reducer.name : ""}`);
private async internalDispatch(actions: DispatchAction<T>[]) {
const unregisteredAction = actions.find((a) => !this.actions.has(a.reducer));
if (unregisteredAction) {
throw new UnregisteredActionError(unregisteredAction.reducer);
}

PLATFORM.performance.mark("dispatch-start");

const action = {
...this.actions.get(reducer)!,
params
const pipedActions = actions.map((a) => ({
type: this.actions.get(a.reducer)!.type,
params: a.params,
reducer: a.reducer
}));

const callingAction: CallingAction = {
name: pipedActions.map((a) => a.type).join("->"),
params: pipedActions.reduce<any[]>((p, a) => p.concat(a.params), []),
pipedActions: pipedActions.map((a) => ({
name: a.type,
params: a.params
}))
};

if (this.options.logDispatchedActions) {
this.logger[getLogType(this.options, "dispatchedActions", LogLevel.info)](`Dispatching: ${action.type}`);
this.logger[getLogType(this.options, "dispatchedActions", LogLevel.info)](`Dispatching: ${callingAction.name}`);
}

const beforeMiddleswaresResult = await this.executeMiddlewares(
this._state.getValue(),
MiddlewarePlacement.Before,
{
name: action.type,
params
}
callingAction
);

if (beforeMiddleswaresResult === false) {
@@ -171,26 +225,27 @@ export class Store<T> {
return;
}

const result = await reducer(beforeMiddleswaresResult, ...params);
if (result === false) {
PLATFORM.performance.clearMarks();
PLATFORM.performance.clearMeasures();
let result: T | false = beforeMiddleswaresResult;
for (const action of pipedActions) {
result = await action.reducer(result, ...action.params);
if (result === false) {
PLATFORM.performance.clearMarks();
PLATFORM.performance.clearMeasures();

return;
}
PLATFORM.performance.mark("dispatch-after-reducer-" + action.type);
return;
}

PLATFORM.performance.mark("dispatch-after-reducer-" + action.type);

if (!result && typeof result !== "object") {
throw new Error("The reducer has to return a new state");
if (!result && typeof result !== "object") {
throw new Error("The reducer has to return a new state");
}
}

let resultingState = await this.executeMiddlewares(
result,
MiddlewarePlacement.After,
{
name: action.type,
params
}
callingAction
);

if (resultingState === false) {
@@ -218,22 +273,22 @@ export class Store<T> {

const measures = PLATFORM.performance.getEntriesByName("startEndDispatchDuration");
this.logger[getLogType(this.options, "performanceLog", LogLevel.info)](
`Total duration ${measures[0].duration} of dispatched action ${action.type}:`,
`Total duration ${measures[0].duration} of dispatched action ${callingAction.name}:`,
measures
);
} else if (this.options.measurePerformance === PerformanceMeasurement.All) {
const marks = PLATFORM.performance.getEntriesByType("mark");
const totalDuration = marks[marks.length - 1].startTime - marks[0].startTime;
this.logger[getLogType(this.options, "performanceLog", LogLevel.info)](
`Total duration ${totalDuration} of dispatched action ${action.type}:`,
`Total duration ${totalDuration} of dispatched action ${callingAction.name}:`,
marks
);
}

PLATFORM.performance.clearMarks();
PLATFORM.performance.clearMeasures();

this.updateDevToolsState(action, resultingState);
this.updateDevToolsState({ type: callingAction.name, params: callingAction.params }, resultingState);
}

private executeMiddlewares(state: T, placement: MiddlewarePlacement, action: CallingAction): T | false {
@@ -287,14 +342,14 @@ export class Store<T> {
}

private registerHistoryMethods() {
this.registerAction("jump", jump as any as Reducer<T>);
this.registerAction("jump", jump as Reducer<T>);
}
}

export function dispatchify<T, P extends any[]>(action: Reducer<T, P> | string) {
const store = Container.instance.get(Store);
const store: Store<T> = Container.instance.get(Store);

return function (...params: P) {
return store.dispatch(action, ...params) as Promise<void>;
return store.dispatch(action, ...params);
}
}
@@ -6,7 +6,8 @@ import {
import { StateHistory } from "../../src/history";

export type testState = {
foo: string
foo: string,
bar?: string;
};

export function createTestStore() {
@@ -144,6 +144,39 @@ describe("middlewares", () => {
);
});

it("should have a reference all piped actions", async () => {
const store = createStoreWithStateAndOptions<TestState>(initialState, { propagateError: true });
const expectedActionName1 = "FirstActionObservedByMiddleware";
const expectedActionName2 = "SecondActionObservedByMiddleware";

const firstActionObservedByMiddleware = (state: TestState, _foo: string) => state;
const secondActionObservedByMiddleware = (state: TestState, _bar: string) => state;

const actionAwareMiddleware: Middleware<TestState> = (_, __, ___, action) => {
expect(action).toBeDefined();
expect(action!.name).toBe(`${expectedActionName1}->${expectedActionName2}`);
expect(action!.params).toBeDefined();
expect(action!.params).toEqual(["A", "B"]);
expect(action!.pipedActions).toEqual([
{ name: expectedActionName1, params: ["A"] },
{ name: expectedActionName2, params: ["B"] }
]);
}

store.registerAction(expectedActionName1, firstActionObservedByMiddleware);
store.registerAction(expectedActionName2, secondActionObservedByMiddleware);
store.registerMiddleware(actionAwareMiddleware, MiddlewarePlacement.After);

await executeSteps(
store,
false,
() => store
.pipe(firstActionObservedByMiddleware, "A")
.pipe(secondActionObservedByMiddleware, "B")
.dispatch()
);
});

describe("which are applied before action dispatches", () => {
it("should synchronously change the provided present state", done => {
const store = createStoreWithState(initialState);
@@ -265,7 +298,7 @@ describe("middlewares", () => {
it("should get additionally the original state, before prev modifications passed in", done => {
const store = createStoreWithState(initialState);

const decreaseAfter = (currentState: TestState, originalState: TestState | undefined) => {
const decreaseAfter = (currentState: TestState, originalState: TestState | undefined) => {
const newState = Object.assign({}, currentState);
newState.counter = originalState!.counter;

0 comments on commit 4a78ad5

Please sign in to comment.
You can’t perform that action at this time.