fix: race condition (#50) #51
Conversation
Codecov Report

```
@@            Coverage Diff            @@
##           master      #51     +/-   ##
==========================================
- Coverage   52.49%   52.40%   -0.10%
==========================================
  Files          25       25
  Lines        2585     2727    +142
==========================================
+ Hits         1357     1429     +72
- Misses       1100     1146     +46
- Partials      128      152     +24
```

Continue to review the full report at Codecov.
Force-pushed 2f102ac to d78cf09 (Compare)
Hey, you beat me to it! I was also working on fixing these race conditions. I think your branch does more and does it better, as I haven't tried to simplify things as much; I'm not that familiar with the codebase. You can see what I've done here if you are interested: master...ash2k:ash2k/fixes
pkg/cache/cluster.go (Outdated)

```go
type apiMeta struct {
	lock            sync.Mutex
	namespaced      bool
	resourceVersion string
}
```
nit: this could be just an `atomic.Value` to simplify things.
Fixed in 8c8713d
```go
type Resources struct {
	sync.Map
}
```
It would probably be better to make it a field rather than embed it; currently all the map's methods are promoted onto the Resources type, and perhaps that's not what you want?
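To illustrate the suggestion, here is a minimal sketch (the methods and value types are hypothetical, not the PR's actual code) of using a named field instead of embedding, so the type exposes only the operations it supports:

```go
package main

import (
	"fmt"
	"sync"
)

// Embedding would promote every sync.Map method (Store, Load, Range,
// Delete, ...) onto the outer type, widening its API unintentionally.
// A named field keeps the map private behind a curated interface.
type Resources struct {
	m sync.Map
}

func (r *Resources) Store(key string, value int) { r.m.Store(key, value) }

func (r *Resources) Load(key string) (int, bool) {
	v, ok := r.m.Load(key)
	if !ok {
		return 0, false
	}
	return v.(int), true
}

func main() {
	var r Resources
	r.Store("pods", 3)
	v, ok := r.Load("pods")
	fmt.Println(v, ok) // 3 true
}
```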
pkg/cache/cluster.go (Outdated)

```diff
-func runSynced(lock sync.Locker, action func() error) error {
-	lock.Lock()
-	defer lock.Unlock()
+func runSynced(action func() error) error {
```
This can just be removed.
Thanks!
29224aa
pkg/sync/sync_context.go (Outdated)

```go
// if there is anything that needs deleting, we are at best now in pending and
// want to return and wait for sync to be invoked again
runStateMutex.Lock()
runState = pending
```
I think that apart from the data race you are fixing here, there is also a logic error: one goroutine may set runState to failed and then another one to pending, or the other way around. This non-deterministic behavior is probably not right, or is it? Please see 1e0e792 for a different approach.
```go
sc.setResourceResult(task, "", common.OperationFailed, fmt.Sprintf("Failed to delete: %v", err))
terminateSuccessful = false
} else {
	sc.setResourceResult(task, "", common.OperationSucceeded, fmt.Sprintf("Deleted"))
```
👍
pkg/sync/sync_context.go (Outdated)

```go
{
	var wg sync.WaitGroup
	for _, task := range pruneTasks {
		task := task
```
👍
This line is not needed, as task is passed to the closure (lines 737/742) and it uses t instead.
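For context, a minimal sketch (hypothetical task names) of the pattern under discussion: when the loop variable is passed to the closure as an argument, each goroutine gets its own copy, so the per-iteration `task := task` re-declaration that Go before 1.22 otherwise required becomes redundant:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// runAll launches one goroutine per task, passing the loop variable as
// an argument so each closure gets its own copy. This makes the classic
// `task := task` re-declaration redundant before Go 1.22, and from
// Go 1.22 onward range variables are per-iteration anyway.
func runAll(tasks []string) []string {
	var (
		mu  sync.Mutex
		got []string
		wg  sync.WaitGroup
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(t string) {
			defer wg.Done()
			mu.Lock()
			got = append(got, t)
			mu.Unlock()
		}(task)
	}
	wg.Wait()
	sort.Strings(got)
	return got
}

func main() {
	fmt.Println(runAll([]string{"delete-a", "delete-b", "delete-c"}))
}
```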
I think that's been fixed with this commit.
c5d445f
alexmt
left a comment
@ash2k, @d-kuro Thanks a lot for fixing the race conditions and improving the quality! The GitOps engine code is a lift-and-shift of Argo CD code and requires polishing.

Changes in sync_context.go are fixing real bugs. Great catch!

I apologize in advance for the delay in review. The updated code is very performance-sensitive and requires careful testing. Before merging it, please let us fix two issues that block the 1.6 GA release.

Next, we will need to run performance testing on internal Argo CD instances, so it might take a few days.
@d-kuro 👍 I suggest to always run
v1.6 testing looks good so far. We just want to run it internally for a few more days and create the 1.6 GA release. As soon as 1.6 is out I will test this PR's performance.
pkg/sync/sync_context.go (Outdated)

```go
if runState == successful && createTasks.Any(func(t *syncTask) bool { return t.needsDeleting() }) {
	var wg sync.WaitGroup
	for _, task := range createTasks.Filter(func(t *syncTask) bool { return t.needsDeleting() }) {
		task := task
```
This line is not needed, same as above.
I understand, thank you for the comments. I think that's been fixed with this commit: c5d445f
pkg/sync/sync_context.go (Outdated)

```go
var pruneTasks syncTasks

for _, task := range tasks {
	task := task
```
I wonder why this is needed?
I think that's been fixed with this commit.
c5d445f
Force-pushed d78cf09 to 7d7a436 (Compare)
Make runState a new type rather than a type alias
```
==================
WARNING: DATA RACE
Write at 0x00c0000d0b40 by goroutine 83:
  github.com/argoproj/gitops-engine/pkg/sync.(*syncContext).runTasks.func5.1()
      /gitops-engine/pkg/sync/sync_context.go:786 +0x68c
Previous write at 0x00c0000d0b40 by goroutine 84:
  github.com/argoproj/gitops-engine/pkg/sync.(*syncContext).runTasks.func5.1()
      /gitops-engine/pkg/sync/sync_context.go:786 +0x68c
Goroutine 83 (running) created at:
  github.com/argoproj/gitops-engine/pkg/sync.(*syncContext).runTasks.func5()
      /gitops-engine/pkg/sync/sync_context.go:778 +0x165
  github.com/argoproj/gitops-engine/pkg/sync.(*syncContext).runTasks()
      /gitops-engine/pkg/sync/sync_context.go:807 +0xb32
  github.com/argoproj/gitops-engine/pkg/sync.(*syncContext).Sync()
      /gitops-engine/pkg/sync/sync_context.go:265 +0x1b3d
  github.com/argoproj/gitops-engine/pkg/sync.TestSyncFailureHookWithFailedSync()
      /gitops-engine/pkg/sync/sync_context_test.go:532 +0x4e5
  testing.tRunner()
      /usr/local/Cellar/go/1.14.3/libexec/src/testing/testing.go:991 +0x1eb
Goroutine 84 (running) created at:
  github.com/argoproj/gitops-engine/pkg/sync.(*syncContext).runTasks.func5()
      /gitops-engine/pkg/sync/sync_context.go:778 +0x165
  github.com/argoproj/gitops-engine/pkg/sync.(*syncContext).runTasks()
      /gitops-engine/pkg/sync/sync_context.go:807 +0xb32
  github.com/argoproj/gitops-engine/pkg/sync.(*syncContext).Sync()
      /gitops-engine/pkg/sync/sync_context.go:265 +0x1b3d
  github.com/argoproj/gitops-engine/pkg/sync.TestSyncFailureHookWithFailedSync()
      /gitops-engine/pkg/sync/sync_context_test.go:532 +0x4e5
  testing.tRunner()
      /usr/local/Cellar/go/1.14.3/libexec/src/testing/testing.go:991 +0x1eb
==================
```
ash2k
left a comment
I've left a few more comments. I think it would be very helpful to document which objects/methods are thread-safe and which are not.
I think it might not be the best idea to use sync.Map because it makes the code harder to write:

- all those `nil` checks all over the place
- lost type safety in loops (`.Range()` vs `for`)
- it introduces a chance of "logical" data races where two methods don't exclude each other but both work on the same data concurrently (e.g. iterating over a map while mutating it), which leads to unforeseen results

It might be better to just use a mutex and a normal map. WDYT? Just my two cents.
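A sketch of the mutex-plus-plain-map alternative (the type and method names are illustrative, not the PR's code): lookups stay type-safe, and the locking discipline is explicit at each call site:

```go
package main

import (
	"fmt"
	"sync"
)

type resource struct{ name string }

// resourceMap pairs a plain map with an RWMutex. Unlike sync.Map,
// values come back as *resource with no type assertions, and it is
// obvious at each call site which lock is held.
type resourceMap struct {
	mu sync.RWMutex
	m  map[string]*resource
}

func newResourceMap() *resourceMap {
	return &resourceMap{m: map[string]*resource{}}
}

func (r *resourceMap) set(key string, res *resource) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.m[key] = res
}

func (r *resourceMap) get(key string) (*resource, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	res, ok := r.m[key]
	return res, ok
}

func main() {
	rm := newResourceMap()
	rm.set("pod/a", &resource{name: "a"})
	res, ok := rm.get("pod/a")
	fmt.Println(ok, res.name) // true a
}
```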
```go
	return version, nil
}

return "", fmt.Errorf("stored type is invalid in resourceVersion")
```
This cannot happen, and if it does, it should be OK to panic because... it should never happen? I'd do this instead:

```go
str, _ := a.resourceVersion.Load().(string)
return str, nil
```

This will not error out, and it also preserves the current semantics where the field is an empty string if it was never set. It's also nicer to not have to check the error in every place where this method is called.
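The suggested accessor works because `Load()` on an unset `atomic.Value` returns nil, and a comma-ok assertion turns nil into the zero string. A minimal runnable sketch (the struct and method names are illustrative):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// apiMeta is a stand-in for the real struct; resourceVersion holds a
// string via atomic.Value.
type apiMeta struct {
	resourceVersion atomic.Value
}

// version returns the stored resource version, or "" if it was never
// set: Load() on an unset atomic.Value returns nil, and the comma-ok
// type assertion maps that to the zero string instead of panicking.
func (a *apiMeta) version() string {
	str, _ := a.resourceVersion.Load().(string)
	return str
}

func main() {
	var a apiMeta
	fmt.Printf("unset: %q\n", a.version())
	a.resourceVersion.Store("12345")
	fmt.Printf("set: %q\n", a.version())
}
```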
pkg/cache/cluster.go (Outdated)

```go
	r.Store(key, value)
}

func (r *Resources) LoadResources(key kube.ResourceKey) (*Resource, error) {
```
Same as above: panicking should be OK if something unexpected is in this private field. And the error is not checked in a few places below, so why return it then? :)
pkg/cache/cluster.go (Outdated)

```go
	c.apisMeta.Store(key, value)
}

func (c *clusterCache) LoadApisMeta(key schema.GroupKind) (*apiMeta, error) {
```
Same as above - panicking should be ok if something unexpected is in this private field.
pkg/cache/cluster.go (Outdated)

```go
	c.nsIndex.Store(key, value)
}

func (c *clusterCache) LoadNsIndex(key string) (*Resources, error) {
```
Same as above - panicking should be ok if something unexpected is in this private field.
pkg/cache/cluster.go (Outdated)

```go
if err != nil {
	return err
}
if info != nil {
```
This `if` can be inverted to avoid so much indentation. See https://github.com/golang/go/wiki/CodeReviewComments#indent-error-flow
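A tiny illustration of the indent-error-flow style (the function is hypothetical): invert the check, return early, and keep the happy path at the outermost indentation level:

```go
package main

import (
	"errors"
	"fmt"
)

// process handles the exceptional case first and returns early,
// instead of wrapping the whole body in `if info != nil { ... }`.
func process(info *string) (string, error) {
	if info == nil {
		return "", errors.New("no info")
	}
	// happy path stays at the top indentation level
	return "processed: " + *info, nil
}

func main() {
	s := "data"
	out, _ := process(&s)
	fmt.Println(out) // processed: data
}
```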
```go
c.lock.Lock()
defer c.lock.Unlock()
// before doing any work, check once again now that we have the lock, to see if it got
// synced between the first check and now
```
This comment and the code above can be deleted now.
pkg/cache/cluster.go (Outdated)

```diff
-if len(res.OwnerRefs) == 0 {
-	resources[res.ResourceKey()] = res
+func (c *clusterCache) GetNamespaceTopLevelResources(namespace string) *Resources {
+	res, _ := c.LoadNsIndex(namespace)
```
No error check here either. That's why I'm suggesting to just let it panic if the field contents are corrupted; the next line will panic anyway.

I understand. I'm going to try changing it back from sync.Map to a plain map.
pkg/cache/cluster.go (Outdated)

```diff
-if len(ns) == 0 {
-	delete(c.nsIndex, key.Namespace)
+func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) error {
+	existing, err := c.resources.LoadResources(key)
```
I agree with @ash2k regarding sync.Map usage.
I think we still need to have a lock here to make sure the resources map and the namespace index stay synchronized.
pkg/cache/cluster.go (Outdated)

```go
	c.nsIndex.Store(key, value)
}

func (c *clusterCache) LoadNsIndex(key string) (*Resources, error) {
```
I think LoadNsIndex is a private method. Please rename it to loadNsIndex.
pkg/cache/cluster.go (Outdated)

```go
	return cnt
}

func (c *clusterCache) StoreNsIndex(key string, value *Resources) {
```
Same as LoadNsIndex. Please rename to storeNsIndex.
Hello @d-kuro. I'm very interested in completing and merging this PR. Please let me know if you have time to continue working on it.
@alexmt I'm going to try migrating from sync.Map to a plain map, as I wrote in my comment below. I may ask you to follow up if I get stuck, but I'll try to do it myself for now.
Kudos, SonarCloud Quality Gate passed!
I'm trying to work on the migration from sync.Map to a plain map. Using RLock for map range access and Lock for writes is simple, but consider the following code: this function deadlocks, because inside the range loop (which holds the read lock) we call a function that deletes from the map and therefore needs the write lock.

```go
// c.resources is "map[kube.ResourceKey]*Resource"
// Need RWMutex.RLock()
for key := range c.resources {
	if key.Kind != gk.Kind || key.Group != gk.Group || ns != "" && key.Namespace != ns {
		continue
	}
	if _, ok := objByKey[key]; !ok {
		// Need RWMutex.Lock()
		// "delete(c.resources, key)" in a function
		c.onNodeRemoved(key)
	}
}
```
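One way around that deadlock, sketched with illustrative types (not the PR's actual code): collect the keys while holding only the read lock, release it, then take the write lock for the deletions. The deadlock arises because sync.RWMutex's Lock() waits for all readers, including the goroutine still holding RLock():

```go
package main

import (
	"fmt"
	"sync"
)

type cache struct {
	mu        sync.RWMutex
	resources map[string]bool
}

// removeMissing first collects the keys to delete under the read lock,
// releases it, then takes the write lock to mutate the map. Calling a
// helper that takes c.mu.Lock() while still inside the RLock'd range
// would block forever instead.
func (c *cache) removeMissing(known map[string]bool) {
	c.mu.RLock()
	var toDelete []string
	for key := range c.resources {
		if !known[key] {
			toDelete = append(toDelete, key)
		}
	}
	c.mu.RUnlock()

	c.mu.Lock()
	for _, key := range toDelete {
		delete(c.resources, key)
	}
	c.mu.Unlock()
}

func main() {
	c := &cache{resources: map[string]bool{"a": true, "b": true}}
	c.removeMissing(map[string]bool{"a": true})
	fmt.Println(len(c.resources)) // 1
}
```

Note that the collected keys may be stale by the time the write lock is taken; depending on the invariants needed, either re-check them under the write lock or hold a single write lock for the whole operation.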
@d-kuro I'd like to suggest splitting this PR as it's getting quite big. Perhaps make all the changes to cluster.go a separate PR. I've also looked at the code and wondered why the informers framework is not used here.
Thanks a lot for continuing to work on it @d-kuro! I've looked one more time at where the race happens:

gitops-engine/pkg/cache/cluster.go line 443 in ce9616a
gitops-engine/pkg/cache/cluster.go line 371 in ce9616a

I suspect the race condition happens in tests only: TestNamespaceModeReplace. I hope I'm not missing anything. If this is the case, we just need to use a lock in that one place.

Agree with @ash2k about moving the cluster.go changes into separate PRs.
@ash2k we could not use the informers framework because informers cache the whole resource manifest. We need only resource references and parent links; switching to informers would increase memory consumption. That was the main reason to use the List/Watch APIs manually.
This is the default behavior, but it is configurable. If only certain fields are required, a custom Go type can be crafted to use for any watched resource type; that way only the needed fields are unmarshaled and nothing else. For example, there is a whole package just for working with the metadata of objects, and a tuned client and informer are available there. However, memory consumption would still be higher in the above case vs how things currently work, because e.g. all annotations will be stored in the informer's cache.

Thank you for answering the question :)
@alexmt Hi, is there any progress? The master branch still does not contain the commits from this PR.
refs: #50

If you import the code from #52, the race condition will be gone.

Among many functions, using `sync.Mutex` to secure a lock was complex and difficult, so I took the approach of using `sync.Map`. I don't think this is the best solution, but I think it's good that the race conditions are gone.

If you have any comments, please feel free to leave them!