Skip to content

Commit

Permalink
Refactoring: database connection
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Dec 7, 2021
1 parent c161049 commit c7b329e
Show file tree
Hide file tree
Showing 8 changed files with 380 additions and 77 deletions.
70 changes: 58 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This library partially implements DipDup framework features and can be used for

## Packages

#### `cmdline`
### `cmdline`

Command line argument parser, compatible with [DipDup CLI](https://docs.dipdup.net/command-line).

Expand All @@ -20,7 +20,7 @@ if args.Help {
}
```

#### `config`
### `config`

DipDup YAML [configuration](https://docs.dipdup.net/config-file-reference) parser. You can validate config by `validate` tag from [validator package](https://github.com/go-playground/validator).

Expand Down Expand Up @@ -48,7 +48,7 @@ if err := config.Parse("config.yaml", &cfg); err != nil {
}
```

#### `node`
### `node`

Simple Tezos RPC API wrapper.

Expand All @@ -58,29 +58,75 @@ import "github.com/dipdup-net/go-lib/node"
rpc := node.NewNodeRPC(url, node.WithTimeout(timeout))
```

#### `state`
### `database`

Managing DipDup database connection. Default interface contains custom method `Connect` and extends 3 interfaces `driver.Pinger`, `StateRepository` and `io.Closer`.

Managing DipDup index state.

```go
import "github.com/dipdup-net/go-lib/state"
// Database -
type Database interface {
Connect(ctx context.Context, cfg config.Database) error

StateRepository

driver.Pinger
io.Closer
}

s := state.State{}
// StateRepository -
type StateRepository interface {
State(name string) (State, error)
UpdateState(state State) error
CreateState(state State) error
DeleteState(state State) error
}
```

where `State` structure is:

```go
// State -
type State struct {
IndexName string `gorm:"primaryKey"`
IndexType string
Hash string
Level uint64
//nolint
tableName struct{} `gorm:"-" pg:"dipdup_state" json:"-"`

IndexName string `gorm:"primaryKey" pg:",pk" json:"index_name"`
IndexType string `json:"index_type"`
Hash string `json:"hash,omitempty"`
Level uint64 `json:"level"`
UpdatedAt int `gorm:"autoUpdateTime"`
}
```

#### `tzkt`
There are 2 default implementations of `Database` interface:
* `Gorm` - database connection via [gorm](https://gorm.io/)
* `PgGo` - database connection via [pg-go](https://pg.uptrace.dev/)

There is method `Wait` which waiting until database connection will be established.

Exaple of usage:

```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

db := database.NewPgGo()
if err := db.Connect(ctx, cfg); err != nil {
panic(err)
}

database.Wait(ctx, db, 5*time.Second)

var yourModel struct{}
conn := db.DB()
if err := conn.WithContext(ctx).Model(&yourModel).Limit(10).Select(); err != nil {
panic(err)
}
```


### `tzkt`

TzKT API and Events wrapper.
Read more about events and SignalR in the [doc](https://github.com/dipdup-net/go-lib/blob/master/tzkt/events/README.md)
Expand Down
50 changes: 50 additions & 0 deletions database/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package database

import (
"context"
"database/sql/driver"
"io"
"time"

"github.com/dipdup-net/go-lib/config"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// Database -
type Database interface {
Connect(ctx context.Context, cfg config.Database) error

StateRepository

driver.Pinger
io.Closer
}

// errors
var (
ErrConnectionIsNotInitialized = errors.New("connection is not initialized")
ErrUnsupportedDatabaseType = errors.New("unsupported database type")
)

// Wait -
func Wait(ctx context.Context, db driver.Pinger, checkPeriod time.Duration) {
logrus.Info("Waiting database is up and runnning")
if err := db.Ping(ctx); err == nil {
return
}

ticker := time.NewTicker(checkPeriod)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := db.Ping(ctx); err == nil {
logrus.Warn(err)
continue
}
return
}
}
}
95 changes: 60 additions & 35 deletions state/db.go → database/gorm.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package state
package database

import (
"context"
Expand All @@ -9,30 +9,30 @@ import (

"github.com/dipdup-net/go-lib/config"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)

// CheckConnection
func CheckConnection(db *gorm.DB) error {
sql, err := db.DB()
if err != nil {
return err
}
// Gorm -
type Gorm struct {
conn *gorm.DB
}

if err = sql.Ping(); err != nil {
return err
}
// NewGorm -
func NewGorm() *Gorm {
return new(Gorm)
}

return nil
// DB -
func (db *Gorm) DB() *gorm.DB {
return db.conn
}

// OpenConnection -
func OpenConnection(ctx context.Context, cfg config.Database) (*gorm.DB, error) {
// Connect -
func (db *Gorm) Connect(ctx context.Context, cfg config.Database) error {
var dialector gorm.Dialector
switch cfg.Kind {
case config.DBKindSqlite:
Expand All @@ -56,10 +56,10 @@ func OpenConnection(ctx context.Context, cfg config.Database) (*gorm.DB, error)
}
dialector = mysql.Open(connString)
default:
return nil, errors.Errorf("Unsupported database kind %s", cfg.Kind)
return errors.Wrap(ErrUnsupportedDatabaseType, cfg.Kind)
}

db, err := gorm.Open(dialector, &gorm.Config{
conn, err := gorm.Open(dialector, &gorm.Config{
SkipDefaultTransaction: true,
PrepareStmt: true,
Logger: logger.New(
Expand All @@ -71,31 +71,56 @@ func OpenConnection(ctx context.Context, cfg config.Database) (*gorm.DB, error)
),
})
if err != nil {
return nil, err
return err
}
db.conn = conn

checkHealth(ctx, db)
return nil
}

return db, nil
// Close -
func (db *Gorm) Close() error {
sql, err := db.conn.DB()
if err != nil {
return err
}
return sql.Close()
}

func checkHealth(ctx context.Context, db *gorm.DB) {
logrus.Info("Waiting database is up and runnning")
if err := CheckConnection(db); err == nil {
return
// Ping -
func (db *Gorm) Ping(ctx context.Context) error {
if db.conn == nil {
return ErrConnectionIsNotInitialized
}
sql, err := db.conn.DB()
if err != nil {
return err
}

ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := CheckConnection(db); err != nil {
logrus.Warn(err)
continue
}
return
}
if err = sql.PingContext(ctx); err != nil {
return err
}

return nil
}

// State -
func (db *Gorm) State(indexName string) (s State, err error) {
err = db.conn.Where("index_name = ?", indexName).First(&s).Error
return
}

// CreateState -
func (db *Gorm) CreateState(s State) error {
return db.conn.Create(&s).Error
}

// UpdateState -
func (db *Gorm) UpdateState(s State) error {
return db.conn.Save(&s).Error
}

// DeleteState -
func (db *Gorm) DeleteState(s State) error {
return db.conn.Delete(&s).Error
}
86 changes: 86 additions & 0 deletions database/pg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package database

import (
"context"
"fmt"

"github.com/dipdup-net/go-lib/config"
pg "github.com/go-pg/pg/v10"
"github.com/pkg/errors"
)

// PgGo -
type PgGo struct {
conn *pg.DB
}

// NewPgGo -
func NewPgGo() *PgGo {
return new(PgGo)
}

// DB -
func (db *PgGo) DB() *pg.DB {
return db.conn
}

// Connect -
func (db *PgGo) Connect(ctx context.Context, cfg config.Database) error {
if cfg.Kind != config.DBKindPostgres {
return errors.Wrap(ErrUnsupportedDatabaseType, cfg.Kind)
}
var conn *pg.DB
if cfg.Path != "" {
opt, err := pg.ParseURL(cfg.Path)
if err != nil {
return err
}
conn = pg.Connect(opt)
} else {
conn = pg.Connect(&pg.Options{
Addr: fmt.Sprintf("%s:%d", cfg.Host, cfg.Port),
User: cfg.User,
Password: cfg.Password,
Database: cfg.Database,
})
}
db.conn = conn
return nil
}

// Close -
func (db *PgGo) Close() error {
return db.conn.Close()
}

// Ping -
func (db *PgGo) Ping(ctx context.Context) error {
if db.conn == nil {
return ErrConnectionIsNotInitialized
}
return db.conn.Ping(ctx)
}

// State -
func (db *PgGo) State(indexName string) (s State, err error) {
err = db.conn.Model(&s).Where("index_name = ?", indexName).Limit(1).Select()
return
}

// CreateState -
func (db *PgGo) CreateState(s State) error {
_, err := db.conn.Model(&s).Insert()
return err
}

// UpdateState -
func (db *PgGo) UpdateState(s State) error {
_, err := db.conn.Model(&s).WherePK().Update()
return err
}

// DeleteState -
func (db *PgGo) DeleteState(s State) error {
_, err := db.conn.Model(&s).WherePK().Delete()
return err
}

0 comments on commit c7b329e

Please sign in to comment.