Skip to content

Commit

Permalink
swapper resume from db
Browse files Browse the repository at this point in the history
This removes the swap state gob files, loading necessary data from the
DB on Swapper startup.

server/db

Add `SwapArchiver.ActiveSwaps` (impl. by `(*Archiver).ActiveSwaps`) to 
retrieve full details of all active swaps across all markets.  
`Swapper`'s constructor uses this to resume active swaps.

Remove all the swap state hash get/set functions.

The `meta` table and it's `state_hash` column still exist, but I think 
I'm going to entirely drop this table and follow up with a PR that 
re-adds the meta table with a versioning scheme.

server/swap

Rip out:

- All of the swap state/gob stuff.
- The `orderSwapTracker`, which was used to infer order "completion" for 
the cancellation rate computations.  The market now has these order 
fill/settlement/success responsibilities.  A `swapDone` callback is 
provided to the `Swapper` from the consumer (DEX manager) to do this.
- The `offBook` argument of `Negotiate`, which was used with the 
`orderSwapTracker` junk.
- The the live waiter tracking.  Clients retry init/redeem requests now, 
so jumping through hoops to recreate live coin waiters on swapper 
startup is not necessary.

The Swapper constructor now loads all active matches from the DB and 
reconstructs all of the `matchTrackers`.  This uses the new 
`Backend.CoinConfTime` method for the `SwapConfirmTime`, which is not 
stored in the DB.  The match request time field of `matchTracker` is 
also not recorded, so this is guessed on resume as the epoch close time 
plus a minute, which at worst gives the maker a little extra breathing 
room when resuming in `NewlyMatched` status.

server/market

Assume the responsibility of deciding when an order is successfully 
"completed" as per the cancellation rate definition.  This is 
facilitated by the `SwapDone` method that the DEX manager bridges with 
the `Swapper`.

The `SwapDone` method also performs the same unbooking that the Swapper 
previous requested via the `unbookHook`, now removed.

server/dex

Remove the unbook hook, replacing it with a `swapDone` dispatcher.

server/auth

`Sign` no longer returns an error since the secp256k1 changes recently 
eliminated the error return.
  • Loading branch information
