/
resources_persistence_staged.go
123 lines (112 loc) · 3.59 KB
/
resources_persistence_staged.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright 2016 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package state
import (
"bytes"
"github.com/juju/errors"
"gopkg.in/mgo.v2/txn"
)
// StagedResource represents resource info that has been added to the
// "staging" area of the underlying data store. It remains unavailable
// until finalized, at which point it moves out of the staging area and
// replaces the current active resource info.
//
// In this file, Activate performs that finalization and Unstage
// discards the staged info without activating it.
type StagedResource struct {
// base provides the transaction runner (Run), document lookup (One)
// and the application-related op builders used by the methods below.
base ResourcePersistenceBase
// id is the identifier handed to newRemoveStagedResourceOps when the
// staged info is removed.
id string
// stored is the resource info being staged; its PendingID,
// ApplicationID, ID and Fingerprint fields are read by the methods
// below.
stored storedResource
}
// stage runs a transaction that places the resource info held in
// staged.stored into the staging area.
//
// Attempt 0 uses the insert ops; attempt 1 (presumably a retry after the
// insert was aborted) uses newEnsureStagedResourceSameOps instead. Any
// other attempt number gives up with an "already exists" error.
func (staged StagedResource) stage() error {
	buildTxn := func(attempt int) ([]txn.Op, error) {
		if attempt != 0 && attempt != 1 {
			return nil, errors.NewAlreadyExists(nil, "already staged")
		}

		var ops []txn.Op
		if attempt == 0 {
			ops = newInsertStagedResourceOps(staged.stored)
		} else {
			ops = newEnsureStagedResourceSameOps(staged.stored)
		}

		// Resources with a pending ID are exempt from the
		// application-exists requirement; all others must have one.
		isPending := staged.stored.PendingID != ""
		if !isPending {
			existsOps := staged.base.ApplicationExistsOps(staged.stored.ApplicationID)
			ops = append(ops, existsOps...)
		}
		return ops, nil
	}

	// errors.Trace passes a nil error through unchanged.
	return errors.Trace(staged.base.Run(buildTxn))
}
// Unstage makes sure the resource is no longer present in the staging
// area. When the resource is not staged, this does nothing.
func (staged StagedResource) Unstage() error {
	buildTxn := func(attempt int) ([]txn.Op, error) {
		if attempt <= 0 {
			return newRemoveStagedResourceOps(staged.id), nil
		}
		// The removal op carries no assert, so a retry is not expected.
		return nil, errors.New("unstaging the resource failed")
	}

	// errors.Trace passes a nil error through unchanged.
	return errors.Trace(staged.base.Run(buildTxn))
}
// Activate promotes the staged resource to be the active resource.
//
// The transaction behaves as an upsert: attempt 0 uses the insert ops,
// attempt 1 uses the update ops, and any other attempt fails. Every
// attempt also removes the staging entry. For resources without a
// pending ID the application must exist, and its CharmModifiedVersion
// is incremented when hasNewBytes reports a change.
func (staged StagedResource) Activate() error {
	buildTxn := func(attempt int) ([]txn.Op, error) {
		if attempt != 0 && attempt != 1 {
			return nil, errors.New("setting the resource failed")
		}

		// Insert on the first try, update on the second.
		var ops []txn.Op
		if attempt == 0 {
			ops = newInsertResourceOps(staged.stored)
		} else {
			ops = newUpdateResourceOps(staged.stored)
		}

		appID := staged.stored.ApplicationID
		nonPending := staged.stored.PendingID == ""

		// Only non-pending resources must have an existing application.
		if nonPending {
			ops = append(ops, staged.base.ApplicationExistsOps(appID)...)
		}

		// The staging entry is dropped regardless of the path taken.
		ops = append(ops, newRemoveStagedResourceOps(staged.id)...)

		// Resources are integral to the high level "version" of the
		// charm, so a change in the resource's bytes bumps the
		// application's CharmModifiedVersion.
		if nonPending {
			changed, err := staged.hasNewBytes()
			if err != nil {
				logger.Errorf("can't read existing resource during activate: %v", errors.Details(err))
				return nil, errors.Trace(err)
			}
			if changed {
				ops = append(ops, staged.base.IncCharmModifiedVersionOps(appID)...)
			}
		}

		logger.Debugf("activate ops: %#v", ops)
		return ops, nil
	}

	// errors.Trace passes a nil error through unchanged.
	return errors.Trace(staged.base.Run(buildTxn))
}
func (staged StagedResource) hasNewBytes() (bool, error) {
var current resourceDoc
err := staged.base.One(resourcesC, staged.stored.ID, ¤t)
switch {
case errors.IsNotFound(err):
// if there's no current resource stored, then any non-zero bytes will
// be new.
return !staged.stored.Fingerprint.IsZero(), nil
case err != nil:
return false, errors.Annotate(err, "couldn't read existing resource")
default:
diff := !bytes.Equal(staged.stored.Fingerprint.Bytes(), current.Fingerprint)
return diff, nil
}
}