forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
controller.go
217 lines (185 loc) · 5.68 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package container
import (
"context"
"fmt"
"io"
"sync"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/chaincode/platforms"
"github.com/hyperledger/fabric/core/container/ccintf"
pb "github.com/hyperledger/fabric/protos/peer"
)
// VMProvider is a factory for VM instances. VMController keeps one provider
// per VM type name and asks it for a VM each time a request is processed.
type VMProvider interface {
// NewVM returns the VM that a request will be executed against.
NewVM() VM
}
// Builder supplies build input as a readable stream. A Builder is handed to
// VM.Start as part of a StartContainerReq.
type Builder interface {
// Build returns a reader over the build input, or an error if the input
// could not be produced.
Build() (io.Reader, error)
}
// VM is an abstract virtual image for supporting arbitrary virtual machines.
type VM interface {
// Start is invoked for a StartContainerReq with the request's chaincode ID,
// arguments, environment, files to upload and builder.
Start(ccid ccintf.CCID, args []string, env []string, filesToUpload map[string][]byte, builder Builder) error
// Stop is invoked for a StopContainerReq. dontkill and dontremove suppress
// the default kill-after-stop and remove-after-kill behavior.
// NOTE(review): the unit of timeout is not defined in this file - confirm
// against the implementations.
Stop(ccid ccintf.CCID, timeout uint, dontkill bool, dontremove bool) error
// Wait returns an int and an error for the given chaincode ID;
// WaitContainerReq forwards them to its ExitedFunc as the exit code and error.
Wait(ccid ccintf.CCID) (int, error)
// HealthCheck reports an error if the VM considers itself unhealthy.
// NOTE(review): exact semantics are implementation-defined - confirm.
HealthCheck(context.Context) error
}
// refCountedLock is a per-container lock together with the number of callers
// that currently hold it or are waiting for it. lockContainer increments the
// count before acquiring the lock and unlockContainer decrements it after
// releasing; the entry is removed from the controller once the count is zero.
type refCountedLock struct {
// refCount is only read and written while the VMController lock is held.
refCount int
// lock serializes requests for a single container.
lock *sync.RWMutex
}
// VMController manages VMs:
//   - abstracts construction of different types of VMs (we only care about
//     Docker for now);
//   - manages the lifecycle of a VM (start with build, start, stop ...);
//     eventually this probably needs finer-grained management.
type VMController struct {
// The embedded lock guards containerLocks; lockContainer and unlockContainer
// hold it while they read or modify the map.
sync.RWMutex
// containerLocks holds one reference-counted lock per container that
// currently has a request in flight or waiting.
containerLocks map[ccintf.CCID]*refCountedLock
// vmProviders maps a VM type name to the provider used to create VMs of
// that type.
vmProviders map[string]VMProvider
}
// vmLogger is the logger used throughout this package, registered under the
// name "container".
var vmLogger = flogging.MustGetLogger("container")
// NewVMController returns a VMController that creates VMs through the given
// providers, keyed by VM type name. The controller starts out with an empty
// set of container locks.
func NewVMController(vmProviders map[string]VMProvider) *VMController {
	controller := new(VMController)
	controller.vmProviders = vmProviders
	controller.containerLocks = map[ccintf.CCID]*refCountedLock{}
	return controller
}
// newVM asks the provider registered for typ to create a VM. Requesting a
// type with no registered provider is treated as a programming error and is
// reported through the logger's Panicf.
func (vmc *VMController) newVM(typ string) VM {
	provider, registered := vmc.vmProviders[typ]
	if !registered {
		vmLogger.Panicf("Programming error: unsupported VM type: %s", typ)
	}

	vm := provider.NewVM()
	return vm
}
// lockContainer acquires the exclusive per-container lock for id, creating
// the lock entry on first use, and blocks until the lock is available.
func (vmc *VMController) lockContainer(id ccintf.CCID) {
	// Find or create the entry while holding the controller-wide lock. The
	// reference is counted before we start waiting, so unlockContainer will
	// not delete the entry while this caller still needs it.
	vmc.Lock()
	entry, exists := vmc.containerLocks[id]
	if exists {
		entry.refCount++
		vmLogger.Debugf("refcount %d (%s)", entry.refCount, id)
	} else {
		entry = &refCountedLock{refCount: 1, lock: new(sync.RWMutex)}
		vmc.containerLocks[id] = entry
	}
	vmc.Unlock()

	// Wait on the per-container lock only after the controller-wide lock has
	// been released, so requests for other containers are not held up.
	vmLogger.Debugf("waiting for container(%s) lock", id)
	entry.lock.Lock()
	vmLogger.Debugf("got container (%s) lock", id)
}
// unlockContainer releases the per-container lock for id that was acquired by
// lockContainer and drops the caller's reference to it. When the last
// reference is dropped the lock entry is removed from the controller.
//
// If no lock entry exists for id the call only logs a debug message. It
// panics if an entry exists whose reference count is not positive, since that
// means lock/unlock calls are unbalanced.
func (vmc *VMController) unlockContainer(id ccintf.CCID) {
	vmc.Lock()
	// Deferred so that the controller-wide lock is released on every exit
	// path, including the invariant panic below; otherwise a recovered panic
	// would leave the controller locked and deadlock all later requests.
	defer vmc.Unlock()

	refLck, ok := vmc.containerLocks[id]
	if !ok {
		vmLogger.Debugf("no lock to unlock(%s)!!", id)
		return
	}

	if refLck.refCount <= 0 {
		panic("refcnt <= 0")
	}

	// Release the container lock first, then drop our reference; both happen
	// under the controller-wide lock so the count and the map stay in step.
	refLck.lock.Unlock()
	refLck.refCount--
	if refLck.refCount == 0 {
		vmLogger.Debugf("container lock deleted(%s)", id)
		delete(vmc.containerLocks, id)
	}
}
// VMCReq is the interface all requests handled by VMController.Process must
// implement.
//
// The context should be passed and tested at each layer till we stop; note
// that we'd stop on the first method on the stack that does not take context.
// NOTE(review): neither method below takes a context - this remark looks
// historical; confirm before relying on it.
type VMCReq interface {
// Do executes the request against v. Process calls it while holding the
// lock of the container named by GetCCID.
Do(v VM) error
// GetCCID returns the chaincode ID of the container the request targets;
// Process uses it to select the per-container lock.
GetCCID() ccintf.CCID
}
// StartContainerReq holds the properties for starting a container. Its Do
// method passes every field to VM.Start.
type StartContainerReq struct {
// CCID identifies the container to start.
ccintf.CCID
// Builder is passed through to VM.Start as the build input source.
Builder Builder
// Args is passed to VM.Start as the args parameter.
Args []string
// Env is passed to VM.Start as the env parameter.
Env []string
// FilesToUpload is passed to VM.Start as the filesToUpload parameter.
// NOTE(review): presumably keyed by file name/path - confirm with the VM
// implementations.
FilesToUpload map[string][]byte
}
// PlatformBuilder implements the Builder interface using the platforms
// package GenerateDockerBuild function.
//
// XXX This is a pretty awkward spot for the builder, it should
// really probably be pushed into the dockercontroller, as it only
// builds docker images, but, doing so would require contaminating
// the dockercontroller package with the CDS, which is also
// undesirable.
type PlatformBuilder struct {
// Type, Path, Name, Version and CodePackage are passed, in that order, to
// PlatformRegistry.GenerateDockerBuild by Build.
Type string
Path string
Name string
Version string
CodePackage []byte
// PlatformRegistry is the registry whose GenerateDockerBuild is invoked.
PlatformRegistry *platforms.Registry
}
// Build produces a tar stream based on the CDS by delegating to the platform
// registry's GenerateDockerBuild with the builder's type, path, name, version
// and code package. The registry's result and error are returned unchanged.
func (b *PlatformBuilder) Build() (io.Reader, error) {
	registry := b.PlatformRegistry
	return registry.GenerateDockerBuild(b.Type, b.Path, b.Name, b.Version, b.CodePackage)
}
// Do starts the container on v using the request's chaincode ID, arguments,
// environment, files to upload and builder, and returns v.Start's error.
func (si StartContainerReq) Do(v VM) error {
	err := v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)
	return err
}
// GetCCID returns the chaincode ID of the container to start.
func (si StartContainerReq) GetCCID() ccintf.CCID {
	ccid := si.CCID
	return ccid
}
// StopContainerReq holds the properties for stopping a container. Its Do
// method passes every field to VM.Stop.
type StopContainerReq struct {
// CCID identifies the container to stop.
ccintf.CCID
// Timeout is passed to VM.Stop as the timeout parameter.
// NOTE(review): the unit is not defined in this file - confirm.
Timeout uint
// By default the container is killed after stopping; set Dontkill to
// suppress that.
Dontkill bool
// By default the container is removed after killing; set Dontremove to
// suppress that.
Dontremove bool
}
// Do stops the container on v using the request's chaincode ID, timeout and
// kill/remove flags, and returns v.Stop's error.
func (si StopContainerReq) Do(v VM) error {
	err := v.Stop(si.CCID, si.Timeout, si.Dontkill, si.Dontremove)
	return err
}
// GetCCID returns the chaincode ID of the container to stop.
func (si StopContainerReq) GetCCID() ccintf.CCID {
	ccid := si.CCID
	return ccid
}
//go:generate counterfeiter -o mock/exitedfunc.go --fake-name ExitedFunc ExitedFunc
// ExitedFunc is the prototype for the function called when a container exits.
// WaitContainerReq.Do invokes it with the exit code and error returned by
// VM.Wait.
type ExitedFunc func(exitCode int, err error)
// WaitContainerReq provides the chaincode ID of the container to wait on and a
// callback to call upon chaincode termination.
type WaitContainerReq struct {
// CCID identifies the container to wait on.
CCID ccintf.CCID
// Exited is called with the result of VM.Wait. Do calls it
// unconditionally, so it must not be nil.
Exited ExitedFunc
}
// Do starts a goroutine that waits on the container through v.Wait and then
// reports the exit code and error to the request's Exited callback. Do itself
// does not block and always returns nil.
func (w WaitContainerReq) Do(v VM) error {
	ccid, notify := w.CCID, w.Exited
	go func() {
		// Forward Wait's (exit code, error) pair straight to the callback.
		notify(v.Wait(ccid))
	}()
	return nil
}
// GetCCID returns the chaincode ID of the container to wait on.
func (w WaitContainerReq) GetCCID() ccintf.CCID {
	ccid := w.CCID
	return ccid
}
// Process executes req against a newly created VM of type vmtype. The
// request runs while holding the lock of the container named by
// req.GetCCID(), so requests for the same container are serialized. The
// error returned by req.Do is returned to the caller.
func (vmc *VMController) Process(vmtype string, req VMCReq) error {
	vm := vmc.newVM(vmtype)

	id := req.GetCCID()
	vmc.lockContainer(id)
	// Release the container lock once the request has completed.
	defer vmc.unlockContainer(id)

	return req.Do(vm)
}
// GetChaincodePackageBytes creates bytes for docker container generation
// using the supplied chaincode specification. It returns the deployment
// payload the registry produces for the spec's type and chaincode path, or
// an error if spec or its ChaincodeId is nil.
func GetChaincodePackageBytes(pr *platforms.Registry, spec *pb.ChaincodeSpec) ([]byte, error) {
	valid := spec != nil && spec.ChaincodeId != nil
	if !valid {
		return nil, fmt.Errorf("invalid chaincode spec")
	}

	ccType, ccPath := spec.Type.String(), spec.ChaincodeId.Path
	return pr.GetDeploymentPayload(ccType, ccPath)
}