Skip to content

Commit

Permalink
conduit: Pass data dir to plugin config. (#1357)
Browse files Browse the repository at this point in the history
  • Loading branch information
winder authored and AlgoStephenAkiki committed Jan 6, 2023
1 parent 8c266d9 commit bdea76c
Show file tree
Hide file tree
Showing 24 changed files with 194 additions and 155 deletions.
21 changes: 16 additions & 5 deletions conduit/pipeline/pipeline.go
Expand Up @@ -211,6 +211,19 @@ func (p *pipelineImpl) registerPluginMetricsCallbacks() {
}
}

// makeConfig builds the plugins.PluginConfig handed to a plugin's Init: the
// raw serialized config bytes plus a per-plugin data directory named
// "<pluginType>_<pluginName>" under the conduit data dir. If the directory
// cannot be created, the error is logged and DataDir is left empty so the
// plugin runs without a data dir rather than failing the pipeline.
// NOTE(review): path.Join is used here rather than filepath.Join — presumably
// fine on the supported platforms, but confirm Windows is not a target.
func (p *pipelineImpl) makeConfig(pluginType, pluginName string, cfg []byte) (config plugins.PluginConfig) {
	config.Config = string(cfg)
	// Without a conduit config there is no data dir to derive from.
	if p.cfg == nil || p.cfg.ConduitConfig == nil {
		return
	}
	dir := path.Join(p.cfg.ConduitConfig.ConduitDataDir, fmt.Sprintf("%s_%s", pluginType, pluginName))
	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
		p.logger.Errorf("Unable to create plugin data directory: %s", err)
		return
	}
	config.DataDir = dir
	return
}

