Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

buckets and splitBy operators #6388

Closed

Conversation

backbone87
Copy link
Contributor

@backbone87 backbone87 commented May 8, 2021

  • Add the operator to Rx
  • It must have a -spec.ts tests file covering the canonical corner cases, with marble diagram tests
  • The spec file should have a type definition test at the end of the spec to verify type definition for various use cases -- is the type test in splitBy-spec sufficient?
  • The operator must be documented in JSDoc style in the implementation file, including also the PNG marble diagram image -- how to make the marble diagram?
  • The operator should be listed in docs_app/content/guide/operators.md in a category of operators
  • The operator should also be documented. See Documentation Guidelines.

Description:

Introduces 2 new operators buckets and splitBy.

buckets uses a hash function to partition values across multiple multicasted (hot) observables. This behaves similar to groupBy, but the groups are known immediately.

splitBy is the special case of buckets where the count of buckets is 2. This is similar to partition, but has a usage ergonomy more similar to groupBy.

implementation is based upon bucketBy and splitBy in rxjs-etc by @cartant

Related issue (if exists): #3807 #4419 #5731

Copy link
Collaborator

@cartant cartant left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some initial comments. Will review it properly, later. Also, for PRs that change the public API, you will need to use npm run api_guardian:update.

spec/operators/splitBy-spec.ts Outdated Show resolved Hide resolved
* default hash function converts the values into numbers, if they are not
* already.
*/
hashFn?: (value: T) => number;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I'm not a fan of abbreviations. I'd prefer hasher, instead, but this can be bikeshedded later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i also just picked this up from somewhere, bikeshed away ;)

src/internal/operators/splitBy.ts Show resolved Hide resolved
@cartant cartant added the 7.x Issues and PRs for version 7.x label May 9, 2021
@backbone87 backbone87 force-pushed the feature/buckets-and-split-by branch from 9467c97 to 6ad4057 Compare May 9, 2021 04:50
@backbone87
Copy link
Contributor Author

backbone87 commented May 9, 2021

in regards to naming: is intoBuckets a better choice?

edit: splitInto

@cartant
Copy link
Collaborator

cartant commented May 9, 2021

I think it could probably be a single operator - splitBy - with two signatures with different option types - SplitByPredicateOptions and SplitByHashOptions - with the predicate and count parameters moved into the respective options types. IDK. Much bikeshedding is possible.

@backbone87
Copy link
Contributor Author

personally, i dont like merging them into a single operator. too overloaded from user & dev perspective. but that's just my feeling. splitInto(3), splitBy(({ name }) => name === 'rxjs') just reads/sounds nice (not a native speaker though)

@cartant
Copy link
Collaborator

cartant commented May 9, 2021

TBH, I was not planning on bringing bucketBy - or whatever - into this codebase. I was only going to bring in splitBy. I don't anticipate there being much appetite for an additional operator. Especially one that hasn't been requested. Expect this to be bikeshedded/discussed/debated, too.

@benlesh
Copy link
Member

benlesh commented May 10, 2021

So looking over this, at first blush. buckets looks like a specialized groupBy. and splitBy is a specialized buckets. So it makes me wonder if we need either of these. 🤔 Just thinking out loud here.

const buckets = (count) => groupBy((value, i) => i % count)
// or
const buckets = ({ count, hasher }) => groupBy((value, i) => hasher(value, i) % count);
// or
const splitBy = (predicate) => groupBy(predicate);

Am I off-base? What am I missing?

@benlesh
Copy link
Member

benlesh commented May 10, 2021

To be clear @backbone87 ... this is awesome work! And I really appreciate it. However, we're going to be on the hook to support this API long-term, so I want to be sure we're doing the right thing.

@benlesh
Copy link
Member

benlesh commented May 10, 2021

