From 0c7a15fac1e07f04f6682deea359cd0015a51f44 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 20 Jun 2024 15:36:57 +0200 Subject: [PATCH] feat: remove worker to test parallelism --- .../internal/engine/command/commander.go | 7 ++-- .../ledger/internal/engine/command/context.go | 35 ++++++++++--------- components/ledger/internal/engine/ledger.go | 2 +- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/components/ledger/internal/engine/command/commander.go b/components/ledger/internal/engine/command/commander.go index 406aa5e93c..ecbd41bab0 100644 --- a/components/ledger/internal/engine/command/commander.go +++ b/components/ledger/internal/engine/command/commander.go @@ -15,7 +15,6 @@ import ( ledger "github.com/formancehq/ledger/internal" "github.com/formancehq/ledger/internal/bus" - "github.com/formancehq/ledger/internal/engine/utils/batching" "github.com/formancehq/ledger/internal/machine/vm" "github.com/formancehq/stack/libs/go-libs/collectionutils" "github.com/formancehq/stack/libs/go-libs/metadata" @@ -28,7 +27,7 @@ type Parameters struct { } type Commander struct { - *batching.Batcher[*ledger.ChainedLog] + //*batching.Batcher[*ledger.ChainedLog] store Store locker Locker compiler *Compiler @@ -48,7 +47,7 @@ func New(store Store, locker Locker, compiler *Compiler, referencer *Referencer, compiler: compiler, lastTXID: big.NewInt(-1), referencer: referencer, - Batcher: batching.NewBatcher(store.InsertLogs, 1, batchSize), + //Batcher: batching.NewBatcher(store.InsertLogs, 1, batchSize), monitor: monitor, } } @@ -311,7 +310,7 @@ func (commander *Commander) RevertTransaction(ctx context.Context, parameters Pa } func (commander *Commander) Close() { - commander.Batcher.Close() + //commander.Batcher.Close() commander.running.Wait() } diff --git a/components/ledger/internal/engine/command/context.go b/components/ledger/internal/engine/command/context.go index c4551ddec7..7464c207c1 100644 --- a/components/ledger/internal/engine/command/context.go +++ b/components/ledger/internal/engine/command/context.go @@ -30,31 +30,32 @@ func (e *executionContext) AppendLog(ctx context.Context, log *ledger.Log) (*led return e.commander.chainLog(log) }() - done := make(chan struct{}) - func() { - _, span := tracer.Start(ctx, "AppendLogToQueue") - defer span.End() - - e.commander.Append(chainedLog, func() { - close(done) - }) - }() - + //done := make(chan struct{}) err := func() error { - _, span := tracer.Start(ctx, "WaitLogAck") + ctx, span := tracer.Start(ctx, "InsertLog") defer span.End() - select { - case <-ctx.Done(): - return ctx.Err() - case <-done: - return nil - } + return e.commander.store.InsertLogs(ctx, chainedLog) }() if err != nil { return nil, err } + //err := func() error { + // _, span := tracer.Start(ctx, "WaitLogAck") + // defer span.End() + // + // select { + // case <-ctx.Done(): + // return ctx.Err() + // case <-done: + // return nil + // } + //}() + //if err != nil { + // return nil, err + //} + return chainedLog, nil } diff --git a/components/ledger/internal/engine/ledger.go b/components/ledger/internal/engine/ledger.go index bbb12e5988..fd63d4ed20 100644 --- a/components/ledger/internal/engine/ledger.go +++ b/components/ledger/internal/engine/ledger.go @@ -60,7 +60,7 @@ func (l *Ledger) Start(ctx context.Context) { if err := l.commander.Init(ctx); err != nil { panic(err) } - go l.commander.Run(logging.ContextWithField(ctx, "component", "commander")) + //go l.commander.Run(logging.ContextWithField(ctx, "component", "commander")) } func (l *Ledger) Close(ctx context.Context) {