diff --git a/tools/block-generator/generator/generate.go b/tools/block-generator/generator/generate.go
index f1961587bb..a8c245a488 100644
--- a/tools/block-generator/generator/generate.go
+++ b/tools/block-generator/generator/generate.go
@@ -72,7 +72,7 @@ type GenerationConfig struct {
GenesisAccountInitialBalance uint64 `yaml:"genesis_account_balance"`
// Block generation
- TxnPerBlock uint64 `mapstructure:"tx_per_block"`
+ TxnPerBlock uint64 `yaml:"tx_per_block"`
// TX Distribution
PaymentTransactionFraction float32 `yaml:"tx_pay_fraction"`
@@ -120,7 +120,7 @@ func MakeGenerator(config GenerationConfig) (Generator, error) {
genesisHash: [32]byte{},
genesisID: "blockgen-test",
prevBlockHash: "",
- round: 1,
+ round: 0,
txnCounter: 0,
timestamp: 0,
rewardsLevel: 0,
@@ -312,7 +312,9 @@ func (g *generator) WriteGenesis(output io.Writer) error {
FeeSink: g.feeSink.String(),
Timestamp: g.timestamp,
}
- return json.NewEncoder(output).Encode(gen)
+
+ _, err := output.Write(protocol.EncodeJSON(gen))
+ return err
}
func getTransactionOptions() []interface{} {
@@ -357,12 +359,28 @@ func (g *generator) finishRound(txnCount uint64) {
// WriteBlock generates a block full of new transactions and writes it to the writer.
func (g *generator) WriteBlock(output io.Writer, round uint64) error {
+
if round != g.round {
fmt.Printf("Generator only supports sequential block access. Expected %d but received request for %d.\n", g.round, round)
}
numTxnForBlock := g.txnForRound(round)
+ // return genesis block
+ if round == 0 {
+ // write the msgpack bytes for a block
+ block, err := rpcs.RawBlockBytes(g.ledger, basics.Round(round))
+ if err != nil {
+ return err
+ }
+ _, err = output.Write(block)
+ if err != nil {
+ return err
+ }
+ g.finishRound(numTxnForBlock)
+ return nil
+ }
+
header := bookkeeping.BlockHeader{
Round: basics.Round(g.round),
Branch: bookkeeping.BlockHash{},
@@ -414,15 +432,19 @@ func (g *generator) WriteBlock(output io.Writer, round uint64) error {
Certificate: agreement.Certificate{},
}
- err := json.NewEncoder(output).Encode(cert)
+ err := g.ledger.AddBlock(cert.Block, cert.Certificate)
+ if err != nil {
+ return err
+ }
+ // write the msgpack bytes for a block
+ block, err := rpcs.RawBlockBytes(g.ledger, basics.Round(round))
if err != nil {
return err
}
- err = g.ledger.AddBlock(cert.Block, agreement.Certificate{})
+ _, err = output.Write(block)
if err != nil {
return err
}
- g.ledger.WaitForCommit(basics.Round(g.round))
g.finishRound(numTxnForBlock)
return nil
}
diff --git a/tools/block-generator/generator/generate_test.go b/tools/block-generator/generator/generate_test.go
index 78ca9e0323..e57d3f8006 100644
--- a/tools/block-generator/generator/generate_test.go
+++ b/tools/block-generator/generator/generate_test.go
@@ -203,6 +203,7 @@ func TestWriteRound(t *testing.T) {
g := makePrivateGenerator(t)
var data []byte
writer := bytes.NewBuffer(data)
+ g.WriteBlock(writer, 0)
g.WriteBlock(writer, 1)
var block rpcs.EncodedBlockCert
protocol.Decode(data, &block)
diff --git a/tools/block-generator/generator/server_test.go b/tools/block-generator/generator/server_test.go
index 6e21d77586..7007db00f0 100644
--- a/tools/block-generator/generator/server_test.go
+++ b/tools/block-generator/generator/server_test.go
@@ -53,61 +53,61 @@ func TestParseURL(t *testing.T) {
var testcases = []struct {
name string
url string
- expectedRound string
+ expectedParam string
err string
}{
{
name: "no block",
url: "/v2/blocks/",
- expectedRound: "",
+ expectedParam: "",
err: "invalid request path, /v2/blocks/",
},
{
name: "normal one digit",
url: fmt.Sprintf("%s1", blockQueryPrefix),
- expectedRound: "1",
+ expectedParam: "1",
err: "",
},
{
name: "normal long number",
url: fmt.Sprintf("%s12345678", blockQueryPrefix),
- expectedRound: "12345678",
+ expectedParam: "12345678",
err: "",
},
{
name: "with query parameters",
url: fmt.Sprintf("%s1234?pretty", blockQueryPrefix),
- expectedRound: "1234",
+ expectedParam: "1234",
err: "",
},
{
name: "with query parameters",
url: fmt.Sprintf("%s1234?pretty", blockQueryPrefix),
- expectedRound: "1234",
+ expectedParam: "1234",
err: "",
},
{
name: "no deltas",
url: "/v2/deltas/",
- expectedRound: "",
+ expectedParam: "",
err: "invalid request path, /v2/deltas/",
},
{
name: "deltas",
url: fmt.Sprintf("%s123?Format=msgp", deltaQueryPrefix),
- expectedRound: "123",
+ expectedParam: "123",
err: "",
},
{
name: "no account",
url: "/v2/accounts/",
- expectedRound: "",
+ expectedParam: "",
err: "invalid request path, /v2/accounts/",
},
{
name: "accounts",
url: fmt.Sprintf("%sAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGFFWAF4", accountQueryPrefix),
- expectedRound: "AIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGFFWAF4",
+ expectedParam: "AIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGFFWAF4",
err: "",
},
}
@@ -117,9 +117,9 @@ func TestParseURL(t *testing.T) {
round, err := parseURL(testcase.url)
if len(testcase.err) == 0 {
msg := fmt.Sprintf("Unexpected error parsing '%s', expected round '%s' received error: %v",
- testcase.url, testcase.expectedRound, err)
+ testcase.url, testcase.expectedParam, err)
require.NoError(t, err, msg)
- assert.Equal(t, testcase.expectedRound, round)
+ assert.Equal(t, testcase.expectedParam, round)
} else {
require.Error(t, err, fmt.Sprintf("Expected an error containing: %s", testcase.err))
require.True(t, strings.Contains(err.Error(), testcase.err))
diff --git a/tools/block-generator/metrics/metrics.go b/tools/block-generator/metrics/metrics.go
deleted file mode 100644
index 6c60fbabc2..0000000000
--- a/tools/block-generator/metrics/metrics.go
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright (C) 2019-2023 Algorand, Inc.
-// This file is part of go-algorand
-//
-// go-algorand is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// go-algorand is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with go-algorand. If not, see .
-
-package metrics
-
-// Prometheus metrics collected in Conduit.
-const (
- BlockImportTimeName = "import_time_sec"
- ImportedTxnsPerBlockName = "imported_tx_per_block"
- ImportedRoundGaugeName = "imported_round"
- GetAlgodRawBlockTimeName = "get_algod_raw_block_time_sec"
- ImportedTxnsName = "imported_txns"
- ImporterTimeName = "importer_time_sec"
- ProcessorTimeName = "processor_time_sec"
- ExporterTimeName = "exporter_time_sec"
- PipelineRetryCountName = "pipeline_retry_count"
-)
-
-// AllMetricNames is a reference for all the custom metric names.
-var AllMetricNames = []string{
- BlockImportTimeName,
- ImportedTxnsPerBlockName,
- ImportedRoundGaugeName,
- GetAlgodRawBlockTimeName,
- ImporterTimeName,
- ProcessorTimeName,
- ExporterTimeName,
- PipelineRetryCountName,
-}
diff --git a/tools/block-generator/run_generator.sh b/tools/block-generator/run_generator.sh
deleted file mode 100755
index 47e87b2e0d..0000000000
--- a/tools/block-generator/run_generator.sh
+++ /dev/null
@@ -1,56 +0,0 @@
-#!/usr/bin/env bash
-
-# Demonstrate how to run the generator and connect it to indexer.
-
-set -e
-
-POSTGRES_CONTAINER=generator-test-container
-POSTGRES_PORT=15432
-POSTGRES_DATABASE=generator_db
-CONFIG=${1:-"$(dirname $0)/test_config.yml"}
-echo "Using config file: $CONFIG"
-
-function start_postgres() {
- docker rm -f $POSTGRES_CONTAINER > /dev/null 2>&1 || true
-
- # Start postgres container...
- docker run \
- -d \
- --name $POSTGRES_CONTAINER \
- -e POSTGRES_USER=algorand \
- -e POSTGRES_PASSWORD=algorand \
- -e PGPASSWORD=algorand \
- -p $POSTGRES_PORT:5432 \
- postgres
-
- sleep 5
-
- docker exec -it $POSTGRES_CONTAINER psql -Ualgorand -c "create database $POSTGRES_DATABASE"
-}
-
-function shutdown() {
- docker rm -f $POSTGRES_CONTAINER > /dev/null 2>&1 || true
- kill -9 $GENERATOR_PID
-}
-
-trap shutdown EXIT
-
-echo "Building generator."
-pushd $(dirname "$0") > /dev/null
-go build
-cd ../.. > /dev/null
-echo "Building indexer."
-make
-popd
-echo "Starting postgres container."
-start_postgres
-echo "Starting block generator (see generator.log)"
-$(dirname "$0")/block-generator daemon --port 11111 --config "${CONFIG}" &
-GENERATOR_PID=$!
-echo "Starting indexer"
-$(dirname "$0")/../../cmd/algorand-indexer/algorand-indexer daemon \
- -S localhost:8980 \
- --algod-net localhost:11111 \
- --algod-token security-is-our-number-one-priority \
- --metrics-mode VERBOSE \
- -P "host=localhost user=algorand password=algorand dbname=generator_db port=15432 sslmode=disable"
diff --git a/tools/block-generator/run_postgres.sh b/tools/block-generator/run_postgres.sh
index c6a967132b..4817961220 100755
--- a/tools/block-generator/run_postgres.sh
+++ b/tools/block-generator/run_postgres.sh
@@ -4,7 +4,7 @@
# in a debugger. Simply start this script and run with:
# ./block-generator runner \
# -d 5s \
-# -i ./../algorand-indexer/algorand-indexer \
+#  -i <path to conduit binary> \
# -c "host=localhost user=algorand password=algorand dbname=algorand port=15432 sslmode=disable" \
# -r results \
# -s scenarios/config.payment.small.yml
diff --git a/tools/block-generator/run_runner.sh b/tools/block-generator/run_runner.sh
index d90749b96b..7f1205cae5 100755
--- a/tools/block-generator/run_runner.sh
+++ b/tools/block-generator/run_runner.sh
@@ -4,10 +4,16 @@
set -e
+CONDUIT_BINARY=$1
+if [ -z "$CONDUIT_BINARY" ]; then
+ echo "path to conduit binary is required"
+ exit 1
+fi
+
POSTGRES_CONTAINER=generator-test-container
POSTGRES_PORT=15432
POSTGRES_DATABASE=generator_db
-CONFIG=${1:-"$(dirname $0)/test_config.yml"}
+CONFIG=${2:-"$(dirname $0)/test_config.yml"}
echo "Using config file: $CONFIG"
function start_postgres() {
@@ -38,15 +44,12 @@ rm -rf OUTPUT_RUN_RUNNER_TEST > /dev/null 2>&1
echo "Building generator."
pushd $(dirname "$0") > /dev/null
go build
-cd ../.. > /dev/null
-echo "Building indexer."
-make
popd
echo "Starting postgres container."
start_postgres
echo "Starting test runner"
$(dirname "$0")/block-generator runner \
- --indexer-binary ../algorand-indexer/algorand-indexer \
+ --conduit-binary "$CONDUIT_BINARY" \
--report-directory OUTPUT_RUN_RUNNER_TEST \
--test-duration 30s \
--log-level trace \
diff --git a/tools/block-generator/run_tests.sh b/tools/block-generator/run_tests.sh
index 0a1c6df49e..5d25d50af4 100755
--- a/tools/block-generator/run_tests.sh
+++ b/tools/block-generator/run_tests.sh
@@ -1,7 +1,7 @@
#!/usr/bin/env bash
CONNECTION_STRING=""
-INDEXER_BINARY=""
+CONDUIT_BINARY=""
REPORT_DIR=""
DURATION="1h"
LOG_LEVEL="error"
@@ -17,7 +17,7 @@ help() {
echo " -r|--report-dir directory where the report should be written."
echo " -d|--duration test duration."
echo " -l|--level log level to pass to Indexer."
- echo " -g|--generator use a different indexer binary to run the generator."
+ echo " -g|--generator block-generator binary to run the generator."
exit
}
@@ -34,7 +34,7 @@ while :; do
shift
;;
-i | --indexer)
- INDEXER_BINARY="${2-}"
+ CONDUIT_BINARY="${2-}"
shift
;;
-r | --report-dir)
@@ -66,7 +66,7 @@ if [ -z "$CONNECTION_STRING" ]; then
exit 1
fi
-if [ -z "$INDEXER_BINARY" ]; then
+if [ -z "$CONDUIT_BINARY" ]; then
echo "Missing required indexer binary parameter (-i / --indexer)."
exit 1
fi
@@ -77,18 +77,17 @@ if [ -z "$SCENARIOS" ]; then
fi
if [ -z "$GENERATOR_BINARY" ]; then
- echo "Using indexer binary for generator, override with (-g / --generator)."
- GENERATOR_BINARY="$INDEXER_BINARY"
+ echo "path to block-generator binary is required"
+ exit 1
fi
-echo "Running with binary: $INDEXER_BINARY"
+echo "Running with binary: $CONDUIT_BINARY"
echo "Report directory: $REPORT_DIR"
echo "Duration: $DURATION"
echo "Log Level: $LOG_LEVEL"
-"$GENERATOR_BINARY" \
- util block-generator runner \
- -i "$INDEXER_BINARY" \
+"$GENERATOR_BINARY" runner \
+ -i "$CONDUIT_BINARY" \
-s "$SCENARIOS" \
-d "$DURATION" \
-c "$CONNECTION_STRING" \
diff --git a/tools/block-generator/runner/metrics_collector.go b/tools/block-generator/runner/metrics_collector.go
index 327ca9e33b..394bf32cba 100644
--- a/tools/block-generator/runner/metrics_collector.go
+++ b/tools/block-generator/runner/metrics_collector.go
@@ -24,6 +24,31 @@ import (
"time"
)
+// Prometheus metrics collected in Conduit.
+const (
+ BlockImportTimeName = "import_time_sec"
+ ImportedTxnsPerBlockName = "imported_tx_per_block"
+ ImportedRoundGaugeName = "imported_round"
+ GetAlgodRawBlockTimeName = "get_algod_raw_block_time_sec"
+ ImportedTxnsName = "imported_txns"
+ ImporterTimeName = "importer_time_sec"
+ ProcessorTimeName = "processor_time_sec"
+ ExporterTimeName = "exporter_time_sec"
+ PipelineRetryCountName = "pipeline_retry_count"
+)
+
+// AllMetricNames is a reference for all the custom metric names.
+var AllMetricNames = []string{
+ BlockImportTimeName,
+ ImportedTxnsPerBlockName,
+ ImportedRoundGaugeName,
+ GetAlgodRawBlockTimeName,
+ ImporterTimeName,
+ ProcessorTimeName,
+ ExporterTimeName,
+ PipelineRetryCountName,
+}
+
// MetricsCollector queries a /metrics endpoint for prometheus style metrics and saves metrics matching a pattern.
type MetricsCollector struct {
// MetricsURL where metrics can be queried.
diff --git a/tools/block-generator/runner/run.go b/tools/block-generator/runner/run.go
index 9604212da1..5e68cd49ae 100644
--- a/tools/block-generator/runner/run.go
+++ b/tools/block-generator/runner/run.go
@@ -17,29 +17,36 @@
package runner
import (
+ "bytes"
"context"
+ // embed conduit template config file
+ _ "embed"
"encoding/json"
"fmt"
"net/http"
"os"
+ "os/exec"
"path"
"path/filepath"
"strconv"
"strings"
+ "text/template"
"time"
"github.com/algorand/go-algorand/tools/block-generator/generator"
- "github.com/algorand/go-algorand/tools/block-generator/metrics"
"github.com/algorand/go-algorand/tools/block-generator/util"
"github.com/algorand/go-deadlock"
)
+//go:embed template/conduit.yml.tmpl
+var conduitConfigTmpl string
+
// Args are all the things needed to run a performance test.
type Args struct {
// Path is a directory when passed to RunBatch, otherwise a file path.
Path string
- IndexerBinary string
- IndexerPort uint64
+ ConduitBinary string
+ MetricsPort uint64
PostgresConnectionString string
CPUProfilePath string
RunDuration time.Duration
@@ -50,7 +57,15 @@ type Args struct {
KeepDataDir bool
}
-// Run is a publi8c helper to run the tests.
+type config struct {
+ LogLevel string
+ LogFile string
+ MetricsPort string
+ AlgodNet string
+ PostgresConnectionString string
+}
+
+// Run is a public helper to run the tests.
// The test will run against the generator configuration file specified by 'args.Path'.
// If 'args.Path' is a directory it should contain generator configuration files, a test will run using each file.
func Run(args Args) error {
@@ -86,8 +101,12 @@ func (r *Args) run() error {
baseName := filepath.Base(r.Path)
baseNameNoExt := strings.TrimSuffix(baseName, filepath.Ext(baseName))
reportfile := path.Join(r.ReportDirectory, fmt.Sprintf("%s.report", baseNameNoExt))
- //logfile := path.Join(r.ReportDirectory, fmt.Sprintf("%s.indexer-log", baseNameNoExt))
+ logfile := path.Join(r.ReportDirectory, fmt.Sprintf("%s.indexer-log", baseNameNoExt))
dataDir := path.Join(r.ReportDirectory, fmt.Sprintf("%s_data", baseNameNoExt))
+ // create the data directory.
+ if err := os.Mkdir(dataDir, os.ModeDir|os.ModePerm); err != nil {
+ return fmt.Errorf("failed to create data directory: %w", err)
+ }
if !r.KeepDataDir {
defer os.RemoveAll(dataDir)
}
@@ -103,7 +122,7 @@ func (r *Args) run() error {
}
// Start services
algodNet := fmt.Sprintf("localhost:%d", 11112)
- indexerNet := fmt.Sprintf("localhost:%d", r.IndexerPort)
+ metricsNet := fmt.Sprintf("localhost:%d", r.MetricsPort)
generatorShutdownFunc, _ := startGenerator(r.Path, algodNet, blockMiddleware)
defer func() {
// Shutdown generator.
@@ -112,16 +131,40 @@ func (r *Args) run() error {
}
}()
- //indexerShutdownFunc, err := startIndexer(dataDir, logfile, r.LogLevel, r.IndexerBinary, algodNet, indexerNet, r.PostgresConnectionString, r.CPUProfilePath)
- //if err != nil {
- // return fmt.Errorf("failed to start indexer: %w", err)
- //}
- //defer func() {
- // // Shutdown indexer
- // if err := indexerShutdownFunc(); err != nil {
- // fmt.Printf("Failed to shutdown indexer: %s\n", err)
- // }
- //}()
+ // get conduit config template
+ t, err := template.New("conduit").Parse(conduitConfigTmpl)
+ if err != nil {
+ return fmt.Errorf("unable to parse conduit config template: %w", err)
+ }
+
+ // create config file in the right data directory
+ f, err := os.Create(path.Join(dataDir, "conduit.yml"))
+ if err != nil {
+ return fmt.Errorf("creating conduit.yml: %v", err)
+ }
+ defer f.Close()
+
+ conduitConfig := config{r.LogLevel, logfile,
+ fmt.Sprintf(":%d", r.MetricsPort),
+ algodNet, r.PostgresConnectionString,
+ }
+
+ err = t.Execute(f, conduitConfig)
+ if err != nil {
+ return fmt.Errorf("execute template file: %v", err)
+ }
+
+ // Start conduit
+ conduitShutdownFunc, err := startConduit(dataDir, r.ConduitBinary)
+ if err != nil {
+ return fmt.Errorf("failed to start Conduit: %w", err)
+ }
+ defer func() {
+ // Shutdown conduit
+ if err := conduitShutdownFunc(); err != nil {
+ fmt.Printf("Failed to shutdown Conduit: %s\n", err)
+ }
+ }()
// Create the report file
report, err := os.Create(reportfile)
@@ -131,7 +174,20 @@ func (r *Args) run() error {
defer report.Close()
// Run the test, collecting results.
- if err := r.runTest(report, indexerNet, algodNet); err != nil {
+ // check /metrics endpoint is available before running the test
+ var resp *http.Response
+ for retry := 0; retry < 10; retry++ {
+ resp, err = http.Get(fmt.Sprintf("http://%s/metrics", metricsNet))
+ if err == nil {
+ resp.Body.Close()
+ break
+ }
+ time.Sleep(3 * time.Second)
+ }
+ if err != nil {
+ return fmt.Errorf("failed to query metrics endpoint: %w", err)
+ }
+ if err = r.runTest(report, metricsNet, algodNet); err != nil {
return err
}
@@ -158,23 +214,23 @@ func recordDataToFile(start time.Time, entry Entry, prefix string, out *os.File)
}
}
- record("_average", metrics.BlockImportTimeName, rate)
- record("_cumulative", metrics.BlockImportTimeName, floatTotal)
- record("_average", metrics.ImportedTxnsPerBlockName, rate)
- record("_cumulative", metrics.ImportedTxnsPerBlockName, intTotal)
- record("", metrics.ImportedRoundGaugeName, intTotal)
+ record("_average", BlockImportTimeName, rate)
+ record("_cumulative", BlockImportTimeName, floatTotal)
+ record("_average", ImportedTxnsPerBlockName, rate)
+ record("_cumulative", ImportedTxnsPerBlockName, intTotal)
+ record("", ImportedRoundGaugeName, intTotal)
if len(writeErrors) > 0 {
return fmt.Errorf("error writing metrics (%s): %w", strings.Join(writeErrors, ", "), writeErr)
}
// Calculate import transactions per second.
- totalTxn, err := getMetric(entry, metrics.ImportedTxnsPerBlockName, false)
+ totalTxn, err := getMetric(entry, ImportedTxnsPerBlockName, false)
if err != nil {
return err
}
- importTimeS, err := getMetric(entry, metrics.BlockImportTimeName, false)
+ importTimeS, err := getMetric(entry, BlockImportTimeName, false)
if err != nil {
return err
}
@@ -266,8 +322,8 @@ func getMetric(entry Entry, suffix string, rateMetric bool) (float64, error) {
}
// Run the test for 'RunDuration', collect metrics and write them to the 'ReportDirectory'
-func (r *Args) runTest(report *os.File, indexerURL string, generatorURL string) error {
- collector := &MetricsCollector{MetricsURL: fmt.Sprintf("http://%s/metrics", indexerURL)}
+func (r *Args) runTest(report *os.File, metricsURL string, generatorURL string) error {
+ collector := &MetricsCollector{MetricsURL: fmt.Sprintf("http://%s/metrics", metricsURL)}
// Run for r.RunDuration
start := time.Now()
@@ -275,12 +331,12 @@ func (r *Args) runTest(report *os.File, indexerURL string, generatorURL string)
for time.Since(start) < r.RunDuration {
time.Sleep(r.RunDuration / 10)
- if err := collector.Collect(metrics.AllMetricNames...); err != nil {
+ if err := collector.Collect(AllMetricNames...); err != nil {
return fmt.Errorf("problem collecting metrics (%d / %s): %w", count, time.Since(start), err)
}
count++
}
- if err := collector.Collect(metrics.AllMetricNames...); err != nil {
+ if err := collector.Collect(AllMetricNames...); err != nil {
return fmt.Errorf("problem collecting final metrics (%d / %s): %w", count, time.Since(start), err)
}
@@ -335,6 +391,7 @@ func startGenerator(configFile string, addr string, blockMiddleware func(http.Ha
// Start the server
go func() {
// always returns error. ErrServerClosed on graceful close
+ fmt.Printf("generator serving on %s\n", server.Addr)
if err := server.ListenAndServe(); err != http.ErrServerClosed {
util.MaybeFail(err, "ListenAndServe() failure to start with config file '%s'", configFile)
}
@@ -350,3 +407,34 @@ func startGenerator(configFile string, addr string, blockMiddleware func(http.Ha
return nil
}, generator
}
+
+// startConduit starts the conduit binary against the given data directory.
+func startConduit(dataDir string, conduitBinary string) (func() error, error) {
+ cmd := exec.Command(
+ conduitBinary,
+ "-d", dataDir,
+ )
+
+ var stdout bytes.Buffer
+ var stderr bytes.Buffer
+ cmd.Stdout = &stdout
+ cmd.Stderr = &stderr
+
+ if err := cmd.Start(); err != nil {
+ return nil, fmt.Errorf("failure calling Start(): %w", err)
+ }
+	// Conduit does not expose a health-check endpoint, so no readiness check is performed for now.
+
+ return func() error {
+ if err := cmd.Process.Signal(os.Interrupt); err != nil {
+ fmt.Printf("failed to kill indexer process: %s\n", err)
+ if err := cmd.Process.Kill(); err != nil {
+ return fmt.Errorf("failed to kill indexer process: %w", err)
+ }
+ }
+ if err := cmd.Wait(); err != nil {
+ fmt.Printf("ignoring error while waiting for process to stop: %s\n", err)
+ }
+ return nil
+ }, nil
+}
diff --git a/tools/block-generator/runner/runner.go b/tools/block-generator/runner/runner.go
index 94df877660..6c653efaec 100644
--- a/tools/block-generator/runner/runner.go
+++ b/tools/block-generator/runner/runner.go
@@ -34,7 +34,7 @@ func init() {
RunnerCmd = &cobra.Command{
Use: "runner",
Short: "Run test suite and collect results.",
- Long: "Run an automated test suite using the block-generator daemon and a provided algorand-indexer binary. Results are captured to a specified output directory.",
+ Long: "Run an automated test suite using the block-generator daemon and a provided conduit binary. Results are captured to a specified output directory.",
Run: func(cmd *cobra.Command, args []string) {
if err := Run(runnerArgs); err != nil {
fmt.Println(err)
@@ -43,12 +43,12 @@ func init() {
}
RunnerCmd.Flags().StringVarP(&runnerArgs.Path, "scenario", "s", "", "Directory containing scenarios, or specific scenario file.")
- RunnerCmd.Flags().StringVarP(&runnerArgs.IndexerBinary, "indexer-binary", "i", "", "Path to indexer binary.")
- RunnerCmd.Flags().Uint64VarP(&runnerArgs.IndexerPort, "indexer-port", "p", 4010, "Port to start the server at. This is useful if you have a prometheus server for collecting additional data.")
+ RunnerCmd.Flags().StringVarP(&runnerArgs.ConduitBinary, "conduit-binary", "i", "", "Path to conduit binary.")
+ RunnerCmd.Flags().Uint64VarP(&runnerArgs.MetricsPort, "metrics-port", "p", 9999, "Port to start the metrics server at.")
RunnerCmd.Flags().StringVarP(&runnerArgs.PostgresConnectionString, "postgres-connection-string", "c", "", "Postgres connection string.")
RunnerCmd.Flags().DurationVarP(&runnerArgs.RunDuration, "test-duration", "d", 5*time.Minute, "Duration to use for each scenario.")
RunnerCmd.Flags().StringVarP(&runnerArgs.ReportDirectory, "report-directory", "r", "", "Location to place test reports.")
- RunnerCmd.Flags().StringVarP(&runnerArgs.LogLevel, "log-level", "l", "error", "LogLevel to use when starting Indexer. [error, warn, info, debug, trace]")
+ RunnerCmd.Flags().StringVarP(&runnerArgs.LogLevel, "log-level", "l", "error", "LogLevel to use when starting Indexer. [panic, fatal, error, warn, info, debug, trace]")
RunnerCmd.Flags().StringVarP(&runnerArgs.CPUProfilePath, "cpuprofile", "", "", "Path where Indexer writes its CPU profile.")
RunnerCmd.Flags().BoolVarP(&runnerArgs.ResetReportDir, "reset", "", false, "If set any existing report directory will be deleted before running tests.")
RunnerCmd.Flags().BoolVarP(&runnerArgs.RunValidation, "validate", "", false, "If set the validator will run after test-duration has elapsed to verify data is correct. An extra line in each report indicates validator success or failure.")
diff --git a/tools/block-generator/runner/template/conduit.yml.tmpl b/tools/block-generator/runner/template/conduit.yml.tmpl
new file mode 100644
index 0000000000..c361426ee6
--- /dev/null
+++ b/tools/block-generator/runner/template/conduit.yml.tmpl
@@ -0,0 +1,61 @@
+# Log verbosity: PANIC, FATAL, ERROR, WARN, INFO, DEBUG, TRACE
+log-level: {{.LogLevel}}
+
+# If no log file is provided logs are written to stdout.
+log-file: {{.LogFile}}
+
+# Number of retries to perform after a pipeline plugin error.
+retry-count: 10
+
+# Time duration to wait between retry attempts.
+retry-delay: "1s"
+
+# Optional filepath to use for pidfile.
+#pid-filepath: /path/to/pidfile
+
+# Whether or not to print the conduit banner on startup.
+hide-banner: false
+
+# When enabled prometheus metrics are available on '/metrics'
+metrics:
+ mode: ON
+ addr: "{{.MetricsPort}}"
+ prefix: "conduit"
+
+# The importer is typically an algod follower node.
+importer:
+ name: algod
+ config:
+ # The mode of operation, either "archival" or "follower".
+ # * archival mode allows you to start processing on any round but does not
+ # contain the ledger state delta objects required for the postgres writer.
+ # * follower mode allows you to use a lightweight non-archival node as the
+ # data source. In addition, it will provide ledger state delta objects to
+ # the processors and exporter.
+ mode: "follower"
+
+ # Algod API address.
+ netaddr: "{{.AlgodNet}}"
+
+ # Algod API token.
+ token: ""
+
+
+# Zero or more processors may be defined to manipulate what data
+# reaches the exporter.
+processors:
+
+# An exporter is defined to do something with the data.
+exporter:
+ name: postgresql
+ config:
+ # Pgsql connection string
+ # See https://github.com/jackc/pgconn for more details
+ connection-string: "{{ .PostgresConnectionString }}"
+
+ # Maximum connection number for connection pool
+ # This means the total number of active queries that can be running
+ # concurrently can never be more than this
+ max-conn: 20
+
+