From 1d6525cad45069d71c2ac58056fabb1f7673ee3c Mon Sep 17 00:00:00 2001 From: Hai Le <34814121+Haiss2@users.noreply.github.com> Date: Fri, 21 Apr 2023 11:38:20 +0700 Subject: [PATCH] TT-493 Tradelogs-backfill-main-program-server-api (#10) * (TT-493) Tradelogs-backfill-main-program-server-api * Small change --- cmd/tradelogs/main.go | 30 +++++++- go.mod | 2 +- internal/app/server.go | 8 ++ internal/bigquery/bigquery.go | 19 +++-- internal/bigquery/types.go | 23 ++++-- internal/server/backfill/server.go | 90 +++++++++++++++++++++++ internal/server/{ => tradelogs}/server.go | 0 7 files changed, 154 insertions(+), 18 deletions(-) create mode 100644 internal/server/backfill/server.go rename internal/server/{ => tradelogs}/server.go (100%) diff --git a/cmd/tradelogs/main.go b/cmd/tradelogs/main.go index 68305e9..dd74520 100644 --- a/cmd/tradelogs/main.go +++ b/cmd/tradelogs/main.go @@ -7,6 +7,7 @@ import ( "os" libapp "github.com/KyberNetwork/tradelogs/internal/app" + "github.com/KyberNetwork/tradelogs/internal/bigquery" "github.com/KyberNetwork/tradelogs/internal/dbutil" "github.com/KyberNetwork/tradelogs/internal/evmlistenerclient" "github.com/KyberNetwork/tradelogs/internal/parser/kyberswap" @@ -14,7 +15,8 @@ import ( "github.com/KyberNetwork/tradelogs/internal/parser/tokenlon" "github.com/KyberNetwork/tradelogs/internal/parser/zxotc" "github.com/KyberNetwork/tradelogs/internal/parser/zxrfq" - "github.com/KyberNetwork/tradelogs/internal/server" + backfill "github.com/KyberNetwork/tradelogs/internal/server/backfill" + tradelogs "github.com/KyberNetwork/tradelogs/internal/server/tradelogs" "github.com/KyberNetwork/tradelogs/internal/storage" "github.com/KyberNetwork/tradelogs/internal/worker" "github.com/go-redis/redis/v8" @@ -31,6 +33,7 @@ func main() { app.Flags = append(app.Flags, libapp.RedisFlags()...) app.Flags = append(app.Flags, libapp.EvmListenerFlags()...) app.Flags = append(app.Flags, libapp.HTTPServerFlags()...) + app.Flags = append(app.Flags, libapp.BigqueryFlags()...) if err := app.Run(os.Args); err != nil { log.Panic(err) @@ -76,9 +79,30 @@ func run(c *cli.Context) error { return err } - http := server.New(l, s, c.String(libapp.HTTPServerFlag.Name)) + backfillWorker, err := bigquery.NewWorker( + libapp.BigqueryProjectIDFFromCli(c), + s, + kyberswap.MustNewParser(), + zxotc.MustNewParser(), + zxrfq.MustNewParser(), + tokenlon.MustNewParser(), + paraswap.MustNewParser(), + ) + if err != nil { + l.Errorw("Error while init backfillWorker") + return err + } + + httpBackfill := backfill.New(c.String(libapp.HTTPBackfillServerFlag.Name), backfillWorker) + go func() { + if err := httpBackfill.Run(); err != nil { + panic(err) + } + }() + + httpTradelogs := tradelogs.New(l, s, c.String(libapp.HTTPServerFlag.Name)) go func() { - if err := http.Run(); err != nil { + if err := httpTradelogs.Run(); err != nil { panic(err) } }() diff --git a/go.mod b/go.mod index f53f65c..d02f93c 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/stretchr/testify v1.8.1 github.com/urfave/cli v1.22.5 go.uber.org/zap v1.24.0 + google.golang.org/api v0.114.0 ) require ( @@ -61,7 +62,6 @@ require ( golang.org/x/text v0.8.0 // indirect golang.org/x/tools v0.6.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - google.golang.org/api v0.114.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230330154414-c0448cd141ea // indirect google.golang.org/grpc v1.54.0 // indirect diff --git a/internal/app/server.go b/internal/app/server.go index 67b3608..d41669d 100644 --- a/internal/app/server.go +++ b/internal/app/server.go @@ -11,8 +11,16 @@ var HTTPServerFlag = cli.StringFlag{ Value: "localhost:8080", } +var HTTPBackfillServerFlag = cli.StringFlag{ + Name: "backfill-server-address", + Usage: "Run the rest for backfill server", + EnvVar: "BACKFILL_SERVER_ADDRESS", + Value: "localhost:8081", +} + func HTTPServerFlags() []cli.Flag { return []cli.Flag{ HTTPServerFlag, + HTTPBackfillServerFlag, } } diff --git a/internal/bigquery/bigquery.go b/internal/bigquery/bigquery.go index 49ead3e..cc9e30b 100644 --- a/internal/bigquery/bigquery.go +++ b/internal/bigquery/bigquery.go @@ -83,7 +83,7 @@ func (w *Worker) queryDateData( AND block_timestamp >= TIMESTAMP_SECONDS(@minTime) AND block_timestamp <= TIMESTAMP_SECONDS(@maxTime) AND ARRAY_LENGTH(topics) > 0 - AND topics[OFFSET(0)] IN @topics + AND topics[OFFSET(0)] IN UNNEST(@topics) ORDER BY block_timestamp DESC LIMIT @limit @@ -157,6 +157,11 @@ func (w *Worker) parseLog(row BQLog) (storage.TradeLog, error) { return ps.Parse(ehtLog, uint64(row.BlockTimestamp.Unix())) } +func endOfDay(t time.Time) time.Time { + y, m, d := t.Date() + return time.Date(y, m, d, 23, 59, 59, 1e9-1, t.Location()) +} + func (w *Worker) run(minTime, maxTime time.Time) { l := w.l.With("minTime", minTime, "maxTime", maxTime) l.Info("Start running worker") @@ -166,7 +171,7 @@ func (w *Worker) run(minTime, maxTime time.Time) { w.state = stateStopped }() - temp := maxTime + temp := endOfDay(maxTime) offset := int64(0) minTs, maxTs := minTime.Unix(), maxTime.Unix() for temp.After(minTime) { @@ -187,7 +192,7 @@ func (w *Worker) run(minTime, maxTime time.Time) { "err", err, "temp", temp, "offset", offset) return } - l.Infow("Successfully get logsFromRowIterator", "temp", temp, "count", count, "tradelogs", tradelogs) + l.Infow("Successfully get logsFromRowIterator", "temp", temp, "count", count) err = w.storage.Insert(tradelogs) if err != nil { @@ -212,7 +217,7 @@ func (w *Worker) BackFillAllData() error { return ErrWorkerRunning } - go w.run(minBlockTime, time.Now()) + go w.run(minBlockTime, time.Now().UTC()) return nil } @@ -223,14 +228,14 @@ func (w *Worker) BackFillPartialData(fromTime, toTime int64) error { } // minTime = max(fromTime, minBlockTime) - minTime := time.Unix(fromTime, 0) + minTime := time.Unix(fromTime, 0).UTC() if minTime.Before(minBlockTime) { minTime = minBlockTime } // maxTime = min(toTime, now) - now := time.Now() - maxTime := time.Unix(toTime, 0) + now := time.Now().UTC() + maxTime := time.Unix(toTime, 0).UTC() if maxTime.After(now) { maxTime = now } diff --git a/internal/bigquery/types.go b/internal/bigquery/types.go index 662be1a..e5a86e7 100644 --- a/internal/bigquery/types.go +++ b/internal/bigquery/types.go @@ -1,6 +1,7 @@ package bigquery import ( + "strings" "time" "github.com/ethereum/go-ethereum/common" @@ -8,17 +9,25 @@ import ( ) type BQLog struct { - Index uint `bigquery:"log_index"` + Index int64 `bigquery:"log_index"` TxHash string `bigquery:"transaction_hash"` - TxIndex uint `bigquery:"transaction_index"` + TxIndex int64 `bigquery:"transaction_index"` Address string `bigquery:"address"` Data string `bigquery:"data"` Topics []string `bigquery:"topics"` BlockTimestamp time.Time `bigquery:"block_timestamp"` - BlockNumber uint64 `bigquery:"block_number"` + BlockNumber int64 `bigquery:"block_number"` BlockHash string `bigquery:"block_hash"` } +func hex2Bytes(raw string) []byte { + if strings.HasPrefix(raw, "0x") { + return common.Hex2Bytes(raw[2:]) + } + + return common.Hex2Bytes(raw) +} + func (log BQLog) ToEthLog() types.Log { topics := make([]common.Hash, len(log.Topics)) for id, topic := range log.Topics { @@ -28,11 +37,11 @@ func (log BQLog) ToEthLog() types.Log { return types.Log{ Address: common.HexToAddress(log.Address), Topics: topics, - Data: common.Hex2Bytes(log.Data), - BlockNumber: log.BlockNumber, + Data: hex2Bytes(log.Data), + BlockNumber: uint64(log.BlockNumber), TxHash: common.HexToHash(log.TxHash), - TxIndex: log.TxIndex, + TxIndex: uint(log.TxIndex), BlockHash: common.HexToHash(log.BlockHash), - Index: log.Index, + Index: uint(log.Index), } } diff --git a/internal/server/backfill/server.go b/internal/server/backfill/server.go new file mode 100644 index 0000000..5be3b90 --- /dev/null +++ b/internal/server/backfill/server.go @@ -0,0 +1,90 @@ +package server + +import ( + "fmt" + "net/http" + "time" + + "github.com/KyberNetwork/tradelogs/internal/bigquery" + "github.com/gin-contrib/pprof" + "github.com/gin-gonic/gin" + "go.uber.org/zap" +) + +// Server to serve the service. +type Server struct { + l *zap.SugaredLogger + r *gin.Engine + bq *bigquery.Worker + bindAddr string +} + +// New returns a new server. +func New(bindAddr string, bq *bigquery.Worker) *Server { + engine := gin.New() + engine.Use(gin.Recovery()) + + server := &Server{ + l: zap.S(), + r: engine, + bq: bq, + bindAddr: bindAddr, + } + + gin.SetMode(gin.ReleaseMode) + server.register() + + return server +} + +// Run runs server. +func (s *Server) Run() error { + if err := s.r.Run(s.bindAddr); err != nil { + return fmt.Errorf("run server: %w", err) + } + + return nil +} + +func (s *Server) register() { + pprof.Register(s.r, "/debug") + s.r.POST("/backfill", s.backfill) + +} + +func responseErr(c *gin.Context, err error) { + c.JSON(http.StatusBadRequest, gin.H{ + "success": false, + "error": err.Error(), + }) +} + +func (s *Server) backfill(c *gin.Context) { + var ( + query struct { + FromTime int64 `form:"from_time" json:"from_time,omitempty"` // milliseconds + ToTime int64 `form:"to_time" json:"to_time,omitempty"` // milliseconds + } + err = c.ShouldBind(&query) + ) + if err != nil { + responseErr(c, err) + return + } + + s.l.Infow("Request backfill", "query", query) + if query.FromTime == 0 || query.ToTime == 0 { + err = s.bq.BackFillAllData() + } else { + err = s.bq.BackFillPartialData(query.FromTime/1000, query.ToTime/1000) + } + if err != nil { + responseErr(c, err) + return + } + + c.JSON(http.StatusOK, gin.H{ + "success": true, + "time": time.Now().UnixMilli(), + }) +} diff --git a/internal/server/server.go b/internal/server/tradelogs/server.go similarity index 100% rename from internal/server/server.go rename to internal/server/tradelogs/server.go