Skip to content

Commit

Permalink
feat: rxjs two way compatibility with Symbol.observable
Browse files Browse the repository at this point in the history
  • Loading branch information
dominiksta committed Nov 4, 2023
1 parent 2059e87 commit 9358246
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 9 deletions.
1 change: 1 addition & 0 deletions packages/core/cypress/component/all.cy.disabled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import './rx/state.cy';
import './rx/store.cy';
import './rx/share.cy';
import './rx/streams.cy';
import './rx/rxjs-interop.cy';
import './http.cy';

import './attribute-reflection.cy';
Expand Down
60 changes: 60 additions & 0 deletions packages/core/cypress/component/rx/rxjs-interop.cy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { rx } from '$thispkg';
import { sleep } from '$thispkg/util/time';
import { attempt } from "../../support/helpers";
import * as rxjs from 'rxjs';

describe('rxjs interop', () => {
it('sync mvui to rxjs', () => {
let res = 0;

const hi = rxjs.from(rx.of([0, 1, 2])).pipe(
rxjs.map(v => v + 1),
rxjs.tap(v => res = v),
);
hi.subscribe();

expect(res).to.eq(3);

}),

it('sync rxjs to mvui', () => {
let res = 0;

const hi = rx.from(rxjs.of(0, 1, 2)).pipe(
rx.map(v => v + 1),
rx.tap(v => res = v),
);
hi.subscribe();

expect(res).to.eq(3);

})

it('async mvui to rxjs switchMap', attempt(async () => {
let isFirst = true;
async function firstTakesLonger(value: number) {
if (isFirst) { isFirst = false; await sleep(200); }
return value + 1;
}

let values: number[] = [];
let completed = false;
const asyncMapped = rxjs.from(rx.interval(50)).pipe(
rxjs.take(3),
rxjs.switchMap(v => rxjs.from(firstTakesLonger(v))),
);

asyncMapped.subscribe({
next: v => values.push(v),
complete: () => completed = true,
});

await sleep(300);

expect(asyncMapped).to.be.instanceof(rxjs.Observable);
expect(values).to.deep.eq([2, 3])
expect(completed).to.be.true;
}))
});


3 changes: 2 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"cypress": "^12.8.1",
"esbuild": "0.17.11",
"msw": "^1.2.1",
"rxjs": "^7.8.1",
"source-map-loader": "^4.0.1",
"ts-loader": "^9.4.2",
"ts-node": "^10.9.1",
Expand All @@ -42,4 +43,4 @@
"msw": {
"workerDirectory": "cypress/support"
}
}
}
58 changes: 50 additions & 8 deletions packages/core/src/rx/operators/creation/from.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,62 @@
import { Observer } from "../../interface";
import Stream from "../../stream";

type ObserverDefinitionInterop<T> =
Partial<Observer<T>> | ((value: T) => void) | undefined;

export function from<ValuesT>(values: Iterable<ValuesT>): Stream<ValuesT>;
type StreamInterop<T> = {
[Symbol.observable]: () => {
subscribe: (obs: ObserverDefinitionInterop<T>) => { unsubscribe: () => void }
}
};

// we cannot just use StreamInterop because of the way that rxjs implements
// Symbol.observable. it works with plain js but typescript gets confused
type StreamInteropRxJS<T> = {
subscribe: (obs: ObserverDefinitionInterop<T>) => { unsubscribe: () => void }
};

type StreamInput<T> = Stream<T>
| Iterable<T>
| Promise<T>
| StreamInterop<T>
| StreamInteropRxJS<T>;

export function from<ValuesT>(promise: Promise<ValuesT>): Stream<ValuesT>;
export function from<T>(noop: Stream<T>): Stream<T>;
export function from<T>(iterable: Iterable<T>): Stream<T>;
export function from<T>(promise: Promise<T>): Stream<T>;
export function from<T>(interop: StreamInterop<T>): Stream<T>;
export function from<T>(interopRxJS: StreamInteropRxJS<T>): Stream<T>;

/**
Convert an Arrray or a Promise to a Stream.
@group Stream Creation Operators
*/
export default function from<ValuesT>(
input: Iterable<ValuesT> | Promise<ValuesT>
): Stream<ValuesT> {
export default function from<T>(
input: StreamInput<T>
): Stream<T> {
if (input instanceof Stream) return input;
if (isIterable(input)) {
return new Stream(observer => {
for (let value of input) observer.next(value);
observer.complete();
});
} else if (isPromse(input)) {
} else if (isPromise(input)) {
return new Stream(observer => {
input
.then(v => observer.next(v))
.catch(e => observer.error(e))
.finally(() => observer.complete());
});
} else if (isInterop(input)) {
return new Stream(observer => {
return input[Symbol.observable]().subscribe(observer).unsubscribe;
});
} else if (isInteropRxJS(input)) {
return new Stream(observer => {
return input.subscribe(observer).unsubscribe;
});
} else {
throw new Error('Could not convert given value to stream');
}
Expand All @@ -36,10 +69,19 @@ function isIterable(input: unknown): input is Iterable<unknown> {
|| typeof input !== 'object'
) return false

return typeof (input as any)[Symbol.iterator] === 'function';
return (typeof (input as any)[Symbol.iterator] === 'function') ||
input instanceof Array;
}

function isPromse(input: unknown): input is Promise<unknown> {
function isPromise(input: unknown): input is Promise<unknown> {
if (typeof input !== 'object') return false;
return typeof (input as any)?.then === 'function';
}

function isInterop(input: unknown): input is StreamInterop<unknown> {
return typeof (input as any)[Symbol.observable] === 'function';
}

function isInteropRxJS(input: unknown): input is StreamInteropRxJS<unknown> {
return typeof (input as any)['subscribe'] === 'function';
}
19 changes: 19 additions & 0 deletions packages/core/src/rx/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ import EmptyError from "./empty-error";
import { Observer, ObserverDefinition, Subscribable, TeardownLogic } from "./interface";
import { pipe } from "./util";

if (!('observable' in Symbol))
(Symbol as any)['observable'] = (Symbol as any).for('@@observable');

declare global {
interface SymbolConstructor {
readonly observable: symbol;
}
}

export type OperatorFunction<InputT, ResultT> =
(stream: Stream<InputT>) => Stream<ResultT>;

Expand Down Expand Up @@ -169,6 +178,16 @@ export default class Stream<T> implements Subscribable<T> {
return _BasicOperators.filter(filter)(this);
}

[Symbol.observable]() {
return {
subscribe: (observer: ObserverDefinition<T>) => {
const unsub = this.subscribe(observer);
return {
unsubscribe: unsub,
}
}
};
}

// async/await
// ----------------------------------------------------------------------
Expand Down

0 comments on commit 9358246

Please sign in to comment.