diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 7c0f22675daf..6fbcb79ce9ee 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -34,7 +34,6 @@ import ( "sync" "syscall" "time" - "github.com/apache/beam/sdks/v2/go/container/tools" "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx" @@ -314,23 +313,47 @@ func launchSDKProcess() error { var wg sync.WaitGroup wg.Add(len(workerIds)) - for _, workerId := range workerIds { - go func(workerId string) { - defer wg.Done() - - bufLogger := tools.NewBufferedLogger(logger) - errorCount := 0 - for { - childPids.mu.Lock() - if childPids.canceled { - childPids.mu.Unlock() - return - } - logger.Printf(ctx, "Executing Python (worker %v): python %v", workerId, strings.Join(args, " ")) - cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, os.Stdin, bufLogger, bufLogger, "python", args...) - childPids.v = append(childPids.v, cmd.Process.Pid) - childPids.mu.Unlock() - +for _, workerId := range workerIds { + go func(workerId string) { + defer wg.Done() + + workerCtx := grpcx.WriteWorkerID(context.WithoutCancel(ctx), workerId) + + // Create a separate logger per worker so that each worker initializes + // its own Fn logging stream with the correct worker_id metadata. + // Shared loggers would reuse the first stream and cause incorrect + // portability_worker_id attribution across workers. + workerLogger := &tools.Logger{ + Endpoint: *loggingEndpoint, + } + + bufLogger := tools.NewBufferedLoggerWithFlushInterval(workerCtx, workerLogger, 100*time.Millisecond) + + errorCount := 0 + for { + childPids.mu.Lock() + if childPids.canceled { + childPids.mu.Unlock() + return + } + + workerLogger.Printf(workerCtx, + "Executing Python (worker %v): python %v", + workerId, + strings.Join(args, " "), + ) + + cmd := StartCommandEnv( + map[string]string{"WORKER_ID": workerId}, + os.Stdin, + bufLogger, + bufLogger, + "python", + args..., + ) + + childPids.v = append(childPids.v, cmd.Process.Pid) + childPids.mu.Unlock() if err := cmd.Wait(); err != nil { // Retry on fatal errors, like OOMs and segfaults, not just // DoFns throwing exceptions. diff --git a/sdks/python/container/container b/sdks/python/container/container new file mode 100644 index 000000000000..79ca9b7b8bd2 Binary files /dev/null and b/sdks/python/container/container differ