Skip to content

Commit

Permalink
feat: remove worker to test parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Jun 20, 2024
1 parent 1360209 commit 0c7a15f
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 22 deletions.
7 changes: 3 additions & 4 deletions components/ledger/internal/engine/command/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/bus"
"github.com/formancehq/ledger/internal/engine/utils/batching"
"github.com/formancehq/ledger/internal/machine/vm"
"github.com/formancehq/stack/libs/go-libs/collectionutils"
"github.com/formancehq/stack/libs/go-libs/metadata"
Expand All @@ -28,7 +27,7 @@ type Parameters struct {
}

type Commander struct {
*batching.Batcher[*ledger.ChainedLog]
//*batching.Batcher[*ledger.ChainedLog]
store Store
locker Locker
compiler *Compiler
Expand All @@ -48,7 +47,7 @@ func New(store Store, locker Locker, compiler *Compiler, referencer *Referencer,
compiler: compiler,
lastTXID: big.NewInt(-1),
referencer: referencer,
Batcher: batching.NewBatcher(store.InsertLogs, 1, batchSize),
//Batcher: batching.NewBatcher(store.InsertLogs, 1, batchSize),
monitor: monitor,
}
}
Expand Down Expand Up @@ -311,7 +310,7 @@ func (commander *Commander) RevertTransaction(ctx context.Context, parameters Pa
}

func (commander *Commander) Close() {
commander.Batcher.Close()
//commander.Batcher.Close()
commander.running.Wait()
}

Expand Down
35 changes: 18 additions & 17 deletions components/ledger/internal/engine/command/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,32 @@ func (e *executionContext) AppendLog(ctx context.Context, log *ledger.Log) (*led
return e.commander.chainLog(log)
}()

done := make(chan struct{})
func() {
_, span := tracer.Start(ctx, "AppendLogToQueue")
defer span.End()

e.commander.Append(chainedLog, func() {
close(done)
})
}()

//done := make(chan struct{})
err := func() error {
_, span := tracer.Start(ctx, "WaitLogAck")
ctx, span := tracer.Start(ctx, "InsertLog")
defer span.End()

select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return nil
}
return e.commander.store.InsertLogs(ctx, chainedLog)
}()
if err != nil {
return nil, err
}

//err := func() error {
// _, span := tracer.Start(ctx, "WaitLogAck")
// defer span.End()
//
// select {
// case <-ctx.Done():
// return ctx.Err()
// case <-done:
// return nil
// }
//}()
//if err != nil {
// return nil, err
//}

return chainedLog, nil
}

Expand Down
2 changes: 1 addition & 1 deletion components/ledger/internal/engine/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (l *Ledger) Start(ctx context.Context) {
if err := l.commander.Init(ctx); err != nil {
panic(err)
}
go l.commander.Run(logging.ContextWithField(ctx, "component", "commander"))
//go l.commander.Run(logging.ContextWithField(ctx, "component", "commander"))
}

func (l *Ledger) Close(ctx context.Context) {
Expand Down

0 comments on commit 0c7a15f

Please sign in to comment.