Skip to content

Commit

Permalink
Merge branch 'master' into improvement/status
Browse files Browse the repository at this point in the history
  • Loading branch information
anbsky committed Jun 17, 2020
2 parents a6dddaa + 2dcfb79 commit 0132457
Show file tree
Hide file tree
Showing 15 changed files with 246 additions and 113 deletions.
2 changes: 1 addition & 1 deletion api/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ func TestMain(m *testing.M) {
}
dbConn, connCleanup := storage.CreateTestConn(params)
dbConn.SetDefaultConnection()
defer connCleanup()

code := m.Run()

connCleanup()
os.Exit(code)
}

Expand Down
9 changes: 5 additions & 4 deletions app/proxy/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ func TestMain(m *testing.M) {
DBName: dbConfig.DBName,
Options: dbConfig.Options,
}
c, connCleanup := storage.CreateTestConn(params)
c.SetDefaultConnection()
dbConn, connCleanup := storage.CreateTestConn(params)
dbConn.SetDefaultConnection()

defer connCleanup()
code := m.Run()

os.Exit(m.Run())
connCleanup()
os.Exit(code)
}

func testFuncSetup() {
Expand Down
7 changes: 3 additions & 4 deletions app/publish/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ func copyToDocker(t *testing.T, fileName string) {

func TestLbrynetPublisher(t *testing.T) {
dbConfig := config.GetDatabase()
params := storage.ConnParams{
c, connCleanup := storage.CreateTestConn(storage.ConnParams{
Connection: dbConfig.Connection,
DBName: dbConfig.DBName,
Options: dbConfig.Options,
}
c, connCleanup := storage.CreateTestConn(params)
c.SetDefaultConnection()
})
defer connCleanup()
c.SetDefaultConnection()

data := []byte("test file")
f, err := ioutil.TempFile(os.TempDir(), "*")
Expand Down
5 changes: 3 additions & 2 deletions app/query/caller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewCaller(endpoint string, userID int) *Caller {
KeepAlive: 120 * time.Second,
}).Dial,
TLSHandshakeTimeout: 30 * time.Second,
ResponseHeaderTimeout: 300 * time.Second,
ResponseHeaderTimeout: 600 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
},
Expand Down Expand Up @@ -149,6 +149,7 @@ func (c *Caller) callQueryWithRetry(q *Query) (*jsonrpc.RPCResponse, error) {
// Generally a HTTP transport failure (connect error etc)
if err != nil {
logger.Log().Errorf("error sending query to %v: %v", c.endpoint, err)
metrics.ProxyCallFailedDurations.WithLabelValues(q.Method(), c.endpoint, metrics.FailureKindNet).Observe(duration)
return nil, errors.Err(err)
}

Expand Down Expand Up @@ -188,7 +189,7 @@ func (c *Caller) callQueryWithRetry(q *Query) (*jsonrpc.RPCResponse, error) {
if err != nil || (r != nil && r.Error != nil) {
logFields["response"] = r.Error
logger.WithFields(logFields).Error("rpc call error")
metrics.ProxyCallFailedDurations.WithLabelValues(q.Method(), c.endpoint).Observe(duration)
metrics.ProxyCallFailedDurations.WithLabelValues(q.Method(), c.endpoint, metrics.FailureKindRPC).Observe(duration)
} else {
if config.ShouldLogResponses() {
logFields["response"] = r
Expand Down
40 changes: 11 additions & 29 deletions app/query/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func preflightHookGet(caller *Caller, query *Query) (*jsonrpc.RPCResponse, error
}
stream := claim.Value.GetStream()

if stream.Fee != nil && stream.Fee.Amount > 0 {
feeAmount := stream.GetFee().GetAmount()
if feeAmount > 0 {
isPaidStream = true

purchaseQuery, err := NewQuery(jsonrpc.NewRequest(
Expand Down Expand Up @@ -73,8 +74,8 @@ func preflightHookGet(caller *Caller, query *Query) (*jsonrpc.RPCResponse, error
} else {
// Assuming the stream is of a paid variety and we have just paid for the stream
metrics.LbrytvPurchases.Inc()
metrics.LbrytvPurchaseAmounts.Observe(float64(stream.Fee.Amount))
log.Infof("made a purchase for %.2f LBC", float64(stream.Fee.Amount))
metrics.LbrytvPurchaseAmounts.Observe(float64(feeAmount))
log.Infof("made a purchase for %d LBC", feeAmount)
// This is needed so changes can propagate for the subsequent resolve
time.Sleep(1 * time.Second)
claim, err = resolve(caller, query, url)
Expand All @@ -91,20 +92,15 @@ func preflightHookGet(caller *Caller, query *Query) (*jsonrpc.RPCResponse, error
}
metrics.LbrytvStreamRequests.WithLabelValues(metricLabel).Inc()

size := stream.GetSource().Size

if err != nil {
return nil, fmt.Errorf("error getting size by magic: %v", err)
}

if isPaidStream {
size := stream.GetSource().GetSize()
if claim.PurchaseReceipt == nil {
log.Errorf("stream was paid for but receipt not found in the resolve response")
return nil, fmt.Errorf("couldn't find purchase receipt for paid stream")
}

log.Debugf("creating stream token with stream id=%s, txid=%s, size=%v", claim.Name+"/"+claim.ClaimID, claim.PurchaseReceipt.Txid, uint64(size))
token, err := paid.CreateToken(claim.Name+"/"+claim.ClaimID, claim.PurchaseReceipt.Txid, uint64(size), paid.ExpTenSecPer100MB)
log.Debugf("creating stream token with stream id=%s, txid=%s, size=%v", claim.Name+"/"+claim.ClaimID, claim.PurchaseReceipt.Txid, size)
token, err := paid.CreateToken(claim.Name+"/"+claim.ClaimID, claim.PurchaseReceipt.Txid, size, paid.ExpTenSecPer100MB)
if err != nil {
return nil, err
}
Expand All @@ -121,10 +117,7 @@ func preflightHookGet(caller *Caller, query *Query) (*jsonrpc.RPCResponse, error
}

func resolve(c *Caller, q *Query, url string) (*ljsonrpc.Claim, error) {
log := logger.Log().WithField("url", url)

resolveResponse := ljsonrpc.ResolveResponse{}
resQuery, err := NewQuery(jsonrpc.NewRequest(
resolveQuery, err := NewQuery(jsonrpc.NewRequest(
MethodResolve,
map[string]interface{}{
"urls": url,
Expand All @@ -136,18 +129,13 @@ func resolve(c *Caller, q *Query, url string) (*ljsonrpc.Claim, error) {
return nil, err
}

resRespRaw, err := c.callQueryWithRetry(resQuery)
rawResolveResponse, err := c.callQueryWithRetry(resolveQuery)
if err != nil {
return nil, err
}

resResult := map[string]interface{}{}
err = resRespRaw.GetObject(&resResult)
if err != nil {
log.Debug("error parsing resolve response:", err)
return nil, err
}
err = ljsonrpc.Decode(resResult, &resolveResponse)
var resolveResponse ljsonrpc.ResolveResponse
err = ljsonrpc.Decode(rawResolveResponse.Result, &resolveResponse)
if err != nil {
return nil, err
}
Expand All @@ -159,12 +147,6 @@ func resolve(c *Caller, q *Query, url string) (*ljsonrpc.Claim, error) {
return &claim, err
}

func checkIsPaidStream(s interface{}) bool {
f := s.(ljsonrpc.File)
stream := f.Metadata.GetStream()
return stream.Fee != nil && stream.Fee.Amount > 0
}

func getStatusResponse(c *Caller, q *Query) (*jsonrpc.RPCResponse, error) {
var response map[string]interface{}

Expand Down
7 changes: 5 additions & 2 deletions app/sdkrouter/sdkrouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ func TestMain(m *testing.M) {
}
dbConn, connCleanup := storage.CreateTestConn(params)
dbConn.SetDefaultConnection()
defer connCleanup()
os.Exit(m.Run())

code := m.Run()

connCleanup()
os.Exit(code)
}

func TestInitializeWithYML(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions app/wallet/tracker/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ func TestMain(m *testing.M) {
DBName: dbConfig.DBName,
Options: dbConfig.Options + "&TimeZone=UTC",
}
c, connCleanup := storage.CreateTestConn(params)
c.SetDefaultConnection()
dbConn, connCleanup := storage.CreateTestConn(params)
dbConn.SetDefaultConnection()

code := m.Run()
connCleanup()

connCleanup()
os.Exit(code)
}

Expand Down
92 changes: 59 additions & 33 deletions app/wallet/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ var logger = monitor.NewModuleLogger("wallet")
func DisableLogger() { logger.Disable() } // for testing

// TokenHeader is the name of HTTP header which is supplied by client and should contain internal-api auth_token.
const TokenHeader = "X-Lbry-Auth-Token"
const pgUniqueConstraintViolation = "23505"
const (
TokenHeader = "X-Lbry-Auth-Token"

pgUniqueConstraintViolation = "23505"
pgAbortedTransactionViolation = "25P02"
txMaxRetries = 2
)

// GetUserWithWallet gets user by internal-apis auth token. If the user does not have a
// wallet yet, they are assigned an SDK and a wallet is created for them on that SDK.
Expand All @@ -53,7 +58,7 @@ func GetUserWithSDKServer(rt *sdkrouter.Router, internalAPIHost, token, metaRemo
defer cancelFn()

var localUser *models.User
err = tx(ctx, storage.Conn.DB.DB, func(tx *sql.Tx) error {
err = inTx(ctx, storage.Conn.DB.DB, func(tx *sql.Tx) error {
localUser, err = getOrCreateLocalUser(tx, remoteUser.ID, log)
if err != nil {
return err
Expand All @@ -71,65 +76,86 @@ func GetUserWithSDKServer(rt *sdkrouter.Router, internalAPIHost, token, metaRemo
return localUser, err
}

func tx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
func inTx(ctx context.Context, db *sql.DB, f func(tx *sql.Tx) error) error {
var (
tx *sql.Tx
err error
)

err = fn(tx)
if err != nil {
tx.Rollback()
return err
for i := 0; i < txMaxRetries; i++ {
tx, err = db.BeginTx(ctx, nil)
if err != nil {
return err
}

err = f(tx)

if err == nil {
return tx.Commit()
}

rollbackErr := tx.Rollback()
if rollbackErr != nil {
logger.Log().Errorf("rolling back tx: %v", rollbackErr)
}

// in postgres, if an error occurs inside a transaction, you can't do anything else
// you havee to roll the transaction back and start a new one
// more info: https://community.pivotal.io/s/article/How-to-Overcome-the-Error-current-transaction-is-aborted-commands-ignored-until-end-of-transaction-block
var pgErr *pq.Error
if errors.As(err, &pgErr) && pgErr.Code == pgAbortedTransactionViolation {
logger.Log().Debug("attempted query in aborted transaction, re-trying")
continue
}

break
}

return tx.Commit()
return err
}

func getOrCreateLocalUser(exec boil.Executor, remoteUserID int, log *logrus.Entry) (*models.User, error) {
localUser, err := getDBUser(exec, remoteUserID)
if err != nil && err != sql.ErrNoRows {
return nil, err
} else if err == sql.ErrNoRows {
log.Infof("user not found in the database, creating")
localUser, err = createDBUser(exec, remoteUserID)
if err != nil {
return nil, err

if err == nil {
if localUser.LbrynetServerID.IsZero() {
// Should not happen, but not enforced in DB structure yet
log.Errorf("user %d found in db but doesn't have sdk assigned", localUser.ID)
}
} else if localUser.LbrynetServerID.IsZero() {
// Should not happen, but not enforced in DB structure yet
log.Errorf("user %d found in db but doesn't have sdk assigned", localUser.ID)
return localUser, nil
}

return localUser, nil
}
if !errors.Is(err, sql.ErrNoRows) {
return nil, err
}

func createDBUser(exec boil.Executor, id int) (*models.User, error) {
log := logger.WithFields(logrus.Fields{"id": id})
log.Infof("user not found in the database, creating")

u := &models.User{ID: id}
err := u.Insert(exec, boil.Infer())
u := &models.User{ID: remoteUserID}
err = u.Insert(exec, boil.Infer())
if err == nil {
metrics.LbrytvNewUsers.Inc()
return u, nil
}

// Check if we encountered a primary key violation, it would mean another routine
// fired from another request has managed to create a user before us so we should try retrieving it again.
// Check if we encountered a primary key violation, it would mean another routine fired another
// request managed to create a user before us and we should retrieve that user record.
var pgErr *pq.Error
if errors.As(err, &pgErr) && pgErr.Code == pgUniqueConstraintViolation {
log.Info("user creation conflict, trying to retrieve the local user again")
return getDBUser(exec, id)
return getDBUser(exec, remoteUserID)
}

log.Error("unknown error encountered while creating user:", err)
return nil, err
}

func getDBUser(exec boil.Executor, id int) (*models.User, error) {
return models.Users(
user, err := models.Users(
models.UserWhere.ID.EQ(id),
qm.Load(models.UserRels.LbrynetServer),
).One(exec)
return user, errors.Err(err)
}

// GetDBUserG returns a database user with LbrynetServer selected, using the global executor.
Expand Down

0 comments on commit 0132457

Please sign in to comment.