diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index 912a44d802..26c24ebea1 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -12,7 +12,6 @@ import ( "golang.org/x/time/rate" "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/core" "github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" @@ -48,14 +47,9 @@ type CrossExchangeMarketMakingStrategy struct { Position *types.Position `json:"position,omitempty" persistence:"position"` ProfitStats *types.ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"` CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty" persistence:"covered_position"` + mu sync.Mutex MakerOrderExecutor, HedgeOrderExecutor *bbgo.GeneralOrderExecutor - - // orderStore is a shared order store between the maker session and the hedge session - orderStore *core.OrderStore - - // tradeCollector is a shared trade collector between the maker session and the hedge session - tradeCollector *core.TradeCollector } func (s *CrossExchangeMarketMakingStrategy) Initialize( @@ -129,14 +123,7 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize( // bbgo.Sync(ctx, s) }) - // global order store - s.orderStore = core.NewOrderStore(s.Position.Symbol) - s.orderStore.BindStream(hedgeSession.UserDataStream) - s.orderStore.BindStream(makerSession.UserDataStream) - - // global trade collector - s.tradeCollector = core.NewTradeCollector(symbol, s.Position, s.orderStore) - s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { + s.HedgeOrderExecutor.TradeCollector().OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { c := trade.PositionChange() // sync covered position @@ -146,13 +133,12 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize( // buy trade -> positive delta -> // 1) short position -> reduce short position // 2) short position -> increase short position - if trade.Exchange == s.hedgeSession.ExchangeName { - // TODO: make this atomic - s.CoveredPosition = s.CoveredPosition.Add(c) - } + + // TODO: make this atomic + s.mu.Lock() + s.CoveredPosition = s.CoveredPosition.Add(c) + s.mu.Unlock() }) - s.tradeCollector.BindStream(s.hedgeSession.UserDataStream) - s.tradeCollector.BindStream(s.makerSession.UserDataStream) return nil } @@ -229,7 +215,7 @@ func (s *Strategy) ID() string { } func (s *Strategy) InstanceID() string { - return fmt.Sprintf("%s:%s", ID, s.Symbol) + return fmt.Sprintf("%s:%s:%s-%s", ID, s.Symbol, s.MakerExchange, s.HedgeExchange) } func (s *Strategy) Initialize() error { @@ -339,11 +325,7 @@ func (s *Strategy) CrossRun( s.stopC = make(chan struct{}) if s.RecoverTrade { - s.tradeCollector.OnRecover(func(trade types.Trade) { - bbgo.Notify("Recovered trade", trade) - }) - - go s.runTradeRecover(ctx) + // go s.runTradeRecover(ctx) } s.authedC = make(chan struct{}, 2) @@ -373,7 +355,7 @@ func (s *Strategy) CrossRun( defer fullReplenishTicker.Stop() // clean up the previous open orders - if err := s.cleanUpOpenOrders(ctx); err != nil { + if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil { log.WithError(err).Errorf("error cleaning up open orders") } @@ -426,10 +408,10 @@ func (s *Strategy) CrossRun( // // For negative position: // uncover position = -5 - -3 (covered position) = -2 - s.tradeCollector.Process() + s.HedgeOrderExecutor.TradeCollector().Process() + s.MakerOrderExecutor.TradeCollector().Process() position := s.Position.GetBase() - uncoverPosition := position.Sub(s.CoveredPosition) absPos := uncoverPosition.Abs() if absPos.Compare(s.hedgeMarket.MinQuantity) > 0 { @@ -550,7 +532,7 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) { log.Infof("submitting %s hedge order %s %v", s.Symbol, side.String(), quantity) bbgo.Notify("Submitting %s hedge order %s %v", s.Symbol, side.String(), quantity) - createdOrders, err := s.HedgeOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{ + _, err := s.HedgeOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{ Market: s.hedgeMarket, Symbol: s.Symbol, Type: types.OrderTypeMarket, @@ -564,14 +546,16 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) { return } - s.orderStore.Add(createdOrders...) - // if the hedge is on sell side, then we should add positive position switch side { case types.SideTypeSell: + s.mu.Lock() s.CoveredPosition = s.CoveredPosition.Add(quantity) + s.mu.Unlock() case types.SideTypeBuy: + s.mu.Lock() s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg()) + s.mu.Unlock() } } @@ -597,11 +581,11 @@ func (s *Strategy) runTradeRecover(ctx context.Context) { if s.RecoverTrade { startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod) - if err := s.tradeCollector.Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { + if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { log.WithError(err).Errorf("query trades error") } - if err := s.tradeCollector.Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { + if err := s.MakerOrderExecutor.TradeCollector().Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { log.WithError(err).Errorf("query trades error") } } @@ -853,17 +837,15 @@ func (s *Strategy) updateQuote(ctx context.Context, maxLayer int) { return } - createdOrders, err := s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...) + _, err = s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...) if err != nil { log.WithError(err).Errorf("order error: %s", err.Error()) return } - - s.orderStore.Add(createdOrders...) } -func (s *Strategy) cleanUpOpenOrders(ctx context.Context) error { - openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.makerSession.Exchange, s.Symbol) +func (s *Strategy) cleanUpOpenOrders(ctx context.Context, session *bbgo.ExchangeSession) error { + openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol) if err != nil { return err } @@ -875,7 +857,7 @@ func (s *Strategy) cleanUpOpenOrders(ctx context.Context) error { log.Infof("found existing open orders:") types.OrderSlice(openOrders).Print() - if err := s.makerSession.Exchange.CancelOrders(ctx, openOrders...); err != nil { + if err := session.Exchange.CancelOrders(ctx, openOrders...); err != nil { return err }