Skip to content

Commit

Permalink
Refactor StartHorizon() function (stellar#5020)
Browse files Browse the repository at this point in the history
  • Loading branch information
urvisavla committed Aug 18, 2023
1 parent 283b38d commit 7ad41e0
Showing 1 changed file with 129 additions and 91 deletions.
220 changes: 129 additions & 91 deletions services/horizon/internal/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/creachadair/jrpc2"
"github.com/creachadair/jrpc2/jhttp"
"github.com/spf13/cobra"
"github.com/stellar/go/support/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -39,6 +40,7 @@ import (
const (
StandaloneNetworkPassphrase = "Standalone Network ; February 2017"
stellarCorePostgresPassword = "mysecretpassword"
horizonDefaultPort = "8000"
adminPort = 6060
stellarCorePort = 11626
stellarCorePostgresPort = 5641
Expand Down Expand Up @@ -336,15 +338,119 @@ func (i *Test) Shutdown() {
})
}

// StartHorizon initializes and starts the Horizon client-facing API server and the ingest server.
func (i *Test) StartHorizon() error {
postgres := dbtest.Postgres(i.t)
i.shutdownCalls = append(i.shutdownCalls, func() {
i.StopHorizon()
postgres.Close()
})

// To facilitate custom runs of Horizon, we merge a default set of
// parameters with the tester-supplied ones (if any).
mergedWebArgs := MergeMaps(i.getDefaultWebArgs(postgres), i.config.HorizonWebParameters)
webArgs := mapToFlags(mergedWebArgs)
i.t.Log("Horizon command line webArgs:", webArgs)

mergedIngestArgs := MergeMaps(i.getDefaultIngestArgs(postgres), i.config.HorizonIngestParameters)
ingestArgs := mapToFlags(mergedIngestArgs)
i.t.Log("Horizon command line ingestArgs:", ingestArgs)

// setup Horizon web command
var err error
webConfig, webConfigOpts := horizon.Flags()
webCmd := i.createWebCommand(webConfig, webConfigOpts)
webCmd.SetArgs(webArgs)
if err = webConfigOpts.Init(webCmd); err != nil {
return errors.Wrap(err, "cannot initialize params")
}

// setup Horizon ingest command
ingestConfig, ingestConfigOpts := horizon.Flags()
ingestCmd := i.createIngestCommand(ingestConfig, ingestConfigOpts)
ingestCmd.SetArgs(ingestArgs)
if err = ingestConfigOpts.Init(ingestCmd); err != nil {
return errors.Wrap(err, "cannot initialize params")
}

if err = i.initializeEnvironmentVariables(); err != nil {
return err
}

if err = ingestCmd.Execute(); err != nil {
return errors.Wrap(err, HorizonInitErrStr)
}

if err = webCmd.Execute(); err != nil {
return errors.Wrap(err, HorizonInitErrStr)
}

// Set up Horizon clients
i.setupHorizonClient(mergedWebArgs)
if err = i.setupHorizonAdminClient(mergedIngestArgs); err != nil {
return err
}

i.horizonIngestConfig = *ingestConfig

i.appStopped = &sync.WaitGroup{}
i.appStopped.Add(2)
go func() {
_ = i.ingestNode.Serve()
i.appStopped.Done()
}()
go func() {
_ = i.webNode.Serve()
i.appStopped.Done()
}()

return nil
}

func (i *Test) getDefaultArgs(postgres *dbtest.DB) map[string]string {
// TODO: Ideally, we'd be pulling host/port information from the Docker
// Compose YAML file itself rather than hardcoding it.
return map[string]string{
"ingest": "false",
"history-archive-urls": fmt.Sprintf("http://%s:%d", "localhost", historyArchivePort),
"db-url": postgres.RO_DSN,
"stellar-core-url": i.coreClient.URL,
"network-passphrase": i.passPhrase,
"apply-migrations": "true",
"enable-captive-core-ingestion": "false",
"port": horizonDefaultPort,
// due to ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING
"checkpoint-frequency": "8",
"per-hour-rate-limit": "0", // disable rate limiting
"max-db-connections": "50", // the postgres container supports 100 connections, be conservative
}
}

func (i *Test) getDefaultWebArgs(postgres *dbtest.DB) map[string]string {
return MergeMaps(i.getDefaultArgs(postgres), map[string]string{"admin-port": "0"})
}

func (i *Test) getDefaultIngestArgs(postgres *dbtest.DB) map[string]string {
return MergeMaps(i.getDefaultArgs(postgres), map[string]string{
"admin-port": strconv.Itoa(i.AdminPort()),
"port": "8001",
"enable-captive-core-ingestion": strconv.FormatBool(len(i.coreConfig.binaryPath) > 0),
"db-url": postgres.DSN,
"stellar-core-db-url": fmt.Sprintf(
"postgres://postgres:%s@%s:%d/stellar?sslmode=disable",
stellarCorePostgresPassword,
"localhost",
stellarCorePostgresPort,
),
"stellar-core-binary-path": i.coreConfig.binaryPath,
"captive-core-config-path": i.coreConfig.configPath,
"captive-core-http-port": "21626",
"captive-core-use-db": strconv.FormatBool(i.coreConfig.useDB),
"captive-core-storage-path": i.coreConfig.storagePath,
"ingest": "true"})
}

func (i *Test) createWebCommand(webConfig *horizon.Config, webConfigOpts config.ConfigOptions) *cobra.Command {
webCmd := &cobra.Command{
Use: "horizon",
Short: "Client-facing API server for the Stellar network",
Expand All @@ -359,7 +465,10 @@ func (i *Test) StartHorizon() error {
}
},
}
return webCmd
}

