Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: flatMap #64

Closed
romansl opened this issue Jul 14, 2015 · 25 comments
Closed

Java: flatMap #64

romansl opened this issue Jul 14, 2015 · 25 comments
Labels

Comments

@romansl
Copy link

romansl commented Jul 14, 2015

Library lacks primitive flatMap. It has strong practical usage. But I dont know how to implement it in terms of FRP.

Example:

public fun request(): Stream<String> {
    // perform http request.
}

public val refresh: StreamSink<Unit> = Sodium.streamSink<Unit>()
public val valueFromServer: Cell<String> = refresh.flatMap { request() }.hold("")

public fun <A, B> Stream<A>.flatMap(transform: (Event<A>) -> Stream<B>): Stream<B> {
    // how to implement?
}

Or is there a way to do it without flatMap?

@the-real-blackh
Copy link

I think this is equivalent, but I might be wrong:

return Stream.switchS(this.map(transform));

@the-real-blackh
Copy link

Actually it's probably this:

    return Stream.switchS(this.map(transform).hold(new Stream<>()));

@romansl
Copy link
Author

romansl commented Jul 14, 2015

Now I understand why we need switchS. Thank you. Looking at code I found that switchS is almost flatten. I implemented flatten. It helps to implement other basic primitives.

flatten is:

public fun <A> Stream<Stream<A>?>.flatten(): Stream<A>

switchS is:

        Sodium.tx {
            Operational.value(cell).flatten()
        }

flatMap is:

        Sodium.tx {
            stream.map { request() }.flatten()
        }

@romansl
Copy link
Author

romansl commented Jul 15, 2015

We have a problem here. This example is not working:

        fun foo(i: Int): Stream<String> {
            val sink = Sodium.streamSink<String>()
            sink.send("A" + i)
            return sink
        }

        val l = sink.map {
            foo(it.value)
        }.hold(null).switchS().listen { out.add(it.value) }

If resulted stream has firings, them not appear in listen because they cleaned by Transaction.last before meeting with Transaction.last called by switchS.

What do you think?

@the-real-blackh
Copy link

The hold introduces a delay, so it's working as the semantics define. If you need the stream to output something when the switch happens, you'll need to do that separately.

To put it another way, you can do what you want with Sodium as specified, but you can't do it that way.

@the-real-blackh
Copy link

Depending on the problem, defer() might solve it.

@romansl
Copy link
Author

romansl commented Jul 17, 2015

Where I must place defer for this example began to work? Or how to rewrite this to implement true flatMap?

@the-real-blackh
Copy link

"true" flatMap is something that hasn't been defined denotationally, so the whole thing would need to be thought through. Rx has no respect for event simultaneity and this means compositionality is completely broken in Rx. FRP's biggest advantage has just been thrown out the window.
Anyway, try this:

 fun foo(i: Int): Stream<String> {
            val sink = Sodium.streamSink<String>()
            sink.send("A" + i)
            return sink.defer()         <-------------
        }

        val l = sink.map {
            foo(it.value)
        }.hold(null).switchS().listen { out.add(it.value) }

@the-real-blackh
Copy link

It's probably more instructive to look at what actual problem you're trying to solve. Then we've got something to test the advantages and disadvantages against.

@romansl
Copy link
Author

romansl commented Jul 17, 2015

It does not work. I dont know why for now. Logically this was to help... Hm.

Actually im trying to use Sodum in my project. The flatMap needed to perform io operations for example:

fun queryImage(url: String): Stream<Bitmap> {
    val bm = cahce.get(url)
    if (bm != null) {
        val out = Sodium.streamSink<Bitmap>()
        out.send(bm)
        return out
    }

    return queryFromServer(url)
}

clicked().flatMap {
    queryImage(...)
}.listen(...)

@the-real-blackh
Copy link

I think it should work. If it doesn't, it's a bug. If you can't make it work, can you reproduce it in a test case in Java for me? If you do that, I'll fix it.
Note: cache should be modelled as a cell, or it's not referentially transparent.
I think it would be legitimate here to use the same pattern and just add defer to out, like this:

return out.defer()

Now the cache lookup and I/O are both working the same way. This is just the sort of case that I think defer() is for.

@romansl
Copy link
Author

romansl commented Jul 17, 2015

It does not work because same reason. The send in defer calls Transaction.last before switchS last.

@the-real-blackh
Copy link

Ah, OK - that's a bug. I'll find some time and fix it.

@the-real-blackh
Copy link

Fixing it now.

@the-real-blackh
Copy link

I can't reproduce it in the Java version. I've written the test case below. Please add this to your test suite and see what happens. Check that the post() tasks are executed after the last() ones.

