Skip to content

Commit

Permalink
Improve Twitter registration (#107)
Browse files Browse the repository at this point in the history
* Pull in unofficialy latest protos

* Update Twitter store

* Implement updated GetTwitterUser RPC

* Fix Twitter worker with new protos and methods

* Fix SaveUser in memory Twitter store for detecting duplicate tip addresses

* Update Twitter store to keep track of used registration nonces

* Make adjustments to Twitter worker with new registration flow

* Add push when Twitter account is connected

* Update used twitter nonce postgres table name

* Pull in official latest protos

* Remove addressed todo
  • Loading branch information
jeffyanta committed Apr 9, 2024
1 parent 0a22e66 commit 617ac80
Show file tree
Hide file tree
Showing 16 changed files with 502 additions and 154 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
firebase.google.com/go/v4 v4.8.0
github.com/aws/aws-sdk-go-v2 v0.17.0
github.com/bits-and-blooms/bloom/v3 v3.1.0
github.com/code-payments/code-protobuf-api v1.14.0
github.com/code-payments/code-protobuf-api v1.16.1
github.com/emirpasic/gods v1.12.0
github.com/envoyproxy/protoc-gen-validate v1.0.4
github.com/golang-jwt/jwt/v5 v5.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/code-payments/code-protobuf-api v1.14.0 h1:HQOTZtIDGbEjWp7HDFD20Lpav4CbRhrM6GZrhxtfiZc=
github.com/code-payments/code-protobuf-api v1.14.0/go.mod h1:pHQm75vydD6Cm2qHAzlimW6drysm489Z4tVxC2zHSsU=
github.com/code-payments/code-protobuf-api v1.16.1 h1:aQ5cwttkMR8nJmN2tTD+5mb+7aWuWgKhY2VpAAogplE=
github.com/code-payments/code-protobuf-api v1.16.1/go.mod h1:pHQm75vydD6Cm2qHAzlimW6drysm489Z4tVxC2zHSsU=
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 h1:NmTXa/uVnDyp0TY5MKi197+3HWcnYWfnHGyaFthlnGw=
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
Expand Down
5 changes: 4 additions & 1 deletion pkg/code/async/user/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,24 @@ import (

"github.com/code-payments/code-server/pkg/code/async"
code_data "github.com/code-payments/code-server/pkg/code/data"
push_lib "github.com/code-payments/code-server/pkg/push"
"github.com/code-payments/code-server/pkg/sync"
"github.com/code-payments/code-server/pkg/twitter"
)

type service struct {
log *logrus.Entry
data code_data.Provider
pusher push_lib.Provider
twitterClient *twitter.Client
userLocks *sync.StripedLock
}

func New(twitterClient *twitter.Client, data code_data.Provider) async.Service {
func New(data code_data.Provider, pusher push_lib.Provider, twitterClient *twitter.Client) async.Service {
return &service{
log: logrus.StandardLogger().WithField("service", "user"),
data: data,
pusher: pusher,
twitterClient: twitterClient,
userLocks: sync.NewStripedLock(1024),
}
Expand Down
256 changes: 150 additions & 106 deletions pkg/code/async/user/twitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package async_user

import (
"context"
"crypto/ed25519"
"database/sql"
"strings"
"time"

"github.com/google/uuid"
"github.com/mr-tron/base58"
"github.com/newrelic/go-agent/v3/newrelic"
"github.com/pkg/errors"
Expand All @@ -15,13 +18,14 @@ import (
"github.com/code-payments/code-server/pkg/code/common"
"github.com/code-payments/code-server/pkg/code/data/account"
"github.com/code-payments/code-server/pkg/code/data/twitter"
push_util "github.com/code-payments/code-server/pkg/code/push"
"github.com/code-payments/code-server/pkg/metrics"
"github.com/code-payments/code-server/pkg/retry"
twitter_lib "github.com/code-payments/code-server/pkg/twitter"
)

const (
tipCardRegistrationPrefix = "CodeAccount:"
tipCardRegistrationPrefix = "CodeAccount"
maxTweetSearchResults = 100 // maximum allowed
)

Expand All @@ -44,7 +48,7 @@ func (p *service) twitterRegistrationWorker(serviceCtx context.Context, interval
defer m.End()
tracedCtx := newrelic.NewContext(serviceCtx, m)

err = p.findNewTwitterRegistrations(tracedCtx)
err = p.processNewTwitterRegistrations(tracedCtx)
if err != nil {
m.NoticeError(err)
log.WithError(err).Warn("failure processing new twitter registrations")
Expand Down Expand Up @@ -98,96 +102,56 @@ func (p *service) twitterUserInfoUpdateWorker(serviceCtx context.Context, interv
return err
}

func (p *service) findNewTwitterRegistrations(ctx context.Context) error {
var newlyProcessedTweets []string

err := func() error {
var pageToken *string
processedUsernames := make(map[string]any)
for {
tweets, nextPageToken, err := p.twitterClient.SearchRecentTweets(
ctx,
tipCardRegistrationPrefix,
maxTweetSearchResults,
pageToken,
)
if err != nil {
return errors.Wrap(err, "error searching tweets")
}

for _, tweet := range tweets {
if tweet.AdditionalMetadata.Author == nil {
return errors.Errorf("author missing in tweet %s", tweet.ID)
}

isTweetProcessed, err := p.data.IsTweetProcessed(ctx, tweet.ID)
if err != nil {
return errors.Wrap(err, "error checking if tweet is processed")
} else if isTweetProcessed {
// Found a checkpoint, so stop processing
return nil
}

// Oldest tweets go first, so we are guaranteed to checkpoint everything
newlyProcessedTweets = append([]string{tweet.ID}, newlyProcessedTweets...)

// Avoid reprocessing a Twitter user and potentially overriding the
// tip address with something older.
if _, ok := processedUsernames[tweet.AdditionalMetadata.Author.Username]; ok {
continue
}

// Attempt to find a tip account from the registration tweet
tipAccount, err := findTipAccountRegisteredInTweet(tweet)
switch err {
case nil:
case errTwitterInvalidRegistrationValue, errTwitterRegistrationNotFound:
continue
default:
return errors.Wrapf(err, "unexpected error processing tweet %s", tweet.ID)
}
func (p *service) processNewTwitterRegistrations(ctx context.Context) error {
tweets, err := p.findNewRegistrationTweets(ctx)
if err != nil {
return errors.Wrap(err, "error finding new registration tweets")
}

// Validate the new tip account
accountInfoRecord, err := p.data.GetAccountInfoByTokenAddress(ctx, tipAccount.PublicKey().ToBase58())
switch err {
case nil:
// todo: potentially use a relationship account instead
if accountInfoRecord.AccountType != commonpb.AccountType_PRIMARY {
continue
}
case account.ErrAccountInfoNotFound:
continue
default:
return errors.Wrap(err, "error getting account info")
}
for _, tweet := range tweets {
if tweet.AdditionalMetadata.Author == nil {
return errors.Errorf("author missing in tweet %s", tweet.ID)
}

processedUsernames[tweet.AdditionalMetadata.Author.Username] = struct{}{}
// Attempt to find a verified tip account from the registration tweet
tipAccount, registrationNonce, err := p.findVerifiedTipAccountRegisteredInTweet(ctx, tweet)
switch err {
case nil:
case errTwitterInvalidRegistrationValue, errTwitterRegistrationNotFound:
continue
default:
return errors.Wrapf(err, "unexpected error processing tweet %s", tweet.ID)
}

err = p.updateCachedTwitterUser(ctx, tweet.AdditionalMetadata.Author, tipAccount)
if err != nil {
return errors.Wrap(err, "error updating cached user state")
}
// Save the updated tipping information
err = p.data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error {
err = p.data.MarkTwitterNonceAsUsed(ctx, tweet.ID, *registrationNonce)
if err != nil {
return err
}

if nextPageToken == nil {
return nil
err = p.updateCachedTwitterUser(ctx, tweet.AdditionalMetadata.Author, tipAccount)
if err != nil {
return err
}
pageToken = nextPageToken
}
}()

if err != nil {
return err
}
err = p.data.MarkTweetAsProcessed(ctx, tweet.ID)
if err != nil {
return err
}

// Only update the processed tweet cache once we've found another checkpoint,
// or reached the end of the Tweet feed.
//
// todo: add batching
for _, tweetId := range newlyProcessedTweets {
err := p.data.MarkTweetAsProcessed(ctx, tweetId)
if err != nil {
return errors.Wrap(err, "error marking tweet as processed")
return nil
})

switch err {
case nil:
go push_util.SendTwitterAccountConnectedPushNotification(ctx, p.data, p.pusher, tipAccount)
case twitter.ErrDuplicateTipAddress, twitter.ErrDuplicateNonce:
// Any race conditions with duplicate nonces or tip addresses will are ignored
//
// todo: In the future, support multiple tip address mappings
default:
return errors.Wrap(err, "error saving new registration")
}
}

Expand All @@ -212,7 +176,7 @@ func (p *service) updateCachedTwitterUser(ctx context.Context, user *twitter_lib
mu.Lock()
defer mu.Unlock()

record, err := p.data.GetTwitterUser(ctx, user.Username)
record, err := p.data.GetTwitterUserByUsername(ctx, user.Username)
switch err {
case twitter.ErrUserNotFound:
if newTipAccount == nil {
Expand All @@ -238,49 +202,129 @@ func (p *service) updateCachedTwitterUser(ctx context.Context, user *twitter_lib
}

err = p.data.SaveTwitterUser(ctx, record)
if err != nil {
switch err {
case nil, twitter.ErrDuplicateTipAddress:
return err
default:
return errors.Wrap(err, "error updating cached twitter user")
}
return nil
}

func findTipAccountRegisteredInTweet(tweet *twitter_lib.Tweet) (*common.Account, error) {
var depositAccount *common.Account
func (p *service) findNewRegistrationTweets(ctx context.Context) ([]*twitter_lib.Tweet, error) {
var pageToken *string
var res []*twitter_lib.Tweet
for {
tweets, nextPageToken, err := p.twitterClient.SearchRecentTweets(
ctx,
tipCardRegistrationPrefix,
maxTweetSearchResults,
pageToken,
)
if err != nil {
return nil, errors.Wrap(err, "error searching tweets")
}

for _, tweet := range tweets {
isTweetProcessed, err := p.data.IsTweetProcessed(ctx, tweet.ID)
if err != nil {
return nil, errors.Wrap(err, "error checking if tweet is processed")
} else if isTweetProcessed {
// Found a checkpoint
return res, nil
}

parts := strings.Fields(tweet.Text)
for _, part := range parts {
if !strings.HasPrefix(part, tipCardRegistrationPrefix) {
res = append([]*twitter_lib.Tweet{tweet}, res...)
}

if nextPageToken == nil {
return res, nil
}
pageToken = nextPageToken
}
}

func (p *service) findVerifiedTipAccountRegisteredInTweet(ctx context.Context, tweet *twitter_lib.Tweet) (*common.Account, *uuid.UUID, error) {
tweetParts := strings.Fields(tweet.Text)
for _, tweetPart := range tweetParts {
// Look for the well-known prefix to indicate a potential registration value

if !strings.HasPrefix(tweetPart, tipCardRegistrationPrefix) {
continue
}

part = part[len(tipCardRegistrationPrefix):]
part = strings.TrimSuffix(part, ".")
// Parse out the individual components of the registration value

tweetPart = strings.TrimSuffix(tweetPart, ".")
registrationParts := strings.Split(tweetPart, ":")
if len(registrationParts) != 4 {
return nil, nil, errTwitterInvalidRegistrationValue
}

addressString := registrationParts[1]
nonceString := registrationParts[2]
signatureString := registrationParts[3]

decodedAddress, err := base58.Decode(addressString)
if err != nil {
return nil, nil, errTwitterInvalidRegistrationValue
}
if len(decodedAddress) != 32 {
return nil, nil, errTwitterInvalidRegistrationValue
}
tipAccount, _ := common.NewAccountFromPublicKeyBytes(decodedAddress)

nonce, err := uuid.Parse(nonceString)
if err != nil {
return nil, nil, errTwitterInvalidRegistrationValue
}

decoded, err := base58.Decode(part)
decodedSignature, err := base58.Decode(signatureString)
if err != nil {
return nil, errTwitterInvalidRegistrationValue
return nil, nil, errTwitterInvalidRegistrationValue
}
if len(decodedSignature) != 64 {
return nil, nil, errTwitterInvalidRegistrationValue
}

// Validate the components of the registration value

var tipAuthority *common.Account
accountInfoRecord, err := p.data.GetAccountInfoByTokenAddress(ctx, tipAccount.PublicKey().ToBase58())
switch err {
case nil:
if accountInfoRecord.AccountType != commonpb.AccountType_PRIMARY {
return nil, nil, errTwitterInvalidRegistrationValue
}

tipAuthority, err = common.NewAccountFromPublicKeyString(accountInfoRecord.AuthorityAccount)
if err != nil {
return nil, nil, errors.Wrap(err, "invalid tip authority account")
}
case account.ErrAccountInfoNotFound:
return nil, nil, errTwitterInvalidRegistrationValue
default:
return nil, nil, errors.Wrap(err, "error getting account info")
}

if len(decoded) != 32 {
return nil, errTwitterInvalidRegistrationValue
if !ed25519.Verify(tipAuthority.PublicKey().ToBytes(), nonce[:], decodedSignature) {
return nil, nil, errTwitterInvalidRegistrationValue
}

depositAccount, _ = common.NewAccountFromPublicKeyBytes(decoded)
return depositAccount, nil
return tipAccount, &nonce, nil
}

return nil, errTwitterRegistrationNotFound
return nil, nil, errTwitterRegistrationNotFound
}

func toProtoVerifiedType(value string) userpb.GetTwitterUserResponse_VerifiedType {
func toProtoVerifiedType(value string) userpb.TwitterUser_VerifiedType {
switch value {
case "blue":
return userpb.GetTwitterUserResponse_BLUE
return userpb.TwitterUser_BLUE
case "business":
return userpb.GetTwitterUserResponse_BUSINESS
return userpb.TwitterUser_BUSINESS
case "government":
return userpb.GetTwitterUserResponse_GOVERNMENT
return userpb.TwitterUser_GOVERNMENT
default:
return userpb.GetTwitterUserResponse_NONE
return userpb.TwitterUser_NONE
}
}
Loading

0 comments on commit 617ac80

Please sign in to comment.