func (i *Test) createIngestCommand(ingestConfig *horizon.Config, ingestConfigOpts config.ConfigOptions) *cobra.Command {
ingestCmd := &cobra.Command{
Use: "horizon",
Short: "Ingest of Stellar network",
Expand All @@ -373,68 +482,16 @@ func (i *Test) StartHorizon() error {
return err
},
}
return ingestCmd
}

// To facilitate custom runs of Horizon, we merge a default set of
// parameters with the tester-supplied ones (if any).
//
// TODO: Ideally, we'd be pulling host/port information from the Docker
// Compose YAML file itself rather than hardcoding it.
hostname := "localhost"
coreBinaryPath := i.coreConfig.binaryPath
captiveCoreConfigPath := i.coreConfig.configPath
captiveCoreUseDB := strconv.FormatBool(i.coreConfig.useDB)
captiveCoreStoragePath := i.coreConfig.storagePath

defaultArgs := map[string]string{
"ingest": "false",
"history-archive-urls": fmt.Sprintf("http://%s:%d", hostname, historyArchivePort),
"db-url": postgres.RO_DSN,
"stellar-core-url": i.coreClient.URL,
"network-passphrase": i.passPhrase,
"apply-migrations": "true",
"enable-captive-core-ingestion": "false",
"port": "8000",
// due to ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING
"checkpoint-frequency": "8",
"per-hour-rate-limit": "0", // disable rate limiting
"max-db-connections": "50", // the postgres container supports 100 connections, be conservative
}

merged := MergeMaps(defaultArgs, i.config.HorizonWebParameters, map[string]string{"admin-port": "0"})
webArgs := mapToFlags(merged)
mergedIngest := MergeMaps(defaultArgs,
map[string]string{
"admin-port": strconv.Itoa(i.AdminPort()),
"port": "8001",
"enable-captive-core-ingestion": strconv.FormatBool(len(coreBinaryPath) > 0),
"db-url": postgres.DSN,
"stellar-core-db-url": fmt.Sprintf(
"postgres://postgres:%s@%s:%d/stellar?sslmode=disable",
stellarCorePostgresPassword,
hostname,
stellarCorePostgresPort,
),
"stellar-core-binary-path": coreBinaryPath,
"captive-core-config-path": captiveCoreConfigPath,
"captive-core-http-port": "21626",
"captive-core-use-db": captiveCoreUseDB,
"captive-core-storage-path": captiveCoreStoragePath,
"ingest": "true"},
i.config.HorizonIngestParameters)
ingestArgs := mapToFlags(mergedIngest)

// initialize core arguments
i.t.Log("Horizon command line webArgs:", webArgs)
i.t.Log("Horizon command line ingestArgs:", ingestArgs)
func (i *Test) initializeEnvironmentVariables() error {
var env strings.Builder
for key, value := range i.config.HorizonEnvironment {
env.WriteString(fmt.Sprintf("%s=%s ", key, value))
}
i.t.Logf("Horizon environmental variables: %s\n", env.String())

webCmd.SetArgs(webArgs)
ingestCmd.SetArgs(ingestArgs)

// prepare env
for key, value := range i.config.HorizonEnvironment {
innerErr := i.environment.Add(key, value)
Expand All @@ -443,54 +500,35 @@ func (i *Test) StartHorizon() error {
"failed to set envvar (%s=%s)", key, value))
}
}
return nil
}

var err error
if err = webConfigOpts.Init(webCmd); err != nil {
return errors.Wrap(err, "cannot initialize params")
}
if err = ingestConfigOpts.Init(ingestCmd); err != nil {
return errors.Wrap(err, "cannot initialize params")
}

if err = ingestCmd.Execute(); err != nil {
return errors.Wrap(err, HorizonInitErrStr)
}

if err = webCmd.Execute(); err != nil {
return errors.Wrap(err, HorizonInitErrStr)
}

horizonPort := "8000"
if port, ok := merged["port"]; ok {
horizonPort = port
}
func (i *Test) setupHorizonAdminClient(ingestArgs map[string]string) error {
adminPort := uint16(i.AdminPort())
if port, ok := mergedIngest["admin-port"]; ok {
if port, ok := ingestArgs["admin-port"]; ok {
if cmdAdminPort, parseErr := strconv.ParseInt(port, 0, 16); parseErr == nil {
adminPort = uint16(cmdAdminPort)
}
}
i.horizonIngestConfig = *ingestConfig
i.horizonClient = &sdk.Client{
HorizonURL: fmt.Sprintf("http://%s:%s", hostname, horizonPort),
}

var err error
i.horizonAdminClient, err = sdk.NewAdminClient(adminPort, "", 0)
if err != nil {
return errors.Wrap(err, "cannot initialize Horizon admin client")
}
return nil
}

i.appStopped = &sync.WaitGroup{}
i.appStopped.Add(2)
go func() {
i.ingestNode.Serve()
i.appStopped.Done()
}()
go func() {
i.webNode.Serve()
i.appStopped.Done()
}()
func (i *Test) setupHorizonClient(webArgs map[string]string) {
hostname := "localhost"
horizonPort := horizonDefaultPort
if port, ok := webArgs["port"]; ok {
horizonPort = port
}

return nil
i.horizonClient = &sdk.Client{
HorizonURL: fmt.Sprintf("http://%s:%s", hostname, horizonPort),
}
}

const maxWaitForCoreStartup = 30 * time.Second
Expand Down

0 comments on commit 7ad41e0

Please sign in to comment.