Skip to content

Pull etl into its own package#142

Merged
raymondjacobson merged 4 commits intomainfrom
rj-etl
Mar 10, 2026
Merged

Pull etl into its own package#142
raymondjacobson merged 4 commits intomainfrom
rj-etl

Conversation

@raymondjacobson
Copy link
Contributor

@raymondjacobson raymondjacobson commented Mar 10, 2026

First PR of a few coming. This first one is just to separate out etl into its own importable package, add a readme and clean some things up.

Next will be to add entity manage validators mirroring discovery (piecemeal), add grpc support, and then expose a better interface to include it directly in api and replace some of the machinery that's there and add DB writes.

Also some minor tweaks / improvements to explorer UI:

  • Update validator healthy to be more lenient
  • Fix block height loading state (to show - instead of 1)

Tested:

#openaudio-1.env
OPENAUDIO_ETL_ENABLED=true
OPENAUDIO_EXPLORER_ENABLED=true

confirmed explorer looks good on https://node1.oap.devnet/

@rickyrombo rickyrombo requested a review from Copilot March 10, 2026 07:03
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR separates the ETL (Extract-Transform-Load) indexer into its own standalone Go module (github.com/OpenAudio/go-openaudio/etl) to make it independently importable. A new etlserver wrapper combines the indexer with location services and ConnectRPC handlers for use in the full openaudio node. Additionally, it introduces a location package with embedded SQLite databases for geo-lookup and makes minor explorer UI improvements.

Changes:

  • Refactored ETLService into two layers: a standalone etl.Indexer (own go.mod) and a full-featured etlserver.ETLService wrapper, moving RPC handler stubs and location service initialization out of the core indexer
  • Extracted play and manage_entity transaction processing into a processors sub-package with a Processor interface, and introduced a Config struct for optional ETL components
  • Improved the explorer dashboard's block height loading state to show "-" instead of "1" during initialization, updated the healthy validator SQL query to be more lenient (num_blocks_proposed > 0), and created a new pkg/location package with embedded SQLite databases

Reviewed changes

Copilot reviewed 40 out of 52 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
pkg/etl/etl.go Renamed ETLService to Indexer, removed RPC handlers and location service field
pkg/etl/indexer.go Updated receiver names, delegated play/manage_entity processing to processors package
pkg/etl/config.go New config struct for enabling/disabling MV refresh and pg notify listener
pkg/etl/schema.go Re-exports transaction type constants from processors package
pkg/etl/go.mod, pkg/etl/go.sum New Go module for standalone ETL package
pkg/etl/README.md Documentation for the standalone ETL package
pkg/etl/processors/processor.go New Processor interface and TxContext/Result types with transaction type constants
pkg/etl/processors/play.go Play transaction processor extracted from indexer
pkg/etl/processors/manage_entity.go ManageEntity transaction processor extracted from indexer
pkg/etl/processors/play_test.go Test for play processor
pkg/etlserver/server.go New wrapper composing Indexer + LocationService + ConnectRPC handlers
pkg/location/location.go New location service with embedded SQLite databases
pkg/location/*.go, *.sql, *.sqlite3 Location package: sqlc schema, queries, models, and embedded database files
pkg/console/console.go Updated to use etlserver.ETLService and ChainID() method
cmd/openaudio/main.go Updated ETL initialization with separate indexer and location service
cmd/openaudio/Dockerfile Copy etl submodule's go.mod/go.sum for Docker builds
go.mod Added etl module dependency with replace directive
pkg/etl/db/sql/reads.sql, pkg/etl/db/reads.sql.go Updated healthy validator query to use num_blocks_proposed > 0
pkg/console/templates/pages/*.templ, *_templ.go Updated import paths and block height loading state UI
pkg/console/assets/css/output.css Added min-w-[3rem] utility class
pkg/etl/pubsub.go, pkg/etl/materialized_view_refresher.go Updated import paths

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +15 to +48
func (p *manageEntityProcessor) Process(ctx context.Context, tx *corev1.SignedTransaction, txCtx *TxContext, q *db.Queries) (*Result, error) {
me := tx.GetManageEntity()
if err := q.InsertAddress(ctx, db.InsertAddressParams{
Address: me.GetSigner(),
PubKey: nil,
FirstSeenBlockHeight: pgtype.Int8{Int64: txCtx.Block.Height, Valid: true},
CreatedAt: txCtx.BlockTime,
}); err != nil {
return nil, err
}

if err := q.InsertManageEntity(ctx, db.InsertManageEntityParams{
Address: me.GetSigner(),
EntityType: me.GetEntityType(),
EntityID: me.GetEntityId(),
Action: me.GetAction(),
Metadata: pgtype.Text{String: me.GetMetadata(), Valid: me.GetMetadata() != ""},
Signature: me.GetSignature(),
Signer: me.GetSigner(),
Nonce: me.GetNonce(),
BlockHeight: txCtx.Block.Height,
TxHash: txCtx.TxHash,
CreatedAt: txCtx.BlockTime,
}); err != nil {
return nil, err
}

txCtx.InsertTx.TxType = TxTypeManageEntity
txCtx.InsertTx.Address = pgtype.Text{String: me.GetSigner(), Valid: true}
return &Result{InsertTx: txCtx.InsertTx}, nil
}

// ManageEntity returns the manage_entity processor.
func ManageEntity() Processor { return &manageEntityProcessor{} }
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a test for the play processor, but no test for the ManageEntity processor. Since this is a newly extracted package with a test file for play, consider adding a corresponding test for ManageEntity to maintain consistent test coverage across the processors.

Copilot uses AI. Check for mistakes.
Comment on lines +213 to 218
res, err := processors.Play().Process(context.Background(), tx.Transaction, txCtx, e.db)
if err != nil {
e.logger.Error("error processing plays", zap.Error(err))
} else {
insertTxParams = res.InsertTx
}
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Behavioral regression: When a processor returns an error, insertTxParams is not updated (since the else branch is skipped), so the transaction is still inserted at line 454 with an empty TxType.

In the old code, insertTxParams.TxType was set before the entity-specific insert, so a play insert failure would still record the transaction as type "play". Now, a processor failure results in a transaction row with TxType = "".

Consider either: (1) setting insertTxParams.TxType before calling the processor, or (2) returning early from the goroutine on processor error (as with other cases), or (3) always reading TxType from the result even on error.

Copilot uses AI. Check for mistakes.
@raymondjacobson raymondjacobson merged commit dc0daad into main Mar 10, 2026
8 checks passed
@raymondjacobson raymondjacobson deleted the rj-etl branch March 10, 2026 19:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants