
Conversation

@jasonopslevel (Contributor) commented Jan 17, 2026

Issues

We have an issue where we're using a Faktory Go worker manager but not its connection pool. We create N connections and hold on to them, yet also open new connections ad hoc. This is why we've been hitting connection limit errors.

Demo:

Running the old and new versions side by side, the new version holds a stable N + 1 connections, while the old one is essentially uncapped.

[Screenshot: connection counts for old vs. new runners, captured 2026-01-19 10:15 AM]

Work done:

K8s Connection Pooling (src/pkg/k8s.go)

  • Added a sync.Once-based singleton exposed via a GetSharedK8sClient() function (see the sketch after this list)
  • The kubernetes.Clientset is now shared across all goroutines (it is thread-safe)
  • LoadK8SClient() and NewJobRunner() now use the shared client
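
For reference, a minimal sketch of the pattern described above, assuming an in-cluster config. GetSharedK8sClient(), GetKubernetesConfig(), k8sClientOnce, sharedK8sConfig, and k8sInitError are names taken from this PR; the function bodies below are illustrative assumptions, not the actual implementation.

package pkg

import (
	"sync"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

var (
	k8sClientOnce   sync.Once
	sharedK8sConfig *rest.Config
	sharedK8sClient *kubernetes.Clientset
	k8sInitError    error
)

// GetKubernetesConfig exists in the PR; an in-cluster config is assumed here.
func GetKubernetesConfig() (*rest.Config, error) {
	return rest.InClusterConfig()
}

// GetSharedK8sClient builds the config and clientset exactly once and hands the
// same instances to every caller; kubernetes.Clientset is safe for concurrent use.
func GetSharedK8sClient() (*rest.Config, *kubernetes.Clientset, error) {
	k8sClientOnce.Do(func() {
		sharedK8sConfig, k8sInitError = GetKubernetesConfig()
		if k8sInitError != nil {
			return // exits only this closure; the error is surfaced below
		}
		sharedK8sClient, k8sInitError = kubernetes.NewForConfig(sharedK8sConfig)
	})
	if k8sInitError != nil {
		return nil, nil, k8sInitError
	}
	return sharedK8sConfig, sharedK8sClient, nil
}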

Faktory Connection Pooling

src/pkg/faktoryRunnerAppendJobLogProcessor.go:

  • Removed client *faktory.Client field and faktory.Open() call
  • Changed submit() to use helper.With() for pooled connections

src/pkg/faktorySetOutcomeProcessor.go:

  • Removed client *faktory.Client field and faktory.Open() call
  • Changed Flush() to use helper.With() for pooled connections (see the sketch below)
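
A minimal sketch of what that change looks like, assuming the Helper from faktory_worker_go obtained via worker.HelperFor(ctx). The processor struct and method names below are placeholders; helper.With and faktory.Client come straight from the diff.

package pkg

import (
	faktory "github.com/contribsys/faktory/client"
	worker "github.com/contribsys/faktory_worker_go"
)

// jobLogProcessor stands in for the real processors; it no longer holds a
// private *faktory.Client opened with faktory.Open().
type jobLogProcessor struct {
	helper worker.Helper // obtained via worker.HelperFor(ctx) inside the job
}

// submit pushes a job using a connection borrowed from the worker manager's
// pool; With checks a connection out, runs the callback, and returns it.
func (s *jobLogProcessor) submit(job *faktory.Job) error {
	return s.helper.With(func(cl *faktory.Client) error {
		return cl.Push(job)
	})
}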

Changelog

  • List your changes here
  • Make a changie entry

Tophatting

 	}
 } else {
-	err := s.client.Push(job)
+	err := s.helper.With(func(cl *faktory.Client) error {
@jasonopslevel (Contributor, Author) commented:

The Go Faktory worker creates a connection pool already! We just needed to use it.
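
To illustrate that point (a sketch, not this repo's code): with faktory_worker_go, the manager owns the connection pool, and HelperFor(ctx).With borrows a connection from it inside a job handler. Job type and handler names here are made up.

package main

import (
	"context"

	faktory "github.com/contribsys/faktory/client"
	worker "github.com/contribsys/faktory_worker_go"
)

// runnerJob is a placeholder handler; it pushes a follow-up job through the
// manager's pooled connection instead of calling faktory.Open() itself.
func runnerJob(ctx context.Context, args ...interface{}) error {
	help := worker.HelperFor(ctx)
	return help.With(func(cl *faktory.Client) error {
		return cl.Push(faktory.NewJob("FollowUpJob", help.Jid()))
	})
}

func main() {
	mgr := worker.NewManager()
	mgr.Concurrency = 10 // worker goroutines; connection count stays bounded
	mgr.Register("RunnerJob", runnerJob)
	mgr.Run() // blocks, processing jobs with the manager-owned connection pool
}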

A reviewer (Contributor) commented:

slick

-	config, _ := GetKubernetesConfig()
-	client, _ := GetKubernetesClientset()
+	// kubernetes.Clientset is thread-safe and designed to be shared across goroutines
+	config, client, _ := GetSharedK8sClient() // Already validated by LoadK8SClient
@jasonopslevel (Contributor, Author) commented:

The removed comments were generally right about sharing clients. The client is thread-safe; we just need some synchronization around initializing it, since we currently load the k8s client per NewJobRunner.

We don't manipulate any data on the clients, so sharing them should be fine.

 	}
 } else {
-	err := s.client.Push(job)
+	err := s.helper.With(func(cl *faktory.Client) error {
A reviewer (Contributor) commented:

slick

k8sClientOnce.Do(func() {
	sharedK8sConfig, k8sInitError = GetKubernetesConfig()
	if k8sInitError != nil {
		return
A reviewer (Contributor) commented:

Suggested change:
-		return
+		return nil, nil, k8sInitError

@jasonopslevel (Contributor, Author) replied:

This return is for the anonymous function passed to Do(), so I think the empty return is correct here.
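
A tiny standalone example of that point, with a simulated error: the bare return exits only the closure passed to Do(), and the caller propagates the captured error afterwards. The variable names mirror the hunk above, but the body is made up.

package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	k8sClientOnce sync.Once
	k8sInitError  error
)

func getShared() error {
	k8sClientOnce.Do(func() {
		k8sInitError = errors.New("simulated init failure")
		if k8sInitError != nil {
			return // leaves only this closure; Do itself returns normally
		}
	})
	return k8sInitError // the caller, not the closure, propagates the error
}

func main() {
	fmt.Println(getShared()) // prints: simulated init failure
}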

if k8sInitError != nil {
	return nil, nil, k8sInitError
}
if _, err := GetKubernetesClientset(); err != nil {
A reviewer (Contributor) commented:

This method is no longer used, if we wanna 🔪 it.

@jasonopslevel (Contributor, Author) replied:

🔪

@jasonopslevel merged commit 18804c1 into main on Jan 19, 2026.
4 checks passed.
@jasonopslevel deleted the add-connection-pools branch on January 19, 2026 at 20:18.