Skip to content

Commit 689ad03

Browse files
committed
feat(task): added the pipe operator
1 parent 0720652 commit 689ad03

4 files changed

Lines changed: 248 additions & 39 deletions

File tree

src/operators/basic.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { Task, UncaughtError } from '../task';
2+
3+
export type IMapFn <A, B> = (a: A) => B;
4+
export type ITaskChainFn<A, B, E> = (value: A) => Task<B, E>;
5+
export type IPipeFn<T1, T2, E1, E2> = (a: Task<T1, E1>) => Task<T2, E2>;
6+
7+
export function map<T1, T2, E> (fn: IMapFn<T1, T2>) {
8+
return function (input: Task<T1, E>): Task<T2, E | UncaughtError> {
9+
return new Task((outerResolve, outerReject) => {
10+
input.fork(
11+
outerReject,
12+
value => {
13+
try {
14+
const result = fn(value);
15+
outerResolve(result);
16+
} catch (error) {
17+
outerReject(new UncaughtError(error));
18+
}
19+
}
20+
);
21+
});
22+
};
23+
}
24+
25+
export function chain<T1, T2, E1, E2> (fn: ITaskChainFn<T1, T2, E2>) {
26+
return function (input: Task<T1, E1>): Task<T2, E1 | E2 | UncaughtError> {
27+
return new Task((outerResolve, outerReject) => {
28+
input.fork(
29+
outerReject,
30+
value => {
31+
try {
32+
fn(value).fork(outerReject, outerResolve);
33+
}
34+
catch (err) {
35+
outerReject(new UncaughtError(err));
36+
}
37+
}
38+
);
39+
});
40+
};
41+
}
42+
43+
export function catchError<T1, T2, E1, E2> (fn: ITaskChainFn<E1, T2, E2>) {
44+
return function (input: Task<T1, E1>): Task<T1 | T2, E2 | UncaughtError> {
45+
return new Task((outerResolve, outerReject) => {
46+
input.fork(
47+
err => {
48+
try {
49+
fn(err).fork(outerReject, outerResolve);
50+
}
51+
catch (err) {
52+
outerReject(new UncaughtError(err));
53+
}
54+
},
55+
outerResolve
56+
);
57+
});
58+
};
59+
}
60+

src/task.ts

Lines changed: 31 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
export type IMapFn <A, B> = (a: A) => B;
2-
export type ITaskChainFn<A, B, E> = (value: A) => Task<B, E>;
1+
import { catchError, chain, IMapFn, IPipeFn, ITaskChainFn, map } from './operators/basic';
2+
33

44
export class UncaughtError extends Error {
55
type: 'UncaughtError';
@@ -95,52 +95,45 @@ export class Task <T, E> {
9595
});
9696
}
9797

