Skip to content
This repository was archived by the owner on May 31, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 2 additions & 116 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,134 +15,20 @@
package pyth

import (
"context"
"errors"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"github.com/gagliardetto/solana-go/rpc/ws"
"go.uber.org/zap"
)

// Client interacts with Pyth via Solana's JSON-RPC API.
type Client struct {
Opts

Log *zap.Logger
RPC *rpc.Client
WebSocketURL string
}

type Opts struct {
ProgramKey solana.PublicKey
}

type PriceAccountUpdate struct {
Slot uint64
*PriceAccount
}

// StreamPriceAccounts sends an update to Prometheus any time a Pyth oracle account changes.
func (c *Client) StreamPriceAccounts(ctx context.Context, updates chan<- PriceAccountUpdate) error {
const retryInterval = 3 * time.Second
return backoff.Retry(func() error {
err := c.streamPriceAccounts(ctx, updates)
switch {
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
return backoff.Permanent(err)
default:
return err
}
}, backoff.NewConstantBackOff(retryInterval))
}

func (c *Client) streamPriceAccounts(ctx context.Context, updates chan<- PriceAccountUpdate) error {
client, err := ws.Connect(ctx, c.WebSocketURL)
if err != nil {
return err
}
defer client.Close()

metricsWsActiveConns.Inc()
defer metricsWsActiveConns.Dec()

sub, err := client.ProgramSubscribeWithOpts(
c.Opts.ProgramKey,
rpc.CommitmentConfirmed,
solana.EncodingBase64Zstd,
[]rpc.RPCFilter{
{
Memcmp: &rpc.RPCFilterMemcmp{
Offset: 0,
Bytes: solana.Base58{
0xd4, 0xc3, 0xb2, 0xa1, // Magic
0x02, 0x00, 0x00, 0x00, // V2
},
},
},
},
)
if err != nil {
return err
}

// Stream updates.
for {
if err := c.readNextUpdate(ctx, sub, updates); err != nil {
return err
}
}
}

func (c *Client) readNextUpdate(
ctx context.Context,
sub *ws.ProgramSubscription,
updates chan<- PriceAccountUpdate,
) error {
// If no update comes in within 20 seconds, bail.
const readTimeout = 20 * time.Second
ctx, cancel := context.WithTimeout(ctx, readTimeout)
defer cancel()
go func() {
<-ctx.Done()
// Terminate subscription if above timer has expired.
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
c.Log.Warn("Read deadline exceeded, terminating WebSocket connection",
zap.Duration("timeout", readTimeout))
sub.Unsubscribe()
}
}()

// Read next account update from WebSockets.
update, err := sub.Recv()
if err != nil {
return err
}
metricsWsEventsTotal.Inc()

// Decode update.
if update.Value.Account.Owner != c.Opts.ProgramKey {
return nil
}
accountData := update.Value.Account.Data.GetBinary()
if PeekAccount(accountData) != AccountTypePrice {
return nil
}
priceAcc := new(PriceAccount)
if err := priceAcc.UnmarshalBinary(accountData); err != nil {
c.Log.Warn("Failed to unmarshal priceAcc account", zap.Error(err))
return nil
}

// Send update to channel.
msg := PriceAccountUpdate{
Slot: update.Context.Slot,
PriceAccount: priceAcc,
}
select {
case <-ctx.Done():
return ctx.Err()
case updates <- msg:
return nil
}
}
138 changes: 138 additions & 0 deletions prices.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package pyth

import (
"context"
"errors"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"github.com/gagliardetto/solana-go/rpc/ws"
"go.uber.org/zap"
)

// GetPriceAccount retrieves a price account from the blockchain.
func (c *Client) GetPriceAccount(ctx context.Context, productKey solana.PublicKey) (*PriceAccount, error) {
accountInfo, err := c.RPC.GetAccountInfo(ctx, productKey)
if err != nil {
return nil, err
}
accountData := accountInfo.Value.Data.GetBinary()

price := new(PriceAccount)
if err := price.UnmarshalBinary(accountData); err != nil {
return nil, err
}
return price, nil
}

type PriceAccountUpdate struct {
Slot uint64
*PriceAccount
}

// StreamPriceAccounts sends an update to Prometheus any time a Pyth oracle account changes.
func (c *Client) StreamPriceAccounts(ctx context.Context, updates chan<- PriceAccountUpdate) error {
const retryInterval = 3 * time.Second
return backoff.Retry(func() error {
err := c.streamPriceAccounts(ctx, updates)
switch {
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
return backoff.Permanent(err)
default:
return err
}
}, backoff.NewConstantBackOff(retryInterval))
}

func (c *Client) streamPriceAccounts(ctx context.Context, updates chan<- PriceAccountUpdate) error {
client, err := ws.Connect(ctx, c.WebSocketURL)
if err != nil {
return err
}
defer client.Close()

metricsWsActiveConns.Inc()
defer metricsWsActiveConns.Dec()

sub, err := client.ProgramSubscribeWithOpts(
c.Opts.ProgramKey,
rpc.CommitmentConfirmed,
solana.EncodingBase64Zstd,
[]rpc.RPCFilter{
{
Memcmp: &rpc.RPCFilterMemcmp{
Offset: 0,
Bytes: solana.Base58{
0xd4, 0xc3, 0xb2, 0xa1, // Magic
0x02, 0x00, 0x00, 0x00, // V2
},
},
},
},
)
if err != nil {
return err
}

// Stream updates.
for {
if err := c.readNextUpdate(ctx, sub, updates); err != nil {
return err
}
}
}

func (c *Client) readNextUpdate(
ctx context.Context,
sub *ws.ProgramSubscription,
updates chan<- PriceAccountUpdate,
) error {
// If no update comes in within 20 seconds, bail.
const readTimeout = 20 * time.Second
ctx, cancel := context.WithTimeout(ctx, readTimeout)
defer cancel()
go func() {
<-ctx.Done()
// Terminate subscription if above timer has expired.
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
c.Log.Warn("Read deadline exceeded, terminating WebSocket connection",
zap.Duration("timeout", readTimeout))
sub.Unsubscribe()
}
}()

// Read next account update from WebSockets.
update, err := sub.Recv()
if err != nil {
return err
}
metricsWsEventsTotal.Inc()

// Decode update.
if update.Value.Account.Owner != c.Opts.ProgramKey {
return nil
}
accountData := update.Value.Account.Data.GetBinary()
if PeekAccount(accountData) != AccountTypePrice {
return nil
}
priceAcc := new(PriceAccount)
if err := priceAcc.UnmarshalBinary(accountData); err != nil {
c.Log.Warn("Failed to unmarshal priceAcc account", zap.Error(err))
return nil
}

// Send update to channel.
msg := PriceAccountUpdate{
Slot: update.Context.Slot,
PriceAccount: priceAcc,
}
select {
case <-ctx.Done():
return ctx.Err()
case updates <- msg:
return nil
}
}
22 changes: 22 additions & 0 deletions products.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package pyth

import (
"context"

"github.com/gagliardetto/solana-go"
)

// GetProductAccount retrieves a product account from the blockchain.
func (c *Client) GetProductAccount(ctx context.Context, productKey solana.PublicKey) (*ProductAccount, error) {
accountInfo, err := c.RPC.GetAccountInfo(ctx, productKey)
if err != nil {
return nil, err
}
accountData := accountInfo.Value.Data.GetBinary()

product := new(ProductAccount)
if err := product.UnmarshalBinary(accountData); err != nil {
return nil, err
}
return product, nil
}
Loading