Skip to content

Commit

Permalink
feat(Observable): RxJS doesn't even lift. (#7202)
Browse files Browse the repository at this point in the history
+ Removes `lift`, `source`, and `operator` from `Observable`
+ Updates related code

BREAKING CHANGE: `Observable#lift`, `Observable#source`, and `Observable#operator` is no longer a part of the API. These were never meant to be public and have been deprecated for more than 3 years.

Resolves #7201
  • Loading branch information
benlesh committed Mar 9, 2023
1 parent 4e14361 commit e0bdccf
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 480 deletions.
281 changes: 0 additions & 281 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -692,284 +692,3 @@ describe('Observable', () => {
});
});


/** @test {Observable} */
describe('Observable.lift', () => {
let rxTestScheduler: TestScheduler;

beforeEach(() => {
rxTestScheduler = new TestScheduler(observableMatcher);
});

class MyCustomObservable<T> extends Observable<T> {
static from<T>(source: any) {
const observable = new MyCustomObservable<T>();
observable.source = <Observable<T>>source;
return observable;
}
lift<R>(operator: Operator<T, R>): Observable<R> {
const observable = new MyCustomObservable<R>();
(<any>observable).source = this;
(<any>observable).operator = operator;
return observable;
}
}

it('should return Observable which calls FinalizationLogic of operator on unsubscription', (done) => {
const myOperator: Operator<any, any> = {
call: (subscriber: Subscriber<any>, source: any) => {
const subscription = source.subscribe((x: any) => subscriber.next(x));
return () => {
subscription.unsubscribe();
done();
};
},
};

(NEVER as any).lift(myOperator)
.subscribe()
.unsubscribe();

});

it('should be overridable in a custom Observable type that composes', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
map((x) => {
return 10 * x;
})
);

expect(result instanceof MyCustomObservable).to.be.true;

const expected = [10, 20, 30];

result.subscribe(
{ next: function (x) {
expect(x).to.equal(expected.shift());
}, error: () => {
done(new Error('should not be called'));
}, complete: () => {
done();
} }
);
});


it('should composes Subjects in the simple case', () => {
const subject = new Subject<number>();

const result = subject.pipe(
map((x) => 10 * x)
) as any as Subject<number>; // Yes, this is correct. (but you're advised not to do this)

expect(result instanceof Subject).to.be.true;

const emitted: any[] = [];
result.subscribe(value => emitted.push(value));

result.next(10);
result.next(20);
result.next(30);

expect(emitted).to.deep.equal([100, 200, 300]);
});

/**
* Seriously, never do this. It's probably bad that we've allowed this. Fortunately, it's not
* a common practice, so maybe we can remove it?
*/
it('should demonstrate the horrors of sharing and lifting the Subject through', () => {
const subject = new Subject<number>();

const shared = subject.pipe(
share()
);

const result1 = shared.pipe(
map(x => x * 10)
) as any as Subject<number>; // Yes, this is correct.

const result2 = shared.pipe(
map(x => x - 10)
) as any as Subject<number>; // Yes, this is correct.
expect(result1 instanceof Subject).to.be.true;

const emitted1: any[] = [];
result1.subscribe(value => emitted1.push(value));

const emitted2: any[] = [];
result2.subscribe(value => emitted2.push(value));

// THIS IS HORRIBLE DON'T DO THIS.
result1.next(10);
result2.next(20); // Yuck
result1.next(30);

expect(emitted1).to.deep.equal([100, 200, 300]);
expect(emitted2).to.deep.equal([0, 10, 20]);
});

it('should compose through combineLatestWith', () => {
rxTestScheduler.run(({ cold, expectObservable }) => {
const e1 = cold(' -a--b-----c-d-e-|');
const e2 = cold(' --1--2-3-4---| ');
const expected = '--A-BC-D-EF-G-H-|';

const result = MyCustomObservable.from(e1).pipe(
combineLatestWith(e2),
map(([a, b]) => String(a) + String(b))
);

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected, {
A: 'a1',
B: 'b1',
C: 'b2',
D: 'b3',
E: 'b4',
F: 'c4',
G: 'd4',
H: 'e4',
});
});
});

it('should compose through concatWith', () => {
rxTestScheduler.run(({ cold, expectObservable }) => {
const e1 = cold(' --a--b-|');
const e2 = cold(' --x---y--|');
const expected = '--a--b---x---y--|';

const result = MyCustomObservable.from(e1).pipe(concatWith(e2));

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected);
});
});
it('should compose through mergeWith', () => {
rxTestScheduler.run(({ cold, expectObservable }) => {
const e1 = cold(' -a--b-| ');
const e2 = cold(' --x--y-|');
const expected = '-ax-by-|';

const result = MyCustomObservable.from(e1).pipe(mergeWith(e2));

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected);
});
});

it('should compose through raceWith', () => {
rxTestScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const e1 = cold(' ---a-----b-----c----|');
const e1subs = ' ^-------------------!';
const e2 = cold(' ------x-----y-----z----|');
const e2subs = ' ^--!';
const expected = '---a-----b-----c----|';

const result = MyCustomObservable.from<string>(e1).pipe(
raceWith(e2)
);

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});

it('should compose through zipWith', () => {
rxTestScheduler.run(({ cold, expectObservable }) => {
const e1 = cold(' -a--b-----c-d-e-|');
const e2 = cold(' --1--2-3-4---| ');
const expected = '--A--B----C-D| ';

const result = MyCustomObservable.from(e1).pipe(zipWith(e2));

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected, {
A: ['a', '1'],
B: ['b', '2'],
C: ['c', '3'],
D: ['d', '4'],
});
});
});

it('should allow injecting behaviors into all subscribers in an operator ' + 'chain when overridden', (done) => {
// The custom Subscriber
const log: Array<string> = [];

class LogSubscriber<T> extends Subscriber<T> {
next(value?: T): void {
log.push('next ' + value);
if (!this.isStopped) {
this._next(value!);
}
}
}

// The custom Operator
class LogOperator<T, R> implements Operator<T, R> {
constructor(private childOperator: Operator<T, R>) {}

call(subscriber: Subscriber<R>, source: any): TeardownLogic {
return this.childOperator.call(new LogSubscriber<R>(subscriber), source);
}
}

// The custom Observable
class LogObservable<T> extends Observable<T> {
lift<R>(operator: Operator<T, R>): Observable<R> {
const observable = new LogObservable<R>();
observable.source = this;
observable.operator = new LogOperator(operator);
return observable;
}
}

// Use the LogObservable
const result = new LogObservable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
map((x) => 10 * x),
filter((x) => x > 15),
count()
);

expect(result instanceof LogObservable).to.be.true;

const expected = [2];

result.subscribe(
{ next: function (x) {
expect(x).to.equal(expected.shift());
}, error: () => {
done(new Error('should not be called'));
}, complete: () => {
expect(log).to.deep.equal([
'next 10', // map
'next 20', // map
'next 20', // filter
'next 30', // map
'next 30', // filter
'next 2', // count
]);
done();
} }
);
});
});
17 changes: 0 additions & 17 deletions spec/helpers/interop-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ import { Observable, Operator, Subject, Subscriber, Subscription } from 'rxjs';
export function asInteropObservable<T>(observable: Observable<T>): Observable<T> {
return new Proxy(observable, {
get(target: Observable<T>, key: string | number | symbol) {
if (key === 'lift') {
const { lift } = target as any;
return interopLift(lift);
}
if (key === 'subscribe') {
const { subscribe } = target;
return interopSubscribe(subscribe);
Expand All @@ -23,7 +19,6 @@ export function asInteropObservable<T>(observable: Observable<T>): Observable<T>
const { lift, subscribe, ...rest } = Object.getPrototypeOf(target);
return {
...rest,
lift: interopLift(lift),
subscribe: interopSubscribe(subscribe)
};
}
Expand Down Expand Up @@ -56,18 +51,6 @@ export function asInteropSubscriber<T>(subscriber: Subscriber<T>): Subscriber<T>
});
}

function interopLift<T, R>(lift: (operator: Operator<T, R>) => Observable<R>) {
return function (this: Observable<T>, operator: Operator<T, R>): Observable<R> {
const observable = lift.call(this, operator);
const { call } = observable.operator!;
observable.operator!.call = function (this: Operator<T, R>, subscriber: Subscriber<R>, source: any) {
return call.call(this, asInteropSubscriber(subscriber), source);
};
observable.source = asInteropObservable(observable.source!);
return asInteropObservable(observable);
};
}

function interopSubscribe<T>(subscribe: (...args: any[]) => Subscription) {
return function (this: Observable<T>, ...args: any[]): Subscription {
const [arg] = args;
Expand Down
2 changes: 1 addition & 1 deletion spec/operators/delay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ describe('delay', () => {
const result = e1.pipe(
repeatWhen((notifications) => {
const delayed = notifications.pipe(delay(t));
subscribeSpy = sinon.spy((delayed as any)['source'], 'subscribe');
subscribeSpy = sinon.spy(notifications as any, 'subscribe');
return delayed;
}),
skip(1),
Expand Down
47 changes: 0 additions & 47 deletions spec/operators/groupBy-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1463,53 +1463,6 @@ describe('groupBy operator', () => {
});
});

it('should not break lift() composability', (done) => {
class MyCustomObservable<T> extends Observable<T> {
lift<R>(operator: Operator<T, R>): Observable<R> {
const observable = new MyCustomObservable<R>();
(<any>observable).source = this;
(<any>observable).operator = operator;
return observable;
}
}

const result = new MyCustomObservable((observer: Observer<number>) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
groupBy(
(x: number) => x % 2,
(x: number) => x + '!'
)
);

expect(result instanceof MyCustomObservable).to.be.true;

const expectedGroups = [
{ key: 1, values: ['1!', '3!'] },
{ key: 0, values: ['2!'] },
];

result.subscribe({
next: (g: any) => {
const expectedGroup = expectedGroups.shift()!;
expect(g.key).to.equal(expectedGroup.key);

g.subscribe((x: any) => {
expect(x).to.deep.equal(expectedGroup.values.shift());
});
},
error: (x) => {
done(new Error('should not be called'));
},
complete: () => {
done();
},
});
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>((subscriber) => {
Expand Down

0 comments on commit e0bdccf

Please sign in to comment.