Skip to content

Commit

Permalink
Merge pull request #1466 from c9s/c9s/fix-xdepthmaker
Browse files Browse the repository at this point in the history
FIX: [xdepthmaker] remove shared trade collector and fix hedge
  • Loading branch information
c9s committed Dec 20, 2023
2 parents a4f996c + 3ba1621 commit 26f7d86
Showing 1 changed file with 23 additions and 41 deletions.
64 changes: 23 additions & 41 deletions pkg/strategy/xdepthmaker/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"golang.org/x/time/rate"

"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
Expand Down Expand Up @@ -48,14 +47,9 @@ type CrossExchangeMarketMakingStrategy struct {
Position *types.Position `json:"position,omitempty" persistence:"position"`
ProfitStats *types.ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"`
CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty" persistence:"covered_position"`
mu sync.Mutex

MakerOrderExecutor, HedgeOrderExecutor *bbgo.GeneralOrderExecutor

// orderStore is a shared order store between the maker session and the hedge session
orderStore *core.OrderStore

// tradeCollector is a shared trade collector between the maker session and the hedge session
tradeCollector *core.TradeCollector
}

func (s *CrossExchangeMarketMakingStrategy) Initialize(
Expand Down Expand Up @@ -129,14 +123,7 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize(
// bbgo.Sync(ctx, s)
})

// global order store
s.orderStore = core.NewOrderStore(s.Position.Symbol)
s.orderStore.BindStream(hedgeSession.UserDataStream)
s.orderStore.BindStream(makerSession.UserDataStream)

// global trade collector
s.tradeCollector = core.NewTradeCollector(symbol, s.Position, s.orderStore)
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
s.HedgeOrderExecutor.TradeCollector().OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
c := trade.PositionChange()

// sync covered position
Expand All @@ -146,13 +133,12 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize(
// buy trade -> positive delta ->
// 1) short position -> reduce short position
// 2) short position -> increase short position
if trade.Exchange == s.hedgeSession.ExchangeName {
// TODO: make this atomic
s.CoveredPosition = s.CoveredPosition.Add(c)
}

// TODO: make this atomic
s.mu.Lock()
s.CoveredPosition = s.CoveredPosition.Add(c)
s.mu.Unlock()
})
s.tradeCollector.BindStream(s.hedgeSession.UserDataStream)
s.tradeCollector.BindStream(s.makerSession.UserDataStream)
return nil
}

Expand Down Expand Up @@ -229,7 +215,7 @@ func (s *Strategy) ID() string {
}

func (s *Strategy) InstanceID() string {
return fmt.Sprintf("%s:%s", ID, s.Symbol)
return fmt.Sprintf("%s:%s:%s-%s", ID, s.Symbol, s.MakerExchange, s.HedgeExchange)
}

func (s *Strategy) Initialize() error {
Expand Down Expand Up @@ -339,11 +325,7 @@ func (s *Strategy) CrossRun(
s.stopC = make(chan struct{})

if s.RecoverTrade {
s.tradeCollector.OnRecover(func(trade types.Trade) {
bbgo.Notify("Recovered trade", trade)
})

go s.runTradeRecover(ctx)
// go s.runTradeRecover(ctx)
}

s.authedC = make(chan struct{}, 2)
Expand Down Expand Up @@ -373,7 +355,7 @@ func (s *Strategy) CrossRun(
defer fullReplenishTicker.Stop()

// clean up the previous open orders
if err := s.cleanUpOpenOrders(ctx); err != nil {
if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil {
log.WithError(err).Errorf("error cleaning up open orders")
}

Expand Down Expand Up @@ -426,10 +408,10 @@ func (s *Strategy) CrossRun(
//
// For negative position:
// uncover position = -5 - -3 (covered position) = -2
s.tradeCollector.Process()
s.HedgeOrderExecutor.TradeCollector().Process()
s.MakerOrderExecutor.TradeCollector().Process()

position := s.Position.GetBase()

uncoverPosition := position.Sub(s.CoveredPosition)
absPos := uncoverPosition.Abs()
if absPos.Compare(s.hedgeMarket.MinQuantity) > 0 {
Expand Down Expand Up @@ -550,7 +532,7 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
log.Infof("submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)
bbgo.Notify("Submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)

createdOrders, err := s.HedgeOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{
_, err := s.HedgeOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Market: s.hedgeMarket,
Symbol: s.Symbol,
Type: types.OrderTypeMarket,
Expand All @@ -564,14 +546,16 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
return
}

s.orderStore.Add(createdOrders...)

// if the hedge is on sell side, then we should add positive position
switch side {
case types.SideTypeSell:
s.mu.Lock()
s.CoveredPosition = s.CoveredPosition.Add(quantity)
s.mu.Unlock()
case types.SideTypeBuy:
s.mu.Lock()
s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg())
s.mu.Unlock()
}
}

Expand All @@ -597,11 +581,11 @@ func (s *Strategy) runTradeRecover(ctx context.Context) {
if s.RecoverTrade {
startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod)

if err := s.tradeCollector.Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}

if err := s.tradeCollector.Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
if err := s.MakerOrderExecutor.TradeCollector().Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
}
Expand Down Expand Up @@ -853,17 +837,15 @@ func (s *Strategy) updateQuote(ctx context.Context, maxLayer int) {
return
}

createdOrders, err := s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...)
_, err = s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...)
if err != nil {
log.WithError(err).Errorf("order error: %s", err.Error())
return
}

s.orderStore.Add(createdOrders...)
}

func (s *Strategy) cleanUpOpenOrders(ctx context.Context) error {
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.makerSession.Exchange, s.Symbol)
func (s *Strategy) cleanUpOpenOrders(ctx context.Context, session *bbgo.ExchangeSession) error {
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol)
if err != nil {
return err
}
Expand All @@ -875,7 +857,7 @@ func (s *Strategy) cleanUpOpenOrders(ctx context.Context) error {
log.Infof("found existing open orders:")
types.OrderSlice(openOrders).Print()

if err := s.makerSession.Exchange.CancelOrders(ctx, openOrders...); err != nil {
if err := session.Exchange.CancelOrders(ctx, openOrders...); err != nil {
return err
}

Expand Down

0 comments on commit 26f7d86

Please sign in to comment.