Another thing to consider... And groupBy handles this... but there's a scenario with operators like this, where the consumer can take the observable out of the result and subscribe to it, then early-unsub from the source. In those cases, the expectation would be that the created "buckets" or "groups" would still work until they were unsubbed. At least that's true for groupBy, so I guess I would expect the same behavior here.

const source = interval(1000);
const result = source.pipe(buckets(3));

const capturedBucketSubs = [];

const outerSubs = result.subscribe(bucket => {
   capturedBucketSubs.push(bucket.subscribe(console.log));
});

// unsubbing outer first shouldn't kill existing inners
setTimeout(() => {
  outerSubs.unsubscribe();
}, 5000);

setTimeout(() => {
  for (const subs of capturedBucketSubs) {
    subs.unsubscribe();
  }
}, 10000);

I'm willing to agree this behavior is SUPER debatable, but there it is, and that's how groupBy has worked for years, for better or worse.

Here's an example of that behavior with groupBy: https://stackblitz.com/edit/rxjs-bcpetb?embed=1&file=index.ts

@benlesh
Copy link
Member

benlesh commented May 10, 2021

Sorry, too, @cartant, if I'm just springing this on you. When we talked about this at the core team meeting, I guess I was thinking splitBy did something different, but I'm not sure now, I guess, what I thought it did that groupBy couldn't do.

@backbone87
Copy link
Contributor Author

backbone87 commented May 10, 2021

buckets is not a special case of groupBy. check the examples in the doc comments which demonstrate the difference in behavior. the buckets are also subjects, so unsubbing from them should not change anything, but i can add a test for this.

edit: on buckets vs groupBy:
maybe this doesn't come clear in the examples, but buckets only emits a single value: an array of "hot" observables (the buckets). in the examples I use mergeAll for convenience.

@cartant
Copy link
Collaborator

cartant commented May 10, 2021

@benlesh As I mentioned above, I was planning on ignoring bucketBy/buckets when I eventually got around to dealing with #3807. I don't think it adds anything that's particularly useful. It's just how splitBy was implemented in rxjs-etc. And adding random operators to that package does not come with the implications of adding operators to the core.

FWIW, I kinda agree with you on dumping this in favour of groupBy and a partition deprecation that links to a deprecations/partition.md document that explains how to use groupBy instead.

When I added splitBy to rxjs-etc, groupBy didn't have the typings that it does now. 12 months or so ago Moshe added typings that made it play nice with user-defined type guards, so it composes pretty nicely:

