Skip to content

Commit 1902b69

Browse files
committed
Use session advisory lock for leader election
1 parent e2229c9 commit 1902b69

File tree

5 files changed

+85
-11
lines changed

5 files changed

+85
-11
lines changed

config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ func load() Config {
120120
}
121121
}
122122

123+
if cfg.Elector.Enable {
124+
if cfg.Elector.ID == "" || cfg.Elector.Salt == "" {
125+
panic("elector configuration is required when enabled")
126+
}
127+
}
128+
123129
if cfg.Elector.Enable != cfg.Ingestors.CSFloat.Enable {
124130
panic("both elector and ingestor require the same enable state")
125131
}

domain/repository/factory.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package repository
22

33
import (
4+
"context"
45
"io"
56

67
"gorm.io/gorm"
@@ -18,6 +19,12 @@ type AdvisoryLock interface {
1819
TryAdvisoryXactLock(id, salt string) (bool, error)
1920
}
2021

22+
type AdvisoryLockSession interface {
23+
io.Closer
24+
TryAdvisoryLock(id, salt string) (bool, error)
25+
AdvisoryUnlock(id, salt string) error
26+
}
27+
2128
type PublicTransaction interface {
2229
gorm.TxCommitter
2330

@@ -38,4 +45,5 @@ type Factory interface {
3845

3946
NewPublicTransaction() PublicTransaction
4047
RunInTransactionPublic(fn func(PublicTransaction) error) error
48+
NewPublicAdvisoryLockSession(ctx context.Context) (AdvisoryLockSession, error)
4149
}

leader/leader.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,24 +41,34 @@ func (e *elector) Run(ctx context.Context, onLeader func()) {
4141
case <-time.After(time.Until(nextMinute)):
4242
}
4343

44-
tx := e.factory.NewPublicTransaction()
45-
acquired, err := tx.TryAdvisoryXactLock(e.cfg.Elector.ID, e.cfg.Elector.Salt)
44+
lockSession, err := e.factory.NewPublicAdvisoryLockSession(ctx)
45+
if err != nil {
46+
e.log.Warnf("failed to create advisory lock session: %v", err)
47+
continue
48+
}
49+
50+
acquired, err := lockSession.TryAdvisoryLock(e.cfg.Elector.ID, e.cfg.Elector.Salt)
4651
if err != nil {
4752
e.log.Warnf("failed to acquire leader lock: %v", err)
48-
tx.Rollback()
53+
lockSession.Close()
4954
continue
5055
}
5156

5257
if !acquired {
5358
e.log.Warn("failed to acquire leader lock")
54-
tx.Rollback()
59+
lockSession.Close()
5560
continue
5661
}
5762

5863
e.log.Info("leader lock acquired")
5964
onLeader()
6065

61-
tx.Rollback()
66+
if err := lockSession.AdvisoryUnlock(e.cfg.Elector.ID, e.cfg.Elector.Salt); err != nil {
67+
e.log.Warnf("failed to release leader lock: %v", err)
68+
}
69+
if err := lockSession.Close(); err != nil {
70+
e.log.Warnf("failed to close advisory lock session: %v", err)
71+
}
6272
return
6373
}
6474
}

repository/factory/factory.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package factory
22

33
import (
4+
"context"
45
"database/sql"
56
"errors"
67
"fmt"
@@ -210,6 +211,20 @@ func (f *factory) NewPublicTransaction() repository.PublicTransaction {
210211
return newPublicTransaction(f.public.Begin())
211212
}
212213

214+
func (f *factory) NewPublicAdvisoryLockSession(ctx context.Context) (repository.AdvisoryLockSession, error) {
215+
sqlDB, err := f.public.DB()
216+
if err != nil {
217+
return nil, err
218+
}
219+
220+
conn, err := sqlDB.Conn(ctx)
221+
if err != nil {
222+
return nil, err
223+
}
224+
225+
return newPublicAdvisoryLockSession(conn), nil
226+
}
227+
213228
func (f *factory) RunInTransactionPublic(fn func(repository.PublicTransaction) error) error {
214229
return f.public.Transaction(func(gormTx *gorm.DB) error {
215230
tx := newPublicTransaction(gormTx)

repository/factory/transaction.go

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package factory
22

33
import (
4+
"context"
5+
"database/sql"
46
"hash/fnv"
57

68
"reverse-watch/domain/repository"
@@ -72,13 +74,9 @@ func (t *publicTransaction) Reversal() repository.ReversalRepository {
7274
}
7375

7476
func (t *publicTransaction) TryAdvisoryXactLock(id, salt string) (bool, error) {
75-
h := fnv.New32a()
76-
h.Write([]byte(salt))
77-
h.Write([]byte(id))
78-
lockKey := h.Sum32()
79-
77+
lockKey := advisoryLockKey(id, salt)
8078
var hasLock bool
81-
if err := t.tx.Raw("SELECT pg_try_advisory_xact_lock(?)", lockKey).Scan(&hasLock).Error; err != nil {
79+
if err := t.tx.Raw("SELECT pg_try_advisory_lock(?)", lockKey).Scan(&hasLock).Error; err != nil {
8280
return false, err
8381
}
8482

@@ -87,3 +85,40 @@ func (t *publicTransaction) TryAdvisoryXactLock(id, salt string) (bool, error) {
8785
}
8886
return true, nil
8987
}
88+
89+
type publicAdvisoryLockSession struct {
90+
conn *sql.Conn
91+
}
92+
93+
func newPublicAdvisoryLockSession(conn *sql.Conn) *publicAdvisoryLockSession {
94+
return &publicAdvisoryLockSession{conn: conn}
95+
}
96+
97+
func (s *publicAdvisoryLockSession) Close() error {
98+
return s.conn.Close()
99+
}
100+
101+
func (s *publicAdvisoryLockSession) TryAdvisoryLock(id, salt string) (bool, error) {
102+
lockKey := advisoryLockKey(id, salt)
103+
var hasLock bool
104+
if err := s.conn.QueryRowContext(context.Background(), "SELECT pg_try_advisory_lock($1)", lockKey).Scan(&hasLock); err != nil {
105+
return false, err
106+
}
107+
return hasLock, nil
108+
}
109+
110+
func (s *publicAdvisoryLockSession) AdvisoryUnlock(id, salt string) error {
111+
lockKey := advisoryLockKey(id, salt)
112+
var unlocked bool
113+
if err := s.conn.QueryRowContext(context.Background(), "SELECT pg_advisory_unlock($1)", lockKey).Scan(&unlocked); err != nil {
114+
return err
115+
}
116+
return nil
117+
}
118+
119+
func advisoryLockKey(id, salt string) uint32 {
120+
h := fnv.New32a()
121+
h.Write([]byte(salt))
122+
h.Write([]byte(id))
123+
return h.Sum32()
124+
}

0 commit comments

Comments
 (0)