Skip to content

Commit

Permalink
Add Twitter worker (#97)
Browse files Browse the repository at this point in the history
* Add Twitter worker for tweet registrations with Code

* Add Twitter worker to refresh stale user info

* Fix nil access in updateCachedTwitterUser
  • Loading branch information
jeffyanta committed Apr 2, 2024
1 parent 6c3ce57 commit 16a62c4
Show file tree
Hide file tree
Showing 10 changed files with 642 additions and 44 deletions.
51 changes: 51 additions & 0 deletions pkg/code/async/user/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package async_user

import (
"context"
"time"

"github.com/sirupsen/logrus"

"github.com/code-payments/code-server/pkg/code/async"
code_data "github.com/code-payments/code-server/pkg/code/data"
"github.com/code-payments/code-server/pkg/sync"
"github.com/code-payments/code-server/pkg/twitter"
)

type service struct {
log *logrus.Entry
data code_data.Provider
twitterClient *twitter.Client
userLocks *sync.StripedLock
}

func New(twitterClient *twitter.Client, data code_data.Provider) async.Service {
return &service{
log: logrus.StandardLogger().WithField("service", "user"),
data: data,
twitterClient: twitterClient,
userLocks: sync.NewStripedLock(1024),
}
}

// todo: split out interval for each worker
func (p *service) Start(ctx context.Context, interval time.Duration) error {
go func() {
err := p.twitterRegistrationWorker(ctx, interval)
if err != nil && err != context.Canceled {
p.log.WithError(err).Warn("twitter registration processing loop terminated unexpectedly")
}
}()

go func() {
err := p.twitterUserInfoUpdateWorker(ctx, interval)
if err != nil && err != context.Canceled {
p.log.WithError(err).Warn("twitter user info processing loop terminated unexpectedly")
}
}()

select {
case <-ctx.Done():
return ctx.Err()
}
}
285 changes: 285 additions & 0 deletions pkg/code/async/user/twitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
package async_user

import (
"context"
"strings"
"time"

"github.com/mr-tron/base58"
"github.com/newrelic/go-agent/v3/newrelic"
"github.com/pkg/errors"

commonpb "github.com/code-payments/code-protobuf-api/generated/go/common/v1"
userpb "github.com/code-payments/code-protobuf-api/generated/go/user/v1"

"github.com/code-payments/code-server/pkg/code/common"
"github.com/code-payments/code-server/pkg/code/data/account"
"github.com/code-payments/code-server/pkg/code/data/twitter"
"github.com/code-payments/code-server/pkg/metrics"
"github.com/code-payments/code-server/pkg/retry"
twitter_lib "github.com/code-payments/code-server/pkg/twitter"
)

const (
tipCardRegistrationPrefix = "accountForX="
maxTweetSearchResults = 100 // maximum allowed
)

var (
errTwitterInvalidRegistrationValue = errors.New("twitter registration value is invalid")
errTwitterRegistrationNotFound = errors.New("twitter registration not found")
)

func (p *service) twitterRegistrationWorker(serviceCtx context.Context, interval time.Duration) error {
log := p.log.WithField("method", "twitterRegistrationWorker")

delay := interval

err := retry.Loop(
func() (err error) {
time.Sleep(delay)

nr := serviceCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application)
m := nr.StartTransaction("async__user_service__handle_twitter_registration")
defer m.End()
tracedCtx := newrelic.NewContext(serviceCtx, m)

err = p.findNewTwitterRegistrations(tracedCtx)
if err != nil {
m.NoticeError(err)
log.WithError(err).Warn("failure processing new twitter registrations")
}
return err
},
retry.NonRetriableErrors(context.Canceled),
)

return err
}

