-
Notifications
You must be signed in to change notification settings - Fork 134
/
ingester.go
83 lines (73 loc) · 2.67 KB
/
ingester.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package eventingester
import (
"regexp"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"github.com/armadaproject/armada/internal/common/app"
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/compress"
"github.com/armadaproject/armada/internal/common/ingest"
"github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/profiling"
"github.com/armadaproject/armada/internal/common/serve"
"github.com/armadaproject/armada/internal/eventingester/configuration"
"github.com/armadaproject/armada/internal/eventingester/convert"
"github.com/armadaproject/armada/internal/eventingester/metrics"
"github.com/armadaproject/armada/internal/eventingester/store"
)
// Run wires together and executes the event ingestion pipeline: messages are
// consumed using the Pulsar settings in config, passed through an event
// converter, and handed to a Redis-backed event store. The call blocks until
// the pipeline's Run method returns.
//
// Run panics if any entry of config.FatalInsertionErrors is not a valid regular
// expression, if the zlib compressor cannot be constructed, or if the pipeline
// returns an error.
func Run(config *configuration.EventIngesterConfiguration) {
	log.Info("Event Ingester Starting")

	// Profiling endpoints are optional: they are served only when a port is configured.
	if config.PprofPort != nil {
		profilingServer := profiling.SetupPprofHttpServer(*config.PprofPort)
		go func() {
			serveCtx := armadacontext.Background()
			err := serve.ListenAndServe(serveCtx, profilingServer)
			if err != nil {
				logging.WithStacktrace(serveCtx, err).Error("pprof server failure")
			}
		}()
	}

	ingesterMetrics := metrics.Get()

	// Compile every configured fatal-error pattern up front; an invalid pattern
	// aborts startup.
	fatalPatterns := make([]*regexp.Regexp, 0, len(config.FatalInsertionErrors))
	for _, pattern := range config.FatalInsertionErrors {
		compiled, compileErr := regexp.Compile(pattern)
		if compileErr != nil {
			log.Errorf("Error compiling regex %s", pattern)
			panic(compileErr)
		}
		fatalPatterns = append(fatalPatterns, compiled)
	}

	redisClient := redis.NewUniversalClient(&config.Redis)
	// Deferred so the client is closed on both normal return and panic.
	defer func() {
		closeErr := redisClient.Close()
		if closeErr != nil {
			log.WithError(closeErr).Error("failed to close events Redis client")
		}
	}()

	// NOTE(review): the two durations look like retry backoff bounds — confirm
	// against store.NewRedisEventStore.
	eventStore := store.NewRedisEventStore(
		redisClient,
		config.EventRetentionPolicy,
		fatalPatterns,
		100*time.Millisecond,
		60*time.Second,
	)

	// The converter turns incoming messages into event rows, using a zlib compressor.
	zlibCompressor, err := compress.NewZlibCompressor(config.MinMessageCompressionSize)
	if err != nil {
		log.Errorf("Error creating compressor for consumer")
		panic(err)
	}
	eventConverter := convert.NewEventConverter(zlibCompressor, uint(config.BatchSize), ingesterMetrics)

	pipeline := ingest.NewIngestionPipeline(
		config.Pulsar,
		config.SubscriptionName,
		config.BatchSize,
		config.BatchDuration,
		pulsar.KeyShared,
		eventConverter,
		eventStore,
		config.MetricsPort,
		ingesterMetrics,
	)

	// The pipeline runs under the context from app.CreateContextWithShutdown
	// (presumably cancelled on a shutdown signal such as SIGTERM — confirm).
	runErr := pipeline.Run(app.CreateContextWithShutdown())
	if runErr != nil {
		panic(errors.WithMessage(runErr, "Error running ingestion pipeline"))
	}
}