forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
factory.go
135 lines (113 loc) · 4.52 KB
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package factory
import (
"errors"
"time"
"github.com/golang/glog"
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
buildapi "github.com/openshift/origin/pkg/build/api"
controller "github.com/openshift/origin/pkg/build/controller"
strategy "github.com/openshift/origin/pkg/build/controller/strategy"
osclient "github.com/openshift/origin/pkg/client"
)
// BuildControllerFactory holds the clients and build strategies needed to
// construct a controller.BuildController via Create.
type BuildControllerFactory struct {
// Client is the OpenShift client. Create uses it to list/watch Builds and
// hands it to the controller as its BuildUpdater.
Client *osclient.Client
// KubeClient is the Kubernetes client. It is handed to the controller as its
// PodCreator and is used by pollPods to fetch the pods of active builds.
KubeClient *kclient.Client
// DockerBuildStrategy creates build pods for Docker-type builds.
DockerBuildStrategy *strategy.DockerBuildStrategy
// STIBuildStrategy creates build pods for STI-type builds.
STIBuildStrategy *strategy.STIBuildStrategy
// buildStore is assigned by Create and read by pollPods; it is nil until
// Create has been called.
buildStore cache.Store
}
// Create builds the caches that feed a BuildController and returns the
// controller wired to them.
//
// Two reflectors over all Builds are run: one fills factory.buildStore and the
// other fills a FIFO that NextBuild pops from. A poller invoking pollPods every
// 10 seconds fills a second FIFO that NextPod pops from.
func (factory *BuildControllerFactory) Create() *controller.BuildController {
	// The store is read by pollPods to decide which pods to fetch, so it must be
	// assigned before the poller below is run.
	factory.buildStore = cache.NewStore()
	storeReflector := cache.NewReflector(&buildLW{client: factory.Client}, &buildapi.Build{}, factory.buildStore)
	storeReflector.Run()

	// A second, independent reflector feeds the queue of builds to process.
	pendingBuilds := cache.NewFIFO()
	queueReflector := cache.NewReflector(&buildLW{client: factory.Client}, &buildapi.Build{}, pendingBuilds)
	queueReflector.Run()

	// Kubernetes does not currently synchronize Pod status in storage with the
	// states of the Pod's containers, so no events arrive for container (and
	// thus Pod) state changes such as Running -> Terminated. As a workaround the
	// pod FIFO is populated by polling: the Get/List REST implementations fill in
	// the synchronized container/pod status on demand.
	//
	// TODO: Find a way to get watch events for Pod/container status updates. The
	// polling strategy is horribly inefficient and should be addressed upstream
	// somehow.
	polledPods := cache.NewFIFO()
	podPoller := cache.NewPoller(factory.pollPods, 10*time.Second, polledPods)
	podPoller.Run()

	// NextBuild/NextPod hand the controller the next queued item, asserting the
	// concrete type that the corresponding producer enqueues.
	nextBuild := func() *buildapi.Build {
		return pendingBuilds.Pop().(*buildapi.Build)
	}
	nextPod := func() *kapi.Pod {
		return polledPods.Pop().(*kapi.Pod)
	}

	// Dispatches pod creation to the Docker or STI strategy by build type.
	strategies := &typeBasedFactoryStrategy{
		DockerBuildStrategy: factory.DockerBuildStrategy,
		STIBuildStrategy:    factory.STIBuildStrategy,
	}

	return &controller.BuildController{
		BuildStore:    factory.buildStore,
		BuildUpdater:  factory.Client,
		PodCreator:    factory.KubeClient,
		NextBuild:     nextBuild,
		NextPod:       nextPod,
		BuildStrategy: strategies,
	}
}
// pollPods fetches the pod of every build in the buildStore whose status is
// pending or running and returns them wrapped in an enumerator for
// cache.Poller. Builds in any other status are skipped, which narrows the poll
// scope for efficiency. A build whose pod cannot be fetched is logged at
// verbosity 2 and left out of the result; the returned error is always nil.
func (factory *BuildControllerFactory) pollPods() (cache.Enumerator, error) {
	pods := &kapi.PodList{}
	for _, item := range factory.buildStore.List() {
		b := item.(*buildapi.Build)

		active := b.Status == buildapi.BuildStatusPending || b.Status == buildapi.BuildStatusRunning
		if !active {
			continue
		}

		// The pod lives in the same namespace as its build.
		ctx := kapi.WithNamespace(kapi.NewContext(), b.Namespace)
		p, err := factory.KubeClient.GetPod(ctx, b.PodID)
		if err != nil {
			glog.V(2).Infof("Couldn't find pod %s for build %s: %#v", b.PodID, b.ID, err)
			continue
		}
		pods.Items = append(pods.Items, *p)
	}
	return &podEnumerator{pods}, nil
}
// podEnumerator allows a cache.Poller to enumerate items in an api.PodList.
// The embedded list may be nil, in which case Len reports zero items.
type podEnumerator struct {
// PodList is the list being enumerated; Len and Get index into its Items.
*kapi.PodList
}
// Len reports how many pods the enumerator holds. An enumerator without an
// underlying pod list is treated as empty.
func (pe *podEnumerator) Len() int {
	if pe.PodList != nil {
		return len(pe.Items)
	}
	return 0
}
// Get returns the ID of the pod at the given index together with a pointer to
// that pod inside the underlying list. The index is not range-checked here.
func (pe *podEnumerator) Get(index int) (string, interface{}) {
	pod := &pe.Items[index]
	return pod.ID, pod
}
// typeBasedFactoryStrategy creates build pods by delegating to the strategy
// that matches the build's strategy type (see CreateBuildPod).
type typeBasedFactoryStrategy struct {
// DockerBuildStrategy handles builds of type buildapi.DockerBuildStrategyType.
DockerBuildStrategy *strategy.DockerBuildStrategy
// STIBuildStrategy handles builds of type buildapi.STIBuildStrategyType.
STIBuildStrategy *strategy.STIBuildStrategy
}
// CreateBuildPod creates the pod for the given build by delegating to the
// Docker or STI strategy, selected by build.Parameters.Strategy.Type. It
// returns a nil pod and an error when the type matches neither strategy.
func (f *typeBasedFactoryStrategy) CreateBuildPod(build *buildapi.Build) (*kapi.Pod, error) {
	strategyType := build.Parameters.Strategy.Type
	if strategyType == buildapi.DockerBuildStrategyType {
		return f.DockerBuildStrategy.CreateBuildPod(build)
	}
	if strategyType == buildapi.STIBuildStrategyType {
		return f.STIBuildStrategy.CreateBuildPod(build)
	}
	return nil, errors.New("No strategy defined for type")
}
// buildLW is a ListWatcher implementation for Builds.
type buildLW struct {
// client performs the ListBuilds and WatchBuilds calls made by List and Watch.
client osclient.Interface
}
// List returns every Build, using a fresh context and a selector that matches
// all labels.
func (lw *buildLW) List() (runtime.Object, error) {
	ctx := kapi.NewContext()
	everything := labels.Everything()
	return lw.client.ListBuilds(ctx, everything)
}
// Watch watches all Builds, starting at the given resourceVersion.
//
// The caller-supplied resourceVersion is passed through to WatchBuilds so the
// watch begins at the version the caller asks for instead of a hard-coded
// version. An empty resourceVersion falls back to "0", the value that was
// previously always sent.
func (lw *buildLW) Watch(resourceVersion string) (watch.Interface, error) {
	if resourceVersion == "" {
		resourceVersion = "0"
	}
	return lw.client.WatchBuilds(kapi.NewContext(), labels.Everything(), labels.Everything(), resourceVersion)
}