diff --git a/CHANGELOG.md b/CHANGELOG.md index b1b407a0..c8b447c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ - ([\#74](https://github.com/forbole/juno/pull/74)) Added database block count to prometheus to improve alert monitoring - ([\#75](https://github.com/forbole/juno/pull/75)) Allow modules to handle MsgExec inner messages - ([\#76](https://github.com/forbole/juno/pull/76)) Return 0 as height for `GetLastBlockHeight()` method while no block is saved +- ([\#79](https://github.com/forbole/juno/pull/79)) Use `sqlx` instead of `sql` inside `PostgreSQLDatabase` ## v3.4.0 ### Changes diff --git a/database/legacy/v3/migrate.go b/database/legacy/v3/migrate.go index 5e212f47..b43630d4 100644 --- a/database/legacy/v3/migrate.go +++ b/database/legacy/v3/migrate.go @@ -63,7 +63,7 @@ func (db *Migrator) getOldTransactions(batchSize int64, offset int64) ([]types.T stmt := fmt.Sprintf("SELECT * FROM transaction_old ORDER BY height LIMIT %v OFFSET %v", batchSize, offset) var rows []types.TransactionRow - err := db.Sql.Select(&rows, stmt) + err := db.SQL.Select(&rows, stmt) if err != nil { return nil, err } @@ -75,7 +75,7 @@ func (db *Migrator) createPartitionTable(table string, partitionID int64) error partitionTable := fmt.Sprintf("%s_%v", table, partitionID) stmt := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES IN (%v)`, partitionTable, table, partitionID) - _, err := db.Sql.Exec(stmt) + _, err := db.SQL.Exec(stmt) return err } @@ -106,7 +106,7 @@ func (db *Migrator) migrateTransactions(rows []types.TransactionRow, partitionSi stmt = stmt[:len(stmt)-1] // remove trailing , stmt += " ON CONFLICT DO NOTHING" - _, err := db.Sql.Exec(stmt, params...) + _, err := db.SQL.Exec(stmt, params...) if err != nil { return fmt.Errorf("error while inserting transaction: %s", err) } @@ -168,6 +168,6 @@ func (db *Migrator) insertTransactionMessages(tx types.TransactionRow, partition stmt = stmt[:len(stmt)-1] // remove trailing "," stmt += " ON CONFLICT DO NOTHING" - _, err = db.Sql.Exec(stmt, params...) + _, err = db.SQL.Exec(stmt, params...) return err } diff --git a/database/legacy/v3/migrator.go b/database/legacy/v3/migrator.go index 849937d7..2ca4a0c5 100644 --- a/database/legacy/v3/migrator.go +++ b/database/legacy/v3/migrator.go @@ -11,11 +11,11 @@ var _ database.Migrator = &Migrator{} // Migrator represents the database migrator that should be used to migrate from v2 of the database to v3 type Migrator struct { - Sql *sqlx.DB + SQL *sqlx.DB } func NewMigrator(db *postgresql.Database) *Migrator { return &Migrator{ - Sql: sqlx.NewDb(db.Sql, "postgres"), + SQL: db.SQL, } } diff --git a/database/legacy/v3/prepare.go b/database/legacy/v3/prepare.go index 18bf3467..5e948971 100644 --- a/database/legacy/v3/prepare.go +++ b/database/legacy/v3/prepare.go @@ -43,7 +43,7 @@ ALTER INDEX IF EXISTS transaction_hash_index RENAME TO transaction_old_hash_inde ALTER INDEX IF EXISTS transaction_height_index RENAME TO transaction_old_height_index; ALTER TABLE IF EXISTS transaction_old RENAME CONSTRAINT transaction_height_fkey TO transaction_old_height_fkey;` - _, err := db.Sql.Exec(stmt) + _, err := db.SQL.Exec(stmt) return err } @@ -54,7 +54,7 @@ ALTER INDEX IF EXISTS message_transaction_hash_index RENAME TO message_old_trans ALTER INDEX IF EXISTS message_type_index RENAME TO message_old_type_index; ALTER TABLE IF EXISTS message_old RENAME CONSTRAINT message_transaction_hash_fkey TO message_old_transaction_hash_fkey;` - _, err := db.Sql.Exec(stmt) + _, err := db.SQL.Exec(stmt) return err } @@ -93,7 +93,7 @@ GRANT ALL PRIVILEGES ON transaction TO "%s"; `, config.Cfg.Database.User) - _, err := db.Sql.Exec(stmt) + _, err := db.SQL.Exec(stmt) return err } @@ -120,7 +120,7 @@ CREATE INDEX message_involved_accounts_index ON message USING GIN(involved_accou GRANT ALL PRIVILEGES ON message TO "%s"; `, config.Cfg.Database.User) - _, err := db.Sql.Exec(stmt) + _, err := db.SQL.Exec(stmt) return err } @@ -130,7 +130,7 @@ GRANT ALL PRIVILEGES ON message TO "%s"; // the message table itself (as we've added this field with the new schema). func (db *Migrator) migrateMessagesByAddressFunction() error { // Delete the old function - _, err := db.Sql.Exec("DROP FUNCTION IF EXISTS messages_by_address(text[],text[],bigint,bigint);") + _, err := db.SQL.Exec("DROP FUNCTION IF EXISTS messages_by_address(text[],text[],bigint,bigint);") if err != nil { return err } @@ -151,6 +151,6 @@ ORDER BY height DESC LIMIT "limit" OFFSET "offset" $$ LANGUAGE sql STABLE; ` - _, err = db.Sql.Exec(stmt) + _, err = db.SQL.Exec(stmt) return err } diff --git a/database/postgresql/postgresql.go b/database/postgresql/postgresql.go index 7a60c5bd..0244dbe4 100644 --- a/database/postgresql/postgresql.go +++ b/database/postgresql/postgresql.go @@ -6,13 +6,13 @@ import ( "fmt" "strings" + "github.com/jmoiron/sqlx" + "github.com/forbole/juno/v3/logging" "github.com/cosmos/cosmos-sdk/simapp/params" "github.com/lib/pq" - _ "github.com/lib/pq" // nolint - "github.com/forbole/juno/v3/database" "github.com/forbole/juno/v3/types" "github.com/forbole/juno/v3/types/config" @@ -41,7 +41,7 @@ func Builder(ctx *database.Context) (database.Database, error) { connStr += fmt.Sprintf(" password=%s", ctx.Cfg.Password) } - postgresDb, err := sql.Open("postgres", connStr) + postgresDb, err := sqlx.Open("postgres", connStr) if err != nil { return nil, err } @@ -51,7 +51,7 @@ func Builder(ctx *database.Context) (database.Database, error) { postgresDb.SetMaxIdleConns(ctx.Cfg.MaxIdleConnections) return &Database{ - Sql: postgresDb, + SQL: postgresDb, EncodingConfig: ctx.EncodingConfig, Logger: ctx.Logger, }, nil @@ -63,7 +63,7 @@ var _ database.Database = &Database{} // Database defines a wrapper around a SQL database and implements functionality // for data aggregation and exporting. type Database struct { - Sql *sql.DB + SQL *sqlx.DB EncodingConfig *params.EncodingConfig Logger logging.Logger } @@ -78,7 +78,7 @@ func (db *Database) createPartitionIfNotExists(table string, partitionID int64) table, partitionID, ) - _, err := db.Sql.Exec(stmt) + _, err := db.SQL.Exec(stmt) if err != nil { return err @@ -92,7 +92,7 @@ func (db *Database) createPartitionIfNotExists(table string, partitionID int64) // HasBlock implements database.Database func (db *Database) HasBlock(height int64) (bool, error) { var res bool - err := db.Sql.QueryRow(`SELECT EXISTS(SELECT 1 FROM block WHERE height = $1);`, height).Scan(&res) + err := db.SQL.QueryRow(`SELECT EXISTS(SELECT 1 FROM block WHERE height = $1);`, height).Scan(&res) return res, err } @@ -101,15 +101,14 @@ func (db *Database) GetLastBlockHeight() (int64, error) { stmt := `SELECT height FROM block ORDER BY height DESC LIMIT 1;` var height int64 - err := db.Sql.QueryRow(stmt).Scan(&height) - if err != nil { - if strings.Contains(err.Error(), "no rows in result set") { - // If no rows stored in block table, return 0 as height - return 0, nil - } + if err := db.SQL.QueryRow(stmt).Scan(&height); err != nil { return 0, fmt.Errorf("error while getting last block height, error: %s", err) } + if height == 0 { + return 0, fmt.Errorf("cannot get block height, no blocks saved") + } + return height, nil } @@ -120,7 +119,7 @@ INSERT INTO block (height, hash, num_txs, total_gas, proposer_address, timestamp VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING` proposerAddress := sql.NullString{Valid: len(block.ProposerAddress) != 0, String: block.ProposerAddress} - _, err := db.Sql.Exec(sqlStatement, + _, err := db.SQL.Exec(sqlStatement, block.Height, block.Hash, block.TxNum, block.TotalGas, proposerAddress, block.Timestamp, ) return err @@ -129,7 +128,7 @@ VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING` // GetTotalBlocks implements database.Database func (db *Database) GetTotalBlocks() int64 { var blockCount int64 - err := db.Sql.QueryRow(`SELECT count(*) FROM block;`).Scan(&blockCount) + err := db.SQL.QueryRow(`SELECT count(*) FROM block;`).Scan(&blockCount) if err != nil { return 0 } @@ -154,7 +153,7 @@ func (db *Database) SaveTx(tx *types.Tx) error { } // saveTxInsidePartition stores the given transaction inside the partition having the given id -func (db *Database) saveTxInsidePartition(tx *types.Tx, partitionId int64) error { +func (db *Database) saveTxInsidePartition(tx *types.Tx, partitionID int64) error { sqlStatement := ` INSERT INTO transaction (hash, height, success, messages, memo, signatures, signer_infos, fee, gas_wanted, gas_used, raw_log, logs, partition_id) @@ -207,12 +206,12 @@ ON CONFLICT (hash, partition_id) DO UPDATE return err } - _, err = db.Sql.Exec(sqlStatement, + _, err = db.SQL.Exec(sqlStatement, tx.TxHash, tx.Height, tx.Successful(), msgsBz, tx.Body.Memo, pq.Array(sigs), sigInfoBz, string(feeBz), tx.GasWanted, tx.GasUsed, tx.RawLog, string(logsBz), - partitionId, + partitionID, ) return err } @@ -221,7 +220,7 @@ ON CONFLICT (hash, partition_id) DO UPDATE func (db *Database) HasValidator(addr string) (bool, error) { var res bool stmt := `SELECT EXISTS(SELECT 1 FROM validator WHERE consensus_address = $1);` - err := db.Sql.QueryRow(stmt, addr).Scan(&res) + err := db.SQL.QueryRow(stmt, addr).Scan(&res) return res, err } @@ -243,7 +242,7 @@ func (db *Database) SaveValidators(validators []*types.Validator) error { stmt = stmt[:len(stmt)-1] // Remove trailing , stmt += " ON CONFLICT DO NOTHING" - _, err := db.Sql.Exec(stmt, vparams...) + _, err := db.SQL.Exec(stmt, vparams...) return err } @@ -265,7 +264,7 @@ func (db *Database) SaveCommitSignatures(signatures []*types.CommitSig) error { stmt = stmt[:len(stmt)-1] stmt += " ON CONFLICT (validator_address, timestamp) DO NOTHING" - _, err := db.Sql.Exec(stmt, sparams...) + _, err := db.SQL.Exec(stmt, sparams...) return err } @@ -295,13 +294,13 @@ ON CONFLICT (transaction_hash, index, partition_id) DO UPDATE value = excluded.value, involved_accounts_addresses = excluded.involved_accounts_addresses` - _, err := db.Sql.Exec(stmt, msg.TxHash, msg.Index, msg.Type, msg.Value, pq.Array(msg.Addresses), msg.Height, partitionID) + _, err := db.SQL.Exec(stmt, msg.TxHash, msg.Index, msg.Type, msg.Value, pq.Array(msg.Addresses), msg.Height, partitionID) return err } // Close implements database.Database func (db *Database) Close() { - err := db.Sql.Close() + err := db.SQL.Close() if err != nil { db.Logger.Error("error while closing connection", "err", err) } @@ -312,29 +311,29 @@ func (db *Database) Close() { // GetLastPruned implements database.PruningDb func (db *Database) GetLastPruned() (int64, error) { var lastPrunedHeight int64 - err := db.Sql.QueryRow(`SELECT coalesce(MAX(last_pruned_height),0) FROM pruning LIMIT 1;`).Scan(&lastPrunedHeight) + err := db.SQL.QueryRow(`SELECT coalesce(MAX(last_pruned_height),0) FROM pruning LIMIT 1;`).Scan(&lastPrunedHeight) return lastPrunedHeight, err } // StoreLastPruned implements database.PruningDb func (db *Database) StoreLastPruned(height int64) error { - _, err := db.Sql.Exec(`DELETE FROM pruning`) + _, err := db.SQL.Exec(`DELETE FROM pruning`) if err != nil { return err } - _, err = db.Sql.Exec(`INSERT INTO pruning (last_pruned_height) VALUES ($1)`, height) + _, err = db.SQL.Exec(`INSERT INTO pruning (last_pruned_height) VALUES ($1)`, height) return err } // Prune implements database.PruningDb func (db *Database) Prune(height int64) error { - _, err := db.Sql.Exec(`DELETE FROM pre_commit WHERE height = $1`, height) + _, err := db.SQL.Exec(`DELETE FROM pre_commit WHERE height = $1`, height) if err != nil { return err } - _, err = db.Sql.Exec(` + _, err = db.SQL.Exec(` DELETE FROM message USING transaction WHERE message.transaction_hash = transaction.hash AND transaction.height = $1 diff --git a/database/postgresql/postgresql_test.go b/database/postgresql/postgresql_test.go index cfc515b6..2578440e 100644 --- a/database/postgresql/postgresql_test.go +++ b/database/postgresql/postgresql_test.go @@ -52,11 +52,11 @@ func (suite *DbTestSuite) SetupTest() { suite.Require().True(ok) // Delete the public schema - _, err = bigDipperDb.Sql.Exec(`DROP SCHEMA public CASCADE;`) + _, err = bigDipperDb.SQL.Exec(`DROP SCHEMA public CASCADE;`) suite.Require().NoError(err) // Re-create the schema - _, err = bigDipperDb.Sql.Exec(`CREATE SCHEMA public;`) + _, err = bigDipperDb.SQL.Exec(`CREATE SCHEMA public;`) suite.Require().NoError(err) dirPath := path.Join(".") @@ -74,7 +74,7 @@ func (suite *DbTestSuite) SetupTest() { commentsRegExp := regexp.MustCompile(`/\*.*\*/`) requests := strings.Split(string(file), ";") for _, request := range requests { - _, err := bigDipperDb.Sql.Exec(commentsRegExp.ReplaceAllString(request, "")) + _, err := bigDipperDb.SQL.Exec(commentsRegExp.ReplaceAllString(request, "")) suite.Require().NoError(err) } }