Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: [xdepthmaker] add profit fixer #1559

Merged
merged 5 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
84 changes: 84 additions & 0 deletions pkg/strategy/xdepthmaker/profitfixer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package xdepthmaker

import (
"context"
"sync"
"time"

"golang.org/x/sync/errgroup"

"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types"
)

type ProfitFixerConfig struct {
TradesSince types.Time `json:"tradesSince,omitempty"`
}

// ProfitFixer implements a trade history based profit fixer
type ProfitFixer struct {
market types.Market

sessions map[string]types.ExchangeTradeHistoryService
}

func NewProfitFixer(market types.Market) *ProfitFixer {
return &ProfitFixer{
market: market,
sessions: make(map[string]types.ExchangeTradeHistoryService),
}
}

func (f *ProfitFixer) AddExchange(sessionName string, service types.ExchangeTradeHistoryService) {
f.sessions[sessionName] = service
}

func (f *ProfitFixer) batchQueryTrades(
ctx context.Context,
service types.ExchangeTradeHistoryService,
symbol string,
since, until time.Time,
) ([]types.Trade, error) {
q := &batch.TradeBatchQuery{ExchangeTradeHistoryService: service}
return q.QueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &since,
EndTime: &until,
})
}

func (f *ProfitFixer) Fix(ctx context.Context, since, until time.Time, stats *types.ProfitStats, position *types.Position) error {
var mu sync.Mutex
var allTrades = make([]types.Trade, 0, 1000)

g, subCtx := errgroup.WithContext(ctx)
for n, s := range f.sessions {
// allocate a copy of the iteration variables
sessionName := n
service := s
g.Go(func() error {
log.Infof("batch querying %s trade history from %s since %s until %s", f.market.Symbol, sessionName, since.String(), until.String())
trades, err := f.batchQueryTrades(subCtx, service, f.market.Symbol, since, until)
if err != nil {
log.WithError(err).Errorf("unable to batch query trades for fixer")
c9s marked this conversation as resolved.
Show resolved Hide resolved
return err
}

mu.Lock()
allTrades = append(allTrades, trades...)
mu.Unlock()
return nil
})
}

if err := g.Wait(); err != nil {
return err
}

allTrades = types.SortTradesAscending(allTrades)
for _, trade := range allTrades {
stats.AddTrade(trade)
position.AddTrade(trade)
}

return nil
}
43 changes: 30 additions & 13 deletions pkg/strategy/xdepthmaker/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ type Strategy struct {
// Pips is the pips of the layer prices
Pips fixedpoint.Value `json:"pips"`

ProfitFixerConfig *ProfitFixerConfig `json:"profitFixer"`

// --------------------------------
// private fields
// --------------------------------
Expand Down Expand Up @@ -324,14 +326,31 @@ func (s *Strategy) CrossRun(

s.stopC = make(chan struct{})

if s.RecoverTrade {
// go s.runTradeRecover(ctx)
}

s.authedC = make(chan struct{}, 2)
s.authedC = make(chan struct{}, 5)
bindAuthSignal(ctx, s.makerSession.UserDataStream, s.authedC)
bindAuthSignal(ctx, s.hedgeSession.UserDataStream, s.authedC)

if s.ProfitFixerConfig != nil {
if s.ProfitFixerConfig.TradesSince.Time().IsZero() {
return errors.New("tradesSince time can not be zero")
}

fixer := NewProfitFixer(s.makerMarket)
fixer.AddExchange(s.makerSession.Name, s.makerSession.Exchange.(types.ExchangeTradeHistoryService))
fixer.AddExchange(s.hedgeSession.Name, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService))

s.CrossExchangeMarketMakingStrategy.Position = types.NewPositionFromMarket(s.makerMarket)
s.CrossExchangeMarketMakingStrategy.ProfitStats = types.NewProfitStats(s.makerMarket)

if err2 := fixer.Fix(ctx, s.ProfitFixerConfig.TradesSince.Time(), time.Now(), s.CrossExchangeMarketMakingStrategy.ProfitStats, s.CrossExchangeMarketMakingStrategy.Position); err2 != nil {
return err2
}
}

if s.RecoverTrade {
go s.runTradeRecover(ctx)
}

go func() {
log.Infof("waiting for user data stream to get authenticated")
select {
Expand Down Expand Up @@ -578,16 +597,14 @@ func (s *Strategy) runTradeRecover(ctx context.Context) {
case <-tradeScanTicker.C:
log.Infof("scanning trades from %s ago...", tradeScanInterval)

if s.RecoverTrade {
startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod)
startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod)

if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}

if err := s.MakerOrderExecutor.TradeCollector().Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
if err := s.MakerOrderExecutor.TradeCollector().Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/types/profit.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ func (s *ProfitStats) AddTrade(trade Trade) {

// IsOver24Hours checks if the since time is over 24 hours
func (s *ProfitStats) IsOver24Hours() bool {
if s.TodaySince == 0 {
return false
}

return time.Since(time.Unix(s.TodaySince, 0)) >= 24*time.Hour
}

Expand Down