Skip to content

Commit 990cb99

Browse files
committed
easwars review #1
1 parent a06eb28 commit 990cb99

File tree

6 files changed

+22
-22
lines changed

6 files changed

+22
-22
lines changed

xds/internal/clients/xdsclient/ads_stream.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ type dataAndErrTuple struct {
6868
// occur on the ADS stream. Methods on this interface may be invoked
6969
// concurrently and implementations need to handle them in a thread-safe manner.
7070
type adsStreamEventHandler interface {
71-
onStreamError(error) // Called when the ADS stream breaks.
72-
onWatchExpiry(ResourceType, string) // Called when the watch timer expires for a resource.
73-
onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream.
74-
onRequiredToRemoveUnsubscribedCacheEntries(typeURL string) // Called when it is needed to remove unsubscribed cache entries.
71+
onStreamError(error) // Called when the ADS stream breaks.
72+
onWatchExpiry(ResourceType, string) // Called when the watch timer expires for a resource.
73+
onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream.
74+
onRequest(typeURL string) // Called when a request is about to be sent on the ADS stream.
7575
}
7676

7777
// state corresponding to a resource type.
@@ -446,7 +446,7 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string,
446446
}
447447

448448
// Call the event handler to remove unsubscribed cache entries.
449-
s.eventHandler.onRequiredToRemoveUnsubscribedCacheEntries(url)
449+
s.eventHandler.onRequest(url)
450450

451451
msg, err := proto.Marshal(req)
452452
if err != nil {

xds/internal/clients/xdsclient/authority.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -731,10 +731,6 @@ func (a *authority) unwatchResource(rType ResourceType, resourceName string, wat
731731
// there when the watch was registered.
732732
resources := a.resources[rType]
733733
state := resources[resourceName]
734-
if state == nil {
735-
a.logger.Warningf("Attempting to unwatch resource %q of type %q which is not currently watched", resourceName, rType.TypeName)
736-
return
737-
}
738734

739735
// Delete this particular watcher from the list of watchers, so that its
740736
// callback will not be invoked in the future.
@@ -808,7 +804,7 @@ func (a *authority) closeXDSChannels() {
808804
func (a *authority) watcherExistsForUncachedResource() bool {
809805
for _, resourceStates := range a.resources {
810806
for _, state := range resourceStates {
811-
if state.md.Status == xdsresource.ServiceStatusRequested {
807+
if len(state.watchers) > 0 && state.md.Status == xdsresource.ServiceStatusRequested {
812808
return true
813809
}
814810
}
@@ -878,6 +874,8 @@ func (a *authority) close() {
878874
// after sending a discovery request to ensure that resources that were
879875
// unsubscribed (and thus have no watchers) are eventually removed from the
880876
// authority's cache.
877+
//
878+
// This method is only executed in the context of a serializer callback.
881879
func (a *authority) removeUnsubscribedCacheEntries(rType ResourceType) {
882880
resources := a.resources[rType]
883881
if resources == nil {
@@ -887,7 +885,7 @@ func (a *authority) removeUnsubscribedCacheEntries(rType ResourceType) {
887885
for name, state := range resources {
888886
if len(state.watchers) == 0 {
889887
if a.logger.V(2) {
890-
a.logger.Infof("Removing resource state for %q of type %q as it has no watchers after an update cycle", name, rType.TypeName)
888+
a.logger.Infof("Removing resource state for %q of type %q as it has no watchers", name, rType.TypeName)
891889
}
892890
delete(resources, name)
893891
}

xds/internal/clients/xdsclient/channel.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ type xdsChannelEventHandler interface {
6060
// requested ADS resource does not exist.
6161
adsResourceDoesNotExist(ResourceType, string)
6262

63-
// requiredToRemoveUnsubscribedCacheEntries is called when the xdsChannel
63+
// adsResourceRemoveUnsubscribedCacheEntries is called when the xdsChannel
6464
// needs to remove unsubscribed cache entries.
65-
requiredToRemoveUnsubscribedCacheEntries(ResourceType)
65+
adsResourceRemoveUnsubscribedCacheEntries(ResourceType)
6666
}
6767

6868
// xdsChannelOpts holds the options for creating a new xdsChannel.
@@ -160,7 +160,7 @@ func (xc *xdsChannel) close() {
160160
// on all authorities that were interested in this channel.
161161
if _, ok := xc.eventHandler.(*channelState); ok {
162162
for _, typ := range typesHandledByStream {
163-
xc.eventHandler.requiredToRemoveUnsubscribedCacheEntries(typ)
163+
xc.eventHandler.adsResourceRemoveUnsubscribedCacheEntries(typ)
164164
}
165165
}
166166

@@ -254,7 +254,9 @@ func (xc *xdsChannel) onResponse(resp response, onDone func()) ([]string, error)
254254
return names, err
255255
}
256256

257-
func (xc *xdsChannel) onRequiredToRemoveUnsubscribedCacheEntries(typeURL string) {
257+
// onRequest invoked when a request is about to be sent on the ADS stream. It
258+
// removes the cache entries for the resource type that are no longer subscribed to.
259+
func (xc *xdsChannel) onRequest(typeURL string) {
258260
if xc.closed.HasFired() {
259261
if xc.logger.V(2) {
260262
xc.logger.Infof("Received an update from the ADS stream on closed ADS stream")
@@ -269,7 +271,7 @@ func (xc *xdsChannel) onRequiredToRemoveUnsubscribedCacheEntries(typeURL string)
269271
return
270272
}
271273

272-
xc.eventHandler.requiredToRemoveUnsubscribedCacheEntries(rType)
274+
xc.eventHandler.adsResourceRemoveUnsubscribedCacheEntries(rType)
273275
}
274276

275277
// decodeResponse decodes the resources in the given ADS response.

xds/internal/clients/xdsclient/channel_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -773,5 +773,5 @@ func (ta *testEventHandler) waitForResourceDoesNotExist(ctx context.Context) (Re
773773
return typ, name, nil
774774
}
775775

776-
func (*testEventHandler) requiredToRemoveUnsubscribedCacheEntries(ResourceType) {
776+
func (*testEventHandler) adsResourceRemoveUnsubscribedCacheEntries(ResourceType) {
777777
}

xds/internal/clients/xdsclient/test/misc_watchers_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -575,10 +575,10 @@ func (s) TestRaceUnsubscribeResubscribe(t *testing.T) {
575575
}
576576
defer client.Close()
577577

578-
ldsResourceName1 := "test-listener-resource1"
579-
ldsResourceName2 := "test-route-configuration-resource1"
580-
rdsName1 := "test-listener-resource2"
581-
rdsName2 := "test-route-configuration-resource2"
578+
const ldsResourceName1 = "test-listener-resource1"
579+
const ldsResourceName2 = "test-listener-resource2"
580+
const rdsName1 = "test-route-configuration-resource1"
581+
const rdsName2 = "test-route-configuration-resource2"
582582
listenerResource1 := e2e.DefaultClientListener(ldsResourceName1, rdsName1)
583583
listenerResource2 := e2e.DefaultClientListener(ldsResourceName2, rdsName2)
584584

xds/internal/clients/xdsclient/xdsclient.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ func (cs *channelState) adsResourceDoesNotExist(typ ResourceType, resourceName s
439439
}
440440
}
441441

442-
func (cs *channelState) requiredToRemoveUnsubscribedCacheEntries(rType ResourceType) {
442+
func (cs *channelState) adsResourceRemoveUnsubscribedCacheEntries(rType ResourceType) {
443443
if cs.parent.done.HasFired() {
444444
return
445445
}

0 commit comments

Comments
 (0)