-
Notifications
You must be signed in to change notification settings - Fork 86
/
epochs.go
130 lines (110 loc) · 3.5 KB
/
epochs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.
package pg
import (
"context"
"database/sql/driver"
"fmt"
"time"
"decred.org/dcrdex/dex/candles"
"decred.org/dcrdex/dex/order"
"decred.org/dcrdex/server/db"
"decred.org/dcrdex/server/db/driver/pg/internal"
"github.com/lib/pq"
)
// In a table, a []order.OrderID is stored as a BYTEA[]. The orderIDs type
// defines the Value and Scan methods for such an OrderID slice using
// pq.ByteaArray and copying of OrderId data to/from []byte.
type orderIDs []order.OrderID
// Value implements the sql/driver.Valuer interface.
func (oids orderIDs) Value() (driver.Value, error) {
if oids == nil {
return nil, nil
}
if len(oids) == 0 {
return "{}", nil
}
ba := make(pq.ByteaArray, 0, len(oids))
for i := range oids {
ba = append(ba, oids[i][:])
}
return ba.Value()
}
// Scan implements the sql.Scanner interface.
func (oids *orderIDs) Scan(src interface{}) error {
var ba pq.ByteaArray
err := ba.Scan(src)
if err != nil {
return err
}
n := len(ba)
*oids = make([]order.OrderID, n)
for i := range ba {
copy((*oids)[i][:], ba[i])
}
return nil
}
// InsertEpoch stores the results of a newly-processed epoch. TODO: test.
func (a *Archiver) InsertEpoch(ed *db.EpochResults) error {
marketSchema, err := a.marketSchema(ed.MktBase, ed.MktQuote)
if err != nil {
return err
}
epochsTableName := fullEpochsTableName(a.dbName, marketSchema)
stmt := fmt.Sprintf(internal.InsertEpoch, epochsTableName)
_, err = a.db.Exec(stmt, ed.Idx, ed.Dur, ed.MatchTime, ed.CSum, ed.Seed,
orderIDs(ed.OrdersRevealed), orderIDs(ed.OrdersMissed))
if err != nil {
a.fatalBackendErr(err)
return err
}
epochReportsTableName := fullEpochReportsTableName(a.dbName, marketSchema)
stmt = fmt.Sprintf(internal.InsertEpochReport, epochReportsTableName)
epochEnd := (ed.Idx + 1) * ed.Dur
_, err = a.db.Exec(stmt, epochEnd, ed.Dur, ed.MatchVolume, ed.QuoteVolume, ed.BookBuys, ed.BookBuys5, ed.BookBuys25,
ed.BookSells, ed.BookSells5, ed.BookSells25, ed.HighRate, ed.LowRate, ed.StartRate, ed.EndRate)
if err != nil {
a.fatalBackendErr(err)
}
return err
}
// LoadEpochStats reads all market epoch history from the database, updating the
// provided caches along the way.
func (a *Archiver) LoadEpochStats(base, quote uint32, caches []*candles.Cache) error {
marketSchema, err := a.marketSchema(base, quote)
if err != nil {
return err
}
epochReportsTableName := fullEpochReportsTableName(a.dbName, marketSchema)
ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout)
defer cancel()
tstart := time.Now()
defer func() { log.Debugf("select epoch candles in: %v", time.Since(tstart)) }()
stmt := fmt.Sprintf(internal.SelectEpochCandles, epochReportsTableName)
rows, err := a.db.QueryContext(ctx, stmt, 0)
if err != nil {
return err
}
defer rows.Close()
var endStamp, epochDur, matchVol, quoteVol, highRate, lowRate, startRate, endRate fastUint64
for rows.Next() {
err = rows.Scan(&endStamp, &epochDur, &matchVol, "eVol, &highRate, &lowRate, &startRate, &endRate)
if err != nil {
return err
}
candle := &candles.Candle{
StartStamp: uint64(endStamp - epochDur),
EndStamp: uint64(endStamp),
MatchVolume: uint64(matchVol),
QuoteVolume: uint64(quoteVol),
HighRate: uint64(highRate),
LowRate: uint64(lowRate),
StartRate: uint64(startRate),
EndRate: uint64(endRate),
}
for _, set := range caches {
set.Add(candle)
}
}
return rows.Err()
}