Skip to content

Commit

Permalink
if a command errors inside a transaction, retry the whole transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
lyoshenka committed Jun 2, 2020
1 parent b893c6d commit 0cb7709
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 36 deletions.
92 changes: 59 additions & 33 deletions app/wallet/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ var logger = monitor.NewModuleLogger("wallet")
func DisableLogger() { logger.Disable() } // for testing

// TokenHeader is the name of HTTP header which is supplied by client and should contain internal-api auth_token.
const TokenHeader = "X-Lbry-Auth-Token"
const pgUniqueConstraintViolation = "23505"
const (
TokenHeader = "X-Lbry-Auth-Token"

pgUniqueConstraintViolation = "23505"
pgAbortedTransactionViolation = "25P02"
txMaxRetries = 2
)

// GetUserWithWallet gets user by internal-apis auth token. If the user does not have a
// wallet yet, they are assigned an SDK and a wallet is created for them on that SDK.
Expand All @@ -53,7 +58,7 @@ func GetUserWithSDKServer(rt *sdkrouter.Router, internalAPIHost, token, metaRemo
defer cancelFn()

var localUser *models.User
err = tx(ctx, storage.Conn.DB.DB, func(tx *sql.Tx) error {
err = inTx(ctx, storage.Conn.DB.DB, func(tx *sql.Tx) error {
localUser, err = getOrCreateLocalUser(tx, remoteUser.ID, log)
if err != nil {
return err
Expand All @@ -71,65 +76,86 @@ func GetUserWithSDKServer(rt *sdkrouter.Router, internalAPIHost, token, metaRemo
return localUser, err
}

func tx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
func inTx(ctx context.Context, db *sql.DB, f func(tx *sql.Tx) error) error {
var (
tx *sql.Tx
err error
)

err = fn(tx)
if err != nil {
tx.Rollback()
return err
for i := 0; i < txMaxRetries; i++ {
tx, err = db.BeginTx(ctx, nil)
if err != nil {
return err
}

err = f(tx)

if err == nil {
return tx.Commit()
}

rollbackErr := tx.Rollback()
if rollbackErr != nil {
logger.Log().Errorf("rolling back tx: %v", rollbackErr)
}

// in postgres, if an error occurs inside a transaction, you can't do anything else
// you havee to roll the transaction back and start a new one
// more info: https://community.pivotal.io/s/article/How-to-Overcome-the-Error-current-transaction-is-aborted-commands-ignored-until-end-of-transaction-block
var pgErr *pq.Error
if errors.As(err, &pgErr) && pgErr.Code == pgAbortedTransactionViolation {
logger.Log().Debug("attempted query in aborted transaction, re-trying")
continue
}

break
}

return tx.Commit()
return err
}

func getOrCreateLocalUser(exec boil.Executor, remoteUserID int, log *logrus.Entry) (*models.User, error) {
localUser, err := getDBUser(exec, remoteUserID)
if err != nil && err != sql.ErrNoRows {
return nil, err
} else if err == sql.ErrNoRows {
log.Infof("user not found in the database, creating")
localUser, err = createDBUser(exec, remoteUserID)
if err != nil {
return nil, err

if err == nil {
if localUser.LbrynetServerID.IsZero() {
// Should not happen, but not enforced in DB structure yet
log.Errorf("user %d found in db but doesn't have sdk assigned", localUser.ID)
}
} else if localUser.LbrynetServerID.IsZero() {
// Should not happen, but not enforced in DB structure yet
log.Errorf("user %d found in db but doesn't have sdk assigned", localUser.ID)
return localUser, nil
}

return localUser, nil
}
if !errors.Is(err, sql.ErrNoRows) {
return nil, err
}

func createDBUser(exec boil.Executor, id int) (*models.User, error) {
log := logger.WithFields(logrus.Fields{"id": id})
log.Infof("user not found in the database, creating")

u := &models.User{ID: id}
err := u.Insert(exec, boil.Infer())
u := &models.User{ID: remoteUserID}
err = u.Insert(exec, boil.Infer())
if err == nil {
metrics.LbrytvNewUsers.Inc()
return u, nil
}

// Check if we encountered a primary key violation, it would mean another routine
// fired from another request has managed to create a user before us so we should try retrieving it again.
// Check if we encountered a primary key violation, it would mean another routine fired another
// request managed to create a user before us and we should retrieve that user record.
var pgErr *pq.Error
if errors.As(err, &pgErr) && pgErr.Code == pgUniqueConstraintViolation {
log.Info("user creation conflict, trying to retrieve the local user again")
return getDBUser(exec, id)
return getDBUser(exec, remoteUserID)
}

log.Error("unknown error encountered while creating user:", err)
return nil, err
}

func getDBUser(exec boil.Executor, id int) (*models.User, error) {
return models.Users(
user, err := models.Users(
models.UserWhere.ID.EQ(id),
qm.Load(models.UserRels.LbrynetServer),
).One(exec)
return user, errors.Err(err)
}

// assignSDKServerToUser permanently assigns an sdk to a user, and creates a wallet on that sdk for that user.
Expand Down
55 changes: 52 additions & 3 deletions app/wallet/wallet_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package wallet

import (
"context"
"database/sql"
"fmt"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -191,11 +193,12 @@ func TestGetUserWithWallet_ExistingUserWithoutSDKGetsAssignedOneOnRetrieve(t *te
srv.InsertG(boil.Infer())
defer func() { srv.DeleteG() }()

rt := sdkrouter.NewWithServers(srv)
u, err := createDBUser(storage.Conn.DB, userID)
u := &models.User{ID: userID}
err := u.Insert(storage.Conn.DB, boil.Infer())
require.NoError(t, err)
require.NotNil(t, u)
assert.False(t, u.CreatedAt.IsZero())

rt := sdkrouter.NewWithServers(srv)
u, err = GetUserWithSDKServer(rt, ts.URL, "abc", "")
require.NoError(t, err)
assert.True(t, u.LbrynetServerID.Valid)
Expand Down Expand Up @@ -362,3 +365,49 @@ func TestCreateWalletLoadWallet(t *testing.T) {
err = LoadWallet(addr, userID)
require.NoError(t, err)
}

func TestCreateDBUser_ConcurrentDuplicateUser(t *testing.T) {
storage.Conn.Truncate([]string{models.TableNames.Users})

id := 123
user := &models.User{ID: id}
err := user.Insert(storage.Conn.DB.DB, boil.Infer())
require.NoError(t, err)

// we want the very first getDBUser() call in getOrCreateLocalUser() to return no results to
// simulate the case where that call returns nothing and then the user is created in another
// request

mockExecutor := &firstQueryNoResults{}

err = inTx(context.Background(), storage.Conn.DB.DB, func(tx *sql.Tx) error {
mockExecutor.ex = tx
_, err := getOrCreateLocalUser(mockExecutor, id, logger.Log())
return err
})

assert.NoError(t, err)
}

type firstQueryNoResults struct {
ex boil.Executor
calls int
}

func (m *firstQueryNoResults) Exec(query string, args ...interface{}) (sql.Result, error) {
return m.ex.Exec(query, args...)
}
func (m *firstQueryNoResults) Query(query string, args ...interface{}) (*sql.Rows, error) {
m.calls++
if m.calls == 1 {
return nil, errors.Err(sql.ErrNoRows)
}
return m.ex.Query(query, args...)
}
func (m *firstQueryNoResults) QueryRow(query string, args ...interface{}) *sql.Row {
m.calls++
if m.calls == 1 {
return m.ex.QueryRow("SELECT 0 <> 0") // just want something with no rows
}
return m.ex.QueryRow(query, args...)
}

0 comments on commit 0cb7709

Please sign in to comment.