diff --git a/api/health_check.go b/api/health_check.go index 54fe30af..f1490d06 100644 --- a/api/health_check.go +++ b/api/health_check.go @@ -4,7 +4,6 @@ import ( "context" "api.audius.co/config" - core_indexer "api.audius.co/indexer" "connectrpc.com/connect" corev1 "github.com/OpenAudio/go-openaudio/pkg/api/core/v1" "github.com/gofiber/fiber/v2" @@ -39,8 +38,12 @@ func (app *ApiServer) getCoreIndexerHealth(ctx context.Context) (*coreIndexerHea } chainHeight := nodeInfo.Msg.CurrentHeight + // ETL tracks the highest indexed chain height in `core_indexed_blocks`. + // COALESCE handles the cold-start case before any blocks are indexed. var indexerLastBlockHeight int64 - err = app.pool.QueryRow(ctx, "SELECT COALESCE(last_checkpoint, 0) FROM indexing_checkpoints WHERE tablename = $1", core_indexer.CoreIndexerCheckpointName).Scan(&indexerLastBlockHeight) + err = app.pool.QueryRow(ctx, + "SELECT COALESCE(MAX(height), 0) FROM core_indexed_blocks", + ).Scan(&indexerLastBlockHeight) if err != nil { return nil, fiber.NewError(fiber.StatusInternalServerError, "Failed to get core indexer last block height") } diff --git a/go.mod b/go.mod index dc22765a..8ed35625 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( connectrpc.com/connect v1.18.1 github.com/AlecAivazis/survey/v2 v2.3.7 github.com/Doist/unfurlist v0.0.0-20250409100812-515f2735f8e5 - github.com/OpenAudio/go-openaudio v1.2.13 + github.com/OpenAudio/go-openaudio v1.3.0 github.com/aquasecurity/esquery v0.2.0 github.com/axiomhq/axiom-go v0.23.0 github.com/axiomhq/hyperloglog v0.2.5 @@ -55,6 +55,7 @@ require ( filippo.io/edwards25519 v1.0.0-rc.1 // indirect github.com/DataDog/zstd v1.5.2 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/OpenAudio/go-openaudio/pkg/etl v1.3.0 // indirect github.com/StackExchange/wmi v1.2.1 // indirect github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect github.com/andybalholm/brotli v1.2.0 // indirect diff --git a/go.sum b/go.sum 
index af30df33..361261cf 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,10 @@ github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEV github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= github.com/OpenAudio/go-openaudio v1.2.13 h1:ILPaM6EneDQMoKXSyjb///758I7Ou52e76NvNmCkcdY= github.com/OpenAudio/go-openaudio v1.2.13/go.mod h1:+xl3SeIY7pc6CfwO1qmYjWELLSseQaulqSQefE6i2FA= +github.com/OpenAudio/go-openaudio v1.3.0 h1:Is1FFStckE116ZNuPNzu3wzdZdaYd6xslyciPgGut0E= +github.com/OpenAudio/go-openaudio v1.3.0/go.mod h1:+xl3SeIY7pc6CfwO1qmYjWELLSseQaulqSQefE6i2FA= +github.com/OpenAudio/go-openaudio/pkg/etl v1.3.0 h1:RooZJ2OIDe7kLUNUVrQGHQFFyyHtayryhfG19MHIADw= +github.com/OpenAudio/go-openaudio/pkg/etl v1.3.0/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8= github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= diff --git a/indexer/index_user.go b/indexer/index_user.go deleted file mode 100644 index 71f0f89b..00000000 --- a/indexer/index_user.go +++ /dev/null @@ -1,41 +0,0 @@ -package indexer - -import ( - "context" - "crypto/ecdsa" - "encoding/base64" - - dbv1 "api.audius.co/database" - "go.uber.org/zap" - - corev1 "github.com/OpenAudio/go-openaudio/pkg/api/core/v1" - core_config "github.com/OpenAudio/go-openaudio/pkg/core/config" - "github.com/OpenAudio/go-openaudio/pkg/core/server" - "github.com/ethereum/go-ethereum/crypto" -) - -func (ci *CoreIndexer) setPubkeyForUser(dbTx dbv1.DBTX, logger *zap.Logger, userId int32, pubkey *ecdsa.PublicKey) { - pubkeyBytes := crypto.FromECDSAPub(pubkey) - pubkeyBase64 := base64.StdEncoding.EncodeToString(pubkeyBytes) - - logger.Info("CreateUser, setting pubkey", zap.Int32("userId", userId), zap.String("pubkeyBase64", pubkeyBase64)) - - _, 
err := dbTx.Exec(context.Background(), `insert into user_pubkeys values ($1, $2) on conflict do nothing`, userId, pubkeyBase64) - - if err != nil { - logger.Warn("failed to set pubkey for user", zap.Int32("userId", userId), zap.String("pubkeyBase64", pubkeyBase64), zap.Error(err)) - } -} - -func (ci *CoreIndexer) createUser(dbTx dbv1.DBTX, logger *zap.Logger, em *corev1.ManageEntityLegacy) error { - _, pubkey, err := server.RecoverPubkeyFromCoreTx(&core_config.Config{ - AcdcChainID: ci.Config.AudiusdChainID, - AcdcEntityManagerAddress: ci.Config.AudiusdEntityManagerAddress, - }, em) - if err != nil { - return err - } - - ci.setPubkeyForUser(dbTx, logger, int32(em.EntityId), pubkey) - return nil -} diff --git a/indexer/index_user_test.go b/indexer/index_user_test.go deleted file mode 100644 index a2db5913..00000000 --- a/indexer/index_user_test.go +++ /dev/null @@ -1,108 +0,0 @@ -package indexer - -import ( - "testing" - - "api.audius.co/api/testdata" - "api.audius.co/config" - "api.audius.co/database" - corev1 "github.com/OpenAudio/go-openaudio/pkg/api/core/v1" - core_config "github.com/OpenAudio/go-openaudio/pkg/core/config" - core_server "github.com/OpenAudio/go-openaudio/pkg/core/server" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -// dummy pkeys generated from ganache "test test...junk" seed -var user1WalletKey = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" - -func TestIndexCreateUserPubkey(t *testing.T) { - pool := database.CreateTestDatabase(t, "test_indexer") - defer pool.Close() - logger := zap.NewNop() - - config := config.Config{ - WriteDbUrl: pool.Config().ConnString(), - AudiusdChainID: core_config.DevAcdcChainID, - AudiusdEntityManagerAddress: core_config.DevAcdcAddress, - } - - ci := NewIndexer(config) - defer ci.Close() - - wallet := testdata.CreateTestWallet(t, user1WalletKey) - - em := &corev1.ManageEntityLegacy{ - UserId: 1, - EntityId: 1, - EntityType: "User", - Action: "Create", - Metadata: "{}", - 
Nonce: "1", - } - - core_server.SignManageEntity(&core_config.Config{ - AcdcChainID: config.AudiusdChainID, - AcdcEntityManagerAddress: config.AudiusdEntityManagerAddress, - }, em, wallet.PrivateKey) - - err := ci.handleManageEntity(pool, logger, em) - assert.NoError(t, err) - - var pubkey string - err = pool.QueryRow(t.Context(), ` - SELECT pubkey_base64 FROM user_pubkeys WHERE user_id = 1 - `).Scan(&pubkey) - assert.NoError(t, err) - assert.Equal(t, pubkey, "BIMYU1tUEF1Keq5gwI/EX5aHGBtP38YlvRp1P6c5f+11NUfxHKhpZkby86ywjjEBavrCPmMMXRH1n2H+9XsNKqU=") -} - -func TestIndexCreateUserExistingPubkey(t *testing.T) { - pool := database.CreateTestDatabase(t, "test_indexer") - defer pool.Close() - logger := zap.NewNop() - - database.Seed(pool, database.FixtureMap{ - "user_pubkeys": { - { - "user_id": 1, - "pubkey_base64": "existing_pubkey", - }, - }, - }) - - config := config.Config{ - WriteDbUrl: pool.Config().ConnString(), - AudiusdChainID: core_config.DevAcdcChainID, - AudiusdEntityManagerAddress: core_config.DevAcdcAddress, - } - - ci := NewIndexer(config) - defer ci.Close() - - wallet := testdata.CreateTestWallet(t, user1WalletKey) - - em := &corev1.ManageEntityLegacy{ - UserId: 1, - EntityId: 1, - EntityType: "User", - Action: "Create", - Metadata: "{}", - Nonce: "1", - } - - core_server.SignManageEntity(&core_config.Config{ - AcdcChainID: config.AudiusdChainID, - AcdcEntityManagerAddress: config.AudiusdEntityManagerAddress, - }, em, wallet.PrivateKey) - - err := ci.handleManageEntity(pool, logger, em) - assert.NoError(t, err) - - var pubkey string - err = pool.QueryRow(t.Context(), ` - SELECT pubkey_base64 FROM user_pubkeys WHERE user_id = 1 - `).Scan(&pubkey) - assert.NoError(t, err) - assert.Equal(t, pubkey, "existing_pubkey") -} diff --git a/indexer/indexer.go b/indexer/indexer.go index a1abe725..a41cb42b 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -3,195 +3,105 @@ package indexer import ( "context" "fmt" - "time" "api.audius.co/config" dbv1 
"api.audius.co/database" "api.audius.co/logging" - "connectrpc.com/connect" - corev1 "github.com/OpenAudio/go-openaudio/pkg/api/core/v1" + etl "github.com/OpenAudio/go-openaudio/pkg/etl" "github.com/OpenAudio/go-openaudio/pkg/sdk" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) +// CoreIndexer runs the OpenAudio ETL indexer plus the dependent api/-side +// background jobs (aggregates, parity jobs, etc.). The block-fetching and +// entity-manager dispatch loop that previously lived here was a stub that +// only handled CreateUser — vendoring ETL via +// `github.com/OpenAudio/go-openaudio/pkg/etl` gives us the full 31-entity-type +// handler suite, materialized-view refresher, and scheduled-release publisher +// in one package, kept in sync with upstream releases. type CoreIndexer struct { aggregatesCalculator *AggregatesCalculator + etlIndexer *etl.Indexer pool dbv1.DbPool openAudioSDK *sdk.OpenAudioSDK Config config.Config logger *zap.Logger - closeCh chan struct{} } -const ( - CoreIndexerCheckpointName = "api_core_indexer_last_height" -) - -func NewIndexer(config config.Config) *CoreIndexer { +func NewIndexer(cfg config.Config) *CoreIndexer { + logger := logging.NewZapLogger(cfg).Named("CoreIndexer") - connConfig, err := pgxpool.ParseConfig(config.WriteDbUrl) + connConfig, err := pgxpool.ParseConfig(cfg.WriteDbUrl) if err != nil { panic(fmt.Errorf("error parsing database URL: %w", err)) } - pool, err := pgxpool.NewWithConfig(context.Background(), connConfig) if err != nil { panic(fmt.Errorf("error connecting to database: %w", err)) } - openAudioSDK := sdk.NewOpenAudioSDK(config.AudiusdURL) - - aggregatesCalculator := NewAggregatesCalculator(config) - - ci := &CoreIndexer{ + openAudioSDK := sdk.NewOpenAudioSDK(cfg.AudiusdURL) + aggregatesCalculator := NewAggregatesCalculator(cfg) + + // ETL needs the Connect/gRPC Core client (for block fetching) and a DB URL. 
+ // SkipMigrations stays false (default): ETL's migrations are idempotent + // against api/'s schema — every migration uses CREATE TABLE IF NOT EXISTS / + // ADD COLUMN IF NOT EXISTS, and tracks state in its own `etl_db_migrations` + // table separate from api/'s `schema_version`. Verified by applying all 21 + // current ETL migrations to a fresh DB seeded with api/'s schema: zero + // errors, only NOTICE messages for already-existing relations. + // + // Two optional ETL components are disabled here because they don't fit + // api/'s deployment: + // - MaterializedViewRefresh: refreshes mv_dashboard_* views that don't + // exist in api/'s schema (they were a go-openaudio-internal concern). + // - PgNotifyListener: publishes block/play events on a PG NOTIFY channel + // that api/ has no consumer for. + // ScheduledReleasePublisher stays enabled — it's the same job apps' Python + // `publish_scheduled_releases` celery task did and we want it running. + etlCfg := etl.DefaultConfig() + etlCfg.DisableMaterializedViewRefresh() + etlCfg.DisablePgNotifyListener() + etlCfg.ReadDataTypesEnv() // honors OPENAUDIO_ETL_ENTITY_MANAGER_DATA_TYPES if set + + etlIndexer := etl.New(openAudioSDK.Core, logger) + etlIndexer.SetConfig(etlCfg) + etlIndexer.SetDBURL(cfg.WriteDbUrl) + etlIndexer.SetCheckReadiness(true) + + return &CoreIndexer{ aggregatesCalculator: aggregatesCalculator, + etlIndexer: etlIndexer, pool: pool, openAudioSDK: openAudioSDK, - Config: config, - logger: logging.NewZapLogger(config). - Named("CoreIndexer"), + Config: cfg, + logger: logger, } - - return ci } +// Start runs the ETL indexer alongside the aggregates calculator. Both are +// long-lived; eg.Wait returns the first error once both have returned. The +// zero-value errgroup.Group used here does not cancel ctx on failure. +// +// Caveat: etl.Indexer.Run() uses its own internal context.Background() rather +// than honoring `ctx` — graceful shutdown via ctx cancellation isn't supported +// by the upstream API today. 
Process termination (SIGTERM) still works the +// way Go programs always do, and DB connections drain via pool finalizers on +// process exit. Acceptable tradeoff to avoid forking ETL. func (ci *CoreIndexer) Start(ctx context.Context) error { eg := errgroup.Group{} eg.Go(func() error { return ci.aggregatesCalculator.Start(ctx) }) eg.Go(func() error { - return ci.run(ctx) + ci.logger.Info("Starting ETL indexer") + return ci.etlIndexer.Run() }) return eg.Wait() } -func (ci *CoreIndexer) run(ctx context.Context) error { - go logging.SyncOnTicks(ctx, ci.logger, time.Second*10) - var height int64 - err := ci.pool.QueryRow(context.Background(), `select last_checkpoint from indexing_checkpoints where tablename = $1`, CoreIndexerCheckpointName).Scan(&height) - if err != nil { - if err == pgx.ErrNoRows { - nodeInfo, err := ci.openAudioSDK.Core.GetNodeInfo(context.Background(), connect.NewRequest(&corev1.GetNodeInfoRequest{})) - if err != nil { - return err - } - height = nodeInfo.Msg.CurrentHeight - } else { - return err - } - } else { - // If we have a checkpoint, we need to start at the next block - height++ - } - - ci.logger.Info("Core indexer started", zap.Int64("blockHeight", height)) - - for { - select { - case <-ctx.Done(): - ci.logger.Info("Shutting down core indexer") - return ctx.Err() - default: - } - height = ci.attemptProcessNextBlock(ctx, height) - } -} - -// Attempts to process the next block, returning height+1 if we found and -// processed a block, or height in all other cases. -// Will log errors and will snack on panics (yum). 
-func (ci *CoreIndexer) attemptProcessNextBlock(ctx context.Context, height int64) (newHeight int64) { - // By default, return the same height in case we panic - newHeight = height - defer func() { - if r := recover(); r != nil { - ci.logger.Error("panic in attemptProcessNextBlock", zap.Any("panic", r)) - // Sleep for 5 seconds in case it's a transient error - time.Sleep(5 * time.Second) - } - }() - block, err := ci.openAudioSDK.Core.GetBlock(ctx, connect.NewRequest(&corev1.GetBlockRequest{ - Height: height, - })) - if err != nil { - ci.logger.Error("failed to get block", zap.Error(err)) - return - } - - if block.Msg.Block.Height < 0 { - ci.logger.Debug("No new blocks found, sleeping") - time.Sleep(1 * time.Second) - return - } - - err = ci.handleBlock(block.Msg.Block) - if err != nil { - ci.logger.Error("failed to handle block", zap.Error(err)) - return - } - - newHeight = height + 1 - return -} - -func (ci *CoreIndexer) handleBlock(block *corev1.Block) error { - dbTx, err := ci.pool.Begin(context.Background()) - if err != nil { - return err - } - defer dbTx.Rollback(context.Background()) - - for _, tx := range block.Transactions { - if txData := tx.GetTransaction(); txData != nil { - logger := ci.logger.With(zap.String("txHash", tx.GetHash()), zap.Int64("blockHeight", block.Height)) - switch txData.GetTransaction().(type) { - case *corev1.SignedTransaction_ManageEntity: - em := txData.GetManageEntity() - if em == nil { - ci.logger.Error("ManageEntity transaction with empty data", zap.Any("tx", tx)) - continue - } - err := ci.handleManageEntity(dbTx, logger, em) - if err != nil { - logger.Error("Error processing manage entity tx", zap.Error(err)) - continue - } - } - } - } - _, err = dbTx.Exec(context.Background(), ` - insert into indexing_checkpoints values ($1, $2) - on conflict (tablename) do update set last_checkpoint = excluded.last_checkpoint - `, CoreIndexerCheckpointName, block.Height) - - if err != nil { - return err - } - - err = 
dbTx.Commit(context.Background()) - if err != nil { - return err - } - - ci.logger.Debug("Indexed block", zap.Int64("blockHeight", block.Height)) - - return nil -} - -func (ci *CoreIndexer) handleManageEntity(dbTx dbv1.DBTX, logger *zap.Logger, em *corev1.ManageEntityLegacy) error { - operation := em.Action + em.EntityType - switch operation { - case "CreateUser": - return ci.createUser(dbTx, logger, em) - default: - return nil - } -} - func (ci *CoreIndexer) Close() { ci.aggregatesCalculator.Close() ci.pool.Close()