generated from kubernetes/kubernetes-template-project
-
Notifications
You must be signed in to change notification settings - Fork 950
/
dataset.go
159 lines (128 loc) · 4.49 KB
/
dataset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/*
Copyright 2020 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package alluxio
import (
"context"
"reflect"
"k8s.io/apimachinery/pkg/types"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/utils"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/retry"
)
// UpdateCacheOfDataset copies the runtime's cache states into the dataset status.
//
// It loads the runtime through e.getRuntime and then, under
// retry.RetryOnConflict with retry.DefaultBackoff, re-reads the dataset on
// every attempt and writes the status back only when it differs from the
// status that was just read. Any error from loading the runtime, loading the
// dataset, or the status update is returned to the caller.
func (e *AlluxioEngine) UpdateCacheOfDataset() (err error) {
	// 1. Load the runtime whose cache states are mirrored into the dataset.
	runtime, err := e.getRuntime()
	if err != nil {
		return err
	}

	// 2. Update the dataset status.
	err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		// Re-read the dataset on every attempt so the update is applied to
		// the object as it is currently stored.
		dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
		if err != nil {
			return err
		}

		datasetToUpdate := dataset.DeepCopy()
		datasetToUpdate.Status.CacheStates = runtime.Status.CacheStates
		e.Log.Info("the dataset status", "status", datasetToUpdate.Status)

		if reflect.DeepEqual(dataset.Status, datasetToUpdate.Status) {
			e.Log.Info("No need to update the cache of the data")
			return nil
		}

		// The error is deliberately not logged here: it is reported once,
		// after the retry loop, so a failure is not logged twice and an
		// attempt that is retried does not produce an error entry of its own.
		return e.Client.Status().Update(context.TODO(), datasetToUpdate)
	})
	if err != nil {
		e.Log.Error(err, "Update dataset")
		return err
	}

	return nil
}
// UpdateDatasetStatus moves the dataset to the given phase and refreshes the
// rest of its status.
//
// The runtime is loaded through e.getRuntime. Then, under
// retry.RetryOnConflict with retry.DefaultBackoff, each attempt re-reads the
// dataset and:
//   - when phase differs from the stored phase, sets the phase and updates the
//     DatasetReady condition; for BoundDatasetPhase it also fills
//     Status.Mounts from Spec.Mounts if the status has none and adds this
//     runtime to Status.Runtimes if it is not already there;
//   - copies the runtime's CacheStates into the dataset status;
//   - fills Status.HCFSStatus from e.GetHCFSStatus when it is still nil.
//
// The status is written back only if it differs from the one that was read.
// A non-nil error is passed to utils.LoggingErrorExceptConflict and returned.
func (e *AlluxioEngine) UpdateDatasetStatus(phase datav1alpha1.DatasetPhase) (err error) {
	// 1. Load the runtime that provides the cache states.
	runtime, err := e.getRuntime()
	if err != nil {
		return err
	}

	// 2. Update the dataset status.
	err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		current, err := utils.GetDataset(e.Client, e.name, e.namespace)
		if err != nil {
			return err
		}
		updated := current.DeepCopy()

		if current.Status.Phase != phase {
			// Start from the "unknown" condition and override it for the
			// phases that have a dedicated message.
			message, ready := "The ddc runtime is unknown.", corev1.ConditionFalse

			switch phase {
			case datav1alpha1.BoundDatasetPhase:
				// Store the dataset mount info.
				if len(updated.Status.Mounts) == 0 {
					updated.Status.Mounts = updated.Spec.Mounts
				}

				// Store the binding relation between dataset and runtime.
				if len(updated.Status.Runtimes) == 0 {
					updated.Status.Runtimes = []datav1alpha1.Runtime{}
				}
				updated.Status.Runtimes = utils.AddRuntimesIfNotExist(
					updated.Status.Runtimes,
					utils.NewRuntime(
						e.name,
						e.namespace,
						common.AccelerateCategory,
						common.AlluxioRuntime,
						e.runtime.Spec.Master.Replicas,
					),
				)

				message, ready = "The ddc runtime is ready.", corev1.ConditionTrue
			case datav1alpha1.FailedDatasetPhase:
				message = "The ddc runtime is not ready."
			}

			readyCond := utils.NewDatasetCondition(
				datav1alpha1.DatasetReady,
				datav1alpha1.DatasetReadyReason,
				message,
				ready,
			)
			updated.Status.Phase = phase
			updated.Status.Conditions = utils.UpdateDatasetCondition(updated.Status.Conditions, readyCond)
		}

		updated.Status.CacheStates = runtime.Status.CacheStates

		if updated.Status.HCFSStatus != nil {
			e.Log.Info("No need to update HCFS status")
		} else {
			updated.Status.HCFSStatus, err = e.GetHCFSStatus()
			if err != nil {
				return err
			}
		}

		// Skip the write when nothing in the status changed.
		if reflect.DeepEqual(current.Status, updated.Status) {
			return nil
		}
		return e.Client.Status().Update(context.TODO(), updated)
	})
	if err != nil {
		_ = utils.LoggingErrorExceptConflict(e.Log, err, "Failed to update dataset status", types.NamespacedName{Namespace: e.namespace, Name: e.name})
		return err
	}

	return nil
}
// BindToDataset calls UpdateDatasetStatus with BoundDatasetPhase and returns
// its error unchanged.
func (e *AlluxioEngine) BindToDataset() (err error) {
	err = e.UpdateDatasetStatus(datav1alpha1.BoundDatasetPhase)
	return err
}