Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,685 changes: 0 additions & 2,685 deletions pkg/code/async/geyser/api/gen/confirmed_block.pb.validate.go

This file was deleted.

3,598 changes: 2,714 additions & 884 deletions pkg/code/async/geyser/api/gen/geyser.pb.go

Large diffs are not rendered by default.

2,779 changes: 0 additions & 2,779 deletions pkg/code/async/geyser/api/gen/geyser.pb.validate.go

This file was deleted.

591 changes: 185 additions & 406 deletions pkg/code/async/geyser/api/gen/geyser_grpc.pb.go

Large diffs are not rendered by default.

Large diffs are not rendered by default.

364 changes: 207 additions & 157 deletions pkg/code/async/geyser/api/proto/geyser.proto

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Source: https://github.com/jito-foundation/geyser-grpc-plugin/blob/v2.2/proto/proto/confirmed_block.proto
// Source: https://github.com/rpcpool/yellowstone-grpc/blob/v6.0.0%2Bsolana.2.2.12/yellowstone-grpc-proto/proto/solana-storage.proto

syntax = "proto3";

Expand Down Expand Up @@ -134,8 +134,8 @@ message Reward {
}

message Rewards {
repeated Reward rewards = 1;
NumPartitions num_partitions = 2;
repeated Reward rewards = 1;
NumPartitions num_partitions = 2;
}

message UnixTimestamp {
Expand Down
12 changes: 5 additions & 7 deletions pkg/code/async/geyser/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ const (
GrpcPluginEndointConfigEnvName = envConfigPrefix + "GRPC_PLUGIN_ENDPOINT"
defaultGrpcPluginEndoint = ""

GrpcPluginXTokenConfigEnvName = envConfigPrefix + "GRPC_PLUGIN_X_TOKEN"
defaultGrpcPluginXToken = ""

ProgramUpdateWorkerCountConfigEnvName = envConfigPrefix + "PROGRAM_UPDATE_WORKER_COUNT"
defaultProgramUpdateWorkerCount = 1024

Expand All @@ -24,22 +27,18 @@ const (

BackupExternalDepositWorkerIntervalConfigEnvName = envConfigPrefix + "BACKUP_EXTERNAL_DEPOSIT_WORKER_INTERVAL"
defaultBackupExternalDepositWorkerInterval = 15 * time.Second

SwapSubsidizerPublicKeyConfigEnvName = envConfigPrefix + "SWAP_SUBSIDIZER_PUBLIC_KEY"
defaultSwapSubsidizerPublicKey = "invalid" // ensure something valid is set
)

type conf struct {
grpcPluginEndpoint config.String
grpcPluginXToken config.String

programUpdateWorkerCount config.Uint64
programUpdateQueueSize config.Uint64

backupExternalDepositWorkerInterval config.Duration

backupTimelockWorkerInterval config.Duration

swapSubsidizerPublicKey config.String
}

// ConfigProvider defines how config values are pulled
Expand All @@ -50,15 +49,14 @@ func WithEnvConfigs() ConfigProvider {
return func() *conf {
return &conf{
grpcPluginEndpoint: env.NewStringConfig(GrpcPluginEndointConfigEnvName, defaultGrpcPluginEndoint),
grpcPluginXToken: env.NewStringConfig(GrpcPluginXTokenConfigEnvName, defaultGrpcPluginXToken),

programUpdateWorkerCount: env.NewUint64Config(ProgramUpdateWorkerCountConfigEnvName, defaultProgramUpdateWorkerCount),
programUpdateQueueSize: env.NewUint64Config(ProgramUpdateQueueSizeConfigEnvName, defaultProgramUpdateQueueSize),

backupExternalDepositWorkerInterval: env.NewDurationConfig(BackupExternalDepositWorkerIntervalConfigEnvName, defaultBackupExternalDepositWorkerInterval),

backupTimelockWorkerInterval: env.NewDurationConfig(BackupTimelockWorkerIntervalConfigEnvName, defaultBackupTimelockWorkerInterval),

swapSubsidizerPublicKey: env.NewStringConfig(SwapSubsidizerPublicKeyConfigEnvName, defaultSwapSubsidizerPublicKey),
}
}
}
15 changes: 8 additions & 7 deletions pkg/code/async/geyser/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"time"

"github.com/mr-tron/base58"
"github.com/newrelic/go-agent/v3/newrelic"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/code-payments/code-server/pkg/metrics"
"github.com/code-payments/code-server/pkg/code/common"
"github.com/code-payments/code-server/pkg/metrics"
)

func (p *service) consumeGeyserProgramUpdateEvents(ctx context.Context) error {
Expand All @@ -23,7 +24,7 @@ func (p *service) consumeGeyserProgramUpdateEvents(ctx context.Context) error {
default:
}

err := p.subscribeToProgramUpdatesFromGeyser(ctx, p.conf.grpcPluginEndpoint.Get(ctx))
err := p.subscribeToProgramUpdatesFromGeyser(ctx, p.conf.grpcPluginEndpoint.Get(ctx), p.conf.grpcPluginXToken.Get(ctx))
if err != nil && !errors.Is(err, context.Canceled) {
log.WithError(err).Warn("program update consumer unexpectedly terminated")
}
Expand All @@ -44,7 +45,7 @@ func (p *service) consumeGeyserSlotUpdateEvents(ctx context.Context) error {
default:
}

err := p.subscribeToSlotUpdatesFromGeyser(ctx, p.conf.grpcPluginEndpoint.Get(ctx))
err := p.subscribeToSlotUpdatesFromGeyser(ctx, p.conf.grpcPluginEndpoint.Get(ctx), p.conf.grpcPluginXToken.Get(ctx))
if err != nil && !errors.Is(err, context.Canceled) {
log.WithError(err).Warn("slot update consumer unexpectedly terminated")
}
Expand Down Expand Up @@ -90,13 +91,13 @@ func (p *service) programUpdateWorker(serviceCtx context.Context, id int) {
p.metricStatusLock.Unlock()
}()

publicKey, err := common.NewAccountFromPublicKeyBytes(update.Pubkey)
publicKey, err := common.NewAccountFromPublicKeyBytes(update.Account.Pubkey)
if err != nil {
log.WithError(err).Warn("invalid public key")
return
}

program, err := common.NewAccountFromPublicKeyBytes(update.Owner)
program, err := common.NewAccountFromPublicKeyBytes(update.Account.Owner)
if err != nil {
log.WithError(err).Warn("invalid owner account")
return
Expand All @@ -107,8 +108,8 @@ func (p *service) programUpdateWorker(serviceCtx context.Context, id int) {
"account": publicKey.PublicKey().ToBase58(),
"slot": update.Slot,
})
if update.TxSignature != nil {
log = log.WithField("transaction", *update.TxSignature)
if update.Account.TxnSignature != nil {
log = log.WithField("transaction", base58.Encode(update.Account.TxnSignature))
}

handler, ok := p.programUpdateHandlers[program.PublicKey().ToBase58()]
Expand Down
Loading
Loading