// Init prepares the pipeline for processing block data
func (p *pipelineImpl) Init() error {
p.logger.Infof("Starting Pipeline Initialization")
Expand Down Expand Up @@ -257,7 +270,7 @@ func (p *pipelineImpl) Init() error {
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not serialize Importer.Config: %w", err)
}
genesis, err := (*p.importer).Init(p.ctx, plugins.PluginConfig(configs), importerLogger)
genesis, err := (*p.importer).Init(p.ctx, p.makeConfig("importer", importerName, configs), importerLogger)
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not initialize importer (%s): %w", importerName, err)
}
Expand Down Expand Up @@ -296,8 +309,8 @@ func (p *pipelineImpl) Init() error {
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not serialize Processors[%d].Config : %w", idx, err)
}
err := (*processor).Init(p.ctx, *p.initProvider, plugins.PluginConfig(configs), processorLogger)
processorName := (*processor).Metadata().Name
err := (*processor).Init(p.ctx, *p.initProvider, p.makeConfig("processor", processorName, configs), processorLogger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", processorName, err)
}
Expand All @@ -314,8 +327,8 @@ func (p *pipelineImpl) Init() error {
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not serialize Exporter.Config : %w", err)
}
err = (*p.exporter).Init(p.ctx, *p.initProvider, plugins.PluginConfig(configs), exporterLogger)
exporterName := (*p.exporter).Metadata().Name
err = (*p.exporter).Init(p.ctx, *p.initProvider, p.makeConfig("exporter", exporterName, configs), exporterLogger)
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not initialize Exporter (%s): %w", exporterName, err)
}
Expand Down Expand Up @@ -491,8 +504,6 @@ func (p *pipelineImpl) encodeMetadataToFile() error {
func (p *pipelineImpl) initializeOrLoadBlockMetadata() (state, error) {
pipelineMetadataFilePath := metadataPath(p.cfg.ConduitConfig.ConduitDataDir)
if stat, err := os.Stat(pipelineMetadataFilePath); errors.Is(err, os.ErrNotExist) || (stat != nil && stat.Size() == 0) {
fmt.Println(err)
fmt.Println(stat)
if stat != nil && stat.Size() == 0 {
err = os.Remove(pipelineMetadataFilePath)
if err != nil {
Expand Down
143 changes: 86 additions & 57 deletions conduit/pipeline/pipeline_test.go
Expand Up @@ -11,6 +11,7 @@ import (
"time"

log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/algorand/indexer/conduit/plugins/importers"
"github.com/algorand/indexer/conduit/plugins/processors"
"github.com/algorand/indexer/data"
_ "github.com/algorand/indexer/util/metrics"
)

// TestPipelineConfigValidity tests the Valid() function for the Config
Expand Down Expand Up @@ -85,7 +87,7 @@ func TestPipelineConfigValidity(t *testing.T) {
// TestMakePipelineConfig tests making the pipeline configuration
func TestMakePipelineConfig(t *testing.T) {

l := log.New()
l, _ := test.NewNullLogger()

_, err := MakePipelineConfig(l, nil)
assert.Equal(t, fmt.Errorf("MakePipelineConfig(): empty conduit config"), err)
Expand Down Expand Up @@ -145,13 +147,15 @@ var uniqueBlockData = data.BlockData{
type mockImporter struct {
mock.Mock
importers.Importer
cfg plugins.PluginConfig
genesis bookkeeping.Genesis
finalRound basics.Round
returnError bool
onCompleteError bool
}

func (m *mockImporter) Init(_ context.Context, _ plugins.PluginConfig, _ *log.Logger) (*bookkeeping.Genesis, error) {
// Init records the plugin config it receives so tests can inspect it (e.g.
// to verify DataDir propagation), then returns the mock's canned genesis.
// It never returns an error.
func (m *mockImporter) Init(_ context.Context, cfg plugins.PluginConfig, _ *log.Logger) (*bookkeeping.Genesis, error) {
	m.cfg = cfg
	return &m.genesis, nil
}

Expand Down Expand Up @@ -186,12 +190,14 @@ func (m *mockImporter) OnComplete(input data.BlockData) error {
type mockProcessor struct {
mock.Mock
processors.Processor
cfg plugins.PluginConfig
finalRound basics.Round
returnError bool
onCompleteError bool
}

func (m *mockProcessor) Init(_ context.Context, _ data.InitProvider, _ plugins.PluginConfig, _ *log.Logger) error {
// Init records the plugin config it receives so tests can inspect it (e.g.
// to verify DataDir propagation). It never returns an error.
func (m *mockProcessor) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *log.Logger) error {
	m.cfg = cfg
	return nil
}

Expand Down Expand Up @@ -228,6 +234,7 @@ func (m *mockProcessor) OnComplete(input data.BlockData) error {
type mockExporter struct {
mock.Mock
exporters.Exporter
cfg plugins.PluginConfig
finalRound basics.Round
returnError bool
onCompleteError bool
Expand All @@ -239,7 +246,8 @@ func (m *mockExporter) Metadata() conduit.Metadata {
}
}

func (m *mockExporter) Init(_ context.Context, _ data.InitProvider, _ plugins.PluginConfig, _ *log.Logger) error {
// Init records the plugin config it receives so tests can inspect it (e.g.
// to verify DataDir propagation). It never returns an error.
func (m *mockExporter) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *log.Logger) error {
	m.cfg = cfg
	return nil
}

Expand Down Expand Up @@ -293,10 +301,11 @@ func TestPipelineRun(t *testing.T) {

ctx, cf := context.WithCancel(context.Background())

l, _ := test.NewNullLogger()
pImpl := pipelineImpl{
ctx: ctx,
cf: cf,
logger: log.New(),
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
Expand Down Expand Up @@ -339,6 +348,7 @@ func TestPipelineCpuPidFiles(t *testing.T) {
pidFilePath := filepath.Join(tempDir, "pidfile")
cpuFilepath := filepath.Join(tempDir, "cpufile")

l, _ := test.NewNullLogger()
pImpl := pipelineImpl{
cfg: &Config{
ConduitConfig: &conduit.Config{
Expand All @@ -360,7 +370,7 @@ func TestPipelineCpuPidFiles(t *testing.T) {
Config: map[string]interface{}{},
},
},
logger: log.New(),
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
Expand Down Expand Up @@ -418,6 +428,7 @@ func TestPipelineErrors(t *testing.T) {
var cbComplete conduit.Completed = &mProcessor

ctx, cf := context.WithCancel(context.Background())
l, _ := test.NewNullLogger()
pImpl := pipelineImpl{
ctx: ctx,
cf: cf,
Expand All @@ -426,7 +437,7 @@ func TestPipelineErrors(t *testing.T) {
ConduitDataDir: tempDir,
},
},
logger: log.New(),
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
Expand Down Expand Up @@ -490,11 +501,12 @@ func Test_pipelineImpl_registerLifecycleCallbacks(t *testing.T) {
var pExporter exporters.Exporter = &mExporter

ctx, cf := context.WithCancel(context.Background())
l, _ := test.NewNullLogger()
pImpl := pipelineImpl{
ctx: ctx,
cf: cf,
cfg: &Config{},
logger: log.New(),
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor, &pProcessor},
Expand All @@ -507,6 +519,61 @@ func Test_pipelineImpl_registerLifecycleCallbacks(t *testing.T) {
assert.Len(t, pImpl.completeCallback, 4)
}

// TestPluginConfigDataDir tests that a per-plugin data directory is created
// under the conduit data dir during Init and that its path is delivered to
// each plugin (importer, processor, exporter) via PluginConfig.DataDir.
func TestPluginConfigDataDir(t *testing.T) {

	mImporter := mockImporter{}
	mProcessor := mockProcessor{}
	mExporter := mockExporter{}

	var pImporter importers.Importer = &mImporter
	var pProcessor processors.Processor = &mProcessor
	var pExporter exporters.Exporter = &mExporter

	datadir := t.TempDir()
	l, _ := test.NewNullLogger()
	pImpl := pipelineImpl{
		cfg: &Config{
			ConduitConfig: &conduit.Config{
				Flags:          nil,
				ConduitDataDir: datadir,
			},
			Importer: NameConfigPair{
				Name:   "",
				Config: map[string]interface{}{},
			},
			Processors: []NameConfigPair{
				{
					Name:   "",
					Config: map[string]interface{}{},
				},
			},
			Exporter: NameConfigPair{
				Name:   "",
				Config: map[string]interface{}{},
			},
		},
		logger:       l,
		initProvider: nil,
		importer:     &pImporter,
		processors:   []*processors.Processor{&pProcessor},
		exporter:     &pExporter,
		pipelineMetadata: state{
			NextRound: 3,
		},
	}

	err := pImpl.Init()
	assert.NoError(t, err)

	// testify's assert.Equal signature is (t, expected, actual); the expected
	// path goes first so failure messages report the values correctly.
	assert.Equal(t, path.Join(datadir, "importer_mockImporter"), mImporter.cfg.DataDir)
	assert.DirExists(t, mImporter.cfg.DataDir)
	assert.Equal(t, path.Join(datadir, "processor_mockProcessor"), mProcessor.cfg.DataDir)
	assert.DirExists(t, mProcessor.cfg.DataDir)
	assert.Equal(t, path.Join(datadir, "exporter_mockExporter"), mExporter.cfg.DataDir)
	assert.DirExists(t, mExporter.cfg.DataDir)
}

// TestBlockMetaDataFile tests that metadata.json file is created as expected
func TestBlockMetaDataFile(t *testing.T) {

Expand All @@ -515,6 +582,7 @@ func TestBlockMetaDataFile(t *testing.T) {
var pExporter exporters.Exporter = &mockExporter{}

datadir := t.TempDir()
l, _ := test.NewNullLogger()
pImpl := pipelineImpl{
cfg: &Config{
ConduitConfig: &conduit.Config{
Expand All @@ -536,7 +604,7 @@ func TestBlockMetaDataFile(t *testing.T) {
Config: map[string]interface{}{},
},
},
logger: log.New(),
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
Expand Down Expand Up @@ -585,6 +653,7 @@ func TestGenesisHash(t *testing.T) {
var pProcessor processors.Processor = &mockProcessor{}
var pExporter exporters.Exporter = &mockExporter{}
datadir := t.TempDir()
l, _ := test.NewNullLogger()
pImpl := pipelineImpl{
cfg: &Config{
ConduitConfig: &conduit.Config{
Expand All @@ -606,7 +675,7 @@ func TestGenesisHash(t *testing.T) {
Config: map[string]interface{}{},
},
},
logger: log.New(),
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
Expand Down Expand Up @@ -635,54 +704,12 @@ func TestGenesisHash(t *testing.T) {
assert.Contains(t, err.Error(), "genesis hash in metadata does not match")
}

func TestInitError(t *testing.T) {
var pImporter importers.Importer = &mockImporter{genesis: bookkeeping.Genesis{Network: "test"}}
var pProcessor processors.Processor = &mockProcessor{}
var pExporter exporters.Exporter = &mockExporter{}
datadir := "data"
pImpl := pipelineImpl{
cfg: &Config{
ConduitConfig: &conduit.Config{
Flags: nil,
ConduitDataDir: datadir,
},
Importer: NameConfigPair{
Name: "",
Config: map[string]interface{}{},
},
Processors: []NameConfigPair{
{
Name: "",
Config: map[string]interface{}{},
},
},
Exporter: NameConfigPair{
Name: "unknown",
Config: map[string]interface{}{},
},
},
logger: log.New(),
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
pipelineMetadata: state{
GenesisHash: "",
Network: "",
NextRound: 3,
},
}

// could not read metadata
err := pImpl.Init()
assert.Contains(t, err.Error(), "could not read metadata")
}

func TestPipelineMetricsConfigs(t *testing.T) {
var pImporter importers.Importer = &mockImporter{}
var pProcessor processors.Processor = &mockProcessor{}
var pExporter exporters.Exporter = &mockExporter{}
ctx, cf := context.WithCancel(context.Background())
l, _ := test.NewNullLogger()
pImpl := pipelineImpl{
cfg: &Config{
ConduitConfig: &conduit.Config{
Expand All @@ -705,7 +732,7 @@ func TestPipelineMetricsConfigs(t *testing.T) {
},
Metrics: Metrics{},
},
logger: log.New(),
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
Expand Down Expand Up @@ -779,7 +806,8 @@ func TestPipelineLogFile(t *testing.T) {
PipelineLogLevel: "INFO",
}

_, err := MakePipeline(context.Background(), configs, log.New())
l, _ := test.NewNullLogger()
_, err := MakePipeline(context.Background(), configs, l)
require.NoError(t, err)

// Test that file is not created
Expand All @@ -788,7 +816,7 @@ func TestPipelineLogFile(t *testing.T) {

// Test that it is created
configs.LogFile = logfilePath
_, err = MakePipeline(context.Background(), configs, log.New())
_, err = MakePipeline(context.Background(), configs, l)
require.NoError(t, err)

_, err = os.Stat(logfilePath)
Expand All @@ -800,6 +828,7 @@ func TestRoundOverwrite(t *testing.T) {
var pImporter importers.Importer = &mockImporter{genesis: bookkeeping.Genesis{Network: "test"}}
var pProcessor processors.Processor = &mockProcessor{}
var pExporter exporters.Exporter = &mockExporter{}
l, _ := test.NewNullLogger()
pImpl := pipelineImpl{
cfg: &Config{
ConduitConfig: &conduit.Config{
Expand All @@ -822,7 +851,7 @@ func TestRoundOverwrite(t *testing.T) {
Config: map[string]interface{}{},
},
},
logger: log.New(),
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
Expand Down

0 comments on commit bdea76c

Please sign in to comment.