Permalink
Browse files

More debug messages in sink

  • Loading branch information...
samuell committed Aug 16, 2016
1 parent 1c7c086 commit fc500ccf242b6b7f184d6cb228e05434fd72ca66
Showing with 6 additions and 3 deletions.
  1. +2 −3 pipeline.go
  2. +4 −0 sink.go
View
@@ -39,12 +39,11 @@ func (pl *PipelineRunner) Run() {
os.Exit(1)
}
for i, proc := range pl.processes {
Debug.Printf("PipelineRunner: Looping over process %d: %v ...\n", i, proc)
if i < len(pl.processes)-1 {
Debug.Printf("PipelineRunner: Starting process %d in new go-routine: %v\n", i, proc)
Debug.Printf("PipelineRunner: Starting process %d of type %v: in new go-routine...\n", i, reflect.TypeOf(proc))
go proc.Run()
} else {
Debug.Printf("PipelineRunner: Starting process %d: in main go-routine: %v\n", i, proc)
Debug.Printf("PipelineRunner: Starting process %d of type %v: in main go-routine...\n", i, reflect.TypeOf(proc))
proc.Run()
}
}
View
@@ -19,11 +19,14 @@ func (proc *Sink) Connect(ch chan interface{}) {
// Execute the Sink component
func (proc *Sink) Run() {
ok := true
Debug.Printf("Length of inPorts: %d\n", len(proc.inPorts))
for len(proc.inPorts) > 0 {
for i, ich := range proc.inPorts {
select {
case _, ok = <-ich:
Debug.Printf("Received on in-port %d in sink\n", i)
if !ok {
Debug.Printf("Port on %d not ok, in sink\n", i)
proc.deleteInPortAtKey(i)
continue
}
@@ -34,6 +37,7 @@ func (proc *Sink) Run() {
}
func (proc *Sink) deleteInPortAtKey(i int) {
Debug.Println("Deleting inport at key", i, "in sink")
proc.inPorts = append(proc.inPorts[:i], proc.inPorts[i+1:]...)
}

0 comments on commit fc500cc

Please sign in to comment.