Skip to content

Commit

Permalink
Change Error to Errors
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterOfBinary committed Feb 18, 2017
1 parent e97f54e commit 2b3e224
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 9 deletions.
4 changes: 2 additions & 2 deletions batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (b *Batch) doReader(ctx context.Context) {
ps := &PipelineStage{
Input: in,
Output: out,
Error: errs,
Errors: errs,
}

go b.src.Read(ctx, ps)
Expand Down Expand Up @@ -379,7 +379,7 @@ func (b *Batch) process(ctx context.Context) {
ps := &PipelineStage{
Input: in,
Output: out,
Error: errs,
Errors: errs,
}

go b.proc.Process(ctx, ps)
Expand Down
2 changes: 1 addition & 1 deletion batch/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (p printProcessor) Process(ctx context.Context, ps *batch.PipelineStage) {
for item := range ps.Input {
// Get returns the item itself
if item.Get() == 5 {
ps.Error <- errors.New("cannot process 5")
ps.Errors <- errors.New("cannot process 5")
continue
}

Expand Down
8 changes: 6 additions & 2 deletions batch/pipeline_stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ type PipelineStage struct {
//Retry chan<- *Item

// Error is for any errors encountered during the pipeline stage.
Error chan<- error
Errors chan<- error
}

// Close closes the pipeline stage.
//
// Note that it will also close the write channels. Do not close them separately
// or it will panic.
func (p *PipelineStage) Close() {
close(p.Output)
close(p.Error)
close(p.Errors)
}
4 changes: 2 additions & 2 deletions batch/pipeline_stage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func TestPipelineStage_Close(t *testing.T) {
errs := make(chan error)
ps := &PipelineStage{
Output: out,
Error: errs,
Errors: errs,
}

ps.Close()
Expand All @@ -21,6 +21,6 @@ func TestPipelineStage_Close(t *testing.T) {
select {
case <-errs:
default:
t.Error("ps.Error was not closed")
t.Error("ps.Errors was not closed")
}
}
2 changes: 1 addition & 1 deletion processor/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ type Error struct {

// Process discards all data sent to it after a certain amount of time.
func (p *Error) Process(ctx context.Context, ps *batch.PipelineStage) {
ps.Error <- p.Err
ps.Errors <- p.Err
ps.Close()
}
2 changes: 1 addition & 1 deletion source/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ type Error struct {

// Read returns an error and then closes.
func (s *Error) Read(ctx context.Context, ps *batch.PipelineStage) {
ps.Error <- s.Err
ps.Errors <- s.Err
ps.Close()
}

0 comments on commit 2b3e224

Please sign in to comment.