Skip to content

Commit

Permalink
feat(libs): introduce publisher circuit breaker (#1344)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas committed Mar 29, 2024
1 parent f38709c commit 01c96c3
Show file tree
Hide file tree
Showing 35 changed files with 2,048 additions and 116 deletions.
Expand Up @@ -57,7 +57,6 @@ func (c *ReconciliationController) Run(cmd *cobra.Command, args []string) (fctl.
return nil, err
}

fmt.Println(args[0], atLedger, atPayments)
response, err := store.Client().Reconciliation.Reconcile(cmd.Context(), operations.ReconcileRequest{
PolicyID: args[0],
ReconciliationRequest: shared.ReconciliationRequest{
Expand Down
1 change: 0 additions & 1 deletion components/ledger/cmd/buckets.go
Expand Up @@ -21,7 +21,6 @@ func NewBucketUpgrade() *cobra.Command {
Args: cobra.ExactArgs(1),
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {

connectionOptions, err := bunconnect.ConnectionOptionsFromFlags(cmd.Context())
if err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion components/ledger/cmd/root.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/formancehq/stack/libs/go-libs/bun/bunconnect"

"github.com/formancehq/ledger/cmd/internal"
"github.com/formancehq/ledger/internal/storage/systemstore"
"github.com/formancehq/stack/libs/go-libs/auth"
"github.com/formancehq/stack/libs/go-libs/otlp/otlpmetrics"
"github.com/formancehq/stack/libs/go-libs/otlp/otlptraces"
Expand Down Expand Up @@ -62,7 +63,9 @@ func NewRootCommand() *cobra.Command {
otlpmetrics.InitOTLPMetricsFlags(root.PersistentFlags())
otlptraces.InitOTLPTracesFlags(root.PersistentFlags())
auth.InitAuthFlags(root.PersistentFlags())
publish.InitCLIFlags(root)
publish.InitCLIFlags(root, func(cd *publish.ConfigDefault) {
cd.PublisherCircuitBreakerSchema = systemstore.Schema
})
bunconnect.InitFlags(root.PersistentFlags())
iam.InitFlags(root.PersistentFlags())

Expand Down
4 changes: 2 additions & 2 deletions components/ledger/internal/bus/monitor_test.go
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"github.com/formancehq/stack/libs/go-libs/publish"
topicmapper "github.com/formancehq/stack/libs/go-libs/publish/topic_mapper"
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
)
Expand All @@ -24,7 +24,7 @@ func TestMonitor(t *testing.T) {
)
messages, err := pubSub.Subscribe(context.Background(), "testing")
require.NoError(t, err)
p := publish.NewTopicMapperPublisherDecorator(pubSub, map[string]string{
p := topicmapper.NewPublisherDecorator(pubSub, map[string]string{
"*": "testing",
})
m := NewLedgerMonitor(p, uuid.New())
Expand Down
22 changes: 14 additions & 8 deletions components/ledger/libs/migrations/migrator.go
Expand Up @@ -23,8 +23,6 @@ var (
)

type Info struct {
bun.BaseModel `bun:"goose_db_version"`

Version string `json:"version" bun:"version_id"`
Name string `json:"name" bun:"-"`
State string `json:"state,omitempty" bun:"-"`
Expand All @@ -35,6 +33,7 @@ type Migrator struct {
migrations []Migration
schema string
createSchema bool
tableName string
}

type option func(m *Migrator)
Expand All @@ -46,6 +45,12 @@ func WithSchema(schema string, create bool) option {
}
}

func WithTableName(name string) option {
return func(m *Migrator) {
m.tableName = name
}
}

func (m *Migrator) RegisterMigrations(migrations ...Migration) *Migrator {
m.migrations = append(m.migrations, migrations...)
return m
Expand All @@ -57,7 +62,7 @@ func (m *Migrator) createVersionTable(ctx context.Context, tx bun.Tx) error {
version_id bigint not null,
is_applied boolean not null,
tstamp timestamp default now()
);`, migrationTable))
);`, m.tableName))
if err != nil {
return err
}
Expand All @@ -79,7 +84,7 @@ func (m *Migrator) createVersionTable(ctx context.Context, tx bun.Tx) error {
func (m *Migrator) getLastVersion(ctx context.Context, querier interface {
QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}) (int64, error) {
row := querier.QueryRowContext(ctx, fmt.Sprintf(`select max(version_id) from "%s";`, migrationTable))
row := querier.QueryRowContext(ctx, fmt.Sprintf(`select max(version_id) from "%s";`, m.tableName))
if err := row.Err(); err != nil {
switch {
case err == sql.ErrNoRows:
Expand Down Expand Up @@ -110,7 +115,7 @@ func (m *Migrator) getLastVersion(ctx context.Context, querier interface {

func (m *Migrator) insertVersion(ctx context.Context, tx bun.Tx, version int) error {
_, err := tx.ExecContext(ctx,
fmt.Sprintf(`INSERT INTO "%s" (version_id, is_applied, tstamp) VALUES (?, ?, ?)`, migrationTable),
fmt.Sprintf(`INSERT INTO "%s" (version_id, is_applied, tstamp) VALUES (?, ?, ?)`, m.tableName),
version, true, time.Now())
return err
}
Expand Down Expand Up @@ -182,10 +187,9 @@ func (m *Migrator) Up(ctx context.Context, db bun.IDB) error {
}

func (m *Migrator) GetMigrations(ctx context.Context, db bun.IDB) ([]Info, error) {

ret := make([]Info, 0)
if err := m.runInTX(ctx, db, func(ctx context.Context, tx bun.Tx) error {
migrationTableName := migrationTable
migrationTableName := m.tableName
if m.schema != "" {
migrationTableName = fmt.Sprintf(`"%s".%s`, m.schema, migrationTableName)
}
Expand Down Expand Up @@ -238,7 +242,9 @@ func (m *Migrator) IsUpToDate(ctx context.Context, db *bun.DB) (bool, error) {
}

func NewMigrator(opts ...option) *Migrator {
ret := &Migrator{}
ret := &Migrator{
tableName: migrationTable,
}
for _, opt := range opts {
opt(ret)
}
Expand Down

0 comments on commit 01c96c3

Please sign in to comment.