Skip to content

Commit

Permalink
Implement relayer runner (#17)
Browse files Browse the repository at this point in the history
* Implement relayer runner which aggregates all relayer services
* Implement runner init/read config functions
  • Loading branch information
dzmitryhil authored Sep 28, 2023
1 parent 4259c7c commit c1c302f
Show file tree
Hide file tree
Showing 13 changed files with 356 additions and 40 deletions.
2 changes: 1 addition & 1 deletion integration-tests/client/xrpl_scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/CoreumFoundation/coreum-tools/pkg/http"
integrationtests "github.com/CoreumFoundation/coreumbridge-xrpl/integration-tests"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/client/xrpl"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/xrpl"
)

func TestFullHistoryScanAccountTx(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func init() {
if err != nil {
panic(errors.WithStack(err))
}
log := logger.NewZapLogger(zapDevLogger)
log := logger.NewZapLoggerFromLogger(zapDevLogger)
chains.Log = log

coreumChain, err := NewCoreumChain(coreumCfg)
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/xrpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
coreumapp "github.com/CoreumFoundation/coreum/v3/app"
creumconfig "github.com/CoreumFoundation/coreum/v3/pkg/config"
coreumkeyring "github.com/CoreumFoundation/coreum/v3/pkg/keyring"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/client/xrpl"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/xrpl"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion relayer/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/samber/lo v1.38.1
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.23.0
gopkg.in/yaml.v3 v3.0.1
)

require (
Expand Down Expand Up @@ -198,7 +199,6 @@ require (
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
nhooyr.io/websocket v1.8.7 // indirect
pgregory.net/rapid v0.5.5 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
Expand Down
66 changes: 64 additions & 2 deletions relayer/logger/zap.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,71 @@
package logger

import (
"strings"

"github.com/pkg/errors"
"github.com/samber/lo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var _ Logger = &ZapLogger{}

// ZapLoggerConfig is ZapLogger config.
type ZapLoggerConfig struct {
Level string
Format string
}

// DefaultZapLoggerConfig returns default ZapLoggerConfig.
func DefaultZapLoggerConfig() ZapLoggerConfig {
return ZapLoggerConfig{
Level: "info",
Format: "console",
}
}

// ZapLogger is logger wrapper with an ability to add error logs metric record.
type ZapLogger struct {
zapLogger *zap.Logger
}

// NewZapLogger returns a new instance of the ZapLogger.
func NewZapLogger(zapLogger *zap.Logger) *ZapLogger {
// NewZapLoggerFromLogger returns a new instance of the ZapLogger.
func NewZapLoggerFromLogger(zapLogger *zap.Logger) *ZapLogger {
return &ZapLogger{
zapLogger: zapLogger,
}
}

// NewZapLogger creates a new instance of the zap.Logger with .
func NewZapLogger(cfg ZapLoggerConfig) (*ZapLogger, error) {
logLevel, err := stringToLoggerLevel(cfg.Level)
if err != nil {
return nil, err
}

encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder

zapCfg := zap.Config{
Level: zap.NewAtomicLevelAt(logLevel),
Development: false,
Encoding: cfg.Format,
EncoderConfig: encoderConfig,
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
}

zapLogger, err := zapCfg.Build(zap.AddCaller(), zap.AddCallerSkip(1), zap.AddStacktrace(zapcore.ErrorLevel))
if err != nil {
return nil, errors.Wrapf(err, "failed to build zap logger form the config, config:%+v", zapCfg)
}

return &ZapLogger{
zapLogger: zapLogger,
}, nil
}

// Debug logs a message at DebugLevel. The message includes any fields passed at the log site, as well as any fields
// accumulated on the logger.
func (z ZapLogger) Debug(msg string, fields ...Field) {
Expand Down Expand Up @@ -55,3 +101,19 @@ func filedToZapFiled(fields ...Field) []zap.Field {
}
})
}

// stringToLoggerLevel converts the string level to zapcore.Level.
func stringToLoggerLevel(level string) (zapcore.Level, error) {
switch strings.ToLower(level) {
case "debug":
return zapcore.DebugLevel, nil
case "info":
return zapcore.InfoLevel, nil
case "warn":
return zapcore.WarnLevel, nil
case "error":
return zapcore.ErrorLevel, nil
default:
return 0, errors.Errorf("unknown log level: %q", level)
}
}
185 changes: 185 additions & 0 deletions relayer/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
//nolint:tagliatelle // yaml naming
package runner

import (
"io"
"os"
"path/filepath"
"time"

"github.com/pkg/errors"
rippledata "github.com/rubblelabs/ripple/data"
"gopkg.in/yaml.v3"

toolshttp "github.com/CoreumFoundation/coreum-tools/pkg/http"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/xrpl"
)

const (
configVersion = "v1"
configFileName = "relayer.yaml"
)

// LoggingConfig is logging config.
type LoggingConfig struct {
Level string `yaml:"level"`
Format string `yaml:"format"`
}

// HTTPClientConfig is http client config.
type HTTPClientConfig struct {
RequestTimeout time.Duration `yaml:"request_timeout"`
DoTimeout time.Duration `yaml:"do_timeout"`
RetryDelay time.Duration `yaml:"retry_delay"`
}

// XRPLRPCConfig is XRPL RPC config.
type XRPLRPCConfig struct {
URL string `yaml:"url"`
PageLimit uint32 `yaml:"page_limit"`
}

// XRPLScannerConfig is XRPL scanner config.
type XRPLScannerConfig struct {
RecentScanEnabled bool `yaml:"recent_scan_enabled"`
RecentScanWindow int64 `yaml:"recent_scan_window"`
RepeatRecentScan bool `yaml:"repeat_recent_scan"`

FullScanEnabled bool `yaml:"full_scan_enabled"`
RepeatFullScan bool `yaml:"repeat_full_scan"`

RetryDelay time.Duration `yaml:"retry_delay"`
}

// XRPLConfig is XRPL config.
type XRPLConfig struct {
BridgeAccount string `yaml:"bridge_account"`
RPC XRPLRPCConfig `yaml:"rpc"`
Scanner XRPLScannerConfig `yaml:"scanner"`
}

// Config is runner config.
type Config struct {
Version string `yaml:"version"`
LoggingConfig `yaml:"logging"`
HTTPClient HTTPClientConfig `yaml:"http_client"`
XRPL XRPLConfig `yaml:"xrpl"`
}

// DefaultConfig returns default runner config.
func DefaultConfig() Config {
defaultXRPLRPCConfig := xrpl.DefaultRPCClientConfig("")
defaultXRPLAccountScannerCfg := xrpl.DefaultAccountScannerConfig(rippledata.Account{})
return Config{
Version: configVersion,
LoggingConfig: LoggingConfig(logger.DefaultZapLoggerConfig()),
HTTPClient: HTTPClientConfig(toolshttp.DefaultClientConfig()),
XRPL: XRPLConfig{
// empty be default
BridgeAccount: "",
RPC: XRPLRPCConfig{
// empty be default
URL: "",
PageLimit: defaultXRPLRPCConfig.PageLimit,
},
Scanner: XRPLScannerConfig{
RecentScanEnabled: defaultXRPLAccountScannerCfg.RecentScanEnabled,
RecentScanWindow: defaultXRPLAccountScannerCfg.RecentScanWindow,
RepeatRecentScan: defaultXRPLAccountScannerCfg.RepeatRecentScan,
FullScanEnabled: defaultXRPLAccountScannerCfg.FullScanEnabled,
RepeatFullScan: defaultXRPLAccountScannerCfg.RepeatFullScan,
RetryDelay: defaultXRPLAccountScannerCfg.RetryDelay,
},
},
}
}

// Runner is relayer runner which aggregates all relayer components.
type Runner struct {
Log *logger.ZapLogger
RetryableHTTPClient *toolshttp.RetryableClient
XRPLRPCClient *xrpl.RPCClient
XRPLAccountScanner *xrpl.AccountScanner
}

// NewRunner return new runner from the config.
func NewRunner(cfg Config) (*Runner, error) {
zapLogger, err := logger.NewZapLogger(logger.ZapLoggerConfig(cfg.LoggingConfig))
if err != nil {
return nil, err
}
retryableHTTPClient := toolshttp.NewRetryableClient(toolshttp.RetryableClientConfig(cfg.HTTPClient))

// XRPL
xrplRPCClientCfg := xrpl.RPCClientConfig(cfg.XRPL.RPC)
xrplRPCClient := xrpl.NewRPCClient(xrplRPCClientCfg, zapLogger, retryableHTTPClient)
xrplAccount, err := rippledata.NewAccountFromAddress(cfg.XRPL.BridgeAccount)
if err != nil {
return nil, errors.Wrapf(err, "failed to get xrpl account from string, string:%s", cfg.XRPL.BridgeAccount)
}
xrplScanner := xrpl.NewAccountScanner(xrpl.AccountScannerConfig{
Account: *xrplAccount,
RecentScanEnabled: cfg.XRPL.Scanner.RecentScanEnabled,
RecentScanWindow: cfg.XRPL.Scanner.RecentScanWindow,
RepeatRecentScan: cfg.XRPL.Scanner.RepeatRecentScan,
FullScanEnabled: cfg.XRPL.Scanner.FullScanEnabled,
RepeatFullScan: cfg.XRPL.Scanner.RepeatFullScan,
RetryDelay: cfg.XRPL.Scanner.RetryDelay,
}, zapLogger, xrplRPCClient)

return &Runner{
Log: zapLogger,
RetryableHTTPClient: &retryableHTTPClient,
XRPLRPCClient: xrplRPCClient,
XRPLAccountScanner: xrplScanner,
}, nil
}

// InitConfig creates config yaml file.
func InitConfig(homePath string, cfg Config) error {
path := buildFilePath(homePath)
if _, err := os.Stat(path); !errors.Is(err, os.ErrNotExist) {
return errors.Errorf("failed to initi config, file already exists, path:%s", path)
}

file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600)
if err != nil {
return errors.Wrapf(err, "failed to create config file, path:%s", path)
}
defer file.Close()
yamlStringConfig, err := yaml.Marshal(cfg)
if err != nil {
return errors.Wrap(err, "failed convert default config to yaml")
}
if _, err := file.Write(yamlStringConfig); err != nil {
return errors.Wrapf(err, "failed to write yaml config file, path:%s", path)
}

return nil
}

// ReadConfig reads config yaml file.
func ReadConfig(homePath string) (Config, error) {
path := buildFilePath(homePath)
file, err := os.OpenFile(path, os.O_RDONLY, 0o600)
defer file.Close() //nolint:staticcheck //we accept the error ignoring
if errors.Is(err, os.ErrNotExist) {
return Config{}, errors.Errorf("config file does not exist, path:%s", path)
}
fileBytes, err := io.ReadAll(file)
if err != nil {
return Config{}, errors.Wrapf(err, "failed to read bytes from file does not exist, path:%s", path)
}

var config Config
if err := yaml.Unmarshal(fileBytes, &config); err != nil {
return Config{}, errors.Wrapf(err, "failed to unmarshal file to yaml, path:%s", path)
}

return config, nil
}

func buildFilePath(homePath string) string {
return filepath.Join(homePath, configFileName)
}
63 changes: 63 additions & 0 deletions relayer/runner/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package runner_test

import (
"testing"

"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/runner"
)

func TestInitAndReadConfig(t *testing.T) {
t.Parallel()

defaultCfg := runner.DefaultConfig()
yamlStringConfig, err := yaml.Marshal(defaultCfg)
require.NoError(t, err)
require.Equal(t, getDefaultConfigString(), string(yamlStringConfig))
// create temp dir to store the config
tempDir := t.TempDir()

// try to read none-existing config
_, err = runner.ReadConfig(tempDir)
require.Error(t, err)

// init the config first time
require.NoError(t, runner.InitConfig(tempDir, defaultCfg))

// try to init the config second time
require.Error(t, runner.InitConfig(tempDir, defaultCfg))

// read config
readConfig, err := runner.ReadConfig(tempDir)
require.NoError(t, err)
require.Error(t, runner.InitConfig(tempDir, defaultCfg))

require.Equal(t, defaultCfg, readConfig)
}

// the func returns the default config snapshot.
func getDefaultConfigString() string {
return `version: v1
logging:
level: info
format: console
http_client:
request_timeout: 5s
do_timeout: 30s
retry_delay: 300ms
xrpl:
bridge_account: ""
rpc:
url: ""
page_limit: 100
scanner:
recent_scan_enabled: true
recent_scan_window: 10000
repeat_recent_scan: true
full_scan_enabled: true
repeat_full_scan: true
retry_delay: 10s
`
}
Loading

0 comments on commit c1c302f

Please sign in to comment.