Skip to content

Commit

Permalink
usermatches cli app
Browse files Browse the repository at this point in the history
  • Loading branch information
chappjc committed Sep 20, 2021
1 parent de0f679 commit 0844058
Show file tree
Hide file tree
Showing 3 changed files with 274 additions and 5 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -15,6 +15,7 @@ debug
dex*key
server/cmd/genkey/genkey
server/cmd/dcrdex/dcrdex
server/cmd/usermatches/usermatches
markets.json
dist/
node_modules/
Expand Down
256 changes: 256 additions & 0 deletions server/cmd/usermatches/main.go
@@ -0,0 +1,256 @@
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.

package main

import (
"context"
"encoding/csv"
"encoding/hex"
"flag"
"fmt"
"os"
"os/signal"
"strconv"
"strings"

"decred.org/dcrdex/dex"
"decred.org/dcrdex/server/account"
"decred.org/dcrdex/server/asset"
"decred.org/dcrdex/server/asset/btc"
"decred.org/dcrdex/server/asset/dcr"
"decred.org/dcrdex/server/asset/ltc"
"decred.org/dcrdex/server/db"
"decred.org/dcrdex/server/db/driver/pg"
)

// We do not need a Backend Setup, just the Drivers to call DecodeCoinID. While
// we can do this with asset.DecodeCoinID(assetID, coinID), doing the following
// drastically reduces the locking/unlocking (asset.driversMtx) that would be
// required to decode coin IDs, and we will likely be doing many.
//
// Keys are DEX asset IDs (matching the -base/-quote flags below).
var assets = map[uint32]asset.Driver{
	0:  &btc.Driver{}, // Bitcoin
	2:  &ltc.Driver{}, // Litecoin
	42: &dcr.Driver{}, // Decred
}

// Command-line flags configuring the postgres connection and the market and
// account to dump.
var (
	dbhost = flag.String("host", "/run/postgresql", "pg host") // default to unix socket, but 127.0.0.1 would be common too
	dbuser = flag.String("user", "dcrdex", "db username")
	dbpass = flag.String("pass", "", "db password")
	dbname = flag.String("dbname", "dcrdex", "db name")
	dbport = flag.Int("port", 5432, "db port")
	base   = flag.Uint("base", 42, "market base asset id")
	quote  = flag.Uint("quote", 0, "market quote asset id")
	acct   = flag.String("acct", "", "filter for dex account") // default is all accounts
)

// MatchData supplements a db.MatchData with decoded swap and redeem coins.
type MatchData struct {
	db.MatchData
	// The decoded (human-readable) coin ID strings, presumably of the form
	// "txid:vout" / "txid:vin" given how they are split downstream — confirm
	// against the asset drivers. A field is left empty when the coin is not
	// recorded or fails to decode.
	MakerSwap   string
	TakerSwap   string
	MakerRedeem string
	TakerRedeem string
}

// convertMatchData builds a MatchData from a db.MatchDataWithCoins, decoding
// the raw swap and redeem coin IDs with the appropriate asset driver. A coin
// that is absent or fails to decode is left as an empty string; decode
// failures are reported on stderr and processing continues.
func convertMatchData(baseAsset, quoteAsset asset.Driver, md *db.MatchDataWithCoins) *MatchData {
	// decode decodes a raw coin ID with the given driver, returning "" and
	// logging to stderr on failure so one bad coin doesn't abort the dump.
	decode := func(drv asset.Driver, coinID []byte) string {
		if len(coinID) == 0 {
			return ""
		}
		coinStr, err := drv.DecodeCoinID(coinID)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Unable to decode coin %x: %v\n", coinID, err)
			return "" // leave empty and keep chugging
		}
		return coinStr
	}

	// asset0 is the maker swap / taker redeem asset.
	// asset1 is the taker swap / maker redeem asset.
	// Maker selling means asset 0 is base; asset 1 is quote.
	asset0, asset1 := baseAsset, quoteAsset
	if md.TakerSell {
		asset0, asset1 = quoteAsset, baseAsset
	}

	return &MatchData{
		MatchData:   md.MatchData,
		MakerSwap:   decode(asset0, md.MakerSwapCoin),
		TakerSwap:   decode(asset1, md.TakerSwapCoin),
		MakerRedeem: decode(asset0, md.MakerRedeemCoin),
		TakerRedeem: decode(asset1, md.TakerRedeemCoin),
	}
}

