Skip to content

Commit

Permalink
Initial worker to retry triggering the Swap RPC on client (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffyanta committed May 8, 2024
1 parent 8f01d17 commit 1fb22a3
Show file tree
Hide file tree
Showing 13 changed files with 476 additions and 22 deletions.
18 changes: 12 additions & 6 deletions pkg/code/async/account/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
)

const (
giftCardWorkerEventName = "GiftCardWorkerPollingCheck"
giftCardWorkerEventName = "GiftCardWorkerPollingCheck"
swapRetryWorkerEventName = "SwapRetryWorkerPollingCheck"
)

func (p *service) metricsGaugeWorker(ctx context.Context) error {
Expand All @@ -30,11 +31,16 @@ func (p *service) metricsGaugeWorker(ctx context.Context) error {

func (p *service) recordBackupQueueStatusPollingEvent(ctx context.Context) {
count, err := p.data.GetAccountInfoCountRequiringAutoReturnCheck(ctx)
if err != nil {
return
if err == nil {
metrics.RecordEvent(ctx, giftCardWorkerEventName, map[string]interface{}{
"queue_size": count,
})
}

metrics.RecordEvent(ctx, giftCardWorkerEventName, map[string]interface{}{
"queue_size": count,
})
count, err = p.data.GetAccountInfoCountRequiringSwapRetry(ctx)
if err == nil {
metrics.RecordEvent(ctx, swapRetryWorkerEventName, map[string]interface{}{
"queue_size": count,
})
}
}
7 changes: 7 additions & 0 deletions pkg/code/async/account/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ func (p *service) Start(ctx context.Context, interval time.Duration) error {
}
}()

go func() {
err := p.swapRetryWorker(ctx, interval)
if err != nil && err != context.Canceled {
p.log.WithError(err).Warn("swap retry processing loop terminated unexpectedly")
}
}()

go func() {
err := p.metricsGaugeWorker(ctx)
if err != nil && err != context.Canceled {
Expand Down
145 changes: 145 additions & 0 deletions pkg/code/async/account/swap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package async_account

import (
"context"
"errors"
"sync"
"time"

"github.com/newrelic/go-agent/v3/newrelic"
"github.com/sirupsen/logrus"

commonpb "github.com/code-payments/code-protobuf-api/generated/go/common/v1"

"github.com/code-payments/code-server/pkg/code/balance"
"github.com/code-payments/code-server/pkg/code/common"
code_data "github.com/code-payments/code-server/pkg/code/data"
"github.com/code-payments/code-server/pkg/code/data/account"
"github.com/code-payments/code-server/pkg/code/push"
"github.com/code-payments/code-server/pkg/metrics"
"github.com/code-payments/code-server/pkg/retry"
"github.com/code-payments/code-server/pkg/usdc"
)

const (
swapPushRetryThreshold = 5 * time.Minute
minUsdcSwapBalance = usdc.QuarksPerUsdc / 100 // $0.01 USD
)

func (p *service) swapRetryWorker(serviceCtx context.Context, interval time.Duration) error {
delay := interval

err := retry.Loop(
func() (err error) {
time.Sleep(delay)

nr := serviceCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application)
m := nr.StartTransaction("async__account_service__handle_swap_retry")
defer m.End()
tracedCtx := newrelic.NewContext(serviceCtx, m)

records, err := p.data.GetPrioritizedAccountInfosRequiringSwapRetry(tracedCtx, swapPushRetryThreshold, 100)
if err == account.ErrAccountInfoNotFound {
return nil
} else if err != nil {
m.NoticeError(err)
return err
}

var wg sync.WaitGroup
for _, record := range records {
wg.Add(1)

go func(record *account.Record) {
defer wg.Done()

err := p.maybeTriggerAnotherSwap(tracedCtx, record)
if err != nil {
m.NoticeError(err)
}
}(record)
}
wg.Wait()

return nil
},
retry.NonRetriableErrors(context.Canceled),
)

return err
}

// todo: Handle the case where there are no push tokens
func (p *service) maybeTriggerAnotherSwap(ctx context.Context, accountInfoRecord *account.Record) error {
log := p.log.WithFields(logrus.Fields{
"method": "maybeTriggerAnotherSwap",
"owner_account": accountInfoRecord.OwnerAccount,
"token_account": accountInfoRecord.TokenAccount,
})

if accountInfoRecord.AccountType != commonpb.AccountType_SWAP {
log.Trace("skipping account that isn't a swap account")
return errors.New("expected a swap account")
}

ownerAccount, err := common.NewAccountFromPublicKeyString(accountInfoRecord.OwnerAccount)
if err != nil {
log.WithError(err).Warn("invalid owner account")
return err
}

tokenAccount, err := common.NewAccountFromPublicKeyString(accountInfoRecord.TokenAccount)
if err != nil {
log.WithError(err).Warn("invalid token account")
return err
}

balance, _, err := balance.CalculateFromBlockchain(ctx, p.data, tokenAccount)
if err != nil {
log.WithError(err).Warn("failure getting token account balance")
return err
}

// Mark the swap as successful if the account is left with only dust
if balance < minUsdcSwapBalance {
err = markSwapRetrySuccessful(ctx, p.data, accountInfoRecord)
if err != nil {
log.WithError(err).Warn("failure updating account info record")
}
return err
}

err = push.SendTriggerSwapRpcPushNotification(
ctx,
p.data,
p.pusher,
ownerAccount,
)
if err != nil {
log.WithError(err).Warn("failure sending push notification")
}

err = markSwapRetriedNow(ctx, p.data, accountInfoRecord)
if err != nil {
log.WithError(err).Warn("failure updating account info record")
}
return err
}

func markSwapRetrySuccessful(ctx context.Context, data code_data.Provider, record *account.Record) error {
if !record.RequiresSwapRetry {
return nil
}

record.RequiresSwapRetry = false
return data.UpdateAccountInfo(ctx, record)
}

func markSwapRetriedNow(ctx context.Context, data code_data.Provider, record *account.Record) error {
if !record.RequiresSwapRetry {
return nil
}

record.LastSwapRetryAt = time.Now()
return data.UpdateAccountInfo(ctx, record)
}
3 changes: 3 additions & 0 deletions pkg/code/async/account/swap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package async_account

// todo: add tests
23 changes: 21 additions & 2 deletions pkg/code/async/geyser/external_deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
chat_util "github.com/code-payments/code-server/pkg/code/chat"
"github.com/code-payments/code-server/pkg/code/common"
code_data "github.com/code-payments/code-server/pkg/code/data"
"github.com/code-payments/code-server/pkg/code/data/account"
"github.com/code-payments/code-server/pkg/code/data/balance"
"github.com/code-payments/code-server/pkg/code/data/chat"
"github.com/code-payments/code-server/pkg/code/data/deposit"
Expand Down Expand Up @@ -343,14 +344,18 @@ func processPotentialExternalDeposit(ctx context.Context, conf *conf, data code_
return errors.Wrap(err, "invalid owner account")
}

err = push.SendTriggerSwapRpcPushNotification(
// Best-effort attempt to get the client to trigger a Swap RPC call now
go push.SendTriggerSwapRpcPushNotification(
ctx,
data,
pusher,
owner,
)

// Have the account pulled by the swap retry worker
err = markRequiringSwapRetries(ctx, data, accountInfoRecord)
if err != nil {
return errors.Wrap(err, "error sending push to trigger swap rpc on client")
return err
}

syncedDepositCache.Insert(cacheKey, true, 1)
Expand Down Expand Up @@ -383,6 +388,20 @@ func markDepositsAsSynced(ctx context.Context, data code_data.Provider, vault *c
return nil
}

func markRequiringSwapRetries(ctx context.Context, data code_data.Provider, accountInfoRecord *account.Record) error {
if accountInfoRecord.RequiresSwapRetry {
return nil
}

accountInfoRecord.RequiresSwapRetry = true
accountInfoRecord.LastSwapRetryAt = time.Now()
err := data.UpdateAccountInfo(ctx, accountInfoRecord)
if err != nil {
return errors.Wrap(err, "error updating swap account info")
}
return nil
}

// todo: can be promoted more broadly
func getDeltaQuarksFromTokenBalances(tokenAccount *common.Account, tokenBalances *solana.TransactionTokenBalances) (int64, error) {
var preQuarkBalance, postQuarkBalance int64
Expand Down
11 changes: 11 additions & 0 deletions pkg/code/data/account/acccount_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type Record struct {

RequiresAutoReturnCheck bool

RequiresSwapRetry bool
LastSwapRetryAt time.Time

CreatedAt time.Time
}

Expand Down Expand Up @@ -74,6 +77,8 @@ func (r *Record) Clone() Record {
RequiresDepositSync: r.RequiresDepositSync,
DepositsLastSyncedAt: r.DepositsLastSyncedAt,
RequiresAutoReturnCheck: r.RequiresAutoReturnCheck,
RequiresSwapRetry: r.RequiresSwapRetry,
LastSwapRetryAt: r.LastSwapRetryAt,
CreatedAt: r.CreatedAt,
}
}
Expand All @@ -90,6 +95,8 @@ func (r *Record) CopyTo(dst *Record) {
dst.RequiresDepositSync = r.RequiresDepositSync
dst.DepositsLastSyncedAt = r.DepositsLastSyncedAt
dst.RequiresAutoReturnCheck = r.RequiresAutoReturnCheck
dst.RequiresSwapRetry = r.RequiresSwapRetry
dst.LastSwapRetryAt = r.LastSwapRetryAt
dst.CreatedAt = r.CreatedAt
}

Expand Down Expand Up @@ -185,6 +192,10 @@ func (r *Record) Validate() error {
return errors.New("only remote send gift cards can have auto-return checks")
}

if r.RequiresSwapRetry && r.AccountType != commonpb.AccountType_SWAP {
return errors.New("only swap accounts can require swap retries")
}

if r.RelationshipTo != nil && r.AccountType != commonpb.AccountType_RELATIONSHIP {
return errors.New("only relationship accounts can have a relationship metadata")
}
Expand Down
64 changes: 62 additions & 2 deletions pkg/code/data/account/memory/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ func (a ByCreatedAt) Less(i, j int) bool {
return a[i].CreatedAt.Unix() < a[j].CreatedAt.Unix()
}

type ByLastSwapRetryAt []*account.Record

func (a ByLastSwapRetryAt) Len() int { return len(a) }
func (a ByLastSwapRetryAt) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByLastSwapRetryAt) Less(i, j int) bool {
return a[i].LastSwapRetryAt.Unix() < a[j].LastSwapRetryAt.Unix()
}

func New() account.Store {
return &store{
records: make([]*account.Record, 0),
Expand Down Expand Up @@ -131,6 +139,16 @@ func (s *store) findByRequiringAutoReturnCheck(want bool) []*account.Record {
return res
}

func (s *store) findByRequiringSwapRetry(want bool) []*account.Record {
var res []*account.Record
for _, item := range s.records {
if item.RequiresSwapRetry == want {
res = append(res, item)
}
}
return res
}

func (s *store) filterByType(items []*account.Record, accountType commonpb.AccountType) []*account.Record {
var res []*account.Record
for _, item := range items {
Expand Down Expand Up @@ -220,6 +238,9 @@ func (s *store) Update(_ context.Context, data *account.Record) error {

item.RequiresAutoReturnCheck = data.RequiresAutoReturnCheck

item.RequiresSwapRetry = data.RequiresSwapRetry
item.LastSwapRetryAt = data.LastSwapRetryAt

item.CopyTo(data)

return nil
Expand Down Expand Up @@ -353,7 +374,7 @@ func (s *store) GetPrioritizedRequiringDepositSync(_ context.Context, limit uint
if len(res) > int(limit) {
return res[:limit], nil
}
return res, nil
return cloneRecords(res), nil
}

// CountRequiringDepositSync implements account.Store.CountRequiringDepositSync
Expand Down Expand Up @@ -392,7 +413,7 @@ func (s *store) GetPrioritizedRequiringAutoReturnCheck(ctx context.Context, minA
if len(res) > int(limit) {
return res[:limit], nil
}
return res, nil
return cloneRecords(res), nil
}

// CountRequiringAutoReturnCheck implements account.Store.CountRequiringAutoReturnCheck
Expand All @@ -404,6 +425,45 @@ func (s *store) CountRequiringAutoReturnCheck(ctx context.Context) (uint64, erro
return uint64(len(items)), nil
}

// GetPrioritizedRequiringSwapRetry implements account.Store.GetPrioritizedRequiringSwapRetry
func (s *store) GetPrioritizedRequiringSwapRetry(ctx context.Context, minAge time.Duration, limit uint64) ([]*account.Record, error) {
s.mu.Lock()
defer s.mu.Unlock()

items := s.findByRequiringSwapRetry(true)

var res []*account.Record
for _, item := range items {
if time.Since(item.LastSwapRetryAt) <= minAge {
continue
}

cloned := item.Clone()
res = append(res, &cloned)
}

if len(res) == 0 {
return nil, account.ErrAccountInfoNotFound
}

sorted := ByLastSwapRetryAt(res)
sort.Sort(sorted)

if len(res) > int(limit) {
return res[:limit], nil
}
return cloneRecords(res), nil
}

// CountRequiringSwapRetry implements account.Store.CountRequiringSwapRetry
func (s *store) CountRequiringSwapRetry(ctx context.Context) (uint64, error) {
s.mu.Lock()
defer s.mu.Unlock()

items := s.findByRequiringSwapRetry(true)
return uint64(len(items)), nil
}

func cloneRecords(items []*account.Record) []*account.Record {
res := make([]*account.Record, len(items))

Expand Down
Loading

0 comments on commit 1fb22a3

Please sign in to comment.