Skip to content

Stream.transform/4 is now aware of the acc’s tail, if presented#5680

Closed
am-kantox wants to merge 2 commits intoelixir-lang:masterfrom
am-kantox:make-stream-transform-care-of-tail
Closed

Stream.transform/4 is now aware of the acc’s tail, if presented#5680
am-kantox wants to merge 2 commits intoelixir-lang:masterfrom
am-kantox:make-stream-transform-care-of-tail

Conversation

@am-kantox
Copy link
Copy Markdown
Contributor

@am-kantox am-kantox commented Jan 19, 2017

Stream.transform/3 in current implementation was unaware of the tail, left in the accumulator:

[1, 2, 3, 4] |> Stream.transform([], fn i, acc ->
  if rem(i, 2) == 0, do: {[acc], [i]}, else: {[], acc ++ [i]}
end) |> Enum.to_list
#⇒ [[1], [2, 3]] # what happened with the tail, `4`?!

Within this patch, if the accumulator is there, do_transform_user is called once more time with :done status, making it possible to:

[1, 2, 3, 4] |> Stream.transform([], fn i, acc ->
  case i do
    :done -> {[acc], nil}
    i when rem(i, 2) == 0 -> {[acc], [i]}
    _ -> {[], acc ++ [i]}
    end
end) |> Enum.to_list
#⇒ [[1], [2, 3], [4]]

I am not sure about :done, but I think it makes sense in general.

@josevalim
Copy link
Copy Markdown
Member

The problem with :done is that you could be going through a list that has :done as an element. I believe the solution needs to be a bit more evolved, as we don't currently support you doing a final pass on the accumulated data. For example, see how chunk is implemented for workarounds.

@am-kantox
Copy link
Copy Markdown
Contributor Author

This looks like a chicken-egg problem and I do not see any harm in having a particularly dedicated value to be a show-stopper element.
There is a simple solution, that simply does not disturb the consumer with this last call:

      {_, []} when is_list(user_acc) ->
        do_list_transform(user_acc, user, fun, :halt, next, inner_acc, inner,
                          &Enumerable.List.reduce([user_acc], &1, fun), after_fun)

Instead of the do_transform_user(:done) in the pull request, this one silently appends the accumulated tail, if presented. My point was that it would be good to let the consumer distinguish between “queue is empty” and “normal processing” to possibly handle the end of data somehow. In that case, though, we should either change the signature of the reducer (which is bah :), or dedicate the specific value to denote “end of the queue.”

It’s evidently up to you to decide, but I like the approach with :done, or, if you want, :⏎.

@josevalim
Copy link
Copy Markdown
Member

Throughout the whole design of the Stream library we have been very careful to not introduce situations where there is ambiguity: i.e. situations where it would be unclear if a given atom is part of a collection or an special token emitted by the stream system. The proper way to solve this would be by using tuples or introducing a new callback. We won't accept any solution where the meaning of atoms are overloaded, as that will eventually turn out to be a bug or source of confusion.

@am-kantox
Copy link
Copy Markdown
Contributor Author

OK, now it’s just silently taking care about the tail being also included into the result.

@josevalim
Copy link
Copy Markdown
Member

@am-kantox unfortunately we cannot assume such as well. We cannot assume that every time the user accumulator is a list that it is something that is meant to be appended as is to the result. We need a general solution, otherwise someone that has the same needs but keeps the accumulator as a tuple or a map, because they need to keep track of more information, is still going to run into issues.

@am-kantox
Copy link
Copy Markdown
Contributor Author

@josevalim ok, I see what you are saying. I will do my best to implement a transform reducer in the streams/reducers.ex and a robust easy-to-read handling of the transform/4 itself.

@josevalim
Copy link
Copy Markdown
Member

@am-kantox you don't need to change streams/reducers.ex as those are only for reducers shared between Enum and Stream. In any case, the only stream that supports "resolving" an accumulator is the chunk one, so I would look at it for ideas.

@josevalim josevalim changed the title Stream.transform/3 is now aware of the acc’s tail, if presented. Stream.transform/3 is now aware of the acc’s tail, if presented Jan 25, 2017
@rschmukler
Copy link
Copy Markdown
Contributor

@josevalim I ran into this today as well. What about making it so that the after_fun in transform/4 gets the opportunity to emit additional events.

ie. instead of discarding after_fun's return, it can return a tuple of {[Enumerable.t], term}?

@am-kantox
Copy link
Copy Markdown
Contributor Author

am-kantox commented Jan 26, 2017

@josevalim ok, now I see what you were saying. I am in the middle of refactoring, I don’t give up. At the moment I have an amount of questions:

— is there any reason for having function/2 instances treated as Streams? Are there any contras against having this in stream_test.exs?

  defp lazy?(stream) do
-    match?(%Stream{}, stream) or is_function(stream, 2)
+    match?(%Stream{}, stream)
  end

I understand that requires more refactoring, but in my opinion it worth it.

— how do we suppose to distinguish tail to be emitted from “just the last accumulator value”?

We need a general solution, otherwise someone that has the same needs but keeps the accumulator as a tuple or a map

Tuple is not enumerable, so we can’t just rely on Enum.empty? or like. Well, I could count and keep a “length” of the accumulator on each step, whatever it means, and compare it against the previous step, but this looks too hacky to me.

