-
Notifications
You must be signed in to change notification settings - Fork 0
/
concurrent_producer.go
95 lines (82 loc) · 2.68 KB
/
concurrent_producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package producer
import (
"context"
"fmt"
"sync"
"time"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
// ConcurrentProducer can run multiple Produce jobs concurrently.
//
// The Errors channel must be drained by the caller; otherwise message
// production will eventually block.
//
// To stop it gracefully, close the Messages channel and then keep draining
// the Errors channel until it is closed.
type ConcurrentProducer interface {
	// Messages returns the send-only channel on which messages to be produced
	// are submitted.
	Messages() chan<- proto.Message
	// Errors returns the receive-only channel on which production failures
	// are reported.
	Errors() <-chan Error
}
// Error pairs a production failure with the message that could not be produced.
type Error struct {
	// Err is the underlying error; it is the value returned by Unwrap.
	Err error
	// Message is the message whose production failed.
	Message proto.Message
}
// Error implements the error interface. The returned text contains the
// protojson rendering of the message followed by the text of the underlying
// cause.
//
// A nil Err is rendered as "<nil>" rather than being dereferenced, so
// formatting or logging an Error that carries no cause cannot panic.
func (e Error) Error() string {
	cause := "<nil>"
	if e.Err != nil {
		cause = e.Err.Error()
	}
	return fmt.Sprintf("message: \n%s\n could not be produced: %s",
		protojson.Format(e.Message),
		cause)
}
// Unwrap returns the underlying Err, which lets errors.Is and errors.As see
// through this wrapper to the original cause.
func (e Error) Unwrap() error {
	return e.Err
}
// BasicConcurrentProducerConfig holds the settings used by
// StartBasicConcurrentProducer and the jobs it starts.
// NOTE(review): the struct tags look like they are consumed by an
// envconfig-style loader outside this file — confirm against the caller.
type BasicConcurrentProducerConfig struct {
	// JobsNumber is the number of Job goroutines started. It should be
	// relatively high, since IPFS communication can sometimes block for
	// extended periods.
	JobsNumber uint `envconfig:"JOBS_NUMBER" default:"20"`
	// ProduceTimeout is the timeout applied to the context of each individual
	// Produce call.
	ProduceTimeout time.Duration `envconfig:"PRODUCE_TIMEOUT" default:"2m"`
	// ErrBuf is the buffer size of the errors channel.
	ErrBuf uint `envconfig:"ERR_CHAN_BUFF" default:"50"`
	// MessageBuf is the buffer size of the messages channel.
	MessageBuf uint `envconfig:"MESSAGE_BUFF" default:"250"`
}
// StartBasicConcurrentProducer builds a BasicConcurrentProducer around
// blockingProducer and immediately starts config.JobsNumber Job goroutines,
// each of which receives globalCtx as the parent context for its Produce calls.
//
// The messages and errors channels are buffered with config.MessageBuf and
// config.ErrBuf respectively. A separate goroutine waits for every job to
// finish and then closes the errors channel, so a closed errors channel tells
// the consumer that all jobs have exited.
//
// NOTE(review): with config.JobsNumber == 0 no job is started and the errors
// channel is closed right away, so nothing ever reads from the messages
// channel — confirm callers never pass a zero value.
func StartBasicConcurrentProducer(globalCtx context.Context, blockingProducer Producer, config BasicConcurrentProducerConfig) *BasicConcurrentProducer {
	p := new(BasicConcurrentProducer)
	p.blockingProducer = blockingProducer
	p.config = config
	p.messages = make(chan proto.Message, config.MessageBuf)
	p.errors = make(chan Error, config.ErrBuf)

	// Register all jobs up front so the closer below cannot observe a zero
	// counter before every job has been accounted for.
	workers := config.JobsNumber
	p.jobsWg.Add(int(workers))
	for idx := uint(0); idx < workers; idx++ {
		go p.Job(globalCtx, idx)
	}

	// Only this goroutine closes the errors channel, and only after the last
	// job (the only senders) has finished.
	closeErrorsWhenDone := func() {
		p.jobsWg.Wait()
		close(p.errors)
		log.Info().Msg("concurrent producer errors chan closed")
	}
	go closeErrorsWhenDone()

	return p
}
// BasicConcurrentProducer distributes messages from a single channel across
// several Job goroutines, each of which calls a blocking Producer.
// StartBasicConcurrentProducer is the constructor in this file that
// initialises the channels and starts the jobs.
type BasicConcurrentProducer struct {
	// blockingProducer is the producer invoked by every job.
	blockingProducer Producer
	// config holds the settings the producer was started with.
	config BasicConcurrentProducerConfig
	// messages is the input queue the jobs receive from.
	messages chan proto.Message
	// errors carries production failures; it is closed once all jobs have finished.
	errors chan Error
	// jobsWg tracks the running jobs.
	jobsWg sync.WaitGroup
}
// Job is the worker loop run by each producer goroutine. It receives messages
// from the messages channel until that channel is closed and drained, and
// produces each one with a per-message timeout derived from globalCtx.
//
// A failed Produce is reported on the errors channel together with the message
// that failed. That send blocks until the error is accepted, so the errors
// channel must be drained by the consumer.
//
// index only identifies the job in the log line emitted when it finishes.
// Job calls Done on the producer's WaitGroup when it returns, so every call
// must be matched by a prior Add (StartBasicConcurrentProducer does this).
func (b *BasicConcurrentProducer) Job(globalCtx context.Context, index uint) {
	// Deferred so the WaitGroup is released on every exit path, and so the
	// "job finished" line is logged before the errors channel can be closed.
	defer b.jobsWg.Done()

	for m := range b.messages {
		if err := b.produceWithTimeout(globalCtx, m); err != nil {
			b.errors <- Error{
				Err:     err,
				Message: m,
			}
		}
	}
	log.Info().Uint("producer_index", index).Msg("producer job finished")
}

// produceWithTimeout produces a single message under config.ProduceTimeout.
// The timeout context is released as soon as Produce returns, so its timer is
// not kept alive while the caller waits to send on the errors channel.
func (b *BasicConcurrentProducer) produceWithTimeout(globalCtx context.Context, m proto.Message) error {
	ctx, cancel := context.WithTimeout(globalCtx, b.config.ProduceTimeout)
	defer cancel()
	return b.blockingProducer.Produce(ctx, m)
}
// Messages returns the send-only side of the internal messages channel.
// Messages sent on it are picked up by the running jobs; closing it makes the
// jobs exit once the remaining messages have been received.
func (b *BasicConcurrentProducer) Messages() chan<- proto.Message {
	return b.messages
}
// Errors returns the receive-only side of the internal errors channel, on
// which the jobs report failed Produce calls. The channel is closed after all
// jobs have finished.
func (b *BasicConcurrentProducer) Errors() <-chan Error {
	return b.errors
}