Skip to content

Commit

Permalink
remoteconfig: fine grained locking
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellzy committed Dec 20, 2023
1 parent 4ae528b commit e909608
Showing 1 changed file with 44 additions and 20 deletions.
64 changes: 44 additions & 20 deletions internal/remoteconfig/remoteconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,12 @@ type Client struct {
stop chan struct{}

callbacks []Callback
_callbacksMu sync.RWMutex
products map[string]struct{}
_productsMu sync.RWMutex
productsWithCallbacks map[string]ProductCallback
capabilities map[Capability]struct{}
_capabilitiesMu sync.RWMutex

lastError error
}
Expand Down Expand Up @@ -243,8 +246,8 @@ func Subscribe(product string, callback ProductCallback, capabilities ...Capabil
if client == nil {
return ErrClientNotStarted
}
client.Lock()
defer client.Unlock()
client._productsMu.Lock()
defer client._productsMu.Unlock()
if _, found := client.products[product]; found {
return fmt.Errorf("product %s already registered via RegisterProduct", product)
}
Expand All @@ -262,8 +265,8 @@ func RegisterCallback(f Callback) error {
if client == nil {
return ErrClientNotStarted
}
client.Lock()
defer client.Unlock()
client._callbacksMu.Lock()
defer client._callbacksMu.Unlock()
client.callbacks = append(client.callbacks, f)
return nil
}
Expand All @@ -274,8 +277,8 @@ func UnregisterCallback(f Callback) error {
if client == nil {
return ErrClientNotStarted
}
client.Lock()
defer client.Unlock()
client._callbacksMu.Lock()
defer client._callbacksMu.Unlock()
fValue := reflect.ValueOf(f)
for i, callback := range client.callbacks {
if reflect.ValueOf(callback) == fValue {
Expand All @@ -290,8 +293,8 @@ func RegisterProduct(p string) error {
if client == nil {
return ErrClientNotStarted
}
client.Lock()
defer client.Unlock()
client._productsMu.Lock()
defer client._productsMu.Unlock()
if _, found := client.productsWithCallbacks[p]; found {
return fmt.Errorf("product %s already registered via Subscribe", p)
}
Expand All @@ -304,8 +307,8 @@ func UnregisterProduct(p string) error {
if client == nil {
return ErrClientNotStarted
}
client.Lock()
defer client.Unlock()
client._productsMu.Lock()
defer client._productsMu.Unlock()
delete(client.products, p)
return nil
}
Expand All @@ -315,8 +318,8 @@ func HasProduct(p string) (bool, error) {
if client == nil {
return false, ErrClientNotStarted
}
client.RLock()
defer client.RUnlock()
client._productsMu.RLock()
defer client._productsMu.RUnlock()
_, found := client.products[p]
_, foundWithCallback := client.productsWithCallbacks[p]
return found || foundWithCallback, nil
Expand All @@ -328,8 +331,8 @@ func RegisterCapability(cap Capability) error {
if client == nil {
return ErrClientNotStarted
}
client.Lock()
defer client.Unlock()
client._capabilitiesMu.Lock()
defer client._capabilitiesMu.Unlock()
client.capabilities[cap] = struct{}{}
return nil
}
Expand All @@ -340,8 +343,8 @@ func UnregisterCapability(cap Capability) error {
if client == nil {
return ErrClientNotStarted
}
client.Lock()
defer client.Unlock()
client._capabilitiesMu.Lock()
defer client._capabilitiesMu.Unlock()
delete(client.capabilities, cap)
return nil
}
Expand All @@ -351,13 +354,33 @@ func HasCapability(cap Capability) (bool, error) {
if client == nil {
return false, ErrClientNotStarted
}
client.RLock()
defer client.RUnlock()
client._capabilitiesMu.RLock()
defer client._capabilitiesMu.RUnlock()
_, found := client.capabilities[cap]
return found, nil
}

func (c *Client) globalCallbacks() []Callback {
c._callbacksMu.RLock()
defer c._callbacksMu.RUnlock()
callbacks := make([]Callback, len(c.callbacks))
copy(callbacks, c.callbacks)
return callbacks
}

func (c *Client) productCallbacks() map[string]ProductCallback {
c._callbacksMu.RLock()
defer c._callbacksMu.RUnlock()
callbacks := make(map[string]ProductCallback, len(c.productsWithCallbacks))
for k, v := range c.productsWithCallbacks {
callbacks[k] = v
}
return callbacks
}

func (c *Client) allProducts() []string {
client._productsMu.RLock()
defer client._productsMu.RUnlock()
products := make([]string, 0, len(c.products)+len(c.productsWithCallbacks))
for p := range c.products {
products = append(products, p)
Expand Down Expand Up @@ -447,7 +470,7 @@ func (c *Client) applyUpdate(pbUpdate *clientGetConfigsResponse) error {
// 3 - ApplyStateAcknowledged
// This makes sure that any product that would need to re-receive the config in a subsequent update will be allowed to
statuses := make(map[string]rc.ApplyStatus)
for _, fn := range c.callbacks {
for _, fn := range c.globalCallbacks() {
for path, status := range fn(productUpdates) {
if s, ok := statuses[path]; !ok || status.State == rc.ApplyStateError ||
s.State == rc.ApplyStateAcknowledged && status.State == rc.ApplyStateUnacknowledged {
Expand All @@ -456,8 +479,9 @@ func (c *Client) applyUpdate(pbUpdate *clientGetConfigsResponse) error {
}
}
// Call the product-specific callbacks registered via Subscribe
productCallbacks := c.productCallbacks()
for product, update := range productUpdates {
if fn, ok := c.productsWithCallbacks[product]; ok {
if fn, ok := productCallbacks[product]; ok {
for path, status := range fn(update) {
statuses[path] = status
}
Expand Down

0 comments on commit e909608

Please sign in to comment.