-
Notifications
You must be signed in to change notification settings - Fork 2
/
stream.go
80 lines (70 loc) · 2.53 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package stream
import (
"context"
"fmt"
"io"
"sync"
"github.com/ONSdigital/log.go/v2/log"
)
// Callback signatures accepted by Stream. Both are type aliases, so any plain
// func value with a matching signature can be passed without conversion.
type (
	// Transformer is the signature of the func Stream calls to read from r and
	// write the transformed data to w. A non-nil error reports a failure.
	Transformer = func(ctx context.Context, r io.Reader, w io.Writer) error

	// Consumer is the signature of the func Stream calls to read the
	// transformed data from r. A non-nil error reports a failure.
	Consumer = func(ctx context.Context, r io.Reader) error
)
// Stream is a generic streaming function that runs 2 go-routines connected by an in-memory pipe:
//   - one transforms the provided body into another stream by calling the provided transform func
//   - the other consumes the transformed stream by calling the provided consume func
//
// Stream blocks until both go-routines have finished. By then body, the pipe writer and the
// pipe reader have all been closed.
//
// The pipe reader is closed as soon as consume returns, whether or not it returned an error.
// If consume stops before draining the stream, any pending or later write performed by
// transform fails with io.ErrClosedPipe instead of blocking forever.
//
// The returned error is nil if both funcs succeeded. Otherwise it describes the transform
// error, the consumer error, or both. A single error is wrapped with %w; when both are
// present they are only formatted into the message.
func Stream(ctx context.Context, body io.ReadCloser, transform Transformer, consume Consumer) error {
	pipeReader, pipeWriter := io.Pipe()
	wg := &sync.WaitGroup{}

	// Each variable is written by exactly one go-routine and only read after wg.Wait().
	var errTransform, errConsume error

	// Start go-routine to read the provided body, transform it 'on-the-fly' and write the transformed data to the pipe writer.
	// When 'transform' finishes its execution, the pipe writer is closed (with error if 'transform' func returned an error).
	wg.Add(1)
	go func() {
		defer func() {
			closeResponseBody(ctx, body)
			wg.Done()
		}()

		errTransform = transform(ctx, body, pipeWriter)
		if errTransform != nil {
			// Propagate the failure so that the consumer's next Read returns it.
			if err := pipeWriter.CloseWithError(errTransform); err != nil {
				log.Error(ctx, "stream error: error closing pipe writer from transformer go-routine during error handling", err)
			}
			return
		}

		// Success: closing the writer makes the consumer see io.EOF once it has read everything.
		if err := pipeWriter.Close(); err != nil {
			log.Error(ctx, "stream error: error closing pipe writer from transformer go-routine after 'transform' was successful", err)
		}
	}()

	// Start go-routine to read transformed data from pipe reader and call the consumer func.
	wg.Add(1)
	go func() {
		defer wg.Done()

		errConsume = consume(ctx, pipeReader)

		// Always close the read side once the consumer is done. Without this, a consumer that
		// returns before reading to EOF would leave the transformer blocked in Write forever
		// and wg.Wait() below would never return.
		if err := pipeReader.Close(); err != nil {
			log.Error(ctx, "stream error: error closing pipe reader from consumer go-routine", err)
		}
	}()

	wg.Wait()

	if errTransform != nil && errConsume != nil {
		return fmt.Errorf("transform error: %v, consumer error: %v", errTransform, errConsume)
	}
	if errTransform != nil {
		return fmt.Errorf("transform error: %w", errTransform)
	}
	if errConsume != nil {
		return fmt.Errorf("consumer error: %w", errConsume)
	}
	return nil
}
// closeResponseBody closes body when it is non-nil. A failure to close is
// logged and not returned to the caller.
func closeResponseBody(ctx context.Context, body io.Closer) {
	if body == nil {
		return
	}

	closeErr := body.Close()
	if closeErr == nil {
		return
	}
	log.Error(ctx, "error closing http response body", closeErr)
}