— what do you think about adding two new members to Stream struct, namely catcher and after_fun to generalize the way of error handling and finalization? They belong to reduce, they likely should not be spreaded amongst the whole stream.ex.

At the moment that’s it. I should be able to introduce the approach this weekend.

@am-kantox
Copy link
Copy Markdown
Contributor Author

@rschmukler after_fun is definitely not where one might expect to receive a tail. It should be concatenated with the rest and [possibly] reversed.

@josevalim
Copy link
Copy Markdown
Member

@rschmukler after_fun is definitely not where one might expect to receive a tail. It should be concatenated with the rest and [possibly] reversed.

Just to make it clear: the concatenated with the rest and possibly reversed should be done by your code and not by the Stream code. If Stream does it, then it has to assume something about the accumulator but it should never do that.

So for example, allowing after_fun to return something like {:emit, items, last_acc}, is a valid way for solving this issue.

is there any reason for having function/2 instances treated as Streams? Are there any contras against having this in stream_test.exs?

The function/2 is a convenience so you are not forced to create %Stream{}. %Stream{} compose better though. You can change lazy? if you want, that's fine. We shouldn't care about which is which.

@am-kantox
Copy link
Copy Markdown
Contributor Author

OK, then maybe the easiest solution would be: if and only after_fun returns a function/1, this function is being called with acc and the value received is assigned to the result?

  defp do_after(nil, {acc, _user_acc}), do: acc
  defp do_after(fun, {acc, user_acc}) do
    case fun.(user_acc) do
      finalize when is_function(finalize, 1) -> finalize.(acc)
      _ -> acc
    end
  end
# for the case of acc is `List`, this might be passed: 
after_fun = fn rest -> fn acc -> [rest | acc] end  end

I can’t see any drawbacks of this solution. Tests are passed.

@am-kantox am-kantox changed the title Stream.transform/3 is now aware of the acc’s tail, if presented Stream.transform/4 is now aware of the acc’s tail, if presented Jan 26, 2017
@rschmukler
Copy link
Copy Markdown
Contributor

@am-kantox I'm curious, what are the advantages of returning a function/1 instead of the {:emit, items, last_acc} as proposed by @josevalim ? To me, the tuple seems more inline with the rest of the standard library's style.

@am-kantox
Copy link
Copy Markdown
Contributor Author

@rschmukler in my opinion, it is more flexible in terms of consumer’s code. The only thing at this very moment, that matters, is the last value of acc, so there is a) not much sense to return two values and b) not much sense to introduce a new entity :emit.
Am I missing something?

@rschmukler
Copy link
Copy Markdown
Contributor

rschmukler commented Jan 26, 2017

@am-kantox The after_fun already functions as a good place to do the work that would be done in the function/1 so to me it seems like an additional function for no reason. ie. What would live in the after_fun that wouldn't live in the new function/1 being returned? The proposed tuple syntax is more inline with the Enumerable.result type command pattern that is used in the stream.ex source anyway, so to me it feels more in line.

@josevalim any thoughts on the proposed APIs?

@josevalim
Copy link
Copy Markdown
Member

Fwiw, the problem with the after_fun is that it is called even in case of errors, where you don't care about the accumulator or where it may be even wrong. So an extra function may be the way to go.

@rschmukler
Copy link
Copy Markdown
Contributor

Fair point, then @am-kantox solution looks good to me 👍

@josevalim
Copy link
Copy Markdown
Member

@am-kantox and @rschmukler would your solution be solved by a custom chunk_by function that also receives the accumulator?

1..10
|> Stream.chunk_by([], fn i, acc ->
  if rem(i, 2) == 0, do: {Enum.reverse([i | acc]), []}, else: {[], [i | acc]}
end, fn acc -> Enum.reverse(acc) end)
|> Enum.to_list
#=> [[1, 2], [3, 4], [5, 6], [7, 8], ...]

@am-kantox
Copy link
Copy Markdown
Contributor Author

My particular problem would be solved, but I still think that Stream.transform/4 also deserves an ability to handle the not yet yielded accumulator. It’s not always chunking, it could be a plain collecting or whatever:

(1..10) |> Stream.transform([], fn i, acc -> {[], [i | acc]} end) |> Enum.to_list
#⇒ []

I think that to make the statement “Stream.transform is useful as it can be used as the basis to implement many of the functions defined in this module” truthy it should provide an ability to handle the tail.

@josevalim
Copy link
Copy Markdown
Member

@am-kantox changing Stream.transform/4 to handle this concern seems to be too complicated and prone for failures. I am running some tests on this PR and there are many situations where the accumulator is being lost, exactly because there are two loops to consider here.

The code in transform/4 is already very complex and I am not looking forward to add more to it. Especially as transform is more about transforming each element rather than accumulating.

@am-kantox
Copy link
Copy Markdown
Contributor Author

@josevalim I am 100% in agreement with “not adding more complexity to transform.” This PR does not bring any complexity to transform itself, though.
Whether you think that what happens now is better, let it be that way; honestly, I can’t think of any not very contrived example where chunk_by is not enough.

@josevalim
Copy link
Copy Markdown
Member

Let's go with Stream.chunk_by/4 then. Closing this in favor of #5888. Thank you!

@josevalim josevalim closed this Mar 16, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

3 participants