/
handle_periodic_operations.go
109 lines (87 loc) · 2.94 KB
/
handle_periodic_operations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package pricefeed
import (
"fmt"
"time"
"github.com/go-co-op/gocron"
"github.com/rs/zerolog/log"
"github.com/forbole/bdjuno/v3/types"
"github.com/forbole/bdjuno/v3/modules/pricefeed/coingecko"
"github.com/forbole/bdjuno/v3/modules/utils"
)
// RegisterPeriodicOperations implements modules.PeriodicOperationsModule.
//
// It registers two recurring jobs on the given scheduler: one that runs
// m.updatePrice every 2 minutes and one that runs m.updatePricesHistory every
// hour. Both jobs are invoked through utils.WatchMethod.
//
// An error is returned if either job cannot be registered. The scheduler error
// is wrapped with %w so that callers can still inspect it with errors.Is or
// errors.As.
func (m *Module) RegisterPeriodicOperations(scheduler *gocron.Scheduler) error {
	log.Debug().Str("module", "pricefeed").Msg("setting up periodic tasks")

	// Fetch the token prices every 2 mins
	if _, err := scheduler.Every(2).Minutes().Do(func() {
		utils.WatchMethod(m.updatePrice)
	}); err != nil {
		return fmt.Errorf("error while setting up pricefeed period operations: %w", err)
	}

	// Update the historical token prices every 1 hour
	if _, err := scheduler.Every(1).Hour().Do(func() {
		utils.WatchMethod(m.updatePricesHistory)
	}); err != nil {
		return fmt.Errorf("error while setting up history period operations: %w", err)
	}

	return nil
}
// getTokenPrices returns the most up-to-date prices for the tokens whose price
// IDs are stored in the database.
//
// The price IDs are read with m.db.GetTokensPriceID and then passed to
// coingecko.GetTokensPrices. If no price ID is stored, a debug message is
// logged and (nil, nil) is returned, so callers must be prepared to receive an
// empty result without an error.
//
// Errors from either step are wrapped with %w so the underlying cause remains
// available through errors.Is / errors.As.
func (m *Module) getTokenPrices() ([]types.TokenPrice, error) {
	// Get the list of tokens price id
	ids, err := m.db.GetTokensPriceID()
	if err != nil {
		return nil, fmt.Errorf("error while getting tokens price id: %w", err)
	}

	// Nothing to query: skip the remote call entirely.
	if len(ids) == 0 {
		log.Debug().Str("module", "pricefeed").Msg("no traded tokens price id found")
		return nil, nil
	}

	// Get the tokens prices
	prices, err := coingecko.GetTokensPrices(ids)
	if err != nil {
		return nil, fmt.Errorf("error while getting tokens prices: %w", err)
	}

	return prices, nil
}
// updatePrice fetches the latest token prices through getTokenPrices and
// stores them in the database with m.db.SaveTokensPrices.
//
// An error from getTokenPrices is returned unchanged (it is already
// annotated there); an error from the save step is wrapped with %w.
// Note that when getTokenPrices yields no prices, the save method is still
// called with the empty result.
func (m *Module) updatePrice() error {
	log.Debug().
		Str("module", "pricefeed").
		Str("operation", "pricefeed").
		Msg("updating token price and market cap")

	prices, err := m.getTokenPrices()
	if err != nil {
		return err
	}

	// Save the token prices
	err = m.db.SaveTokensPrices(prices)
	if err != nil {
		return fmt.Errorf("error while saving token prices: %w", err)
	}

	return nil
}
// updatePricesHistory fetches the latest token prices through getTokenPrices
// and stores them as historical price data with m.db.SaveTokenPricesHistory.
//
// Before saving, every price is stamped with the same time.Now() value (see
// the comment in the body for the reason). An error from getTokenPrices is
// returned unchanged; an error from the save step is wrapped with %w.
func (m *Module) updatePricesHistory() error {
	log.Debug().
		Str("module", "pricefeed").
		Str("operation", "pricefeed").
		Msg("updating token price and market cap history")

	prices, err := m.getTokenPrices()
	if err != nil {
		return err
	}

	// Normally, the last updated value reflects the time when the price was last updated.
	// If the price hasn't changed, the returned timestamp will be the same as one hour ago,
	// and it will not be stored in the db because it would be a duplicated value.
	// To fix this, we set every price timestamp to the same, current, time.
	//
	// The slice must be indexed here: `for _, price := range prices` would hand
	// us a copy of each element, and assigning to the copy would leave the
	// slice that is saved below untouched.
	timestamp := time.Now()
	for i := range prices {
		prices[i].Timestamp = timestamp
	}

	err = m.db.SaveTokenPricesHistory(prices)
	if err != nil {
		return fmt.Errorf("error while saving token prices history: %w", err)
	}

	return nil
}