Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: maybeIterator #68

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Draft

feat: maybeIterator #68

wants to merge 12 commits into from

Conversation

jeswr
Copy link
Collaborator

@jeswr jeswr commented Apr 28, 2022

Resolves #66

This PR introduces a maybeIterator which takes an iterator as input, and returns an iterator only if it is non-empty. Otherwise null is returned.

Happy to accept naming suggestions on this one.

Pinging @jacosaz

@jeswr jeswr marked this pull request as draft April 28, 2022 05:10
@RubenVerborgh
Copy link
Owner

Could you elaborate on use cases?

@jeswr
Copy link
Collaborator Author

jeswr commented Apr 28, 2022

The use case for me is to terminate forwards chaining reasoning here is some of the code for it, very WIP. Below is a minimal naive example to give you an idea of what the primary logic is

function getConsequents(quad: RDF.Quad): AsyncIterator<Quad> {
  // Gets any new consequences that can result from adding 'quad' to the dataset with the current rules
}

// New quad to be added to the dataset
let iterator = single(quad);
// N3 Store
const store = new Store();

while ((iterator = maybeIterator(iterator)) !== null) {
  // Get any new quads, by applying rules to quads generated in previous 'loop'
  iterator = union(iterator.map( q => getConsequents(q) ))
  // Add new quads to the store, and filter out from the iterator if already present in the store
  iterator = iterator.filter(q => store.addQuad(q))
}

Generally, it can be used to terminate any recursive operation that eventually stops generating new items.

I recognise that this is not as not as important as the performance issues that are open, so those should still be addressed before this.

@RubenVerborgh
Copy link
Owner

The while loop is a good example!

while ((iterator = maybeIterator(iterator)) !== null) {

I take it this is

while ((iterator = await maybeIterator(iterator)) !== null) {

but yes, I see the case now. Thinking perhaps to call it ensureNonEmpty.

Although for this case (and if this is the general pattern), it could also be

do  {
  // Get any new quads, by applying rules to quads generated in previous 'loop'
  iterator = union(iterator.map( q => getConsequents(q) ))
  // Add new quads to the store, and filter out from the iterator if already present in the store
  iterator = iterator.filter(q => store.addQuad(q))
} while (await hasItems(iterator))

with

function hasItems(iterator) {
  return new Promise((resolve, reject) => {
    iterator.once('error', reject);
    iterator.once('data', () => resolve(true));
    iterator.once('end', () => resolve(false));
  });
}

@jeswr
Copy link
Collaborator Author

jeswr commented Apr 28, 2022

await maybeIterator(iterator)

Ah yes, good catch, and indeed do-while is a better choice a lot of the time.

hasItems

The current hasItems implementation will result in one element being 'discarded' each time because of the once call, which is why there is need to return a new iterator with the item re-appended. Though the logic for retrieving the first element, and then re-appending it may well be improved.

I'm also hesitant about the use of once because the Readable API will leave an iterator in flowing mode once a when a data listener is unsubscribed (for backwards compat reasons) and I had made changes to align with that in #43 - this means that an iterator would be left in flowing mode after one element has been emitted using once.

In addition I noticed some buggy behavior on a somewhat similar approach (due to the implementation of toArray) with

export async function maybeIterator<T>(source: AsyncIterator<T>): Promise<null | AsyncIterator<T>> {
  const elem = await source.take(1).toArray();
  return elem.length === 1 ? source.prepend(elem) : null;
}

where I get the following failing tests

 1) maybeIterator
       Should return an iterator with all elements if the iterator is not empty
         fromArray:

      AssertionError: expected [ 1 ] to deeply equal [ 1, 2, 3 ]
      + expected - actual

       [
         1
      +  2
      +  3
       ]
      
      at Context.<anonymous> (file:///home/jeswr/comunica-perf/AsyncIterator/test/maybeIterator-test.js:94:83)

  2) maybeIterator
       Should return an iterator with all elements if the iterator is not empty
         range 1-3:

      AssertionError: expected [ 1 ] to deeply equal [ 1, 2, 3 ]
      + expected - actual

       [
         1
      +  2
      +  3
       ]
      
      at Context.<anonymous> (file:///home/jeswr/comunica-perf/AsyncIterator/test/maybeIterator-test.js:97:74)

  3) maybeIterator
       Should return an iterator with all elements if the iterator is not empty
         MyItemBufferingIterator:

      AssertionError: expected [ 8 ] to deeply equal [ 8, 6, 4, 2, 0 ]
      + expected - actual

       [
         8
      +  6
      +  4
      +  2
      +  0
       ]
      
      at Context.<anonymous> (file:///home/jeswr/comunica-perf/AsyncIterator/test/maybeIterator-test.js:103:92)

@RubenVerborgh
Copy link
Owner

RubenVerborgh commented Apr 28, 2022

The current hasItems implementation will result in one element being 'discarded' each time because of the once call,

No, because it's after the filter has been applied? So it's just a second listener?

I'm also hesitant about the use of once

Yes, the better implementation is to remove the listeners yourself.

@jeswr
Copy link
Collaborator Author

jeswr commented Apr 28, 2022

No, because it's after the filter has been applied? So it's just a second listener?

But won't it still pull the first element out of the list before iterator = union(iterator.map( q => getConsequents(q) )) is applied in the next iteration of the do-while loop?

Yes, the better implementation is to remove the listeners yourself.

Even then the readable API leaves the iterator in flowing mode; it requires that you explicitly call iterator.pause() in order to stop flowing mode IIRC.


https://nodejs.org/api/stream.html#two-reading-modes

For backward compatibility reasons, removing 'data' event handlers will not automatically pause the stream. Also, if there are piped destinations, then calling stream.pause() will not guarantee that the stream will remain paused once those destinations drain and ask for more data.

@RubenVerborgh
Copy link
Owner

But won't it still pull the first element out of the list before iterator = union(iterator.map( q => getConsequents(q) )) is applied in the next iteration of the do-while loop?

Oof, you're right.

Copy link
Owner

@RubenVerborgh RubenVerborgh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some quick thoughts.

asynciterator.ts Outdated Show resolved Hide resolved
asynciterator.ts Outdated
let item;
do {
if ((item = source.read()) !== null)
return source.prepend([item]);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would return new AsyncIterator whose read method has if (item) { const rest = item; item = null; return item;} return iterator.read();.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once #65 is resolved/merged I imagine this doesn't produce much of an optimization, so not sure if it is worthwhile doing this.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah; might still be a good local implementation for the quick one-element case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

local implementation for the quick one-element case.

In going to do this I realised that the logic readable events should be roughly identical to in #59 so will wait until that is stable / merged and then we can use the same logic or refactor the necessary logic into a common abstract class.

asynciterator.ts Outdated Show resolved Hide resolved
@coveralls
Copy link

coveralls commented Apr 29, 2022

Coverage Status

Coverage decreased (-0.7%) to 99.337% when pulling 9d29345 on maybeIterator into fec14f4 on main.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

AsyncIterator-ext Package
3 participants