Skip to content

Commit

Permalink
Merge pull request #1586 from c9s/kbearXD/dca2/round-collector
Browse files Browse the repository at this point in the history
dca2: new struct RoundCollector for testing and use flag to decide re…
  • Loading branch information
kbearXD committed Mar 18, 2024
2 parents 4eabb82 + a23c476 commit 3f44092
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 105 deletions.
23 changes: 13 additions & 10 deletions pkg/strategy/dca2/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,25 @@ func (s *Strategy) recover(ctx context.Context) error {
}
debugRoundOrders(s.logger, "current", currentRound)

// TODO: use flag
// recover profit stats
/*
if s.DisableProfitStatsRecover {
s.logger.Info("disableProfitStatsRecover is set, skip profit stats recovery")
} else {
if err := recoverProfitStats(ctx, s); err != nil {
return err
}
s.logger.Info("recover profit stats DONE")
*/
}

// recover position
if err := recoverPosition(ctx, s.Position, queryService, currentRound); err != nil {
return err
if s.DisablePositionRecover {
s.logger.Info("disablePositionRecover is set, skip position recovery")
} else {
if err := recoverPosition(ctx, s.Position, queryService, currentRound); err != nil {
return err
}
s.logger.Info("recover position DONE")
}
s.logger.Info("recover position DONE")

// recover startTimeOfNextRound
startTimeOfNextRound := recoverStartTimeOfNextRound(ctx, currentRound, s.CoolDownInterval)
Expand Down Expand Up @@ -206,16 +211,14 @@ func recoverPosition(ctx context.Context, position *types.Position, queryService
return nil
}

// TODO: use flag to decide which to recover
/*
func recoverProfitStats(ctx context.Context, strategy *Strategy) error {
if strategy.ProfitStats == nil {
return fmt.Errorf("profit stats is nil, please check it")
}

return strategy.CalculateAndEmitProfit(ctx)
_, err := strategy.UpdateProfitStats(ctx)
return err
}
*/

func recoverStartTimeOfNextRound(ctx context.Context, currentRound Round, coolDownInterval types.Duration) time.Time {
if currentRound.TakeProfitOrder.OrderID != 0 && currentRound.TakeProfitOrder.Status == types.OrderStatusFilled {
Expand Down
125 changes: 125 additions & 0 deletions pkg/strategy/dca2/round_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package dca2

import (
"context"
"strconv"
"time"

"github.com/c9s/bbgo/pkg/exchange"
maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/types"
"github.com/sirupsen/logrus"
)

type RoundCollector struct {
logger *logrus.Entry
symbol string
groupID uint32
isMax bool

// service
historyService types.ExchangeTradeHistoryService
queryService types.ExchangeOrderQueryService
}

func NewRoundCollector(logger *logrus.Entry, symbol string, groupID uint32, ex types.Exchange) *RoundCollector {
isMax := exchange.IsMaxExchange(ex)
historyService, ok := ex.(types.ExchangeTradeHistoryService)
if !ok {
logger.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", ex.Name())
return nil
}

queryService, ok := ex.(types.ExchangeOrderQueryService)
if !ok {
logger.Errorf("exchange %s doesn't support ExchangeOrderQueryService", ex.Name())
return nil
}

return &RoundCollector{
logger: logger,
symbol: symbol,
groupID: groupID,
isMax: isMax,
historyService: historyService,
queryService: queryService,
}
}

func (rc *RoundCollector) CollectFinishRounds(ctx context.Context, fromOrderID uint64) ([]Round, error) {
// TODO: pagination for it
// query the orders
rc.logger.Infof("query %s closed orders from order id #%d", rc.symbol, fromOrderID)
orders, err := retry.QueryClosedOrdersUntilSuccessfulLite(ctx, rc.historyService, rc.symbol, time.Time{}, time.Time{}, fromOrderID)
if err != nil {
return nil, err
}
rc.logger.Infof("there are %d closed orders from order id #%d", len(orders), fromOrderID)

var rounds []Round
var round Round
for _, order := range orders {
// skip not this strategy order
if order.GroupID != rc.groupID {
continue
}

switch order.Side {
case types.SideTypeBuy:
round.OpenPositionOrders = append(round.OpenPositionOrders, order)
case types.SideTypeSell:
if !rc.isMax {
if order.Status != types.OrderStatusFilled {
rc.logger.Infof("take-profit order is %s not filled, so this round is not finished. Skip it", order.Status)
continue
}
} else {
if !maxapi.IsFilledOrderState(maxapi.OrderState(order.OriginalStatus)) {
rc.logger.Infof("isMax and take-profit order is %s not done or finalizing, so this round is not finished. Skip it", order.OriginalStatus)
continue
}
}

round.TakeProfitOrder = order
rounds = append(rounds, round)
round = Round{}
default:
rc.logger.Errorf("there is order with unsupported side")
}
}

return rounds, nil
}

func (rc *RoundCollector) CollectRoundTrades(ctx context.Context, round Round) ([]types.Trade, error) {
debugRoundOrders(rc.logger, "collect round trades", round)

var roundTrades []types.Trade

var roundOrders []types.Order = round.OpenPositionOrders
roundOrders = append(roundOrders, round.TakeProfitOrder)

for _, order := range roundOrders {
rc.logger.Infof("collect trades from order: %s", order.String())
if order.ExecutedQuantity.Sign() == 0 {
rc.logger.Info("collect trads from order but no executed quantity ", order.String())
continue
} else {
rc.logger.Info("collect trades from order ", order.String())
}

trades, err := retry.QueryOrderTradesUntilSuccessfulLite(ctx, rc.queryService, types.OrderQuery{
Symbol: order.Symbol,
OrderID: strconv.FormatUint(order.OrderID, 10),
})

if err != nil {
return nil, err
}

roundTrades = append(roundTrades, trades...)
}

return roundTrades, nil
}
6 changes: 2 additions & 4 deletions pkg/strategy/dca2/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,8 @@ func (s *Strategy) runTakeProfitReady(ctx context.Context, next State) {

s.logger.Info("[State] TakeProfitReady - start reseting position and calculate quote investment for next round")

// reset position

// calculate profit stats
if err := s.CalculateAndEmitProfitUntilSuccessful(ctx); err != nil {
// update profit stats
if err := s.UpdateProfitStatsUntilSuccessful(ctx); err != nil {
s.logger.WithError(err).Warn("failed to calculate and emit profit")
}

Expand Down
136 changes: 45 additions & 91 deletions pkg/strategy/dca2/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@ import (
"context"
"fmt"
"math"
"strconv"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"

"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange"
maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/strategy/common"
Expand Down Expand Up @@ -67,7 +65,9 @@ type Strategy struct {
OrderGroupID uint32 `json:"orderGroupID"`

// RecoverWhenStart option is used for recovering dca states
RecoverWhenStart bool `json:"recoverWhenStart"`
RecoverWhenStart bool `json:"recoverWhenStart"`
DisableProfitStatsRecover bool `json:"disableProfitStatsRecover"`
DisablePositionRecover bool `json:"disablePositionRecover"`

// KeepOrdersWhenShutdown option is used for keeping the grid orders when shutting down bbgo
KeepOrdersWhenShutdown bool `json:"keepOrdersWhenShutdown"`
Expand All @@ -91,6 +91,7 @@ type Strategy struct {
startTimeOfNextRound time.Time
nextStateC chan State
state State
roundCollector *RoundCollector

// callbacks
common.StatusCallbacks
Expand Down Expand Up @@ -171,6 +172,12 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
s.Position = types.NewPositionFromMarket(s.Market)
}

// round collector
s.roundCollector = NewRoundCollector(s.logger, s.Symbol, s.OrderGroupID, s.ExchangeSession.Exchange)
if s.roundCollector == nil {
return fmt.Errorf("failed to initialize round collector")
}

// prometheus
if s.PrometheusLabels != nil {
initMetrics(labelKeys(s.PrometheusLabels))
Expand Down Expand Up @@ -373,122 +380,69 @@ func (s *Strategy) CleanUp(ctx context.Context) error {
return werr
}

func (s *Strategy) CalculateAndEmitProfitUntilSuccessful(ctx context.Context) error {
fromOrderID := s.ProfitStats.FromOrderID

historyService, ok := s.ExchangeSession.Exchange.(types.ExchangeTradeHistoryService)
if !ok {
return fmt.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", s.ExchangeSession.Exchange.Name())
}

queryService, ok := s.ExchangeSession.Exchange.(types.ExchangeOrderQueryService)
if !ok {
return fmt.Errorf("exchange %s doesn't support ExchangeOrderQueryService", s.ExchangeSession.Exchange.Name())
}

func (s *Strategy) UpdateProfitStatsUntilSuccessful(ctx context.Context) error {
var op = func() error {
if err := s.CalculateAndEmitProfit(ctx, historyService, queryService); err != nil {
return errors.Wrapf(err, "failed to calculate and emit profit, please check it")
}

if s.ProfitStats.FromOrderID == fromOrderID {
return fmt.Errorf("FromOrderID (%d) is not updated, retry it", s.ProfitStats.FromOrderID)
if updated, err := s.UpdateProfitStats(ctx); err != nil {
return errors.Wrapf(err, "failed to update profit stats, please check it")
} else if !updated {
return fmt.Errorf("there is no round to update profit stats, please check it")
}

return nil
}

return retry.GeneralBackoff(ctx, op)
// exponential increased interval retry until success
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = 5 * time.Second
bo.MaxInterval = 20 * time.Minute
bo.MaxElapsedTime = 0

return backoff.Retry(op, backoff.WithContext(bo, ctx))
}

func (s *Strategy) CalculateAndEmitProfit(ctx context.Context, historyService types.ExchangeTradeHistoryService, queryService types.ExchangeOrderQueryService) error {
// TODO: pagination for it
// query the orders
s.logger.Infof("query %s closed orders from order id #%d", s.Symbol, s.ProfitStats.FromOrderID)
orders, err := retry.QueryClosedOrdersUntilSuccessfulLite(ctx, historyService, s.Symbol, time.Time{}, time.Time{}, s.ProfitStats.FromOrderID)
// UpdateProfitStats will collect round from closed orders and emit update profit stats
// return true, nil -> there is at least one finished round and all the finished rounds we collect update profit stats successfully
// return false, nil -> there is no finished round!
// return true, error -> At least one round update profit stats successfully but there is error when collecting other rounds
func (s *Strategy) UpdateProfitStats(ctx context.Context) (bool, error) {
rounds, err := s.roundCollector.CollectFinishRounds(ctx, s.ProfitStats.FromOrderID)
if err != nil {
return err
return false, errors.Wrapf(err, "failed to collect finish rounds from #%d", s.ProfitStats.FromOrderID)
}
s.logger.Infof("there are %d closed orders from order id #%d", len(orders), s.ProfitStats.FromOrderID)

isMax := exchange.IsMaxExchange(s.ExchangeSession.Exchange)

var rounds []Round
var round Round
for _, order := range orders {
// skip not this strategy order
if order.GroupID != s.OrderGroupID {
continue
}

switch order.Side {
case types.SideTypeBuy:
round.OpenPositionOrders = append(round.OpenPositionOrders, order)
case types.SideTypeSell:
if !isMax {
if order.Status != types.OrderStatusFilled {
s.logger.Infof("take-profit order is %s not filled, so this round is not finished. Skip it", order.Status)
continue
}
} else {
if !maxapi.IsFilledOrderState(maxapi.OrderState(order.OriginalStatus)) {
s.logger.Infof("isMax and take-profit order is %s not done or finalizing, so this round is not finished. Skip it", order.OriginalStatus)
continue
}
}

round.TakeProfitOrder = order
rounds = append(rounds, round)
round = Round{}
default:
s.logger.Errorf("there is order with unsupported side")
}
}

s.logger.Infof("there are %d rounds from order id #%d", len(rounds), s.ProfitStats.FromOrderID)
var updated bool = false
for _, round := range rounds {
debugRoundOrders(s.logger, strconv.FormatInt(s.ProfitStats.Round, 10), round)
var roundOrders []types.Order = round.OpenPositionOrders
roundOrders = append(roundOrders, round.TakeProfitOrder)
for _, order := range roundOrders {
s.logger.Infof("calculate profit stats from order: %s", order.String())

// skip no trade orders
if order.ExecutedQuantity.Sign() == 0 {
continue
}

trades, err := retry.QueryOrderTradesUntilSuccessfulLite(ctx, queryService, types.OrderQuery{
Symbol: order.Symbol,
OrderID: strconv.FormatUint(order.OrderID, 10),
})

if err != nil {
return err
}
trades, err := s.roundCollector.CollectRoundTrades(ctx, round)
if err != nil {
return updated, errors.Wrapf(err, "failed to collect the trades of round")
}

for _, trade := range trades {
s.logger.Infof("calculate profit stats from trade: %s", trade.String())
s.ProfitStats.AddTrade(trade)
}
for _, trade := range trades {
s.logger.Infof("update profit stats from trade: %s", trade.String())
s.ProfitStats.AddTrade(trade)
}

// update profit stats FromOrderID to make sure we will not collect duplicated rounds
s.ProfitStats.FromOrderID = round.TakeProfitOrder.OrderID + 1

// update quote investment
s.ProfitStats.QuoteInvestment = s.ProfitStats.QuoteInvestment.Add(s.ProfitStats.CurrentRoundProfit)

// store into persistence
// sync to persistence
bbgo.Sync(ctx, s)
updated = true

s.logger.Infof("profit stats:\n%s", s.ProfitStats.String())

// emit profit
s.EmitProfit(s.ProfitStats)
updateProfitMetrics(s.ProfitStats.Round, s.ProfitStats.CurrentRoundProfit.Float64())

// make profit stats forward to new round
s.ProfitStats.NewRound()
}

return nil
return updated, nil
}

func (s *Strategy) updateNumOfOrdersMetrics(ctx context.Context) {
Expand Down

0 comments on commit 3f44092

Please sign in to comment.