From c02c354d99b966e754fc53b909fb876c03cc5521 Mon Sep 17 00:00:00 2001 From: erezrokah Date: Wed, 1 Mar 2023 15:27:07 +0200 Subject: [PATCH] feat(multiplex): Detect multiple clients --- plugins/source/scheduler_dfs.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/plugins/source/scheduler_dfs.go b/plugins/source/scheduler_dfs.go index e727e19204..e89cf86ad4 100644 --- a/plugins/source/scheduler_dfs.go +++ b/plugins/source/scheduler_dfs.go @@ -37,6 +37,19 @@ func (p *Plugin) syncDfs(ctx context.Context, spec specs.Source, client schema.C if table.Multiplex != nil { clients = table.Multiplex(client) } + // Detect duplicate clients while multiplexing + seenClients := make(map[string]bool) + for _, c := range clients { + if _, ok := seenClients[c.ID()]; !ok { + seenClients[c.ID()] = true + } else { + sentry.WithScope(func(scope *sentry.Scope) { + scope.SetTag("table", table.Name) + sentry.CurrentHub().CaptureMessage("duplicate client ID in " + table.Name) + }) + p.logger.Warn().Str("client", c.ID()).Str("table", table.Name).Msg("multiplex returned duplicate client") + } + } preInitialisedClients[i] = clients // we do this here to avoid locks so we initial the metrics structure once in the main goroutines // and then we can just read from it in the other goroutines concurrently given we are not writing to it.