Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion telemetry/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -1744,7 +1744,8 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {

### Phase 2: Per-Host Management
- [x] Implement `featureflag.go` with caching and reference counting (PECOBLR-1146)
- [ ] Implement `manager.go` for client management
- [x] Implement `manager.go` for client management (PECOBLR-1147)
- [x] Implement `client.go` with minimal telemetryClient stub (PECOBLR-1147)
- [ ] Implement `circuitbreaker.go` with state machine
- [ ] Add unit tests for all components

Expand Down
38 changes: 38 additions & 0 deletions telemetry/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package telemetry

import (
"net/http"
)

// telemetryClient represents a client for sending telemetry data to Databricks.
// This is a minimal stub implementation that will be fully implemented in Phase 4.
type telemetryClient struct {
host string
httpClient *http.Client
cfg *Config
started bool
closed bool
}

// newTelemetryClient creates a new telemetry client for the given host.
func newTelemetryClient(host string, httpClient *http.Client, cfg *Config) *telemetryClient {
return &telemetryClient{
host: host,
httpClient: httpClient,
cfg: cfg,
}
}

// start starts the telemetry client's background operations.
// This is a stub implementation that will be fully implemented in Phase 4.
func (c *telemetryClient) start() error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to make this and below method thread safe?

c.started = true
return nil
}

// close stops the telemetry client and flushes any pending data.
// This is a stub implementation that will be fully implemented in Phase 4.
func (c *telemetryClient) close() error {
c.closed = true
return nil
}
73 changes: 73 additions & 0 deletions telemetry/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package telemetry

import (
"net/http"
"sync"
)

// clientManager manages one telemetry client per host.
// Prevents rate limiting by sharing clients across connections.
type clientManager struct {
mu sync.RWMutex
clients map[string]*clientHolder
}

// clientHolder holds a telemetry client and its reference count.
type clientHolder struct {
client *telemetryClient
refCount int
}

var (
managerOnce sync.Once
managerInstance *clientManager
)

// getClientManager returns the singleton instance.
func getClientManager() *clientManager {
managerOnce.Do(func() {
managerInstance = &clientManager{
clients: make(map[string]*clientHolder),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to put a max size or any LRU kind of cache?

}
})
return managerInstance
}

// getOrCreateClient gets or creates a telemetry client for the host.
// Increments reference count.
func (m *clientManager) getOrCreateClient(host string, httpClient *http.Client, cfg *Config) *telemetryClient {
m.mu.Lock()
defer m.mu.Unlock()

holder, exists := m.clients[host]
if !exists {
holder = &clientHolder{
client: newTelemetryClient(host, httpClient, cfg),
}
m.clients[host] = holder
_ = holder.client.start() // Start background flush goroutine
}
holder.refCount++
return holder.client
}

// releaseClient decrements reference count for the host.
// Closes and removes client when ref count reaches zero.
func (m *clientManager) releaseClient(host string) error {
m.mu.Lock()
holder, exists := m.clients[host]
if !exists {
m.mu.Unlock()
return nil
}

holder.refCount--
if holder.refCount <= 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we log when this becomes negative, that will likely be a bug

delete(m.clients, host)
m.mu.Unlock()
return holder.client.close() // Close and flush
}

m.mu.Unlock()
return nil
}
Loading
Loading