-
Notifications
You must be signed in to change notification settings - Fork 39
/
exporter.go
107 lines (92 loc) · 2.67 KB
/
exporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// (c) 2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package consumers
import (
"context"
"encoding/hex"
"errors"
"fmt"
"log"
"os"
"path"
"time"
"github.com/ava-labs/avalanchego/utils/wrappers"
"github.com/ava-labs/ortelius/cfg"
"github.com/ava-labs/ortelius/stream"
"github.com/segmentio/kafka-go"
)
var (
ErrExportFileExists = errors.New("export file exists")
)
// ExportToDisk writes the stream data for a given chain out to disk
func ExportToDisk(conf *cfg.Config, exportPath string, chainID string) (int64, error) {
// Create path if it doesn't exist yet
if _, err := os.Stat(exportPath); os.IsNotExist(err) {
if err := os.MkdirAll(exportPath, 0755); err != nil {
log.Printf("Filed to create export directory %s: %s\n", exportPath, err.Error())
return 0, err
}
}
// Create exporter
fileName := path.Join(exportPath, chainID+"-consensus-export.txt")
log.Println("Exporting to file:", fileName)
exportNextFn, closeFn, err := newExportReadWriter(fileName, conf.Brokers, conf.NetworkID, chainID)
if err != nil {
return 0, err
}
// Export until an error is returned
log.Println("Starting exporter...")
var i int64
var exportErr error
for exportErr = exportNextFn(); err == nil; exportErr = exportNextFn() {
i++
if i%1000 == 0 {
fmt.Printf("Exported %d records\n", i)
}
}
// Close and log any errors
errs := wrappers.Errs{}
errs.Add(closeFn())
if exportErr != context.DeadlineExceeded {
errs.Add(exportErr)
}
return i, errs.Err
}
func newExportReadWriter(fileName string, brokers []string, networkID uint32, chainID string) (func() error, func() error, error) {
// Open file and kafka reader
_, err := os.Stat(fileName)
if !os.IsNotExist(err) {
if err == nil {
err = ErrExportFileExists
}
return nil, nil, err
}
file, err := os.OpenFile(fileName, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
if err != nil {
return nil, nil, err
}
reader := kafka.NewReader(kafka.ReaderConfig{
MaxBytes: cfg.ConsumerMaxBytesDefault,
Brokers: brokers,
StartOffset: kafka.FirstOffset,
GroupID: fmt.Sprintf("exporter-%d", time.Now().UTC().Unix()),
Topic: stream.GetTopicName(networkID, chainID, stream.EventTypeDecisions),
})
// Create functions for exporting and closing
closeFn := func() error {
errs := wrappers.Errs{}
errs.Add(reader.Close(), file.Close())
return errs.Err
}
writeFn := func() error {
ctx, cancelFn := context.WithTimeout(context.Background(), 30*time.Second)
cancelFn()
msg, err := reader.ReadMessage(ctx)
if err != nil {
return err
}
_, err = fmt.Fprintf(file, "%s\n", hex.EncodeToString(msg.Value))
return err
}
return writeFn, closeFn, nil
}