Skip to content

Commit

Permalink
Fix price per session (livepeer#2892)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored and eliteprox committed Feb 21, 2024
1 parent a783354 commit 72c77ca
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 30 deletions.
22 changes: 22 additions & 0 deletions core/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type Balances struct {
type balance struct {
lastUpdate time.Time // Unix time since last update
amount *big.Rat // Balance represented as a big.Rat
fixedPrice *big.Rat // Fixed price for the session
}

// NewBalances creates a Balances instance with the given ttl
Expand Down Expand Up @@ -181,6 +182,27 @@ func (b *Balances) Balance(id ManifestID) *big.Rat {
return b.balances[id].amount
}

// FixedPrice retrieves the price fixed the given session
func (b *Balances) FixedPrice(id ManifestID) *big.Rat {
b.mtx.RLock()
defer b.mtx.RUnlock()
if b.balances[id] == nil {
return nil
}
return b.balances[id].fixedPrice
}

// SetFixedPrice sets fixed price for the given session
func (b *Balances) SetFixedPrice(id ManifestID, fixedPrice *big.Rat) {
b.mtx.Lock()
defer b.mtx.Unlock()
if b.balances[id] == nil {
b.balances[id] = &balance{amount: big.NewRat(0, 1)}
}
b.balances[id].fixedPrice = fixedPrice
b.balances[id].lastUpdate = time.Now()
}

func (b *Balances) cleanup() {
for id, balance := range b.balances {
b.mtx.Lock()
Expand Down
17 changes: 17 additions & 0 deletions core/accounting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,23 @@ func TestReserve(t *testing.T) {
assert.Zero(big.NewRat(0, 1).Cmp(b.Balance(mid)))
}

func TestFixedPrice(t *testing.T) {
b := NewBalances(5 * time.Second)
id1 := ManifestID("12345")
id2 := ManifestID("abcdef")

// No fixed price set yet
assert.Nil(t, b.FixedPrice(id1))

// Set fixed price
p := big.NewRat(1, 5)
b.SetFixedPrice(id1, p)
assert.Equal(t, p, b.FixedPrice(id1))

// No fixed price for a different manifest ID
assert.Nil(t, b.FixedPrice(id2))
}

func TestBalancesCleanup(t *testing.T) {
b := NewBalances(5 * time.Second)
assert := assert.New(t)
Expand Down
26 changes: 13 additions & 13 deletions core/orch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,7 @@ func TestPriceInfo(t *testing.T) {
recipient.On("TxCostMultiplier", mock.Anything).Return(txMultiplier, nil)
orch := NewOrchestrator(n, nil)

priceInfo, err := orch.PriceInfo(ethcommon.Address{})
priceInfo, err := orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)))
fixedPrice, err := common.PriceToFixed(expPricePerPixel)
Expand All @@ -1410,7 +1410,7 @@ func TestPriceInfo(t *testing.T) {
orch = NewOrchestrator(n, nil)
expPricePerPixel = big.NewRat(1010, 100)

priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)))
fixedPrice, err = common.PriceToFixed(expPricePerPixel)
Expand All @@ -1425,7 +1425,7 @@ func TestPriceInfo(t *testing.T) {
orch = NewOrchestrator(n, nil)
expPricePerPixel = big.NewRat(101, 1000)

priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)))
fixedPrice, err = common.PriceToFixed(expPricePerPixel)
Expand All @@ -1439,7 +1439,7 @@ func TestPriceInfo(t *testing.T) {
orch = NewOrchestrator(n, nil)
expPricePerPixel = big.NewRat(2525, 1000)

priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)))
fixedPrice, err = common.PriceToFixed(expPricePerPixel)
Expand All @@ -1458,7 +1458,7 @@ func TestPriceInfo(t *testing.T) {
orch = NewOrchestrator(n, nil)
expPricePerPixel = big.NewRat(11, 1)

priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)))
fixedPrice, err = common.PriceToFixed(expPricePerPixel)
Expand All @@ -1477,7 +1477,7 @@ func TestPriceInfo(t *testing.T) {
orch = NewOrchestrator(n, nil)
expPricePerPixel = big.NewRat(1100, 10)

priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)))
fixedPrice, err = common.PriceToFixed(expPricePerPixel)
Expand All @@ -1496,7 +1496,7 @@ func TestPriceInfo(t *testing.T) {
orch = NewOrchestrator(n, nil)
expPricePerPixel = big.NewRat(20, 1)

priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)))
fixedPrice, err = common.PriceToFixed(expPricePerPixel)
Expand All @@ -1509,7 +1509,7 @@ func TestPriceInfo(t *testing.T) {
n.SetBasePrice("default", big.NewRat(0, 1))
orch = NewOrchestrator(n, nil)

priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Zero(priceInfo.PricePerUnit)
assert.Equal(int64(1), priceInfo.PixelsPerUnit)
Expand All @@ -1527,7 +1527,7 @@ func TestPriceInfo(t *testing.T) {
overhead := new(big.Rat).Add(big.NewRat(1, 1), new(big.Rat).Inv(txMultiplier))
expPricePerPixel = new(big.Rat).Mul(basePrice, overhead) // 23953749205332825000/926899968213313
require.Equal(expPricePerPixel.Num().Cmp(big.NewInt(int64(math.MaxInt64))), 1)
priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
// for this case price will be rounded when converting to fixed
assert.NotEqual(expPricePerPixel.Cmp(big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit)), 0)
Expand All @@ -1543,7 +1543,7 @@ func TestPriceInfo(t *testing.T) {

// Now make sure when AutoAdjustPrice = false we are returning the base price
n.AutoAdjustPrice = false
priceInfo, err = orch.PriceInfo(ethcommon.Address{})
priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(err)
assert.Equal(basePrice, big.NewRat(priceInfo.PricePerUnit, priceInfo.PixelsPerUnit))
}
Expand All @@ -1553,7 +1553,7 @@ func TestPriceInfo_GivenNilNode_ReturnsNilError(t *testing.T) {
orch := NewOrchestrator(n, nil)
orch.node = nil

priceInfo, err := orch.PriceInfo(ethcommon.Address{})
priceInfo, err := orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(t, err)
assert.Nil(t, priceInfo)
}
Expand All @@ -1563,7 +1563,7 @@ func TestPriceInfo_GivenNilRecipient_ReturnsNilError(t *testing.T) {
orch := NewOrchestrator(n, nil)
n.Recipient = nil

priceInfo, err := orch.PriceInfo(ethcommon.Address{})
priceInfo, err := orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(t, err)
assert.Nil(t, priceInfo)
}
Expand All @@ -1578,7 +1578,7 @@ func TestPriceInfo_TxMultiplierError_ReturnsError(t *testing.T) {
recipient.On("TxCostMultiplier", mock.Anything).Return(nil, expError)
orch := NewOrchestrator(n, nil)

priceInfo, err := orch.PriceInfo(ethcommon.Address{})
priceInfo, err := orch.PriceInfo(ethcommon.Address{}, "")
assert.Nil(t, priceInfo)
assert.EqualError(t, err, expError.Error())
}
Expand Down
24 changes: 21 additions & 3 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ func (orch *orchestrator) ProcessPayment(ctx context.Context, payment net.Paymen
return fmt.Errorf("invalid expected price sent with payment err=%q", "expected price is nil")
}

// During the first payment, set the fixed price per session
if balances, ok := orch.node.Balances.balances[sender]; ok {
if balances.FixedPrice(manifestID) == nil {
balances.SetFixedPrice(manifestID, priceInfoRat)
glog.V(6).Infof("Setting fixed price=%v for session=%v", priceInfoRat, manifestID)
}
}

ticketParams := &pm.TicketParams{
Recipient: ethcommon.BytesToAddress(payment.TicketParams.Recipient),
FaceValue: new(big.Int).SetBytes(payment.TicketParams.FaceValue),
Expand Down Expand Up @@ -252,12 +260,12 @@ func (orch *orchestrator) TicketParams(sender ethcommon.Address, priceInfo *net.
}, nil
}

func (orch *orchestrator) PriceInfo(sender ethcommon.Address) (*net.PriceInfo, error) {
func (orch *orchestrator) PriceInfo(sender ethcommon.Address, manifestID ManifestID) (*net.PriceInfo, error) {
if orch.node == nil || orch.node.Recipient == nil {
return nil, nil
}

price, err := orch.priceInfo(sender)
price, err := orch.priceInfo(sender, manifestID)
if err != nil {
return nil, err
}
Expand All @@ -273,9 +281,19 @@ func (orch *orchestrator) PriceInfo(sender ethcommon.Address) (*net.PriceInfo, e
}

// priceInfo returns price per pixel as a fixed point number wrapped in a big.Rat
func (orch *orchestrator) priceInfo(sender ethcommon.Address) (*big.Rat, error) {
func (orch *orchestrator) priceInfo(sender ethcommon.Address, manifestID ManifestID) (*big.Rat, error) {
basePrice := orch.node.GetBasePrice(sender.String())

// If there is already a fixed price for the given session, use this price
if manifestID != "" {
if balances, ok := orch.node.Balances.balances[sender]; ok {
fixedPrice := balances.FixedPrice(manifestID)
if fixedPrice != nil {
return fixedPrice, nil
}
}
}

if basePrice == nil {
basePrice = orch.node.GetBasePrice("default")
}
Expand Down
1 change: 1 addition & 0 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,7 @@ func selectOrchestrator(ctx context.Context, n *core.LivepeerNode, params *core.
Balance: balance,
lock: &sync.RWMutex{},
OrchestratorScore: oScore,
InitialPrice: od.RemoteInfo.PriceInfo,
}

sessions = append(sessions, session)
Expand Down
13 changes: 7 additions & 6 deletions server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Orchestrator interface {
TranscoderResults(job int64, res *core.RemoteTranscoderResult)
ProcessPayment(ctx context.Context, payment net.Payment, manifestID core.ManifestID) error
TicketParams(sender ethcommon.Address, priceInfo *net.PriceInfo) (*net.TicketParams, error)
PriceInfo(sender ethcommon.Address) (*net.PriceInfo, error)
PriceInfo(sender ethcommon.Address, manifestID core.ManifestID) (*net.PriceInfo, error)
SufficientBalance(addr ethcommon.Address, manifestID core.ManifestID) bool
DebitFees(addr ethcommon.Address, manifestID core.ManifestID, price *net.PriceInfo, pixels int64)
Capabilities() *net.Capabilities
Expand Down Expand Up @@ -118,6 +118,7 @@ type BroadcastSession struct {
OrchestratorOS drivers.OSSession
PMSessionID string
Balance Balance
InitialPrice *net.PriceInfo
}

func (bs *BroadcastSession) Transcoder() string {
Expand Down Expand Up @@ -323,7 +324,7 @@ func getOrchestrator(orch Orchestrator, req *net.OrchestratorRequest) (*net.Orch
}

// currently, orchestrator == transcoder
return orchestratorInfo(orch, addr, orch.ServiceURI().String())
return orchestratorInfo(orch, addr, orch.ServiceURI().String(), "")
}

func endTranscodingSession(node *core.LivepeerNode, orch Orchestrator, req *net.EndTranscodingSessionRequest) (*net.EndTranscodingSessionResponse, error) {
Expand All @@ -335,18 +336,18 @@ func endTranscodingSession(node *core.LivepeerNode, orch Orchestrator, req *net.
return &net.EndTranscodingSessionResponse{}, nil
}

func getPriceInfo(orch Orchestrator, addr ethcommon.Address) (*net.PriceInfo, error) {
func getPriceInfo(orch Orchestrator, addr ethcommon.Address, manifestID core.ManifestID) (*net.PriceInfo, error) {
if AuthWebhookURL != nil {
webhookRes := getFromDiscoveryAuthWebhookCache(addr.Hex())
if webhookRes != nil && webhookRes.PriceInfo != nil {
return webhookRes.PriceInfo, nil
}
}
return orch.PriceInfo(addr)
return orch.PriceInfo(addr, manifestID)
}

func orchestratorInfo(orch Orchestrator, addr ethcommon.Address, serviceURI string) (*net.OrchestratorInfo, error) {
priceInfo, err := getPriceInfo(orch, addr)
func orchestratorInfo(orch Orchestrator, addr ethcommon.Address, serviceURI string, manifestID core.ManifestID) (*net.OrchestratorInfo, error) {
priceInfo, err := getPriceInfo(orch, addr, manifestID)
if err != nil {
return nil, err
}
Expand Down
30 changes: 23 additions & 7 deletions server/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (r *stubOrchestrator) TicketParams(sender ethcommon.Address, priceInfo *net
return r.ticketParams, nil
}

func (r *stubOrchestrator) PriceInfo(sender ethcommon.Address) (*net.PriceInfo, error) {
func (r *stubOrchestrator) PriceInfo(sender ethcommon.Address, manifestID core.ManifestID) (*net.PriceInfo, error) {
return r.priceInfo, nil
}

Expand Down Expand Up @@ -696,7 +696,23 @@ func TestValidatePrice(t *testing.T) {
err = validatePrice(s)
assert.Nil(err)

// O Initial Price == O Price
s.InitialPrice = oinfo.PriceInfo
err = validatePrice(s)
assert.Nil(err)

// O Initial Price higher than O Price
s.InitialPrice = &net.PriceInfo{PricePerUnit: 10, PixelsPerUnit: 3}
err = validatePrice(s)
assert.Nil(err)

// O Initial Price lower than O Price
s.InitialPrice = &net.PriceInfo{PricePerUnit: 1, PixelsPerUnit: 10}
err = validatePrice(s)
assert.ErrorContains(err, "price has changed")

// B MaxPrice < O Price
s.InitialPrice = nil
BroadcastCfg.SetMaxPrice(big.NewRat(1, 5))
err = validatePrice(s)
assert.EqualError(err, fmt.Sprintf("Orchestrator price higher than the set maximum price of %v wei per %v pixels", int64(1), int64(5)))
Expand Down Expand Up @@ -1034,7 +1050,7 @@ func TestGetPriceInfo_NoWebhook_DefaultPriceError_ReturnsError(t *testing.T) {

orch.On("PriceInfo", mock.Anything).Return(nil, expErr)

p, err := getPriceInfo(orch, addr)
p, err := getPriceInfo(orch, addr, "")
assert.Nil(p)
assert.EqualError(err, expErr.Error())
}
Expand All @@ -1052,7 +1068,7 @@ func TestGetPriceInfo_NoWebhook_ReturnsDefaultPrice(t *testing.T) {

orch.On("PriceInfo", mock.Anything).Return(priceInfo, nil)

p, err := getPriceInfo(orch, addr)
p, err := getPriceInfo(orch, addr, "")
assert.Equal(p.PricePerUnit, int64(100))
assert.Equal(p.PixelsPerUnit, int64(30))
assert.Nil(err)
Expand All @@ -1076,7 +1092,7 @@ func TestGetPriceInfo_Webhook_NoCache_ReturnsDefaultPrice(t *testing.T) {

orch.On("PriceInfo", mock.Anything).Return(priceInfo, nil)

p, err := getPriceInfo(orch, addr)
p, err := getPriceInfo(orch, addr, "")
assert.Equal(p.PricePerUnit, int64(100))
assert.Equal(p.PixelsPerUnit, int64(30))
assert.Nil(err)
Expand All @@ -1102,7 +1118,7 @@ func TestGetPriceInfo_Webhook_Cache_WrongType_ReturnsDefaultPrice(t *testing.T)

orch.On("PriceInfo", mock.Anything).Return(priceInfo, nil)

p, err := getPriceInfo(orch, addr)
p, err := getPriceInfo(orch, addr, "")
assert.Equal(p.PricePerUnit, int64(100))
assert.Equal(p.PixelsPerUnit, int64(30))
assert.Nil(err)
Expand Down Expand Up @@ -1133,7 +1149,7 @@ func TestGetPriceInfo_Webhook_Cache_ReturnsCachePrice(t *testing.T) {

orch.On("PriceInfo", mock.Anything).Return(priceInfo, nil)

p, err := getPriceInfo(orch, addr)
p, err := getPriceInfo(orch, addr, "")
assert.Equal(p.PricePerUnit, int64(20))
assert.Equal(p.PixelsPerUnit, int64(19))
assert.Nil(err)
Expand Down Expand Up @@ -1307,7 +1323,7 @@ func (o *mockOrchestrator) TicketParams(sender ethcommon.Address, priceInfo *net
return nil, args.Error(1)
}

func (o *mockOrchestrator) PriceInfo(sender ethcommon.Address) (*net.PriceInfo, error) {
func (o *mockOrchestrator) PriceInfo(sender ethcommon.Address, manifestID core.ManifestID) (*net.PriceInfo, error) {
args := o.Called(sender)
if args.Get(0) != nil {
return args.Get(0).(*net.PriceInfo), args.Error(1)
Expand Down
7 changes: 6 additions & 1 deletion server/segment_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (h *lphttp) ServeSegment(w http.ResponseWriter, r *http.Request) {
return
}

oInfo, err := orchestratorInfo(orch, sender, orch.ServiceURI().String())
oInfo, err := orchestratorInfo(orch, sender, orch.ServiceURI().String(), core.ManifestID(segData.AuthToken.SessionId))
if err != nil {
clog.Errorf(ctx, "Error updating orchestrator info - err=%q", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
Expand Down Expand Up @@ -864,6 +864,11 @@ func validatePrice(sess *BroadcastSession) error {
if maxPrice != nil && oPrice.Cmp(maxPrice) == 1 {
return fmt.Errorf("Orchestrator price higher than the set maximum price of %v wei per %v pixels", maxPrice.Num().Int64(), maxPrice.Denom().Int64())
}
iPrice, err := common.RatPriceInfo(sess.InitialPrice)
if err == nil && iPrice != nil && oPrice.Cmp(iPrice) == 1 {
return fmt.Errorf("Orchestrator price has changed, Orchestrator price: %v, Orchestrator initial price: %v", oPrice, iPrice)
}

return nil
}

Expand Down

0 comments on commit 72c77ca

Please sign in to comment.