Channel monad #21

Closed
awalterschulze opened this issue Jul 25, 2017 · 13 comments

@awalterschulze
Owner

awalterschulze commented Jul 25, 2017

A first attempt

https://goplay.space/#jrcRq1jPVS

@awalterschulze
Owner Author

First we have to decide on an implementation of fmap for channels.
There are several options, some of which I will lay out below.

Things to keep in mind:

  • bounded number of goroutines
  • cancelation

https://blog.golang.org/pipelines

@awalterschulze
Owner Author

I hope to somehow use channel capacities as a hint for bounding the number of goroutines. So in the following examples you will see me creating the output channel with the incoming channel's capacity, but I have not thought this through yet.

@awalterschulze
Owner Author

First up is an fmap with no goroutines:

func deriveFmapNoGo(f func(string) int, in <-chan string) <-chan int {
	out := make(chan int, cap(in))
	for a := range in {
		b := f(a)
		out <- b
	}
	close(out)
	return out
}

I don't think this makes much sense, as we have to wait for the input channel to be closed before we can return the output channel. Also, since the output channel is created with the capacity of the input channel, the function deadlocks whenever more items are sent on the input channel before it is closed than that capacity allows: nobody can read from the output channel until the function returns.
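
To make the blocking behaviour concrete, here is a minimal sketch. The names fmapNoGo and returnsWithin are made up for this illustration, and the timeout is only a crude way to observe that the call never returns; it is not part of any proposed API.

```go
package main

import (
	"fmt"
	"time"
)

// fmapNoGo mirrors deriveFmapNoGo: it maps f over in without starting
// a goroutine, so it can only return after in has been closed.
func fmapNoGo(f func(string) int, in <-chan string) <-chan int {
	out := make(chan int, cap(in))
	for a := range in {
		out <- f(a) // blocks once out is full, since nobody reads it yet
	}
	close(out)
	return out
}

// returnsWithin (hypothetical helper) reports whether fmapNoGo returns
// within ms milliseconds when both channels have the given capacity.
func returnsWithin(ms int, capacity int, items []string) bool {
	in := make(chan string, capacity)
	go func() {
		for _, s := range items {
			in <- s
		}
		close(in)
	}()
	done := make(chan struct{})
	go func() {
		fmapNoGo(func(s string) int { return len(s) }, in)
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(time.Duration(ms) * time.Millisecond):
		return false // the goroutine above stays blocked on out
	}
}

func main() {
	items := []string{"a", "bb", "ccc"}
	fmt.Println("capacity 3 returns:", returnsWithin(200, 3, items))
	fmt.Println("capacity 1 returns:", returnsWithin(200, 1, items))
}
```

With three items, a capacity of three lets the call finish, while a capacity of one leaves it stuck on the second send.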

@awalterschulze
Owner Author

awalterschulze commented Aug 20, 2017

Next is the fmap with one goroutine:

func deriveFmapGo(f func(string) int, in <-chan string) <-chan int {
	out := make(chan int, cap(in))
	go func() {
		for a := range in {
			b := f(a)
			out <- b
		}
		close(out)
	}()
	return out
}

This is currently my favourite.
It also scales to a channel of channels:

func deriveFmapGoChanChan(f func(string) <-chan int, in <-chan string) <-chan <-chan int {
	out := make(chan (<-chan int), cap(in))
	go func() {
		for a := range in {
			b := f(a)
			out <- b
		}
		close(out)
	}()
	return out
}
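
As an aside, the two functions above differ only in their types. Here is a sketch of a single generic version, assuming a Go release with type parameters (which did not exist when this issue was opened). The names fmap and fromSlice are made up for this illustration and are not what goderive generates.

```go
package main

import "fmt"

// fmap maps f over in using a single goroutine, like deriveFmapGo.
// Because B can itself be a channel type, the same function also
// covers the channel-of-channels case.
func fmap[A, B any](f func(A) B, in <-chan A) <-chan B {
	out := make(chan B, cap(in))
	go func() {
		defer close(out)
		for a := range in {
			out <- f(a)
		}
	}()
	return out
}

// fromSlice (hypothetical helper) returns a closed, buffered channel
// holding xs.
func fromSlice[A any](xs ...A) <-chan A {
	c := make(chan A, len(xs))
	for _, x := range xs {
		c <- x
	}
	close(c)
	return c
}

func main() {
	// string -> int
	sizes := fmap(func(s string) int { return len(s) }, fromSlice("my", "name", "is", "judge"))
	for n := range sizes {
		fmt.Println(n)
	}
	// string -> <-chan int, giving a channel of channels
	nested := fmap(func(s string) <-chan int { return fromSlice(len(s)) }, fromSlice("a", "bb"))
	for c := range nested {
		for n := range c {
			fmt.Println(n)
		}
	}
}
```

Since only one goroutine reads and writes, the results come out in the same order as the inputs.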

@awalterschulze
Owner Author

awalterschulze commented Aug 20, 2017

But there is also the alternative of firing up a goroutine for each incoming request:

func deriveFmapAllTheGos(f func(string) <-chan int, in <-chan string) <-chan <-chan int {
	out := make(chan (<-chan int), cap(in))
	go func() {
		wait := sync.WaitGroup{}
		for a := range in {
			wait.Add(1)
			go func(a string) {
				b := f(a)
				out <- b
				wait.Done()
			}(a)
		}
		wait.Wait()
		close(out)
	}()
	return out
}

This has unbounded growth: the number of goroutines is limited only by the speed at which requests come in on the in channel.
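
One possible way to put a bound on this is a counting semaphore built from a buffered channel. The sketch below is only an illustration: fmapBounded and observe are made-up names, and the limit is passed in explicitly, although a caller could pass cap(in) to follow the capacity-as-hint idea mentioned earlier.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fmapBounded applies f concurrently, but never has more than limit
// worker goroutines alive at the same time.
func fmapBounded(limit int, f func(string) int, in <-chan string) <-chan int {
	if limit < 1 {
		limit = 1 // an unbuffered input channel would otherwise give a limit of 0
	}
	out := make(chan int, cap(in))
	sem := make(chan struct{}, limit)
	go func() {
		var wait sync.WaitGroup
		for a := range in {
			sem <- struct{}{} // blocks while limit workers are in flight
			wait.Add(1)
			go func(a string) {
				defer wait.Done()
				defer func() { <-sem }() // free the slot after the send
				out <- f(a)
			}(a)
		}
		wait.Wait()
		close(out)
	}()
	return out
}

// observe (hypothetical helper) pushes n inputs through fmapBounded and
// returns the number of results and the peak number of concurrent calls.
func observe(limit, n int) (results int, peak int) {
	in := make(chan string, limit)
	go func() {
		for i := 0; i < n; i++ {
			in <- "x"
		}
		close(in)
	}()
	var mu sync.Mutex
	running := 0
	f := func(s string) int {
		mu.Lock()
		running++
		if running > peak {
			peak = running
		}
		mu.Unlock()
		time.Sleep(time.Millisecond) // simulate some work
		mu.Lock()
		running--
		mu.Unlock()
		return len(s)
	}
	for range fmapBounded(limit, f, in) {
		results++
	}
	return results, peak
}

func main() {
	results, peak := observe(2, 10)
	fmt.Println("results:", results, "peak concurrent calls:", peak)
}
```

The price is that the order of the results is no longer guaranteed, and a slow consumer of the output channel now slows down the reading of the input channel as well.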

@awalterschulze
Owner Author

awalterschulze commented Aug 20, 2017

Just for the record, here is what our join function would look like:

func deriveJoin(in <-chan <-chan int) <-chan int {
	out := make(chan int, cap(in))
	go func() {
		wait := sync.WaitGroup{}
		for c := range in {
			wait.Add(1)
			res := c
			go func() {
				for s := range res {
					out <- s
				}
				wait.Done()
			}()
		}
		wait.Wait()
		close(out)
	}()
	return out
}

Again, we create a goroutine for every channel that arrives on the in channel, so we have unbounded growth.
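
For comparison, here is a sketch of a join that uses exactly one goroutine by draining the inner channels one after the other. The names joinSequential and chanOf are made up for this illustration. It is bounded, but it is a different trade-off rather than a drop-in replacement: a slow inner channel holds up every channel queued behind it.

```go
package main

import "fmt"

// joinSequential flattens a channel of channels with a single goroutine.
// Inner channels are drained strictly in arrival order.
func joinSequential(in <-chan (<-chan int)) <-chan int {
	out := make(chan int, cap(in))
	go func() {
		defer close(out)
		for c := range in {
			for s := range c {
				out <- s
			}
		}
	}()
	return out
}

// chanOf (hypothetical helper) returns a closed, buffered channel
// holding xs.
func chanOf(xs ...int) <-chan int {
	c := make(chan int, len(xs))
	for _, x := range xs {
		c <- x
	}
	close(c)
	return c
}

func main() {
	in := make(chan (<-chan int), 2)
	in <- chanOf(1, 2)
	in <- chanOf(3)
	close(in)
	for n := range joinSequential(in) {
		fmt.Println(n)
	}
}
```

Unlike deriveJoin, this version keeps the output in a deterministic order, at the cost of no longer interleaving results from different inner channels.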

@awalterschulze
Owner Author

awalterschulze commented Aug 20, 2017

Maybe adding a usage example would be helpful.

func lines() <-chan string {
	c := make(chan string, 2)
	go func() {
		c <- "my name is judge"
		c <- "welcome judy welcome judy"
		c <- "welcome hello welcome judy"
		c <- "welcome goodbye welcome judy"
		close(c)
	}()
	return c
}

func wordsize(line string) <-chan int {
	c := make(chan int, 2)
	go func() {
		words := strings.Split(line, " ")
		for _, word := range words {
			c <- len(word)
		}
		close(c)
	}()
	return c
}

func main() {
	c := deriveJoin(deriveFmap(wordsize, lines()))
	for size := range c {
		fmt.Printf("%d\n", size)
	}
}

@awalterschulze
Owner Author

Here is some code I found in the wild and changed to only reflect the important parts.

type Result interface{}

type Request struct {
	ID int64
}

type Service interface{}

func newService() Service {
	panic("todo")
}

type ans struct {
	id  int64
	res Result
	err error
}

type serviceCall func(req Request, s Service) (Result, error)

func bounded(requests []Request, serviceCall serviceCall) (map[int64]Result, error) {
	s := newService()
	numWorkers := 5
	var failure bool

	in := make(chan Request, numWorkers)
	out := make(chan ans, numWorkers)
	defer close(out)

	for i := 0; i < numWorkers; i++ {
		go work(in, out, serviceCall, s, &failure)
	}
	for _, req := range requests {
		in <- req
	}
	close(in)

	result := make(map[int64]Result)
	for i := 0; i < len(requests); i++ {
		answer := <-out
		if answer.err != nil {
			return nil, answer.err
		}
		result[answer.id] = answer.res
	}
	return result, nil
}

func work(in <-chan Request, out chan<- ans, serviceCall serviceCall, s Service, fail *bool) {
	for req := range in {
		if *fail {
			return
		}
		result, err := serviceCall(req, s)
		if err != nil {
			*fail = true
		}
		out <- ans{id: req.ID, res: result, err: err}
	}
}

It would be great if the channel monad somehow covered this case, but it is not a requirement. I just think this is a great example of where a bounded group of workers is used, with some flaws, because these things are hard to get right every time.

@awalterschulze
Owner Author

awalterschulze commented Aug 20, 2017

Here is another way to bound the growth, by controlling it from outside of the monad:

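func main() {
	work := make(chan string, 2)
	c := deriveJoin(deriveFmap(wordsize, work))
	ss := []string{
		"my name is judge",
		"welcome judy welcome judy",
		"welcome hello welcome judy",
		"welcome goodbye welcome judy",
	}
	// Prime the pipeline with the first three lines.
	i := 0
	for i <= 2 {
		work <- ss[i]
		i++
	}
	// Feed one more line for every result received.
	for size := range c {
		fmt.Printf("%d\n", size)
		if i < len(ss) {
			work <- ss[i]
			i++
			if i == len(ss) {
				// No more input: close work so that c is eventually closed too.
				close(work)
			}
		}
	}
}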

But since each line creates more than one output on the output channel, this won't really work: new lines are fed in faster than they are fully processed, so in the long run it still results in unbounded growth.

@awalterschulze
Owner Author

awalterschulze commented Aug 23, 2017

Here is a rewrite of the wild code

func bounded2(requests []Request, serviceCall serviceCall) (map[int64]Result, error) {
	s := newService()
	numWorkers := 5
	workers := make(chan struct{}, numWorkers)

	requestsChan := make(chan Request)
	go func() {
		for _, req := range requests {
			requestsChan <- req
		}
		close(requestsChan)
	}()

	call := func(req Request) <-chan ans {
		c := make(chan ans)
		workers <- struct{}{}
		go func() {
			result, err := serviceCall(req, s)
			c <- ans{id: req.ID, res: result, err: err}
			<-workers
			close(c)
		}()
		return c
	}

	results := deriveCompose(
		requestsChan,
		call,
	)

	result := make(map[int64]Result)
	var err error
	for r := range results {
		if r.err != nil {
			// Keep only the first error.
			if err == nil {
				err = r.err
			}
		} else {
			result[r.id] = r.res
		}
	}
	return result, err
}

I removed the fail since it does not actually cancel any service calls.
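
deriveCompose is not shown anywhere in this thread. Assuming that, when given a channel and a function, it behaves like deriveJoin applied to the result of deriveFmap, it could be sketched roughly as follows. The generic functions fmap, join and bind, and the helpers lines and wordsize, are stand-ins written for this illustration and are not the code that goderive generates.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// fmap maps f over in with one goroutine.
func fmap[A, B any](f func(A) B, in <-chan A) <-chan B {
	out := make(chan B, cap(in))
	go func() {
		defer close(out)
		for a := range in {
			out <- f(a)
		}
	}()
	return out
}

// join flattens a channel of channels, one goroutine per inner channel.
func join[A any](in <-chan (<-chan A)) <-chan A {
	out := make(chan A, cap(in))
	go func() {
		var wait sync.WaitGroup
		for c := range in {
			wait.Add(1)
			go func(c <-chan A) {
				defer wait.Done()
				for a := range c {
					out <- a
				}
			}(c)
		}
		wait.Wait()
		close(out)
	}()
	return out
}

// bind is the assumed meaning of deriveCompose(in, f): join after fmap.
func bind[A, B any](in <-chan A, f func(A) <-chan B) <-chan B {
	return join(fmap(f, in))
}

// lines returns a closed, buffered channel holding ss.
func lines(ss ...string) <-chan string {
	c := make(chan string, len(ss))
	for _, s := range ss {
		c <- s
	}
	close(c)
	return c
}

// wordsize sends the length of every word in line.
func wordsize(line string) <-chan int {
	c := make(chan int)
	go func() {
		defer close(c)
		for _, word := range strings.Fields(line) {
			c <- len(word)
		}
	}()
	return c
}

func main() {
	count, total := 0, 0
	for n := range bind(lines("my name is judge", "welcome judy"), wordsize) {
		count++
		total += n
	}
	fmt.Println("words:", count, "letters:", total)
}
```

The order of the individual word sizes depends on scheduling, which is why the usage example only aggregates them.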

@awalterschulze
Owner Author

Here is a rewrite with proper cancelation

type serviceCall func(ctx context.Context, req Request) (Result, error)

func bounded2(ctx context.Context, requests []Request, serviceCall serviceCall) (map[int64]Result, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // release the context's resources on every return path
	numWorkers := 5
	workers := make(chan struct{}, numWorkers)

	requestsChan := make(chan Request)
	go func() {
		for _, req := range requests {
			requestsChan <- req
		}
		close(requestsChan)
	}()

	call := func(req Request) <-chan ans {
		c := make(chan ans)
		workers <- struct{}{}
		go func() {
			result, err := serviceCall(ctx, req)
			c <- ans{id: req.ID, res: result, err: err}
			<-workers
			close(c)
		}()
		return c
	}

	results := deriveCompose(
		requestsChan,
		call,
	)

	result := make(map[int64]Result)
	var err error
	for r := range results {
		if r.err != nil {
			// On the first error, cancel the outstanding service calls.
			if err == nil {
				cancel()
				err = r.err
			}
		} else {
			result[r.id] = r.res
		}
	}
	return result, err
}
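
The rewrite above only helps if the things it starts actually watch the context. As a small sketch of that pattern, in the spirit of the pipelines blog post linked earlier but using context instead of a done channel, here is a producer that stops sending once the context is canceled. The names generate and firstThenCancel are made up for this illustration.

```go
package main

import (
	"context"
	"fmt"
)

// generate sends ids on the returned channel until it runs out of ids
// or ctx is canceled, and then closes the channel.
func generate(ctx context.Context, ids []int64) <-chan int64 {
	out := make(chan int64)
	go func() {
		defer close(out)
		for _, id := range ids {
			select {
			case out <- id:
			case <-ctx.Done():
				return // stop early instead of blocking forever on out
			}
		}
	}()
	return out
}

// firstThenCancel (hypothetical helper) reads one id, cancels, and then
// counts how many more ids arrive before the channel is closed.
func firstThenCancel(ids []int64) (first int64, drained int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	c := generate(ctx, ids)
	first = <-c
	cancel()
	for range c {
		drained++
	}
	return first, drained
}

func main() {
	first, drained := firstThenCancel([]int64{1, 2, 3, 4})
	fmt.Println("first:", first, "received after cancel:", drained)
}
```

How many ids still arrive after cancel depends on scheduling, because select picks at random when both cases are ready. What matters is that the goroutine exits and the channel gets closed.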

@awalterschulze
Owner Author

deriveFmap and deriveJoin have now been implemented.

Since we have shown that cancelation and bounded growth can be controlled from outside of this abstraction, deriveFmap and deriveJoin do not have to take care of either themselves.
