
Order of sinks can still be an issue #512

Closed
pixeleet opened this issue Jan 26, 2017 · 11 comments

@pixeleet

Sorry for the somewhat bloated repro code; I couldn't make it smaller :( I have a hunch that the issue is somehow inherent to the HTTP driver, but I can't put my finger on it yet.

Code to reproduce the issue:

import xs from 'xstream';
import delay from 'xstream/extra/delay';
import dropRepeats from 'xstream/extra/dropRepeats';
import Cycle from '@cycle/xstream-run';
import {div, button, p, makeDOMDriver} from '@cycle/dom';
import {makeHTTPDriver} from '@cycle/http';
import onionify from 'cycle-onionify';
import isolate from '@cycle/isolate'

function main(sources) {

  function Posts(sources) {
    function intent(httpSource) {
      return httpSource.select('getPosts')
        .flatten()
        .map(res => res.body)
        .map(body => ({ type: 'getPosts', payload: body }))
    }
    function model(action$) {
      const initialStateReducer$ = xs.of(posts => posts ? [...posts] : [])
      const postReducer$ = action$.filter(a => a.type === 'getPosts')
        .map(a => a.payload)
        .debug('got new post...')
        .map(posts => prevState => [...prevState, ...posts])
      return xs.merge(initialStateReducer$, postReducer$)
    }
    function view(posts$) {
      return posts$.map(posts => {
        console.log(posts)
        return div('.container', [
          div('Posts'),
          div('.posts', posts.map(post => p(post.title)))
        ])
      })
    }

    const request$ = xs.of({ url: 'https://jsonplaceholder.typicode.com/posts', category: 'getPosts' })
    const posts$ = sources.onion.state$
    const action$ = intent(sources.HTTP)
    const reducer$ = model(action$)
    const vdom$ = view(posts$)

    return {
      DOM: vdom$,
      onion: reducer$,
      HTTP: request$
    }
  }

  function Users(sources) {
    function intent(httpSource) {
      return httpSource.select('getUsers')
        .flatten()
        .map(res => res.body)
        .map(body => ({ type: 'getUsers', payload: body }))
    }
    function model(action$) {
      const initialStateReducer$ = xs.of(users => users ? [...users] : [])
      const usersReducer$ = action$.filter(a => a.type === 'getUsers')
        .map(a => a.payload)
        .debug('got users...')
        .map(users => prevState => [...prevState, ...users])
      return xs.merge(initialStateReducer$, usersReducer$)
    }
    function view(users$) {
      return users$.map(users => {
        console.log(users)
        return div('.container', [
          div('Users'),
          div('.users', users.map(user => p(user.name)))
        ])
      })
    }

    const request$ = xs.of({ url: 'https://jsonplaceholder.typicode.com/users', category: 'getUsers' })
    const users$ = sources.onion.state$
    const action$ = intent(sources.HTTP)
    const reducer$ = model(action$)
    const vdom$ = view(users$)

    return {
      DOM: vdom$,
      onion: reducer$,
      HTTP: request$
    }
  }

  function dumbRouter(sources) {
    const router = xs.merge(
      xs.of('posts'),
      xs.of('users').compose(delay(5000))
    )

    const pages = {
      posts: sources => isolate(Posts, 'posts')(sources),
      users: sources => isolate(Users, 'users')(sources)
    }

    const component$ = sources.router
      .compose(dropRepeats())
      .map(route => pages[route])
      .map(component => component(sources))
      .debug('sinks')
      .remember()

    /* This doesn't work */

    const sinks = {
      HTTP: component$.map(component => component.HTTP).flatten(),
      DOM: component$.map(component => component.DOM).flatten().startWith(div('loading...')),
      onion: component$.map(component => component.onion).flatten(),
      router
    }

    /* This does work */

    // const sinks = {
    //   DOM: component$.map(component => component.DOM).flatten().startWith(div('loading...')),
    //   onion: component$.map(component => component.onion).flatten(),
    //   HTTP: component$.map(component => component.HTTP).flatten(),
    //   router
    // }

    return sinks
  }

  return isolate(dumbRouter)(sources)
}

const wrappedMain = onionify(main);

Cycle.run(wrappedMain, {
  DOM: makeDOMDriver('#main-container'),
  HTTP: makeHTTPDriver(),
  router: router$ => router$.debug('routeChanged').startWith('posts')
});

Expected behavior:
First I see the Posts component, rendering the titles of the 100 posts
After 5 seconds I see the Users component which renders the 10 users

Actual behavior:
First I see the Posts component, rendering the titles of the 100 posts
After 5 seconds I see the Users component, but it doesn't render the 10 users: the request is fired, but its response handler is never called.

Versions of packages used:

"dependencies": {
    "@cycle/dom": "12.0.3",
    "@cycle/http": "^11.3.0",
    "@cycle/isolate": "^1.3.2",
    "@cycle/xstream-run": "^4.2.0",
    "cycle-onionify": "^2.4.0",
    "xstream": "^9.3.0"
  },
  "devDependencies": {
    "browserify": "13.0.0",
    "babel-preset-es2015": "^6.3.13",
    "babel-register": "^6.4.3",
    "babelify": "7.2.0",
    "mkdirp": "0.5.x"
  },
@staltz
Member

staltz commented Jan 28, 2017

I reproduced it here, and it also happens with the latest (non-rc) versions of the dependencies you mentioned. Seems like a legit bug. I'm labeling it scope:dom, scope:http and scope:run because it's unclear so far where the source of the problem is.

Any ideas @whitecolor? You helped us solve this in run using buffers.

@staltz
Member

staltz commented Jan 28, 2017

Seems like the scope of the problem is Onionify, because replacing

        reducerMimic$.imitate(sinks[name]);

with

        var buffer = {next: [], error: [], complete: []};
        var replicator = {
            next: function (x) { buffer.next.push(x); },
            error: function (x) { buffer.error.push(x); },
            complete: function (x) { buffer.complete.push(x); },
        };
        sinks[name].addListener({
            next: function (x) { replicator.next(x); },
            error: function (x) { replicator.error(x); },
            complete: function (x) { replicator.complete(x); },
        });
        buffer.next.forEach(function (x) { reducerMimic$._n(x); });
        buffer.error.forEach(function (e) { reducerMimic$._e(e); });
        buffer.complete.forEach(function () { reducerMimic$._c(); });
        replicator.next = function (x) { reducerMimic$._n(x); };
        replicator.error = function (e) { reducerMimic$._e(e); };
        replicator.complete = function () { reducerMimic$._c(); };

solves the problem, but I still want to understand, at a deeper level, exactly why.

@wclr
Contributor

wclr commented Jan 30, 2017

I believe it's the same problem we had in Cycle core itself: late listeners of reducerMimic$ only get the values the target sink emits after they subscribe, and values received earlier by the internal imitate subscription are not visible to them. So imitate should be modified to use an internal buffer, like the one in core.
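The buffering idea from the patch above can be sketched in isolation. This is a minimal, hypothetical `makeBufferedReplicator` (not onionify's or cycle/run's actual code): it listens to a source eagerly, buffers whatever is emitted before a real listener is attached, replays that buffer on attach, then forwards live values. `makeSubject` is a tiny stand-in for a sink stream, and only `next` is handled for brevity (the patch above also buffers `error` and `complete`).

```javascript
// Hypothetical minimal subject standing in for a sink stream (not xstream).
function makeSubject() {
  const listeners = [];
  return {
    addListener: l => { listeners.push(l); },
    next: x => listeners.forEach(l => l.next(x)),
  };
}

// Hypothetical replicator: listens to `source$` immediately, buffers early
// values, and replays them to the real listener before forwarding live ones.
function makeBufferedReplicator(source$) {
  const buffer = [];
  let target = null;
  source$.addListener({
    next: x => {
      if (target) target.next(x); // live forwarding once attached
      else buffer.push(x);        // too early: remember it
    },
  });
  return {
    attach(listener) {
      buffer.splice(0).forEach(x => listener.next(x)); // replay early values
      target = listener;
    },
  };
}

const reducer$ = makeSubject();
const replicator = makeBufferedReplicator(reducer$);
reducer$.next('initialReducer'); // emitted synchronously, before anyone listens
const seen = [];
replicator.attach({ next: x => seen.push(x) });
reducer$.next('postsReducer');   // emitted after the listener is attached
console.log(seen); // [ 'initialReducer', 'postsReducer' ]
```

Without the buffer, `seen` would only contain `'postsReducer'`, which is the "late listener" symptom described above.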

@staltz
Member

staltz commented Mar 24, 2017

Closing this issue here because we opened an issue in onionify staltz/cycle-onionify#17

@staltz staltz closed this as completed Mar 24, 2017
@binary-koan

@staltz I can reproduce this without onionify. It looks like it has something to do with using merge on a synchronous and an asynchronous stream:

import {run} from '@cycle/run';
import {makeDOMDriver, div} from '@cycle/dom';
import xs from 'xstream';

function makeEchoDriver() {
  return function echoDriver(events$) {
    events$.addListener({ next: event => console.log(event) })
  }
}

function main (sources) {
  // This works
  //const number$ = xs.of(1, 2, 3)
  
  // This doesn't
  const number$ = xs.merge(xs.of(1, 2, 3), sources.DOM.select("button").events("click"))

  return {
    // Not important, although you can reproduce this by mapping over number$ for the DOM as well
    DOM: xs.of(div("test")),

    // The first one of these is the only one to get the events when using merge()
    echo: number$,
    echo2: number$.map(n => n * 2)
  };
}

const drivers = {
  DOM: makeDOMDriver('.app'), // Just to have events() as an async stream
  echo: makeEchoDriver(),
  echo2: makeEchoDriver()
}

run(main, drivers)

Might be a bug with xstream even?

@Widdershin Widdershin reopened this Apr 15, 2018
@ntilwalli
Contributor

ntilwalli commented Apr 15, 2018

@binary-koan I believe this is by design. xstream is multicast by default, which means it's not referentially transparent: the first use of number$ is not the same as the second use of number$. When the echo sink's subscription propagates up to number$, it is the first subscriber, so it synchronously subscribes to both the merged xs.of(...) and the DOM event stream. The xs.of(...) stream emits immediately and completes, but because it's merged with the DOM event stream (which is hot and never completes), the overall number$ does not complete. When the echo2 sink subscribes to number$, it textually reads like the same stream, and it uses the same stream reference, but its behavior is different from the first time it was referenced. Multicast streams have state, and that state tracks the question "Does this stream reference already have any subscribers?". The second time around, number$ already has one subscriber, so the second subscriber does not propagate the subscription request up to xs.of(...). The merged stream has not completed, it's still active, so there are no immediate synchronous emissions for the second subscription. RxJS and Most are referentially transparent, so by default they would work as you seem to expect. In those libraries you need to multicast explicitly (which some people also find unintuitive).

The reason the non-merged stream does work as you expect is because it's not merged with a hot (will-not-complete) stream. So the second subscription sees the completed stream, and thus does trigger a subscription which causes the sync emissions the second time around as well.
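The explanation above can be reproduced without Cycle or xstream. The sketch below is a hypothetical, hand-rolled multicast stream loosely modeled on the behavior described (it is not xstream's implementation): the producer starts on the first listener, later listeners share it, and a completed stream resets so the next listener restarts it. `never()` stands in for the DOM click stream, which never completes.

```javascript
// Hypothetical multicast stream (not xstream's code): one shared producer,
// started by the first listener and reset when the stream completes.
class MulticastStream {
  constructor(producer) {
    this.producer = producer; // { start(sink) }
    this.listeners = [];
  }
  addListener(listener) {
    this.listeners.push(listener);
    if (this.listeners.length > 1) return; // already running: just share it
    this.producer.start({
      next: x => this.listeners.slice().forEach(l => l.next(x)),
      complete: () => {
        const done = this.listeners;
        this.listeners = []; // reset, so a later listener restarts the producer
        done.forEach(l => l.complete());
      },
    });
  }
}

// Emits all values synchronously, then completes.
const of = (...values) => new MulticastStream({
  start(sink) { values.forEach(v => sink.next(v)); sink.complete(); },
});
// Never emits and never completes (stands in for DOM click events).
const never = () => new MulticastStream({ start() {} });
// Completes only once every input has completed.
const merge = (...streams) => new MulticastStream({
  start(sink) {
    let active = streams.length;
    streams.forEach(s => s.addListener({
      next: x => sink.next(x),
      complete: () => { if (--active === 0) sink.complete(); },
    }));
  },
});

// Subscribes a listener that records every value it sees.
function collect(stream) {
  const seen = [];
  stream.addListener({ next: x => seen.push(x), complete: () => {} });
  return seen;
}

const merged$ = merge(of(1, 2, 3), never());
const echo = collect(merged$);      // first listener: starts everything
const echo2 = collect(merged$);     // late listener: merged$ is still running
const plain$ = of(1, 2, 3);
const plainEcho = collect(plain$);  // runs, completes, resets
const plainEcho2 = collect(plain$); // restarts the producer
console.log(echo, echo2, plainEcho, plainEcho2); // [ 1, 2, 3 ] [] [ 1, 2, 3 ] [ 1, 2, 3 ]
```

Swapping `merge(...)` for the plain `of(...)` corresponds to the "doesn't work" and "works" lines in binary-koan's snippet.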

I've actually never seen the issue this example exposes in a real-world project, because the need for xs.of() with more than one value is exceedingly rare (I've only ever seen it used in stream-learning exercises). I can't think of a use case where I need more than one synchronous emission to start with, and when there is only one emission to start, using remember solves the problem: echo2, being the late subscriber, would still see the value, since it was cached in a length-1 buffer and then emitted to late subscribers (where "late subscriber" means not the first, but on the same turn of the event loop as the first subscriber). If you do need more than one sync emission to start, or a buffer longer than 1, you can use the buffer operator (https://github.com/staltz/xstream/blob/master/src/extra/buffer.ts).

If you change your xs.of stream to have only one emission and then append a remember, this example should work, like so: const number$ = xs.merge(xs.of(1), sources.DOM.select("button").events("click")).remember()
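The length-1 cache that `remember()` provides, and the longer buffer suggested for multiple synchronous emissions, can both be illustrated with a hypothetical hand-rolled `makeReplaySubject(bufferSize)`, similar in spirit to RxJS's `ReplaySubject`. It is not xstream's `remember()` or the `buffer` operator linked above; with `bufferSize = 1` it only mimics the caching behavior described.

```javascript
// Hypothetical replay subject: keeps the last `bufferSize` values and
// replays them to every listener that subscribes late.
function makeReplaySubject(bufferSize) {
  const listeners = [];
  const cache = [];
  return {
    next(x) {
      cache.push(x);
      if (cache.length > bufferSize) cache.shift(); // drop the oldest value
      listeners.forEach(l => l(x));
    },
    addListener(l) {
      cache.forEach(x => l(x)); // a late listener first sees the cached values
      listeners.push(l);
    },
  };
}

// The first listener is attached before the startup emissions, the
// second one after them (same tick, like echo and echo2).
function lateListenerSees(bufferSize, values) {
  const subject = makeReplaySubject(bufferSize);
  const first = [];
  subject.addListener(x => first.push(x));
  values.forEach(v => subject.next(v)); // synchronous startup emissions
  const late = [];
  subject.addListener(x => late.push(x));
  return late;
}

console.log(lateListenerSees(1, [1]));       // [ 1 ]
console.log(lateListenerSees(1, [1, 2, 3])); // [ 3 ]
console.log(lateListenerSees(3, [1, 2, 3])); // [ 1, 2, 3 ]
```

The middle case shows why the single-emission rewrite matters: with a length-1 cache, a late listener only ever sees the last startup value.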

BTW, I'm saving this question. It's perfect for a streams-in-JS interview. It's got all the topics which make streams hard to initially grok... hot/cold, unicast/multicast and event-loop-ordering (sync/async) 💯 😉

@ntilwalli
Contributor

ntilwalli commented Apr 15, 2018

@pixeleet Regarding your original question: the HTTP sink emission in the child is synchronous, so it happens before the response stream has been subscribed. One hacky workaround at the time was to add a small delay to the sync HTTP emission in the sink, so that the source subscription is set up before the emission, but this was solved more generally by the microtask queue update: 9d0fc02
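The race described here, and why deferring the sink emission to the microtask queue avoids it, can be sketched without Cycle. Everything below is hypothetical (`makeSubject`, `fakeHttpDriver`, `wire`) and only illustrates the idea; it is not the contents of commit 9d0fc02. `queueMicrotask` is the standard global available in browsers and Node.

```javascript
// Hypothetical plain multicast subject with no replay.
function makeSubject() {
  const listeners = [];
  return {
    addListener: l => { listeners.push(l); },
    next: x => listeners.forEach(l => l(x)),
  };
}

// Hypothetical driver that answers each request synchronously on response$.
function fakeHttpDriver(response$) {
  return request => response$.next(`response to ${request}`);
}

function wire({ deferSinks }) {
  const response$ = makeSubject();
  const sendRequest = fakeHttpDriver(response$);
  const received = [];
  // 1. Sink wiring: the child emits its request as soon as it is subscribed.
  const emit = deferSinks
    ? req => queueMicrotask(() => sendRequest(req)) // wait until setup is done
    : sendRequest;                                   // fire immediately
  emit('getUsers');
  // 2. Source wiring: only now does the child start listening for responses.
  response$.addListener(res => received.push(res));
  return received;
}

const syncResult = wire({ deferSinks: false });
const deferredResult = wire({ deferSinks: true });
queueMicrotask(() => {
  console.log(syncResult);     // [] (the response was emitted to nobody)
  console.log(deferredResult); // [ 'response to getUsers' ]
});
```

The deferred request runs in a microtask queued before the logging one, so by the time the result is inspected the response listener has already been attached and has received the response.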

@Widdershin
Member

> I've actually never seen the issue this example exposes in a real-world project because the need for xs.of() with more than one value is exceedingly rare (I've only ever seen them used during stream-learning exercises).

This is actually a problem in @binary-koan's app, and using xs.of with a single emission is not a viable option in his case.

Even if this is by design, I think we should question the pros and cons of that design.

This is quite unintuitive and can cause nasty bugs that are hard to resolve.

One option I can see to resolve it is to make of and fromArray asynchronous. I know some will argue that people can opt in with .compose(delay(0)), but libraries like RxJS provide that sort of behaviour if people want it. We could also take the current implementation and make it an extra, like ofSync and fromArraySync, and make the default async. That way people are less likely to get bitten by nasty behaviour like this, but they can still opt in to sync if they know the risks.
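The proposal can be illustrated with a hypothetical `makeOf(values, { async })`, which is not an existing xstream API. To keep it short, completion and reset are left out, so the stream stays active like the merged number$ in binary-koan's snippet: the producer starts on the first listener and later listeners share it. With `async: true` the emissions are deferred to a microtask, so every listener attached during the same synchronous setup sees them.

```javascript
// Hypothetical multicast `of`: the producer starts on the first listener only.
// Completion/reset is omitted, so the stream stays "active" like merged number$.
function makeOf(values, { async }) {
  const listeners = [];
  const emitAll = () => values.forEach(v => listeners.forEach(l => l(v)));
  return {
    addListener(l) {
      listeners.push(l);
      if (listeners.length > 1) return;   // already started: share the producer
      if (async) queueMicrotask(emitAll); // after the current synchronous setup
      else emitAll();                     // right now, to whoever is listening
    },
  };
}

// Mirrors the echo / echo2 sinks from the snippet above.
function subscribeTwice(stream) {
  const echo = [];
  const echo2 = [];
  stream.addListener(n => echo.push(n));
  stream.addListener(n => echo2.push(n * 2));
  return { echo, echo2 };
}

const syncRun = subscribeTwice(makeOf([1, 2, 3], { async: false }));
const asyncRun = subscribeTwice(makeOf([1, 2, 3], { async: true }));
queueMicrotask(() => {
  console.log(syncRun);  // { echo: [ 1, 2, 3 ], echo2: [] }
  console.log(asyncRun); // { echo: [ 1, 2, 3 ], echo2: [ 2, 4, 6 ] }
});
```

The trade-off raised in the thread still applies: deferring changes timing for every consumer, which is why the synchronous variant (`ofSync` in the proposal) would stay available as an explicit opt-in.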

Since xstream is designed for Cycle, and aims to eliminate many common issues that crop up from using stream libraries designed to be more general purpose, I think we should strongly consider doing whatever we can to resolve this class of issues in xstream.

@staltz
Member

staltz commented Apr 17, 2018

Thanks @binary-koan for the reproducible snippet. I read this issue but couldn't find time until now to comment in detail. I still want to find the root cause instead of putting a simple setTimeout or setImmediate there. @whitecolor is strongly against that approach, and nowadays I tend to agree with him (after having put setTimeout inside cycle/run and seeing some unintended problems).

It could be an issue related to scheduling:
https://staltz.com/primer-on-rxjs-schedulers.html

Or could be just a detail or corner case in the buffer system in cycle/run, or something along those lines.

I'm careful not to jump to a quick solution for this; I want to first understand what the root cause is and how broad it is as a foundational issue. In summary, I "agree with everyone" here, I just want to make sure we move carefully and very intentionally, with surgical precision.

@binary-koan

binary-koan commented Apr 17, 2018

Thanks for all the explanations and context - great to get more of an idea of what the issues and concepts surrounding this are!

I'll just add a thought as someone relatively new to building apps in cycle: one of the great realisations I had was how I could set up all my game interactions and how they map to states up front, then just leave cycle to schedule all those interactions in a way that makes sense. It was oddly jarring to be taken out of that convenient asynchronous world and back into one where the order of method calls matters and state is something you have to worry about, just because (as I imagined) I happened to be firing an action at the start of the timeline.

I'll definitely be reading up more on different implementations of streams and when they're useful, just worth noting that (for better or worse) this was the first time I even needed to care about concepts like sync/async and multicasting in a couple of weeks of tinkering with cycle.

@staltz
Member

staltz commented May 19, 2018

Okay, looking at the big picture, I now realize that @binary-koan's example was about scheduling, which expressed itself through the ordering of sinks, but only because the ordering of sinks maps one-to-one to the order of subscribes underneath. This original issue was about race conditions during initial setup, and I know these two cases sound very similar, but scheduling is more general than this issue.

So I'll close this issue. You can always open another issue that refers to this issue, but the truth is that this issue is resolved.

Furthermore, the latest example was about scheduling with xstream, so it should actually belong in the xstream repo. If you use RxJS v5+ and opt in to a breadth-first scheduler (like the queue scheduler), the problem will go away. In RxJS v4+ that was the default scheduler, which made it much easier to reason about, but the two downsides were that it trashed the stack trace a lot and it was considerably slower.

That said, I agree with @Widdershin that we should focus on intuitive APIs, and using a breadth-first scheduler by default would "just work" for most people. We would then need to figure out: (1) how to keep the stack trace sane for those wishing to debug their app OR provide completely custom debugging tools, and (2) how to document/teach when to opt-in to faster schedulers.
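As a rough illustration of why breadth-first scheduling makes binary-koan's example work, here is a hypothetical trampoline in the style of a queue scheduler (a sketch, not RxJS's or xstream's code) together with a cut-down multicast stream; completion is left out for brevity. A task scheduled while another task is running is queued until that task finishes. When both sinks are wired inside one scheduled task, the synchronous `of` emissions wait until `echo` and `echo2` are both listening.

```javascript
// Hypothetical trampoline ("queue") scheduler: tasks scheduled while another
// task runs are queued and executed afterwards, breadth-first.
const queue = [];
let running = false;
function schedule(task) {
  queue.push(task);
  if (running) return; // the outer call will drain the queue
  running = true;
  try {
    while (queue.length > 0) queue.shift()();
  } finally {
    running = false;
  }
}

// Hypothetical multicast stream: the first listener starts the shared producer.
class MulticastStream {
  constructor(start) {
    this.start = start;
    this.listeners = [];
  }
  addListener(listener) {
    this.listeners.push(listener);
    if (this.listeners.length === 1) {
      this.start(x => this.listeners.slice().forEach(l => l(x)));
    }
  }
}

// `of` emits through the scheduler instead of directly inside addListener.
const of = (...values) =>
  new MulticastStream(emit => schedule(() => values.forEach(v => emit(v))));
// Stands in for DOM click events: never emits.
const never = () => new MulticastStream(() => {});
const merge = (...streams) =>
  new MulticastStream(emit => streams.forEach(s => s.addListener(emit)));

function run(wireInOneTask) {
  const number$ = merge(of(1, 2, 3), never());
  const echo = [];
  const echo2 = [];
  const wire = () => {
    number$.addListener(n => echo.push(n));
    number$.addListener(n => echo2.push(n * 2));
  };
  if (wireInOneTask) schedule(wire); // setup is one task; emissions queue behind it
  else wire();                       // emissions run as soon as echo subscribes
  return { echo, echo2 };
}

console.log(run(false)); // { echo: [ 1, 2, 3 ], echo2: [] }
console.log(run(true));  // { echo: [ 1, 2, 3 ], echo2: [ 2, 4, 6 ] }
```

The stack-trace cost mentioned above shows up even in this toy: values reach listeners from inside the drain loop in `schedule`, not from the `addListener` call that caused them.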

So this is something I want to eventually fix, but it's a big project.

@staltz staltz closed this as completed May 19, 2018

6 participants