
question about Channel->map->map->ReduceByKey->AddOutput #36

Closed
adeagle opened this issue Feb 29, 2016 · 1 comment

Comments


adeagle commented Feb 29, 2016

////////////////////////Question/////////////////////////////
The program blocks and never prints "reduce:" (the println inside the ReduceByKey function never runs).

////////////////////////////Source Code///////////////////////

package main

import (
	"encoding/gob"
	"flag"
	"fmt"
	"io/ioutil"
	"strconv"
	"strings"
	"sync"

	_ "github.com/chrislusf/glow/driver"
	"github.com/chrislusf/glow/flow"
)

type WordSentence struct {
	Word       string
	LineNumber int
}

type AccessByAgeGroup struct {
	Addr string
	Info MemInfo
}

type MemInfo struct {
	Addr  string
	Size  int
	Count int
}

func init() {
	gob.Register(MemInfo{})
}

// goStart runs fn in a new goroutine tracked by wg.
func goStart(wg *sync.WaitGroup, fn func()) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		fn()
	}()
}

func testWordCount2() {
	println("testWordCount2")
	flowOut2 := make(chan AccessByAgeGroup)
	chIn := make(chan string)
	f2 := flow.New()

	f2.Channel(chIn).Partition(1).Map(func(line string) MemInfo {
		//println(line)
		words := strings.Split(line, ":")
		//println(words[0]+" "+words[1])
		s, _ := strconv.ParseInt(words[1], 16, 0)
		return MemInfo{words[0], int(s), 1}
	}).Map(func(ws MemInfo) (string, MemInfo) {
		println(ws.Addr)
		return ws.Addr, ws
	}).ReduceByKey(func(x MemInfo, y MemInfo) MemInfo {
		println("reduce:")
		return MemInfo{x.Addr, x.Size + y.Size, x.Count + y.Count}
	}).AddOutput(flowOut2)

	flow.Ready()

	var wg sync.WaitGroup

	goStart(&wg, func() {
		f2.Run()
	})

	goStart(&wg, func() {
		for t := range flowOut2 {
			fmt.Printf("%s size:%-8d count:%-8d\n",
				t.Info.Addr, t.Info.Size, t.Info.Count)
		}
	})

	bytes, err := ioutil.ReadFile("passwd")
	if err != nil {
		println("Failed to read")
		return
	}

	// Feed each line of the file into the flow's input channel.
	lines := strings.Split(string(bytes), "\r\n")
	for _, line := range lines {
		chIn <- line
	}

	wg.Wait()
}

func main() {
	flag.Parse()
	testWordCount2()
}

chrislusf (Owner)

If you press Ctrl+C while the program is running in standalone mode, you can see these per-step statistics:

step:Input0
  output : d0
     shard:0 time:2.038281735s processed 2779
step:Map1
  input  : d0
     shard:0 time:2.038321665s processed 2779
  output : d1
     shard:0 time:2.038380948s processed 2779
step:Map2
  input  : d1
     shard:0 time:2.038388886s processed 2779
  output : d2
     shard:0 time:2.038408013s processed 2779
step:LocalSort3
  input  : d2
     shard:0 time:2.03841651s processed 2779
  output : d3
     shard:0 time:2.038403271s processed 0
step:LocalReduceByKey4
  input  : d3
     shard:0 time:2.038411775s processed 0
  output : d4
     shard:0 time:2.038419295s processed 0
step:MergeSorted5
  input  : d4
     shard:0 time:2.038427534s processed 0
  output : d5
     shard:0 time:2.038420435s processed 0
step:LocalReduceByKey6
  input  : d5
     shard:0 time:2.038428171s processed 0
  output : d6
     shard:0 time:2.038496247s processed 0

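This shows that step:LocalSort3 has not started producing output: it has read all 2779 records from d2, but its output d3 shows processed 0, so nothing reaches the LocalReduceByKey steps.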

Next, send "kill -3" (SIGQUIT) to the program's process. The Go runtime then dumps the goroutine stacks, including this one:

goroutine 23 [chan receive]:
github.com/chrislusf/glow/flow.(*Dataset).Map.func1(0xc8200a23c0)
    /Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/dataset_map.go:27 +0x9b
github.com/chrislusf/glow/flow.(*Task).RunTask(0xc8200a23c0)
    /Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/step_task.go:33 +0x2d
github.com/chrislusf/glow/flow.(*Step).RunStep.func1(0xc820067540, 0x0, 0xc8200a23c0)
    /Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/step.go:22 +0x48
created by github.com/chrislusf/glow/flow.(*Step).RunStep
    /Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/step.go:23 +0xd3

The stack trace shows that step:Map2 has not finished. What is it waiting for?

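It is waiting for more input, because the chIn channel is never closed. Until chIn is closed, the downstream steps never see the end of their input, so LocalSort3 never emits anything and ReduceByKey never runs. Close chIn after sending the last line.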