98-
map<TResult> (fn: IMapFn<T, TResult>): Task<TResult, E | UncaughtError> {
98+
pipe (): Task<T, E>;
99+
pipe<T1, E1> (f1: IPipeFn<T, T1, E, E1>): Task<T1, E1 | UncaughtError>;
100+
pipe<T1, T2, E1, E2> (f1: IPipeFn<T, T1, E, E1>, f2: IPipeFn<T1, T2, E1, E2>): Task<T2, E2 | UncaughtError>;
101+
pipe<T1, T2, T3, E1, E2, E3> (f1: IPipeFn<T, T1, E, E1>, f2: IPipeFn<T1, T2, E1, E2>, f3: IPipeFn<T2, T3, E2, E3>): Task<T3, E3 | UncaughtError>;
102+
pipe<T1, T2, T3, T4, E1, E2, E3, E4> (f1: IPipeFn<T, T1, E, E1>, f2: IPipeFn<T1, T2, E1, E2>, f3: IPipeFn<T2, T3, E2, E3>, f4: IPipeFn<T3, T4, E3, E4>): Task<T4, E4 | UncaughtError>;
103+
pipe<T1, T2, T3, T4, T5, E1, E2, E3, E4, E5> (f1: IPipeFn<T, T1, E, E1>, f2: IPipeFn<T1, T2, E1, E2>, f3: IPipeFn<T2, T3, E2, E3>, f4: IPipeFn<T3, T4, E3, E4>, f5: IPipeFn<T4, T5, E4, E5>): Task<T5, E5 | UncaughtError>;
104+
pipe<T1, T2, T3, T4, T5, T6, E1, E2, E3, E4, E5, E6> (f1: IPipeFn<T, T1, E, E1>, f2: IPipeFn<T1, T2, E1, E2>, f3: IPipeFn<T2, T3, E2, E3>, f4: IPipeFn<T3, T4, E3, E4>, f5: IPipeFn<T4, T5, E4, E5>, f6: IPipeFn<T5, T6, E5, E6>): Task<T6, E6 | UncaughtError>;
105+
pipe<T1, T2, T3, T4, T5, T6, T7, E1, E2, E3, E4, E5, E6, E7> (f1: IPipeFn<T, T1, E, E1>, f2: IPipeFn<T1, T2, E1, E2>, f3: IPipeFn<T2, T3, E2, E3>, f4: IPipeFn<T3, T4, E3, E4>, f5: IPipeFn<T4, T5, E4, E5>, f6: IPipeFn<T5, T6, E5, E6>, f7: IPipeFn<T6, T7, E6, E7>): Task<T7, E7 | UncaughtError>;
106+
pipe<T1, T2, T3, T4, T5, T6, T7, T8, E1, E2, E3, E4, E5, E6, E7, E8> (f1: IPipeFn<T, T1, E, E1>, f2: IPipeFn<T1, T2, E1, E2>, f3: IPipeFn<T2, T3, E2, E3>, f4: IPipeFn<T3, T4, E3, E4>, f5: IPipeFn<T4, T5, E4, E5>, f6: IPipeFn<T5, T6, E5, E6>, f7: IPipeFn<T6, T7, E6, E7>, f8: IPipeFn<T7, T8, E7, E8>): Task<T8, E8 | UncaughtError>;
107+
pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9, E1, E2, E3, E4, E5, E6, E7, E8, E9> (f1: IPipeFn<T, T1, E, E1>, f2: IPipeFn<T1, T2, E1, E2>, f3: IPipeFn<T2, T3, E2, E3>, f4: IPipeFn<T3, T4, E3, E4>, f5: IPipeFn<T4, T5, E4, E5>, f6: IPipeFn<T5, T6, E5, E6>, f7: IPipeFn<T6, T7, E6, E7>, f8: IPipeFn<T7, T8, E7, E8>, f9: IPipeFn<T8, T9, E8, E9>): Task<T9, E9 | UncaughtError>;
108+
pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, E1, E2, E3, E4, E5, E6, E7, E8, E9, E10> (f1: IPipeFn<T, T1, E, E1>, f2: IPipeFn<T1, T2, E1, E2>, f3: IPipeFn<T2, T3, E2, E3>, f4: IPipeFn<T3, T4, E3, E4>, f5: IPipeFn<T4, T5, E4, E5>, f6: IPipeFn<T5, T6, E5, E6>, f7: IPipeFn<T6, T7, E6, E7>, f8: IPipeFn<T7, T8, E7, E8>, f9: IPipeFn<T8, T9, E8, E9>, f10: IPipeFn<T9, T10, E9, E10>): Task<T10, E10 | UncaughtError>;
109+
pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, E1, E2, E3, E4, E5, E6, E7, E8, E9, E10, E11> (f1: IPipeFn<T, T1, E, E1>, f2: IPipeFn<T1, T2, E1, E2>, f3: IPipeFn<T2, T3, E2, E3>, f4: IPipeFn<T3, T4, E3, E4>, f5: IPipeFn<T4, T5, E4, E5>, f6: IPipeFn<T5, T6, E5, E6>, f7: IPipeFn<T6, T7, E6, E7>, f8: IPipeFn<T7, T8, E7, E8>, f9: IPipeFn<T8, T9, E8, E9>, f10: IPipeFn<T9, T10, E9, E10>, f11: IPipeFn<T10, T11, E10, E11>): Task<T11, E11 | UncaughtError>;
110+
pipe<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, E1, E2, E3, E4, E5, E6, E7, E8, E9, E10, E11, E12> (f1: IPipeFn<T, T1, E, E1>, f2: IPipeFn<T1, T2, E1, E2>, f3: IPipeFn<T2, T3, E2, E3>, f4: IPipeFn<T3, T4, E3, E4>, f5: IPipeFn<T4, T5, E4, E5>, f6: IPipeFn<T5, T6, E5, E6>, f7: IPipeFn<T6, T7, E6, E7>, f8: IPipeFn<T7, T8, E7, E8>, f9: IPipeFn<T8, T9, E8, E9>, f10: IPipeFn<T9, T10, E9, E10>, f11: IPipeFn<T10, T11, E10, E11>, f12: IPipeFn<T11, T12, E11, E12>): Task<T12, E12 | UncaughtError>;
111+
pipe<TFinal, EFinal> (...fns: any[]): Task<TFinal, EFinal | UncaughtError> {
99112
return new Task((outerResolve, outerReject) => {
100-
this.fork(
101-
outerReject,
102-
value => {
113+
const newTask = fns.reduce(
114+
(task, f) => {
103115
try {
104-
const result = fn(value);
105-
outerResolve(result);
106-
} catch (error) {
107-
outerReject(new UncaughtError(error));
116+
return f(task);
117+
} catch (err) {
118+
return Task.reject(new UncaughtError(err));
108119
}
109-
}
120+
},
121+
this
110122
);
123+
return newTask.fork(outerReject, outerResolve);
111124
});
112125
}
113126

127+
map<TResult> (fn: IMapFn<T, TResult>): Task<TResult, E | UncaughtError> {
128+
return map<T, TResult, E>(fn)(this);
129+
}
130+
114131
chain<TResult, EResult> (fn: ITaskChainFn<T, TResult, EResult>): Task<TResult, E | EResult | UncaughtError> {
115-
return new Task((outerResolve, outerReject) => {
116-
this.fork(
117-
outerReject,
118-
value => {
119-
try {
120-
fn(value).fork(outerReject, outerResolve);
121-
}
122-
catch (err) {
123-
outerReject(new UncaughtError(err));
124-
}
125-
}
126-
);
127-
});
132+
return chain<T, TResult, E, EResult>(fn)(this);
128133
}
129134

130135
catch<TResult, EResult> (fn: ITaskChainFn<E, TResult, EResult>): Task<T | TResult, EResult | UncaughtError> {
131-
return new Task((outerResolve, outerReject) => {
132-
this.fork(
133-
err => {
134-
try {
135-
fn(err).fork(outerReject, outerResolve);
136-
}
137-
catch (err) {
138-
outerReject(new UncaughtError(err));
139-
}
140-
},
141-
outerResolve
142-
);
143-
});
136+
return catchError<T, TResult, E, EResult>(fn)(this);
144137
}
145138

146139
fork (errorFn: (error: E) => any, successFn: (value: T) => any): void {
@@ -152,3 +145,4 @@ export class Task <T, E> {
152145
);
153146
}
154147
}
148+

test/task.test.ts

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1+
import { catchError, chain, map } from '../src/operators/basic';
12
import { Task, UncaughtError } from '../src/task';
23
import { assertFork, jestAssertNever, jestAssertUntypedNeverCalled } from './jest-helper';
34

45

5-
66
describe('Task', () => {
77
describe('fork', () => {
88
it('should be lazy (dont call if not forked)', cb => {
@@ -417,4 +417,156 @@ describe('Task', () => {
417417
);
418418
});
419419
});
420+
describe('pipe', () => {
421+
it('Should return the same task if no function is passed', (cb) => {
422+
// GIVEN: a resolved value
423+
const task = Task.resolve(0);
424+
425+
// WHEN: we call pipe with no transformation
426+
const result = task.pipe();
427+
428+
// THEN: the success function should be called without transformation
429+
result.fork(
430+
jestAssertNever(cb),
431+
assertFork(cb, x => expect(x).toBe(0))
432+
);
433+
});
434+
435+
it('Should work with one function', (cb) => {
436+
// GIVEN: a resolved value
437+
const task = Task.resolve(0);
438+
439+
// WHEN: we pipe the value
440+
const result = task.pipe(
441+
map(n => '' + n)
442+
);
443+
444+
// THEN: the success function should be called with the transformed value
445+
result.fork(
446+
jestAssertNever(cb),
447+
assertFork(cb, x => expect(x).toBe('0'))
448+
);
449+
});
450+
451+
it('Should work with multiple functions', (cb) => {
452+
// GIVEN: a resolved value
453+
const task = Task.resolve(0);
454+
455+
// WHEN: we pipe the value
456+
const result = task.pipe(
457+
map(n => '' + n),
458+
map(s => s + '!'),
459+
map(s => s + '!'),
460+
map(s => s + '!')
461+
);
462+
463+
// THEN: the success function should be called with the transformed value
464+
result.fork(
465+
jestAssertNever(cb),
466+
assertFork(cb, x => expect(x).toBe('0!!!'))
467+
);
468+
});
469+
470+
it('Should work with multiple functions converting success types correctly', (cb) => {
471+
// GIVEN: a resolved value
472+
const task = Task.resolve(0);
473+
474+
// WHEN: we pipe the value
475+
const result = task.pipe(
476+
map(n => '' + n),
477+
map(s => parseInt(s)),
478+
map(n => '' + n),
479+
map(s => parseInt(s))
480+
);
481+
482+
// THEN: the success function should be called with the transformed value
483+
result.fork(
484+
jestAssertNever(cb),
485+
assertFork(cb, x => expect(x).toBe(0))
486+
);
487+
});
488+
it('Should be lazy (dont call if not forked)', (cb) => {
489+
// GIVEN: a resolved value
490+
const task = Task.resolve(0);
491+
492+
// and a manually created pipeable function
493+
const dontCall = (t: Task<number, never>) => {
494+
// This should not be called
495+
expect(true).toBe(false);
496+
return t;
497+
};
498+
499+
// WHEN: we pipe the value but dont fork
500+
const result = task.pipe(
501+
dontCall
502+
);
503+
504+
// THEN: the content of the task is never called
505+
setTimeout(cb, 20);
506+
});
507+
508+
it('Should handle pipeable methods that throw', (cb) => {
509+
// GIVEN: a resolved value
510+
const task = Task.resolve(0);
511+
512+
// and a manually created pipeable function
513+
const pipeThatThrows = (t: Task<number, never>): Task<number, never> => {
514+
throw 'oops';
515+
};
516+
517+
// WHEN: we pipe the value
518+
const result = task.pipe(
519+
pipeThatThrows
520+
);
521+
522+
// THEN: the error function should be called with the new error
523+
result.fork(
524+
assertFork(cb, x => {expect(x).toBeInstanceOf(UncaughtError); }),
525+
jestAssertUntypedNeverCalled(cb)
526+
);
527+
});
528+
529+
it('Should be able to recover from pipeable methods that throw', (cb) => {
530+
// GIVEN: a resolved value
531+
const task = Task.resolve(0);
532+
const task2 = Task.resolve('0');
533+
// and a manually created pipeable function
534+
const pipeThatThrows = (t: Task<number, never>): Task<number, never> => {
535+
throw 'oops';
536+
};
537+
538+
// WHEN: we pipe the value
539+
const result = task.pipe(
540+
pipeThatThrows,
541+
catchError(err => task2)
542+
);
543+
544+
// THEN: the success function should be called with the catched value
545+
result.fork(
546+
jestAssertNever(cb),
547+
assertFork(cb, x => expect(x).toBe('0'))
548+
);
549+
});
550+
551+
it('Should be able to propagate errors correctly', (cb) => {
552+
// GIVEN: a resolved value
553+
const task = Task.resolve(0);
554+
const rej = Task.reject('buu');
555+
const task2 = Task.resolve('0');
556+
557+
// WHEN: we pipe the value
558+
const result = task.pipe(
559+
chain(val => rej),
560+
// err should be of type `string | UncauchtError`
561+
// The first because of rej, the second one because of chain
562+
catchError(err => task2)
563+
);
564+
565+
// THEN: the success function should be called with the catched value
566+
result.fork(
567+
jestAssertNever(cb),
568+
assertFork(cb, x => expect(x).toBe('0'))
569+
);
570+
});
571+
});
420572
});

tsconfig.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
]
1818
},
1919
"include": [
20-
"src"
20+
"src/**/*.ts",
21+
"test/**/*.ts",
22+
"node_modules/@types"
23+
2124
]
2225
}

0 commit comments

Comments
 (0)