NOTE: I've checked this in.

    public void testSwitchAndDefer()
    {
        List<String> out = new ArrayList();
        StreamSink<Integer> si = new StreamSink();
        Listener l = Cell.switchS(si.map(i -> {
            Cell<String> c = new Cell<>("A"+i);
            return Operational.value(c).defer();
        }).hold(new Stream<String>())).listen(x -> { out.add(x); });
        si.send(2);
        si.send(4);
        l.unlisten();
        assertEquals(Arrays.asList("A2", "A4"), out);
    }

@romansl
Copy link
Author

romansl commented Jul 17, 2015

My code is not working. Hm... Can you explain why you code works? Here test case that shows loosing of value by defer:

    public fun testDefer2() {
        val out = ArrayList<String>()
        val sink = Sodium.streamSink<Int>()
        val l2 = arrayOfNulls<Listener>(1)
        val l = Sodium.tx {
            sink.map {
                const("A").operational().value().defer()
            }.listen {
                l2[0] = it.value.listen {
                    out.add(it.value)
                }
            }
        }
        sink.send(1)
        l2[0]?.unlisten()
        l.unlisten();
        TestCase.assertEquals(listOf("A"), out);
    }

It fails.

@the-real-blackh
Copy link

I've finished a day of book writing today so I'll get to this when I can.

@romansl
Copy link
Author

romansl commented Jul 18, 2015

I found a bug in my Transaction implementation. The close is called in the Operational.value. Now testDefer2 and testSwitchAndDefer pass, but testFlatMap2 still fail.

@romansl
Copy link
Author

romansl commented Jul 18, 2015

Now it complitly works. But I dont like the fact that flatten, flatMap and switchS loose events without defer. Should I hadndle this case inside FlattenHandler? I have to think...

@the-real-blackh
Copy link

They don't lose events as such. It's the way the semantics are defined. The question, "Can we improve the semantics?" is always open. And, we should investigate these questions. But, these semantics are the same as Conal Elliott's definition and I have found they work in practice. Reactive Banana works the same way too.

@romansl
Copy link
Author

romansl commented Aug 4, 2015

Back to flatMap... I have a problem with the send here:

        fun foo(i: Int): Stream<String> {
            val sink = Sodium.streamSink<String>()
            sink.send("A" + i) // <--- Exception!
            return sink
        }

        val l = sink.map {
            foo(it.value)
        }.hold(null).switchS().listen { out.add(it.value) }

The send throws You are not allowed to use send() inside a Sodium callback. Earlier I use special function just(A) for this:

    public fun <A> just(value: A): Stream<A> {
        val sink = StreamWithSend<A>()
        Transaction.apply2 {
            sink.send(it, Value<A>(value))
        }
        return sink
    }

It works without exceptions, but now I need send inside flatMap. How to handle this?

@the-real-blackh
Copy link

There are two things to be said here:

  1. You can use Operational.value(new Cell("A" + i)) to do what you are trying to do without generating an exception. I think this is a valid idiom.
  2. (Unless I am mistaken) the resulting code will output nothing because of the inherent delay in switchS().

These are the semantics of Sodium, and, as I said, they're basically the same as Conal Elliott's FRP. A lot of thought went into their design. flatMap() from Rx can't be immediately shoe-horned into this design.

@romansl
Copy link
Author

romansl commented Aug 4, 2015

The flatMap is necessary for my code. This example is simplified part of my real code.

Here:

        fun foo(i: Int): Stream<String> {
            val sink = Sodium.streamSink<String>()
            Promise.call(Executors.DB) {
                // query from DB.
            }.then(Executors.UI) {
                val items = result.map { ... }
                sink.send(items)
            }
            return sink
        }

This simple code will fail if called from UI thread and the result is returned from DB before Promise.then has been called.

@the-real-blackh
Copy link

I consider what you're doing there to be a legitimate way of doing things, and I discuss this in the section on promises in the book... I've got some re-writes to that section that aren't in the MEAP yet.

What I recommended is different in some small details, though: Use the Operational.value(new Cell(i)) idiom to create a "spark" and listen to it with the (new) listenWeak() method. You should do this instead of using I/O "directly". Use addListener(l) on the StreamSink (where l was returned by listenWeak()). Have the promise callback call sink.send(). This must be on a different thread.

You might have to explain how your promise works ... I don't quite understand about result and Promise.then. Must go to bed now.

@romansl
Copy link
Author

romansl commented Aug 4, 2015

Here the code of my Promise implementation: https://github.com/romansl/Promeso

And one more thing. Executors.UI code:

enum Executors {
    // ...
    UI {
        private final Handler mHandler = new Handler(Looper.getMainLooper());
        private final long mUIThreadId = Looper.getMainLooper().getThread().getId();

        @Override
        public void execute(@NotNull final Runnable command) {
            if (Thread.currentThread().getId() == mUIThreadId) {
                command.run();
            } else {
                mHandler.post(command);
            }
        }
    },
    // ...
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants