From 4fcbd3a7c4c06091cbc0c1e1127509896d2778e8 Mon Sep 17 00:00:00 2001 From: "shiqi.zheng@algorand.com" Date: Fri, 14 Apr 2023 15:40:30 -0400 Subject: [PATCH 1/7] use conduit binary --- tools/block-generator/data/conduit.yml | 61 +++++++++ tools/block-generator/generator/generate.go | 28 +++- .../block-generator/generator/server_test.go | 24 ++-- tools/block-generator/metrics/metrics.go | 42 ------ tools/block-generator/run_tests.sh | 2 +- .../runner/metrics_collector.go | 25 ++++ tools/block-generator/runner/run.go | 128 ++++++++++++++---- tools/block-generator/runner/runner.go | 8 +- 8 files changed, 225 insertions(+), 93 deletions(-) create mode 100644 tools/block-generator/data/conduit.yml delete mode 100644 tools/block-generator/metrics/metrics.go diff --git a/tools/block-generator/data/conduit.yml b/tools/block-generator/data/conduit.yml new file mode 100644 index 0000000000..c361426ee6 --- /dev/null +++ b/tools/block-generator/data/conduit.yml @@ -0,0 +1,61 @@ +# Log verbosity: PANIC, FATAL, ERROR, WARN, INFO, DEBUG, TRACE +log-level: {{.LogLevel}} + +# If no log file is provided logs are written to stdout. +log-file: {{.LogFile}} + +# Number of retries to perform after a pipeline plugin error. +retry-count: 10 + +# Time duration to wait between retry attempts. +retry-delay: "1s" + +# Optional filepath to use for pidfile. +#pid-filepath: /path/to/pidfile + +# Whether or not to print the conduit banner on startup. +hide-banner: false + +# When enabled prometheus metrics are available on '/metrics' +metrics: + mode: ON + addr: "{{.MetricsPort}}" + prefix: "conduit" + +# The importer is typically an algod follower node. +importer: + name: algod + config: + # The mode of operation, either "archival" or "follower". + # * archival mode allows you to start processing on any round but does not + # contain the ledger state delta objects required for the postgres writer. + # * follower mode allows you to use a lightweight non-archival node as the + # data source. In addition, it will provide ledger state delta objects to + # the processors and exporter. + mode: "follower" + + # Algod API address. + netaddr: "{{.AlgodNet}}" + + # Algod API token. + token: "" + + +# Zero or more processors may be defined to manipulate what data +# reaches the exporter. +processors: + +# An exporter is defined to do something with the data. +exporter: + name: postgresql + config: + # Pgsql connection string + # See https://github.com/jackc/pgconn for more details + connection-string: "{{ .PostgresConnectionString }}" + + # Maximum connection number for connection pool + # This means the total number of active queries that can be running + # concurrently can never be more than this + max-conn: 20 + + diff --git a/tools/block-generator/generator/generate.go b/tools/block-generator/generator/generate.go index f1961587bb..6080c35e93 100644 --- a/tools/block-generator/generator/generate.go +++ b/tools/block-generator/generator/generate.go @@ -72,7 +72,7 @@ type GenerationConfig struct { GenesisAccountInitialBalance uint64 `yaml:"genesis_account_balance"` // Block generation - TxnPerBlock uint64 `mapstructure:"tx_per_block"` + TxnPerBlock uint64 `yaml:"tx_per_block"` // TX Distribution PaymentTransactionFraction float32 `yaml:"tx_pay_fraction"` @@ -120,7 +120,7 @@ func MakeGenerator(config GenerationConfig) (Generator, error) { genesisHash: [32]byte{}, genesisID: "blockgen-test", prevBlockHash: "", - round: 1, + round: 0, txnCounter: 0, timestamp: 0, rewardsLevel: 0, @@ -312,7 +312,9 @@ func (g *generator) WriteGenesis(output io.Writer) error { FeeSink: g.feeSink.String(), Timestamp: g.timestamp, } - return json.NewEncoder(output).Encode(gen) + + _, err := output.Write(protocol.EncodeJSON(gen)) + return err } func getTransactionOptions() []interface{} { @@ -357,12 +359,25 @@ func (g *generator) finishRound(txnCount uint64) { // WriteBlock generates a block full of new transactions and writes it to the writer. func (g *generator) WriteBlock(output io.Writer, round uint64) error { + if round != g.round { fmt.Printf("Generator only supports sequential block access. Expected %d but received request for %d.\n", g.round, round) } numTxnForBlock := g.txnForRound(round) + // return genesis block + if round == 0 { + // write the msgpack bytes for a block + block, err := rpcs.RawBlockBytes(g.ledger, basics.Round(round)) + if err != nil { + return err + } + output.Write(block) + g.finishRound(numTxnForBlock) + return nil + } + header := bookkeeping.BlockHeader{ Round: basics.Round(g.round), Branch: bookkeeping.BlockHash{}, @@ -414,15 +429,16 @@ func (g *generator) WriteBlock(output io.Writer, round uint64) error { Certificate: agreement.Certificate{}, } - err := json.NewEncoder(output).Encode(cert) + err := g.ledger.AddBlock(cert.Block, cert.Certificate) if err != nil { return err } - err = g.ledger.AddBlock(cert.Block, agreement.Certificate{}) + // write the msgpack bytes for a block + block, err := rpcs.RawBlockBytes(g.ledger, basics.Round(round)) if err != nil { return err } - g.ledger.WaitForCommit(basics.Round(g.round)) + output.Write(block) g.finishRound(numTxnForBlock) return nil } diff --git a/tools/block-generator/generator/server_test.go b/tools/block-generator/generator/server_test.go index 6e21d77586..7007db00f0 100644 --- a/tools/block-generator/generator/server_test.go +++ b/tools/block-generator/generator/server_test.go @@ -53,61 +53,61 @@ func TestParseURL(t *testing.T) { var testcases = []struct { name string url string - expectedRound string + expectedParam string err string }{ { name: "no block", url: "/v2/blocks/", - expectedRound: "", + expectedParam: "", err: "invalid request path, /v2/blocks/", }, { name: "normal one digit", url: fmt.Sprintf("%s1", blockQueryPrefix), - expectedRound: "1", + expectedParam: "1", err: "", }, { name: "normal long number", url: fmt.Sprintf("%s12345678", blockQueryPrefix), - expectedRound: "12345678", + expectedParam: "12345678", err: "", }, { name: "with query parameters", url: fmt.Sprintf("%s1234?pretty", blockQueryPrefix), - expectedRound: "1234", + expectedParam: "1234", err: "", }, { name: "with query parameters", url: fmt.Sprintf("%s1234?pretty", blockQueryPrefix), - expectedRound: "1234", + expectedParam: "1234", err: "", }, { name: "no deltas", url: "/v2/deltas/", - expectedRound: "", + expectedParam: "", err: "invalid request path, /v2/deltas/", }, { name: "deltas", url: fmt.Sprintf("%s123?Format=msgp", deltaQueryPrefix), - expectedRound: "123", + expectedParam: "123", err: "", }, { name: "no account", url: "/v2/accounts/", - expectedRound: "", + expectedParam: "", err: "invalid request path, /v2/accounts/", }, { name: "accounts", url: fmt.Sprintf("%sAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGFFWAF4", accountQueryPrefix), - expectedRound: "AIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGFFWAF4", + expectedParam: "AIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGFFWAF4", err: "", }, } @@ -117,9 +117,9 @@ func TestParseURL(t *testing.T) { round, err := parseURL(testcase.url) if len(testcase.err) == 0 { msg := fmt.Sprintf("Unexpected error parsing '%s', expected round '%s' received error: %v", - testcase.url, testcase.expectedRound, err) + testcase.url, testcase.expectedParam, err) require.NoError(t, err, msg) - assert.Equal(t, testcase.expectedRound, round) + assert.Equal(t, testcase.expectedParam, round) } else { require.Error(t, err, fmt.Sprintf("Expected an error containing: %s", testcase.err)) require.True(t, strings.Contains(err.Error(), testcase.err)) diff --git a/tools/block-generator/metrics/metrics.go b/tools/block-generator/metrics/metrics.go deleted file mode 100644 index 6c60fbabc2..0000000000 --- a/tools/block-generator/metrics/metrics.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright (C) 2019-2023 Algorand, Inc. -// This file is part of go-algorand -// -// go-algorand is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// go-algorand is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with go-algorand. If not, see . - -package metrics - -// Prometheus metrics collected in Conduit. -const ( - BlockImportTimeName = "import_time_sec" - ImportedTxnsPerBlockName = "imported_tx_per_block" - ImportedRoundGaugeName = "imported_round" - GetAlgodRawBlockTimeName = "get_algod_raw_block_time_sec" - ImportedTxnsName = "imported_txns" - ImporterTimeName = "importer_time_sec" - ProcessorTimeName = "processor_time_sec" - ExporterTimeName = "exporter_time_sec" - PipelineRetryCountName = "pipeline_retry_count" -) - -// AllMetricNames is a reference for all the custom metric names. -var AllMetricNames = []string{ - BlockImportTimeName, - ImportedTxnsPerBlockName, - ImportedRoundGaugeName, - GetAlgodRawBlockTimeName, - ImporterTimeName, - ProcessorTimeName, - ExporterTimeName, - PipelineRetryCountName, -} diff --git a/tools/block-generator/run_tests.sh b/tools/block-generator/run_tests.sh index 0a1c6df49e..8befa41dc4 100755 --- a/tools/block-generator/run_tests.sh +++ b/tools/block-generator/run_tests.sh @@ -17,7 +17,7 @@ help() { echo " -r|--report-dir directory where the report should be written." echo " -d|--duration test duration." echo " -l|--level log level to pass to Indexer." - echo " -g|--generator use a different indexer binary to run the generator." + echo " -g|--generator use a different block-generator binary to run the generator." exit } diff --git a/tools/block-generator/runner/metrics_collector.go b/tools/block-generator/runner/metrics_collector.go index 327ca9e33b..394bf32cba 100644 --- a/tools/block-generator/runner/metrics_collector.go +++ b/tools/block-generator/runner/metrics_collector.go @@ -24,6 +24,31 @@ import ( "time" ) +// Prometheus metrics collected in Conduit. +const ( + BlockImportTimeName = "import_time_sec" + ImportedTxnsPerBlockName = "imported_tx_per_block" + ImportedRoundGaugeName = "imported_round" + GetAlgodRawBlockTimeName = "get_algod_raw_block_time_sec" + ImportedTxnsName = "imported_txns" + ImporterTimeName = "importer_time_sec" + ProcessorTimeName = "processor_time_sec" + ExporterTimeName = "exporter_time_sec" + PipelineRetryCountName = "pipeline_retry_count" +) + +// AllMetricNames is a reference for all the custom metric names. +var AllMetricNames = []string{ + BlockImportTimeName, + ImportedTxnsPerBlockName, + ImportedRoundGaugeName, + GetAlgodRawBlockTimeName, + ImporterTimeName, + ProcessorTimeName, + ExporterTimeName, + PipelineRetryCountName, +} + // MetricsCollector queries a /metrics endpoint for prometheus style metrics and saves metrics matching a pattern. type MetricsCollector struct { // MetricsURL where metrics can be queried. diff --git a/tools/block-generator/runner/run.go b/tools/block-generator/runner/run.go index 9604212da1..0b1d49c518 100644 --- a/tools/block-generator/runner/run.go +++ b/tools/block-generator/runner/run.go @@ -17,19 +17,21 @@ package runner import ( + "bytes" "context" "encoding/json" "fmt" "net/http" "os" + "os/exec" "path" "path/filepath" "strconv" "strings" + "text/template" "time" "github.com/algorand/go-algorand/tools/block-generator/generator" - "github.com/algorand/go-algorand/tools/block-generator/metrics" "github.com/algorand/go-algorand/tools/block-generator/util" "github.com/algorand/go-deadlock" ) @@ -38,8 +40,8 @@ import ( type Args struct { // Path is a directory when passed to RunBatch, otherwise a file path. Path string - IndexerBinary string - IndexerPort uint64 + ConduitBinary string + MetricsPort uint64 PostgresConnectionString string CPUProfilePath string RunDuration time.Duration @@ -50,7 +52,15 @@ type Args struct { KeepDataDir bool } -// Run is a publi8c helper to run the tests. +type config struct { + LogLevel string + LogFile string + MetricsPort string + AlgodNet string + PostgresConnectionString string +} + +// Run is a public helper to run the tests. // The test will run against the generator configuration file specified by 'args.Path'. // If 'args.Path' is a directory it should contain generator configuration files, a test will run using each file. func Run(args Args) error { @@ -86,8 +96,12 @@ func (r *Args) run() error { baseName := filepath.Base(r.Path) baseNameNoExt := strings.TrimSuffix(baseName, filepath.Ext(baseName)) reportfile := path.Join(r.ReportDirectory, fmt.Sprintf("%s.report", baseNameNoExt)) - //logfile := path.Join(r.ReportDirectory, fmt.Sprintf("%s.indexer-log", baseNameNoExt)) + logfile := path.Join(r.ReportDirectory, fmt.Sprintf("%s.indexer-log", baseNameNoExt)) dataDir := path.Join(r.ReportDirectory, fmt.Sprintf("%s_data", baseNameNoExt)) + // create the data directory. + if err := os.Mkdir(dataDir, os.ModeDir|os.ModePerm); err != nil { + return fmt.Errorf("failed to create data directory: %w", err) + } if !r.KeepDataDir { defer os.RemoveAll(dataDir) } @@ -103,7 +117,7 @@ func (r *Args) run() error { } // Start services algodNet := fmt.Sprintf("localhost:%d", 11112) - indexerNet := fmt.Sprintf("localhost:%d", r.IndexerPort) + metricsNet := fmt.Sprintf("localhost:%d", r.MetricsPort) generatorShutdownFunc, _ := startGenerator(r.Path, algodNet, blockMiddleware) defer func() { // Shutdown generator. @@ -111,17 +125,43 @@ func (r *Args) run() error { fmt.Printf("Failed to shutdown generator: %s\n", err) } }() + time.Sleep(1 * time.Second) + // write conduit config file + cwd, err := os.Getwd() + if err != nil { + return fmt.Errorf("current working directory: %w", err) + } + t, err := template.ParseFiles(path.Join(cwd, "data/conduit.yml")) + if err != nil { + return fmt.Errorf("unable to open config template file: %w", err) + } - //indexerShutdownFunc, err := startIndexer(dataDir, logfile, r.LogLevel, r.IndexerBinary, algodNet, indexerNet, r.PostgresConnectionString, r.CPUProfilePath) - //if err != nil { - // return fmt.Errorf("failed to start indexer: %w", err) - //} - //defer func() { - // // Shutdown indexer - // if err := indexerShutdownFunc(); err != nil { - // fmt.Printf("Failed to shutdown indexer: %s\n", err) - // } - //}() + f, err := os.Create(path.Join(dataDir, "conduit.yml")) + if err != nil { + return fmt.Errorf("creating conduit.yml: ", err) + } + defer f.Close() + + conduitConfig := config{r.LogLevel, logfile, + fmt.Sprintf(":%d", r.MetricsPort), + algodNet, r.PostgresConnectionString, + } + + err = t.Execute(f, conduitConfig) + if err != nil { + return fmt.Errorf("execute template file: ", err) + } + + indexerShutdownFunc, err := startIndexer(dataDir, r.ConduitBinary) + if err != nil { + return fmt.Errorf("failed to start indexer: %w", err) + } + defer func() { + // Shutdown indexer + if err := indexerShutdownFunc(); err != nil { + fmt.Printf("Failed to shutdown indexer: %s\n", err) + } + }() // Create the report file report, err := os.Create(reportfile) @@ -131,7 +171,7 @@ func (r *Args) run() error { defer report.Close() // Run the test, collecting results. - if err := r.runTest(report, indexerNet, algodNet); err != nil { + if err := r.runTest(report, metricsNet, algodNet); err != nil { return err } @@ -158,23 +198,23 @@ func recordDataToFile(start time.Time, entry Entry, prefix string, out *os.File) } } - record("_average", metrics.BlockImportTimeName, rate) - record("_cumulative", metrics.BlockImportTimeName, floatTotal) - record("_average", metrics.ImportedTxnsPerBlockName, rate) - record("_cumulative", metrics.ImportedTxnsPerBlockName, intTotal) - record("", metrics.ImportedRoundGaugeName, intTotal) + record("_average", BlockImportTimeName, rate) + record("_cumulative", BlockImportTimeName, floatTotal) + record("_average", ImportedTxnsPerBlockName, rate) + record("_cumulative", ImportedTxnsPerBlockName, intTotal) + record("", ImportedRoundGaugeName, intTotal) if len(writeErrors) > 0 { return fmt.Errorf("error writing metrics (%s): %w", strings.Join(writeErrors, ", "), writeErr) } // Calculate import transactions per second. - totalTxn, err := getMetric(entry, metrics.ImportedTxnsPerBlockName, false) + totalTxn, err := getMetric(entry, ImportedTxnsPerBlockName, false) if err != nil { return err } - importTimeS, err := getMetric(entry, metrics.BlockImportTimeName, false) + importTimeS, err := getMetric(entry, BlockImportTimeName, false) if err != nil { return err } @@ -266,8 +306,8 @@ func getMetric(entry Entry, suffix string, rateMetric bool) (float64, error) { } // Run the test for 'RunDuration', collect metrics and write them to the 'ReportDirectory' -func (r *Args) runTest(report *os.File, indexerURL string, generatorURL string) error { - collector := &MetricsCollector{MetricsURL: fmt.Sprintf("http://%s/metrics", indexerURL)} +func (r *Args) runTest(report *os.File, metricsURL string, generatorURL string) error { + collector := &MetricsCollector{MetricsURL: fmt.Sprintf("http://%s/metrics", metricsURL)} // Run for r.RunDuration start := time.Now() @@ -275,12 +315,12 @@ func (r *Args) runTest(report *os.File, indexerURL string, generatorURL string) for time.Since(start) < r.RunDuration { time.Sleep(r.RunDuration / 10) - if err := collector.Collect(metrics.AllMetricNames...); err != nil { + if err := collector.Collect(AllMetricNames...); err != nil { return fmt.Errorf("problem collecting metrics (%d / %s): %w", count, time.Since(start), err) } count++ } - if err := collector.Collect(metrics.AllMetricNames...); err != nil { + if err := collector.Collect(AllMetricNames...); err != nil { return fmt.Errorf("problem collecting final metrics (%d / %s): %w", count, time.Since(start), err) } @@ -335,6 +375,7 @@ func startGenerator(configFile string, addr string, blockMiddleware func(http.Ha // Start the server go func() { // always returns error. ErrServerClosed on graceful close + fmt.Printf("generator serving on %s\n", server.Addr) if err := server.ListenAndServe(); err != http.ErrServerClosed { util.MaybeFail(err, "ListenAndServe() failure to start with config file '%s'", configFile) } @@ -350,3 +391,34 @@ func startGenerator(configFile string, addr string, blockMiddleware func(http.Ha return nil }, generator } + +// startIndexer starts the indexer. +func startIndexer(dataDir string, conduitBinary string) (func() error, error) { + cmd := exec.Command( + conduitBinary, + "-d", dataDir, + ) + + var stdout bytes.Buffer + var stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("failure calling Start(): %w", err) + } + // conduit doesn't have health check endpoint. so, no health check for now + + return func() error { + if err := cmd.Process.Signal(os.Interrupt); err != nil { + fmt.Printf("failed to kill indexer process: %s\n", err) + if err := cmd.Process.Kill(); err != nil { + return fmt.Errorf("failed to kill indexer process: %w", err) + } + } + if err := cmd.Wait(); err != nil { + fmt.Printf("ignoring error while waiting for process to stop: %s\n", err) + } + return nil + }, nil +} diff --git a/tools/block-generator/runner/runner.go b/tools/block-generator/runner/runner.go index 94df877660..2eab45fea5 100644 --- a/tools/block-generator/runner/runner.go +++ b/tools/block-generator/runner/runner.go @@ -34,7 +34,7 @@ func init() { RunnerCmd = &cobra.Command{ Use: "runner", Short: "Run test suite and collect results.", - Long: "Run an automated test suite using the block-generator daemon and a provided algorand-indexer binary. Results are captured to a specified output directory.", + Long: "Run an automated test suite using the block-generator daemon and a provided conduit binary. Results are captured to a specified output directory.", Run: func(cmd *cobra.Command, args []string) { if err := Run(runnerArgs); err != nil { fmt.Println(err) @@ -43,12 +43,12 @@ func init() { } RunnerCmd.Flags().StringVarP(&runnerArgs.Path, "scenario", "s", "", "Directory containing scenarios, or specific scenario file.") - RunnerCmd.Flags().StringVarP(&runnerArgs.IndexerBinary, "indexer-binary", "i", "", "Path to indexer binary.") - RunnerCmd.Flags().Uint64VarP(&runnerArgs.IndexerPort, "indexer-port", "p", 4010, "Port to start the server at. This is useful if you have a prometheus server for collecting additional data.") + RunnerCmd.Flags().StringVarP(&runnerArgs.ConduitBinary, "conduit-binary", "i", "", "Path to conduit binary.") + RunnerCmd.Flags().Uint64VarP(&runnerArgs.MetricsPort, "metrics-port", "p", 9999, "Port to start the metrics server at.") RunnerCmd.Flags().StringVarP(&runnerArgs.PostgresConnectionString, "postgres-connection-string", "c", "", "Postgres connection string.") RunnerCmd.Flags().DurationVarP(&runnerArgs.RunDuration, "test-duration", "d", 5*time.Minute, "Duration to use for each scenario.") RunnerCmd.Flags().StringVarP(&runnerArgs.ReportDirectory, "report-directory", "r", "", "Location to place test reports.") - RunnerCmd.Flags().StringVarP(&runnerArgs.LogLevel, "log-level", "l", "error", "LogLevel to use when starting Indexer. [error, warn, info, debug, trace]") + RunnerCmd.Flags().StringVarP(&runnerArgs.LogLevel, "log-level", "l", "ERROR", "LogLevel to use when starting Indexer. [PANIC, FATAL, ERROR, WARN, INFO, DEBUG, TRACE]") RunnerCmd.Flags().StringVarP(&runnerArgs.CPUProfilePath, "cpuprofile", "", "", "Path where Indexer writes its CPU profile.") RunnerCmd.Flags().BoolVarP(&runnerArgs.ResetReportDir, "reset", "", false, "If set any existing report directory will be deleted before running tests.") RunnerCmd.Flags().BoolVarP(&runnerArgs.RunValidation, "validate", "", false, "If set the validator will run after test-duration has elapsed to verify data is correct. An extra line in each report indicates validator success or failure.") From 4cd3fe1342c10f0c8632ebd8a6b8c127d5f84fb8 Mon Sep 17 00:00:00 2001 From: "shiqi.zheng@algorand.com" Date: Fri, 14 Apr 2023 16:00:04 -0400 Subject: [PATCH 2/7] updates --- tools/block-generator/runner/run.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tools/block-generator/runner/run.go b/tools/block-generator/runner/run.go index 0b1d49c518..05199bee73 100644 --- a/tools/block-generator/runner/run.go +++ b/tools/block-generator/runner/run.go @@ -125,8 +125,8 @@ func (r *Args) run() error { fmt.Printf("Failed to shutdown generator: %s\n", err) } }() - time.Sleep(1 * time.Second) - // write conduit config file + + // get conduit config template cwd, err := os.Getwd() if err != nil { return fmt.Errorf("current working directory: %w", err) @@ -136,6 +136,7 @@ func (r *Args) run() error { return fmt.Errorf("unable to open config template file: %w", err) } + // create config file in the right data directory f, err := os.Create(path.Join(dataDir, "conduit.yml")) if err != nil { return fmt.Errorf("creating conduit.yml: ", err) @@ -152,6 +153,7 @@ func (r *Args) run() error { return fmt.Errorf("execute template file: ", err) } + // Start indexer indexerShutdownFunc, err := startIndexer(dataDir, r.ConduitBinary) if err != nil { return fmt.Errorf("failed to start indexer: %w", err) From 0f7285b7d134ac700be728d1cbf5135858a1f81a Mon Sep 17 00:00:00 2001 From: "shiqi.zheng@algorand.com" Date: Fri, 14 Apr 2023 16:02:55 -0400 Subject: [PATCH 3/7] lint fix --- tools/block-generator/generator/generate.go | 10 ++++++++-- tools/block-generator/runner/run.go | 4 ++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/tools/block-generator/generator/generate.go b/tools/block-generator/generator/generate.go index 6080c35e93..a8c245a488 100644 --- a/tools/block-generator/generator/generate.go +++ b/tools/block-generator/generator/generate.go @@ -373,7 +373,10 @@ func (g *generator) WriteBlock(output io.Writer, round uint64) error { if err != nil { return err } - output.Write(block) + _, err = output.Write(block) + if err != nil { + return err + } g.finishRound(numTxnForBlock) return nil } @@ -438,7 +441,10 @@ func (g *generator) WriteBlock(output io.Writer, round uint64) error { if err != nil { return err } - output.Write(block) + _, err = output.Write(block) + if err != nil { + return err + } g.finishRound(numTxnForBlock) return nil } diff --git a/tools/block-generator/runner/run.go b/tools/block-generator/runner/run.go index 05199bee73..6117add0aa 100644 --- a/tools/block-generator/runner/run.go +++ b/tools/block-generator/runner/run.go @@ -139,7 +139,7 @@ func (r *Args) run() error { // create config file in the right data directory f, err := os.Create(path.Join(dataDir, "conduit.yml")) if err != nil { - return fmt.Errorf("creating conduit.yml: ", err) + return fmt.Errorf("creating conduit.yml: %v", err) } defer f.Close() @@ -150,7 +150,7 @@ func (r *Args) run() error { err = t.Execute(f, conduitConfig) if err != nil { - return fmt.Errorf("execute template file: ", err) + return fmt.Errorf("execute template file: %v", err) } // Start indexer From e6a45f916f0edc66f34b3acc30a55b8de604e0b8 Mon Sep 17 00:00:00 2001 From: "shiqi.zheng@algorand.com" Date: Fri, 14 Apr 2023 16:42:42 -0400 Subject: [PATCH 4/7] fix test --- tools/block-generator/generator/generate_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/block-generator/generator/generate_test.go b/tools/block-generator/generator/generate_test.go index 78ca9e0323..e57d3f8006 100644 --- a/tools/block-generator/generator/generate_test.go +++ b/tools/block-generator/generator/generate_test.go @@ -203,6 +203,7 @@ func TestWriteRound(t *testing.T) { g := makePrivateGenerator(t) var data []byte writer := bytes.NewBuffer(data) + g.WriteBlock(writer, 0) g.WriteBlock(writer, 1) var block rpcs.EncodedBlockCert protocol.Decode(data, &block) From d9d8b67f794a37e6c28dfe009e76e15a0b327f33 Mon Sep 17 00:00:00 2001 From: "shiqi.zheng@algorand.com" Date: Wed, 19 Apr 2023 12:08:16 -0400 Subject: [PATCH 5/7] address PR comments --- tools/block-generator/runner/run.go | 29 ++++++++++--------- tools/block-generator/runner/runner.go | 2 +- .../template/conduit.yml.tmpl} | 0 3 files changed, 16 insertions(+), 15 deletions(-) rename tools/block-generator/{data/conduit.yml => runner/template/conduit.yml.tmpl} (100%) diff --git a/tools/block-generator/runner/run.go b/tools/block-generator/runner/run.go index 6117add0aa..407a1b48bd 100644 --- a/tools/block-generator/runner/run.go +++ b/tools/block-generator/runner/run.go @@ -19,6 +19,8 @@ package runner import ( "bytes" "context" + // embed conduit template config file + _ "embed" "encoding/json" "fmt" "net/http" @@ -36,6 +38,9 @@ import ( "github.com/algorand/go-deadlock" ) +//go:embed template/conduit.yml.tmpl +var conduitConfigTmpl string + // Args are all the things needed to run a performance test. type Args struct { // Path is a directory when passed to RunBatch, otherwise a file path. @@ -127,13 +132,9 @@ func (r *Args) run() error { }() // get conduit config template - cwd, err := os.Getwd() - if err != nil { - return fmt.Errorf("current working directory: %w", err) - } - t, err := template.ParseFiles(path.Join(cwd, "data/conduit.yml")) + t, err := template.New("conduit").Parse(conduitConfigTmpl) if err != nil { - return fmt.Errorf("unable to open config template file: %w", err) + return fmt.Errorf("unable to parse conduit config template: %w", err) } // create config file in the right data directory @@ -153,15 +154,15 @@ func (r *Args) run() error { return fmt.Errorf("execute template file: %v", err) } - // Start indexer - indexerShutdownFunc, err := startIndexer(dataDir, r.ConduitBinary) + // Start conduit + conduitShutdownFunc, err := startConduit(dataDir, r.ConduitBinary) if err != nil { - return fmt.Errorf("failed to start indexer: %w", err) + return fmt.Errorf("failed to start Conduit: %w", err) } defer func() { - // Shutdown indexer - if err := indexerShutdownFunc(); err != nil { - fmt.Printf("Failed to shutdown indexer: %s\n", err) + // Shutdown conduit + if err := conduitShutdownFunc(); err != nil { + fmt.Printf("Failed to shutdown Conduit: %s\n", err) } }() @@ -394,8 +395,8 @@ func startGenerator(configFile string, addr string, blockMiddleware func(http.Ha }, generator } -// startIndexer starts the indexer. -func startIndexer(dataDir string, conduitBinary string) (func() error, error) { +// startConduit starts the indexer. +func startConduit(dataDir string, conduitBinary string) (func() error, error) { cmd := exec.Command( conduitBinary, "-d", dataDir, diff --git a/tools/block-generator/runner/runner.go b/tools/block-generator/runner/runner.go index 2eab45fea5..6c653efaec 100644 --- a/tools/block-generator/runner/runner.go +++ b/tools/block-generator/runner/runner.go @@ -48,7 +48,7 @@ func init() { RunnerCmd.Flags().StringVarP(&runnerArgs.PostgresConnectionString, "postgres-connection-string", "c", "", "Postgres connection string.") RunnerCmd.Flags().DurationVarP(&runnerArgs.RunDuration, "test-duration", "d", 5*time.Minute, "Duration to use for each scenario.") RunnerCmd.Flags().StringVarP(&runnerArgs.ReportDirectory, "report-directory", "r", "", "Location to place test reports.") - RunnerCmd.Flags().StringVarP(&runnerArgs.LogLevel, "log-level", "l", "ERROR", "LogLevel to use when starting Indexer. [PANIC, FATAL, ERROR, WARN, INFO, DEBUG, TRACE]") + RunnerCmd.Flags().StringVarP(&runnerArgs.LogLevel, "log-level", "l", "error", "LogLevel to use when starting Indexer. [panic, fatal, error, warn, info, debug, trace]") RunnerCmd.Flags().StringVarP(&runnerArgs.CPUProfilePath, "cpuprofile", "", "", "Path where Indexer writes its CPU profile.") RunnerCmd.Flags().BoolVarP(&runnerArgs.ResetReportDir, "reset", "", false, "If set any existing report directory will be deleted before running tests.") RunnerCmd.Flags().BoolVarP(&runnerArgs.RunValidation, "validate", "", false, "If set the validator will run after test-duration has elapsed to verify data is correct. An extra line in each report indicates validator success or failure.") diff --git a/tools/block-generator/data/conduit.yml b/tools/block-generator/runner/template/conduit.yml.tmpl similarity index 100% rename from tools/block-generator/data/conduit.yml rename to tools/block-generator/runner/template/conduit.yml.tmpl From 1980571ec5fa4d2400ae0952cb8f3ec8010f2efa Mon Sep 17 00:00:00 2001 From: "shiqi.zheng@algorand.com" Date: Wed, 19 Apr 2023 14:07:37 -0400 Subject: [PATCH 6/7] update scripts --- tools/block-generator/run_generator.sh | 56 -------------------------- tools/block-generator/run_postgres.sh | 2 +- tools/block-generator/run_runner.sh | 13 +++--- tools/block-generator/run_tests.sh | 19 +++++---- 4 files changed, 18 insertions(+), 72 deletions(-) delete mode 100755 tools/block-generator/run_generator.sh diff --git a/tools/block-generator/run_generator.sh b/tools/block-generator/run_generator.sh deleted file mode 100755 index 47e87b2e0d..0000000000 --- a/tools/block-generator/run_generator.sh +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env bash - -# Demonstrate how to run the generator and connect it to indexer. - -set -e - -POSTGRES_CONTAINER=generator-test-container -POSTGRES_PORT=15432 -POSTGRES_DATABASE=generator_db -CONFIG=${1:-"$(dirname $0)/test_config.yml"} -echo "Using config file: $CONFIG" - -function start_postgres() { - docker rm -f $POSTGRES_CONTAINER > /dev/null 2>&1 || true - - # Start postgres container... - docker run \ - -d \ - --name $POSTGRES_CONTAINER \ - -e POSTGRES_USER=algorand \ - -e POSTGRES_PASSWORD=algorand \ - -e PGPASSWORD=algorand \ - -p $POSTGRES_PORT:5432 \ - postgres - - sleep 5 - - docker exec -it $POSTGRES_CONTAINER psql -Ualgorand -c "create database $POSTGRES_DATABASE" -} - -function shutdown() { - docker rm -f $POSTGRES_CONTAINER > /dev/null 2>&1 || true - kill -9 $GENERATOR_PID -} - -trap shutdown EXIT - -echo "Building generator." -pushd $(dirname "$0") > /dev/null -go build -cd ../.. > /dev/null -echo "Building indexer." -make -popd -echo "Starting postgres container." -start_postgres -echo "Starting block generator (see generator.log)" -$(dirname "$0")/block-generator daemon --port 11111 --config "${CONFIG}" & -GENERATOR_PID=$! -echo "Starting indexer" -$(dirname "$0")/../../cmd/algorand-indexer/algorand-indexer daemon \ - -S localhost:8980 \ - --algod-net localhost:11111 \ - --algod-token security-is-our-number-one-priority \ - --metrics-mode VERBOSE \ - -P "host=localhost user=algorand password=algorand dbname=generator_db port=15432 sslmode=disable" diff --git a/tools/block-generator/run_postgres.sh b/tools/block-generator/run_postgres.sh index c6a967132b..4817961220 100755 --- a/tools/block-generator/run_postgres.sh +++ b/tools/block-generator/run_postgres.sh @@ -4,7 +4,7 @@ # in a debugger. Simply start this script and run with: # ./block-generator runner \ # -d 5s \ -# -i ./../algorand-indexer/algorand-indexer \ +# -i \ # -c "host=localhost user=algorand password=algorand dbname=algorand port=15432 sslmode=disable" \ # -r results \ # -s scenarios/config.payment.small.yml diff --git a/tools/block-generator/run_runner.sh b/tools/block-generator/run_runner.sh index d90749b96b..7f1205cae5 100755 --- a/tools/block-generator/run_runner.sh +++ b/tools/block-generator/run_runner.sh @@ -4,10 +4,16 @@ set -e +CONDUIT_BINARY=$1 +if [ -z "$CONDUIT_BINARY" ]; then + echo "path to conduit binary is required" + exit 1 +fi + POSTGRES_CONTAINER=generator-test-container POSTGRES_PORT=15432 POSTGRES_DATABASE=generator_db -CONFIG=${1:-"$(dirname $0)/test_config.yml"} +CONFIG=${2:-"$(dirname $0)/test_config.yml"} echo "Using config file: $CONFIG" function start_postgres() { @@ -38,15 +44,12 @@ rm -rf OUTPUT_RUN_RUNNER_TEST > /dev/null 2>&1 echo "Building generator." pushd $(dirname "$0") > /dev/null go build -cd ../.. > /dev/null -echo "Building indexer." -make popd echo "Starting postgres container." start_postgres echo "Starting test runner" $(dirname "$0")/block-generator runner \ - --indexer-binary ../algorand-indexer/algorand-indexer \ + --conduit-binary "$CONDUIT_BINARY" \ --report-directory OUTPUT_RUN_RUNNER_TEST \ --test-duration 30s \ --log-level trace \ diff --git a/tools/block-generator/run_tests.sh b/tools/block-generator/run_tests.sh index 8befa41dc4..5d25d50af4 100755 --- a/tools/block-generator/run_tests.sh +++ b/tools/block-generator/run_tests.sh @@ -1,7 +1,7 @@ #!/usr/bin/env bash CONNECTION_STRING="" -INDEXER_BINARY="" +CONDUIT_BINARY="" REPORT_DIR="" DURATION="1h" LOG_LEVEL="error" @@ -17,7 +17,7 @@ help() { echo " -r|--report-dir directory where the report should be written." echo " -d|--duration test duration." echo " -l|--level log level to pass to Indexer." - echo " -g|--generator use a different block-generator binary to run the generator." + echo " -g|--generator block-generator binary to run the generator." exit } @@ -34,7 +34,7 @@ while :; do shift ;; -i | --indexer) - INDEXER_BINARY="${2-}" + CONDUIT_BINARY="${2-}" shift ;; -r | --report-dir) @@ -66,7 +66,7 @@ if [ -z "$CONNECTION_STRING" ]; then exit 1 fi -if [ -z "$INDEXER_BINARY" ]; then +if [ -z "$CONDUIT_BINARY" ]; then echo "Missing required indexer binary parameter (-i / --indexer)." exit 1 fi @@ -77,18 +77,17 @@ if [ -z "$SCENARIOS" ]; then fi if [ -z "$GENERATOR_BINARY" ]; then - echo "Using indexer binary for generator, override with (-g / --generator)." - GENERATOR_BINARY="$INDEXER_BINARY" + echo "path to block-generator binary is required" + exit 1 fi -echo "Running with binary: $INDEXER_BINARY" +echo "Running with binary: $CONDUIT_BINARY" echo "Report directory: $REPORT_DIR" echo "Duration: $DURATION" echo "Log Level: $LOG_LEVEL" -"$GENERATOR_BINARY" \ - util block-generator runner \ - -i "$INDEXER_BINARY" \ +"$GENERATOR_BINARY" runner \ + -i "$CONDUIT_BINARY" \ -s "$SCENARIOS" \ -d "$DURATION" \ -c "$CONNECTION_STRING" \ From 32ad557438a9f33e2678d3ae1e324544104bcc4a Mon Sep 17 00:00:00 2001 From: "shiqi.zheng@algorand.com" Date: Wed, 19 Apr 2023 14:47:12 -0400 Subject: [PATCH 7/7] wait for /metrics to be available --- tools/block-generator/runner/run.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/tools/block-generator/runner/run.go b/tools/block-generator/runner/run.go index 407a1b48bd..5e68cd49ae 100644 --- a/tools/block-generator/runner/run.go +++ b/tools/block-generator/runner/run.go @@ -174,7 +174,20 @@ func (r *Args) run() error { defer report.Close() // Run the test, collecting results. - if err := r.runTest(report, metricsNet, algodNet); err != nil { + // check /metrics endpoint is available before running the test + var resp *http.Response + for retry := 0; retry < 10; retry++ { + resp, err = http.Get(fmt.Sprintf("http://%s/metrics", metricsNet)) + if err == nil { + resp.Body.Close() + break + } + time.Sleep(3 * time.Second) + } + if err != nil { + return fmt.Errorf("failed to query metrics endpoint: %w", err) + } + if err = r.runTest(report, metricsNet, algodNet); err != nil { return err }