// MarketMatchesStreaming streams all matches for market with base and quote
// through a MatchData processing function, which is a wrapper around the
// provided function and convertMatchData. The provided function should do two
// main things: (1) apply some filtering, and (2) write the match data out
// somewhere, which in this app is a CSV file.
func MarketMatchesStreaming(storage db.DEXArchivist, base, quote uint32, includeInactive bool, N int64, f func(*MatchData) error) (int, error) {
	baseDrv, found := assets[base]
	if !found {
		return 0, fmt.Errorf("asset %d not found", base)
	}
	quoteDrv, found := assets[quote]
	if !found {
		return 0, fmt.Errorf("asset %d not found", quote)
	}
	// Adapt f to the raw db callback signature by decoding coins first.
	process := func(raw *db.MatchDataWithCoins) error {
		return f(convertMatchData(baseDrv, quoteDrv, raw))
	}
	return storage.MarketMatchesStreaming(base, quote, includeInactive, N, process)
}

// main runs mainCore, reporting any error to stderr and exiting with a
// non-zero status. A normal return from main already exits with status 0, so
// no explicit os.Exit(0) is needed.
func main() {
	if err := mainCore(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

// mainCore is the real main. It connects to the dcrdex postgres DB read-only,
// streams every match for the configured base/quote market, and writes the
// matches to a CSV file, optionally filtered to a single DEX account.
func mainCore() error {
	ctx, quit := context.WithCancel(context.Background())
	defer quit()
	// Cancel the context on SIGINT so the streaming callback stops promptly
	// and deferred cleanup (flush/close) still runs.
	killChan := make(chan os.Signal, 1)
	signal.Notify(killChan, os.Interrupt)
	go func() {
		<-killChan
		quit()
		fmt.Println("Shutting down...")
	}()

	flag.Parse()

	base, quote := uint32(*base), uint32(*quote)
	name, err := dex.MarketName(base, quote)
	if err != nil {
		return err
	}
	mkt := &dex.MarketInfo{
		Name:  name,
		Base:  base,
		Quote: quote,
	}

	pgCfg := &pg.Config{
		Host:      *dbhost,
		Port:      strconv.Itoa(*dbport),
		User:      *dbuser,
		Pass:      *dbpass,
		DBName:    *dbname,
		MarketCfg: []*dex.MarketInfo{mkt},
	}
	// Read-only archiver: no tables are created or modified.
	archiver, err := pg.NewArchiverForRead(ctx, pgCfg)
	if err != nil {
		return err
	}
	defer archiver.Close()

	// Decode the optional account filter. Empty -acct means all accounts.
	var acctID account.AccountID
	var haveAccount bool
	switch len(*acct) {
	case 0: // no account filter
	case account.HashSize * 2:
		acctB, err := hex.DecodeString(*acct)
		if err != nil {
			return err
		}
		copy(acctID[:], acctB)
		haveAccount = true
	default:
		return fmt.Errorf("bad acct ID %v", *acct)
	}

	// NOTE(review): with no -acct filter, the file name embeds the all-zeros
	// account ID — possibly intentional, but confirm.
	csvfile, err := os.Create(fmt.Sprintf("acct_matches_%v.csv", acctID))
	if err != nil {
		return fmt.Errorf("error creating csv file: %w", err)
	}
	defer csvfile.Close()

	csvwriter := csv.NewWriter(csvfile)
	defer csvwriter.Flush() // best-effort flush on early error returns

	err = csvwriter.Write([]string{"unixtime", "maker", "taker", "quantity", "rate",
		"isTakerSell", "makerSwapTx", "makerSwapVout", "makerRedeemTx", "makerRedeemVin",
		"takerSwapTx", "takerSwapVout", "takerRedeemTx", "takerRedeemVin"})
	if err != nil {
		return fmt.Errorf("ERROR: csvwriter.Write failed: %w", err)
	}

	// splitTx splits a decoded coin string of the form "txid:index" into its
	// transaction ID and vin/vout index, validating the index is a uint32.
	// An empty input yields two empty strings and no error.
	splitTx := func(txinout string) (tx, vinout string, err error) {
		if txinout == "" {
			return // ok
		}
		txsplit := strings.Split(txinout, ":")
		if len(txsplit) != 2 {
			err = fmt.Errorf("txinout (%s) not formatted as a txin/out", txinout)
			return
		}
		_, err = strconv.ParseUint(txsplit[1], 10, 32)
		if err != nil {
			err = fmt.Errorf("strconv.ParseUint(%s): %w", txsplit[1], err)
			return
		}
		return txsplit[0], txsplit[1], nil
	}

	_, err = MarketMatchesStreaming(archiver, base, quote, true, -1, func(md *MatchData) error {
		// Abort the stream promptly on shutdown.
		if err := ctx.Err(); err != nil {
			return err
		}
		// Apply the optional account filter to either side of the match.
		if haveAccount && (md.MakerAcct != acctID && md.TakerAcct != acctID) {
			return nil
		}

		// The error wraps name splitTx, the closure that actually failed
		// (the original messages misleadingly blamed strings.Split).
		makerSwapTx, makerSwapVout, err := splitTx(md.MakerSwap)
		if err != nil {
			return fmt.Errorf("splitTx(%s): %w", md.MakerSwap, err)
		}
		makerRedeemTx, makerRedeemVin, err := splitTx(md.MakerRedeem)
		if err != nil {
			return fmt.Errorf("splitTx(%s): %w", md.MakerRedeem, err)
		}
		takerSwapTx, takerSwapVout, err := splitTx(md.TakerSwap)
		if err != nil {
			return fmt.Errorf("splitTx(%s): %w", md.TakerSwap, err)
		}
		takerRedeemTx, takerRedeemVin, err := splitTx(md.TakerRedeem)
		if err != nil {
			return fmt.Errorf("splitTx(%s): %w", md.TakerRedeem, err)
		}

		err = csvwriter.Write([]string{
			// Epoch index times duration (ms) gives the epoch's unix ms; /1000 for seconds.
			strconv.FormatUint(md.Epoch.Idx*md.Epoch.Dur/1000, 10),
			md.MakerAcct.String(),
			md.TakerAcct.String(),
			// NOTE(review): integer division truncates sub-coin quantities;
			// presumably quantities are whole lot multiples — confirm.
			strconv.FormatInt(int64(md.Quantity)/1e8, 10),
			strconv.FormatFloat(float64(md.Rate)/1e8, 'f', -1, 64),
			strconv.FormatBool(md.TakerSell),
			makerSwapTx, makerSwapVout, makerRedeemTx, makerRedeemVin,
			takerSwapTx, takerSwapVout, takerRedeemTx, takerRedeemVin,
		})
		if err != nil {
			return fmt.Errorf("csvwriter.Write: %w", err)
		}
		return nil
	})
	if err != nil {
		return err
	}

	// Flush explicitly and surface any buffered write error that the plain
	// deferred Flush would silently drop.
	csvwriter.Flush()
	return csvwriter.Error()
}
22 changes: 17 additions & 5 deletions server/db/driver/pg/pg.go
Expand Up @@ -102,8 +102,10 @@ func (a *Archiver) fatalBackendErr(err error) {
a.fatalMtx.Unlock()
}

// NewArchiver constructs a new Archiver. Use Close when done with the Archiver.
func NewArchiver(ctx context.Context, cfg *Config) (*Archiver, error) {
// NewArchiverForRead constructs a new Archiver without creating or modifying
// any data structures. This should be used for read-only applications. Use
// Close when done with the Archiver.
func NewArchiverForRead(ctx context.Context, cfg *Config) (*Archiver, error) {
// Connect to the PostgreSQL daemon and return the *sql.DB.
db, err := connect(cfg.Host, cfg.Port, cfg.User, cfg.Pass, cfg.DBName)
if err != nil {
Expand Down Expand Up @@ -140,7 +142,7 @@ func NewArchiver(ctx context.Context, cfg *Config) (*Archiver, error) {
mktMap[mkt.Name] = mkt
}

archiver := &Archiver{
return &Archiver{
ctx: ctx,
db: db,
dbName: cfg.DBName,
Expand All @@ -151,6 +153,16 @@ func NewArchiver(ctx context.Context, cfg *Config) (*Archiver, error) {
accounts: fullTableName(cfg.DBName, publicSchema, accountsTableName),
},
fatal: make(chan struct{}),
}, nil
}

// NewArchiver constructs a new Archiver. All tables are created, including
// tables for markets that may have been added since last startup. Use Close
// when done with the Archiver.
func NewArchiver(ctx context.Context, cfg *Config) (*Archiver, error) {
archiver, err := NewArchiverForRead(ctx, cfg)
if err != nil {
return nil, err
}

// Check critical performance-related settings.
Expand All @@ -159,12 +171,12 @@ func NewArchiver(ctx context.Context, cfg *Config) (*Archiver, error) {
}

// Ensure all tables required by the current market configuration are ready.
purgeMarkets, err := prepareTables(ctx, db, cfg.MarketCfg)
purgeMarkets, err := prepareTables(ctx, archiver.db, cfg.MarketCfg)
if err != nil {
return nil, err
}
for _, staleMarket := range purgeMarkets {
mkt := mktMap[staleMarket]
mkt := archiver.markets[staleMarket]
if mkt == nil { // shouldn't happen
return nil, fmt.Errorf("unrecognized market %v", staleMarket)
}
Expand Down

0 comments on commit 0844058

Please sign in to comment.