From eceda559ff285f129031438df602d42474540976 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 11 Feb 2022 16:09:31 -0500 Subject: [PATCH 1/2] Move insert of events to a special pre-commit phase of the DB transaction Signed-off-by: Peter Broadhurst --- internal/batch/batch_processor.go | 2 +- internal/database/sqlcommon/event_sql.go | 29 +++++++++++++++++------- internal/database/sqlcommon/sqlcommon.go | 24 ++++++++++++++++---- 3 files changed, 42 insertions(+), 13 deletions(-) diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index 7fefbc070e..2a0dc07390 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -482,7 +482,7 @@ func (bp *batchProcessor) dispatchBatch(batch *fftypes.Batch, pins []*fftypes.By } func (bp *batchProcessor) markMessagesDispatched(batch *fftypes.Batch) error { - return bp.retry.Do(bp.ctx, "batch persist", func(attempt int) (retry bool, err error) { + return bp.retry.Do(bp.ctx, "mark dispatched messages", func(attempt int) (retry bool, err error) { return true, bp.database.RunAsGroup(bp.ctx, func(ctx context.Context) (err error) { // Update all the messages in the batch with the batch ID msgIDs := make([]driver.Value, len(batch.Payload.Messages)) diff --git a/internal/database/sqlcommon/event_sql.go b/internal/database/sqlcommon/event_sql.go index 49940195fa..361d1b7116 100644 --- a/internal/database/sqlcommon/event_sql.go +++ b/internal/database/sqlcommon/event_sql.go @@ -42,15 +42,32 @@ var ( } ) +// Events are special. +// +// They are an ordered sequence of recorded state, that must be detected and processed in order. +// +// We choose (today) to coordinate the emission of these, into a DB transaction where the other +// state changes happen - so the event is assured atomically to happen "after" the other state +// changes, but also not to be lost. Downstream fan-out of those events occurs via +// Wehook/WebSocket (.../NATS/Kafka) pluggable pub/sub interfaces. +// +// Implementing this single stream of incrementing (note not guaranteed to be gapless) ordered +// items on top of a SQL database, means taking a lock (see below). +// This is not safe to do unless you are really sure what other locks will be taken after +// that in the transaction. So we defer the emission of the events to a pre-commit capture. func (s *SQLCommon) InsertEvent(ctx context.Context, event *fftypes.Event) (err error) { ctx, tx, autoCommit, err := s.beginOrUseTx(ctx) if err != nil { return err } - defer s.rollbackTx(ctx, tx, autoCommit) + event.Sequence = -1 // the sequence is not allocated until the post-commit callback + s.addPreCommitEvent(tx, event) + return s.commitTx(ctx, tx, autoCommit) +} - // This is special to events - we take the cost of a full table lock on the events table, so - // that nobody can add rows that increment the sequence, until our transaction has committed. +func (s *SQLCommon) insertEventPreCommit(ctx context.Context, tx *txWrapper, event *fftypes.Event) (err error) { + + // We take the cost of a full table lock on the events table. // This allows us to rely on the sequence to always be increasing, even when writing events // concurrently (it does not guarantee we won't get a gap in the sequences). if err = s.lockTableExclusiveTx(ctx, tx, "events"); err != nil { @@ -71,11 +88,7 @@ func (s *SQLCommon) InsertEvent(ctx context.Context, event *fftypes.Event) (err s.callbacks.OrderedUUIDCollectionNSEvent(database.CollectionEvents, fftypes.ChangeEventTypeCreated, event.Namespace, event.ID, event.Sequence) }, ) - if err != nil { - return err - } - - return s.commitTx(ctx, tx, autoCommit) + return err } func (s *SQLCommon) eventResult(ctx context.Context, row *sql.Rows) (*fftypes.Event, error) { diff --git a/internal/database/sqlcommon/sqlcommon.go b/internal/database/sqlcommon/sqlcommon.go index fb61395c17..161008147d 100644 --- a/internal/database/sqlcommon/sqlcommon.go +++ b/internal/database/sqlcommon/sqlcommon.go @@ -44,9 +44,10 @@ type SQLCommon struct { type txContextKey struct{} type txWrapper struct { - sqlTX *sql.Tx - postCommit []func() - tableLocks []string + sqlTX *sql.Tx + preCommitEvents []*fftypes.Event + postCommit []func() + tableLocks []string } func (s *SQLCommon) Init(ctx context.Context, provider Provider, prefix config.Prefix, callbacks database.Callbacks, capabilities *database.Capabilities) (err error) { @@ -318,6 +319,10 @@ func (s *SQLCommon) postCommitEvent(tx *txWrapper, fn func()) { tx.postCommit = append(tx.postCommit, fn) } +func (s *SQLCommon) addPreCommitEvent(tx *txWrapper, event *fftypes.Event) { + tx.preCommitEvents = append(tx.preCommitEvents, event) +} + func (tx *txWrapper) tableIsLocked(table string) bool { for _, t := range tx.tableLocks { if t == table { @@ -365,8 +370,19 @@ func (s *SQLCommon) commitTx(ctx context.Context, tx *txWrapper, autoCommit bool // We're inside of a wide transaction boundary with an auto-commit return nil } - l := log.L(ctx) + + // Only at this stage do we write to the special events Database table, so we know + // regardless of the higher level logic, the events are always written at this point + // at the end of the transaction + for _, event := range tx.preCommitEvents { + if err := s.insertEventPreCommit(ctx, tx, event); err != nil { + s.rollbackTx(ctx, tx, false) + return err + } + l.Infof("Emitted %s event %s ref=%s (sequence=%d)", event.Type, event.ID, event.Reference, event.Sequence) + } + l.Debugf(`SQL-> commit`) err := tx.sqlTX.Commit() if err != nil { From 4e52c13aaa428dc0b970ac04f18434694a4364f9 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 11 Feb 2022 16:19:21 -0500 Subject: [PATCH 2/2] Wehook -> Webhook Signed-off-by: Peter Broadhurst --- internal/database/sqlcommon/event_sql.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/database/sqlcommon/event_sql.go b/internal/database/sqlcommon/event_sql.go index 361d1b7116..24e8758364 100644 --- a/internal/database/sqlcommon/event_sql.go +++ b/internal/database/sqlcommon/event_sql.go @@ -49,7 +49,7 @@ var ( // We choose (today) to coordinate the emission of these, into a DB transaction where the other // state changes happen - so the event is assured atomically to happen "after" the other state // changes, but also not to be lost. Downstream fan-out of those events occurs via -// Wehook/WebSocket (.../NATS/Kafka) pluggable pub/sub interfaces. +// Webhook/WebSocket (.../NATS/Kafka) pluggable pub/sub interfaces. // // Implementing this single stream of incrementing (note not guaranteed to be gapless) ordered // items on top of a SQL database, means taking a lock (see below).