-
Notifications
You must be signed in to change notification settings - Fork 0
/
mysql.go
601 lines (545 loc) · 15.2 KB
/
mysql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
package main
import (
"container/list"
"database/sql"
"encoding/json"
"fmt"
"math/big"
"net/url"
"strings"
"sync"
"time"
// mysql
"github.com/erick785/services/common/log"
_ "github.com/go-sql-driver/mysql"
)
var (
confirmed = 300
)
//Mysql implement mysql
type Mysql struct {
DBName string
DBUser string
DBPWD string
DBHost string
db *sql.DB
RPC *RPCClient
writeBlockChan chan *list.Element // 已可安全写入db
memBlocks *list.List // 缓存10个块, 未安全,易回滚
memBlocksRW sync.RWMutex
pendingBlock *Block // 内存池
pendingBlockRW sync.RWMutex
elemChan *list.Element
tokenChan chan string
}
// Open open a db and create tables if necessary.
func (mysql *Mysql) Open() error {
connStr := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&loc=%s&parseTime=true",
mysql.DBUser, mysql.DBPWD, mysql.DBHost, mysql.DBName, url.QueryEscape("Asia/Shanghai"))
db, err := sql.Open("mysql", connStr)
if err != nil {
return err
}
db.SetMaxOpenConns(2000)
db.SetMaxIdleConns(2000)
db.SetConnMaxLifetime(60 * time.Second)
mysql.db = db
if err := mysql.execSQL(initSQL); err != nil {
db.Close()
return err
}
mysql.memBlocks = list.New()
mysql.writeBlockChan = make(chan *list.Element, 100)
mysql.tokenChan = make(chan string, 100)
go func() {
for {
select {
case elem := <-mysql.writeBlockChan:
blk := elem.Value.(*Block)
t := time.Now()
if err := mysql.execSQL(mysql.getSQL(blk)); err != nil {
panic(err)
}
log.Infof("[MYSQL] write block %d, elpase %s", blk.Height, time.Now().Sub(t))
mysql.memBlocksRW.Lock()
mysql.memBlocks.Remove(elem)
mysql.memBlocksRW.Unlock()
case token := <-mysql.tokenChan:
if _, err := mysql.InsertOrUpdateTokenInfo(token); err != nil {
log.Errorf("[MYSQL] insert or update token %s - %s", token, err)
}
}
}
}()
return nil
}
// Close close db
func (mysql *Mysql) Close() error {
return mysql.db.Close()
}
func (mysql *Mysql) execSQL(sqlStr string) error {
sqlStrs := strings.Split(sqlStr, ";")
tx, err := mysql.db.Begin()
if err != nil {
return err
}
defer func() {
if tx != nil {
tx.Rollback()
}
}()
for _, sqlStr := range sqlStrs {
sqlStr = strings.TrimSpace(sqlStr)
if len(sqlStr) != 0 {
if _, err := tx.Exec(fmt.Sprintf("%s;", sqlStr)); err != nil {
return fmt.Errorf("%s - %s", sqlStr, err)
}
}
}
err = tx.Commit()
if err == nil {
tx = nil
}
return err
}
func (mysql *Mysql) getSQL(blk *Block) string {
isMonitorAddresses := func(addresses []string) bool {
for _, address := range addresses {
list := strings.Split(address, "-")
if mysql.IsMonitorAddress(list[0]) {
return true
}
}
return false
}
//blockchain
sqlStr := fmt.Sprintf("REPLACE INTO t_blockchain(id, i_height, i_created, s_hash, s_prevhash) values(1, %d, %d, '%s', '%s');",
blk.Height, blk.Time, blk.ID, blk.PrevID)
//tx
for _, tx := range blk.Transactions {
addresses := []string{}
for _, in := range tx.Ins {
addresses = append(addresses, in.Addresses...)
}
for _, out := range tx.Outs {
addresses = append(addresses, out.Addresses...)
}
if !isMonitorAddresses(addresses) {
continue
}
ins, _ := json.Marshal(tx.Ins)
outs, _ := json.Marshal(tx.Outs)
sqlStr += fmt.Sprintf("INSERT INTO t_transaction(s_hash, s_ins, s_outs, i_created, i_height, s_fee, i_size) values("+
"'%s','%s','%s', %d, %d, '%s', %d);",
tx.ID, ins, outs, tx.Time, tx.Height, tx.Fee, tx.Size)
}
//address
for address, addressInfo := range blk.addressInfos {
if !isMonitorAddresses([]string{address}) {
continue
}
sqlStr += fmt.Sprintf("REPLACE INTO t_address(s_address, s_value) values('%s', '%s');", address, addressInfo.Amount)
for _, hash := range addressInfo.HTxs {
sqlStr += fmt.Sprintf("REPLACE INTO t_history(s_address, s_hash) values('%s', '%s');", address, hash)
}
}
return sqlStr
}
//InsertBlock 新增区块
func (mysql *Mysql) InsertBlock(blk *Block) error {
t := time.Now()
defer func() {
log.Infof("[MYSQL] insert block %d elpase %s", blk.Height, time.Now().Sub(t))
}()
blk.addressInfos = make(map[string]*AddressInfo)
for _, tx := range blk.Transactions {
if err := mysql.insertTx(tx, blk); err != nil {
return err
}
}
mysql.memBlocksRW.Lock()
mysql.memBlocks.PushBack(blk)
cnt := mysql.memBlocks.Len()
if mysql.elemChan == nil {
mysql.elemChan = mysql.memBlocks.Front()
}
mysql.memBlocksRW.Unlock()
if cnt > confirmed {
mysql.writeBlockChan <- mysql.elemChan
mysql.elemChan = mysql.elemChan.Next()
}
return nil
}
//InsertPendingTxs 新增内存池
func (mysql *Mysql) InsertPendingTxs(txs []*Transaction) error {
pendingBlock := &Block{}
t := time.Now()
defer func() {
log.Infof("[MYSQL] insert pending block %d elpase %s", pendingBlock.Height, time.Now().Sub(t))
}()
pendingBlock.addressInfos = make(map[string]*AddressInfo)
for _, tx := range txs {
if err := mysql.insertTx(tx, pendingBlock); err != nil {
return err
}
}
mysql.pendingBlockRW.Lock()
mysql.pendingBlock = pendingBlock
mysql.pendingBlockRW.Unlock()
return nil
}
// DeleteBlock 删除区块
func (mysql *Mysql) DeleteBlock(blk *Block) error {
mysql.memBlocksRW.Lock()
if elem := mysql.memBlocks.Back(); elem != nil {
lblk := elem.Value.(*Block)
if lblk.Height != blk.Height {
panic(fmt.Sprintf("mismatch height %d %d", lblk.Height, blk.Height))
}
mysql.memBlocks.Remove(elem)
} else {
panic("uncompleted")
}
mysql.memBlocksRW.Unlock()
return nil
}
// GetBlockChainFromDB 获取最新区块
func (mysql *Mysql) GetBlockChainFromDB() (*Block, error) {
sqlstr := "SELECT i_height, i_created, s_hash, s_prevhash FROM t_blockchain"
blk := &Block{}
row := mysql.db.QueryRow(sqlstr)
err := row.Scan(&blk.Height, &blk.Time, &blk.ID, &blk.PrevID)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return blk, nil
}
// GetBlockChain 获取最新区块
func (mysql *Mysql) GetBlockChain() (*Block, error) {
//从缓存区块中查找
mysql.memBlocksRW.RLock()
if elem := mysql.memBlocks.Back(); elem != nil {
blk := elem.Value.(*Block)
mysql.memBlocksRW.RUnlock()
return blk, nil
}
mysql.memBlocksRW.RUnlock()
return mysql.GetBlockChainFromDB()
}
// GetTransactionsByAddressFromDB 获取指定地址的交易
func (mysql *Mysql) GetTransactionsByAddressFromDB(addr string, skip int64, num int64) ([]*Transaction, error) {
if num == 0 {
return nil, nil
}
sqlStrH := fmt.Sprintf("SELECT s_hash FROM t_history where s_address='%s' order by id desc limit %d, %d", addr, skip, num)
rowsH, err := mysql.db.Query(sqlStrH)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
defer rowsH.Close()
hashes := []string{}
for rowsH.Next() {
var hash string
err := rowsH.Scan(&hash)
if err != nil {
return nil, err
}
hashes = append(hashes, fmt.Sprintf("'%s'", hash))
}
if len(hashes) == 0 {
return nil, nil
}
sqlStr := fmt.Sprintf("SELECT s_hash, s_ins, s_outs, i_created, i_height, s_fee, i_size FROM t_transaction where s_hash in(%s) order by id desc;",
strings.Join(hashes, ","))
rows, err := mysql.db.Query(sqlStr)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
defer rows.Close()
txs := []*Transaction{}
for rows.Next() {
tx := &Transaction{
Fee: big.NewInt(0),
}
var ins, outs, fee string
err := rows.Scan(&tx.ID, &ins, &outs, &tx.Time, &tx.Height, &fee, &tx.Size)
if err != nil {
return nil, err
}
json.Unmarshal([]byte(ins), &tx.Ins)
json.Unmarshal([]byte(outs), &tx.Outs)
tx.Fee.SetString(fee, 10)
txs = append(txs, tx)
}
return txs, nil
}
// GetTransactionsByAddress 获取指定地址的交易
func (mysql *Mysql) GetTransactionsByAddress(addr string, pagenum int64, pagesize int64) ([]*Transaction, error) {
skip := pagenum * pagesize
txs := []*Transaction{}
//从内存池区块中查找
mysql.pendingBlockRW.RLock()
if mysql.pendingBlock != nil {
if addrInfo, ok := mysql.pendingBlock.addressInfos[addr]; ok {
for cnt := len(addrInfo.HTxs); cnt > 0; cnt-- {
if skip == 0 {
txs = append(txs, addrInfo.Txs[addrInfo.HTxs[cnt-1]])
if pagenum--; pagenum == 0 {
mysql.pendingBlockRW.RUnlock()
return txs, nil
}
} else {
skip--
}
}
}
}
mysql.pendingBlockRW.RUnlock()
//从缓存区块中查找
mysql.memBlocksRW.RLock()
for elem := mysql.memBlocks.Back(); elem != nil; elem = elem.Prev() {
blk := elem.Value.(*Block)
if addrInfo, ok := blk.addressInfos[addr]; ok {
for cnt := len(addrInfo.HTxs); cnt > 0; cnt-- {
if skip == 0 {
txs = append(txs, addrInfo.Txs[addrInfo.HTxs[cnt-1]])
if pagenum--; pagenum == 0 {
mysql.memBlocksRW.RUnlock()
return txs, nil
}
} else {
skip--
}
}
}
}
mysql.memBlocksRW.RUnlock()
ttxs, err := mysql.GetTransactionsByAddressFromDB(addr, skip, pagenum-int64(len(txs)))
return append(txs, ttxs...), err
}
// GetAccountByAddress 获取指定地址的余额
func (mysql *Mysql) GetAccountByAddress(addr string, pending bool) (*AddressInfo, error) {
if pending {
//从内存池区块中查找
if mysql.pendingBlock != nil {
mysql.pendingBlockRW.RLock()
if addrInfo, ok := mysql.pendingBlock.addressInfos[addr]; ok {
mysql.pendingBlockRW.RUnlock()
return addrInfo, nil
}
mysql.pendingBlockRW.RUnlock()
}
}
//从缓存区块中查找
mysql.memBlocksRW.RLock()
for elem := mysql.memBlocks.Back(); elem != nil; elem = elem.Prev() {
blk := elem.Value.(*Block)
if addrInfo, ok := blk.addressInfos[addr]; ok {
mysql.memBlocksRW.RUnlock()
return addrInfo, nil
}
}
mysql.memBlocksRW.RUnlock()
return mysql.GetAccountByAddressFromDB(addr)
}
// GetAccountByAddressFromDB 获取指定地址的信息
func (mysql *Mysql) GetAccountByAddressFromDB(addr string) (*AddressInfo, error) {
sqlStr := fmt.Sprintf("SELECT s_value FROM t_address where s_address='%s'", addr)
addrInfo := &AddressInfo{
Amount: big.NewInt(0),
}
row := mysql.db.QueryRow(sqlStr)
var amount string
err := row.Scan(&amount)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
addrInfo.Amount.SetString(amount, 10)
return addrInfo, nil
}
// AddMonitorAddress 新增监控地址
func (mysql *Mysql) AddMonitorAddress(address string) error {
if addressInfo, err := mysql.GetAccountByAddressFromDB(strings.ToLower(address)); addressInfo != nil || err != nil {
return err
}
sqlStr := fmt.Sprintf("INSERT INTO t_address(s_address, s_value) values('%s', '%s');", address, big.NewInt(0))
return mysql.execSQL(sqlStr)
}
// IsMonitorAddress 新增监控地址
func (mysql *Mysql) IsMonitorAddress(address string) bool {
return true
// addressInfo, _ := mysql.GetAccountByAddressFromDB(strings.ToLower(address))
// return addressInfo != nil
}
func (mysql *Mysql) insertTx(tx *Transaction, blk *Block) error {
for _, in := range tx.Ins {
for _, address := range in.Addresses {
addressInfo, ok := blk.addressInfos[address]
if !ok {
addressInfo = &AddressInfo{
Amount: big.NewInt(0),
Txs: make(map[string]*Transaction),
}
if addrInfo, _ := mysql.GetAccountByAddress(address, false); addrInfo != nil {
addressInfo.Amount.SetBytes(addrInfo.Amount.Bytes())
}
blk.addressInfos[address] = addressInfo
}
//花掉
addressInfo.Amount = new(big.Int).Sub(addressInfo.Amount, in.Value)
//新增历史记录
if _, ok := addressInfo.Txs[tx.ID]; !ok {
addressInfo.Txs[tx.ID] = tx
addressInfo.HTxs = append(addressInfo.HTxs, tx.ID)
}
if addrs := strings.Split(address, "-"); len(addrs) == 2 {
mysql.tokenChan <- addrs[1]
}
}
}
for _, out := range tx.Outs {
for _, address := range out.Addresses {
addressInfo, ok := blk.addressInfos[address]
if !ok {
addressInfo = &AddressInfo{
Amount: big.NewInt(0),
Txs: make(map[string]*Transaction),
}
if addrInfo, _ := mysql.GetAccountByAddress(address, false); addrInfo != nil {
addressInfo.Amount.SetBytes(addrInfo.Amount.Bytes())
}
blk.addressInfos[address] = addressInfo
}
//新增
addressInfo.Amount = new(big.Int).Add(addressInfo.Amount, out.Value)
//新增历史记录
if _, ok := addressInfo.Txs[tx.ID]; !ok {
addressInfo.Txs[tx.ID] = tx
addressInfo.HTxs = append(addressInfo.HTxs, tx.ID)
}
}
}
return nil
}
// GetTokenInfo 获取token信息
func (mysql *Mysql) GetTokenInfo(token string) (*TokenInfo, error) {
sqlStr := fmt.Sprintf("SELECT s_address, s_name, s_symbol, i_decimal FROM t_tokeninfo where s_address='%s'", token)
tokenInfo := &TokenInfo{}
row := mysql.db.QueryRow(sqlStr)
err := row.Scan(&tokenInfo.Address, &tokenInfo.Name, &tokenInfo.Symbol, &tokenInfo.Decimal)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return tokenInfo, nil
}
// InsertOrUpdateTokenInfo 更新token信息
func (mysql *Mysql) InsertOrUpdateTokenInfo(token string) (*TokenInfo, error) {
if tokenInfo, err := mysql.GetTokenInfo(token); tokenInfo != nil || err != nil {
return tokenInfo, err
}
tokenInfo := &TokenInfo{}
tokenInfo.Address = token
name, err := mysql.RPC.GetTokenName(token)
if err != nil {
return nil, err
}
tokenInfo.Name = name
symbol, err := mysql.RPC.GetTokenSymbol(token)
if err != nil {
return nil, err
}
tokenInfo.Symbol = symbol
decimal, err := mysql.RPC.GetTokenDecimal(token)
if err != nil {
return nil, err
}
tokenInfo.Decimal = decimal.Int64()
if len(tokenInfo.Name) > 0 && tokenInfo.Name[0] < 32 {
tokenInfo.Name = tokenInfo.Name[1:]
}
if len(tokenInfo.Symbol) > 0 && tokenInfo.Symbol[0] < 32 {
tokenInfo.Symbol = tokenInfo.Symbol[1:]
}
sqlStr := fmt.Sprintf("INSERT INTO t_tokeninfo(s_address, s_name, s_symbol, i_decimal) values('%s','%s','%s',%d)",
tokenInfo.Address, Escape(tokenInfo.Name), Escape(tokenInfo.Symbol), tokenInfo.Decimal)
return tokenInfo, mysql.execSQL(sqlStr)
}
// GetTokenInfosByAddress 获取token信息列表
func (mysql *Mysql) GetTokenInfosByAddress(addr string) ([]*TokenInfo, error) {
sqlStr := fmt.Sprintf("SELECT s_address FROM t_address where s_address like '%s_%%'", addr)
rows, err := mysql.db.Query(sqlStr)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
defer rows.Close()
tokenInfos := []*TokenInfo{}
for rows.Next() {
var address string
err := rows.Scan(&address)
if err != nil {
return nil, err
}
token := strings.Replace(address, fmt.Sprintf("%s-", addr), "", -1)
tokenInfo, err := mysql.InsertOrUpdateTokenInfo(token)
if err != nil {
log.Errorf("gettoken info ---- %s", err)
continue
}
tokenInfos = append(tokenInfos, tokenInfo)
}
return tokenInfos, nil
}
func Escape(sql string) string {
dest := make([]byte, 0, 2*len(sql))
var escape byte
for i := 0; i < len(sql); i++ {
c := sql[i]
escape = 0
switch c {
case 0: /* Must be escaped for 'mysql' */
escape = '0'
break
case '\n': /* Must be escaped for logs */
escape = 'n'
break
case '\r':
escape = 'r'
break
case '\\':
escape = '\\'
break
case '\'':
escape = '\''
break
case '"': /* Better safe than sorry */
escape = '"'
break
case '\032': /* This gives problems on Win32 */
escape = 'Z'
}
if escape != 0 {
dest = append(dest, '\\', escape)
} else {
dest = append(dest, c)
}
}
return string(dest)
}