chappjc committed Dec 11, 2020
1 parent 23d0c47 commit a676e07
Show file tree
Hide file tree
Showing 25 changed files with 645 additions and 1,548 deletions.
21 changes: 10 additions & 11 deletions client/core/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -1460,15 +1460,14 @@ func (c *Core) finalizeSwapAction(t *trackedTrade, match *matchTracker, coinID,
CoinID: coinID,
Contract: contract,
}
// The DEX may wait up to its configured broadcast timeout to locate the
// contract txn, so wait at least that long for a response. Note that the
// server presently waits an unspecified amount of time that is shorter than
// this, which gives the client an msgjson.TransactionUndiscovered error to
// signal to the client to try broadcasting again or check their asset
// backend connectivity before hitting the broadcast timeout (and being
// penalized).
// The DEX may wait up to its configured broadcast timeout, but we will
// retry on timeout or other error. Note that the server presently waits an
// unspecified amount of time, which gives the client an
// msgjson.TransactionUndiscovered error to signal to the client to try
// broadcasting again or check their asset backend connectivity before
// hitting the broadcast timeout (and being penalized).
var needsResolution bool
timeout := t.broadcastTimeout()
timeout := t.broadcastTimeout() / 4
if err := t.dc.signAndRequest(init, msgjson.InitRoute, ack, timeout); err != nil {
var msgErr *msgjson.Error
needsResolution = errors.As(err, &msgErr) && msgErr.Code == msgjson.SettlementSequenceError
Expand Down Expand Up @@ -1639,9 +1638,9 @@ func (c *Core) finalizeRedeemAction(t *trackedTrade, match *matchTracker, coinID
Secret: proof.Secret,
}
ack := new(msgjson.Acknowledgement)
// The DEX may wait up to its configured broadcast timeout, so wait at least
// that long for a response.
timeout := t.broadcastTimeout()
// The DEX may wait up to its configured broadcast timeout, but we will
// retry on timeout or other error.
timeout := t.broadcastTimeout() / 4
if err := t.dc.signAndRequest(msgRedeem, msgjson.RedeemRoute, ack, timeout); err != nil {
var msgErr *msgjson.Error
needsResolution = errors.As(err, &msgErr) && msgErr.Code == msgjson.SettlementSequenceError
Expand Down
2 changes: 1 addition & 1 deletion server/asset/btc/btc.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func (btc *Backend) call(method string, args anylist, thing interface{}) error {
}
b, err := btc.node.RawRequest(method, params)
if err != nil {
return fmt.Errorf("rawrequest error: %v", err)
return fmt.Errorf("rawrequest error: %w", err)
}
if thing != nil {
return json.Unmarshal(b, thing)
Expand Down
4 changes: 2 additions & 2 deletions server/asset/dcr/live_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func TestLiveUTXO(t *testing.T) {
// Just check for no error on Confirmations.
confs, err := utxo.Confirmations(ctx)
if err != nil {
return fmt.Errorf("error getting confirmations for mempool tx output: %v", err)
return fmt.Errorf("error getting confirmations for mempool tx output: %w", err)
}
if confs != int64(expectedConfs) {
return fmt.Errorf("expected %d confirmations, found %d for %s:%d", expectedConfs, confs, txHash, vout)
Expand All @@ -231,7 +231,7 @@ func TestLiveUTXO(t *testing.T) {
// Since we are iterating backwards starting with mempool, we would
// already know the spending transaction and have it stored.
if !txOutIsSpent(*txHash, uint32(vout)) {
return fmt.Errorf("unexpected UTXO error: %v", err)
return fmt.Errorf("unexpected UTXO error: %w", err)
}
break
case !scriptTypeOK && err == nil:
Expand Down
3 changes: 1 addition & 2 deletions server/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,13 +524,12 @@ func (auth *AuthManager) Suspended(user account.AccountID) (found, suspended boo
}

// Sign signs the msgjson.Signables with the DEX private key.
func (auth *AuthManager) Sign(signables ...msgjson.Signable) error {
func (auth *AuthManager) Sign(signables ...msgjson.Signable) {
for _, signable := range signables {
sigMsg := signable.Serialize()
sig := auth.signer.Sign(sigMsg)
signable.SetSig(sig.Serialize())
}
return nil
}

// Response and notification (non-request) messages
Expand Down
10 changes: 2 additions & 8 deletions server/auth/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,20 +995,14 @@ func TestSign(t *testing.T) {
sig1Bytes := sig1.Serialize()
rig.signer.sig = sig1
s := &tSignable{b: randBytes(25)}
err := rig.mgr.Sign(s)
if err != nil {
t.Fatalf("unexpected error for valid signable: %v", err)
}
rig.mgr.Sign(s)
if !bytes.Equal(sig1Bytes, s.SigBytes()) {
t.Fatalf("incorrect signature. expected %x, got %x", sig1.Serialize(), s.SigBytes())
}

// Try two at a time
s2 := &tSignable{b: randBytes(25)}
err = rig.mgr.Sign(s, s2)
if err != nil {
t.Fatalf("error for multiple signables: %v", err)
}
rig.mgr.Sign(s, s2)
}

func TestSend(t *testing.T) {
Expand Down
19 changes: 2 additions & 17 deletions server/auth/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,7 @@ func (auth *AuthManager) handleRegister(conn comms.Link, msg *msgjson.Message) *
Fee: auth.regFee,
Time: encode.UnixMilliU((unixMsNow())),
}

err = auth.Sign(regRes)
if err != nil {
log.Errorf("error serializing register result: %v, data = %#v", err, regRes)
return &msgjson.Error{
Code: msgjson.RPCInternalError,
Message: "internal error",
}
}
auth.Sign(regRes)

resp, err := msgjson.NewResponse(msg.ID, regRes, nil)
if err != nil {
Expand Down Expand Up @@ -250,14 +242,7 @@ func (auth *AuthManager) validateFee(conn comms.Link, acctID account.AccountID,
log.Infof("New user registered: acct %v, paid %d to %v", acctID, val, addr)

// Create, sign, and send the the response.
err = auth.Sign(notifyFee)
if err != nil {
msgErr = &msgjson.Error{
Code: msgjson.RPCInternalError,
Message: "internal signature error",
}
return wait.DontTryAgain
}
auth.Sign(notifyFee)
notifyRes := new(msgjson.NotifyFeeResult)
notifyRes.SetSig(notifyFee.SigBytes())
resp, err := msgjson.NewResponse(msgID, notifyRes, nil)
Expand Down
9 changes: 3 additions & 6 deletions server/cmd/dcrdex/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ type dexConf struct {
AdminSrvOn bool
AdminSrvAddr string
AdminSrvPW []byte
IgnoreState bool
StatePath string
NoResumeSwaps bool
DisableDataAPI bool
}

Expand Down Expand Up @@ -149,8 +148,7 @@ type flagsData struct {
AdminSrvAddr string `long:"adminsrvaddr" description:"Administration HTTPS server address (default: 127.0.0.1:6542)"`
AdminSrvPassword string `long:"adminsrvpass" description:"Admin server password. INSECURE. Do not set unless absolutely necessary."`

IgnorePrevState bool `long:"ignoreprevstate" description:"Do not attempt to load the stored swap state."`
PrevStatePath string `long:"prevstatepath" description:"Load the swap state from provided file path. --prevstatepath supercedes --ignoreprevstate"`
NoResumeSwaps bool `long:"noresumeswaps" description:"Do not attempt to resume swaps that are active in the DB."`

DisableDataAPI bool `long:"nodata" description:"Disable the HTTP data API."`
}
Expand Down Expand Up @@ -602,8 +600,7 @@ func loadConfig() (*dexConf, *procOpts, error) {
AdminSrvAddr: adminSrvAddr,
AdminSrvOn: cfg.AdminSrvOn,
AdminSrvPW: []byte(cfg.AdminSrvPassword),
IgnoreState: cfg.IgnorePrevState,
StatePath: cfg.PrevStatePath,
NoResumeSwaps: cfg.NoResumeSwaps,
DisableDataAPI: cfg.DisableDataAPI,
}

Expand Down
3 changes: 1 addition & 2 deletions server/cmd/dcrdex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ func mainCore(ctx context.Context) error {
AltDNSNames: cfg.AltDNSNames,
DisableDataAPI: cfg.DisableDataAPI,
},
IgnoreState: cfg.IgnoreState,
StatePath: cfg.StatePath,
NoResumeSwaps: cfg.NoResumeSwaps,
}
dexMan, err := dexsrv.NewDEX(dexConf)
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions server/db/driver/pg/internal/matches.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,22 @@ const (
AND active
ORDER BY epochIdx * epochDur DESC;`

// RetrieveActiveMarketMatchesExtended combines RetrieveSwapData with
// RetrieveActiveMarketMatches.
RetrieveActiveMarketMatchesExtended = `SELECT matchid, takerSell,
takerOrder, takerAccount, takerAddress,
makerOrder, makerAccount, makerAddress,
epochIdx, epochDur, quantity, rate, baseRate, quoteRate, status,
sigMatchAckMaker, sigMatchAckTaker,
aContractCoinID, aContract, aContractTime, bSigAckOfAContract,
bContractCoinID, bContract, bContractTime, aSigAckOfBContract,
aRedeemCoinID, aRedeemSecret, aRedeemTime, bSigAckOfARedeem,
bRedeemCoinID, bRedeemTime
FROM %s
WHERE takerSell IS NOT NULL -- not a cancel order
AND active
ORDER BY epochIdx * epochDur DESC;`

// CompletedOrAtFaultMatchesLastN retrieves inactive matches for a user that
// are either successfully completed by the user (MatchComplete or
// MakerRedeemed with user as maker), or failed because of this user's
Expand Down
8 changes: 2 additions & 6 deletions server/db/driver/pg/internal/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,8 @@ const (
state_hash BYTEA -- hash of the swapper state file
);`

// TODO: alter table to drop the state_hash column and add version.

// CreateMetaRow creates the single row of the meta table.
CreateMetaRow = "INSERT INTO meta DEFAULT VALUES;"

// RetrieveStateHash retrieves the last stored swap state file hash.
RetrieveStateHash = "SELECT state_hash FROM meta;"

// SetStateHash sets the hash of the swap state file.
SetStateHash = "UPDATE meta SET state_hash = $1;"
)
93 changes: 89 additions & 4 deletions server/db/driver/pg/matches.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func (a *Archiver) ForgiveMatchFail(mid order.MatchID) (bool, error) {
stmt := fmt.Sprintf(internal.ForgiveMatchFail, fullMatchesTableName(a.dbName, m))
N, err := sqlExec(a.db, stmt, mid)
if err != nil { // not just no rows updated
a.fatalBackendErr(err)
return false, err
}
if N == 1 {
Expand All @@ -45,6 +44,93 @@ func (a *Archiver) ForgiveMatchFail(mid order.MatchID) (bool, error) {
return false, nil
}

// ActiveSwaps loads the full details for all active swaps across all markets.
func (a *Archiver) ActiveSwaps() ([]*db.SwapDataFull, error) {
var sd []*db.SwapDataFull

for m, mkt := range a.markets {
matchesTableName := fullMatchesTableName(a.dbName, m)
ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout)
matches, swapData, err := activeSwaps(ctx, a.db, matchesTableName)
cancel()
if err != nil {
return nil, err
}

for i := range matches {
sd = append(sd, &db.SwapDataFull{
Base: mkt.Base,
Quote: mkt.Quote,
MatchData: matches[i],
SwapData: swapData[i],
})
}
}

return sd, nil
}

func activeSwaps(ctx context.Context, dbe *sql.DB, tableName string) (matches []*db.MatchData, swapData []*db.SwapData, err error) {
stmt := fmt.Sprintf(internal.RetrieveActiveMarketMatchesExtended, tableName)
rows, err := dbe.QueryContext(ctx, stmt)
if err != nil {
return
}
defer rows.Close()

for rows.Next() {
var m db.MatchData
var sd db.SwapData

var status uint8
var baseRate, quoteRate sql.NullInt64
var takerSell sql.NullBool
var takerAddr, makerAddr sql.NullString
var contractATime, contractBTime, redeemATime, redeemBTime sql.NullInt64

err = rows.Scan(&m.ID, &takerSell,
&m.Taker, &m.TakerAcct, &takerAddr,
&m.Maker, &m.MakerAcct, &makerAddr,
&m.Epoch.Idx, &m.Epoch.Dur, &m.Quantity, &m.Rate,
&baseRate, &quoteRate, &status,
&sd.SigMatchAckMaker, &sd.SigMatchAckTaker,
&sd.ContractACoinID, &sd.ContractA, &contractATime,
&sd.ContractAAckSig,
&sd.ContractBCoinID, &sd.ContractB, &contractBTime,
&sd.ContractBAckSig,
&sd.RedeemACoinID, &sd.RedeemASecret, &redeemATime,
&sd.RedeemAAckSig,
&sd.RedeemBCoinID, &redeemBTime)
if err != nil {
return nil, nil, err
}

// All are active.
m.Active = true

m.Status = order.MatchStatus(status)
m.TakerSell = takerSell.Bool
m.TakerAddr = takerAddr.String
m.MakerAddr = makerAddr.String
m.BaseRate = uint64(baseRate.Int64)
m.QuoteRate = uint64(quoteRate.Int64)

sd.ContractATime = contractATime.Int64
sd.ContractBTime = contractBTime.Int64
sd.RedeemATime = redeemATime.Int64
sd.RedeemBTime = redeemBTime.Int64

matches = append(matches, &m)
swapData = append(swapData, &sd)
}

if err = rows.Err(); err != nil {
return nil, nil, err
}

return
}

// CompletedAndAtFaultMatchStats retrieves the outcomes of matches that were (1)
// successfully completed by the specified user, or (2) failed with the user
// being the at-fault party. Note that the MakerRedeemed match status may be
Expand All @@ -56,8 +142,8 @@ func (a *Archiver) CompletedAndAtFaultMatchStats(aid account.AccountID, lastN in
for m, mkt := range a.markets {
matchesTableName := fullMatchesTableName(a.dbName, m)
ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout)
defer cancel()
matchOutcomes, err := completedAndAtFaultMatches(ctx, a.db, matchesTableName, aid, lastN, mkt.Base, mkt.Quote)
cancel()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -550,8 +636,7 @@ func (a *Archiver) SaveRedeemB(mid db.MarketMatchID, coinID []byte, timestamp in
}

// SetMatchInactive flags the match as done/inactive. This is not necessary if
// both SaveRedeemAckSigA and SaveRedeemAckSigB are run for the match since they
// will also flags the match as done when both signatures are stored.
// SaveRedeemAckSigB is run for the match since it will flag the match as done.
func (a *Archiver) SetMatchInactive(mid db.MarketMatchID) error {
return a.updateMatchStmt(mid, internal.SetSwapDone, mid.MatchID)
}
Loading

0 comments on commit a676e07

Please sign in to comment.