-
Notifications
You must be signed in to change notification settings - Fork 25
/
file_exporter.go
127 lines (103 loc) · 3.15 KB
/
file_exporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package filewriter
import (
"context"
_ "embed" // used to embed config
"errors"
"fmt"
"os"
"path"
"github.com/sirupsen/logrus"
"github.com/algorand/conduit/conduit/data"
"github.com/algorand/conduit/conduit/plugins"
"github.com/algorand/conduit/conduit/plugins/exporters"
)
const (
// PluginName to use when configuring.
PluginName = "file_writer"
// FilePattern is used to name the output files.
FilePattern = "%[1]d_block.msgp.gz"
// GenesisFilename is the name of the genesis file.
GenesisFilename = "genesis.json"
)
type fileExporter struct {
round uint64
cfg Config
gzip bool
format EncodingFormat
logger *logrus.Logger
}
//go:embed sample.yaml
var sampleFile string
var metadata = plugins.Metadata{
Name: PluginName,
Description: "Exporter for writing data to a file.",
Deprecated: false,
SampleConfig: sampleFile,
}
func (exp *fileExporter) Metadata() plugins.Metadata {
return metadata
}
func (exp *fileExporter) Init(_ context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, logger *logrus.Logger) error {
exp.logger = logger
err := cfg.UnmarshalConfig(&exp.cfg)
if err != nil {
return fmt.Errorf("connect failure in unmarshalConfig: %w", err)
}
if exp.cfg.FilenamePattern == "" {
exp.cfg.FilenamePattern = FilePattern
}
exp.format, exp.gzip, err = ParseFilenamePattern(exp.cfg.FilenamePattern)
if err != nil {
return fmt.Errorf("Init() error: %w", err)
}
// default to the data directory if no override provided.
if exp.cfg.BlocksDir == "" {
exp.cfg.BlocksDir = cfg.DataDir
}
// create block directory
err = os.Mkdir(exp.cfg.BlocksDir, 0755)
if err != nil && !errors.Is(err, os.ErrExist) {
// Ignore mkdir err if the dir exists (case errors.Is(err, os.ErrExist))
return fmt.Errorf("Init() error: %w", err)
}
exp.round = uint64(initProvider.NextDBRound())
genesis := initProvider.GetGenesis()
genesisPath := path.Join(exp.cfg.BlocksDir, GenesisFilename)
// the genesis is always exported as plain JSON:
err = EncodeToFile(genesisPath, genesis, JSONFormat, false)
if err != nil {
return fmt.Errorf("Init() error sending to genesisPath=%s: %w", genesisPath, err)
}
return nil
}
func (exp *fileExporter) Close() error {
exp.logger.Infof("latest round on file: %d", exp.round)
return nil
}
func (exp *fileExporter) Receive(exportData data.BlockData) error {
if exp.logger == nil {
return fmt.Errorf("exporter not initialized")
}
if exportData.Round() != exp.round {
return fmt.Errorf("Receive(): wrong block: received round %d, expected round %d", exportData.Round(), exp.round)
}
// write block to file
{
if exp.cfg.DropCertificate {
exportData.Certificate = nil
}
blockFile := path.Join(exp.cfg.BlocksDir, fmt.Sprintf(exp.cfg.FilenamePattern, exportData.Round()))
err := EncodeToFile(blockFile, &exportData, exp.format, exp.gzip)
if err != nil {
return fmt.Errorf("Receive(): failed to write file %s: %w", blockFile, err)
}
exp.logger.Infof("Wrote block %d to %s", exportData.Round(), blockFile)
}
exp.round++
return nil
}
func init() {
exporters.Register(PluginName, exporters.ExporterConstructorFunc(func() exporters.Exporter {
return &fileExporter{}
}))
}