Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use sqlx instead of sql #79

Merged
merged 3 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
- ([\#74](https://github.com/forbole/juno/pull/74)) Added database block count to prometheus to improve alert monitoring
- ([\#75](https://github.com/forbole/juno/pull/75)) Allow modules to handle MsgExec inner messages
- ([\#76](https://github.com/forbole/juno/pull/76)) Return 0 as height for `GetLastBlockHeight()` method while no block is saved
- ([\#79](https://github.com/forbole/juno/pull/79)) Use `sqlx` instead of `sql` inside `PostgreSQLDatabase`

## v3.4.0
### Changes
Expand Down
8 changes: 4 additions & 4 deletions database/legacy/v3/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (db *Migrator) getOldTransactions(batchSize int64, offset int64) ([]types.T
stmt := fmt.Sprintf("SELECT * FROM transaction_old ORDER BY height LIMIT %v OFFSET %v", batchSize, offset)

var rows []types.TransactionRow
err := db.Sql.Select(&rows, stmt)
err := db.SQL.Select(&rows, stmt)
if err != nil {
return nil, err
}
Expand All @@ -75,7 +75,7 @@ func (db *Migrator) createPartitionTable(table string, partitionID int64) error
partitionTable := fmt.Sprintf("%s_%v", table, partitionID)

stmt := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES IN (%v)`, partitionTable, table, partitionID)
_, err := db.Sql.Exec(stmt)
_, err := db.SQL.Exec(stmt)
return err
}

Expand Down Expand Up @@ -106,7 +106,7 @@ func (db *Migrator) migrateTransactions(rows []types.TransactionRow, partitionSi
stmt = stmt[:len(stmt)-1] // remove trailing ,
stmt += " ON CONFLICT DO NOTHING"

_, err := db.Sql.Exec(stmt, params...)
_, err := db.SQL.Exec(stmt, params...)
if err != nil {
return fmt.Errorf("error while inserting transaction: %s", err)
}
Expand Down Expand Up @@ -168,6 +168,6 @@ func (db *Migrator) insertTransactionMessages(tx types.TransactionRow, partition
stmt = stmt[:len(stmt)-1] // remove trailing ","
stmt += " ON CONFLICT DO NOTHING"

_, err = db.Sql.Exec(stmt, params...)
_, err = db.SQL.Exec(stmt, params...)
return err
}
4 changes: 2 additions & 2 deletions database/legacy/v3/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ var _ database.Migrator = &Migrator{}

// Migrator represents the database migrator that should be used to migrate from v2 of the database to v3
type Migrator struct {
Sql *sqlx.DB
SQL *sqlx.DB
}

func NewMigrator(db *postgresql.Database) *Migrator {
return &Migrator{
Sql: sqlx.NewDb(db.Sql, "postgres"),
SQL: db.SQL,
}
}
12 changes: 6 additions & 6 deletions database/legacy/v3/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ ALTER INDEX IF EXISTS transaction_hash_index RENAME TO transaction_old_hash_inde
ALTER INDEX IF EXISTS transaction_height_index RENAME TO transaction_old_height_index;
ALTER TABLE IF EXISTS transaction_old RENAME CONSTRAINT transaction_height_fkey TO transaction_old_height_fkey;`

_, err := db.Sql.Exec(stmt)
_, err := db.SQL.Exec(stmt)
return err
}

Expand All @@ -54,7 +54,7 @@ ALTER INDEX IF EXISTS message_transaction_hash_index RENAME TO message_old_trans
ALTER INDEX IF EXISTS message_type_index RENAME TO message_old_type_index;
ALTER TABLE IF EXISTS message_old RENAME CONSTRAINT message_transaction_hash_fkey TO message_old_transaction_hash_fkey;`

_, err := db.Sql.Exec(stmt)
_, err := db.SQL.Exec(stmt)
return err
}

Expand Down Expand Up @@ -93,7 +93,7 @@ GRANT ALL PRIVILEGES ON transaction TO "%s";
`,
config.Cfg.Database.User)

_, err := db.Sql.Exec(stmt)
_, err := db.SQL.Exec(stmt)
return err
}

Expand All @@ -120,7 +120,7 @@ CREATE INDEX message_involved_accounts_index ON message USING GIN(involved_accou
GRANT ALL PRIVILEGES ON message TO "%s";
`, config.Cfg.Database.User)

_, err := db.Sql.Exec(stmt)
_, err := db.SQL.Exec(stmt)
return err
}

Expand All @@ -130,7 +130,7 @@ GRANT ALL PRIVILEGES ON message TO "%s";
// the message table itself (as we've added this field with the new schema).
func (db *Migrator) migrateMessagesByAddressFunction() error {
// Delete the old function
_, err := db.Sql.Exec("DROP FUNCTION IF EXISTS messages_by_address(text[],text[],bigint,bigint);")
_, err := db.SQL.Exec("DROP FUNCTION IF EXISTS messages_by_address(text[],text[],bigint,bigint);")
if err != nil {
return err
}
Expand All @@ -151,6 +151,6 @@ ORDER BY height DESC LIMIT "limit" OFFSET "offset"
$$ LANGUAGE sql STABLE;
`

_, err = db.Sql.Exec(stmt)
_, err = db.SQL.Exec(stmt)
return err
}
55 changes: 27 additions & 28 deletions database/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"fmt"
"strings"

"github.com/jmoiron/sqlx"

"github.com/forbole/juno/v3/logging"

"github.com/cosmos/cosmos-sdk/simapp/params"
"github.com/lib/pq"

_ "github.com/lib/pq" // nolint

"github.com/forbole/juno/v3/database"
"github.com/forbole/juno/v3/types"
"github.com/forbole/juno/v3/types/config"
Expand Down Expand Up @@ -41,7 +41,7 @@ func Builder(ctx *database.Context) (database.Database, error) {
connStr += fmt.Sprintf(" password=%s", ctx.Cfg.Password)
}

postgresDb, err := sql.Open("postgres", connStr)
postgresDb, err := sqlx.Open("postgres", connStr)
if err != nil {
return nil, err
}
Expand All @@ -51,7 +51,7 @@ func Builder(ctx *database.Context) (database.Database, error) {
postgresDb.SetMaxIdleConns(ctx.Cfg.MaxIdleConnections)

return &Database{
Sql: postgresDb,
SQL: postgresDb,
EncodingConfig: ctx.EncodingConfig,
Logger: ctx.Logger,
}, nil
Expand All @@ -63,7 +63,7 @@ var _ database.Database = &Database{}
// Database defines a wrapper around a SQL database and implements functionality
// for data aggregation and exporting.
type Database struct {
Sql *sql.DB
SQL *sqlx.DB
EncodingConfig *params.EncodingConfig
Logger logging.Logger
}
Expand All @@ -78,7 +78,7 @@ func (db *Database) createPartitionIfNotExists(table string, partitionID int64)
table,
partitionID,
)
_, err := db.Sql.Exec(stmt)
_, err := db.SQL.Exec(stmt)

if err != nil {
return err
Expand All @@ -92,7 +92,7 @@ func (db *Database) createPartitionIfNotExists(table string, partitionID int64)
// HasBlock implements database.Database
func (db *Database) HasBlock(height int64) (bool, error) {
var res bool
err := db.Sql.QueryRow(`SELECT EXISTS(SELECT 1 FROM block WHERE height = $1);`, height).Scan(&res)
err := db.SQL.QueryRow(`SELECT EXISTS(SELECT 1 FROM block WHERE height = $1);`, height).Scan(&res)
return res, err
}

Expand All @@ -101,15 +101,14 @@ func (db *Database) GetLastBlockHeight() (int64, error) {
stmt := `SELECT height FROM block ORDER BY height DESC LIMIT 1;`

var height int64
err := db.Sql.QueryRow(stmt).Scan(&height)
if err != nil {
if strings.Contains(err.Error(), "no rows in result set") {
// If no rows stored in block table, return 0 as height
return 0, nil
}
if err := db.SQL.QueryRow(stmt).Scan(&height); err != nil {
return 0, fmt.Errorf("error while getting last block height, error: %s", err)
}

if height == 0 {
return 0, fmt.Errorf("cannot get block height, no blocks saved")
}

return height, nil
}

Expand All @@ -120,7 +119,7 @@ INSERT INTO block (height, hash, num_txs, total_gas, proposer_address, timestamp
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING`

proposerAddress := sql.NullString{Valid: len(block.ProposerAddress) != 0, String: block.ProposerAddress}
_, err := db.Sql.Exec(sqlStatement,
_, err := db.SQL.Exec(sqlStatement,
block.Height, block.Hash, block.TxNum, block.TotalGas, proposerAddress, block.Timestamp,
)
return err
Expand All @@ -129,7 +128,7 @@ VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING`
// GetTotalBlocks implements database.Database
func (db *Database) GetTotalBlocks() int64 {
var blockCount int64
err := db.Sql.QueryRow(`SELECT count(*) FROM block;`).Scan(&blockCount)
err := db.SQL.QueryRow(`SELECT count(*) FROM block;`).Scan(&blockCount)
if err != nil {
return 0
}
Expand All @@ -154,7 +153,7 @@ func (db *Database) SaveTx(tx *types.Tx) error {
}

// saveTxInsidePartition stores the given transaction inside the partition having the given id
func (db *Database) saveTxInsidePartition(tx *types.Tx, partitionId int64) error {
func (db *Database) saveTxInsidePartition(tx *types.Tx, partitionID int64) error {
sqlStatement := `
INSERT INTO transaction
(hash, height, success, messages, memo, signatures, signer_infos, fee, gas_wanted, gas_used, raw_log, logs, partition_id)
Expand Down Expand Up @@ -207,12 +206,12 @@ ON CONFLICT (hash, partition_id) DO UPDATE
return err
}

_, err = db.Sql.Exec(sqlStatement,
_, err = db.SQL.Exec(sqlStatement,
tx.TxHash, tx.Height, tx.Successful(),
msgsBz, tx.Body.Memo, pq.Array(sigs),
sigInfoBz, string(feeBz),
tx.GasWanted, tx.GasUsed, tx.RawLog, string(logsBz),
partitionId,
partitionID,
)
return err
}
Expand All @@ -221,7 +220,7 @@ ON CONFLICT (hash, partition_id) DO UPDATE
func (db *Database) HasValidator(addr string) (bool, error) {
var res bool
stmt := `SELECT EXISTS(SELECT 1 FROM validator WHERE consensus_address = $1);`
err := db.Sql.QueryRow(stmt, addr).Scan(&res)
err := db.SQL.QueryRow(stmt, addr).Scan(&res)
return res, err
}

Expand All @@ -243,7 +242,7 @@ func (db *Database) SaveValidators(validators []*types.Validator) error {

stmt = stmt[:len(stmt)-1] // Remove trailing ,
stmt += " ON CONFLICT DO NOTHING"
_, err := db.Sql.Exec(stmt, vparams...)
_, err := db.SQL.Exec(stmt, vparams...)
return err
}

Expand All @@ -265,7 +264,7 @@ func (db *Database) SaveCommitSignatures(signatures []*types.CommitSig) error {

stmt = stmt[:len(stmt)-1]
stmt += " ON CONFLICT (validator_address, timestamp) DO NOTHING"
_, err := db.Sql.Exec(stmt, sparams...)
_, err := db.SQL.Exec(stmt, sparams...)
return err
}

Expand Down Expand Up @@ -295,13 +294,13 @@ ON CONFLICT (transaction_hash, index, partition_id) DO UPDATE
value = excluded.value,
involved_accounts_addresses = excluded.involved_accounts_addresses`

_, err := db.Sql.Exec(stmt, msg.TxHash, msg.Index, msg.Type, msg.Value, pq.Array(msg.Addresses), msg.Height, partitionID)
_, err := db.SQL.Exec(stmt, msg.TxHash, msg.Index, msg.Type, msg.Value, pq.Array(msg.Addresses), msg.Height, partitionID)
return err
}

// Close implements database.Database
func (db *Database) Close() {
err := db.Sql.Close()
err := db.SQL.Close()
if err != nil {
db.Logger.Error("error while closing connection", "err", err)
}
Expand All @@ -312,29 +311,29 @@ func (db *Database) Close() {
// GetLastPruned implements database.PruningDb
func (db *Database) GetLastPruned() (int64, error) {
var lastPrunedHeight int64
err := db.Sql.QueryRow(`SELECT coalesce(MAX(last_pruned_height),0) FROM pruning LIMIT 1;`).Scan(&lastPrunedHeight)
err := db.SQL.QueryRow(`SELECT coalesce(MAX(last_pruned_height),0) FROM pruning LIMIT 1;`).Scan(&lastPrunedHeight)
return lastPrunedHeight, err
}

// StoreLastPruned implements database.PruningDb
func (db *Database) StoreLastPruned(height int64) error {
_, err := db.Sql.Exec(`DELETE FROM pruning`)
_, err := db.SQL.Exec(`DELETE FROM pruning`)
if err != nil {
return err
}

_, err = db.Sql.Exec(`INSERT INTO pruning (last_pruned_height) VALUES ($1)`, height)
_, err = db.SQL.Exec(`INSERT INTO pruning (last_pruned_height) VALUES ($1)`, height)
return err
}

// Prune implements database.PruningDb
func (db *Database) Prune(height int64) error {
_, err := db.Sql.Exec(`DELETE FROM pre_commit WHERE height = $1`, height)
_, err := db.SQL.Exec(`DELETE FROM pre_commit WHERE height = $1`, height)
if err != nil {
return err
}

_, err = db.Sql.Exec(`
_, err = db.SQL.Exec(`
DELETE FROM message
USING transaction
WHERE message.transaction_hash = transaction.hash AND transaction.height = $1
Expand Down
6 changes: 3 additions & 3 deletions database/postgresql/postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ func (suite *DbTestSuite) SetupTest() {
suite.Require().True(ok)

// Delete the public schema
_, err = bigDipperDb.Sql.Exec(`DROP SCHEMA public CASCADE;`)
_, err = bigDipperDb.SQL.Exec(`DROP SCHEMA public CASCADE;`)
suite.Require().NoError(err)

// Re-create the schema
_, err = bigDipperDb.Sql.Exec(`CREATE SCHEMA public;`)
_, err = bigDipperDb.SQL.Exec(`CREATE SCHEMA public;`)
suite.Require().NoError(err)

dirPath := path.Join(".")
Expand All @@ -74,7 +74,7 @@ func (suite *DbTestSuite) SetupTest() {
commentsRegExp := regexp.MustCompile(`/\*.*\*/`)
requests := strings.Split(string(file), ";")
for _, request := range requests {
_, err := bigDipperDb.Sql.Exec(commentsRegExp.ReplaceAllString(request, ""))
_, err := bigDipperDb.SQL.Exec(commentsRegExp.ReplaceAllString(request, ""))
suite.Require().NoError(err)
}
}
Expand Down