diff --git a/services/horizon/internal/ingest/fsm_history_range_state.go b/services/horizon/internal/ingest/fsm_history_range_state.go index 0aa65795a9..f1c2a238b7 100644 --- a/services/horizon/internal/ingest/fsm_history_range_state.go +++ b/services/horizon/internal/ingest/fsm_history_range_state.go @@ -91,7 +91,7 @@ func (h historyRangeState) run(s *system) (transition, error) { ledgers = append(ledgers, ledgerCloseMeta) if len(ledgers) == cap(ledgers) { - if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { + if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers, false); err != nil { return start(), errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) } ledgers = ledgers[0:0] @@ -99,7 +99,7 @@ func (h historyRangeState) run(s *system) (transition, error) { } if len(ledgers) > 0 { - if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { + if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers, false); err != nil { return start(), errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) } } diff --git a/services/horizon/internal/ingest/fsm_reingest_history_range_state.go b/services/horizon/internal/ingest/fsm_reingest_history_range_state.go index 832898d021..499d15871c 100644 --- a/services/horizon/internal/ingest/fsm_reingest_history_range_state.go +++ b/services/horizon/internal/ingest/fsm_reingest_history_range_state.go @@ -30,11 +30,7 @@ func (reingestHistoryRangeState) GetState() State { return ReingestHistoryRange } -func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error { - if s.historyQ.GetTx() == nil { - return errors.New("expected transaction to be present") - } - +func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32, execBatchInTx bool) error { if s.maxLedgerPerFlush < 1 { return errors.New("invalid maxLedgerPerFlush, must be greater than 0") } @@ -75,7 +71,7 @@ func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger u ledgers = append(ledgers, ledgerCloseMeta) if len(ledgers)%int(s.maxLedgerPerFlush) == 0 { - if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { + if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers, execBatchInTx); err != nil { return errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) } ledgers = ledgers[0:0] @@ -83,7 +79,7 @@ func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger u } if len(ledgers) > 0 { - if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { + if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers, execBatchInTx); err != nil { return errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) } } @@ -142,7 +138,7 @@ func (h reingestHistoryRangeState) run(s *system) (transition, error) { return stop(), errors.Wrap(err, getLastIngestedErrMsg) } - if ingestErr := h.ingestRange(s, h.fromLedger, h.toLedger); ingestErr != nil { + if ingestErr := h.ingestRange(s, h.fromLedger, h.toLedger, false); ingestErr != nil { if err := s.historyQ.Commit(); err != nil { return stop(), errors.Wrap(ingestErr, commitErrMsg) } @@ -169,18 +165,9 @@ func (h reingestHistoryRangeState) run(s *system) (transition, error) { } startTime = time.Now() - if err := s.historyQ.Begin(s.ctx); err != nil { - return stop(), errors.Wrap(err, "Error starting a transaction") - } - defer s.historyQ.Rollback() - - if e := h.ingestRange(s, h.fromLedger, h.toLedger); e != nil { + if e := h.ingestRange(s, h.fromLedger, h.toLedger, true); e != nil { return stop(), e } - - if e := s.historyQ.Commit(); e != nil { - return stop(), errors.Wrap(e, commitErrMsg) - } } log.WithFields(logpkg.F{ diff --git a/services/horizon/internal/ingest/group_processors.go b/services/horizon/internal/ingest/group_processors.go index fd0843cc28..8b5d2d337e 100644 --- a/services/horizon/internal/ingest/group_processors.go +++ b/services/horizon/internal/ingest/group_processors.go @@ -56,12 +56,49 @@ func (g groupChangeProcessors) Commit(ctx context.Context) error { return nil } +type groupLoaders struct { + lazyLoaders []horizonLazyLoader + runDurations runDurations + stats map[string]history.LoaderStats +} + +func newGroupLoaders(lazyLoaders []horizonLazyLoader) groupLoaders { + return groupLoaders{ + lazyLoaders: lazyLoaders, + runDurations: make(map[string]time.Duration), + stats: make(map[string]history.LoaderStats), + } +} + +func (g groupLoaders) Flush(ctx context.Context, session db.SessionInterface, execInTx bool) error { + if execInTx { + if err := session.Begin(ctx); err != nil { + return err + } + defer session.Rollback() + } + + for _, loader := range g.lazyLoaders { + startTime := time.Now() + if err := loader.Exec(ctx, session); err != nil { + return errors.Wrapf(err, "error during lazy loader resolution, %T.Exec", loader) + } + name := loader.Name() + g.runDurations.AddRunDuration(name, startTime) + g.stats[name] = loader.Stats() + } + + if execInTx { + if err := session.Commit(); err != nil { + return err + } + } + return nil +} + type groupTransactionProcessors struct { processors []horizonTransactionProcessor - lazyLoaders []horizonLazyLoader processorsRunDurations runDurations - loaderRunDurations runDurations - loaderStats map[string]history.LoaderStats transactionStatsProcessor *processors.StatsLedgerTransactionProcessor tradeProcessor *processors.TradeProcessor } @@ -76,7 +113,6 @@ type groupTransactionProcessors struct { // // so group processing will reset stats as needed func newGroupTransactionProcessors(processors []horizonTransactionProcessor, - lazyLoaders []horizonLazyLoader, transactionStatsProcessor *processors.StatsLedgerTransactionProcessor, tradeProcessor *processors.TradeProcessor, ) *groupTransactionProcessors { @@ -84,9 +120,6 @@ func newGroupTransactionProcessors(processors []horizonTransactionProcessor, return &groupTransactionProcessors{ processors: processors, processorsRunDurations: make(map[string]time.Duration), - loaderRunDurations: make(map[string]time.Duration), - loaderStats: make(map[string]history.LoaderStats), - lazyLoaders: lazyLoaders, transactionStatsProcessor: transactionStatsProcessor, tradeProcessor: tradeProcessor, } @@ -104,20 +137,6 @@ func (g groupTransactionProcessors) ProcessTransaction(lcm xdr.LedgerCloseMeta, } func (g groupTransactionProcessors) Flush(ctx context.Context, session db.SessionInterface) error { - // need to trigger all lazy loaders to now resolve their future placeholders - // with real db values first - for _, loader := range g.lazyLoaders { - startTime := time.Now() - if err := loader.Exec(ctx, session); err != nil { - return errors.Wrapf(err, "error during lazy loader resolution, %T.Exec", loader) - } - name := loader.Name() - g.loaderRunDurations.AddRunDuration(name, startTime) - g.loaderStats[name] = loader.Stats() - } - - // now flush each processor which may call loader.GetNow(), which - // required the prior loader.Exec() to have been called. for _, p := range g.processors { startTime := time.Now() if err := p.Flush(ctx, session); err != nil { @@ -130,8 +149,6 @@ func (g groupTransactionProcessors) Flush(ctx context.Context, session db.Sessio func (g *groupTransactionProcessors) ResetStats() { g.processorsRunDurations = make(map[string]time.Duration) - g.loaderRunDurations = make(map[string]time.Duration) - g.loaderStats = make(map[string]history.LoaderStats) if g.tradeProcessor != nil { g.tradeProcessor.ResetStats() } diff --git a/services/horizon/internal/ingest/group_processors_test.go b/services/horizon/internal/ingest/group_processors_test.go index 9fc5a3acf1..058999420a 100644 --- a/services/horizon/internal/ingest/group_processors_test.go +++ b/services/horizon/internal/ingest/group_processors_test.go @@ -157,7 +157,7 @@ func (s *GroupTransactionProcessorsTestSuiteLedger) SetupTest() { s.processors = newGroupTransactionProcessors([]horizonTransactionProcessor{ s.processorA, s.processorB, - }, nil, statsProcessor, tradesProcessor) + }, statsProcessor, tradesProcessor) s.session = &db.MockSession{} } diff --git a/services/horizon/internal/ingest/ingest_history_range_state_test.go b/services/horizon/internal/ingest/ingest_history_range_state_test.go index 4f7d2c4944..cf248a6a7d 100644 --- a/services/horizon/internal/ingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/ingest/ingest_history_range_state_test.go @@ -6,7 +6,6 @@ import ( "context" "testing" - "github.com/jmoiron/sqlx" "github.com/stretchr/testify/suite" "github.com/stellar/go/ingest/ledgerbackend" @@ -164,7 +163,7 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeRunTransactionProcess } s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() - s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(errors.New("my error")).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}, false).Return(errors.New("my error")).Once() next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) s.Assert().Error(err) @@ -189,7 +188,7 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeSuccess() { }, } s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() - s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}, false).Return(nil).Once() } s.historyQ.On("Commit").Return(nil).Once() @@ -226,8 +225,8 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeSuccessWithFlushMax() } s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() } - s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch).Return(nil).Once() - s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch, false).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch, false).Return(nil).Once() s.system.maxLedgerPerFlush = 60 next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) @@ -252,7 +251,7 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeCommitsWorkOnLedgerBa }, } s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() - s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}, false).Return(nil).Once() s.ledgerBackend.On("GetLedger", s.ctx, uint32(101)). Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once() @@ -319,23 +318,11 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateInvali func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateInvalidMaxFlush() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() - s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.historyQ.On("Rollback").Return(nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() s.system.maxLedgerPerFlush = 0 err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true) s.Assert().EqualError(err, "invalid maxLedgerPerFlush, must be greater than 0") } -func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateBeginReturnsError() { - // Recreate mock in this single test to remove Rollback assertion. - s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() - s.historyQ.On("Begin", s.ctx).Return(errors.New("my error")).Once() - - err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true) - s.Assert().EqualError(err, "Error starting a transaction: my error") -} - func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateGetLastLedgerIngestNonBlockingError() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), errors.New("my error")).Once() @@ -359,9 +346,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStatRangeOv func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateClearHistoryFails() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() - s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.historyQ.On("Rollback").Return(nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() toidFrom := toid.New(100, 0, 0) // the state deletes range once, calc'd by toid.LedgerRangeInclusive(), which adjusts to = to + 1 toidTo := toid.New(201, 0, 0) @@ -375,9 +359,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateClearH func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRunTransactionProcessorsReturnsError() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() - s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.historyQ.On("Rollback").Return(nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() toidFrom := toid.New(100, 0, 0) toidTo := toid.New(201, 0, 0) s.historyQ.On( @@ -395,54 +376,19 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRunTra } s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() - s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(errors.New("my error")).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}, true).Return(errors.New("my error")).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true) s.Assert().EqualError(err, "error processing ledger range 100 - 100: my error") } -func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateCommitFails() { - s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() - s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.historyQ.On("Rollback").Return(nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() - s.historyQ.On("Commit").Return(errors.New("my error")).Once() - - toidFrom := toid.New(100, 0, 0) - toidTo := toid.New(201, 0, 0) - s.historyQ.On( - "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), - ).Return(nil).Once() - - for i := uint32(100); i <= uint32(200); i++ { - meta := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(i), - }, - }, - }, - } - s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() - s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() - } - - err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true) - s.Assert().EqualError(err, "Error committing db transaction: my error") -} - func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSuccess() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() - s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.historyQ.On("Rollback").Return(nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() toidFrom := toid.New(100, 0, 0) toidTo := toid.New(201, 0, 0) s.historyQ.On( "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), ).Return(nil).Once() - s.historyQ.On("Commit").Return(nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(200), 0).Return(nil).Once() for i := uint32(100); i <= uint32(200); i++ { @@ -456,7 +402,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces }, } s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() - s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}, true).Return(nil).Once() } // system.maxLedgerPerFlush has been set by default to 1 in test suite setup @@ -466,15 +412,11 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSuccessWithFlushMax() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() - s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.historyQ.On("Rollback").Return(nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() toidFrom := toid.New(100, 0, 0) toidTo := toid.New(201, 0, 0) s.historyQ.On( "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), ).Return(nil).Once() - s.historyQ.On("Commit").Return(nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(200), 0).Return(nil).Once() firstLedgersBatch := []xdr.LedgerCloseMeta{} @@ -497,8 +439,8 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces } s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() } - s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch).Return(nil).Once() - s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch, true).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch, true).Return(nil).Once() s.system.maxLedgerPerFlush = 60 err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true) s.Assert().NoError(err) @@ -506,10 +448,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSuccessOneLedger() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() - s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.historyQ.On("Rollback").Return(nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() - s.historyQ.On("Commit").Return(nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(100), 0).Return(nil).Once() // Recreate mock in this single ledger test to remove setup assertion on ledger range. *s.ledgerBackend = mockLedgerBackend{} @@ -532,7 +470,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces } s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() - s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}, true).Return(nil).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 100}}, false, true) s.Assert().NoError(err) @@ -550,7 +488,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceG func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForce() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once() s.historyQ.On("Commit").Return(nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(200), 0).Return(nil).Once() @@ -572,7 +509,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForce( }, } s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() - s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}, false).Return(nil).Once() } // system.maxLedgerPerFlush has been set by default to 1 in test suite setup @@ -583,7 +520,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForce( func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceLedgerRetrievalError() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once() s.historyQ.On("Commit").Return(nil).Once() @@ -604,7 +540,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceL }, } s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() - s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}, false).Return(nil).Once() } s.ledgerBackend.On("GetLedger", s.ctx, uint32(106)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once() @@ -617,7 +553,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceL func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceLedgerRetrievalAndCommitError() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once() s.historyQ.On("Commit").Return(errors.New("commit error")).Once() @@ -638,7 +573,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceL }, } s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() - s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}, false).Return(nil).Once() } s.ledgerBackend.On("GetLedger", s.ctx, uint32(106)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once() @@ -651,7 +586,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceL func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceWithFlushMax() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once() s.historyQ.On("Commit").Return(nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(200), 0).Return(nil).Once() @@ -682,8 +616,8 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceW } s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() } - s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch).Return(nil).Once() - s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch, false).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch, false).Return(nil).Once() s.system.maxLedgerPerFlush = 60 err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true) diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 3ba66ef4ba..0db777306c 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -528,8 +528,8 @@ func (m *mockProcessorsRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMe args.Error(1) } -func (m *mockProcessorsRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) error { - args := m.Called(ledgers) +func (m *mockProcessorsRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta, execInTx bool) error { + args := m.Called(ledgers, execInTx) return args.Error(0) } diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index b98896fe1f..6832a5078f 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -77,7 +77,7 @@ type ProcessorRunnerInterface interface { ledgerProtocolVersion uint32, bucketListHash xdr.Hash, ) (ingest.StatsChangeProcessorResults, error) - RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) error + RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta, execInTx bool) error RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( stats ledgerStats, err error, @@ -134,13 +134,13 @@ func buildChangeProcessor( }) } -func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors.LedgersProcessor) *groupTransactionProcessors { +func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors.LedgersProcessor) (groupLoaders, *groupTransactionProcessors) { accountLoader := history.NewAccountLoader() assetLoader := history.NewAssetLoader() lpLoader := history.NewLiquidityPoolLoader() cbLoader := history.NewClaimableBalanceLoader() - lazyLoaders := []horizonLazyLoader{accountLoader, assetLoader, lpLoader, cbLoader} + loaders := newGroupLoaders([]horizonLazyLoader{accountLoader, assetLoader, lpLoader, cbLoader}) statsLedgerTransactionProcessor := processors.NewStatsLedgerTransactionProcessor() tradeProcessor := processors.NewTradeProcessor(accountLoader, @@ -160,7 +160,7 @@ func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors processors.NewLiquidityPoolsTransactionProcessor(lpLoader, s.historyQ.NewTransactionLiquidityPoolBatchInsertBuilder(), s.historyQ.NewOperationLiquidityPoolBatchInsertBuilder())} - return newGroupTransactionProcessors(processors, lazyLoaders, statsLedgerTransactionProcessor, tradeProcessor) + return loaders, newGroupTransactionProcessors(processors, statsLedgerTransactionProcessor, tradeProcessor) } func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers { @@ -180,7 +180,7 @@ func (s *ProcessorRunner) buildFilteredOutProcessor() *groupTransactionProcessor p = append(p, txSubProc) } - return newGroupTransactionProcessors(p, nil, nil, nil) + return newGroupTransactionProcessors(p, nil, nil) } // checkIfProtocolVersionSupported checks if this Horizon version supports the @@ -408,10 +408,11 @@ func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry groupTransactionFilterers := s.buildTransactionFilterer() groupFilteredOutProcessors := s.buildFilteredOutProcessor() - groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) + loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) if err = registerTransactionProcessors( registry, + loaders, groupTransactionFilterers, groupFilteredOutProcessors, groupTransactionProcessors, @@ -429,7 +430,11 @@ func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry return } - err = s.flushProcessors(groupFilteredOutProcessors, groupTransactionProcessors) + if err = loaders.Flush(s.ctx, s.session, false); err != nil { + return + } + + err = s.flushProcessors(groupFilteredOutProcessors, groupTransactionProcessors, false) if err != nil { return } @@ -441,8 +446,8 @@ func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry for key, duration := range groupFilteredOutProcessors.processorsRunDurations { transactionDurations[key] = duration } - loaderStats = groupTransactionProcessors.loaderStats - loaderDurations = groupTransactionProcessors.loaderRunDurations + loaderStats = loaders.stats + loaderDurations = loaders.runDurations for key, duration := range groupTransactionFilterers.runDurations { transactionDurations[key] = duration } @@ -478,6 +483,7 @@ func registerChangeProcessors( func registerTransactionProcessors( registry nameRegistry, + loaders groupLoaders, groupTransactionFilterers *groupTransactionFilterers, groupFilteredOutProcessors *groupTransactionProcessors, groupTransactionProcessors *groupTransactionProcessors, @@ -492,7 +498,7 @@ func registerTransactionProcessors( return err } } - for _, l := range groupTransactionProcessors.lazyLoaders { + for _, l := range loaders.lazyLoaders { if err := registry.add(l.Name()); err != nil { return err } @@ -502,20 +508,15 @@ func registerTransactionProcessors( return err } } - for _, l := range groupFilteredOutProcessors.lazyLoaders { - if err := registry.add(l.Name()); err != nil { - return err - } - } return nil } -func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) (err error) { +func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta, execInTx bool) (err error) { ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion) groupTransactionFilterers := s.buildTransactionFilterer() groupFilteredOutProcessors := s.buildFilteredOutProcessor() - groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) + loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) startTime := time.Now() curHeap, sysHeap := getMemStats() @@ -546,7 +547,11 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.Ledger groupTransactionFilterers.ResetStats() } - err = s.flushProcessors(groupFilteredOutProcessors, groupTransactionProcessors) + if err = loaders.Flush(s.ctx, s.session, execInTx); err != nil { + return + } + + err = s.flushProcessors(groupFilteredOutProcessors, groupTransactionProcessors, execInTx) if err != nil { return } @@ -564,23 +569,36 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.Ledger return nil } -func (s *ProcessorRunner) flushProcessors(groupFilteredOutProcessors *groupTransactionProcessors, groupTransactionProcessors *groupTransactionProcessors) (err error) { +func (s *ProcessorRunner) flushProcessors(groupFilteredOutProcessors *groupTransactionProcessors, groupTransactionProcessors *groupTransactionProcessors, execInTx bool) error { + if execInTx { + if err := s.session.Begin(s.ctx); err != nil { + return err + } + defer s.session.Rollback() + } + if s.config.EnableIngestionFiltering { - err = groupFilteredOutProcessors.Flush(s.ctx, s.session) - if err != nil { - err = errors.Wrap(err, "Error flushing temp filtered tx from processor") - return + + if err := groupFilteredOutProcessors.Flush(s.ctx, s.session); err != nil { + return errors.Wrap(err, "Error flushing temp filtered tx from processor") } if time.Since(s.lastTransactionsTmpGC) > transactionsFilteredTmpGCPeriod { - s.historyQ.DeleteTransactionsFilteredTmpOlderThan(s.ctx, uint64(transactionsFilteredTmpGCPeriod.Seconds())) + if _, err := s.historyQ.DeleteTransactionsFilteredTmpOlderThan(s.ctx, uint64(transactionsFilteredTmpGCPeriod.Seconds())); err != nil { + return errors.Wrap(err, "Error trimming filtered transactions") + } } } - err = groupTransactionProcessors.Flush(s.ctx, s.session) - if err != nil { - err = errors.Wrap(err, "Error flushing changes from processor") + if err := groupTransactionProcessors.Flush(s.ctx, s.session); err != nil { + return errors.Wrap(err, "Error flushing changes from processor") } - return + + if execInTx { + if err := s.session.Commit(); err != nil { + return err + } + } + return nil } func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index cad841b602..8f6eb58d74 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -249,7 +249,7 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { ledgersProcessor := &processors.LedgersProcessor{} - processor := runner.buildTransactionProcessor(ledgersProcessor) + _, processor := runner.buildTransactionProcessor(ledgersProcessor) assert.IsType(t, &groupTransactionProcessors{}, processor) assert.IsType(t, &processors.StatsLedgerTransactionProcessor{}, processor.processors[0]) assert.IsType(t, &processors.EffectProcessor{}, processor.processors[1]) @@ -477,7 +477,7 @@ func TestProcessorRunnerRunTransactionsProcessorsOnLedgers(t *testing.T) { filters: &MockFilters{}, } - err := runner.RunTransactionProcessorsOnLedgers(ledgers) + err := runner.RunTransactionProcessorsOnLedgers(ledgers, false) assert.NoError(t, err) }