Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature request: Flatmap #49

Closed
pierrebeaucamp opened this issue Dec 20, 2017 · 10 comments
Closed

Feature request: Flatmap #49

pierrebeaucamp opened this issue Dec 20, 2017 · 10 comments

Comments

@pierrebeaucamp
Copy link

As per contribution guidelines, I'm opening an issue for discussion regarding the implementation of Flatmap.

Per Documentation:

The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence.

A naive implementation could look like

func (o Observable) FlatMap(apply fx.FlatMappableFunc) Observable {
	out := make(chan interface{})
	go func() {
		for item := range o {
                        go func(sub Observable) {
                                handler := observer.Observer{
                                        NextHandler: func(i interface{}) {
                                                out <- i
                                        },
                                        ErrHandler: func(err error) {
                                                out <- err
                                        },
                                }
                                s := sub.Subscribe(handler)
                                <-s
                        }(apply(item))
		}           
		close(out)
	}()
	return Observable(out)
}

I see that the dev branch has a Merge function, however I'm not sure how to exploit the underlying reflect.SelecCase with a dynamic number of Observables. Creating a subscriber (and a go routine) for each element seems sub-optimal, but out of the top of my head I'm not aware of a different solution.

I'd be happy to submit a PR once agreement of the implementation is reached.

@pierrebeaucamp
Copy link
Author

PS: While I'm at it, it would also be very nice to have concatMap as well. Dependent on the implementation of flatMap, concatMap could be pretty easy.

@pierrebeaucamp
Copy link
Author

Also, as a more general question: I can't help but notice that the majority of ReactiveX functions are still missing. What is the plan to get them all in? I'd love to contribute a few, but I don't know if I should create a PR for each one. I don't want to flood this project with issues and PRs.

Also what is the purpose of the dev branch? What is the lifecycle of that branch, i.e. when is it getting merged into master?

@jochasinga
Copy link
Member

@pierrebeaucamp thanks for your help. I kept telling people how I started RxGo as an experiment and then it just took off. I've been away on a Go sabbatical working on languages like Ocaml and JavaScript, and now I'm back to work. There are a few things that will need overhaul, and it'd be great to get some longer-term helping hand.

I'm closing this, but if you would like to take it from here I could email you.

@jochasinga
Copy link
Member

Also just looking at your example there's already a new function alias FlatMappableFunc. I don't want RxGo to end up with all these new types or even little not-too-useful methods just to match Rx API but totally beat the purpose of using Go in the first place.

@venth
Copy link
Contributor

venth commented Feb 15, 2018

I've a plan to make a pull request. Which branch should I use as the starting point - dev or master?

@jochasinga
Copy link
Member

@venth are you implementing FlatMap?

@venth
Copy link
Contributor

venth commented Feb 15, 2018

Yes. I have a plan to make it on goroutines. I wondered, if I shall introduce TestSubscriber/Observer first

@venth
Copy link
Contributor

venth commented Feb 16, 2018

@jochasinga, I have doubts related to current operators implementation. Currently operators like:

  • First,
  • Last and
  • others

perform iteration through channel. I've compared it to other reactivex implementations. Others implementations like: Scala based, Java based are relaying on reactor pattern and functional programming. In current implementation instead of applying observer pattern I see iteration through values in a channel.

Did I understand it correctly or I didn't get something?

@venth
Copy link
Contributor

venth commented Feb 16, 2018

@venth
Copy link
Contributor

venth commented Mar 11, 2018

@jochasinga, it took me a while and few tries to grasp the idea of flatmap implementation. I prepared the version based observer and subscription without usage of channels (https://github.com/venth/gorx/blob/master/operator/flatmap_observable.go) and have an idea how to deliver flatmap implementation consistent to the channel based approach.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants