Skip to content

Commit

Permalink
client/{core,db}: add topic IDs to notifications
Browse files Browse the repository at this point in the history
Subjects are not language-dependent, so using the Subject field of
Notification to dictate logic is not longer sound. This work adds
a "topic" to notifications, which is a language-independent unique
ID. The topic takes the place of the subject in the core translation
maps. Additionally unexports the Translation and its fields.
Client db is upgraded to store the topic, but upgraded clients
will have no topics on pre-upgrade notifications retrieved using
(*DB).NotificationsN. This is not a problem for us anywhere, and it
would be bad practice to do use historical notifications to trigger
sensitive operations anyway, so I suspect this won't be a problem for
any existing core consumers.
  • Loading branch information
buck54321 committed Sep 5, 2021
1 parent b294a00 commit c994f03
Show file tree
Hide file tree
Showing 13 changed files with 557 additions and 352 deletions.
24 changes: 12 additions & 12 deletions client/core/bookie.go
Expand Up @@ -482,17 +482,17 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message)
if sp.SuspendTime != 0 {
// This is just a warning about a scheduled suspension.
suspendTime := encode.UnixTimeMilli(int64(sp.SuspendTime))
subject, detail := c.formatDetails(SubjectMarketSuspendScheduled, sp.MarketID, dc.acct.host, suspendTime)
c.notify(newServerNotifyNote(subject, detail, db.WarningLevel))
subject, detail := c.formatDetails(TopicMarketSuspendScheduled, sp.MarketID, dc.acct.host, suspendTime)
c.notify(newServerNotifyNote(TopicMarketSuspendScheduled, subject, detail, db.WarningLevel))
return nil
}

subject := SubjectMarketSuspended
topic := TopicMarketSuspended
if !sp.Persist {
subject = SubjectMarketSuspendedWithPurge
topic = TopicMarketSuspendedWithPurge
}
subject, detail := c.formatDetails(subject, sp.MarketID, dc.acct.host)
c.notify(newServerNotifyNote(subject, detail, db.WarningLevel))
subject, detail := c.formatDetails(topic, sp.MarketID, dc.acct.host)
c.notify(newServerNotifyNote(topic, subject, detail, db.WarningLevel))

if sp.Persist {
// No book changes. Just wait for more order notes.
Expand Down Expand Up @@ -522,8 +522,8 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message)
tracker.metaData.Host == dc.acct.host && tracker.metaData.Status == order.OrderStatusBooked {
// Locally revoke the purged book order.
tracker.revoke()
subject, details := c.formatDetails(SubjectOrderAutoRevoked, tracker.token(), sp.MarketID, dc.acct.host)
c.notify(newOrderNote(subject, details, db.WarningLevel, tracker.coreOrderInternal()))
subject, details := c.formatDetails(TopicOrderAutoRevoked, tracker.token(), sp.MarketID, dc.acct.host)
c.notify(newOrderNote(TopicOrderAutoRevoked, subject, details, db.WarningLevel, tracker.coreOrderInternal()))
updatedAssets.count(tracker.fromAssetID)
}
}
Expand Down Expand Up @@ -569,8 +569,8 @@ func handleTradeResumptionMsg(c *Core, dc *dexConnection, msg *msgjson.Message)
// This is just a notice about a scheduled resumption.
dc.setMarketStartEpoch(rs.MarketID, rs.StartEpoch, false) // set the start epoch, leaving any final/persist data
resTime := encode.UnixTimeMilli(int64(rs.ResumeTime))
subject, detail := c.formatDetails(SubjectMarketResumeScheduled, rs.MarketID, dc.acct.host, resTime)
c.notify(newServerNotifyNote(subject, detail, db.WarningLevel))
subject, detail := c.formatDetails(TopicMarketResumeScheduled, rs.MarketID, dc.acct.host, resTime)
c.notify(newServerNotifyNote(TopicMarketResumeScheduled, subject, detail, db.WarningLevel))
return nil
}

Expand All @@ -587,8 +587,8 @@ func handleTradeResumptionMsg(c *Core, dc *dexConnection, msg *msgjson.Message)
// Fetch the updated DEX configuration.
// dc.refreshServerConfig()

subject, detail := c.formatDetails(SubjectMarketResumed, rs.MarketID, dc.acct.host, rs.StartEpoch)
c.notify(newServerNotifyNote(subject, detail, db.Success))
subject, detail := c.formatDetails(TopicMarketResumed, rs.MarketID, dc.acct.host, rs.StartEpoch)
c.notify(newServerNotifyNote(TopicMarketResumed, subject, detail, db.Success))

// Book notes may resume at any time. Seq not set since no book changes.

Expand Down
164 changes: 86 additions & 78 deletions client/core/core.go

Large diffs are not rendered by default.

44 changes: 23 additions & 21 deletions client/core/core_test.go
Expand Up @@ -903,8 +903,10 @@ func newTestRig() *testRig {
// which may have been previously "disconnected".
return conn, nil
},
newCrypter: func([]byte) encrypt.Crypter { return crypter },
reCrypter: func([]byte, []byte) (encrypt.Crypter, error) { return crypter, crypter.recryptErr },
newCrypter: func([]byte) encrypt.Crypter { return crypter },
reCrypter: func([]byte, []byte) (encrypt.Crypter, error) { return crypter, crypter.recryptErr },

