Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Address more comments
Browse files Browse the repository at this point in the history
* Fix race condition when calling RUnlock()
* Pass context to functions directly
  • Loading branch information
Alfonso Acosta committed May 29, 2019
1 parent 069a73e commit eeb9cf4
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 113 deletions.
15 changes: 6 additions & 9 deletions daemon/daemon.go
Expand Up @@ -78,12 +78,11 @@ func (d *Daemon) getResources(ctx context.Context) (map[string]resource.Resource
var resources map[string]resource.Resource
var globalReadOnly v6.ReadOnlyReason
err := d.WithClone(ctx, func(checkout *git.Checkout) error {
cm, err := resourcestore.NewFileResourceStore(ctx, checkout.Dir(), checkout.ManifestDirs(),
d.ManifestGenerationEnabled, d.Manifests)
cm, err := resourcestore.NewFileResourceStore(checkout.Dir(), checkout.ManifestDirs(), d.ManifestGenerationEnabled, d.Manifests)
if err != nil {
return err
}
resources, err = cm.GetAllResourcesByID()
resources, err = cm.GetAllResourcesByID(nil)
return err
})

Expand Down Expand Up @@ -407,12 +406,11 @@ func (d *Daemon) updatePolicies(spec update.Spec, updates policy.Updates) update
if policy.Set(u.Add).Has(policy.Automated) {
anythingAutomated = true
}
cm, err := resourcestore.NewFileResourceStore(ctx, working.Dir(), working.ManifestDirs(),
d.ManifestGenerationEnabled, d.Manifests)
cm, err := resourcestore.NewFileResourceStore(working.Dir(), working.ManifestDirs(), d.ManifestGenerationEnabled, d.Manifests)
if err != nil {
return result, err
}
updated, err := cm.UpdateWorkloadPolicies(workloadID, u)
updated, err := cm.UpdateWorkloadPolicies(ctx, workloadID, u)
if err != nil {
result.Result[workloadID] = update.WorkloadResult{
Status: update.ReleaseStatusFailed,
Expand Down Expand Up @@ -474,13 +472,12 @@ func (d *Daemon) updatePolicies(spec update.Spec, updates policy.Updates) update
func (d *Daemon) release(spec update.Spec, c release.Changes) updateFunc {
return func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (job.Result, error) {
var zero job.Result
rs, err := resourcestore.NewFileResourceStore(ctx, working.Dir(), working.ManifestDirs(),
d.ManifestGenerationEnabled, d.Manifests)
rs, err := resourcestore.NewFileResourceStore(working.Dir(), working.ManifestDirs(), d.ManifestGenerationEnabled, d.Manifests)
if err != nil {
return zero, err
}
rc := release.NewReleaseContext(d.Cluster, rs, d.Registry)
result, err := release.Release(rc, c, logger)
result, err := release.Release(ctx, rc, c, logger)
if err != nil {
return zero, err
}
Expand Down
8 changes: 4 additions & 4 deletions daemon/daemon_test.go
Expand Up @@ -515,11 +515,11 @@ func TestDaemon_PolicyUpdate(t *testing.T) {
return false
}
defer co.Clean()
cm, err := resourcestore.NewFileResourceStore(ctx, co.Dir(), co.ManifestDirs(), false, d.Manifests)
cm, err := resourcestore.NewFileResourceStore(co.Dir(), co.ManifestDirs(), false, d.Manifests)
if err != nil {
t.Fatal(err)
}
m, err := cm.GetAllResourcesByID()
m, err := cm.GetAllResourcesByID(nil)
if err != nil {
t.Fatalf("Error: %s", err.Error())
}
Expand Down Expand Up @@ -859,11 +859,11 @@ func (w *wait) ForImageTag(t *testing.T, d *Daemon, workload, container, tag str
return false
}
defer co.Clean()
cm, err := resourcestore.NewFileResourceStore(context.TODO(), co.Dir(), co.ManifestDirs(), false, d.Manifests)
cm, err := resourcestore.NewFileResourceStore(co.Dir(), co.ManifestDirs(), false, d.Manifests)
if err != nil {
return false
}
resources, err := cm.GetAllResourcesByID()
resources, err := cm.GetAllResourcesByID(nil)
assert.NoError(t, err)

workload, ok := resources[workload].(resource.Workload)
Expand Down
10 changes: 5 additions & 5 deletions daemon/sync.go
Expand Up @@ -58,11 +58,11 @@ func (d *Daemon) Sync(ctx context.Context, started time.Time, revision string, s

// Run actual sync of resources on cluster
syncSetName := makeGitConfigHash(d.Repo.Origin(), d.GitConfig)
resourceStore, err := resourcestore.NewFileResourceStore(ctx, working.Dir(), working.ManifestDirs(), d.ManifestGenerationEnabled, d.Manifests)
resourceStore, err := resourcestore.NewFileResourceStore(working.Dir(), working.ManifestDirs(), d.ManifestGenerationEnabled, d.Manifests)
if err != nil {
return errors.Wrap(err, "reading the respository checkout")
}
resources, resourceErrors, err := doSync(resourceStore, d.Cluster, syncSetName, d.Logger)
resources, resourceErrors, err := doSync(ctx, resourceStore, d.Cluster, syncSetName, d.Logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -139,9 +139,9 @@ func getChangeSet(ctx context.Context, working *git.Checkout, repo *git.Repo, ti

// doSync runs the actual sync of workloads on the cluster. It returns
// a map with all resources it applied and sync errors it encountered.
func doSync(resourceStore resourcestore.ResourceStore, clus cluster.Cluster, syncSetName string,
func doSync(ctx context.Context, resourceStore resourcestore.ResourceStore, clus cluster.Cluster, syncSetName string,
logger log.Logger) (map[string]resource.Resource, []event.ResourceError, error) {
resources, err := resourceStore.GetAllResourcesByID()
resources, err := resourceStore.GetAllResourcesByID(ctx)
if err != nil {
return nil, nil, errors.Wrap(err, "loading resources from repo")
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func getChangedResources(ctx context.Context, c changeSet, timeout time.Duration
}
cancel()
// Get the resources by source
resourcesByID, err := resourceStore.GetAllResourcesByID()
resourcesByID, err := resourceStore.GetAllResourcesByID(ctx)
if err != nil {
return nil, errorf(err)
}
Expand Down
4 changes: 2 additions & 2 deletions daemon/sync_test.go
Expand Up @@ -247,11 +247,11 @@ func TestDoSync_WithNewCommit(t *testing.T) {
return err
}
// Push some new changes
cm, err := resourcestore.NewFileResourceStore(ctx, checkout.Dir(), checkout.ManifestDirs(), false, d.Manifests)
cm, err := resourcestore.NewFileResourceStore(checkout.Dir(), checkout.ManifestDirs(), false, d.Manifests)
if err != nil {
return err
}
resourcesByID, err := cm.GetAllResourcesByID()
resourcesByID, err := cm.GetAllResourcesByID(nil)
if err != nil {
return err
}
Expand Down
18 changes: 10 additions & 8 deletions release/context.go
@@ -1,6 +1,7 @@
package release

import (
"context"
"fmt"

"github.com/pkg/errors"
Expand Down Expand Up @@ -31,15 +32,15 @@ func (rc *ReleaseContext) Registry() registry.Registry {
return rc.registry
}

func (rc *ReleaseContext) GetAllResources() (map[string]resource.Resource, error) {
return rc.resourceStore.GetAllResourcesByID()
func (rc *ReleaseContext) GetAllResources(ctx context.Context) (map[string]resource.Resource, error) {
return rc.resourceStore.GetAllResourcesByID(ctx)
}

func (rc *ReleaseContext) WriteUpdates(updates []*update.WorkloadUpdate) error {
func (rc *ReleaseContext) WriteUpdates(ctx context.Context, updates []*update.WorkloadUpdate) error {
err := func() error {
for _, update := range updates {
for _, container := range update.Updates {
err := rc.resourceStore.SetWorkloadContainerImage(update.ResourceID, container.Container, container.Target)
err := rc.resourceStore.SetWorkloadContainerImage(ctx, update.ResourceID, container.Container, container.Target)
if err != nil {
return errors.Wrapf(err, "updating resource %s in %s", update.ResourceID.String(), update.Resource.Source())
}
Expand All @@ -56,10 +57,11 @@ func (rc *ReleaseContext) WriteUpdates(updates []*update.WorkloadUpdate) error {
// files and the running cluster. `WorkloadFilter`s can be provided
// to filter the controllers so found, either before (`prefilters`) or
// after (`postfilters`) consulting the cluster.
func (rc *ReleaseContext) SelectWorkloads(results update.Result, prefilters, postfilters []update.WorkloadFilter) ([]*update.WorkloadUpdate, error) {
func (rc *ReleaseContext) SelectWorkloads(ctx context.Context, results update.Result, prefilters,
postfilters []update.WorkloadFilter) ([]*update.WorkloadUpdate, error) {

// Start with all the workloads that are defined in the repo.
allDefined, err := rc.WorkloadsForUpdate()
allDefined, err := rc.WorkloadsForUpdate(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -116,8 +118,8 @@ func (rc *ReleaseContext) SelectWorkloads(results update.Result, prefilters, pos

// WorkloadsForUpdate collects all workloads defined in manifests and prepares a list of
// workload updates for each of them. It does not consider updatability.
func (rc *ReleaseContext) WorkloadsForUpdate() (map[flux.ResourceID]*update.WorkloadUpdate, error) {
resources, err := rc.GetAllResources()
func (rc *ReleaseContext) WorkloadsForUpdate(ctx context.Context) (map[flux.ResourceID]*update.WorkloadUpdate, error) {
resources, err := rc.GetAllResources(ctx)
if err != nil {
return nil, err
}
Expand Down
17 changes: 9 additions & 8 deletions release/releaser.go
@@ -1,6 +1,7 @@
package release

import (
"context"
"fmt"
"strings"
"time"
Expand All @@ -13,13 +14,13 @@ import (
)

type Changes interface {
CalculateRelease(update.ReleaseContext, log.Logger) ([]*update.WorkloadUpdate, update.Result, error)
CalculateRelease(context.Context, update.ReleaseContext, log.Logger) ([]*update.WorkloadUpdate, update.Result, error)
ReleaseKind() update.ReleaseKind
ReleaseType() update.ReleaseType
CommitMessage(update.Result) string
}

func Release(rc *ReleaseContext, changes Changes, logger log.Logger) (results update.Result, err error) {
func Release(ctx context.Context, rc *ReleaseContext, changes Changes, logger log.Logger) (results update.Result, err error) {
defer func(start time.Time) {
update.ObserveRelease(
start,
Expand All @@ -31,18 +32,18 @@ func Release(rc *ReleaseContext, changes Changes, logger log.Logger) (results up

logger = log.With(logger, "type", "release")

before, err := rc.GetAllResources()
updates, results, err := changes.CalculateRelease(rc, logger)
before, err := rc.GetAllResources(ctx)
updates, results, err := changes.CalculateRelease(ctx, rc, logger)
if err != nil {
return nil, err
}

err = ApplyChanges(rc, updates, logger)
err = ApplyChanges(ctx, rc, updates, logger)
if err != nil {
return nil, MakeReleaseError(errors.Wrap(err, "applying changes"))
}

after, err := rc.GetAllResources()
after, err := rc.GetAllResources(ctx)
if err != nil {
return nil, MakeReleaseError(errors.Wrap(err, "loading resources after updates"))
}
Expand All @@ -54,15 +55,15 @@ func Release(rc *ReleaseContext, changes Changes, logger log.Logger) (results up
return results, nil
}

func ApplyChanges(rc *ReleaseContext, updates []*update.WorkloadUpdate, logger log.Logger) error {
func ApplyChanges(ctx context.Context, rc *ReleaseContext, updates []*update.WorkloadUpdate, logger log.Logger) error {
logger.Log("updates", len(updates))
if len(updates) == 0 {
logger.Log("exit", "no images to update for services given")
return nil
}

timer := update.NewStageTimer("write_changes")
err := rc.WriteUpdates(updates)
err := rc.WriteUpdates(ctx, updates)
timer.ObserveDuration()
return err
}
Expand Down
32 changes: 17 additions & 15 deletions release/releaser_test.go
Expand Up @@ -162,7 +162,7 @@ func mockCluster(running ...cluster.Workload) *cluster.Mock {
}

func NewFileResourceStoreOrFail(t *testing.T, manifests cluster.Manifests, checkout *git.Checkout) resourcestore.ResourceStore {
cm, err := resourcestore.NewFileResourceStore(context.TODO(), checkout.Dir(), checkout.ManifestDirs(), false, manifests)
cm, err := resourcestore.NewFileResourceStore(checkout.Dir(), checkout.ManifestDirs(), false, manifests)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -720,12 +720,12 @@ func Test_ImageStatus(t *testing.T) {
t.Run(tst.Name, func(t *testing.T) {
checkout, cleanup := setup(t)
defer cleanup()
ctx := &ReleaseContext{
rc := &ReleaseContext{
cluster: mCluster,
resourceStore: NewFileResourceStoreOrFail(t, mockManifests, checkout),
registry: upToDateRegistry,
}
testRelease(t, ctx, tst.Spec, tst.Expected.Result())
testRelease(t, rc, tst.Spec, tst.Expected.Result())
})
}
}
Expand All @@ -747,7 +747,7 @@ func Test_UpdateMultidoc(t *testing.T) {
mCluster := mockCluster(hwSvc, lockedSvc, egSvc) // no testsvc in cluster, but it _is_ in repo
checkout, cleanup := setup(t)
defer cleanup()
ctx := &ReleaseContext{
rc := &ReleaseContext{
cluster: mCluster,
resourceStore: NewFileResourceStoreOrFail(t, mockManifests, checkout),
registry: mockRegistry,
Expand All @@ -757,7 +757,7 @@ func Test_UpdateMultidoc(t *testing.T) {
ImageSpec: update.ImageSpecLatest,
Kind: update.ReleaseKindExecute,
}
results, err := Release(ctx, spec, log.NewNopLogger())
results, err := Release(context.Background(), rc, spec, log.NewNopLogger())
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -794,7 +794,7 @@ func Test_UpdateList(t *testing.T) {
mCluster := mockCluster(hwSvc, lockedSvc, egSvc) // no testsvc in cluster, but it _is_ in repo
checkout, cleanup := setup(t)
defer cleanup()
ctx := &ReleaseContext{
rc := &ReleaseContext{
cluster: mCluster,
resourceStore: NewFileResourceStoreOrFail(t, mockManifests, checkout),
registry: mockRegistry,
Expand All @@ -804,7 +804,7 @@ func Test_UpdateList(t *testing.T) {
ImageSpec: update.ImageSpecLatest,
Kind: update.ReleaseKindExecute,
}
results, err := Release(ctx, spec, log.NewNopLogger())
results, err := Release(context.Background(), rc, spec, log.NewNopLogger())
if err != nil {
t.Error(err)
}
Expand All @@ -828,7 +828,8 @@ func Test_UpdateContainers(t *testing.T) {
mCluster := mockCluster(hwSvc, lockedSvc)
checkout, cleanup := setup(t)
defer cleanup()
ctx := &ReleaseContext{
ctx := context.Background()
rc := &ReleaseContext{
cluster: mCluster,
resourceStore: NewFileResourceStoreOrFail(t, mockManifests, checkout),
registry: mockRegistry,
Expand Down Expand Up @@ -1032,7 +1033,7 @@ func Test_UpdateContainers(t *testing.T) {
specs.SkipMismatches = ignoreMismatches
specs.Force = tst.Force

results, err := Release(ctx, specs, log.NewNopLogger())
results, err := Release(ctx, rc, specs, log.NewNopLogger())

assert.Equal(t, expected.Err, err)
if expected.Err == nil {
Expand All @@ -1044,8 +1045,8 @@ func Test_UpdateContainers(t *testing.T) {
}
}

func testRelease(t *testing.T, ctx *ReleaseContext, spec update.ReleaseImageSpec, expected update.Result) {
results, err := Release(ctx, spec, log.NewNopLogger())
func testRelease(t *testing.T, rc *ReleaseContext, spec update.ReleaseImageSpec, expected update.Result) {
results, err := Release(context.Background(), rc, spec, log.NewNopLogger())
assert.NoError(t, err)
assert.Equal(t, expected, results)
}
Expand Down Expand Up @@ -1073,25 +1074,26 @@ func Test_BadRelease(t *testing.T) {
defer cleanup1()

manifests := kubernetes.NewManifests(kubernetes.ConstNamespacer("default"), log.NewLogfmtLogger(os.Stdout))
ctx := &ReleaseContext{
ctx := context.Background()
rc := &ReleaseContext{
cluster: mCluster,
resourceStore: NewFileResourceStoreOrFail(t, manifests, checkout1),
registry: mockRegistry,
}
_, err := Release(ctx, spec, log.NewNopLogger())
_, err := Release(ctx, rc, spec, log.NewNopLogger())
if err != nil {
t.Fatal("release with 'good' manifests should succeed, but errored:", err)
}

checkout2, cleanup2 := setup(t)
defer cleanup2()

ctx = &ReleaseContext{
rc = &ReleaseContext{
cluster: mCluster,
resourceStore: NewFileResourceStoreOrFail(t, &badManifests{manifests}, checkout2),
registry: mockRegistry,
}
_, err = Release(ctx, spec, log.NewNopLogger())
_, err = Release(ctx, rc, spec, log.NewNopLogger())
if err == nil {
t.Fatal("did not return an error, but was expected to fail verification")
}
Expand Down

0 comments on commit eeb9cf4

Please sign in to comment.