From 09b7b3afb2cfe7d88fd55ab9b7134a30104d6984 Mon Sep 17 00:00:00 2001 From: Richard Patel Date: Thu, 3 Mar 2022 22:45:39 +0100 Subject: [PATCH] more godoc comments --- client.go | 118 +----------------------------------------- prices.go | 138 ++++++++++++++++++++++++++++++++++++++++++++++++++ products.go | 22 ++++++++ types.go | 80 +++++++++++++++++------------ types_test.go | 4 +- 5 files changed, 211 insertions(+), 151 deletions(-) create mode 100644 prices.go create mode 100644 products.go diff --git a/client.go b/client.go index c80bcd8..798a9e2 100644 --- a/client.go +++ b/client.go @@ -15,134 +15,20 @@ package pyth import ( - "context" - "errors" - "time" - - "github.com/cenkalti/backoff/v4" "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" - "github.com/gagliardetto/solana-go/rpc/ws" "go.uber.org/zap" ) +// Client interacts with Pyth via Solana's JSON-RPC API. type Client struct { Opts Log *zap.Logger + RPC *rpc.Client WebSocketURL string } type Opts struct { ProgramKey solana.PublicKey } - -type PriceAccountUpdate struct { - Slot uint64 - *PriceAccount -} - -// StreamPriceAccounts sends an update to Prometheus any time a Pyth oracle account changes. -func (c *Client) StreamPriceAccounts(ctx context.Context, updates chan<- PriceAccountUpdate) error { - const retryInterval = 3 * time.Second - return backoff.Retry(func() error { - err := c.streamPriceAccounts(ctx, updates) - switch { - case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded): - return backoff.Permanent(err) - default: - return err - } - }, backoff.NewConstantBackOff(retryInterval)) -} - -func (c *Client) streamPriceAccounts(ctx context.Context, updates chan<- PriceAccountUpdate) error { - client, err := ws.Connect(ctx, c.WebSocketURL) - if err != nil { - return err - } - defer client.Close() - - metricsWsActiveConns.Inc() - defer metricsWsActiveConns.Dec() - - sub, err := client.ProgramSubscribeWithOpts( - c.Opts.ProgramKey, - rpc.CommitmentConfirmed, - solana.EncodingBase64Zstd, - []rpc.RPCFilter{ - { - Memcmp: &rpc.RPCFilterMemcmp{ - Offset: 0, - Bytes: solana.Base58{ - 0xd4, 0xc3, 0xb2, 0xa1, // Magic - 0x02, 0x00, 0x00, 0x00, // V2 - }, - }, - }, - }, - ) - if err != nil { - return err - } - - // Stream updates. - for { - if err := c.readNextUpdate(ctx, sub, updates); err != nil { - return err - } - } -} - -func (c *Client) readNextUpdate( - ctx context.Context, - sub *ws.ProgramSubscription, - updates chan<- PriceAccountUpdate, -) error { - // If no update comes in within 20 seconds, bail. - const readTimeout = 20 * time.Second - ctx, cancel := context.WithTimeout(ctx, readTimeout) - defer cancel() - go func() { - <-ctx.Done() - // Terminate subscription if above timer has expired. - if errors.Is(ctx.Err(), context.DeadlineExceeded) { - c.Log.Warn("Read deadline exceeded, terminating WebSocket connection", - zap.Duration("timeout", readTimeout)) - sub.Unsubscribe() - } - }() - - // Read next account update from WebSockets. - update, err := sub.Recv() - if err != nil { - return err - } - metricsWsEventsTotal.Inc() - - // Decode update. - if update.Value.Account.Owner != c.Opts.ProgramKey { - return nil - } - accountData := update.Value.Account.Data.GetBinary() - if PeekAccount(accountData) != AccountTypePrice { - return nil - } - priceAcc := new(PriceAccount) - if err := priceAcc.UnmarshalBinary(accountData); err != nil { - c.Log.Warn("Failed to unmarshal priceAcc account", zap.Error(err)) - return nil - } - - // Send update to channel. - msg := PriceAccountUpdate{ - Slot: update.Context.Slot, - PriceAccount: priceAcc, - } - select { - case <-ctx.Done(): - return ctx.Err() - case updates <- msg: - return nil - } -} diff --git a/prices.go b/prices.go new file mode 100644 index 0000000..03141ce --- /dev/null +++ b/prices.go @@ -0,0 +1,138 @@ +package pyth + +import ( + "context" + "errors" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/gagliardetto/solana-go" + "github.com/gagliardetto/solana-go/rpc" + "github.com/gagliardetto/solana-go/rpc/ws" + "go.uber.org/zap" +) + +// GetPriceAccount retrieves a price account from the blockchain. +func (c *Client) GetPriceAccount(ctx context.Context, productKey solana.PublicKey) (*PriceAccount, error) { + accountInfo, err := c.RPC.GetAccountInfo(ctx, productKey) + if err != nil { + return nil, err + } + accountData := accountInfo.Value.Data.GetBinary() + + price := new(PriceAccount) + if err := price.UnmarshalBinary(accountData); err != nil { + return nil, err + } + return price, nil +} + +type PriceAccountUpdate struct { + Slot uint64 + *PriceAccount +} + +// StreamPriceAccounts sends an update to Prometheus any time a Pyth oracle account changes. +func (c *Client) StreamPriceAccounts(ctx context.Context, updates chan<- PriceAccountUpdate) error { + const retryInterval = 3 * time.Second + return backoff.Retry(func() error { + err := c.streamPriceAccounts(ctx, updates) + switch { + case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded): + return backoff.Permanent(err) + default: + return err + } + }, backoff.NewConstantBackOff(retryInterval)) +} + +func (c *Client) streamPriceAccounts(ctx context.Context, updates chan<- PriceAccountUpdate) error { + client, err := ws.Connect(ctx, c.WebSocketURL) + if err != nil { + return err + } + defer client.Close() + + metricsWsActiveConns.Inc() + defer metricsWsActiveConns.Dec() + + sub, err := client.ProgramSubscribeWithOpts( + c.Opts.ProgramKey, + rpc.CommitmentConfirmed, + solana.EncodingBase64Zstd, + []rpc.RPCFilter{ + { + Memcmp: &rpc.RPCFilterMemcmp{ + Offset: 0, + Bytes: solana.Base58{ + 0xd4, 0xc3, 0xb2, 0xa1, // Magic + 0x02, 0x00, 0x00, 0x00, // V2 + }, + }, + }, + }, + ) + if err != nil { + return err + } + + // Stream updates. + for { + if err := c.readNextUpdate(ctx, sub, updates); err != nil { + return err + } + } +} + +func (c *Client) readNextUpdate( + ctx context.Context, + sub *ws.ProgramSubscription, + updates chan<- PriceAccountUpdate, +) error { + // If no update comes in within 20 seconds, bail. + const readTimeout = 20 * time.Second + ctx, cancel := context.WithTimeout(ctx, readTimeout) + defer cancel() + go func() { + <-ctx.Done() + // Terminate subscription if above timer has expired. + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + c.Log.Warn("Read deadline exceeded, terminating WebSocket connection", + zap.Duration("timeout", readTimeout)) + sub.Unsubscribe() + } + }() + + // Read next account update from WebSockets. + update, err := sub.Recv() + if err != nil { + return err + } + metricsWsEventsTotal.Inc() + + // Decode update. + if update.Value.Account.Owner != c.Opts.ProgramKey { + return nil + } + accountData := update.Value.Account.Data.GetBinary() + if PeekAccount(accountData) != AccountTypePrice { + return nil + } + priceAcc := new(PriceAccount) + if err := priceAcc.UnmarshalBinary(accountData); err != nil { + c.Log.Warn("Failed to unmarshal priceAcc account", zap.Error(err)) + return nil + } + + // Send update to channel. + msg := PriceAccountUpdate{ + Slot: update.Context.Slot, + PriceAccount: priceAcc, + } + select { + case <-ctx.Done(): + return ctx.Err() + case updates <- msg: + return nil + } +} diff --git a/products.go b/products.go new file mode 100644 index 0000000..9c558ab --- /dev/null +++ b/products.go @@ -0,0 +1,22 @@ +package pyth + +import ( + "context" + + "github.com/gagliardetto/solana-go" +) + +// GetProductAccount retrieves a product account from the blockchain. +func (c *Client) GetProductAccount(ctx context.Context, productKey solana.PublicKey) (*ProductAccount, error) { + accountInfo, err := c.RPC.GetAccountInfo(ctx, productKey) + if err != nil { + return nil, err + } + accountData := accountInfo.Value.Data.GetBinary() + + product := new(ProductAccount) + if err := product.UnmarshalBinary(accountData); err != nil { + return nil, err + } + return product, nil +} diff --git a/types.go b/types.go index 9fc9594..54af53f 100644 --- a/types.go +++ b/types.go @@ -36,14 +36,17 @@ const ( AccountTypePrice ) +// AccountHeader is a 16-byte header at the beginning of each account type. type AccountHeader struct { - Magic uint32 - Version uint32 - AccountType uint32 - Size uint32 + Magic uint32 // set exactly to 0xa1b2c3d4 + Version uint32 // currently V2 + AccountType uint32 // account type following the header + Size uint32 // size of the account including the header } +// Valid performs basic checks on an account. func (h AccountHeader) Valid() bool { + // Note: This size restriction is not enforced per protocol. return h.Magic == Magic && h.Version == V2 && h.Size < 65536 } @@ -57,6 +60,7 @@ func PeekAccount(data []byte) uint32 { return header.AccountType } +// readLPString returns a length-prefixed string as seen in ProductAccount.Attrs. func readLPString(rd *bytes.Reader) (string, error) { strLen, err := rd.ReadByte() if err != nil { @@ -69,14 +73,16 @@ func readLPString(rd *bytes.Reader) (string, error) { return string(val), nil } -type Product struct { +// ProductAccount contains metadata for a single product, +// such as its symbol and its base/quote currencies. +type ProductAccount struct { AccountHeader - FirstPrice solana.PublicKey - Attrs [464]byte + FirstPrice solana.PublicKey // first price account in list + Attrs [464]byte // key-value string pairs of additional data } // UnmarshalBinary decodes the product account from the on-chain format. -func (p *Product) UnmarshalBinary(buf []byte) error { +func (p *ProductAccount) UnmarshalBinary(buf []byte) error { decoder := bin.NewBinDecoder(buf) if err := decoder.Decode(p); err != nil { return err @@ -90,7 +96,8 @@ func (p *Product) UnmarshalBinary(buf []byte) error { return nil } -func (p *Product) GetAttrs() (map[string]string, error) { +// GetAttrs returns the parsed set of key-value pairs. +func (p *ProductAccount) GetAttrs() (map[string]string, error) { kvps := make(map[string]string) attrs := p.Attrs[:] @@ -115,45 +122,51 @@ func (p *Product) GetAttrs() (map[string]string, error) { return kvps, nil } +// Ema is an exponentially-weighted moving average. type Ema struct { Val int64 Numer int64 Denom int64 } +// PriceInfo contains a price adn confidence at a specific slot. +// +// This struct can represent either a publisher's contribution or the outcome of price aggregation. type PriceInfo struct { - Price int64 - Conf uint64 - Status uint32 + Price int64 // current price + Conf uint64 // confidence interval around the price + Status uint32 // status of price CorpAct uint32 - PubSlot uint64 + PubSlot uint64 // valid publishing slot } +// PriceComp contains the price and confidence contributed by a specific publisher. type PriceComp struct { - Publisher solana.PublicKey - Agg PriceInfo - Latest PriceInfo + Publisher solana.PublicKey // key of contributing publisher + Agg PriceInfo // price used to compute the current aggregate price + Latest PriceInfo // latest price of publisher } +// PriceAccount represents a continuously-updating price feed for a product. type PriceAccount struct { AccountHeader - PriceType uint32 - Exponent int32 - Num uint32 - NumQt uint32 - LastSlot uint64 - ValidSlot uint64 - Twap Ema - Twac Ema - Drv1, Drv2 int64 - Product solana.PublicKey - Next solana.PublicKey - PrevSlot uint64 - PrevPrice int64 - PrevConf uint64 - Drv3 int64 - Agg PriceInfo - Components [32]PriceComp + PriceType uint32 // price or calculation type + Exponent int32 // price exponent + Num uint32 // number of component prices + NumQt uint32 // number of quoters that make up aggregate + LastSlot uint64 // slot of last valid (not unknown) aggregate price + ValidSlot uint64 // valid slot of aggregate price + Twap Ema // exponential moving average price + Twac Ema // exponential moving confidence interval + Drv1, Drv2 int64 // reserved for future use + Product solana.PublicKey // ProductAccount key + Next solana.PublicKey // next PriceAccount key in linked list + PrevSlot uint64 // valid slot of previous update + PrevPrice int64 // aggregate price of previous update + PrevConf uint64 // confidence interval of previous update + Drv3 int64 // reserved for future use + Agg PriceInfo // aggregate price info + Components [32]PriceComp // price components for each quoter } // UnmarshalBinary decodes the price account from the on-chain format. @@ -181,6 +194,7 @@ func (p *PriceAccount) GetComponent(publisher *solana.PublicKey) *PriceComp { return nil } +// MappingAccount is a piece of a singly linked-list of all products on Pyth. type MappingAccount struct { AccountHeader Num uint32 diff --git a/types_test.go b/types_test.go index 41f0d92..71a2a3b 100644 --- a/types_test.go +++ b/types_test.go @@ -33,7 +33,7 @@ var ( ) func TestProductAccount(t *testing.T) { - expected := Product{ + expected := ProductAccount{ AccountHeader: AccountHeader{ Magic: Magic, Version: V2, @@ -63,7 +63,7 @@ func TestProductAccount(t *testing.T) { }, } - var actual Product + var actual ProductAccount require.NoError(t, actual.UnmarshalBinary(caseProductAccount)) assert.Equal(t, &expected, &actual)