Skip to content

Commit

Permalink
feat: use sqlx instead of sql (#79)
Browse files Browse the repository at this point in the history
## Description
This PR updates the `PostgreSQL` database to use the sqlx driver instead of `sql` to provide more functionalities while maintaining backward compatibility

## Checklist
- [x] Targeted PR against correct branch.
- [ ] Linked to Github issue with discussion and accepted design OR link to spec that describes this work.
- [ ] Wrote unit tests.  
- [x] Re-reviewed `Files changed` in the Github PR explorer.
  • Loading branch information
RiccardoM committed Sep 20, 2022
1 parent 91a2635 commit 8c0f7f1
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
- ([\#74](https://github.com/forbole/juno/pull/74)) Added database block count to prometheus to improve alert monitoring
- ([\#75](https://github.com/forbole/juno/pull/75)) Allow modules to handle MsgExec inner messages
- ([\#76](https://github.com/forbole/juno/pull/76)) Return 0 as height for `GetLastBlockHeight()` method while no block is saved
- ([\#79](https://github.com/forbole/juno/pull/79)) Use `sqlx` instead of `sql` inside `PostgreSQLDatabase`

## v3.4.0
### Changes
Expand Down
8 changes: 4 additions & 4 deletions database/legacy/v3/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (db *Migrator) getOldTransactions(batchSize int64, offset int64) ([]types.T
stmt := fmt.Sprintf("SELECT * FROM transaction_old ORDER BY height LIMIT %v OFFSET %v", batchSize, offset)

var rows []types.TransactionRow
err := db.Sql.Select(&rows, stmt)
err := db.SQL.Select(&rows, stmt)
if err != nil {
return nil, err
}
Expand All @@ -75,7 +75,7 @@ func (db *Migrator) createPartitionTable(table string, partitionID int64) error
partitionTable := fmt.Sprintf("%s_%v", table, partitionID)

stmt := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES IN (%v)`, partitionTable, table, partitionID)
_, err := db.Sql.Exec(stmt)
_, err := db.SQL.Exec(stmt)
return err
}

Expand Down Expand Up @@ -106,7 +106,7 @@ func (db *Migrator) migrateTransactions(rows []types.TransactionRow, partitionSi
stmt = stmt[:len(stmt)-1] // remove trailing ,
stmt += " ON CONFLICT DO NOTHING"

_, err := db.Sql.Exec(stmt, params...)
_, err := db.SQL.Exec(stmt, params...)
if err != nil {
return fmt.Errorf("error while inserting transaction: %s", err)
}
Expand Down Expand Up @@ -168,6 +168,6 @@ func (db *Migrator) insertTransactionMessages(tx types.TransactionRow, partition
stmt = stmt[:len(stmt)-1] // remove trailing ","
stmt += " ON CONFLICT DO NOTHING"

_, err = db.Sql.Exec(stmt, params...)
_, err = db.SQL.Exec(stmt, params...)
return err
}
4 changes: 2 additions & 2 deletions database/legacy/v3/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ var _ database.Migrator = &Migrator{}

// Migrator represents the database migrator that should be used to migrate from v2 of the database to v3
type Migrator struct {
Sql *sqlx.DB
SQL *sqlx.DB
}

func NewMigrator(db *postgresql.Database) *Migrator {
return &Migrator{
Sql: sqlx.NewDb(db.Sql, "postgres"),
SQL: db.SQL,
}
}
12 changes: 6 additions & 6 deletions database/legacy/v3/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ ALTER INDEX IF EXISTS transaction_hash_index RENAME TO transaction_old_hash_inde
ALTER INDEX IF EXISTS transaction_height_index RENAME TO transaction_old_height_index;
ALTER TABLE IF EXISTS transaction_old RENAME CONSTRAINT transaction_height_fkey TO transaction_old_height_fkey;`

_, err := db.Sql.Exec(stmt)
_, err := db.SQL.Exec(stmt)
return err
}

Expand All @@ -54,7 +54,7 @@ ALTER INDEX IF EXISTS message_transaction_hash_index RENAME TO message_old_trans
ALTER INDEX IF EXISTS message_type_index RENAME TO message_old_type_index;
ALTER TABLE IF EXISTS message_old RENAME CONSTRAINT message_transaction_hash_fkey TO message_old_transaction_hash_fkey;`

_, err := db.Sql.Exec(stmt)
_, err := db.SQL.Exec(stmt)
return err
}

Expand Down Expand Up @@ -93,7 +93,7 @@ GRANT ALL PRIVILEGES ON transaction TO "%s";
`,
config.Cfg.Database.User)

_, err := db.Sql.Exec(stmt)
_, err := db.SQL.Exec(stmt)
return err
}

Expand All @@ -120,7 +120,7 @@ CREATE INDEX message_involved_accounts_index ON message USING GIN(involved_accou
GRANT ALL PRIVILEGES ON message TO "%s";
`, config.Cfg.Database.User)

_, err := db.Sql.Exec(stmt)
_, err := db.SQL.Exec(stmt)
return err
}

Expand All @@ -130,7 +130,7 @@ GRANT ALL PRIVILEGES ON message TO "%s";
// the message table itself (as we've added this field with the new schema).
func (db *Migrator) migrateMessagesByAddressFunction() error {
// Delete the old function
_, err := db.Sql.Exec("DROP FUNCTION IF EXISTS messages_by_address(text[],text[],bigint,bigint);")
_, err := db.SQL.Exec("DROP FUNCTION IF EXISTS messages_by_address(text[],text[],bigint,bigint);")
if err != nil {
return err
}
Expand All @@ -151,6 +151,6 @@ ORDER BY height DESC LIMIT "limit" OFFSET "offset"
$$ LANGUAGE sql STABLE;
`

_, err = db.Sql.Exec(stmt)
_, err = db.SQL.Exec(stmt)
return err
}
55 changes: 27 additions & 28 deletions database/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"fmt"
"strings"

"github.com/jmoiron/sqlx"

"github.com/forbole/juno/v3/logging"

"github.com/cosmos/cosmos-sdk/simapp/params"
"github.com/lib/pq"

_ "github.com/lib/pq" // nolint

"github.com/forbole/juno/v3/database"
"github.com/forbole/juno/v3/types"
"github.com/forbole/juno/v3/types/config"
Expand Down Expand Up @@ -41,7 +41,7 @@ func Builder(ctx *database.Context) (database.Database, error) {
connStr += fmt.Sprintf(" password=%s", ctx.Cfg.Password)
}

postgresDb, err := sql.Open("postgres", connStr)
postgresDb, err := sqlx.Open("postgres", connStr)
if err != nil {
return nil, err
}
Expand All @@ -51,7 +51,7 @@ func Builder(ctx *database.Context) (database.Database, error) {
postgresDb.SetMaxIdleConns(ctx.Cfg.MaxIdleConnections)

return &Database{
Sql: postgresDb,
SQL: postgresDb,
EncodingConfig: ctx.EncodingConfig,
Logger: ctx.Logger,
}, nil
Expand All @@ -63,7 +63,7 @@ var _ database.Database = &Database{}
// Database defines a wrapper around a SQL database and implements functionality
// for data aggregation and exporting.
type Database struct {
Sql *sql.DB
SQL *sqlx.DB
EncodingConfig *params.EncodingConfig
Logger logging.Logger
}
Expand All @@ -78,7 +78,7 @@ func (db *Database) createPartitionIfNotExists(table string, partitionID int64)
table,
partitionID,
)
_, err := db.Sql.Exec(stmt)
_, err := db.SQL.Exec(stmt)

if err != nil {
return err
Expand All @@ -92,7 +92,7 @@ func (db *Database) createPartitionIfNotExists(table string, partitionID int64)
// HasBlock implements database.Database
func (db *Database) HasBlock(height int64) (bool, error) {
var res bool
err := db.Sql.QueryRow(`SELECT EXISTS(SELECT 1 FROM block WHERE height = $1);`, height).Scan(&res)
err := db.SQL.QueryRow(`SELECT EXISTS(SELECT 1 FROM block WHERE height = $1);`, height).Scan(&res)
return res, err
}

Expand All @@ -101,15 +101,14 @@ func (db *Database) GetLastBlockHeight() (int64, error) {
stmt := `SELECT height FROM block ORDER BY height DESC LIMIT 1;`

var height int64
err := db.Sql.QueryRow(stmt).Scan(&height)
if err != nil {
if strings.Contains(err.Error(), "no rows in result set") {
// If no rows stored in block table, return 0 as height
return 0, nil
}
if err := db.SQL.QueryRow(stmt).Scan(&height); err != nil {
return 0, fmt.Errorf("error while getting last block height, error: %s", err)
}

if height == 0 {
return 0, fmt.Errorf("cannot get block height, no blocks saved")
}

return height, nil
}

Expand All @@ -120,7 +119,7 @@ INSERT INTO block (height, hash, num_txs, total_gas, proposer_address, timestamp
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING`

proposerAddress := sql.NullString{Valid: len(block.ProposerAddress) != 0, String: block.ProposerAddress}
_, err := db.Sql.Exec(sqlStatement,
_, err := db.SQL.Exec(sqlStatement,
block.Height, block.Hash, block.TxNum, block.TotalGas, proposerAddress, block.Timestamp,
)
return err
Expand All @@ -129,7 +128,7 @@ VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING`
// GetTotalBlocks implements database.Database
func (db *Database) GetTotalBlocks() int64 {
var blockCount int64
err := db.Sql.QueryRow(`SELECT count(*) FROM block;`).Scan(&blockCount)
err := db.SQL.QueryRow(`SELECT count(*) FROM block;`).Scan(&blockCount)
if err != nil {
return 0
}
Expand All @@ -154,7 +153,7 @@ func (db *Database) SaveTx(tx *types.Tx) error {
}

// saveTxInsidePartition stores the given transaction inside the partition having the given id
func (db *Database) saveTxInsidePartition(tx *types.Tx, partitionId int64) error {
func (db *Database) saveTxInsidePartition(tx *types.Tx, partitionID int64) error {
sqlStatement := `
INSERT INTO transaction
(hash, height, success, messages, memo, signatures, signer_infos, fee, gas_wanted, gas_used, raw_log, logs, partition_id)
Expand Down Expand Up @@ -207,12 +206,12 @@ ON CONFLICT (hash, partition_id) DO UPDATE
return err
}

_, err = db.Sql.Exec(sqlStatement,
_, err = db.SQL.Exec(sqlStatement,
tx.TxHash, tx.Height, tx.Successful(),
msgsBz, tx.Body.Memo, pq.Array(sigs),
sigInfoBz, string(feeBz),
tx.GasWanted, tx.GasUsed, tx.RawLog, string(logsBz),
partitionId,
partitionID,
)
return err
}
Expand All @@ -221,7 +220,7 @@ ON CONFLICT (hash, partition_id) DO UPDATE
func (db *Database) HasValidator(addr string) (bool, error) {
var res bool
stmt := `SELECT EXISTS(SELECT 1 FROM validator WHERE consensus_address = $1);`
err := db.Sql.QueryRow(stmt, addr).Scan(&res)
err := db.SQL.QueryRow(stmt, addr).Scan(&res)
return res, err
}

Expand All @@ -243,7 +242,7 @@ func (db *Database) SaveValidators(validators []*types.Validator) error {

stmt = stmt[:len(stmt)-1] // Remove trailing ,
stmt += " ON CONFLICT DO NOTHING"
_, err := db.Sql.Exec(stmt, vparams...)
_, err := db.SQL.Exec(stmt, vparams...)
return err
}

Expand All @@ -265,7 +264,7 @@ func (db *Database) SaveCommitSignatures(signatures []*types.CommitSig) error {

stmt = stmt[:len(stmt)-1]
stmt += " ON CONFLICT (validator_address, timestamp) DO NOTHING"
_, err := db.Sql.Exec(stmt, sparams...)
_, err := db.SQL.Exec(stmt, sparams...)
return err
}

Expand Down Expand Up @@ -295,13 +294,13 @@ ON CONFLICT (transaction_hash, index, partition_id) DO UPDATE
value = excluded.value,
involved_accounts_addresses = excluded.involved_accounts_addresses`

_, err := db.Sql.Exec(stmt, msg.TxHash, msg.Index, msg.Type, msg.Value, pq.Array(msg.Addresses), msg.Height, partitionID)
_, err := db.SQL.Exec(stmt, msg.TxHash, msg.Index, msg.Type, msg.Value, pq.Array(msg.Addresses), msg.Height, partitionID)
return err
}

// Close implements database.Database
func (db *Database) Close() {
err := db.Sql.Close()
err := db.SQL.Close()
if err != nil {
db.Logger.Error("error while closing connection", "err", err)
}
Expand All @@ -312,29 +311,29 @@ func (db *Database) Close() {
// GetLastPruned implements database.PruningDb
func (db *Database) GetLastPruned() (int64, error) {
var lastPrunedHeight int64
err := db.Sql.QueryRow(`SELECT coalesce(MAX(last_pruned_height),0) FROM pruning LIMIT 1;`).Scan(&lastPrunedHeight)
err := db.SQL.QueryRow(`SELECT coalesce(MAX(last_pruned_height),0) FROM pruning LIMIT 1;`).Scan(&lastPrunedHeight)
return lastPrunedHeight, err
}

// StoreLastPruned implements database.PruningDb
func (db *Database) StoreLastPruned(height int64) error {
_, err := db.Sql.Exec(`DELETE FROM pruning`)
_, err := db.SQL.Exec(`DELETE FROM pruning`)
if err != nil {
return err
}

_, err = db.Sql.Exec(`INSERT INTO pruning (last_pruned_height) VALUES ($1)`, height)
_, err = db.SQL.Exec(`INSERT INTO pruning (last_pruned_height) VALUES ($1)`, height)
return err
}

// Prune implements database.PruningDb
func (db *Database) Prune(height int64) error {
_, err := db.Sql.Exec(`DELETE FROM pre_commit WHERE height = $1`, height)
_, err := db.SQL.Exec(`DELETE FROM pre_commit WHERE height = $1`, height)
if err != nil {
return err
}

_, err = db.Sql.Exec(`
_, err = db.SQL.Exec(`
DELETE FROM message
USING transaction
WHERE message.transaction_hash = transaction.hash AND transaction.height = $1
Expand Down
6 changes: 3 additions & 3 deletions database/postgresql/postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ func (suite *DbTestSuite) SetupTest() {
suite.Require().True(ok)

// Delete the public schema
_, err = bigDipperDb.Sql.Exec(`DROP SCHEMA public CASCADE;`)
_, err = bigDipperDb.SQL.Exec(`DROP SCHEMA public CASCADE;`)
suite.Require().NoError(err)

// Re-create the schema
_, err = bigDipperDb.Sql.Exec(`CREATE SCHEMA public;`)
_, err = bigDipperDb.SQL.Exec(`CREATE SCHEMA public;`)
suite.Require().NoError(err)

dirPath := path.Join(".")
Expand All @@ -74,7 +74,7 @@ func (suite *DbTestSuite) SetupTest() {
commentsRegExp := regexp.MustCompile(`/\*.*\*/`)
requests := strings.Split(string(file), ";")
for _, request := range requests {
_, err := bigDipperDb.Sql.Exec(commentsRegExp.ReplaceAllString(request, ""))
_, err := bigDipperDb.SQL.Exec(commentsRegExp.ReplaceAllString(request, ""))
suite.Require().NoError(err)
}
}
Expand Down

0 comments on commit 8c0f7f1

Please sign in to comment.