-
Notifications
You must be signed in to change notification settings - Fork 8.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Build Kibana observable lib based on "native" observables #14209
Build Kibana observable lib based on "native" observables #14209
Conversation
platform/config/ConfigService.ts
Outdated
this.config$, | ||
map(config => get(config, path)), | ||
distinctUntilChanged((prev, next) => isEqual(prev, next)) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Example of using this lib ^
packages/kbn-observable/README.md
Outdated
@@ -0,0 +1,48 @@ | |||
# k$ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this readme describes the essentials
Not well edited, just thoughts:
I totally agree here, but I also think this means that we currently exposing a massive library we probably don't understand fully, with subtle behaviors that aren't obvious until you crack open the source code. When I cracked open the source code I also found that there are a lot of undocumented features, and even the primary docs are not up to date.
I still think this is the way to go, but I think also want to make sure we are open minded to deviations from RxJS and that we consider the value of replacing operators wholesale from RxJS. There are a lot of things in RxJS that don't make sense to me and that I would do differently, and perhaps make more sense for a corelib in Kibana. For instance, rather than reuse an existing implementation and tie it's entire core API to some external library cycle.js wrote https://github.com/staltz/xstream, which behaves similar to RxJS but has some key differences:
Making everything multicast doesn't seem like an option to me, but I do think we can redesign some of the multi-cast stuff to be a little less crazy and perhaps find a better way to model behaviors and or subjects. |
packages/kbn-observable/README.md
Outdated
|
||
// `k$` accepts a second argument, an array of "operators" that modifies the | ||
// input value and return an observable that reflects all of the modifications. | ||
k$(observable, [ map(i => 2017 + i), last() ]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I very much prefer the two-argument signature to the n-argument signature you've implemented here, mostly so the observable and operators are clearly separated but also so that multi line calls look a little more like chaining:
const object$ = $fromFactory(async() => {
const resp = await ajax();
return resp.objects;
});
k$(object$, [
map(...),
filter(...),
reduce(...)
]);
// vs
k$(
object$,
map(...),
filter(...),
reduce(...)
);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. I did that initially, but TypeScript struggles with typing it for some reason, e.g. if you map from a string to a number in the first operator, the second operator will still be typed as going from string (not number). Might have been something weird I did, so I can try to revert that and see if I can make it work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TypeScript struggles with typing it for some reason
😭
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I know :/
I played around with this, and it doesn't seem to work with the array. Simplified example using TypeScript playground. To display the error I'm just trying to "force" result
to be a number, but you'll see that TypeScript believes it should be a string instead. It's probably related to how TypeScript handles overloads and generics within arrays.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can also be something like this or similar :)
k$(object$)(
map(...),
filter(...),
reduce(...)
);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, interesting idea @azasypkin
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's an interesting idea. I wonder if people will just start doing something like:
const myFactory$ = k$(object$);
And then we're in some ways kinda back where we started. However, this is a very simple type, and the result of calling myFactory$
above will always be a native observable.
I'll go through and make the changes, then we can see if we like it.
I'm really glad you got this typed, it will really help me better understand how types work |
Good points re RxJS. As you say, the docs are definitely lacking at times. And the lib contains lots of stuff. We could definitely simplify things to only solve for our use-cases, while still allowing plugins to use RxJS or whatever (internally we'd likely use our dep throughout, instead of pulling in other observable libs though). |
I have mixed feelings: I agree with @spalger points, but at the same time share @kjbekkelund concerns. There is a lot of work to be done for our own "RxJS" (implementation, documentation and tests) even though we need just a limited set of APIs. I'm wondering if we can find a reasonable trade-off for the time being. For example Obviously this approach has its own downsides, but just wanted to add my 2 cents. Regarding PoC - if we decide to implement operators from scratch I'd vote for very limited set of supported operators (at least at the beginning) so that it doesn't slow us down. |
0a715c2
to
4597888
Compare
7a59ba5
to
0ab2c8d
Compare
In 0f8f3dd I applied @azasypkin's idea of k$(object$)(
map(...),
filter(...),
reduce(...)
); Better? Weird? |
0f8f3dd
to
1e02616
Compare
@azasypkin This is ready for a first pass. There's still a few "rough edges" (e.g docs), but would be great to get some feedback on what's there. |
8911524
to
85eb74c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still reading PR, just few comments so far.
@@ -0,0 +1,89 @@ | |||
import { noop } from 'lodash'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: noop
is used only once here, maybe you can use just () => {}
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅
op8: UnaryFunction<G, H>, | ||
op9: UnaryFunction<H, I> | ||
): UnaryFunction<T, I>; | ||
/* tslint:enable:max-line-length */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: do we really need this tslint
line?
/* tslint:enable:max-line-length */ | ||
|
||
export function pipe<T, R>( | ||
...fns: Array<UnaryFunction<T, R>> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: IIRC we usually use []
notation for arrays in types. So maybe UnaryFunction<T, R>[]
here and in the function below?
fns: Array<UnaryFunction<T, R>> | ||
): UnaryFunction<T, R> { | ||
if (!fns) { | ||
return noop as UnaryFunction<any, any>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional nit: as UnaryFunction<T, R>
? It's arguable though, hence optional :)
export function pipeFromArray<T, R>( | ||
fns: Array<UnaryFunction<T, R>> | ||
): UnaryFunction<T, R> { | ||
if (!fns) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: Did you mean fns.length === 0
here?
// TODO decide if this is needed or not | ||
|
||
// try to coerce the result into an observable | ||
const coerced = tryCoerce(result as any); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional nit: maybe result as ObservableInput<T>
?
* @param {Function} | ||
* @returns {Observable} | ||
*/ | ||
export function $fromFactory<T>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: What use cases do you envision for this method? There is no need to change anything here or something, just wondering. I was just a bit confused (before I read the jsdoc) that the "factory" is essentially a "one-time factory" used to produce a single value, but it's probably just me :)
* @param items vararg items | ||
* @return | ||
*/ | ||
export function $of<T>(...items: T[]): Observable<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: why do we need this factory if we have Observable.of
already?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. I made it a pure alias. Still not sure I like it, but I know Spencer loves it ;)
values[i] = value; | ||
|
||
if (needFirstCount === 0) { | ||
observer.next(values.slice(0) as T[]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional nit: 0
in slice
can be omitted.
$from(observable).subscribe({ | ||
next(value) { | ||
if (values[i] === pending) { | ||
needFirstCount -= 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: Just curious, why not needFirstCount--
? :)
actual.push(val); | ||
}, | ||
complete() { | ||
expect(actual).toEqual([[3, 1], [3, 2], [2, 2], [2, 3]]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: if this expect fails, will the test fail with proper message or "this test timed out" instead?
export class ObjectUnsubscribedError extends Error { | ||
constructor() { | ||
const err: any = super('object unsubscribed'); | ||
(<any>this).name = err.name = 'ObjectUnsubscribedError'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Hmm, it seems we don't have to cast this
to any
here, or am I missing something?
|
||
export class Subject<T> extends Observable<T> implements Subscription { | ||
observers: Observer<T>[] = []; | ||
closed: boolean = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: isn't boolean
inferred from the default value?
} | ||
|
||
for (const observer of this.observers) { | ||
observer.next!(value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: It seems that !
is unnecessary here.
this.isStopped = true; | ||
|
||
for (const observer of this.observers) { | ||
observer.error!(err); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: same note about !
here and all other cases below.
import { ObjectUnsubscribedError } from './errors'; | ||
|
||
export class BehaviorSubject<T> extends Subject<T> { | ||
constructor(private _value: T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: same note/question about _
in names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing _
for now, then we can decide later whether or not we want to add it to private
/protected
fields.
protected _subscribe(observer: Observer<T>): Subscription { | ||
const subscription = super._subscribe(observer); | ||
|
||
if (subscription && !subscription.closed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: it seems subscription
is always a defined value and there is no need in subscription &&
.
} | ||
|
||
next(value: T): void { | ||
this._value = value; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: it's bikeshedding, but should we update the value if subject is already closed/stopped?
const noop = () => {}; | ||
|
||
export class Subject<T> extends Observable<T> implements Subscription { | ||
observers: Observer<T>[] = []; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: do we really want all these properties to be public and writeable? I'm especially worried about observers
list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unintentional. Forgot making them non-public.
return this._value; | ||
} | ||
|
||
get value(): T { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we probably want to leave only one public property/method to retrieve the value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++ only getValue
now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, that's a huge amount of work!
I have mixed feelings about API ergonomics, I can get used to it :) and we can definitely make it better later on.
|
||
return source.subscribe({ | ||
next(value) { | ||
let result: boolean = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: type should be automatically inferred, I think
* Filter items emitted by the source Observable by only emitting those that | ||
* satisfy a specified predicate. | ||
* | ||
* @param predicate A function that |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional nit: somewhat unexpected too-early line break here :)
import { MonoTypeOperatorFunction } from '../interfaces'; | ||
|
||
/** | ||
* Emits only the first value (or the first value that meets some condition) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: hmm or the first value that meets some condition
- it seems we don't support it yet :)
export function first<T>(): MonoTypeOperatorFunction<T> { | ||
return function firstOperation(source) { | ||
return new Observable(observer => { | ||
let hasCompleted = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue: where do we update this variable? And if it's an issue and we need to update that variable, would be great to have a unit test that would have caught that.
@@ -0,0 +1,36 @@ | |||
import { k$ } from '../../k$'; | |||
import { first } from '../../operators'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional nit: import { first } from '../first'
|
||
source.subscribe({ | ||
error(err) { | ||
results.push(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: should we check that err
is indeed the original error?
import { Subject } from '../../Subject'; | ||
import { k$ } from '../../k$'; | ||
import { $of } from '../../factories'; | ||
import { skipRepeats } from '../../operators'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: import { skipRepeats } from '../skipRepeats';
expect(actual).toEqual(['a']); | ||
}); | ||
|
||
test('should raises error if source raises error', () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: should raise
expect(error).toHaveBeenCalledWith(thrownError); | ||
}); | ||
|
||
test('should raises error if source throws', () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: should raise
// create an array that will receive values as observables | ||
// update and initialize it with `pending` symbols so that | ||
// we know when observables emit for the first time | ||
const values: (symbol | T)[] = observables.map(() => pending); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: Likely GC won't allow values
to stay in memory, but maybe we still should clean-up values on complete. What do you think?
a811e1c
to
e8866c7
Compare
}); | ||
} | ||
|
||
unsubscribe() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed unsubscribe
for now. Not sure if it's ever needed in our context. In our usage we've just complete
d it instead.
The tests look great. They describe the library well for anyone who wants to know what this PR adds to the codebase. But I noticed there aren't tests for Observable itself, which is a fork of the sample implementation in the TC39 spec. Is that why we don't have tests for it? I think if we're comfortable having tests for Subject, BehaviorSubject, and things that depend on Observable, AND we also want to build our own implementation of Observable, testing the core library is important. It seems simple enough to write some tests for things we use, like |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a few comments, and I also want to add: can we remove the TODO's, or resolve them?
// === Abstract Operations === | ||
|
||
function nonEnum(obj) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a lot of lines with trailing spaces from here to line 321. Might want to run an eslint --fix
on this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just want to note, we've decided not to do any fixes in this file because it's ported over from a non-version-controlled source.
|
||
function getMethod(obj, key) { | ||
|
||
let value = obj[key]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could just be const value = obj[key];
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ignore
|
||
let value = obj[key]; | ||
|
||
if (value == null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style issue: We do multi-line ifs with curlies.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ignore
|
||
// Assert: observer._observer is undefined | ||
|
||
let cleanup = subscription._cleanup; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ignore
} | ||
|
||
function cleanupFromSubscription(subscription) { | ||
return _=> { subscription.unsubscribe() }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now I get it. This file was copied over from elsewhere, and we just need to clean it up a bit. We should change this line to
return () => { subscription.unsubscribe(); };
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ignore
packages/kbn-observable/README.md
Outdated
const resultObservable = k$(sourceObservable, [...operators]); | ||
|
||
// e.g. | ||
const source = Observable.from(1,2,3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.of
packages/kbn-observable/README.md
Outdated
operator was specified. | ||
|
||
Because of that problem we ended up with `k$(source)(...operators)`. With this | ||
change TypeScript is able to corretly type the operator arguments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
correctly*
which is a funny typo given the meaning of the word 😆
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🙈
could do `source.pipe(...operators)` instead, but we decided against it because | ||
we didn't want to start adding features directly on the `Observable` object, but | ||
rather follow the spec as close as possible, and only update whenever the spec | ||
receives updates. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have to admit, this is pretty sad, since source.pipe(...operators)
is lovely. But I respect our adherence to principles.
|
||
/** | ||
* TODO link to Subject, then explain what a BehaviorSubject is. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me help with that:
/**
* Subject: http://reactivex.io/documentation/subject.html
* BehaviorSubject: When an observer subscribes to a BehaviorSubject,
* it begins by emitting the item most recently emitted by the source Observable
* (or a seed/default value if none has yet been emitted) and then continues to emit
* any other items emitted later by the source Observable(s).
*/
* `error(e)`, and `complete()`. To feed a new value to the Subject, just call | ||
* `next(theValue)`, and it will be multicasted to the Observers registered to | ||
* listen to the Subject. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see the point of this big comment block. Why not just link to http://reactivex.io/documentation/subject.html, or use the description from there which is more concise? Eventually this comment block will become superfluous.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My goal has been that we'll add more and more docs to kbn-observable
, so you don't need to go other places to understand the basics of the library, and then we can link other places to give "more context". That's why I felt it made sense to have this big comment block.
Ideally I'd love to see it grow even bigger over time, as we explain the ideas to people and gradually come up with better explanations and better examples that we pull into the comment. I feel that can make it easier for us to diverge from rxjs over time too.
e6513e5
to
81d3614
Compare
Thanks for the feedback, @archanid. I've added some more docs and cleanups, and will be handling the last todos now. The reason I didn't add tests for |
Pushed a couple more things. This feels ready now (I think 🙈). Except for not having tests for |
Not having tests for this core feature feels like a huge hole in our codebase, especially with how readily people are going to adopt Observables. Even though it is close to the spec, they'll be using our implementation of Observable, which lives in our repo. It is not a library that the spec has published, that we are then pulling in via npm. So it seems to make sense to protect at least some of the main functionality we expect from I'll leave it up to you, though. This really LGTM! |
Thanks for the great reviews. Merging now, then we can handle whatever comes up in new PRs :) |
I randomly got here browsing my phone so ignore my ignorance here 😄 Does our interface expose enough of the observable contract http://reactivex.io/documentation/contract.html So folks can create e.g Rx wrappers on the consumption side? Just map/filter/reduce doesn't suffice. |
@Mpdreamz Yep. The community has agreed on how to do interop (using |
This PR implements our own small Observable library. There are docs in the README in the PR that explains the why and how (there's also some of that in the description below).
To get this stuff running locally (as we don't have a proper build system yet):
From a clean setup locally (the best is probably to just
rm
your node_modules):(Yeah, this is horrible — yeah, this is something I'm currently working on fixing properly)
Old discussion/explanation:
This is just an exploration. I hit some snags when trying to expose a minimal, but spec-compliant observable implementation (I'll just call them "native observables" from now on), while still using RxJS internally (see #13760). This was discussed in #13608.
When discussing this @spalger had the idea of working directly with native observables, and have operators etc that always return native observables. We could then base everything on the implementation in https://github.com/tc39/proposal-observable and build whatever helpers we need.
I based my work on his initial PoC, and tried to add the features needed to replace RxJS fully in the new platform code. This includes BehaviorSubject, several other operators, adding TypeScript support, exploring how it can be tested, etc. This lib is built so it can be easily codemod-ed to the new "lettable"/"pipeable" operators in RxJS.
This PoC currently doesn't run, as the new platform still depends on a couple features in RxJS that's not yet built using this new approach. One example is
shareReplay
(which relies on multicast support, which I haven't written any code for yet).After playing around with this for a while I'm not sure I like it. There is a lot of "subtlety" to getting the operators correct, there's more features than I initially thought that we actually rely on, and we also need good resources for people to understand the observable lib.
I think we have three choices:
I think I prefer solution 1. I'd love to have native observables everywhere, but I think this adds too much cost for us right now.