Skip to content

Commit

Permalink
services/horizon/internal/ingest: Fix deadlock in parallel ingestion (s…
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Apr 15, 2024
1 parent a30c441 commit 6bb4a44
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 157 deletions.
4 changes: 2 additions & 2 deletions services/horizon/internal/ingest/fsm_history_range_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ func (h historyRangeState) run(s *system) (transition, error) {
ledgers = append(ledgers, ledgerCloseMeta)

if len(ledgers) == cap(ledgers) {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers, false); err != nil {
return start(), errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence())
}
ledgers = ledgers[0:0]
}
}

if len(ledgers) > 0 {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers, false); err != nil {
return start(), errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ func (reingestHistoryRangeState) GetState() State {
return ReingestHistoryRange
}

func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error {
if s.historyQ.GetTx() == nil {
return errors.New("expected transaction to be present")
}

func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32, execBatchInTx bool) error {
if s.maxLedgerPerFlush < 1 {
return errors.New("invalid maxLedgerPerFlush, must be greater than 0")
}
Expand Down Expand Up @@ -75,15 +71,15 @@ func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger u
ledgers = append(ledgers, ledgerCloseMeta)

if len(ledgers)%int(s.maxLedgerPerFlush) == 0 {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers, execBatchInTx); err != nil {
return errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence())
}
ledgers = ledgers[0:0]
}
}

if len(ledgers) > 0 {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers, execBatchInTx); err != nil {
return errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence())
}
}
Expand Down Expand Up @@ -142,7 +138,7 @@ func (h reingestHistoryRangeState) run(s *system) (transition, error) {
return stop(), errors.Wrap(err, getLastIngestedErrMsg)
}

if ingestErr := h.ingestRange(s, h.fromLedger, h.toLedger); ingestErr != nil {
if ingestErr := h.ingestRange(s, h.fromLedger, h.toLedger, false); ingestErr != nil {
if err := s.historyQ.Commit(); err != nil {
return stop(), errors.Wrap(ingestErr, commitErrMsg)
}
Expand All @@ -169,18 +165,9 @@ func (h reingestHistoryRangeState) run(s *system) (transition, error) {
}

startTime = time.Now()
if err := s.historyQ.Begin(s.ctx); err != nil {
return stop(), errors.Wrap(err, "Error starting a transaction")
}
defer s.historyQ.Rollback()

if e := h.ingestRange(s, h.fromLedger, h.toLedger); e != nil {
if e := h.ingestRange(s, h.fromLedger, h.toLedger, true); e != nil {
return stop(), e
}

if e := s.historyQ.Commit(); e != nil {
return stop(), errors.Wrap(e, commitErrMsg)
}
}

log.WithFields(logpkg.F{
Expand Down
63 changes: 40 additions & 23 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,49 @@ func (g groupChangeProcessors) Commit(ctx context.Context) error {
return nil
}

type groupLoaders struct {
lazyLoaders []horizonLazyLoader
runDurations runDurations
stats map[string]history.LoaderStats
}

func newGroupLoaders(lazyLoaders []horizonLazyLoader) groupLoaders {
return groupLoaders{
lazyLoaders: lazyLoaders,
runDurations: make(map[string]time.Duration),
stats: make(map[string]history.LoaderStats),
}
}

func (g groupLoaders) Flush(ctx context.Context, session db.SessionInterface, execInTx bool) error {
if execInTx {
if err := session.Begin(ctx); err != nil {
return err
}
defer session.Rollback()
}

for _, loader := range g.lazyLoaders {
startTime := time.Now()
if err := loader.Exec(ctx, session); err != nil {
return errors.Wrapf(err, "error during lazy loader resolution, %T.Exec", loader)
}
name := loader.Name()
g.runDurations.AddRunDuration(name, startTime)
g.stats[name] = loader.Stats()
}

if execInTx {
if err := session.Commit(); err != nil {
return err
}
}
return nil
}

type groupTransactionProcessors struct {
processors []horizonTransactionProcessor
lazyLoaders []horizonLazyLoader
processorsRunDurations runDurations
loaderRunDurations runDurations
loaderStats map[string]history.LoaderStats
transactionStatsProcessor *processors.StatsLedgerTransactionProcessor
tradeProcessor *processors.TradeProcessor
}
Expand All @@ -76,17 +113,13 @@ type groupTransactionProcessors struct {
//
// so group processing will reset stats as needed
func newGroupTransactionProcessors(processors []horizonTransactionProcessor,
lazyLoaders []horizonLazyLoader,
transactionStatsProcessor *processors.StatsLedgerTransactionProcessor,
tradeProcessor *processors.TradeProcessor,
) *groupTransactionProcessors {

return &groupTransactionProcessors{
processors: processors,
processorsRunDurations: make(map[string]time.Duration),
loaderRunDurations: make(map[string]time.Duration),
loaderStats: make(map[string]history.LoaderStats),
lazyLoaders: lazyLoaders,
transactionStatsProcessor: transactionStatsProcessor,
tradeProcessor: tradeProcessor,
}
Expand All @@ -104,20 +137,6 @@ func (g groupTransactionProcessors) ProcessTransaction(lcm xdr.LedgerCloseMeta,
}

func (g groupTransactionProcessors) Flush(ctx context.Context, session db.SessionInterface) error {
// need to trigger all lazy loaders to now resolve their future placeholders
// with real db values first
for _, loader := range g.lazyLoaders {
startTime := time.Now()
if err := loader.Exec(ctx, session); err != nil {
return errors.Wrapf(err, "error during lazy loader resolution, %T.Exec", loader)
}
name := loader.Name()
g.loaderRunDurations.AddRunDuration(name, startTime)
g.loaderStats[name] = loader.Stats()
}

// now flush each processor which may call loader.GetNow(), which
// required the prior loader.Exec() to have been called.
for _, p := range g.processors {
startTime := time.Now()
if err := p.Flush(ctx, session); err != nil {
Expand All @@ -130,8 +149,6 @@ func (g groupTransactionProcessors) Flush(ctx context.Context, session db.Sessio

func (g *groupTransactionProcessors) ResetStats() {
g.processorsRunDurations = make(map[string]time.Duration)
g.loaderRunDurations = make(map[string]time.Duration)
g.loaderStats = make(map[string]history.LoaderStats)
if g.tradeProcessor != nil {
g.tradeProcessor.ResetStats()
}
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/ingest/group_processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (s *GroupTransactionProcessorsTestSuiteLedger) SetupTest() {
s.processors = newGroupTransactionProcessors([]horizonTransactionProcessor{
s.processorA,
s.processorB,
}, nil, statsProcessor, tradesProcessor)
}, statsProcessor, tradesProcessor)
s.session = &db.MockSession{}
}

Expand Down

0 comments on commit 6bb4a44

Please sign in to comment.