locale: enUS,
localePrinter: message.NewPrinter(language.AmericanEnglish),
},
db: tdb,
Expand Down Expand Up @@ -2670,8 +2672,8 @@ func TestHandlePreimageRequest(t *testing.T) {

select {
case note := <-notes:
if note.Subject() != SubjectPreimageSent {
t.Fatalf("note subject is %v, not %v", note.Subject(), SubjectPreimageSent)
if note.Topic() != TopicPreimageSent {
t.Fatalf("note subject is %v, not %v", note.Topic(), TopicPreimageSent)
}
case <-time.After(time.Second):
t.Fatal("no order note from preimage request handling")
Expand Down Expand Up @@ -2762,8 +2764,8 @@ func TestHandlePreimageRequest(t *testing.T) {

select {
case note := <-notes:
if note.Subject() != SubjectPreimageSent {
t.Fatalf("note subject is %v, not %v", note.Subject(), SubjectPreimageSent)
if note.Topic() != TopicPreimageSent {
t.Fatalf("note subject is %v, not %v", note.Topic(), TopicPreimageSent)
}
case <-time.After(time.Second):
t.Fatal("no order note from preimage request handling")
Expand Down Expand Up @@ -2902,8 +2904,8 @@ func TestHandlePreimageRequest(t *testing.T) {

select {
case note := <-notes:
if note.Subject() != SubjectPreimageSent {
t.Fatalf("note subject is %v, not %v", note.Subject(), SubjectPreimageSent)
if note.Topic() != TopicPreimageSent {
t.Fatalf("note subject is %v, not %v", note.Topic(), TopicPreimageSent)
}
case <-time.After(time.Second):
t.Fatal("no order note from preimage request handling")
Expand Down Expand Up @@ -2981,8 +2983,8 @@ func TestHandlePreimageRequest(t *testing.T) {

select {
case note := <-notes:
if note.Subject() != SubjectCancelPreimageSent {
t.Fatalf("note subject is %v, not %v", note.Subject(), SubjectCancelPreimageSent)
if note.Topic() != TopicCancelPreimageSent {
t.Fatalf("note subject is %v, not %v", note.Topic(), TopicCancelPreimageSent)
}
case <-time.After(time.Second):
t.Fatal("no order note from preimage request handling")
Expand Down Expand Up @@ -3146,8 +3148,8 @@ func TestHandlePreimageRequest(t *testing.T) {

select {
case note := <-notes:
if note.Subject() != SubjectCancelPreimageSent {
t.Fatalf("note subject is %v, not %v", note.Subject(), SubjectPreimageSent)
if note.Topic() != TopicCancelPreimageSent {
t.Fatalf("note subject is %v, not %v", note.Topic(), TopicCancelPreimageSent)
}
case <-time.After(time.Second):
t.Fatal("no order note from preimage request handling")
Expand Down Expand Up @@ -3230,7 +3232,7 @@ func TestHandleRevokeOrderMsg(t *testing.T) {
t.Fatalf("handleRevokeOrderMsg error: %v", err)
}

verifyRevokeNotification(orderNotes, SubjectOrderRevoked, t)
verifyRevokeNotification(orderNotes, TopicOrderRevoked, t)

if tracker.metaData.Status != order.OrderStatusRevoked {
t.Errorf("expected order status %v, got %v", order.OrderStatusRevoked, tracker.metaData.Status)
Expand Down Expand Up @@ -3372,8 +3374,8 @@ func TestTradeTracking(t *testing.T) {
for {
select {
case note := <-notes:
if note.Severity() == db.ErrorLevel && (note.Subject() == SubjectSwapSendError ||
note.Subject() == SubjectInitError || note.Subject() == SubjectReportRedeemError) {
if note.Severity() == db.ErrorLevel && (note.Topic() == TopicSwapSendError ||
note.Topic() == TopicInitError || note.Topic() == TopicReportRedeemError) {

return note
}
Expand Down Expand Up @@ -4336,7 +4338,7 @@ func TestNotifications(t *testing.T) {
defer rig.shutdown()

// Insert a notification into the database.
typedNote := newOrderNote("abc", "def", 100, nil)
typedNote := newOrderNote("123", "abc", "def", 100, nil)

tCore := rig.core
ch := tCore.NotificationFeed()
Expand Down Expand Up @@ -5334,7 +5336,7 @@ func TestHandleTradeSuspensionMsg(t *testing.T) {
t.Fatalf("[handleTradeSuspensionMsg] unexpected error: %v", err)
}

verifyRevokeNotification(orderNotes, SubjectOrderAutoRevoked, t)
verifyRevokeNotification(orderNotes, TopicOrderAutoRevoked, t)

// Check that the funding coin was returned. Use the tradeMtx for
// synchronization.
Expand Down Expand Up @@ -5443,12 +5445,12 @@ func orderNoteFeed(tCore *Core) (orderNotes chan *OrderNote, done func()) {
return orderNotes, done
}

func verifyRevokeNotification(ch chan *OrderNote, expectedSubject string, t *testing.T) {
func verifyRevokeNotification(ch chan *OrderNote, expectedTopic Topic, t *testing.T) {
select {
case actualOrderNote := <-ch:
if expectedSubject != actualOrderNote.SubjectText {
t.Fatalf("SubjectText mismatch. %s != %s", actualOrderNote.SubjectText,
expectedSubject)
if expectedTopic != actualOrderNote.TopicID {
t.Fatalf("SubjectText mismatch. %s != %s", actualOrderNote.TopicID,
expectedTopic)
}
return
case <-tCtx.Done():
Expand Down

0 comments on commit c994f03

Please sign in to comment.