Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/batch/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func (bp *batchProcessor) dispatchBatch(batch *fftypes.Batch, pins []*fftypes.By
}

func (bp *batchProcessor) markMessagesDispatched(batch *fftypes.Batch) error {
return bp.retry.Do(bp.ctx, "batch persist", func(attempt int) (retry bool, err error) {
return bp.retry.Do(bp.ctx, "mark dispatched messages", func(attempt int) (retry bool, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

return true, bp.database.RunAsGroup(bp.ctx, func(ctx context.Context) (err error) {
// Update all the messages in the batch with the batch ID
msgIDs := make([]driver.Value, len(batch.Payload.Messages))
Expand Down
29 changes: 21 additions & 8 deletions internal/database/sqlcommon/event_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,32 @@ var (
}
)

// Events are special.
//
// They are an ordered sequence of recorded state, that must be detected and processed in order.
//
// We choose (today) to coordinate the emission of these, into a DB transaction where the other
// state changes happen - so the event is assured atomically to happen "after" the other state
// changes, but also not to be lost. Downstream fan-out of those events occurs via
// Webhook/WebSocket (.../NATS/Kafka) pluggable pub/sub interfaces.
//
// Implementing this single stream of incrementing (note not guaranteed to be gapless) ordered
// items on top of a SQL database, means taking a lock (see below).
// This is not safe to do unless you are really sure what other locks will be taken after
// that in the transaction. So we defer the emission of the events to a pre-commit capture.
func (s *SQLCommon) InsertEvent(ctx context.Context, event *fftypes.Event) (err error) {
ctx, tx, autoCommit, err := s.beginOrUseTx(ctx)
if err != nil {
return err
}
defer s.rollbackTx(ctx, tx, autoCommit)
event.Sequence = -1 // the sequence is not allocated until the post-commit callback
s.addPreCommitEvent(tx, event)
return s.commitTx(ctx, tx, autoCommit)
}

// This is special to events - we take the cost of a full table lock on the events table, so
// that nobody can add rows that increment the sequence, until our transaction has committed.
func (s *SQLCommon) insertEventPreCommit(ctx context.Context, tx *txWrapper, event *fftypes.Event) (err error) {

// We take the cost of a full table lock on the events table.
// This allows us to rely on the sequence to always be increasing, even when writing events
// concurrently (it does not guarantee we won't get a gap in the sequences).
if err = s.lockTableExclusiveTx(ctx, tx, "events"); err != nil {
Expand All @@ -71,11 +88,7 @@ func (s *SQLCommon) InsertEvent(ctx context.Context, event *fftypes.Event) (err
s.callbacks.OrderedUUIDCollectionNSEvent(database.CollectionEvents, fftypes.ChangeEventTypeCreated, event.Namespace, event.ID, event.Sequence)
},
)
if err != nil {
return err
}

return s.commitTx(ctx, tx, autoCommit)
return err
}

func (s *SQLCommon) eventResult(ctx context.Context, row *sql.Rows) (*fftypes.Event, error) {
Expand Down
24 changes: 20 additions & 4 deletions internal/database/sqlcommon/sqlcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ type SQLCommon struct {
type txContextKey struct{}

type txWrapper struct {
sqlTX *sql.Tx
postCommit []func()
tableLocks []string
sqlTX *sql.Tx
preCommitEvents []*fftypes.Event
postCommit []func()
tableLocks []string
}

func (s *SQLCommon) Init(ctx context.Context, provider Provider, prefix config.Prefix, callbacks database.Callbacks, capabilities *database.Capabilities) (err error) {
Expand Down Expand Up @@ -318,6 +319,10 @@ func (s *SQLCommon) postCommitEvent(tx *txWrapper, fn func()) {
tx.postCommit = append(tx.postCommit, fn)
}

func (s *SQLCommon) addPreCommitEvent(tx *txWrapper, event *fftypes.Event) {
tx.preCommitEvents = append(tx.preCommitEvents, event)
}

func (tx *txWrapper) tableIsLocked(table string) bool {
for _, t := range tx.tableLocks {
if t == table {
Expand Down Expand Up @@ -365,8 +370,19 @@ func (s *SQLCommon) commitTx(ctx context.Context, tx *txWrapper, autoCommit bool
// We're inside of a wide transaction boundary with an auto-commit
return nil
}

l := log.L(ctx)

// Only at this stage do we write to the special events Database table, so we know
// regardless of the higher level logic, the events are always written at this point
// at the end of the transaction
for _, event := range tx.preCommitEvents {
if err := s.insertEventPreCommit(ctx, tx, event); err != nil {
s.rollbackTx(ctx, tx, false)
return err
}
l.Infof("Emitted %s event %s ref=%s (sequence=%d)", event.Type, event.ID, event.Reference, event.Sequence)
}

l.Debugf(`SQL-> commit`)
err := tx.sqlTX.Commit()
if err != nil {
Expand Down