/
fdb_data_loader.go
287 lines (268 loc) · 9.79 KB
/
fdb_data_loader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
/*
* fdb_data_loader.go
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fixtures
import (
"bytes"
"context"
"errors"
"github.com/onsi/gomega"
"io"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
"sigs.k8s.io/controller-runtime/pkg/client"
"text/template"
"time"
)
const (
// dataLoaderName is the name of the data loader Job. It is passed to the template below as .Name, where it is also
// used as the "app" label, the container name and the trace log group.
dataLoaderName = "fdb-data-loader"
// dataLoaderJob is the text/template source of the data loader Job manifest; it is rendered with a dataLoaderConfig.
// The Job runs 2 completions in parallel and each data loader writes ~1GB, so for now we only load ~2GB into the
// cluster. We can increase this later if we want.
dataLoaderJob = `apiVersion: batch/v1
kind: Job
metadata:
name: {{ .Name }}
namespace: {{ .Namespace }}
labels:
app: {{ .Name }}
spec:
backoffLimit: 2
completions: 2
parallelism: 2
template:
spec:
containers:
- image: {{ .Image }}
imagePullPolicy: Always
name: {{ .Name }}
# This configuration will load ~1GB per data loader.
args:
- --keys=1000000
- --batch-size=50
- --value-size=1000
env:
- name: FDB_CLUSTER_FILE
value: /var/dynamic/fdb/fdb.cluster
- name: FDB_TLS_CERTIFICATE_FILE
value: /tmp/fdb-certs/tls.crt
- name: FDB_TLS_CA_FILE
value: /tmp/fdb-certs/ca.pem
- name: FDB_TLS_KEY_FILE
value: /tmp/fdb-certs/tls.key
# FDB 7.3 adds a check for loading external client library, which doesn't work with 6.3.
# Consider remove this option once 6.3 is no longer being used.
- name: FDB_NETWORK_OPTION_IGNORE_EXTERNAL_CLIENT_FAILURES
value: ""
- name: LD_LIBRARY_PATH
value: /var/dynamic/fdb/primary/lib
- name: FDB_NETWORK_OPTION_TRACE_LOG_GROUP
value: {{ .Name }}
- name: FDB_NETWORK_OPTION_EXTERNAL_CLIENT_DIRECTORY
value: /var/dynamic/fdb/libs
- name: PYTHONUNBUFFERED
value: "on"
volumeMounts:
- name: config-map
mountPath: /var/dynamic-conf
- name: fdb-libs
mountPath: /var/dynamic/fdb
- name: fdb-certs
mountPath: /tmp/fdb-certs
readOnly: true
resources:
requests:
cpu: "1"
memory: 4Gi
initContainers:
{{ range $index, $version := .SidecarVersions }}
- name: foundationdb-kubernetes-init-{{ $index }}
image: {{ .BaseImage }}:{{ .SidecarTag}}
imagePullPolicy: Always
command:
- /bin/bash
# This is a workaround for a change of the version schema that was never tested/supported
args:
- -c
- echo "{{ .FDBVersion.String }}" > /var/fdb/version && runuser -u fdb -g fdb -- /entrypoint.bash --copy-library {{ .FDBVersion.Compact }} --output-dir /var/output-files/{{ .FDBVersion.Compact }} --init-mode
volumeMounts:
- name: fdb-libs
mountPath: /var/output-files
securityContext:
runAsUser: 0
runAsGroup: 0
# Install this library in a special location to force the operator to use it as the primary library.
{{ if eq .FDBVersion.Compact "7.1" }}
- name: foundationdb-kubernetes-init-7-1-primary
image: {{ .BaseImage }}:{{ .SidecarTag}}
imagePullPolicy: {{ .ImagePullPolicy }}
args:
# Note that we are only copying a library, rather than copying any binaries.
- "--copy-library"
- "{{ .FDBVersion.Compact }}"
- "--output-dir"
- "/var/output-files/primary" # Note that we use primary as the subdirectory rather than specifying the FoundationDB version like we did in the other examples.
- "--init-mode"
volumeMounts:
- name: fdb-libs
mountPath: /var/output-files
{{ end }}
{{ end }}
- image: {{ .Image }}
imagePullPolicy: Always
name: fdb-lib-copy
command:
- /bin/bash
args:
- -c
- mkdir -p /var/dynamic/fdb/libs && {{ range $index, $version := .SidecarVersions -}} cp /var/dynamic/fdb/{{ .FDBVersion.Compact }}/lib/libfdb_c.so /var/dynamic/fdb/libs/libfdb_{{ .FDBVersion.Compact }}_c.so && {{ end }} cp /var/dynamic-conf/fdb.cluster /var/dynamic/fdb/fdb.cluster
volumeMounts:
- name: config-map
mountPath: /var/dynamic-conf
- name: fdb-libs
mountPath: /var/dynamic/fdb
- name: fdb-certs
mountPath: /tmp/fdb-certs
readOnly: true
restartPolicy: Never
volumes:
- name: config-map
configMap:
name: {{ .ClusterName }}-config
items:
- key: cluster-file
path: fdb.cluster
- name: fdb-libs
emptyDir: {}
- name: fdb-certs
secret:
secretName: {{ .SecretName }}`
)
// dataLoaderConfig holds the values that are substituted into the dataLoaderJob template when the data loader Job
// manifest is rendered.
type dataLoaderConfig struct {
// Name is the name of the data loader Job.
Name string
// Image is the data loader image that should be used in the Job.
Image string
// SidecarVersions holds the sidecar configurations for the different FoundationDB versions. The template ranges
// over them to add init containers that copy the matching client libraries.
SidecarVersions []SidecarConfig
// Namespace is the namespace the Job is created in.
Namespace string
// ClusterName is the name of the cluster to load data into.
ClusterName string
// SecretName is the Kubernetes secret that contains the certificates for communicating with the FoundationDB
// cluster.
SecretName string
}
// getDataLoaderConfig assembles the template input used to render the data loader Job for the provided cluster.
// The Job name is always dataLoaderName; image, sidecar configurations and secret name come from the factory,
// namespace and cluster name from the cluster.
func (factory *Factory) getDataLoaderConfig(cluster *FdbCluster) *dataLoaderConfig {
	config := new(dataLoaderConfig)

	config.Name = dataLoaderName
	config.Image = factory.GetDataLoaderImage()
	config.Namespace = cluster.Namespace()
	config.SidecarVersions = factory.GetSidecarConfigs()
	config.ClusterName = cluster.Name()
	config.SecretName = factory.GetSecretName()

	return config
}
// CreateDataLoaderIfAbsent renders the data loader Job manifest for the provided cluster, creates every object of
// the manifest that is not present yet, waits until the data load has finished and then removes the Job and its
// Pods again. It returns immediately when data loading is not enabled in the factory options.
func (factory *Factory) CreateDataLoaderIfAbsent(cluster *FdbCluster) {
	if !factory.options.enableDataLoading {
		return
	}

	jobTemplate, parseErr := template.New("dataLoaderJob").Parse(dataLoaderJob)
	gomega.Expect(parseErr).NotTo(gomega.HaveOccurred())

	var rendered bytes.Buffer
	gomega.Expect(
		jobTemplate.Execute(&rendered, factory.getDataLoaderConfig(cluster)),
	).NotTo(gomega.HaveOccurred())

	// Walk over every document of the rendered manifest and create the corresponding object.
	documents := yamlutil.NewYAMLOrJSONDecoder(&rendered, 100000)
	serializer := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme)
	for {
		var document runtime.RawExtension
		readErr := documents.Decode(&document)
		if errors.Is(readErr, io.EOF) {
			// No more documents left in the manifest.
			break
		}
		gomega.Expect(readErr).NotTo(gomega.HaveOccurred())

		decoded, _, decodeErr := serializer.Decode(document.Raw, nil, nil)
		gomega.Expect(decodeErr).NotTo(gomega.HaveOccurred())

		content, convertErr := runtime.DefaultUnstructuredConverter.ToUnstructured(decoded)
		gomega.Expect(convertErr).NotTo(gomega.HaveOccurred())

		gomega.Expect(
			factory.CreateIfAbsent(&unstructured.Unstructured{Object: content}),
		).NotTo(gomega.HaveOccurred())
	}

	factory.WaitUntilDataLoaderIsDone(cluster)

	// The data is loaded, so the Job is no longer needed.
	finishedJob := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      dataLoaderName,
			Namespace: cluster.Namespace(),
		},
	}
	gomega.Expect(
		factory.controllerRuntimeClient.Delete(context.Background(), finishedJob),
	).NotTo(gomega.HaveOccurred())

	// Remove the Pods of the Job explicitly as well, selected by their job-name label.
	gomega.Expect(
		factory.controllerRuntimeClient.DeleteAllOf(
			context.Background(),
			&corev1.Pod{},
			client.InNamespace(cluster.Namespace()),
			client.MatchingLabels{"job-name": dataLoaderName},
		),
	).NotTo(gomega.HaveOccurred())
}
// WaitUntilDataLoaderIsDone blocks until the data loader Job for the provided cluster has completed.
//
// It works in two phases:
//  1. Wait up to 5 minutes until at least one Pod of the Job has started, i.e. is in phase Running or has already
//     Succeeded.
//  2. Wait up to 15 minutes until the Job reports the JobComplete condition with status True.
//
// Both phases poll every 5 seconds. Assertions inside the polled functions are made on the Gomega instance that
// Eventually passes in, so a failing List/Get call only fails the current attempt and is retried on the next poll
// instead of aborting the whole test on the first transient API error. If a phase does not succeed within its
// timeout the surrounding test fails.
func (factory *Factory) WaitUntilDataLoaderIsDone(cluster *FdbCluster) {
	gomega.Eventually(func(g gomega.Gomega) int {
		pods := &corev1.PodList{}
		g.Expect(
			factory.controllerRuntimeClient.List(
				context.Background(),
				pods,
				client.InNamespace(cluster.Namespace()),
				client.MatchingLabels(map[string]string{"job-name": dataLoaderName}),
			),
		).NotTo(gomega.HaveOccurred())

		var startedPods int
		for _, pod := range pods.Items {
			// A Pod that already succeeded has necessarily been running. Counting it prevents a spurious timeout
			// when the Pods finished before (or in between) the polls, e.g. for an already existing Job.
			if pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodSucceeded {
				startedPods++
			}
		}

		return startedPods
	}).WithTimeout(5 * time.Minute).WithPolling(5 * time.Second).Should(gomega.BeNumerically(">", 0))

	// Wait for at most 15 minutes to let the data load complete.
	gomega.Eventually(func(g gomega.Gomega) corev1.ConditionStatus {
		job := &batchv1.Job{}
		g.Expect(
			factory.controllerRuntimeClient.Get(
				context.Background(),
				client.ObjectKey{
					Namespace: cluster.Namespace(),
					Name:      dataLoaderName,
				},
				job),
		).NotTo(gomega.HaveOccurred())

		for _, condition := range job.Status.Conditions {
			if condition.Type == batchv1.JobComplete {
				return condition.Status
			}
		}

		// The Job has not reported a JobComplete condition yet.
		return corev1.ConditionUnknown
	}).WithTimeout(15 * time.Minute).WithPolling(5 * time.Second).Should(gomega.Equal(corev1.ConditionTrue))
}