func (p *service) twitterUserInfoUpdateWorker(serviceCtx context.Context, interval time.Duration) error {
log := p.log.WithField("method", "twitterUserInfoUpdateWorker")

delay := interval

err := retry.Loop(
func() (err error) {
time.Sleep(delay)

nr := serviceCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application)
m := nr.StartTransaction("async__user_service__handle_twitter_user_info_update")
defer m.End()
tracedCtx := newrelic.NewContext(serviceCtx, m)

// todo: configurable parameters
records, err := p.data.GetStaleTwitterUsers(tracedCtx, 7*24*time.Hour, 32)
if err == twitter.ErrUserNotFound {
return nil
} else if err != nil {
m.NoticeError(err)
log.WithError(err).Warn("failure getting stale twitter users")
return err
}

for _, record := range records {
err := p.refreshTwitterUserInfo(tracedCtx, record.Username)
if err != nil {
m.NoticeError(err)
log.WithError(err).Warn("failure refreshing twitter user info")
return err
}
}

return nil
},
retry.NonRetriableErrors(context.Canceled),
)

return err
}

func (p *service) findNewTwitterRegistrations(ctx context.Context) error {
var newlyProcessedTweets []string

err := func() error {
var pageToken *string
for {
tweets, nextPageToken, err := p.twitterClient.SearchRecentTweets(
ctx,
tipCardRegistrationPrefix,
maxTweetSearchResults,
pageToken,
)
if err != nil {
return errors.Wrap(err, "error searching tweets")
}

processedUsernames := make(map[string]any)
for _, tweet := range tweets {
if tweet.AdditionalMetadata.Author == nil {
return errors.Errorf("author missing in tweet %s", tweet.ID)
}

isTweetProcessed, err := p.data.IsTweetProcessed(ctx, tweet.ID)
if err != nil {
return errors.Wrap(err, "error checking if tweet is processed")
} else if isTweetProcessed {
// Found a checkpoint, so stop processing
return nil
}

// Oldest tweets go first, so we are guaranteed to checkpoint everything
newlyProcessedTweets = append([]string{tweet.ID}, newlyProcessedTweets...)

// Avoid reprocessing a Twitter user and potentially overriding the
// tip address with something older.
if _, ok := processedUsernames[tweet.AdditionalMetadata.Author.Username]; ok {
continue
}

tipAccount, err := findTipAccountRegisteredInTweet(tweet)
switch err {
case nil:
case errTwitterInvalidRegistrationValue, errTwitterRegistrationNotFound:
continue
default:
return errors.Wrapf(err, "unexpected error processing tweet %s", tweet.ID)
}

processedUsernames[tweet.AdditionalMetadata.Author.Username] = struct{}{}

err = p.updateCachedTwitterUser(ctx, tweet.AdditionalMetadata.Author, tipAccount)
if err != nil {
return errors.Wrap(err, "error updating cached user state")
}
}

if nextPageToken == nil {
return nil
}
pageToken = nextPageToken
}
}()

if err != nil {
return err
}

// Only update the processed tweet cache once we've found another checkpoint,
// or reached the end of the Tweet feed.
//
// todo: add batching
for _, tweetId := range newlyProcessedTweets {
err := p.data.MarkTweetAsProcessed(ctx, tweetId)
if err != nil {
return errors.Wrap(err, "error marking tweet as processed")
}
}

return nil
}

func (p *service) refreshTwitterUserInfo(ctx context.Context, username string) error {
user, err := p.twitterClient.GetUserByUsername(ctx, username)
if err != nil {
return errors.Wrap(err, "error getting user info from twitter")
}

err = p.updateCachedTwitterUser(ctx, user, nil)
if err != nil {
return errors.Wrap(err, "error updating cached user state")
}
return nil
}

func (p *service) updateCachedTwitterUser(ctx context.Context, user *twitter_lib.User, newTipAccount *common.Account) error {
mu := p.userLocks.Get([]byte(user.Username))
mu.Lock()
defer mu.Unlock()

// Validate the new tip account if it's provided
if newTipAccount != nil {
accountInfoRecord, err := p.data.GetAccountInfoByTokenAddress(ctx, newTipAccount.PublicKey().ToBase58())
switch err {
case nil:
if accountInfoRecord.AccountType != commonpb.AccountType_PRIMARY {
return nil
}
case account.ErrAccountInfoNotFound:
default:
return errors.Wrap(err, "error getting account info")
}
}

record, err := p.data.GetTwitterUser(ctx, user.Username)
switch err {
case twitter.ErrUserNotFound:
if newTipAccount == nil {
return errors.New("tip account must be present for newly registered twitter users")
}

record = &twitter.Record{
Username: user.Username,
}

fallthrough
case nil:
record.Name = user.Name
record.ProfilePicUrl = user.ProfileImageUrl
record.VerifiedType = toProtoVerifiedType(user.VerifiedType)
record.FollowerCount = uint32(user.PublicMetrics.FollowersCount)

if newTipAccount != nil {
record.TipAddress = newTipAccount.PublicKey().ToBase58()
}
default:
return errors.Wrap(err, "error getting cached twitter user")
}

err = p.data.SaveTwitterUser(ctx, record)
if err != nil {
return errors.Wrap(err, "error updating cached twitter user")
}
return nil
}

func findTipAccountRegisteredInTweet(tweet *twitter_lib.Tweet) (*common.Account, error) {
var depositAccount *common.Account

parts := strings.Fields(tweet.Text)
for _, part := range parts {
if !strings.HasPrefix(part, tipCardRegistrationPrefix) {
continue
}

part = part[len(tipCardRegistrationPrefix):]
part = strings.TrimSuffix(part, ".")

decoded, err := base58.Decode(part)
if err != nil {
return nil, errTwitterInvalidRegistrationValue
}

if len(decoded) != 32 {
return nil, errTwitterInvalidRegistrationValue
}

depositAccount, _ = common.NewAccountFromPublicKeyBytes(decoded)
return depositAccount, nil
}

return nil, errTwitterRegistrationNotFound
}

func toProtoVerifiedType(value string) userpb.GetTwitterUserResponse_VerifiedType {
switch value {
case "blue":
return userpb.GetTwitterUserResponse_BLUE
case "business":
return userpb.GetTwitterUserResponse_BUSINESS
case "government":
return userpb.GetTwitterUserResponse_GOVERNMENT
default:
return userpb.GetTwitterUserResponse_NONE
}
}
16 changes: 14 additions & 2 deletions pkg/code/data/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,9 @@ type DatabaseData interface {
// --------------------------------------------------------------------------------
SaveTwitterUser(ctx context.Context, record *twitter.Record) error
GetTwitterUser(ctx context.Context, username string) (*twitter.Record, error)
GetStaleTwitterUsers(ctx context.Context, minAge time.Duration, limit int) ([]*twitter.Record, error)
MarkTweetAsProcessed(ctx context.Context, tweetId string) error
IsTweetProcessed(ctx context.Context, tweetId string) (bool, error)

// ExecuteInTx executes fn with a single DB transaction that is scoped to the call.
// This enables more complex transactions that can span many calls across the provider.
Expand Down Expand Up @@ -1514,8 +1517,17 @@ func (dp *DatabaseProvider) IsEligibleForAirdrop(ctx context.Context, owner stri
// Twitter
// --------------------------------------------------------------------------------
func (dp *DatabaseProvider) SaveTwitterUser(ctx context.Context, record *twitter.Record) error {
return dp.twitter.Save(ctx, record)
return dp.twitter.SaveUser(ctx, record)
}
func (dp *DatabaseProvider) GetTwitterUser(ctx context.Context, username string) (*twitter.Record, error) {
return dp.twitter.Get(ctx, username)
return dp.twitter.GetUser(ctx, username)
}
func (dp *DatabaseProvider) GetStaleTwitterUsers(ctx context.Context, minAge time.Duration, limit int) ([]*twitter.Record, error) {
return dp.twitter.GetStaleUsers(ctx, minAge, limit)
}
func (dp *DatabaseProvider) MarkTweetAsProcessed(ctx context.Context, tweetId string) error {
return dp.twitter.MarkTweetAsProcessed(ctx, tweetId)
}
func (dp *DatabaseProvider) IsTweetProcessed(ctx context.Context, tweetId string) (bool, error) {
return dp.twitter.IsTweetProcessed(ctx, tweetId)
}
Loading

0 comments on commit 16a62c4

Please sign in to comment.