Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: [xdepthmaker] remove shared trade collector and fix hedge #1466

Merged
merged 3 commits into from
Dec 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 23 additions & 41 deletions pkg/strategy/xdepthmaker/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"golang.org/x/time/rate"

"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
Expand Down Expand Up @@ -48,14 +47,9 @@ type CrossExchangeMarketMakingStrategy struct {
Position *types.Position `json:"position,omitempty" persistence:"position"`
ProfitStats *types.ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"`
CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty" persistence:"covered_position"`
mu sync.Mutex

MakerOrderExecutor, HedgeOrderExecutor *bbgo.GeneralOrderExecutor

// orderStore is a shared order store between the maker session and the hedge session
orderStore *core.OrderStore

// tradeCollector is a shared trade collector between the maker session and the hedge session
tradeCollector *core.TradeCollector
}

func (s *CrossExchangeMarketMakingStrategy) Initialize(
Expand Down Expand Up @@ -129,14 +123,7 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize(
// bbgo.Sync(ctx, s)
})

// global order store
s.orderStore = core.NewOrderStore(s.Position.Symbol)
s.orderStore.BindStream(hedgeSession.UserDataStream)
s.orderStore.BindStream(makerSession.UserDataStream)

// global trade collector
s.tradeCollector = core.NewTradeCollector(symbol, s.Position, s.orderStore)
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
s.HedgeOrderExecutor.TradeCollector().OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
c := trade.PositionChange()

// sync covered position
Expand All @@ -146,13 +133,12 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize(
// buy trade -> positive delta ->
// 1) short position -> reduce short position
// 2) short position -> increase short position
if trade.Exchange == s.hedgeSession.ExchangeName {
// TODO: make this atomic
s.CoveredPosition = s.CoveredPosition.Add(c)
}

// TODO: make this atomic
s.mu.Lock()
s.CoveredPosition = s.CoveredPosition.Add(c)
s.mu.Unlock()
})
s.tradeCollector.BindStream(s.hedgeSession.UserDataStream)
s.tradeCollector.BindStream(s.makerSession.UserDataStream)
return nil
}

Expand Down Expand Up @@ -229,7 +215,7 @@ func (s *Strategy) ID() string {
}

func (s *Strategy) InstanceID() string {
return fmt.Sprintf("%s:%s", ID, s.Symbol)
return fmt.Sprintf("%s:%s:%s-%s", ID, s.Symbol, s.MakerExchange, s.HedgeExchange)
}

func (s *Strategy) Initialize() error {
Expand Down Expand Up @@ -339,11 +325,7 @@ func (s *Strategy) CrossRun(
s.stopC = make(chan struct{})

if s.RecoverTrade {
s.tradeCollector.OnRecover(func(trade types.Trade) {
bbgo.Notify("Recovered trade", trade)
})

go s.runTradeRecover(ctx)
// go s.runTradeRecover(ctx)
}

s.authedC = make(chan struct{}, 2)
Expand Down Expand Up @@ -373,7 +355,7 @@ func (s *Strategy) CrossRun(
defer fullReplenishTicker.Stop()

// clean up the previous open orders
if err := s.cleanUpOpenOrders(ctx); err != nil {
if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil {
log.WithError(err).Errorf("error cleaning up open orders")
}

Expand Down Expand Up @@ -426,10 +408,10 @@ func (s *Strategy) CrossRun(
//
// For negative position:
// uncover position = -5 - -3 (covered position) = -2
s.tradeCollector.Process()
s.HedgeOrderExecutor.TradeCollector().Process()
s.MakerOrderExecutor.TradeCollector().Process()

position := s.Position.GetBase()

uncoverPosition := position.Sub(s.CoveredPosition)
absPos := uncoverPosition.Abs()
if absPos.Compare(s.hedgeMarket.MinQuantity) > 0 {
Expand Down Expand Up @@ -550,7 +532,7 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
log.Infof("submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)
bbgo.Notify("Submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)

createdOrders, err := s.HedgeOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{
_, err := s.HedgeOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Market: s.hedgeMarket,
Symbol: s.Symbol,
Type: types.OrderTypeMarket,
Expand All @@ -564,14 +546,16 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
return
}

s.orderStore.Add(createdOrders...)

// if the hedge is on sell side, then we should add positive position
switch side {
case types.SideTypeSell:
s.mu.Lock()
s.CoveredPosition = s.CoveredPosition.Add(quantity)
s.mu.Unlock()
case types.SideTypeBuy:
s.mu.Lock()
s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg())
s.mu.Unlock()
}
}

Expand All @@ -597,11 +581,11 @@ func (s *Strategy) runTradeRecover(ctx context.Context) {
if s.RecoverTrade {
startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod)

if err := s.tradeCollector.Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}

if err := s.tradeCollector.Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
if err := s.MakerOrderExecutor.TradeCollector().Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
}
Expand Down Expand Up @@ -853,17 +837,15 @@ func (s *Strategy) updateQuote(ctx context.Context, maxLayer int) {
return
}

createdOrders, err := s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...)
_, err = s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...)
if err != nil {
log.WithError(err).Errorf("order error: %s", err.Error())
return
}

s.orderStore.Add(createdOrders...)
}

func (s *Strategy) cleanUpOpenOrders(ctx context.Context) error {
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.makerSession.Exchange, s.Symbol)
func (s *Strategy) cleanUpOpenOrders(ctx context.Context, session *bbgo.ExchangeSession) error {
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol)
if err != nil {
return err
}
Expand All @@ -875,7 +857,7 @@ func (s *Strategy) cleanUpOpenOrders(ctx context.Context) error {
log.Infof("found existing open orders:")
types.OrderSlice(openOrders).Print()

if err := s.makerSession.Exchange.CancelOrders(ctx, openOrders...); err != nil {
if err := session.Exchange.CancelOrders(ctx, openOrders...); err != nil {
return err
}

Expand Down
Loading