package main

import (
	"fmt"
	"strings"
	"time"

	"github.com/dfuse-io/dkafka"
	"github.com/spf13/cobra"
	"github.com/spf13/viper"
	"github.com/streamingfast/derr"
	"go.uber.org/zap"
)
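
// PublishCmd streams blocks from a dfuse Firehose endpoint and publishes the
// derived CloudEvents to a Kafka topic.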
var PublishCmd = &cobra.Command{
	Use:   "publish",
	Short: "Publish dfuse Firehose events to Kafka as CloudEvents",
	Long:  "",
	RunE:  publishRunE,
}
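
// init registers the publish command on the root command and declares its
// flags; their values are read back in publishRunE through viper under the
// "publish-cmd-" prefix (the flag-to-viper binding is assumed to happen in the
// root command setup).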
func init() {
	RootCmd.AddCommand(PublishCmd)
	PublishCmd.Flags().Duration("delay-between-commits", time.Second*10, "no commits to kafka below this delay, except on shutdown")
	PublishCmd.Flags().String("event-source", "dkafka", "custom value for produced cloudevent source")
	PublishCmd.Flags().String("event-keys-expr", "[account]", "CEL expression defining the event keys. More than one key will result in multiple events being sent. Must resolve to an array of strings")
	PublishCmd.Flags().String("event-type-expr", "(notif?'!':'')+account+'/'+action", "CEL expression defining the event type. Must resolve to a string")
	PublishCmd.Flags().StringSlice("event-extensions-expr", []string{}, "cloudevent extension definitions in this format: '{key}:{CEL expression}' (ex: 'blk:string(block_num)')")
	PublishCmd.Flags().Bool("batch-mode", false, "Batch mode will ignore cursor and always start from {start-block-num}.")
	PublishCmd.Flags().Int64("start-block-num", 0, "If we are in {batch-mode} or no prior cursor exists, start streaming from this block number (if negative, relative to HEAD)")
	PublishCmd.Flags().Uint64("stop-block-num", 0, "If non-zero, stop processing before this block number")
	PublishCmd.Flags().String("state-file", "./dkafka.state.json", "progress will be saved into this file")
	PublishCmd.Flags().StringSlice("local-abi-files", []string{}, "repeatable, ABI file definition in this format: '{account}:{path/to/filename}' (ex: 'eosio.token:/tmp/eosio_token.abi'). ABIs are used to decode DB ops. Provided ABIs have highest priority and will never be fetched or updated")
	PublishCmd.Flags().String("abicodec-grpc-addr", "", "if set, will connect to this endpoint to fetch contract ABIs")
	PublishCmd.Flags().Bool("fail-on-undecodable-db-op", false, "If true, program will fail and exit when a db op cannot be decoded (ex: missing or incompatible ABI file or invalid ABI fetched from abicodec)")
}
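
// publishRunE assembles a dkafka.Config from the parsed flags, starts the
// publisher application and blocks until it terminates or a shutdown signal
// is received, returning the application's final error.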
func publishRunE(cmd *cobra.Command, args []string) error {
	SetupLogger()

	extensions := make(map[string]string)
	for _, ext := range viper.GetStringSlice("publish-cmd-event-extensions-expr") {
		kv := strings.SplitN(ext, ":", 2)
		if len(kv) != 2 {
			return fmt.Errorf("invalid value for extension: %s", ext)
		}
		extensions[kv[0]] = kv[1]
	}

	localABIFiles := make(map[string]string)
	for _, ext := range viper.GetStringSlice("publish-cmd-local-abi-files") {
		kv := strings.SplitN(ext, ":", 2)
		if len(kv) != 2 {
			return fmt.Errorf("invalid value for local ABI file: %s", ext)
		}
		localABIFiles[kv[0]] = kv[1]
	}
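
	// Assemble the publisher configuration. The "global-*" viper keys presumably
	// map to flags declared on the root command, while the "publish-cmd-*" keys
	// map to the flags registered in init above.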
	conf := &dkafka.Config{
		DfuseToken: viper.GetString("global-dfuse-auth-token"),
		DfuseGRPCEndpoint: viper.GetString("global-dfuse-firehose-grpc-addr"),
		IncludeFilterExpr: viper.GetString("global-dfuse-firehose-include-expr"),
		DryRun: viper.GetBool("global-dry-run"),
		KafkaEndpoints: viper.GetString("global-kafka-endpoints"),
		KafkaSSLEnable: viper.GetBool("global-kafka-ssl-enable"),
		KafkaSSLCAFile: viper.GetString("global-kafka-ssl-ca-file"),
		KafkaSSLAuth: viper.GetBool("global-kafka-ssl-auth"),
		KafkaSSLClientCertFile: viper.GetString("global-kafka-ssl-client-cert-file"),
		KafkaSSLClientKeyFile: viper.GetString("global-kafka-ssl-client-key-file"),
		KafkaTopic: viper.GetString("global-kafka-topic"),
		KafkaCursorTopic: viper.GetString("global-kafka-cursor-topic"),
		KafkaCursorPartition: int32(viper.GetUint32("global-kafka-cursor-partition")),
		KafkaCursorConsumerGroupID: viper.GetString("global-kafka-cursor-consumer-group-id"),
		KafkaTransactionID: viper.GetString("global-kafka-transaction-id"),
		CommitMinDelay: viper.GetDuration("publish-cmd-delay-between-commits"),
		EventSource: viper.GetString("publish-cmd-event-source"),
		EventKeysExpr: viper.GetString("publish-cmd-event-keys-expr"),
		EventTypeExpr: viper.GetString("publish-cmd-event-type-expr"),
		EventExtensions: extensions,
		BatchMode: viper.GetBool("publish-cmd-batch-mode"),
		StartBlockNum: viper.GetInt64("publish-cmd-start-block-num"),
		StopBlockNum: viper.GetUint64("publish-cmd-stop-block-num"),
		StateFile: viper.GetString("publish-cmd-state-file"),
		LocalABIFiles: localABIFiles,
		ABICodecGRPCAddr: viper.GetString("publish-cmd-abicodec-grpc-addr"),
		FailOnUndecodableDBOP: viper.GetBool("publish-cmd-fail-on-undecodable-db-op"),
	}
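
	// Silence usage output for runtime errors, then run the application in the
	// background and wait for either it to terminate on its own or for a
	// shutdown signal.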
	cmd.SilenceUsage = true
	signalHandler := derr.SetupSignalHandler(time.Second)

	zlog.Info("starting dkafka publisher", zap.Reflect("config", conf))
	app := dkafka.New(conf)
	go func() { app.Shutdown(app.Run()) }()

	select {
	case <-signalHandler:
		app.Shutdown(fmt.Errorf("shutdown signal received"))
	case <-app.Terminating():
	}

	zlog.Info("terminating", zap.Error(app.Err()))
	<-app.Terminated()
	return app.Err()
}