Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fidelity bonds foundation: comms, account tiering, server tracking #1819

Merged
merged 6 commits into from Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -10,6 +10,8 @@ debug
/vendor/
*.orig
*.pprof
go.work
go.work.sum
.DS_Store
\.project
dex*key
Expand All @@ -27,6 +29,7 @@ client/cmd/mmbot/mmbot
docs/examples/rpcclient/rpcclient
dex/testing/loadbot/loadbot
bin/
bin-v*/
client/webserver/site/template-builder/template-builder
dex/testing/btc/harnesschain.tar.gz
client/asset/btc/electrum/example/server/server
Expand Down
58 changes: 32 additions & 26 deletions client/core/bookie.go
Expand Up @@ -814,38 +814,43 @@ func handleTradeResumptionMsg(c *Core, dc *dexConnection, msg *msgjson.Message)
return nil
}

func (dc *dexConnection) apiVersion() int32 {
return atomic.LoadInt32(&dc.apiVer)
}

// refreshServerConfig fetches and replaces server configuration data. It also
// initially checks that a server's API version is one of serverAPIVers.
func (dc *dexConnection) refreshServerConfig() error {
func (dc *dexConnection) refreshServerConfig() (*msgjson.ConfigResult, error) {
// Fetch the updated DEX configuration.
cfg := new(msgjson.ConfigResult)
err := sendRequest(dc.WsConn, msgjson.ConfigRoute, nil, cfg, DefaultResponseTimeout)
if err != nil {
return fmt.Errorf("unable to fetch server config: %w", err)
return nil, fmt.Errorf("unable to fetch server config: %w", err)
}

// (V0PURGE) Infer if the server advertises API v0 but supports the upcoming
// v1 routes such as postbond. Server can only advertise a single version
// presently so we have to detect this indirectly.
apiVer := int32(cfg.APIVersion)
if apiVer == 0 && cfg.BondExpiry > 0 {
apiVer = 1
}
Comment on lines +834 to 837
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should move to a semver scheme or maybe a bitmask.

type APISupportFlag uint64

const (
	APISupportV0 APISupportFlag = 1 << iota
	APISupportBondsV0
)

type ConfigResult struct {
    APIVersion       uint64    `json:"apiver"`
    ...
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still breaks v0 clients, but that's something to consider.

dc.log.Infof("Server %v supports API version %v.", dc.acct.host, apiVer)
atomic.StoreInt32(&dc.apiVer, apiVer)

// Check that we are able to communicate with this DEX.
apiVer := atomic.LoadInt32(&dc.apiVer)
cfgAPIVer := int32(cfg.APIVersion)

if apiVer != cfgAPIVer {
if found := func() bool {
for _, version := range serverAPIVers {
ver := int32(version)
if cfgAPIVer == ver {
dc.log.Debugf("Setting server api version to %v.", ver)
atomic.StoreInt32(&dc.apiVer, ver)
return true
}
}
return false
}(); !found {
err := fmt.Errorf("unknown server API version: %v", cfgAPIVer)
if cfgAPIVer > apiVer {
err = fmt.Errorf("%v: %w", err, outdatedClientErr)
}
return err
var supported bool
for _, ver := range supportedAPIVers {
if apiVer == ver {
supported = true
}
}
if !supported {
err := fmt.Errorf("unsupported server API version %v", apiVer)
if apiVer > supportedAPIVers[len(supportedAPIVers)-1] {
err = fmt.Errorf("%v: %w", err, outdatedClientErr)
}
return nil, err
}

bTimeout := time.Millisecond * time.Duration(cfg.BroadcastTimeout)
Expand All @@ -854,6 +859,7 @@ func (dc *dexConnection) refreshServerConfig() error {
if dc.ticker.Dur() != tickInterval {
dc.ticker.Reset(tickInterval)
}

getAsset := func(id uint32) *msgjson.Asset {
for _, asset := range cfg.Assets {
if id == asset.ID {
Expand Down Expand Up @@ -916,10 +922,10 @@ func (dc *dexConnection) refreshServerConfig() error {

assets, epochs, err := generateDEXMaps(dc.acct.host, cfg)
if err != nil {
return fmt.Errorf("inconsistent 'config' response: %w", err)
return nil, fmt.Errorf("inconsistent 'config' response: %w", err)
}

// Update dc.{marketMap,epoch,assets}
// Update dc.{epoch,assets}
dc.assetsMtx.Lock()
dc.assets = assets
dc.assetsMtx.Unlock()
Expand All @@ -932,15 +938,15 @@ func (dc *dexConnection) refreshServerConfig() error {
if dc.acct.dexPubKey == nil && len(cfg.DEXPubKey) > 0 {
dc.acct.dexPubKey, err = secp256k1.ParsePubKey(cfg.DEXPubKey)
if err != nil {
return fmt.Errorf("error decoding secp256k1 PublicKey from bytes: %w", err)
return nil, fmt.Errorf("error decoding secp256k1 PublicKey from bytes: %w", err)
}
}

dc.epochMtx.Lock()
dc.epoch = epochs
dc.epochMtx.Unlock()

return nil
return cfg, nil
}

// subPriceFeed subscribes to the price_feed notification feed and primes the
Expand Down
106 changes: 77 additions & 29 deletions client/core/core.go
Expand Up @@ -74,13 +74,13 @@ var (
// When waiting for a wallet to sync, a SyncStatus check will be performed
// every syncTickerPeriod. var instead of const for testing purposes.
syncTickerPeriod = 3 * time.Second
// serverAPIVers are the DEX server API versions this client is capable
// supportedAPIVers are the DEX server API versions this client is capable
// of communicating with.
//
// NOTE: API version may change at any time. Keep this in mind when
// updating the API. Long-running operations may start and end with
// differing versions.
serverAPIVers = []int{serverdex.PreAPIVersion}
supportedAPIVers = []int32{serverdex.PreAPIVersion, serverdex.BondAPIVersion}
// ActiveOrdersLogoutErr is returned from logout when there are active
// orders.
ActiveOrdersLogoutErr = errors.New("cannot log out with active orders")
Expand Down Expand Up @@ -151,6 +151,8 @@ type dexConnection struct {
// connectionStatus is a best guess on the ws connection status.
connectionStatus uint32

// pendingFee is deprecated, and will be removed when v0 API support is
// dropped in favor of v1 with bonds. (V0PURGE)
pendingFeeMtx sync.RWMutex
pendingFee *pendingFeeState

Expand Down Expand Up @@ -395,10 +397,16 @@ func (dc *dexConnection) exchangeInfo() *Exchange {
Host: dc.acct.host,
AcctID: acctID,
ConnectionStatus: dc.status(),
PendingFee: dc.getPendingFee(),
PendingFee: dc.getPendingFee(), // V0PURGE - deprecated with bonds in v1
}
}

bondAssets := make(map[string]*BondAsset, len(cfg.BondAssets))
for symb, bondAsset := range cfg.BondAssets {
coreBondAsset := BondAsset(*bondAsset) // convert msgjson.BondAsset to core.BondAsset
bondAssets[symb] = &coreBondAsset
}

dc.assetsMtx.RLock()
assets := make(map[uint32]*dex.Asset, len(dc.assets))
for assetID, dexAsset := range dc.assets {
Expand All @@ -416,6 +424,7 @@ func (dc *dexConnection) exchangeInfo() *Exchange {
}
dcrAsset := feeAssets["dcr"]
if dcrAsset == nil { // should have happened in refreshServerConfig
// V0PURGE
dcrAsset = &FeeAsset{
ID: 42,
Amt: cfg.Fee,
Expand All @@ -424,16 +433,29 @@ func (dc *dexConnection) exchangeInfo() *Exchange {
feeAssets["dcr"] = dcrAsset
}

dc.acct.authMtx.RLock()
// TODO: List bonds in core.Exchange. For now, just tier.
// bondsPending := len(dc.acct.pendingBonds) > 0
tier := dc.acct.tier
dc.acct.authMtx.RUnlock()

return &Exchange{
Host: dc.acct.host,
AcctID: acctID,
Markets: dc.marketMap(),
Assets: assets,
BondExpiry: cfg.BondExpiry,
BondAssets: bondAssets,
ConnectionStatus: dc.status(),
Fee: dcrAsset,
RegFees: feeAssets,
PendingFee: dc.getPendingFee(),
CandleDurs: cfg.BinSizes,
Tier: tier,
BondsPending: false,
// TODO: Bonds

// Legacy reg fee (V0PURGE)
Fee: dcrAsset,
RegFees: feeAssets,
PendingFee: dc.getPendingFee(),
}
}

Expand Down Expand Up @@ -3439,20 +3461,24 @@ func (c *Core) discoverAccount(dc *dexConnection, crypter encrypt.Crypter) (bool
}
return false, newError(authErr, "unexpected authDEX error: %w", err)
}
if dc.acct.isSuspended {
c.log.Infof("HD account key for %s was reported as suspended. Deriving another account key.", dc.acct.host)
// do not skip key if tier is 0 and bonds will be used
if dc.acct.tier < 0 || (dc.acct.tier < 1 && dc.apiVersion() < serverdex.BondAPIVersion) {
c.log.Infof("HD account key for %s has tier %d (not able to trade). Deriving another account key.",
dc.acct.host, dc.acct.tier)
keyIndex++
time.Sleep(200 * time.Millisecond) // don't hammer
continue
}

break // great, the account at this key index is paid and ready
break // great, the account at this key index exists
}

// Actual fee asset ID and coin are unknown, but paid.
dc.acct.isPaid = true
dc.acct.feeCoin = []byte("DUMMY COIN")
dc.acct.feeAssetID = 42
if dc.acct.legacyFeePaid {
// Actual fee asset ID and coin are unknown, but paid.
dc.acct.isPaid = true
dc.acct.feeCoin = []byte("DUMMY COIN")
dc.acct.feeAssetID = 42
}

err := c.db.CreateAccount(&db.AccountInfo{
Host: dc.acct.host,
Expand Down Expand Up @@ -3522,6 +3548,10 @@ func (c *Core) upgradeConnection(dc *dexConnection) {
// to register on a DEX, and Register may be called directly, although it requires
// the expected fee amount as an additional input and it will pay the fee if the
// account is not discovered and paid.
//
// The Tier and BondsPending fields may be consulted to determine if it is still
// necessary to PostBond (i.e. Tier == 0 && !BondsPending) before trading. The
// Connected field should be consulted first.
func (c *Core) DiscoverAccount(dexAddr string, appPW []byte, certI interface{}) (*Exchange, bool, error) {
if !c.IsInitialized() {
return nil, false, fmt.Errorf("cannot register DEX because app has not been initialized")
Expand All @@ -3538,8 +3568,16 @@ func (c *Core) DiscoverAccount(dexAddr string, appPW []byte, certI interface{})
}
defer crypter.Close()

c.connMtx.RLock()
dc, found := c.conns[host]
c.connMtx.RUnlock()
if found {
// Already registered, but connection may be down and/or PostBond needed.
return dc.exchangeInfo(), true, nil // *Exchange has Tier and BondsPending
}

var ready bool
dc, err := c.tempDexConnection(host, certI)
dc, err = c.tempDexConnection(host, certI)
if dc != nil { // (re)connect loop may be running even if err != nil
defer func() {
// Either disconnect or promote this connection.
Expand Down Expand Up @@ -3695,10 +3733,8 @@ func (c *Core) Register(form *RegisterForm) (*RegisterResult, error) {

// Ensure this DEX supports this asset for registration fees, and get the
// required confirmations and fee amount.
dc.cfgMtx.RLock()
feeAsset, supported := dc.cfg.RegFees[regFeeAssetSymbol]
dc.cfgMtx.RUnlock()
if !supported || feeAsset == nil {
feeAsset := dc.feeAsset(regFeeAssetID) // dc.cfg.RegFees[regFeeAssetSymbol]
if feeAsset == nil {
return nil, newError(assetSupportErr, "dex server does not accept registration fees in asset %q", regFeeAssetSymbol)
}
if feeAsset.ID != regFeeAssetID {
Expand Down Expand Up @@ -5691,15 +5727,29 @@ func (c *Core) authDEX(dc *dexConnection) error {
return newError(signatureErr, "DEX signature validation error: %w", err)
}

var suspended bool
if result.Suspended != nil {
suspended = *result.Suspended
var tier int64
var legacyFeePaid bool
if result.Tier == nil { // legacy server (V0PURGE)
// A legacy server does not set ConnectResult.LegacyFeePaid, but unpaid
// legacy ('register') users get an UnpaidAccountError from Connect, so
// we know the account is paid and not suspended.
legacyFeePaid = true
if result.Suspended == nil || !*result.Suspended {
tier = 1
}
} else {
tier = *result.Tier
if result.LegacyFeePaid != nil {
legacyFeePaid = *result.LegacyFeePaid
}
}

// Set the account as authenticated.
c.log.Debugf("Authenticated connection to %s, acct %v, %d active orders, %d active matches, score %d (suspended = %v)",
dc.acct.host, acctID, len(result.ActiveOrderStatuses), len(result.ActiveMatches), result.Score, suspended)
dc.acct.auth(suspended)
c.log.Infof("Authenticated connection to %s, acct %v, %d active bonds, %d active orders, %d active matches, score %d, tier %d",
dc.acct.host, acctID, len(result.ActiveBonds), len(result.ActiveOrderStatuses), len(result.ActiveMatches), result.Score, tier)
// Flag as authenticated before bondConfirmed / monitorBondConfs, which may
// call authDEX if not flagged as such.
dc.acct.auth(tier, legacyFeePaid)

// Associate the matches with known trades.
matches, _, err := dc.parseMatches(result.ActiveMatches, false)
Expand Down Expand Up @@ -6968,7 +7018,7 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo, temporary ...bool) (*dexConn
}

// Request the market configuration.
err = dc.refreshServerConfig() // handleReconnect must too
_, err = dc.refreshServerConfig() // handleReconnect must too
if err != nil {
if errors.Is(err, outdatedClientErr) {
sendOutdatedClientNotification(c, dc)
Expand Down Expand Up @@ -7000,7 +7050,7 @@ func (c *Core) handleReconnect(host string) {

// The server's configuration may have changed, so retrieve the current
// server configuration.
err := dc.refreshServerConfig()
cfg, err := dc.refreshServerConfig()
if err != nil {
if errors.Is(err, outdatedClientErr) {
sendOutdatedClientNotification(c, dc)
Expand All @@ -7014,16 +7064,14 @@ func (c *Core) handleReconnect(host string) {
base uint32
quote uint32
}
dc.cfgMtx.RLock()
mkts := make(map[string]*market, len(dc.cfg.Markets))
for _, m := range dc.cfg.Markets {
for _, m := range cfg.Markets {
mkts[m.Name] = &market{
name: m.Name,
base: m.Base,
quote: m.Quote,
}
}
dc.cfgMtx.RUnlock()

// Update the orders' selfGoverned flag according to the configured markets.
for _, trade := range dc.trackedTrades() {
Expand Down