dogsAndCats.pipe(
  groupBy(isDog),
  mergeMap(grouped => grouped.key
    ? /* Observable<Dog> */ grouped.pipe(tap(() => console.log('woof')))
    : /* Observable<Cat> */ grouped.pipe(tap(() => console.log('meow')))
)

I don't know that there is any advantage in using splitBy instead and being presented with the two observables up front - this is the difference mentioned in the original issue:

This is conceptually similar to groupBy with only two groups, except it returns an Array of the grouped Observables instead of an Observable of the groups since the groups can be known ahead of time.

Rather, I think splitBy will be more difficult for peeps to grok, as the two known-ahead-of-time observables will need to be combined into a single observable:

dogsAndCats.pipe(
  splitBy(isDog),
  mergeMap(([dog$, cat$])=> merge(
    dog$.pipe(tap(() => console.log('woof'))),
    cat$.pipe(tap(() => console.log('meow')))
  )
)

TL/DR: I don't see a compelling case for splitBy either, so close this - and the related issues - and document the partition deprecation with examples on the docs site.

@benlesh
Copy link
Member

benlesh commented May 10, 2021

@cartant maybe if we get everything in a monorepo, rxjs-etc might be a solid candidate to join this repository, so we can direct things over there? Obviously debatable, but it might open the door for moving lesser used core operators over, as well. Just something to think about.

@benlesh
Copy link
Member

benlesh commented May 10, 2021

@backbone87 I see your point, that you get the array of observables ahead of time. I'll still need to think about it. Because, in practice, I'm not sure about the advantage that gives. Observable<Observable<X>> versus Observable<X>[]. But it is an interesting bit of nuance between the two approaches.

Still thinking about this one.

@backbone87
Copy link
Contributor Author

no hard feelings here on my side, a little bit of disappointment, but I can handle this, if it doesn't fit, then leave this out.

personally, I have used partition only on very few occasions, but when I did, I definitely wasn't happy with its behavior as it is right now and would have preferred the splitBy behavior. also, these situations could have most likely also been solved with groupBy.

@backbone87
Copy link
Contributor Author

backbone87 commented May 11, 2021

to bring in an example where the splitBy behavior may be useful:
imagine consuming a topic of some broker, which delivers Buffers. you already wrapped this in an observable. now you want to process these buffers. we may have different tools to do this: parseJsonMessage, will try to parse the buffer with JSON.parse. ofc this could fail, so the result would be Json | UnreadableMessage. A second step could be to validate json messages. so we have validateWithJsonSchema<T> which maps to T | UnreadableMessage. the problem here: how can i distinguish real unreadable messages from payloads that just look like UnreadableMessage. ofc we can introduce a wrapper for T, but this seems like a lot of extra stuff for something that we already decided before. so it would be better if parseJsonMessage & validateWithJsonSchema<T> return [Observable<Json>, Observable<UnreadableMessage>] and [Observable<T>, Observable<UnreadableMessage>] respectively. one could argue that is more like splitMap:

pipe(
  splitMap(2, ([nextParsed, nextUnreadable]) => (value) => {
    if (value === null) {
      return nextParsed(value);
    }
    try {
      nextParsed(JSON.parse(value));
    } catch (e) {
      nextUnreadable(new UnreadableMessage(value, e));
    }
  }),
  mergeMap(([parsed, unreadable]) => {
    return parsed.pipe(
      splitMap(2, ([nextValidated, nextUnreadable]) => (value) => {
        if (validate(value)) {
          nextValidated(value);
        } else {
          nextUnreadable(new UnreadableMessage(value, validate.errors));
        }
      }),
      mergeMap(([validated, unreadable2]) => of([validated, merge(unreadable, unreadable2)])),
    );
  }),
);

idk that still looks way too convoluted

@benlesh
Copy link
Member

benlesh commented May 21, 2021

After discussing this PR in the Core Team meeting, I'm so sorry to say that I don't think we're going to go ahead with it at this time, and we should probably close the related issue. That issue, TMK, was filed at a time when groupBy was a little less capable than it is now. While splitBy is indeed a little different, we've decided that the difference isn't sufficient enough to justify supporting the API as part of the library for years to come.

Thank you so much for your hard work, @backbone87, we all really do appreciate it. I'm sorry that this isn't something we can merge into core for now. Especially given that it was just trying to implement something that was requested by a core team member years ago.

Thank you again, @backbone87, and I hope this doesn't put you off from further contributions.

@backbone87
Copy link
Contributor Author

hm ok, no problem. i suspected it.
i have a few questions:

  • what is the way forward with partition? the operator is already deprecated, but the behavior of the factory is still to consume twice instead of multicasting
  • is there a way to achieve the bucket/splitBy behavior with another operator or pipeline of operators?
  • would it be an alternative to add options to groupBy to enable a similar behavior to bucket/splitBy. something like groupBy(({ key }) => key, { seed: () => of('key1', 'key2') }) where seed emits keys for which groups are eagerly opened and when it completes the groupBy completes to downstream, but still receives values from upstream and routes them into existing groups. unrouteable upstream values will trigger a stopped notification hook and seed defaults to () => NEVER, which means the groupBy does not create any groups eagerly and completes with the source.

@Polyterative
Copy link

Polyterative commented Feb 21, 2022

I'm interested in a way to replicate the splitBy with classic operators too

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
7.x Issues and PRs for version 7.x
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants